kafka-dev mailing list archives

From Ted Yu <yuzhih...@gmail.com>
Subject Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability
Date Tue, 05 Dec 2017 21:55:25 GMT
Thanks for responding, Colin.

bq. If we have a bunch of small fetch sessions and a bigger client comes
in, we might have to evict many small sessions to fit the bigger one.

Suppose there are N small fetch sessions and one big fetch session comes in.
If the plan is to use the number of partitions to approximate heap consumption,
that should be good enough, IMHO.
Evicting only one of the N small fetch sessions may not release enough
memory, since the total partition count would still increase a lot.
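
To make the partition-count idea concrete, here is a minimal sketch in Python.
It is only an illustration under stated assumptions: the names FetchSession,
PartitionBudgetCache and try_add are hypothetical and do not come from the KIP
or the Kafka code base, the partition count stands in for heap use, and the
smallest-first eviction order is just one possible choice. It also ignores the
minimum eviction time discussed below.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class FetchSession:
    session_id: int
    # Stand-in for heap use: roughly one cached PartitionData per partition.
    num_partitions: int


class PartitionBudgetCache:
    """Hypothetical cache bounded by total cached partitions, not by slots."""

    def __init__(self, max_partitions: int) -> None:
        self.max_partitions = max_partitions
        self.sessions: Dict[int, FetchSession] = {}

    def total_partitions(self) -> int:
        return sum(s.num_partitions for s in self.sessions.values())

    def try_add(self, incoming: FetchSession) -> Optional[List[int]]:
        """Admit `incoming`, evicting the smallest sessions first.

        Returns the ids of the evicted sessions, or None if `incoming`
        would not fit even in an empty cache.
        """
        if incoming.num_partitions > self.max_partitions:
            return None
        # How many partitions must be released before `incoming` fits.
        needed = (self.total_partitions() + incoming.num_partitions
                  - self.max_partitions)
        evicted: List[int] = []
        # Assumption: evict smallest sessions first, ties broken by id.
        candidates = sorted(self.sessions.values(),
                            key=lambda s: (s.num_partitions, s.session_id))
        for victim in candidates:
            if needed <= 0:
                break
            del self.sessions[victim.session_id]
            evicted.append(victim.session_id)
            needed -= victim.num_partitions
        self.sessions[incoming.session_id] = incoming
        return evicted
```

With a budget of 100 partitions and ten cached sessions of 10 partitions each,
admitting one 35-partition session in this sketch evicts four small sessions,
not one, which is the case described above.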

Cheers

On Tue, Dec 5, 2017 at 1:44 PM, Colin McCabe <cmccabe@apache.org> wrote:

> On Tue, Dec 5, 2017, at 11:24, Ted Yu wrote:
> > bq. We also have a tunable for total number of cache slots. We never
> > cache
> > more than this number of incremental fetch sessions.
> >
> > Is it possible to manage the cache based on heap consumption instead of
> > number of slots ?
> > It seems heap estimation can be done by counting PartitionData (along
> > with overhead for related Map structure).
>
> Hi Ted,
>
> That's an interesting idea.  I think it starts to get complicated,
> though.
>
> For example, suppose we later implement incrementally adding partitions
> to the fetch session.  When a fetch session adds more partitions, it
> uses more memory.  So should this trigger an eviction?
>
> If we have a bunch of small fetch sessions and a bigger client comes in,
> we might have to evict many small sessions to fit the bigger one.  But
> we probably do want to fit the bigger one in, since bigger requests gain
> proportionally more from being incremental.
>
> [ Small digression: In general fetch requests have some fixed cost plus
> a variable cost based on the number of partitions.  The more partitions
> you add, the more the variable cost comes to dominate.  Therefore, it is
> especially good to make big fetch requests into incremental fetch
> requests.  Small fetch requests for one or two partitions may not gain
> much, since their cost is dominated by the fixed cost anyway (message
> header, TCP overhead, IP packet overhead, etc.) ]
>
> Overall, I would still lean towards limiting the number of incremental
> fetch sessions, rather than trying to create a per-partition data memory
> limit.  I think the complexity is probably not worth it.  The memory
> limit is more of a sanity check anyway, than a fine-grained limit.  If
> we can get the really big clients using incremental fetches, and the
> followers using incremental fetches, we have captured most of the
> benefits.  I'm curious if there is a more elegant way to limit
> per-partition that I may have missed, though?
>
> best,
> Colin
>
>
> >
> > Cheers
> >
> > On Tue, Dec 5, 2017 at 11:02 AM, Colin McCabe <cmccabe@apache.org>
> wrote:
> >
> > > On Tue, Dec 5, 2017, at 08:51, Jason Gustafson wrote:
> > > > Hi Colin,
> > > >
> > > > Thanks for the response. A couple replies:
> > > >
> > > >
> > > > > I’m a bit ambivalent about letting the client choose the session
> > > > > timeout.  What if clients choose timeouts that are too long?
> Hmm....
> > > > > I do agree the timeout should be sized proportional to
> > > > > max.poll.interval.ms.
> > > >
> > > >
> > > > We have solved this in other cases by letting the broker enforce a
> > > > maximum timeout. After thinking about it a bit, it's probably
> overkill
> > > in this
> > > > case since the caching is just an optimization. Instead of stressing
> over
> > > > timeouts and such, I am actually wondering if we just need a
> reasonable
> > > > session cache eviction policy. For example, when the number of slots
> is
> > > > exceeded, perhaps you evict the session with the fewest partitions
> or the
> > > > one with the largest interval between fetches. We could give
> priority to
> > > > the replicas. Perhaps this might let us get rid of a few of the
> configs.
> > >
> > > I agree that it would be nice to get rid of the tunable for eviction
> > > time.  However, I'm concerned that if we do, we might run into cache
> > > thrashing.  For example, if we have N cache slots and N+1 clients that
> > > are all fetching continuously, we might have to evict a client on every
> > > single fetch.  It would be much better to give a cache slot to N
> clients
> > > and let the last client do full fetch requests.
> > >
> > > Perhaps we could mitigate this problem by evicting the smallest fetch
> > > session-- the one that is for the smallest number of partitions.  This
> > > would allow "big" clients that fetch many partitions (e.g. MirrorMaker)
> > > to get priority.  But then you run into the problem where someone
> > > fetches a huge number of partitions, and then goes away for a long
> time,
> > > and you never reuse that cache memory.
> > >
> > > How about this approach?  We have a tunable for minimum eviction time
> > > (default 2 minutes).  We cannot evict a client before this timeout has
> > > expired.  We also have a tunable for total number of cache slots.  We
> > > never cache more than this number of incremental fetch sessions.
> > >
> > > Sessions become eligible for eviction after 2 minutes, whether or not
> > > the session is active.
> > > Fetch Request A will evict Fetch Request B if and only if:
> > > 1. A has been active in the last 2 minutes and B has not, OR
> > > 2. A was made by a follower and B was made by a consumer, OR
> > > 3. A has more partitions than B, OR
> > > 4. A is newer than B
> > >
> > > Then, in a setup where consumers are fetching different numbers of
> > > partitions, we will eventually converge on giving incremental fetch
> > > sessions to the big consumers, and not to the small consumers.  In a
> > > setup where consumers are all of equal size but the cache is too small
> > > for all of them, we still thrash, but slowly.  Nobody can be evicted
> > > before their 2 minutes are up.  So in general, the overhead of the
> extra
> > > full requests is still low.  If someone makes a big request and then
> > > shuts down, they get cleaned up after 2 minutes, because of condition
> > > #1.  And there are only two tunables needed: cache size and eviction
> > > time.
> > >
> > > >
> > > > The main reason is if there is a bug in the incremental fetch
> feature.
> > > > >
> > > >
> > > > This was in response to my question about removing the consumer
> config.
> > > > And sure, any new feature may have bugs, but that's what we have
> testing
> > > for
> > > > ;). Users can always fall back to a previous version if there are any
> > > > major problems. As you know, it's tough removing configs once they
> are
> > > there,
> > > > so I think we should try to add them only if they make sense in the
> long
> > > > term.
> > >
> > > That's a fair point.  I guess if we do need to disable incremental
> > > fetches in production because of a bug, we can modify the broker
> > > configuration to do so (by setting 0 cache slots).
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Mon, Dec 4, 2017 at 11:06 PM, Colin McCabe <cmccabe@apache.org>
> > > wrote:
> > > >
> > > > > On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> > > > > >
> > > > > >
> > > > > > On 03.12.2017 21:55, Colin McCabe wrote:
> > > > > > > On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> > > > > > >> Thanks for the explanation, Colin. A few more questions.
> > > > > > >>
> > > > > > >>> The session epoch is not complex.  It's just a number which
> > > > > increments
> > > > > > >>> on each incremental fetch.  The session epoch is also useful
> for
> > > > > > >>> debugging-- it allows you to match up requests and responses
> when
> > > > > > >>> looking at log files.
> > > > > > >> Currently each request in Kafka has a correlation id to help
> > > match the
> > > > > > >> requests and responses. Is epoch doing something differently?
> > > > > > > Hi Becket,
> > > > > > >
> > > > > > > The correlation ID is used within a single TCP session, to
> uniquely
> > > > > > > associate a request with a response.  The correlation ID is not
> > > unique
> > > > > > > (and has no meaning) outside the context of that single TCP
> > > session.
> > > > > > >
> > > > > > > Keep in mind, NetworkClient is in charge of TCP sessions, and
> > > generally
> > > > > > > tries to hide that information from the upper layers of the
> code.
> > > So
> > > > > > > when you submit a request to NetworkClient, you don't know if
> that
> > > > > > > request creates a TCP session, or reuses an existing one.
> > > > > > >>> Unfortunately, this doesn't work.  Imagine the client misses
> an
> > > > > > > >>> incremental fetch response about a partition.  And then the
> > > partition
> > > > > is
> > > > > > >>> never updated after that.  The client has no way to know
> about
> > > the
> > > > > > >>> partition, since it won't be included in any future
> incremental
> > > fetch
> > > > > > >>> responses.  And there are no offsets to compare, since the
> > > partition
> > > > > is
> > > > > > >>> simply omitted from the response.
> > > > > > >> I am curious about in which situation would the follower miss
> a
> > > > > response
> > > > > > >> of a partition. If the entire FetchResponse is lost (e.g.
> > > timeout),
> > > > > the
> > > > > > >> follower would disconnect and retry. That will result in
> sending a
> > > > > full
> > > > > > >> FetchRequest.
> > > > > > > Basically, you are proposing that we rely on TCP for reliable
> > > delivery
> > > > > > > in a distributed system.  That isn't a good idea for a bunch of
> > > > > > > different reasons.  First of all, TCP timeouts tend to be very
> > > long.
> > > > > So
> > > > > > > if the TCP session timing out is your error detection
> mechanism,
> > > you
> > > > > > > have to wait minutes for messages to timeout.  Of course, we
> add a
> > > > > > > timeout on top of that after which we declare the connection
> bad
> > > and
> > > > > > > manually close it.  But just because the session is closed on
> one
> > > end
> > > > > > > doesn't mean that the other end knows that it is closed.  So
> the
> > > leader
> > > > > > > may have to wait quite a long time before TCP decides that yes,
> > > > > > > connection X from the follower is dead and not coming back,
> even
> > > though
> > > > > > > gremlins ate the FIN packet which the follower attempted to
> > > transmit.
> > > > > > > If the cache state is tied to that TCP session, we have to keep
> > > that
> > > > > > > cache around for a much longer time than we should.
> > > > > > Hi,
> > > > > >
> > > > > > I see this from a different perspective. The cache expiry time
> > > > > > has the same semantics as the idle connection time in this scenario.
> > > > > > It is the time range in which we expect the client to come back and
> > > > > > reuse its broker-side state. I would argue that on close we would get
> > > > > > an extra shot at cleaning up the session state early, as opposed to
> > > > > > always waiting for that duration for expiry to happen.
> > > > >
> > > > > Hi Jan,
> > > > >
> > > > > The idea here is that the incremental fetch cache expiry time can
> be
> > > > > much shorter than the TCP session timeout.  In general the TCP
> session
> > > > > timeout is common to all TCP connections, and very long.  To make
> these
> > > > > numbers a little more concrete, the TCP session timeout is often
> > > > > configured to be 2 hours on Linux.  (See
> > > > > https://www.cyberciti.biz/tips/linux-increasing-or-decreasing-tcp-sockets-timeouts.html
> > > > > )  The timeout I was proposing for incremental fetch sessions was
> one
> > > or
> > > > > two minutes at most.
> > > > >
> > > > > >
> > > > > > > Secondly, from a software engineering perspective, it's not a
> good
> > > idea
> > > > > > > to try to tightly tie together TCP and our code.  We would
> have to
> > > > > > > rework how we interact with NetworkClient so that we are aware
> of
> > > > > things
> > > > > > > like TCP sessions closing or opening.  We would have to be
> careful
> > > > > > > to preserve the ordering of incoming messages when doing things
> like
> > > > > > > putting incoming requests on to a queue to be processed by
> multiple
> > > > > > > threads.  It's just a lot of complexity to add, and there's no
> > > upside.
> > > > > > I see the point here. And I had a small chat with Dong Lin
> already
> > > > > > making me aware of this. I tried out the approaches and propose
> the
> > > > > > following:
> > > > > >
> > > > > > The client starts and does a full fetch. It then does incremental
> > > > > > fetches.
> > > > > > The connection to the broker dies and is re-established by
> > > > > > NetworkClient under the hood.
> > > > > > The broker sees an incremental fetch without having state => returns
> > > > > > an error.
> > > > > > The client sees the error, does a full fetch, and goes back to
> > > > > > incrementally fetching.
> > > > > >
> > > > > > Having this one additional error round trip is essentially the same as
> > > > > > when something with the session or epoch changes unexpectedly for the
> > > > > > client (say, expiry).
> > > > > >
> > > > > > So it's nothing extra added, but the conditions are easier to evaluate,
> > > > > > especially since we do everything with NetworkClient. Other
> > > > > > implementers of the protocol are free to optimize this and skip the
> > > > > > erroneous round trip on the new connection.
> > > > > > It's a great plus that the client can know when the error is going to
> > > > > > happen, instead of the server always having to report back when
> > > > > > something changes unexpectedly for the client.
> > > > >
> > > > > You are assuming that the leader and the follower agree that the
> TCP
> > > > > session drops at the same time.  When there are network problems,
> this
> > > > > may not be true.  The leader may still think the previous TCP
> session
> > > is
> > > > > active.  In that case, we have to keep the incremental fetch
> session
> > > > > state around until we learn otherwise (which could be up to that 2
> hour
> > > > > timeout I mentioned).  And if we get a new incoming incremental
> fetch
> > > > > request, we can't assume that it replaces the previous one,
> because the
> > > > > IDs will be different (the new one starts a new session).
> > > > >
> > > > > >
> > > > > > > Imagine that I made an argument that client IDs are "complex"
> and
> > > > > should
> > > > > > > be removed from our APIs.  After all, we can just look at the
> > > remote IP
> > > > > > > address and TCP port of each connection.  Would you think that
> was
> > > a
> > > > > > > good idea?  The client ID is useful when looking at logs.  For
> > > example,
> > > > > > > if a rebalance is having problems, you want to know what
> clients
> > > were
> > > > > > > having a problem.  So having the client ID field to guide you
> is
> > > > > > > actually much less "complex" in practice than not having an ID.
> > > > > > I still can't follow why the correlation ID will not help here.
> > > > > > Correlating logs with it usually works great, even with primitive
> > > > > > tools like grep.
> > > > >
> > > > > The correlation ID does help somewhat, but certainly not as much
> as a
> > > > > unique 64-bit ID.  The correlation ID is not unique in the broker,
> just
> > > > > unique to a single NetworkClient.  Similarly, the correlation ID is
> not
> > > > > unique on the client side, if there are multiple Consumers, etc.
> > > > >
> > > > > >
> > > > > > > Similarly, if metadata responses had epoch numbers (simple
> > > incrementing
> > > > > > > numbers), we would not have to debug problems like clients
> > > accidentally
> > > > > > > getting old metadata from servers that had been partitioned off
> > > from
> > > > > the
> > > > > > > network for a while.  Clients would know the difference between
> > > old and
> > > > > > > new metadata.  So putting epochs in to the metadata request is
> much
> > > > > less
> > > > > > > "complex" operationally, even though it's an extra field in the
> > > > > request.
> > > > > > >   This has been discussed before on the mailing list.
> > > > > > >
> > > > > > > So I think the bottom line for me is that having the session
> ID and
> > > > > > > session epoch, while it adds two extra fields, reduces
> operational
> > > > > > > complexity and increases debuggability.  It avoids tightly
> > > coupling us
> > > > > > > to assumptions about reliable ordered delivery which tend to be
> > > > > violated
> > > > > > > in practice in multiple layers of the stack.  Finally, it
> avoids
> > > the
> > > > > > > necessity of refactoring NetworkClient.
> > > > > > So there are stacks out there that violate TCP guarantees? And
> > > software
> > > > > > still works? How can this be? Can you elaborate a little where
> this
> > > > > > can be violated? I am not very familiar with virtualized
> environments
> > > > > > but they can't really violate TCP contracts.
> > > > >
> > > > > TCP's guarantees of reliable, in-order transmission certainly can
> be
> > > > > violated.  For example, I once had to debug a cluster where a
> certain
> > > > > node had a network card which corrupted its transmissions
> occasionally.
> > > > > With all the layers of checksums, you would think that this was not
> > > > > possible, but it happened.  We occasionally got corrupted data
> written
> > > > > to disk on the other end because of it.  Even more frustrating, the
> > > data
> > > > > was not corrupted on disk on the sending node-- it was a bug in the
> > > > > network card driver that was injecting the errors.
> > > > >
> > > > > However, my point was not about TCP's guarantees being violated.
> My
> > > > > point is that TCP's guarantees are only one small building block to
> > > > > build a robust distributed system.  TCP basically just says that
> if you
> > > > > get any bytes from the stream, you will get the ones that were
> sent by
> > > > > the sender, in the order they were sent.  TCP does not guarantee
> that
> > > > > the bytes you send will get there.  It does not guarantee that if
> you
> > > > > close the connection, the other end will know about it in a timely
> > > > > fashion.  It does not guarantee that the bytes will be received in
> a
> > > > > certain timeframe, and certainly doesn't guarantee that if you
> send a
> > > > > byte on connection X and then on connection Y, that the remote end
> will
> > > > > read a byte on X before reading a byte on Y.
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > > >
> > > > > > Hope this made my view clearer, especially the first part.
> > > > > >
> > > > > > Best Jan
> > > > > >
> > > > > >
> > > > > > > best,
> > > > > > > Colin
> > > > > > >
> > > > > > >
> > > > > > >> If there is an error such as NotLeaderForPartition is
> > > > > > >> returned for some partitions, the follower can always send a
> full
> > > > > > >> FetchRequest. Is there a scenario that only some of the
> > > partitions in
> > > > > a
> > > > > > >> FetchResponse is lost?
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >>
> > > > > > >> Jiangjie (Becket) Qin
> > > > > > >>
> > > > > > >>
> > > > > > >> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe<
> cmccabe@apache.org>
> > > > > wrote:
> > > > > > >>
> > > > > > >>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
> > > > > > >>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe<
> > > cmccabe@apache.org>
> > > > > > >>> wrote:
> > > > > > >>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
> > > > > > >>>>>> Hey Colin,
> > > > > > >>>>>>
> > > > > > >>>>>> Thanks much for the update. I have a few questions below:
> > > > > > >>>>>>
> > > > > > >>>>>> 1. I am not very sure that we need Fetch Session Epoch. It
> > > seems
> > > > > that
> > > > > > >>>>>> Fetch
> > > > > > >>>>>> Session Epoch is only needed to help leader distinguish
> > > between "a
> > > > > > >>> full
> > > > > > >>>>>> fetch request" and "a full fetch request and request a new
> > > > > > >>> incremental
> > > > > > >>>>>> fetch session". Alternatively, follower can also indicate
> "a
> > > full
> > > > > > >>> fetch
> > > > > > >>>>>> request and request a new incremental fetch session" by
> > > setting
> > > > > Fetch
> > > > > > >>>>>> Session ID to -1 without using Fetch Session Epoch. Does
> this
> > > make
> > > > > > >>> sense?
> > > > > > >>>>> Hi Dong,
> > > > > > >>>>>
> > > > > > >>>>> The fetch session epoch is very important for ensuring
> > > > > correctness.  It
> > > > > > >>>>> prevents corrupted or incomplete fetch data due to network
> > > > > reordering
> > > > > > >>> or
> > > > > > >>>>> loss.
> > > > > > >>>>>
> > > > > > >>>>> For example, consider a scenario where the follower sends a
> > > fetch
> > > > > > >>>>> request to the leader.  The leader responds, but the
> response
> > > is
> > > > > lost
> > > > > > >>>>> because of network problems which affected the TCP
> session.  In
> > > > > that
> > > > > > >>>>> case, the follower must establish a new TCP session and
> > > re-send the
> > > > > > >>>>> incremental fetch request.  But the leader does not know
> that
> > > the
> > > > > > >>>>> follower didn't receive the previous incremental fetch
> > > response.
> > > > > It is
> > > > > > >>>>> only the incremental fetch epoch which lets the leader know
> > > that it
> > > > > > >>>>> needs to resend that data, and not data which comes
> afterwards.
> > > > > > >>>>>
> > > > > > >>>>> You could construct similar scenarios with message
> reordering,
> > > > > > >>>>> duplication, etc.  Basically, this is a stateful protocol
> on an
> > > > > > >>>>> unreliable network, and you need to know whether the
> follower
> > > got
> > > > > the
> > > > > > >>>>> previous data you sent before you move on.  And you need to
> > > handle
> > > > > > >>>>> issues like duplicated or delayed requests.  These issues
> do
> > > not
> > > > > affect
> > > > > > >>>>> the full fetch request, because it is not stateful-- any
> full
> > > fetch
> > > > > > >>>>> request can be understood and properly responded to in
> > > isolation.
> > > > > > >>>>>
> > > > > > >>>> Thanks for the explanation. This makes sense. On the other
> hand
> > > I
> > > > > would
> > > > > > >>>> be interested in learning more about whether Becket's
> solution
> > > can
> > > > > help
> > > > > > > >>>> simplify the protocol by not having the epoch field and
> whether
> > > that
> > > > > is
> > > > > > >>>> worth doing.
> > > > > > >>> Hi Dong,
> > > > > > >>>
> > > > > > >>> I commented about this in the other thread.  A solution which
> > > doesn't
> > > > > > >>> maintain session information doesn't work here.
> > > > > > >>>
> > > > > > >>>>
> > > > > > >>>>>> 2. It is said that Incremental FetchRequest will include
> > > > > partitions
> > > > > > >>> whose
> > > > > > >>>>>> fetch offset or maximum number of fetch bytes has been
> > > changed. If
> > > > > > > >>>>>> follower's logStartOffset of a partition has changed,
> should
> > > this
> > > > > > >>>>>> partition also be included in the next FetchRequest to the
> > > leader?
> > > > > > >>>>> Otherwise, it
> > > > > > >>>>>> may affect the handling of DeleteRecordsRequest because
> > > leader may
> > > > > > >>> not
> > > > > > >>>>> know
> > > > > > >>>>>> the corresponding data has been deleted on the follower.
> > > > > > >>>>> Yeah, the follower should include the partition if the
> > > > > logStartOffset
> > > > > > >>>>> has changed.  That should be spelled out on the KIP.
> Fixed.
> > > > > > >>>>>
> > > > > > >>>>>> 3. In the section "Per-Partition Data", a partition is not
> > > > > considered
> > > > > > >>>>>> dirty if its log start offset has changed. Later in the
> > > section
> > > > > > >>>>> "FetchRequest
> > > > > > >>>>>> Changes", it is said that incremental fetch responses will
> > > > > include a
> > > > > > >>>>>> partition if its logStartOffset has changed. It seems
> > > > > inconsistent.
> > > > > > >>> Can
> > > > > > >>>>>> you update the KIP to clarify it?
> > > > > > >>>>>>
> > > > > > >>>>> In the "Per-Partition Data" section, it does say that
> > > > > logStartOffset
> > > > > > >>>>> changes make a partition dirty, though, right?  The first
> > > bullet
> > > > > point
> > > > > > >>>>> is:
> > > > > > >>>>>
> > > > > > >>>>>> * The LogCleaner deletes messages, and this changes the
> log
> > > start
> > > > > > >>> offset
> > > > > > >>>>> of the partition on the leader., or
> > > > > > >>>>>
> > > > > > >>>> Ah I see. I think I didn't notice this because statement
> assumes
> > > > > that the
> > > > > > >>>> LogStartOffset in the leader only changes due to
> LogCleaner. In
> > > > > fact the
> > > > > > >>>> LogStartOffset can change on the leader due to either log
> > > retention
> > > > > or
> > > > > > >>>> DeleteRecordsRequest. I haven't verified whether LogCleaner
> can
> > > > > change
> > > > > > >>>> LogStartOffset though. It may be a bit better to just say
> that a
> > > > > > >>>> partition is considered dirty if LogStartOffset changes.
> > > > > > >>> I agree.  It should be straightforward to just resend the
> > > partition
> > > > > if
> > > > > > >>> logStartOffset changes.
> > > > > > >>>
> > > > > > >>>>>> 4. In "Fetch Session Caching" section, it is said that
> each
> > > broker
> > > > > > >>> has a
> > > > > > >>>>>> limited number of slots. How is this number determined?
> Does
> > > this
> > > > > > >>> require
> > > > > > >>>>>> a new broker config for this number?
> > > > > > >>>>> Good point.  I added two broker configuration parameters to
> > > control
> > > > > > >>> this
> > > > > > >>>>> number.
> > > > > > >>>>>
> > > > > > >>>> I am curious to see whether we can avoid some of these new
> > > configs.
> > > > > For
> > > > > > >>>> example, incremental.fetch.session.cache.slots.per.broker
> is
> > > > > probably
> > > > > > >>> not
> > > > > > >>>> necessary because if a leader knows that a FetchRequest
> comes
> > > from a
> > > > > > >>>> follower, we probably want the leader to always cache the
> > > > > information
> > > > > > >>>> from that follower. Does this make sense?
> > > > > > >>> Yeah, maybe we can avoid having
> > > > > > >>> incremental.fetch.session.cache.slots.per.broker.
> > > > > > >>>
> > > > > > >>>> Maybe we can discuss the config later after there is
> agreement
> > > on
> > > > > what the
> > > > > > >>>> protocol would look like.
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>>>> What is the error code if broker does
> > > > > > >>>>>> not have new log for the incoming FetchRequest?
> > > > > > >>>>> Hmm, is there a typo in this question?  Maybe you meant to
> ask
> > > what
> > > > > > >>>>> happens if there is no new cache slot for the incoming
> > > > > FetchRequest?
> > > > > > >>>>> That's not an error-- the incremental fetch session ID just
> > > gets
> > > > > set to
> > > > > > >>>>> 0, indicating no incremental fetch session was created.
> > > > > > >>>>>
> > > > > > >>>> Yeah there is a typo. You have answered my question.
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>>>> 5. Can you clarify what happens if follower adds a
> partition
> > > to
> > > > > the
> > > > > > >>>>>> ReplicaFetcherThread after receiving LeaderAndIsrRequest?
> Does
> > > > > leader
> > > > > > > >>>>>> need to generate a new session for this
> ReplicaFetcherThread
> > > or
> > > > > > >>> does it
> > > > > > >>>>> re-use
> > > > > > >>>>>> the existing session?  If it uses a new session, is the
> old
> > > > > session
> > > > > > >>>>>> actively deleted from the slot?
> > > > > > >>>>> The basic idea is that you can't make changes, except by
> > > sending a
> > > > > full
> > > > > > >>>>> fetch request.  However, perhaps we can allow the client to
> > > re-use
> > > > > its
> > > > > > >>>>> existing session ID.  If the client sets sessionId = id,
> epoch
> > > =
> > > > > 0, it
> > > > > > >>>>> could re-initialize the session.
> > > > > > >>>>>
> > > > > > >>>> Yeah I agree with the basic idea. We probably want to
> understand
> > > > > more
> > > > > > >>>> detail about how this works later.
> > > > > > >>> Sounds good.  I updated the KIP with this information.  A
> > > > > > >>> re-initialization should be exactly the same as an
> > > initialization,
> > > > > > >>> except that it reuses an existing ID.
> > > > > > >>>
> > > > > > >>> best,
> > > > > > >>> Colin
> > > > > > >>>
> > > > > > >>>
> > > > > > >>>>>> BTW, I think it may be useful if the KIP can include the
> > > example
> > > > > > >>> workflow
> > > > > > >>>>>> of how this feature will be used in case of partition
> change
> > > and
> > > > > so
> > > > > > >>> on.
> > > > > > >>>>> Yeah, that might help.
> > > > > > >>>>>
> > > > > > >>>>> best,
> > > > > > >>>>> Colin
> > > > > > >>>>>
> > > > > > >>>>>> Thanks,
> > > > > > >>>>>> Dong
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe<
> > > cmccabe@apache.org
> > > > > >
> > > > > > >>>>>> wrote:
> > > > > > >>>>>>
> > > > > > >>>>>>> I updated the KIP with the ideas we've been discussing.
> > > > > > >>>>>>>
> > > > > > >>>>>>> best,
> > > > > > >>>>>>> Colin
> > > > > > >>>>>>>
> > > > > > >>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
> > > > > > >>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
> > > > > > >>>>>>>>> Hi Colin, thank you  for this KIP, it can become a
> really
> > > > > > >>> useful
> > > > > > >>>>> thing.
> > > > > > > >>>>>>>>> I just scanned through the discussion so far and wanted to start a
> > > > > > > >>>>>>>>> thread to make a decision about keeping the
> > > > > > > >>>>>>>>> cache with the Connection / Session or having some sort of
> > > > > > > >>>>>>>>> UUID-indexed global Map.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Sorry if that has been settled already and I missed
> it. In
> > > this
> > > > > > >>>>> case
> > > > > > >>>>>>>>> could anyone point me to the discussion?
> > > > > > >>>>>>>> Hi Jan,
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> I don't think anyone has discussed the idea of tying the
> > > cache
> > > > > > >>> to an
> > > > > > >>>>>>>> individual TCP session yet.  I agree that since the
> cache is
> > > > > > >>>>> intended to
> > > > > > >>>>>>>> be used only by a single follower or client, it's an
> > > interesting
> > > > > > >>>>> thing
> > > > > > >>>>>>>> to think about.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> I guess the obvious disadvantage is that whenever your
> TCP
> > > > > > >>> session
> > > > > > >>>>>>>> drops, you have to make a full fetch request rather
> than an
> > > > > > >>>>> incremental
> > > > > > >>>>>>>> one.  It's not clear to me how often this happens in
> > > practice --
> > > > > > >>> it
> > > > > > >>>>>>>> probably depends a lot on the quality of the network.
> From
> > > a
> > > > > > >>> code
> > > > > > >>>>>>>> perspective, it might also be a bit difficult to access
> data
> > > > > > >>>>> associated
> > > > > > >>>>>>>> with the Session from classes like KafkaApis (although
> we
> > > could
> > > > > > >>>>> refactor
> > > > > > >>>>>>>> it to make this easier).
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> It's also clear that even if we tie the cache to the
> > > session, we
> > > > > > >>>>> still
> > > > > > >>>>>>>> have to have limits on the number of caches we're
> willing to
> > > > > > >>> create.
> > > > > > >>>>>>>> And probably we should reserve some cache slots for each
> > > > > > >>> follower, so
> > > > > > >>>>>>>> that clients don't take all of them.
> > > > > > >>>>>>>>
> > > > > > > >>>>>>>>> I'd rather see a protocol in which the client is
> hinting the
> > > > > > >>> broker
> > > > > > >>>>>>> that,
> > > > > > >>>>>>>>> he is going to use the feature instead of a client
> > > > > > >>>>>>>>> realizing that the broker just offered the feature
> > > (regardless
> > > > > > >>> of
> > > > > > >>>>>>>>> protocol version which should only indicate that the
> > > feature
> > > > > > >>>>>>>>> would be usable).
> > > > > > >>>>>>>> Hmm.  I'm not sure what you mean by "hinting."  I do
> think
> > > that
> > > > > > >>> the
> > > > > > >>>>>>>> server should have the option of not accepting
> incremental
> > > > > > >>> requests
> > > > > > >>>>> from
> > > > > > >>>>>>>> specific clients, in order to save memory space.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> This seems to work better with a per
> > > > > > >>>>>>>>> connection/session attached Metadata than with a Map
> and
> > > could
> > > > > > >>>>> allow
> > > > > > >>>>>>> for
> > > > > > >>>>>>>>> easier client implementations.
> > > > > > >>>>>>>>> It would also make Client-side code easier as there
> > > wouldn't
> > > > > > >>> be any
> > > > > > >>>>>>>>> Cache-miss error Messages to handle.
> > > > > > >>>>>>>> It is nice not to have to handle cache-miss responses, I
> > > agree.
> > > > > > >>>>>>>> However, TCP sessions aren't exposed to most of our
> > > client-side
> > > > > > >>> code.
> > > > > > >>>>>>>> For example, when the Producer creates a message and
> hands
> > > it
> > > > > > >>> off to
> > > > > > >>>>> the
> > > > > > >>>>>>>> NetworkClient, the NC will transparently re-connect and
> > > re-send
> > > > > a
> > > > > > >>>>>>>> message if the first send failed.  The higher-level code
> > > will
> > > > > > >>> not be
> > > > > > >>>>>>>> informed about whether the TCP session was
> re-established,
> > > > > > >>> whether an
> > > > > > >>>>>>>> existing TCP session was used, and so on.  So overall I
> > > would
> > > > > > >>> still
> > > > > > >>>>> lean
> > > > > > >>>>>>>> towards not coupling this to the TCP session...
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> best,
> > > > > > >>>>>>>> Colin
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>>    Thank you again for the KIP. And again, if this was
> > > > > clarified
> > > > > > >>>>> already
> > > > > > >>>>>>>>> please drop me a hint where I could read about it.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Best Jan
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote:
> > > > > > >>>>>>>>>> Hi all,
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> I created a KIP to improve the scalability and
> latency of
> > > > > > >>>>>>> FetchRequest:
> > > > > > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > > > > >>>>>>>>>> Please take a look.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> cheers,
> > > > > > >>>>>>>>>> Colin
> > > > > >
> > > > >
> > >
>
