kafka-dev mailing list archives

From Jan Filipiak <Jan.Filip...@trivago.com>
Subject Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability
Date Fri, 08 Dec 2017 09:16:23 GMT
Hi,

sorry for the late reply, busy times :-/

I would ask you one thing, maybe. Since the timeout
argument seems to be settled, I see no further argument
from your side except the "I don't want to".

Can you see that connections.max.idle.ms is the exact time
that expresses "We expect the client to be away for this long,
and come back and continue"?

I also clarified some stuff inline.

Best Jan




On 05.12.2017 23:14, Colin McCabe wrote:
> On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
>> Hi Colin
>>
>> Addressing the topic of how to manage slots from the other thread.
>> With tcp connections all this comes for free essentially.
> Hi Jan,
>
> I don't think that it's accurate to say that cache management "comes for
> free" by coupling the incremental fetch session with the TCP session.
> When a new TCP session is started by a fetch request, you still have to
> decide whether to grant that request an incremental fetch session or
> not.  If your answer is that you always grant the request, I would argue
> that you do not have cache management.
First, I would say the client has a big say in this. If the client
is not going to issue incremental fetches, it shouldn't ask for a cache.
When the client asks for the cache, we still have every option to deny it.

>
> I guess you could argue that timeouts are cache management, but I don't
> find that argument persuasive.  Anyone could just create a lot of TCP
> sessions and use a lot of resources, in that case.  So there is
> essentially no limit on memory use.  In any case, TCP sessions don't
> help us implement fetch session timeouts.
We still have every option to deny the request to keep the state.
What you want seems like a max connections / IP safeguard.
I can currently take down a broker with too many connections easily.


>> I still would argue we disable it by default and make a flag in the
>> broker to ask the leader to maintain the cache while replicating and also only
>> have it optional in consumers (default to off) so one can turn it on
>> where it really hurts.  MirrorMaker and audit consumers prominently.
> I agree with Jason's point from earlier in the thread.  Adding extra
> configuration knobs that aren't really necessary can harm usability.
> Certainly asking people to manually turn on a feature "where it really
> hurts" seems to fall in that category, when we could easily enable it
> automatically for them.
This doesn't make much sense to me. You also wanted to implement
a "turn off in case of bug" knob. Having the client indicate whether the
feature will be used seems reasonable to me.
>
>> Otherwise I left a few remarks in-line, which should help to understand
>> my view of the situation better
>>
>> Best Jan
>>
>>
>> On 05.12.2017 08:06, Colin McCabe wrote:
>>> On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
>>>> On 03.12.2017 21:55, Colin McCabe wrote:
>>>>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
>>>>>> Thanks for the explanation, Colin. A few more questions.
>>>>>>
>>>>>>> The session epoch is not complex.  It's just a number which increments
>>>>>>> on each incremental fetch.  The session epoch is also useful for
>>>>>>> debugging-- it allows you to match up requests and responses when
>>>>>>> looking at log files.
>>>>>> Currently each request in Kafka has a correlation id to help match the
>>>>>> requests and responses. Is epoch doing something differently?
>>>>> Hi Becket,
>>>>>
>>>>> The correlation ID is used within a single TCP session, to uniquely
>>>>> associate a request with a response.  The correlation ID is not unique
>>>>> (and has no meaning) outside the context of that single TCP session.
>>>>>
>>>>> Keep in mind, NetworkClient is in charge of TCP sessions, and generally
>>>>> tries to hide that information from the upper layers of the code.  So
>>>>> when you submit a request to NetworkClient, you don't know if that
>>>>> request creates a TCP session, or reuses an existing one.
>>>>>>> Unfortunately, this doesn't work.  Imagine the client misses an
>>>>>>> increment fetch response about a partition.  And then the partition is
>>>>>>> never updated after that.  The client has no way to know about the
>>>>>>> partition, since it won't be included in any future incremental fetch
>>>>>>> responses.  And there are no offsets to compare, since the partition is
>>>>>>> simply omitted from the response.
>>>>>> I am curious about in which situation would the follower miss a response
>>>>>> of a partition. If the entire FetchResponse is lost (e.g. timeout), the
>>>>>> follower would disconnect and retry. That will result in sending a full
>>>>>> FetchRequest.
>>>>> Basically, you are proposing that we rely on TCP for reliable delivery
>>>>> in a distributed system.  That isn't a good idea for a bunch of
>>>>> different reasons.  First of all, TCP timeouts tend to be very long.  So
>>>>> if the TCP session timing out is your error detection mechanism, you
>>>>> have to wait minutes for messages to timeout.  Of course, we add a
>>>>> timeout on top of that after which we declare the connection bad and
>>>>> manually close it.  But just because the session is closed on one end
>>>>> doesn't mean that the other end knows that it is closed.  So the leader
>>>>> may have to wait quite a long time before TCP decides that yes,
>>>>> connection X from the follower is dead and not coming back, even though
>>>>> gremlins ate the FIN packet which the follower attempted to transmit.
>>>>> If the cache state is tied to that TCP session, we have to keep that
>>>>> cache around for a much longer time than we should.
>>>> Hi,
>>>>
>>>> I see this from a different perspective. The cache expiry time
>>>> has the same semantic as idle connection time in this scenario.
>>>> It is the time range we expect the client to come back and reuse
>>>> its broker side state. I would argue that on close we would get an
>>>> extra shot at cleaning up the session state early. As opposed to
>>>> always waiting for that duration for expiry to happen.
>>> Hi Jan,
>>>
>>> The idea here is that the incremental fetch cache expiry time can be
>>> much shorter than the TCP session timeout.  In general the TCP session
>>> timeout is common to all TCP connections, and very long.  To make these
>>> numbers a little more concrete, the TCP session timeout is often
>>> configured to be 2 hours on Linux.  (See
>>> https://www.cyberciti.biz/tips/linux-increasing-or-decreasing-tcp-sockets-timeouts.html
>>> )  The timeout I was proposing for incremental fetch sessions was one or
>>> two minutes at most.
>> Currently this is taken care of by
>> connections.max.idle.ms on the broker, which defaults to a few
>> minutes.
> It is 10 minutes by default, which is longer than what we want the
> incremental fetch session timeout to be.  There's no reason to couple
> these two things.
>
>> Also something we could let the client change if we really wanted to.
>> So there is no need to worry about coupling our implementation to some
>> timeouts given by the OS, with TCP one always has full control over the worst
>> times + one gets the extra shot cleaning up early when the close comes through.
>> Which is the majority of the cases.
> In the majority of cases, the TCP session will be re-established.  In
> that case, we have to send a full fetch request rather than an
> incremental fetch request.
I actually have a hard time believing this. Do you have any numbers from
any existing production system? Is it the virtualisation layer cutting
all the connections?
We see this only on application crashes and restarts, where the app needs
to do the full fetch anyway, as it probably continues with stored offsets.
>
>>>>> Secondly, from a software engineering perspective, it's not a good idea
>>>>> to try to tightly tie together TCP and our code.  We would have to
>>>>> rework how we interact with NetworkClient so that we are aware of things
>>>>> like TCP sessions closing or opening.  We would have to be careful to
>>>>> preserve the ordering of incoming messages when doing things like
>>>>> putting incoming requests on to a queue to be processed by multiple
>>>>> threads.  It's just a lot of complexity to add, and there's no upside.
>>>> I see the point here. And I had a small chat with Dong Lin already
>>>> making me aware of this. I tried out the approaches and propose the
>>>> following:
>>>>
>>>> The client starts and does a full fetch. It then does incremental fetches.
>>>> The connection to the broker dies and is re-established by NetworkClient
>>>> under the hood.
>>>> The broker sees an incremental fetch without having state => returns
>>>> an error.
>>>> The client sees the error, does a full fetch and goes back to incrementally
>>>> fetching.
>>>>
>>>> Having this one additional error round trip is essentially the same as
>>>> when something with the session or epoch changed unexpectedly for the
>>>> client (say, expiry).
>>>>
>>>> So it's nothing extra added, but the conditions are easier to evaluate,
>>>> especially since we do everything with NetworkClient. Other implementers
>>>> of the protocol are free to optimize this and skip the erroneous
>>>> round trip on the new connection.
>>>> It's a great plus that the client can know when the error is going to
>>>> happen, instead of the server always having to report back if something
>>>> changes unexpectedly for the client.
>>> You are assuming that the leader and the follower agree that the TCP
>>> session drops at the same time.  When there are network problems, this
>>> may not be true.  The leader may still think the previous TCP session is
>>> active.  In that case, we have to keep the incremental fetch session
>>> state around until we learn otherwise (which could be up to that 2 hour
>>> timeout I mentioned).  And if we get a new incoming incremental fetch
>>> request, we can't assume that it replaces the previous one, because the
>>> IDs will be different (the new one starts a new session).
>> As mentioned, no reason to fear some time-outs out of our control
>>>>> Imagine that I made an argument that client IDs are "complex" and should
>>>>> be removed from our APIs.  After all, we can just look at the remote IP
>>>>> address and TCP port of each connection.  Would you think that was a
>>>>> good idea?  The client ID is useful when looking at logs.  For example,
>>>>> if a rebalance is having problems, you want to know what clients were
>>>>> having a problem.  So having the client ID field to guide you is
>>>>> actually much less "complex" in practice than not having an ID.
>>>> I still can't follow why the correlation ID will not help here.
>>>> Correlating logs with it usually works great, even with primitive tools
>>>> like grep.
>>> The correlation ID does help somewhat, but certainly not as much as a
>>> unique 64-bit ID.  The correlation ID is not unique in the broker, just
>>> unique to a single NetworkClient.  Similarly, the correlation ID is not
>>> unique on the client side, if there are multiple Consumers, etc.
>> We can always bump the entropy in correlation IDs; I never had a problem
>> with finding too many duplicates. That would be a different KIP though.
>>>>> Similarly, if metadata responses had epoch numbers (simple incrementing
>>>>> numbers), we would not have to debug problems like clients accidentally
>>>>> getting old metadata from servers that had been partitioned off from the
>>>>> network for a while.  Clients would know the difference between old and
>>>>> new metadata.  So putting epochs in to the metadata request is much less
>>>>> "complex" operationally, even though it's an extra field in the request.
>>>>> This has been discussed before on the mailing list.
>>>>>
>>>>> So I think the bottom line for me is that having the session ID and
>>>>> session epoch, while it adds two extra fields, reduces operational
>>>>> complexity and increases debuggability.  It avoids tightly coupling us
>>>>> to assumptions about reliable ordered delivery which tend to be violated
>>>>> in practice in multiple layers of the stack.  Finally, it  avoids the
>>>>> necessity of refactoring NetworkClient.
>>>> So there are stacks out there that violate TCP guarantees? And software
>>>> still works? How can this be? Can you elaborate a little where this
>>>> can be violated? I am not very familiar with virtualized environments
>>>> but they can't really violate TCP contracts.
>>> TCP's guarantees of reliable, in-order transmission certainly can be
>>> violated.  For example, I once had to debug a cluster where a certain
>>> node had a network card which corrupted its transmissions occasionally.
>>> With all the layers of checksums, you would think that this was not
>>> possible, but it happened.  We occasionally got corrupted data written
>>> to disk on the other end because of it.  Even more frustrating, the data
>>> was not corrupted on disk on the sending node-- it was a bug in the
>>> network card driver that was injecting the errors.
>> True, but your broker might as well read a corrupted 600GB as size from
>> the network and die with OOM instantly.
> If you read 600 GB as the size from the network, you will not "die with
> OOM instantly."  That would be a bug.  Instead, you will notice that 600
> GB is greater than max.message.bytes, and close the connection.
We only check max.message.bytes too late, to guard against consumer stalling.
We don't have a notion of max.networkpacket.size before we allocate the
ByteBuffer to read it into.
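
A minimal sketch of the kind of guard meant here, assuming a 4-byte
big-endian size prefix in front of every frame. The names MAX_FRAME_BYTES,
FrameTooLargeError and read_frame are made up for illustration; this is
not Kafka code and not an existing config.

```python
import io
import struct

# Hypothetical limit for illustration; not an actual Kafka config.
MAX_FRAME_BYTES = 100 * 1024 * 1024


class FrameTooLargeError(Exception):
    """Raised when the declared frame size is outside the allowed range."""


def read_frame(stream, max_frame_bytes=MAX_FRAME_BYTES):
    """Read one frame: a 4-byte big-endian size followed by that many bytes."""
    header = stream.read(4)
    if len(header) < 4:
        raise EOFError("connection closed before the size prefix was read")
    (size,) = struct.unpack(">i", header)
    # Validate the declared size before reading or buffering the payload,
    # so a corrupted size prefix cannot trigger a huge allocation.
    if size < 0 or size > max_frame_bytes:
        raise FrameTooLargeError(
            f"declared size {size} outside allowed range 0..{max_frame_bytes}"
        )
    payload = stream.read(size)
    if len(payload) < size:
        raise EOFError("connection closed mid-frame")
    return payload
```

With a check like this the receiver can close the connection on a bogus
size instead of trying to allocate for it.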

>
>> Optimizing for still having functional
>> software under these circumstances is not reasonable.
>> You want to get rid of such a
>> node ASAP and pray that ZooKeeper's ticks get corrupted often enough
>> that it finally drops out of the cluster.
>>
>> There is a good reason that these kinda things
>> https://issues.apache.org/jira/browse/MESOS-4105
>> don't end up as Kafka Jiras. In the end you can't run any software in
>> these containers anymore. Application layer checksums are a neat thing to
>> fail fast but trying to cope with this probably causes more harm than
>> good.  So I would argue that we shouldn't try this for the fetch requests.
> One of the goals of Apache Kafka is to be "a streaming platform...
> [that] lets you store streams of records in a fault-tolerant way."  For
> more information, see https://kafka.apache.org/intro .  Fault-tolerance
> is explicitly part of the goal of Kafka.  Prayer should be optional, not
> required, when running the software.
Yes, we need to fail ASAP when we read corrupted packets. It seemed
to me like you were making the case for praying and trying to stay alive. Fault
tolerance here means: I am a fishy box, so I am going to let a good box handle
it and stay silent until I get fixed up.
>
> Crashing because someone sent you a bad packet is not reasonable
> behavior.  It is a bug.  Similarly, bringing down the whole cluster,
> which could be a hundred nodes, because someone had a bad network adapter
> is not reasonable behavior.  It is perhaps reasonable for the cluster to
> perform worse when hardware is having problems.  But that's a different
> discussion.
See above.
>
> best,
> Colin
>
>>
>>> However, my point was not about TCP's guarantees being violated.  My
>>> point is that TCP's guarantees are only one small building block to
>>> build a robust distributed system.  TCP basically just says that if you
>>> get any bytes from the stream, you will get the ones that were sent by
>>> the sender, in the order they were sent.  TCP does not guarantee that
>>> the bytes you send will get there.  It does not guarantee that if you
>>> close the connection, the other end will know about it in a timely
>>> fashion.
>> These are very powerful guarantees and since we use TCP we should
>> piggyback everything that is reasonable on to it. IMO there is no
>> need to reimplement correct sequencing again if you get that from
>> your transport layer. It saves you the complexity, it makes
>> your application behave way more naturally and your API easier to
>> understand.
>>
>> There is literally nothing the kernel won't let you decide,
>> especially not any timings. The only noticeable exception is TIME_WAIT,
>> usually 240 seconds, but that already has little to do with the broker
>> itself, and if we are running out of usable ports because of this then
>> expiring fetch requests won't help much anyway.
>>
>> I hope I could strengthen the trust you have in userland TCP connection
>> management. It is really powerful and can be exploited for maximum gains
>> without much risk in my opinion.
>>
>>
>>
>>> It does not guarantee that the bytes will be received in a
>>> certain timeframe, and certainly doesn't guarantee that if you send a
>>> byte on connection X and then on connection Y, that the remote end will
>>> read a byte on X before reading a byte on Y.
>> No one expects this from two independent paths of any kind.
>>
>>> best,
>>> Colin
>>>
>>>> Hope this made my view clearer, especially the first part.
>>>>
>>>> Best Jan
>>>>
>>>>
>>>>> best,
>>>>> Colin
>>>>>
>>>>>
>>>>>> If there is an error such as NotLeaderForPartition is
>>>>>> returned for some partitions, the follower can always send a full
>>>>>> FetchRequest. Is there a scenario that only some of the partitions in a
>>>>>> FetchResponse is lost?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jiangjie (Becket) Qin
>>>>>>
>>>>>>
>>>>>> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe<cmccabe@apache.org>
 wrote:
>>>>>>
>>>>>>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
>>>>>>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe<cmccabe@apache.org>
>>>>>>> wrote:
>>>>>>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
>>>>>>>>>> Hey Colin,
>>>>>>>>>>
>>>>>>>>>> Thanks much for the update. I have a few questions below:
>>>>>>>>>>
>>>>>>>>>> 1. I am not very sure that we need Fetch Session Epoch. It seems that
>>>>>>>>>> Fetch Session Epoch is only needed to help leader distinguish between
>>>>>>>>>> "a full fetch request" and "a full fetch request and request a new
>>>>>>>>>> incremental fetch session". Alternatively, follower can also indicate
>>>>>>>>>> "a full fetch request and request a new incremental fetch session" by
>>>>>>>>>> setting Fetch Session ID to -1 without using Fetch Session Epoch.
>>>>>>>>>> Does this make sense?
>>>>>>>>> Hi Dong,
>>>>>>>>>
>>>>>>>>> The fetch session epoch is very important for ensuring correctness.  It
>>>>>>>>> prevents corrupted or incomplete fetch data due to network reordering or
>>>>>>>>> loss.
>>>>>>>>>
>>>>>>>>> For example, consider a scenario where the follower sends a fetch
>>>>>>>>> request to the leader.  The leader responds, but the response is lost
>>>>>>>>> because of network problems which affected the TCP session.  In that
>>>>>>>>> case, the follower must establish a new TCP session and re-send the
>>>>>>>>> incremental fetch request.  But the leader does not know that the
>>>>>>>>> follower didn't receive the previous incremental fetch response.  It is
>>>>>>>>> only the incremental fetch epoch which lets the leader know that it
>>>>>>>>> needs to resend that data, and not data which comes afterwards.
>>>>>>>>>
>>>>>>>>> You could construct similar scenarios with message reordering,
>>>>>>>>> duplication, etc.  Basically, this is a stateful protocol on an
>>>>>>>>> unreliable network, and you need to know whether the follower got the
>>>>>>>>> previous data you sent before you move on.  And you need to handle
>>>>>>>>> issues like duplicated or delayed requests.  These issues do not affect
>>>>>>>>> the full fetch request, because it is not stateful-- any full fetch
>>>>>>>>> request can be understood and properly responded to in isolation.
>>>>>>>>>
>>>>>>>> Thanks for the explanation. This makes sense. On the other hand I would
>>>>>>>> be interested in learning more about whether Becket's solution can help
>>>>>>>> simplify the protocol by not having the echo field and whether that is
>>>>>>>> worth doing.
>>>>>>> Hi Dong,
>>>>>>>
>>>>>>> I commented about this in the other thread.  A solution which doesn't
>>>>>>> maintain session information doesn't work here.
>>>>>>>
>>>>>>>>>> 2. It is said that Incremental FetchRequest will include partitions
>>>>>>>>>> whose fetch offset or maximum number of fetch bytes has been changed.
>>>>>>>>>> If follower's logStartOffset of a partition has changed, should this
>>>>>>>>>> partition also be included in the next FetchRequest to the leader?
>>>>>>>>>> Otherwise, it may affect the handling of DeleteRecordsRequest because
>>>>>>>>>> leader may not know the corresponding data has been deleted on the
>>>>>>>>>> follower.
>>>>>>>>> Yeah, the follower should include the partition if the logStartOffset
>>>>>>>>> has changed.  That should be spelled out on the KIP.  Fixed.
>>>>>>>>>
>>>>>>>>>> 3. In the section "Per-Partition Data", a partition is not considered
>>>>>>>>>> dirty if its log start offset has changed. Later in the section
>>>>>>>>>> "FetchRequest Changes", it is said that incremental fetch responses
>>>>>>>>>> will include a partition if its logStartOffset has changed. It seems
>>>>>>>>>> inconsistent. Can you update the KIP to clarify it?
>>>>>>>>>>
>>>>>>>>> In the "Per-Partition Data" section, it does say that logStartOffset
>>>>>>>>> changes make a partition dirty, though, right?  The first bullet point
>>>>>>>>> is:
>>>>>>>>>
>>>>>>>>>> * The LogCleaner deletes messages, and this changes the log start
>>>>>>>>>> offset of the partition on the leader., or
>>>>>>>>>
>>>>>>>> Ah I see. I think I didn't notice this because statement assumes that the
>>>>>>>> LogStartOffset in the leader only changes due to LogCleaner. In fact the
>>>>>>>> LogStartOffset can change on the leader due to either log retention and
>>>>>>>> DeleteRecordsRequest. I haven't verified whether LogCleaner can change
>>>>>>>> LogStartOffset though. It may be a bit better to just say that a
>>>>>>>> partition is considered dirty if LogStartOffset changes.
>>>>>>> I agree.  It should be straightforward to just resend the partition if
>>>>>>> logStartOffset changes.
>>>>>>>
>>>>>>>>>> 4. In "Fetch Session Caching" section, it is said that each broker has a
>>>>>>>>>> limited number of slots. How is this number determined? Does this require
>>>>>>>>>> a new broker config for this number?
>>>>>>>>> Good point.  I added two broker configuration parameters to control this
>>>>>>>>> number.
>>>>>>>>>
>>>>>>>> I am curious to see whether we can avoid some of these new configs. For
>>>>>>>> example, incremental.fetch.session.cache.slots.per.broker is probably not
>>>>>>>> necessary because if a leader knows that a FetchRequest comes from a
>>>>>>>> follower, we probably want the leader to always cache the information
>>>>>>>> from that follower. Does this make sense?
>>>>>>> Yeah, maybe we can avoid having
>>>>>>> incremental.fetch.session.cache.slots.per.broker.
>>>>>>>
>>>>>>>> Maybe we can discuss the config later after there is agreement on how the
>>>>>>>> protocol would look like.
>>>>>>>>
>>>>>>>>
>>>>>>>>>> What is the error code if broker does
>>>>>>>>>> not have new log for the incoming FetchRequest?
>>>>>>>>> Hmm, is there a typo in this question?  Maybe you meant to ask what
>>>>>>>>> happens if there is no new cache slot for the incoming FetchRequest?
>>>>>>>>> That's not an error-- the incremental fetch session ID just gets set to
>>>>>>>>> 0, indicating no incremental fetch session was created.
>>>>>>>>>
>>>>>>>> Yeah there is a typo. You have answered my question.
>>>>>>>>
>>>>>>>>
>>>>>>>>>> 5. Can you clarify what happens if follower adds a partition to the
>>>>>>>>>> ReplicaFetcherThread after receiving LeaderAndIsrRequest? Does leader
>>>>>>>>>> needs to generate a new session for this ReplicaFetcherThread or does it
>>>>>>>>>> re-use the existing session?  If it uses a new session, is the old session
>>>>>>>>>> actively deleted from the slot?
>>>>>>>>> The basic idea is that you can't make changes, except by sending a full
>>>>>>>>> fetch request.  However, perhaps we can allow the client to re-use its
>>>>>>>>> existing session ID.  If the client sets sessionId = id, epoch = 0, it
>>>>>>>>> could re-initialize the session.
>>>>>>>>>
>>>>>>>> Yeah I agree with the basic idea. We probably want to understand more
>>>>>>>> detail about how this works later.
>>>>>>> Sounds good.  I updated the KIP with this information.  A
>>>>>>> re-initialization should be exactly the same as an initialization,
>>>>>>> except that it reuses an existing ID.
>>>>>>>
>>>>>>> best,
>>>>>>> Colin
>>>>>>>
>>>>>>>
>>>>>>>>>> BTW, I think it may be useful if the KIP can include the example workflow
>>>>>>>>>> of how this feature will be used in case of partition change and so on.
>>>>>>>>> Yeah, that might help.
>>>>>>>>>
>>>>>>>>> best,
>>>>>>>>> Colin
>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Dong
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe<cmccabe@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I updated the KIP with the ideas we've been discussing.
>>>>>>>>>>>
>>>>>>>>>>> best,
>>>>>>>>>>> Colin
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
>>>>>>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
>>>>>>>>>>>>> Hi Colin, thank you for this KIP, it can become a really useful thing.
>>>>>>>>>>>>> I just scanned through the discussion so far and wanted to start a
>>>>>>>>>>>>> thread to make a decision about keeping the
>>>>>>>>>>>>> cache with the Connection / Session or having some sort of UUID indexed
>>>>>>>>>>>>> global Map.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Sorry if that has been settled already and I missed it. In this case
>>>>>>>>>>>>> could anyone point me to the discussion?
>>>>>>>>>>>> Hi Jan,
>>>>>>>>>>>>
>>>>>>>>>>>> I don't think anyone has discussed the idea of tying the cache to an
>>>>>>>>>>>> individual TCP session yet.  I agree that since the cache is intended to
>>>>>>>>>>>> be used only by a single follower or client, it's an interesting thing
>>>>>>>>>>>> to think about.
>>>>>>>>>>>>
>>>>>>>>>>>> I guess the obvious disadvantage is that whenever your TCP session
>>>>>>>>>>>> drops, you have to make a full fetch request rather than an incremental
>>>>>>>>>>>> one.  It's not clear to me how often this happens in practice -- it
>>>>>>>>>>>> probably depends a lot on the quality of the network.  From a code
>>>>>>>>>>>> perspective, it might also be a bit difficult to access data associated
>>>>>>>>>>>> with the Session from classes like KafkaApis (although we could refactor
>>>>>>>>>>>> it to make this easier).
>>>>>>>>>>>>
>>>>>>>>>>>> It's also clear that even if we tie the cache to the session, we still
>>>>>>>>>>>> have to have limits on the number of caches we're willing to create.
>>>>>>>>>>>> And probably we should reserve some cache slots for each follower, so
>>>>>>>>>>>> that clients don't take all of them.
>>>>>>>>>>>>
>>>>>>>>>>>>> I'd rather see a protocol in which the client is hinting the broker that
>>>>>>>>>>>>> he is going to use the feature instead of a client
>>>>>>>>>>>>> realizing that the broker just offered the feature (regardless of
>>>>>>>>>>>>> protocol version which should only indicate that the feature
>>>>>>>>>>>>> would be usable).
>>>>>>>>>>>> Hmm.  I'm not sure what you mean by "hinting."  I do think that the
>>>>>>>>>>>> server should have the option of not accepting incremental requests from
>>>>>>>>>>>> specific clients, in order to save memory space.
>>>>>>>>>>>>
>>>>>>>>>>>>> This seems to work better with a per
>>>>>>>>>>>>> connection/session attached Metadata than with a Map and could allow for
>>>>>>>>>>>>> easier client implementations.
>>>>>>>>>>>>> It would also make Client-side code easier as there wouldn't be any
>>>>>>>>>>>>> Cache-miss error Messages to handle.
>>>>>>>>>>>> It is nice not to have to handle cache-miss responses, I agree.
>>>>>>>>>>>> However, TCP sessions aren't exposed to most of our client-side code.
>>>>>>>>>>>> For example, when the Producer creates a message and hands it off to the
>>>>>>>>>>>> NetworkClient, the NC will transparently re-connect and re-send a
>>>>>>>>>>>> message if the first send failed.  The higher-level code will not be
>>>>>>>>>>>> informed about whether the TCP session was re-established, whether an
>>>>>>>>>>>> existing TCP session was used, and so on.  So overall I would still lean
>>>>>>>>>>>> towards not coupling this to the TCP session...
>>>>>>>>>>>>
>>>>>>>>>>>> best,
>>>>>>>>>>>> Colin
>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you again for the KIP. And again, if this was clarified already
>>>>>>>>>>>>> please drop me a hint where I could read about it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote:
>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I created a KIP to improve the scalability and latency of FetchRequest:
>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
>>>>>>>>>>>>>> Please take a look.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> cheers,
>>>>>>>>>>>>>> Colin
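
For reference, the error round trip discussed earlier in this thread (full
fetch, incremental fetches, the broker no longer having the state, an error,
then a full fetch again) can be modelled with a toy sketch. Everything here
is made up for illustration: the Broker and Client classes and the
UNKNOWN_SESSION error name are assumptions, not the KIP-227 wire protocol
and not Kafka code.

```python
class Broker:
    """Toy broker that keeps per-session fetch state (hypothetical model)."""

    def __init__(self):
        self.sessions = {}  # session id -> set of partitions in the session
        self.next_id = 1

    def full_fetch(self, partitions):
        # A full fetch always succeeds and creates fresh session state.
        session_id = self.next_id
        self.next_id += 1
        self.sessions[session_id] = set(partitions)
        return session_id

    def incremental_fetch(self, session_id):
        # Without state for this session the broker can only return an error.
        if session_id not in self.sessions:
            return "UNKNOWN_SESSION"  # made-up error name
        return "OK"

    def drop_session(self, session_id):
        # Stands in for expiry or a lost connection on the broker side.
        self.sessions.pop(session_id, None)


class Client:
    """Toy client that falls back to a full fetch when the session is gone."""

    def __init__(self, broker, partitions):
        self.broker = broker
        self.partitions = list(partitions)
        self.session_id = broker.full_fetch(self.partitions)
        self.full_fetches = 1

    def fetch(self):
        result = self.broker.incremental_fetch(self.session_id)
        if result == "UNKNOWN_SESSION":
            # One extra round trip: do a full fetch, then continue incrementally.
            self.session_id = self.broker.full_fetch(self.partitions)
            self.full_fetches += 1
            result = "OK"
        return result
```

In this model the only cost of losing the broker-side state is one extra
full fetch; whether the state is keyed by connection or by a session ID
does not change that part of the flow.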

