kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jan Filipiak <Jan.Filip...@trivago.com>
Subject Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability
Date Sun, 03 Dec 2017 09:18:14 GMT

On 02.12.2017 23:34, Colin McCabe wrote:
> On Thu, Nov 30, 2017, at 23:29, Jan Filipiak wrote:
>> Hi,
>> this discussion is going a little bit far from what I intended this
>> thread for.
>> I can see all of this beeing related.
>> To let you guys know what I am currently thinking is the following:
>> I do think the handling of Id's and epoch is rather complicated. I think
>> the complexity
>> comes from aiming for to much.
>> 1. Currently all the work is towards making fetchRequest
>> completely empty. This brings all sorts of pain with regards to the
>> broker actually needs
>> to know what he send even though it tries to use sendfile as much as
>> possible.
>> 2. Currently all the work is towards also making empty fetch request
>> across TCP sessions.
>> In this thread I aimed to relax our goals with regards to point 2.
>> Connection resets for us
>> are really the exceptions and I would argue, trying to introduce
>> complexity for sparing
>> 1 full request on connection reset is not worth it. Therefore I argued
>> to keep the Server
>> side information with the Session instead somewhere global. Its not
>> going to bring in the
>> results.
>> As the discussion unvields I also want to challenge our approach for
>> point 1.
>> I do not see a reason to introduce complexity (and
>>    especially on the fetch answer path). Did we consider that from the
>> client we just send the offsets
>> we want to fetch and skip the topic partition description and just use
>> the order to match the information
>> on the broker side again? This would also reduce the fetch sizes a lot
>> while skipping a ton of complexity.
> Hi Jan,
> We need to solve the problem of the fetch request taking
> O(num_partitions) space and time to process.  A solution that keeps the
> O(num_partitions) behavior, but improves it by a constant factor,
> doesn't really solve the problem.  And omitting some partition
> information, but leaving other partition information in place,
> definitely falls in that category, wouldn't you agree?  Also, as others
> have noted, if you omit the partition IDs, you run into a lot of
> problems surrounding changes in the partition membership.
> best,
> Colin
Hi Colin,

I agree that a fetch request sending only offsets still growths with the 
number of partitions.
Processing time, I can only follow as it comes to parsing, but I don't 
see a difference in the
work a broker has todo more for received offsets than cached offsets.

Given we still have the 100.000 partition case a fetchrequest as I 
suggest would savely
get below <1MB. How much of an improvement this is really depends on 
your set up.

Say you have all of these in 1 topic you are saving effectively maybe 
50% already.
As you increase topics and depending on how long you topic names are you get
extra savings.
In my playground cluster this is 160 topics, average 10 partitions, 2 
average and mean topic length is 54 and replication factor 2. This would 
result in
a saving of 5,5 bytes / topic-partition fetched. So from currently 21,5 
bytes per topic-partion
it would go down to basically 8, almost 2/3 saving. On our production 
cluster which has
a higher broker to replication factor ratio the savings are bigger. The 
Average of replicated partitions per
topic there is ~3 . This is roughly 75% percent saving in fetch request 
size. For us,
since we have many slowly changing smaller topics, varint encoding of 
offsets would give another big boost
as many fit into 2-3 bytes.

I do not quite understand what it means to omit partition-ids and 
changing ownership. The partition ID can be retrieved
by ordinal position from the brokers cache. The broker serving the fetch 
should not care if this consumer owns the partition in terms of its 
group membership. If the broker should no longer be the leader
of the partition he can return "not leader for partition" as usual. 
Maybe you can point me where this has been explained as I
couldn't really find a place where it got clear to me.

I think 75% saving and more is realistic and even though linear to the 
number of partitions fetch a very practical aproach that fits
the design principles "the consumer decides" a lot better. I am still 
trying to fully understand how the plan is to update the offsets broker
wise. No need to explain that here as I think I know where to look it 
up, I guess that is introduces a lot of complexity with sendfile and
an additional index lookup that I have a hard time believing it will pay 
off. Both in source code complexity and efficiency.

I intend to send you an answer on the other threads as soon as I get to 
it.  Hope this explains my view of
the size trade-off well enough. Would very much appreciate your opinion.

Best Jan

>> Hope these ideas are interesting
>> best Jan
>> On 01.12.2017 01:47, Becket Qin wrote:
>>> Hi Colin,
>>> Thanks for updating the KIP. I have two comments:
>>> 1. The session epoch seems introducing some complexity. It would be good if
>>> we don't have to maintain the epoch.
>>> 2. If all the partitions has data returned (even a few messages), the next
>>> fetch would be equivalent to a full request. This means the clusters with
>>> continuously small throughput may not save much from the incremental fetch.
>>> I am wondering if we can avoid session epoch maintenance and address the
>>> fetch efficiency in general with some modifications to the solution. Not
>>> sure if the following would work, but just want to give my ideas.
>>> To solve 1, the basic idea is to let the leader return the partition data
>>> with its expected client's position for each partition. If the client
>>> disagree with the leader's expectation, a full FetchRequest is then sent to
>>> ask the leader to update the client's position.
>>> To solve 2, when possible, we just let the leader to infer the clients
>>> position instead of asking the clients to provide the position, so the
>>> incremental fetch can be empty in most cases.
>>> More specifically, the protocol will have the following change.
>>> 1. Add a new flag called FullFetch to the FetchRequest.
>>>      1) A full FetchRequest is the same as the current FetchRequest with
>>> FullFetch=true.
>>>      2) An incremental FetchRequest is always empty with FullFetch=false.
>>> 2. Add a new field called ExpectedPosition(INT64) to each partition data in
>>> the FetchResponse.
>>> The leader logic:
>>> 1. The leader keeps a map from client-id (client-uuid) to the interested
>>> partitions of that client. For each interested partition, the leader keeps
>>> the client's position for that client.
>>> 2. When the leader receives a full fetch request (FullFetch=true), the
>>> leader
>>>       1) replaces the interested partitions for the client id with the
>>> partitions in that full fetch request.
>>>       2) updates the client position with the offset specified in that full
>>> fetch request.
>>>       3) if the client is a follower, update the high watermark, etc.
>>> 3. When the leader receives an incremental fetch request (typically empty),
>>> the leader returns the data from all the interested partitions (if any)
>>> according to the position in the interested partitions map.
>>> 4. In the FetchResponse, the leader will include an ExpectedFetchingOffset
>>> that the leader thinks the client is fetching at. The value is the client
>>> position of the partition in the interested partition map. This is just to
>>> confirm with the client that the client position in the leader is correct.
>>> 5. After sending back the FetchResponse, the leader updates the position of
>>> the client's interested partitions. (There may be some overhead for the
>>> leader to know of offsets, but I think the trick of returning at index
>>> entry boundary or log end will work efficiently).
>>> 6. The leader will expire the client interested partitions if the client
>>> hasn't fetch for some time. And if an incremental request is received when
>>> the map does not contain the client info, an error will be returned to the
>>> client to ask for a FullFetch.
>>> The clients logic:
>>> 1. Start with sending a full FetchRequest, including partitions and offsets.
>>> 2. When get a response, check the ExpectedOffsets in the fetch response and
>>> see if that matches the current log end.
>>>       1) If the ExpectedFetchOffset matches the current log end, the next
>>> fetch request will be an incremental fetch request.
>>>       2) if the ExpectedFetchOffset does not match the current log end, the
>>> next fetch request will be a full FetchRequest.
>>> 3. Whenever the partition offset is actively changed (e.g. consumer.seek(),
>>> follower log truncation, etc), a full FetchRequest is sent.
>>> 4. Whenever the interested partition set changes (e.g.
>>> consumer.subscribe()/assign() is called, replica reassignment happens), a
>>> full FetchRequest is sent.
>>> 5. Whenever the client needs to retry a fetch, a FullFetch is sent.
>>> The benefits of this approach are:
>>> 1. Regardless of the traffic pattern in the cluster, In most cases the
>>> fetch request will be empty.
>>> 2. No need to maintain session epochs.
>>> What do you think?
>>> Thanks,
>>> Jiangjie (Becket) Qin
>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe <cmccabe@apache.org> wrote:
>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
>>>>> Hey Colin,
>>>>> Thanks much for the update. I have a few questions below:
>>>>> 1. I am not very sure that we need Fetch Session Epoch. It seems that
>>>>> Fetch
>>>>> Session Epoch is only needed to help leader distinguish between "a full
>>>>> fetch request" and "a full fetch request and request a new incremental
>>>>> fetch session". Alternatively, follower can also indicate "a full fetch
>>>>> request and request a new incremental fetch session" by setting Fetch
>>>>> Session ID to -1 without using Fetch Session Epoch. Does this make sense?
>>>> Hi Dong,
>>>> The fetch session epoch is very important for ensuring correctness.  It
>>>> prevents corrupted or incomplete fetch data due to network reordering or
>>>> loss.
>>>> For example, consider a scenario where the follower sends a fetch
>>>> request to the leader.  The leader responds, but the response is lost
>>>> because of network problems which affected the TCP session.  In that
>>>> case, the follower must establish a new TCP session and re-send the
>>>> incremental fetch request.  But the leader does not know that the
>>>> follower didn't receive the previous incremental fetch response.  It is
>>>> only the incremental fetch epoch which lets the leader know that it
>>>> needs to resend that data, and not data which comes afterwards.
>>>> You could construct similar scenarios with message reordering,
>>>> duplication, etc.  Basically, this is a stateful protocol on an
>>>> unreliable network, and you need to know whether the follower got the
>>>> previous data you sent before you move on.  And you need to handle
>>>> issues like duplicated or delayed requests.  These issues do not affect
>>>> the full fetch request, because it is not stateful-- any full fetch
>>>> request can be understood and properly responded to in isolation.
>>>>> 2. It is said that Incremental FetchRequest will include partitions whose
>>>>> fetch offset or maximum number of fetch bytes has been changed. If
>>>>> follower's logStartOffet of a partition has changed, should this
>>>>> partition also be included in the next FetchRequest to the leader?
>>>> Otherwise, it
>>>>> may affect the handling of DeleteRecordsRequest because leader may not
>>>> know
>>>>> the corresponding data has been deleted on the follower.
>>>> Yeah, the follower should include the partition if the logStartOffset
>>>> has changed.  That should be spelled out on the KIP.  Fixed.
>>>>> 3. In the section "Per-Partition Data", a partition is not considered
>>>>> dirty if its log start offset has changed. Later in the section
>>>> "FetchRequest
>>>>> Changes", it is said that incremental fetch responses will include a
>>>>> partition if its logStartOffset has changed. It seems inconsistent. Can
>>>>> you update the KIP to clarify it?
>>>> In the "Per-Partition Data" section, it does say that logStartOffset
>>>> changes make a partition dirty, though, right?  The first bullet point
>>>> is:
>>>>> * The LogCleaner deletes messages, and this changes the log start offset
>>>> of the partition on the leader., or
>>>>> 4. In "Fetch Session Caching" section, it is said that each broker has
>>>>> limited number of slots. How is this number determined? Does this require
>>>>> a new broker config for this number?
>>>> Good point.  I added two broker configuration parameters to control this
>>>> number.
>>>>> What is the error code if broker does
>>>>> not have new log for the incoming FetchRequest?
>>>> Hmm, is there a typo in this question?  Maybe you meant to ask what
>>>> happens if there is no new cache slot for the incoming FetchRequest?
>>>> That's not an error-- the incremental fetch session ID just gets set to
>>>> 0, indicating no incremental fetch session was created.
>>>>> 5. Can you clarify what happens if follower adds a partition to the
>>>>> ReplicaFetcherThread after receiving LeaderAndIsrRequest? Does leader
>>>>> needs to generate a new session for this ReplicaFetcherThread or does
>>>> re-use
>>>>> the existing session?  If it uses a new session, is the old session
>>>>> actively deleted from the slot?
>>>> The basic idea is that you can't make changes, except by sending a full
>>>> fetch request.  However, perhaps we can allow the client to re-use its
>>>> existing session ID.  If the client sets sessionId = id, epoch = 0, it
>>>> could re-initialize the session.
>>>>> BTW, I think it may be useful if the KIP can include the example workflow
>>>>> of how this feature will be used in case of partition change and so on.
>>>> Yeah, that might help.
>>>> best,
>>>> Colin
>>>>> Thanks,
>>>>> Dong
>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe <cmccabe@apache.org>
>>>>> wrote:
>>>>>> I updated the KIP with the ideas we've been discussing.
>>>>>> best,
>>>>>> Colin
>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
>>>>>>>> Hi Colin, thank you  for this KIP, it can become a really
>>>> thing.
>>>>>>>> I just scanned through the discussion so far and wanted to
start a
>>>>>>>> thread to make as decision about keeping the
>>>>>>>> cache with the Connection / Session or having some sort of
>>>> indN
>>>>>> exed
>>>>>>>> global Map.
>>>>>>>> Sorry if that has been settled already and I missed it. In
>>>> case
>>>>>>>> could anyone point me to the discussion?
>>>>>>> Hi Jan,
>>>>>>> I don't think anyone has discussed the idea of tying the cache
to an
>>>>>>> individual TCP session yet.  I agree that since the cache is
>>>> intended to
>>>>>>> be used only by a single follower or client, it's an interesting
>>>> thing
>>>>>>> to think about.
>>>>>>> I guess the obvious disadvantage is that whenever your TCP session
>>>>>>> drops, you have to make a full fetch request rather than an
>>>> incremental
>>>>>>> one.  It's not clear to me how often this happens in practice
-- it
>>>>>>> probably depends a lot on the quality of the network.  From a
>>>>>>> perspective, it might also be a bit difficult to access data
>>>> associated
>>>>>>> with the Session from classes like KafkaApis (although we could
>>>> refactor
>>>>>>> it to make this easier).
>>>>>>> It's also clear that even if we tie the cache to the session,
>>>> still
>>>>>>> have to have limits on the number of caches we're willing to
>>>>>>> And probably we should reserve some cache slots for each follower,
>>>>>>> that clients don't take all of them.
>>>>>>>> Id rather see a protocol in which the client is hinting the
>>>>>> that,
>>>>>>>> he is going to use the feature instead of a client
>>>>>>>> realizing that the broker just offered the feature (regardless
>>>>>>>> protocol version which should only indicate that the feature
>>>>>>>> would be usable).
>>>>>>> Hmm.  I'm not sure what you mean by "hinting."  I do think that
>>>>>>> server should have the option of not accepting incremental requests
>>>> from
>>>>>>> specific clients, in order to save memory space.
>>>>>>>> This seems to work better with a per
>>>>>>>> connection/session attached Metadata than with a Map and
>>>> allow
>>>>>> for
>>>>>>>> easier client implementations.
>>>>>>>> It would also make Client-side code easier as there wouldn't
be any
>>>>>>>> Cache-miss error Messages to handle.
>>>>>>> It is nice not to have to handle cache-miss responses, I agree.
>>>>>>> However, TCP sessions aren't exposed to most of our client-side
>>>>>>> For example, when the Producer creates a message and hands it
off to
>>>> the
>>>>>>> NetworkClient, the NC will transparently re-connect and re-send
>>>>>>> message if the first send failed.  The higher-level code will
not be
>>>>>>> informed about whether the TCP session was re-established, whether
>>>>>>> existing TCP session was used, and so on.  So overall I would
>>>> lean
>>>>>>> towards not coupling this to the TCP session...
>>>>>>> best,
>>>>>>> Colin
>>>>>>>>     Thank you again for the KIP. And again, if this was clarified
>>>> already
>>>>>>>> please drop me a hint where I could read about it.
>>>>>>>> Best Jan
>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote:
>>>>>>>>> Hi all,
>>>>>>>>> I created a KIP to improve the scalability and latency
>>>>>> FetchRequest:
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>> 227%3A+Introduce+Incremental+FetchRequests+to+Increase+
>>>>>> Partition+Scalability
>>>>>>>>> Please take a look.
>>>>>>>>> cheers,
>>>>>>>>> Colin

View raw message