kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Becket Qin <becket....@gmail.com>
Subject Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability
Date Sat, 23 Dec 2017 17:15:43 GMT
Hi Colin,

Thanks for the explanation. I want to clarify a bit more on my thoughts.

I am fine with having a separate discussion as long as the follow-up
discussion will be incremental on top of this KIP instead of override the
protocol in this KIP.

I completely agree this KIP is useful by itself. That being said, we want
to avoid falling into a "local optimal" solution by just saying because it
solves the problem in this scope. I think we should also think if the
solution aligns with a "global optimal" (systematic optimal) solution as
well. That is why I brought up other considerations. If they turned out to
be orthogonal and should be addressed separately, that's good. But at least
it is worth thinking about the potential connections between those things.

One example of such related consideration is the following two seemingly
unrelated things:

1. I might have missed the discussion, but it seems the concern of the
clients doing frequent pause and resume is still not addressed. Since this
is a pretty common use case for applications that want to have flow
control, or have prioritized consumption, or get consumption fairness, we
probably want to see how to handle this case. One of the solution might be
a long-lived session id spanning the clients' life time.

2. KAFKA-6029. The key problem is that the leader wants to know if a fetch
request is from a shutting down broker or from a restarted broker.

The connection between those two issues is that both of them could be
addressed by having a life-long session id for each client (or fetcher, to
be more accurate). This may indicate that having a life long session id
might be a "global optimal" solution so it should be considered in this
KIP. Otherwise, a follow up KIP discussion for KAFKA-6029 may either
introduce a broker epoch unnecessarily (which will not be used by the
consumers at all) or override what we do in this KIP.

BTW, to clarify, the main purpose of returning the data at the index
boundary was to get the same benefit of efficient incremental fetch for
both low vol and high vol partitions, which is directly related to the
primary goal in this KIP. The other things (such as avoiding binary search)
are just potential additional gain, and they are also brought up to see if
that could be a "global optimal" solution.

Some other replies below.
>In order for improvements to succeed, I think that it's important to
clearly define the scope and goals.  One good example of this was the
AdminClient KIP.  We deliberately avoiding ?>discussing new administrative
RPCs in that KIP, in order to limit the scope.  This kept the discussion
focused on the user interfaces and configuration, rather than on the
details of possible >new RPCs.  Once the KIP was completed, it was easy for
us to add new RPCs later in separate KIPs.
Hmm, why is AdminClient is related? All the discussion are about how to
make fetch more efficient, right?

>Finally, it's not clear that the approach you are proposing is the right
way to go.  I think we would need to have a lot more discussion about it.
One very big disadvantage is that it couples >what we send back on the wire
tightly to what is on the disk.  It's not clear that we want to do that.
What if we want to change how things are stored in the future?  How does
this work with >clients' own concept of fetch sizes?  And so on, and so
on.  This needs its own discussion thread.
That might be true. However, the index file by definition is for the files
stored on the disk. So if we decide to change the storage layer to
something else, it seems natural to use some other suitable ways to get the
offsets efficiently.

>There are a lot of simpler solutions that might work as well or better.
For example, each partition could keep an in-memory LRU cache of the most
recently used offset to file position >mappings.  Or we could have a thread
periodically touch the latest page or two of memory in the index file for
each partition, to make sure that it didn't fall out of the cache.  In some
offline >discussions, some of these approaches have looked quite
promising.  I've even seen some good performance numbers for prototypes.
In any case, it's a separate problem which needs its >own KIP, I think.
Those are indeed separate discussions. I was not intended to discuss them
in this KIP. Sorry about the confusion.

Thanks and Merry Christmas,

Jiangjie (Becket) Qin


On Sat, Dec 23, 2017 at 1:16 AM, Colin McCabe <cmccabe@apache.org> wrote:

> On Fri, Dec 22, 2017, at 14:31, Becket Qin wrote:
> > >>
> > >> The point I want to make is that avoiding doing binary search on index
> > >> file and avoid reading the log segments during fetch has some
> additional
> > >> benefits. So if the solution works for the current KIP, it might be a
> > >> better choice.
> >
> > >Let's discuss this in a follow-on KIP.
> >
> > If the discussion will potentially change the protocol in the current
> > proposal. Would it be better to discuss it now instead of in a follow-up
> > KIP so we don't have some protocol that immediately requires a change.
>
> Hi Becket,
>
> I think that the problem that you are discussing is different than the
> problem this KIP is designed to address.  This KIP is targeted at
> eliminating the wastefulness of re-transmitting information about
> partitions that haven't changed in every FetchRequest and FetchResponse.
> The problem you are discussing is dealing with situations where the index
> file or the data file is not in the page cache, and therefore we take a
> page fault when doing an index lookup.
>
> This KIP is useful and valuable on its own.  For example, if you have
> brokers in a public cloud in different availability zones, you may wish to
> minimize the network traffic between them.  Therefore, you don't want every
> FetchRequest between brokers to be a full FetchRequest.  In that case, this
> KIP is very valuable.
>
> In order for improvements to succeed, I think that it's important to
> clearly define the scope and goals.  One good example of this was the
> AdminClient KIP.  We deliberately avoiding discussing new administrative
> RPCs in that KIP, in order to limit the scope.  This kept the discussion
> focused on the user interfaces and configuration, rather than on the
> details of possible new RPCs.  Once the KIP was completed, it was easy for
> us to add new RPCs later in separate KIPs.
>
> While it's clear that there is probably even more we could do to optimize
> fetch requests, making them incremental seems like a good first cut.  I
> deliberately avoided changing the replication protocol in this KIP, because
> I think that it's a big enough change as-is.  If we want to change the
> replication protocol in the future, there is nothing preventing us... and
> this change will be a useful starting point.
>
> Finally, it's not clear that the approach you are proposing is the right
> way to go.  I think we would need to have a lot more discussion about it.
> One very big disadvantage is that it couples what we send back on the wire
> tightly to what is on the disk.  It's not clear that we want to do that.
> What if we want to change how things are stored in the future?  How does
> this work with clients' own concept of fetch sizes?  And so on, and so on.
> This needs its own discussion thread.
>
> There are a lot of simpler solutions that might work as well or better.
> For example, each partition could keep an in-memory LRU cache of the most
> recently used offset to file position mappings.  Or we could have a thread
> periodically touch the latest page or two of memory in the index file for
> each partition, to make sure that it didn't fall out of the cache.  In some
> offline discussions, some of these approaches have looked quite promising.
> I've even seen some good performance numbers for prototypes.  In any case,
> it's a separate problem which needs its own KIP, I think.
>
> best,
> Colin
>
> >
> >
> > On Tue, Dec 19, 2017 at 9:26 AM, Colin McCabe <colin@cmccabe.xyz> wrote:
> >
> > > On Tue, Dec 19, 2017, at 02:16, Jan Filipiak wrote:
> > > > Sorry for coming back at this so late.
> > > >
> > > >
> > > >
> > > > On 11.12.2017 07:12, Colin McCabe wrote:
> > > > > On Sun, Dec 10, 2017, at 22:10, Colin McCabe wrote:
> > > > >> On Fri, Dec 8, 2017, at 01:16, Jan Filipiak wrote:
> > > > >>> Hi,
> > > > >>>
> > > > >>> sorry for the late reply, busy times :-/
> > > > >>>
> > > > >>> I would ask you one thing maybe. Since the timeout
> > > > >>> argument seems to be settled I have no further argument
> > > > >>> form your side except the "i don't want to".
> > > > >>>
> > > > >>> Can you see that connection.max.idle.max is the exact time
> > > > >>> that expresses "We expect the client to be away for this long,
> > > > >>> and come back and continue"?
> > > > >> Hi Jan,
> > > > >>
> > > > >> Sure, connection.max.idle.max is the exact time that we want to
> keep
> > > > >> around a TCP session.  TCP sessions are relatively cheap, so we
> can
> > > > >> afford to keep them around for 10 minutes by default.  Incremental
> > > fetch
> > > > >> state is less cheap, so we want to set a shorter timeout for it.
> We
> > > > >> also want new TCP sessions to be able to reuse an existing
> incremental
> > > > >> fetch session rather than creating a new one and waiting for the
> old
> > > one
> > > > >> to time out.
> > > > >>
> > > > >>> also clarified some stuff inline
> > > > >>>
> > > > >>> Best Jan
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> On 05.12.2017 23:14, Colin McCabe wrote:
> > > > >>>> On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
> > > > >>>>> Hi Colin
> > > > >>>>>
> > > > >>>>> Addressing the topic of how to manage slots from the other
> thread.
> > > > >>>>> With tcp connections all this comes for free essentially.
> > > > >>>> Hi Jan,
> > > > >>>>
> > > > >>>> I don't think that it's accurate to say that cache management
> > > "comes for
> > > > >>>> free" by coupling the incremental fetch session with the TCP
> > > session.
> > > > >>>> When a new TCP session is started by a fetch request, you still
> > > have to
> > > > >>>> decide whether to grant that request an incremental fetch
> session or
> > > > >>>> not.  If your answer is that you always grant the request, I
> would
> > > argue
> > > > >>>> that you do not have cache management.
> > > > >>> First I would say, the client has a big say in this. If the
> client
> > > > >>> is not going to issue incremental he shouldn't ask for a cache
> > > > >>> when the client ask for the cache we still have all options to
> deny.
> > > > >> To put it simply, we have to have some cache management above and
> > > beyond
> > > > >> just giving out an incremental fetch session to anyone who has a
> TCP
> > > > >> session.  Therefore, caching does not become simpler if you
> couple the
> > > > >> fetch session to the TCP session.
> > > > Simply giving out an fetch session for everyone with a connection is
> too
> > > > simple,
> > > > but I think it plays well into the idea of consumers choosing to use
> the
> > > > feature
> > > > therefore only enabling where it brings maximum gains
> > > > (replicas,MirrorMakers)
> > > > >>
> > > > >>>> I guess you could argue that timeouts are cache management, but
> I
> > > don't
> > > > >>>> find that argument persuasive.  Anyone could just create a lot
> of
> > > TCP
> > > > >>>> sessions and use a lot of resources, in that case.  So there is
> > > > >>>> essentially no limit on memory use.  In any case, TCP sessions
> don't
> > > > >>>> help us implement fetch session timeouts.
> > > > >>> We still have all the options denying the request to keep the
> state.
> > > > >>> What you want seems like a max connections / ip safeguard.
> > > > >>> I can currently take down a broker with to many connections
> easily.
> > > > >>>
> > > > >>>
> > > > >>>>> I still would argue we disable it by default and make a flag
> in the
> > > > >>>>> broker to ask the leader to maintain the cache while
> replicating
> > > and also only
> > > > >>>>> have it optional in consumers (default to off) so one can turn
> it
> > > on
> > > > >>>>> where it really hurts.  MirrorMaker and audit consumers
> > > prominently.
> > > > >>>> I agree with Jason's point from earlier in the thread.  Adding
> extra
> > > > >>>> configuration knobs that aren't really necessary can harm
> usability.
> > > > >>>> Certainly asking people to manually turn on a feature "where it
> > > really
> > > > >>>> hurts" seems to fall in that category, when we could easily
> enable
> > > it
> > > > >>>> automatically for them.
> > > > >>> This doesn't make much sense to me.
> > > > >> There are no tradeoffs to think about from the client's point of
> view:
> > > > >> it always wants an incremental fetch session.  So there is no
> benefit
> > > to
> > > > >> making the clients configure an extra setting.  Updating and
> managing
> > > > >> client configurations is also more difficult than managing broker
> > > > >> configurations for most users.
> > > > >>
> > > > >>> You also wanted to implement
> > > > >>> a "turn of in case of bug"-knob. Having the client indicate if
> the
> > > > >>> feauture will be used seems reasonable to me.,
> > > > >> True.  However, if there is a bug, we could also roll back the
> client,
> > > > >> so having this configuration knob is not strictly required.
> > > > >>
> > > > >>>>> Otherwise I left a few remarks in-line, which should help to
> > > understand
> > > > >>>>> my view of the situation better
> > > > >>>>>
> > > > >>>>> Best Jan
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On 05.12.2017 08:06, Colin McCabe wrote:
> > > > >>>>>> On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> > > > >>>>>>> On 03.12.2017 21:55, Colin McCabe wrote:
> > > > >>>>>>>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> > > > >>>>>>>>> Thanks for the explanation, Colin. A few more questions.
> > > > >>>>>>>>>
> > > > >>>>>>>>>> The session epoch is not complex.  It's just a number
> which
> > > increments
> > > > >>>>>>>>>> on each incremental fetch.  The session epoch is also
> useful
> > > for
> > > > >>>>>>>>>> debugging-- it allows you to match up requests and
> responses
> > > when
> > > > >>>>>>>>>> looking at log files.
> > > > >>>>>>>>> Currently each request in Kafka has a correlation id to
> help
> > > match the
> > > > >>>>>>>>> requests and responses. Is epoch doing something
> differently?
> > > > >>>>>>>> Hi Becket,
> > > > >>>>>>>>
> > > > >>>>>>>> The correlation ID is used within a single TCP session, to
> > > uniquely
> > > > >>>>>>>> associate a request with a response.  The correlation ID is
> not
> > > unique
> > > > >>>>>>>> (and has no meaning) outside the context of that single TCP
> > > session.
> > > > >>>>>>>>
> > > > >>>>>>>> Keep in mind, NetworkClient is in charge of TCP sessions,
> and
> > > generally
> > > > >>>>>>>> tries to hide that information from the upper layers of the
> > > code.  So
> > > > >>>>>>>> when you submit a request to NetworkClient, you don't know
> if
> > > that
> > > > >>>>>>>> request creates a TCP session, or reuses an existing one.
> > > > >>>>>>>>>> Unfortunately, this doesn't work.  Imagine the client
> misses
> > > an
> > > > >>>>>>>>>> increment fetch response about a partition.  And then the
> > > partition is
> > > > >>>>>>>>>> never updated after that.  The client has no way to know
> > > about the
> > > > >>>>>>>>>> partition, since it won't be included in any future
> > > incremental fetch
> > > > >>>>>>>>>> responses.  And there are no offsets to compare, since the
> > > partition is
> > > > >>>>>>>>>> simply omitted from the response.
> > > > >>>>>>>>> I am curious about in which situation would the follower
> miss
> > > a response
> > > > >>>>>>>>> of a partition. If the entire FetchResponse is lost (e.g.
> > > timeout), the
> > > > >>>>>>>>> follower would disconnect and retry. That will result in
> > > sending a full
> > > > >>>>>>>>> FetchRequest.
> > > > >>>>>>>> Basically, you are proposing that we rely on TCP for
> reliable
> > > delivery
> > > > >>>>>>>> in a distributed system.  That isn't a good idea for a
> bunch of
> > > > >>>>>>>> different reasons.  First of all, TCP timeouts tend to be
> very
> > > long.  So
> > > > >>>>>>>> if the TCP session timing out is your error detection
> > > mechanism, you
> > > > >>>>>>>> have to wait minutes for messages to timeout.  Of course, we
> > > add a
> > > > >>>>>>>> timeout on top of that after which we declare the connection
> > > bad and
> > > > >>>>>>>> manually close it.  But just because the session is closed
> on
> > > one end
> > > > >>>>>>>> doesn't mean that the other end knows that it is closed.  So
> > > the leader
> > > > >>>>>>>> may have to wait quite a long time before TCP decides that
> yes,
> > > > >>>>>>>> connection X from the follower is dead and not coming back,
> > > even though
> > > > >>>>>>>> gremlins ate the FIN packet which the follower attempted to
> > > translate.
> > > > >>>>>>>> If the cache state is tied to that TCP session, we have to
> keep
> > > that
> > > > >>>>>>>> cache around for a much longer time than we should.
> > > > >>>>>>> Hi,
> > > > >>>>>>>
> > > > >>>>>>> I see this from a different perspective. The cache expiry
> time
> > > > >>>>>>> has the same semantic as idle connection time in this
> scenario.
> > > > >>>>>>> It is the time range we expect the client to come back an
> reuse
> > > > >>>>>>> its broker side state. I would argue that on close we would
> get
> > > an
> > > > >>>>>>> extra shot at cleaning up the session state early. As
> opposed to
> > > > >>>>>>> always wait for that duration for expiry to happen.
> > > > >>>>>> Hi Jan,
> > > > >>>>>>
> > > > >>>>>> The idea here is that the incremental fetch cache expiry time
> can
> > > be
> > > > >>>>>> much shorter than the TCP session timeout.  In general the TCP
> > > session
> > > > >>>>>> timeout is common to all TCP connections, and very long.  To
> make
> > > these
> > > > >>>>>> numbers a little more concrete, the TCP session timeout is
> often
> > > > >>>>>> configured to be 2 hours on Linux.  (See
> > > > >>>>>> https://www.cyberciti.biz/tips/linux-increasing-or-
> > > decreasing-tcp-sockets-timeouts.html
> > > > >>>>>> )  The timeout I was proposing for incremental fetch sessions
> was
> > > one or
> > > > >>>>>> two minutes at most.
> > > > >>>>> Currently this is taken care of by
> > > > >>>>> connections.max.idle.ms on the broker and defaults to
> something
> > > of few
> > > > >>>>> minutes.
> > > > >>>> It is 10 minutes by default, which is longer than what we want
> the
> > > > >>>> incremental fetch session timeout to be.  There's no reason to
> > > couple
> > > > >>>> these two things.
> > > > >>>>
> > > > >>>>> Also something we could let the client change if we really
> wanted
> > > to.
> > > > >>>>> So there is no need to worry about coupling our implementation
> to
> > > some
> > > > >>>>> timeouts given by the OS, with TCP one always has full control
> > > over the worst
> > > > >>>>> times + one gets the extra shot cleaning up early when the
> close
> > > comes through.
> > > > >>>>> Which is the majority of the cases.
> > > > >>>> In the majority of cases, the TCP session will be
> re-established.
> > > In
> > > > >>>> that case, we have to send a full fetch request rather than an
> > > > >>>> incremental fetch request.
> > > > >>> I actually have a hard time believing this. Do you have any
> numbers
> > > of
> > > > >>> any existing production system? Is it the virtualisation layer
> > > cutting
> > > > >>> all the connections?
> > > > >>> We see this only on application crashes and restarts where the
> app
> > > needs
> > > > >>> todo the full anyways
> > > > >>> as it probably continues with stores offsets.
> > > > >> Yes, TCP connections get dropped.  It happens very often in
> production
> > > > >> clusters, actually.  When I was working on Hadoop, one of the most
> > > > >> common questions I heard from newcomers was "why do I see so many
> > > > >> EOFException messages in the logs"?  The other thing that happens
> a
> > > lot
> > > > >> is DNS outages or slowness.  Public clouds seem to have even more
> > > > >> unstable networks than the on-premise clusters.  I am not sure why
> > > that
> > > > >> is.
> > > > Hadoop has a wiki page on exactly this
> > > > https://wiki.apache.org/hadoop/EOFException
> > > >
> > > > besides user errors they have servers crashing and actually loss of
> > > > connection high on their list.
> > > > In the case of "server goes away" the cache goes with it. So nothing
> to
> > > > argue about the cache beeing reused by
> > > > a new connection.
> > > >
> > > > Can you make an argument at which point the epoch would be updated
> > > > broker side to maximise re-usage of the cache on
> > > > lost connections. In many cases the epoch would go out of sync and we
> > > > would need a full fetch anyways. Am I mistaken here?
> > >
> > > The current proposal is that the server can accept multiple requests
> in a
> > > row with the same sequence number.
> > >
> > > Colin
> > >
> > > >
> > > >
> > > >
> > > >
> > > > >>
> > > > >>>>>>>> Secondly, from a software engineering perspective, it's not
> a
> > > good idea
> > > > >>>>>>>> to try to tightly tie together TCP and our code.  We would
> have
> > > to
> > > > >>>>>>>> rework how we interact with NetworkClient so that we are
> aware
> > > of things
> > > > >>>>>>>> like TCP sessions closing or opening.  We would have to be
> > > careful
> > > > >>>>>>>> preserve the ordering of incoming messages when doing things
> > > like
> > > > >>>>>>>> putting incoming requests on to a queue to be processed by
> > > multiple
> > > > >>>>>>>> threads.  It's just a lot of complexity to add, and there's
> no
> > > upside.
> > > > >>>>>>> I see the point here. And I had a small chat with Dong Lin
> > > already
> > > > >>>>>>> making me aware of this. I tried out the approaches and
> propose
> > > the
> > > > >>>>>>> following:
> > > > >>>>>>>
> > > > >>>>>>> The client start and does a full fetch. It then does
> incremental
> > > fetches.
> > > > >>>>>>> The connection to the broker dies and is re-established by
> > > NetworkClient
> > > > >>>>>>> under the hood.
> > > > >>>>>>> The broker sees an incremental fetch without having state =>
> > > returns
> > > > >>>>>>> error:
> > > > >>>>>>> Client sees the error, does a full fetch and goes back to
> > > incrementally
> > > > >>>>>>> fetching.
> > > > >>>>>>>
> > > > >>>>>>> having this 1 additional error round trip is essentially the
> > > same as
> > > > >>>>>>> when something
> > > > >>>>>>> with the sessions or epoch changed unexpectedly to the client
> > > (say
> > > > >>>>>>> expiry).
> > > > >>>>>>>
> > > > >>>>>>> So its nothing extra added but the conditions are easier to
> > > evaluate.
> > > > >>>>>>> Especially since we do everything with NetworkClient. Other
> > > implementers
> > > > >>>>>>> on the
> > > > >>>>>>> protocol are free to optimizes this and do not do the
> errornours
> > > > >>>>>>> roundtrip on the
> > > > >>>>>>> new connection.
> > > > >>>>>>> Its a great plus that the client can know when the error is
> gonna
> > > > >>>>>>> happen. instead of
> > > > >>>>>>> the server to always have to report back if something changes
> > > > >>>>>>> unexpectedly for the client
> > > > >>>>>> You are assuming that the leader and the follower agree that
> the
> > > TCP
> > > > >>>>>> session drops at the same time.  When there are network
> problems,
> > > this
> > > > >>>>>> may not be true.  The leader may still think the previous TCP
> > > session is
> > > > >>>>>> active.  In that case, we have to keep the incremental fetch
> > > session
> > > > >>>>>> state around until we learn otherwise (which could be up to
> that
> > > 2 hour
> > > > >>>>>> timeout I mentioned).  And if we get a new incoming
> incremental
> > > fetch
> > > > >>>>>> request, we can't assume that it replaces the previous one,
> > > because the
> > > > >>>>>> IDs will be different (the new one starts a new session).
> > > > >>>>> As mentioned, no reason to fear some time-outs out of our
> control
> > > > >>>>>>>> Imagine that I made an argument that client IDs are
> "complex"
> > > and should
> > > > >>>>>>>> be removed from our APIs.  After all, we can just look at
> the
> > > remote IP
> > > > >>>>>>>> address and TCP port of each connection.  Would you think
> that
> > > was a
> > > > >>>>>>>> good idea?  The client ID is useful when looking at logs.
> For
> > > example,
> > > > >>>>>>>> if a rebalance is having problems, you want to know what
> > > clients were
> > > > >>>>>>>> having a problem.  So having the client ID field to guide
> you is
> > > > >>>>>>>> actually much less "complex" in practice than not having an
> ID.
> > > > >>>>>>> I still cant follow why the correlation idea will not help
> here.
> > > > >>>>>>> Correlating logs with it usually works great. Even with
> > > primitive tools
> > > > >>>>>>> like grep
> > > > >>>>>> The correlation ID does help somewhat, but certainly not as
> much
> > > as a
> > > > >>>>>> unique 64-bit ID.  The correlation ID is not unique in the
> > > broker, just
> > > > >>>>>> unique to a single NetworkClient.  Simiarly, the correlation
> ID
> > > is not
> > > > >>>>>> unique on the client side, if there are multiple Consumers,
> etc.
> > > > >>>>> Can always bump entropy in correlation IDs, never had a problem
> > > > >>>>> of finding to many duplicates. Would be a different KIP though.
> > > > >>>>>>>> Similarly, if metadata responses had epoch numbers (simple
> > > incrementing
> > > > >>>>>>>> numbers), we would not have to debug problems like clients
> > > accidentally
> > > > >>>>>>>> getting old metadata from servers that had been partitioned
> off
> > > from the
> > > > >>>>>>>> network for a while.  Clients would know the difference
> between
> > > old and
> > > > >>>>>>>> new metadata.  So putting epochs in to the metadata request
> is
> > > much less
> > > > >>>>>>>> "complex" operationally, even though it's an extra field in
> the
> > > request.
> > > > >>>>>>>>      This has been discussed before on the mailing list.
> > > > >>>>>>>>
> > > > >>>>>>>> So I think the bottom line for me is that having the
> session ID
> > > and
> > > > >>>>>>>> session epoch, while it adds two extra fields, reduces
> > > operational
> > > > >>>>>>>> complexity and increases debuggability.  It avoids tightly
> > > coupling us
> > > > >>>>>>>> to assumptions about reliable ordered delivery which tend
> to be
> > > violated
> > > > >>>>>>>> in practice in multiple layers of the stack.  Finally, it
> > > avoids the
> > > > >>>>>>>> necessity of refactoring NetworkClient.
> > > > >>>>>>> So there is stacks out there that violate TCP guarantees? And
> > > software
> > > > >>>>>>> still works? How can this be? Can you elaborate a little
> where
> > > this
> > > > >>>>>>> can be violated? I am not very familiar with virtualized
> > > environments
> > > > >>>>>>> but they can't really violate TCP contracts.
> > > > >>>>>> TCP's guarantees of reliable, in-order transmission certainly
> can
> > > be
> > > > >>>>>> violated.  For example, I once had to debug a cluster where a
> > > certain
> > > > >>>>>> node had a network card which corrupted its transmissions
> > > occasionally.
> > > > >>>>>> With all the layers of checksums, you would think that this
> was
> > > not
> > > > >>>>>> possible, but it happened.  We occasionally got corrupted data
> > > written
> > > > >>>>>> to disk on the other end because of it.  Even more
> frustrating,
> > > the data
> > > > >>>>>> was not corrupted on disk on the sending node-- it was a bug
> in
> > > the
> > > > >>>>>> network card driver that was injecting the errors.
> > > > >>>>> true, but your broker might aswell read a corrupted 600GB as
> size
> > > from
> > > > >>>>> the network and die with OOM instantly.
> > > > >>>> If you read 600 GB as the size from the network, you will not
> "die
> > > with
> > > > >>>> OOM instantly."  That would be a bug.  Instead, you will notice
> > > that 600
> > > > >>>> GB is greater than max.message.bytes, and close the connection.
> > > > >>> We only check max.message.bytes to late to guard against consumer
> > > > >>> stalling.
> > > > >>> we dont have a notion of max.networkpacket.size before we
> allocate
> > > the
> > > > >>> bytebuffer to read it into.
> > > > >> "network packets" are not the same thing as "kafka RPCs."  One
> Kafka
> > > RPC
> > > > >> could take up mutiple ethernet packets.
> > > > >>
> > > > >> Also, max.message.bytes has nothing to do with "consumer
> stalling" --
> > > > >> you are probably thinking about some of the fetch request
> > > > >> configurations.  max.message.bytes is used by the RPC system to
> figure
> > > > >> out whether to read the full incoming RP
> > > > > Whoops, this is incorrect.  I was thinking about
> > > > > "socket.request.max.bytes" rather than "max.message.bytes."  Sorry
> > > about
> > > > > that.  See Ismael's email as well.
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > >> best,
> > > > >> Colin
> > > > >>
> > > > >>>>> Optimizing for still having functional
> > > > >>>>> software under this circumstances is not reasonable.
> > > > >>>>> You want to get rid of such a
> > > > >>>>> node ASAP and pray that zookeepers ticks get corrupted often
> enough
> > > > >>>>> that it finally drops out of the cluster.
> > > > >>>>>
> > > > >>>>> There is a good reason that these kinda things
> > > > >>>>> https://issues.apache.org/jira/browse/MESOS-4105
> > > > >>>>> don't end up as kafka Jiras. In the end you can't run any
> software
> > > in
> > > > >>>>> these containers anymore. Application layer checksums are a
> neat
> > > thing to
> > > > >>>>> fail fast but trying to cope with this probably causes more bad
> > > than
> > > > >>>>> good.  So I would argue that we shouldn't try this for the
> fetch
> > > requests.
> > > > >>>> One of the goals of Apache Kafka is to be "a streaming
> platform...
> > > > >>>> [that] lets you store streams of records in a fault-tolerant
> way."
> > > For
> > > > >>>> more information, see https://kafka.apache.org/intro .
> > > Fault-tolerance
> > > > >>>> is explicitly part of the goal of Kafka.  Prayer should be
> > > optional, not
> > > > >>>> required, when running the software.
> > > > >>> Yes, we need to fail ASAP when we read corrupted packages. It
> seemed
> > > > >>> to me like you tried to make the case for pray and try to stay
> alive.
> > > > >>> Fault
> > > > >>> tolerance here means. I am a fishy box i am going to let a good
> box
> > > > >>> handle
> > > > >>> it and be silent until i get fixed up.
> > > > >>>> Crashing because someone sent you a bad packet is not reasonable
> > > > >>>> behavior.  It is a bug.  Similarly, bringing down the whole
> cluster,
> > > > >>>> which could a hundred nodes, because someone had a bad network
> > > adapter
> > > > >>>> is not reasonable behavior.  It is perhaps reasonable for the
> > > cluster to
> > > > >>>> perform worse when hardware is having problems.  But that's a
> > > different
> > > > >>>> discussion.
> > > > >>> See above.
> > > > >>>> best,
> > > > >>>> Colin
> > > > >>>>
> > > > >>>>>> However, my point was not about TCP's guarantees being
> violated.
> > > My
> > > > >>>>>> point is that TCP's guarantees are only one small building
> block
> > > to
> > > > >>>>>> build a robust distributed system.  TCP basically just says
> that
> > > if you
> > > > >>>>>> get any bytes from the stream, you will get the ones that were
> > > sent by
> > > > >>>>>> the sender, in the order they were sent.  TCP does not
> guarantee
> > > that
> > > > >>>>>> the bytes you send will get there.  It does not guarantee
> that if
> > > you
> > > > >>>>>> close the connection, the other end will know about it in a
> timely
> > > > >>>>>> fashion.
> > > > >>>>> These are very powerful grantees and since we use TCP we should
> > > > >>>>> piggy pack everything that is reasonable on to it. IMO there
> is no
> > > > >>>>> need to reimplement correct sequencing again if you get that
> from
> > > > >>>>> your transport layer. It saves you the complexity, it makes
> > > > >>>>> you application behave way more naturally and your api easier
> to
> > > > >>>>> understand.
> > > > >>>>>
> > > > >>>>> There is literally nothing the Kernel wont let you decide
> > > > >>>>> especially not any timings. Only noticeable exception being
> > > TIME_WAIT
> > > > >>>>> of usually 240 seconds but that already has little todo with
> the
> > > broker
> > > > >>>>> itself and
> > > > >>>>> if we are running out of usable ports because of this then
> expiring
> > > > >>>>> fetch requests
> > > > >>>>> wont help much anyways.
> > > > >>>>>
> > > > >>>>> I hope I could strengthen the trust you have in userland TCP
> > > connection
> > > > >>>>> management. It is really powerful and can be exploited for
> maximum
> > > gains
> > > > >>>>> without much risk in my opinion.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>> It does not guarantee that the bytes will be received in a
> > > > >>>>>> certain timeframe, and certainly doesn't guarantee that if you
> > > send a
> > > > >>>>>> byte on connection X and then on connection Y, that the remote
> > > end will
> > > > >>>>>> read a byte on X before reading a byte on Y.
> > > > >>>>> Noone expects this from two independent paths of any kind.
> > > > >>>>>
> > > > >>>>>> best,
> > > > >>>>>> Colin
> > > > >>>>>>
> > > > >>>>>>> Hope this made my view clearer, especially the first part.
> > > > >>>>>>>
> > > > >>>>>>> Best Jan
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>> best,
> > > > >>>>>>>> Colin
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>> If there is an error such as NotLeaderForPartition is
> > > > >>>>>>>>> returned for some partitions, the follower can always send
> a
> > > full
> > > > >>>>>>>>> FetchRequest. Is there a scenario that only some of the
> > > partitions in a
> > > > >>>>>>>>> FetchResponse is lost?
> > > > >>>>>>>>>
> > > > >>>>>>>>> Thanks,
> > > > >>>>>>>>>
> > > > >>>>>>>>> Jiangjie (Becket) Qin
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe<
> > > cmccabe@apache.org>  wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
> > > > >>>>>>>>>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe<
> > > cmccabe@apache.org>
> > > > >>>>>>>>>> wrote:
> > > > >>>>>>>>>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
> > > > >>>>>>>>>>>>> Hey Colin,
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thanks much for the update. I have a few questions
> below:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 1. I am not very sure that we need Fetch Session
> Epoch. It
> > > seems that
> > > > >>>>>>>>>>>>> Fetch
> > > > >>>>>>>>>>>>> Session Epoch is only needed to help leader distinguish
> > > between "a
> > > > >>>>>>>>>> full
> > > > >>>>>>>>>>>>> fetch request" and "a full fetch request and request a
> new
> > > > >>>>>>>>>> incremental
> > > > >>>>>>>>>>>>> fetch session". Alternatively, follower can also
> indicate
> > > "a full
> > > > >>>>>>>>>> fetch
> > > > >>>>>>>>>>>>> request and request a new incremental fetch session" by
> > > setting Fetch
> > > > >>>>>>>>>>>>> Session ID to -1 without using Fetch Session Epoch.
> Does
> > > this make
> > > > >>>>>>>>>> sense?
> > > > >>>>>>>>>>>> Hi Dong,
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> The fetch session epoch is very important for ensuring
> > > correctness.  It
> > > > >>>>>>>>>>>> prevents corrupted or incomplete fetch data due to
> network
> > > reordering
> > > > >>>>>>>>>> or
> > > > >>>>>>>>>>>> loss.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> For example, consider a scenario where the follower
> sends a
> > > fetch
> > > > >>>>>>>>>>>> request to the leader.  The leader responds, but the
> > > response is lost
> > > > >>>>>>>>>>>> because of network problems which affected the TCP
> > > session.  In that
> > > > >>>>>>>>>>>> case, the follower must establish a new TCP session and
> > > re-send the
> > > > >>>>>>>>>>>> incremental fetch request.  But the leader does not know
> > > that the
> > > > >>>>>>>>>>>> follower didn't receive the previous incremental fetch
> > > response.  It is
> > > > >>>>>>>>>>>> only the incremental fetch epoch which lets the leader
> know
> > > that it
> > > > >>>>>>>>>>>> needs to resend that data, and not data which comes
> > > afterwards.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> You could construct similar scenarios with message
> > > reordering,
> > > > >>>>>>>>>>>> duplication, etc.  Basically, this is a stateful
> protocol
> > > on an
> > > > >>>>>>>>>>>> unreliable network, and you need to know whether the
> > > follower got the
> > > > >>>>>>>>>>>> previous data you sent before you move on.  And you
> need to
> > > handle
> > > > >>>>>>>>>>>> issues like duplicated or delayed requests.  These
> issues
> > > do not affect
> > > > >>>>>>>>>>>> the full fetch request, because it is not stateful-- any
> > > full fetch
> > > > >>>>>>>>>>>> request can be understood and properly responded to in
> > > isolation.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>> Thanks for the explanation. This makes sense. On the
> other
> > > hand I would
> > > > >>>>>>>>>>> be interested in learning more about whether Becket's
> > > solution can help
> > > > >>>>>>>>>>> simplify the protocol by not having the echo field and
> > > whether that is
> > > > >>>>>>>>>>> worth doing.
> > > > >>>>>>>>>> Hi Dong,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I commented about this in the other thread.  A solution
> which
> > > doesn't
> > > > >>>>>>>>>> maintain session information doesn't work here.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>>>> 2. It is said that Incremental FetchRequest will
> include
> > > partitions
> > > > >>>>>>>>>> whose
> > > > >>>>>>>>>>>>> fetch offset or maximum number of fetch bytes has been
> > > changed. If
> > > > >>>>>>>>>>>>> follower's logStartOffet of a partition has changed,
> > > should this
> > > > >>>>>>>>>>>>> partition also be included in the next FetchRequest to
> the
> > > leader?
> > > > >>>>>>>>>>>> Otherwise, it
> > > > >>>>>>>>>>>>> may affect the handling of DeleteRecordsRequest because
> > > leader may
> > > > >>>>>>>>>> not
> > > > >>>>>>>>>>>> know
> > > > >>>>>>>>>>>>> the corresponding data has been deleted on the
> follower.
> > > > >>>>>>>>>>>> Yeah, the follower should include the partition if the
> > > logStartOffset
> > > > >>>>>>>>>>>> has changed.  That should be spelled out on the KIP.
> Fixed.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 3. In the section "Per-Partition Data", a partition is
> not
> > > considered
> > > > >>>>>>>>>>>>> dirty if its log start offset has changed. Later in the
> > > section
> > > > >>>>>>>>>>>> "FetchRequest
> > > > >>>>>>>>>>>>> Changes", it is said that incremental fetch responses
> will
> > > include a
> > > > >>>>>>>>>>>>> partition if its logStartOffset has changed. It seems
> > > inconsistent.
> > > > >>>>>>>>>> Can
> > > > >>>>>>>>>>>>> you update the KIP to clarify it?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>> In the "Per-Partition Data" section, it does say that
> > > logStartOffset
> > > > >>>>>>>>>>>> changes make a partition dirty, though, right?  The
> first
> > > bullet point
> > > > >>>>>>>>>>>> is:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> * The LogCleaner deletes messages, and this changes the
> > > log start
> > > > >>>>>>>>>> offset
> > > > >>>>>>>>>>>> of the partition on the leader., or
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>> Ah I see. I think I didn't notice this because statement
> > > assumes that the
> > > > >>>>>>>>>>> LogStartOffset in the leader only changes due to
> LogCleaner.
> > > In fact the
> > > > >>>>>>>>>>> LogStartOffset can change on the leader due to either log
> > > retention and
> > > > >>>>>>>>>>> DeleteRecordsRequest. I haven't verified whether
> LogCleaner
> > > can change
> > > > >>>>>>>>>>> LogStartOffset though. It may be a bit better to just say
> > > that a
> > > > >>>>>>>>>>> partition is considered dirty if LogStartOffset changes.
> > > > >>>>>>>>>> I agree.  It should be straightforward to just resend the
> > > partition if
> > > > >>>>>>>>>> logStartOffset changes.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>>>> 4. In "Fetch Session Caching" section, it is said that
> > > each broker
> > > > >>>>>>>>>> has a
> > > > >>>>>>>>>>>>> limited number of slots. How is this number determined?
> > > Does this
> > > > >>>>>>>>>> require
> > > > >>>>>>>>>>>>> a new broker config for this number?
> > > > >>>>>>>>>>>> Good point.  I added two broker configuration
> parameters to
> > > control
> > > > >>>>>>>>>> this
> > > > >>>>>>>>>>>> number.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>> I am curious to see whether we can avoid some of these
> new
> > > configs. For
> > > > >>>>>>>>>>> example, incremental.fetch.session.
> cache.slots.per.broker
> > > is probably
> > > > >>>>>>>>>> not
> > > > >>>>>>>>>>> necessary because if a leader knows that a FetchRequest
> > > comes from a
> > > > >>>>>>>>>>> follower, we probably want the leader to always cache the
> > > information
> > > > >>>>>>>>>>> from that follower. Does this make sense?
> > > > >>>>>>>>>> Yeah, maybe we can avoid having
> > > > >>>>>>>>>> incremental.fetch.session.cache.slots.per.broker.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> Maybe we can discuss the config later after there is
> > > agreement on how the
> > > > >>>>>>>>>>> protocol would look like.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>>> What is the error code if broker does
> > > > >>>>>>>>>>>>> not have new log for the incoming FetchRequest?
> > > > >>>>>>>>>>>> Hmm, is there a typo in this question?  Maybe you meant
> to
> > > ask what
> > > > >>>>>>>>>>>> happens if there is no new cache slot for the incoming
> > > FetchRequest?
> > > > >>>>>>>>>>>> That's not an error-- the incremental fetch session ID
> just
> > > gets set to
> > > > >>>>>>>>>>>> 0, indicating no incremental fetch session was created.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>> Yeah there is a typo. You have answered my question.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>>> 5. Can you clarify what happens if follower adds a
> > > partition to the
> > > > >>>>>>>>>>>>> ReplicaFetcherThread after receiving
> LeaderAndIsrRequest?
> > > Does leader
> > > > >>>>>>>>>>>>> needs to generate a new session for this
> > > ReplicaFetcherThread or
> > > > >>>>>>>>>> does it
> > > > >>>>>>>>>>>> re-use
> > > > >>>>>>>>>>>>> the existing session?  If it uses a new session, is the
> > > old session
> > > > >>>>>>>>>>>>> actively deleted from the slot?
> > > > >>>>>>>>>>>> The basic idea is that you can't make changes, except by
> > > sending a full
> > > > >>>>>>>>>>>> fetch request.  However, perhaps we can allow the
> client to
> > > re-use its
> > > > >>>>>>>>>>>> existing session ID.  If the client sets sessionId = id,
> > > epoch = 0, it
> > > > >>>>>>>>>>>> could re-initialize the session.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>> Yeah I agree with the basic idea. We probably want to
> > > understand more
> > > > >>>>>>>>>>> detail about how this works later.
> > > > >>>>>>>>>> Sounds good.  I updated the KIP with this information.  A
> > > > >>>>>>>>>> re-initialization should be exactly the same as an
> > > initialization,
> > > > >>>>>>>>>> except that it reuses an existing ID.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> best,
> > > > >>>>>>>>>> Colin
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>>>> BTW, I think it may be useful if the KIP can include
> the
> > > example
> > > > >>>>>>>>>> workflow
> > > > >>>>>>>>>>>>> of how this feature will be used in case of partition
> > > change and so
> > > > >>>>>>>>>> on.
> > > > >>>>>>>>>>>> Yeah, that might help.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> best,
> > > > >>>>>>>>>>>> Colin
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thanks,
> > > > >>>>>>>>>>>>> Dong
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe<
> > > cmccabe@apache.org>
> > > > >>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> I updated the KIP with the ideas we've been
> discussing.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> best,
> > > > >>>>>>>>>>>>>> Colin
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
> > > > >>>>>>>>>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
> > > > >>>>>>>>>>>>>>>> Hi Colin, thank you  for this KIP, it can become a
> > > really
> > > > >>>>>>>>>> useful
> > > > >>>>>>>>>>>> thing.
> > > > >>>>>>>>>>>>>>>> I just scanned through the discussion so far and
> wanted
> > > to
> > > > >>>>>>>>>> start a
> > > > >>>>>>>>>>>>>>>> thread to make as decision about keeping the
> > > > >>>>>>>>>>>>>>>> cache with the Connection / Session or having some
> sort
> > > of UUID
> > > > >>>>>>>>>>>> indN
> > > > >>>>>>>>>>>>>> exed
> > > > >>>>>>>>>>>>>>>> global Map.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Sorry if that has been settled already and I missed
> it.
> > > In this
> > > > >>>>>>>>>>>> case
> > > > >>>>>>>>>>>>>>>> could anyone point me to the discussion?
> > > > >>>>>>>>>>>>>>> Hi Jan,
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> I don't think anyone has discussed the idea of tying
> the
> > > cache
> > > > >>>>>>>>>> to an
> > > > >>>>>>>>>>>>>>> individual TCP session yet.  I agree that since the
> > > cache is
> > > > >>>>>>>>>>>> intended to
> > > > >>>>>>>>>>>>>>> be used only by a single follower or client, it's an
> > > interesting
> > > > >>>>>>>>>>>> thing
> > > > >>>>>>>>>>>>>>> to think about.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> I guess the obvious disadvantage is that whenever
> your
> > > TCP
> > > > >>>>>>>>>> session
> > > > >>>>>>>>>>>>>>> drops, you have to make a full fetch request rather
> than
> > > an
> > > > >>>>>>>>>>>> incremental
> > > > >>>>>>>>>>>>>>> one.  It's not clear to me how often this happens in
> > > practice --
> > > > >>>>>>>>>> it
> > > > >>>>>>>>>>>>>>> probably depends a lot on the quality of the network.
> > > From a
> > > > >>>>>>>>>> code
> > > > >>>>>>>>>>>>>>> perspective, it might also be a bit difficult to
> access
> > > data
> > > > >>>>>>>>>>>> associated
> > > > >>>>>>>>>>>>>>> with the Session from classes like KafkaApis
> (although
> > > we could
> > > > >>>>>>>>>>>> refactor
> > > > >>>>>>>>>>>>>>> it to make this easier).
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> It's also clear that even if we tie the cache to the
> > > session, we
> > > > >>>>>>>>>>>> still
> > > > >>>>>>>>>>>>>>> have to have limits on the number of caches we're
> > > willing to
> > > > >>>>>>>>>> create.
> > > > >>>>>>>>>>>>>>> And probably we should reserve some cache slots for
> each
> > > > >>>>>>>>>> follower, so
> > > > >>>>>>>>>>>>>>> that clients don't take all of them.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Id rather see a protocol in which the client is
> hinting
> > > the
> > > > >>>>>>>>>> broker
> > > > >>>>>>>>>>>>>> that,
> > > > >>>>>>>>>>>>>>>> he is going to use the feature instead of a client
> > > > >>>>>>>>>>>>>>>> realizing that the broker just offered the feature
> > > (regardless
> > > > >>>>>>>>>> of
> > > > >>>>>>>>>>>>>>>> protocol version which should only indicate that the
> > > feature
> > > > >>>>>>>>>>>>>>>> would be usable).
> > > > >>>>>>>>>>>>>>> Hmm.  I'm not sure what you mean by "hinting."  I do
> > > think that
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>>> server should have the option of not accepting
> > > incremental
> > > > >>>>>>>>>> requests
> > > > >>>>>>>>>>>> from
> > > > >>>>>>>>>>>>>>> specific clients, in order to save memory space.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> This seems to work better with a per
> > > > >>>>>>>>>>>>>>>> connection/session attached Metadata than with a Map
> > > and could
> > > > >>>>>>>>>>>> allow
> > > > >>>>>>>>>>>>>> for
> > > > >>>>>>>>>>>>>>>> easier client implementations.
> > > > >>>>>>>>>>>>>>>> It would also make Client-side code easier as there
> > > wouldn't
> > > > >>>>>>>>>> be any
> > > > >>>>>>>>>>>>>>>> Cache-miss error Messages to handle.
> > > > >>>>>>>>>>>>>>> It is nice not to have to handle cache-miss
> responses, I
> > > agree.
> > > > >>>>>>>>>>>>>>> However, TCP sessions aren't exposed to most of our
> > > client-side
> > > > >>>>>>>>>> code.
> > > > >>>>>>>>>>>>>>> For example, when the Producer creates a message and
> > > hands it
> > > > >>>>>>>>>> off to
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>> NetworkClient, the NC will transparently re-connect
> and
> > > re-send a
> > > > >>>>>>>>>>>>>>> message if the first send failed.  The higher-level
> code
> > > will
> > > > >>>>>>>>>> not be
> > > > >>>>>>>>>>>>>>> informed about whether the TCP session was
> > > re-established,
> > > > >>>>>>>>>> whether an
> > > > >>>>>>>>>>>>>>> existing TCP session was used, and so on.  So
> overall I
> > > would
> > > > >>>>>>>>>> still
> > > > >>>>>>>>>>>> lean
> > > > >>>>>>>>>>>>>>> towards not coupling this to the TCP session...
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> best,
> > > > >>>>>>>>>>>>>>> Colin
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>       Thank you again for the KIP. And again, if
> this
> > > was clarified
> > > > >>>>>>>>>>>> already
> > > > >>>>>>>>>>>>>>>> please drop me a hint where I could read about it.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Best Jan
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote:
> > > > >>>>>>>>>>>>>>>>> Hi all,
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> I created a KIP to improve the scalability and
> latency
> > > of
> > > > >>>>>>>>>>>>>> FetchRequest:
> > > > >>>>>>>>>>>>>>>>> https://cwiki.apache.org/
> confluence/display/KAFKA/KIP-
> > > > >>>>>>>>>>>>>> 227%3A+Introduce+Incremental+
> FetchRequests+to+Increase+
> > > > >>>>>>>>>>>>>> Partition+Scalability
> > > > >>>>>>>>>>>>>>>>> Please take a look.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> cheers,
> > > > >>>>>>>>>>>>>>>>> Colin
> > > >
> > >
> > >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message