kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jan Filipiak <Jan.Filip...@trivago.com>
Subject Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion
Date Mon, 05 Mar 2018 12:51:53 GMT
Hi Dong

thank you very much for your questions.

regarding the time spend copying data across:
It is correct that copying data from a topic with one partition mapping 
to a topic with a different partition mapping takes way longer than we 
can stop producers. Tens of minutes is a very optimistic estimate here. 
Many people can not afford copy full steam and therefore will have some 
rate limiting in place, this can bump the timespan into the day's. The 
good part is that the vast majority of the data can be copied while the 
producers are still going. One can then, piggyback the consumers ontop 
of this timeframe, by the method mentioned (provide them an mapping from 
their old offsets to new offsets in their repartitioned topics. In that 
way we separate migration of consumers from migration of producers 
(decoupling these is what kafka is strongest at). The time to actually 
swap over the producers should be kept minimal by ensuring that when a 
swap attempt is started the consumer copying over should be very close 
to the log end and is expected to finish within the next fetch. The 
operation should have a time-out and should be "reattemtable".

Importance of logcompaction:
If a producer produces key A, to partiton 0, its forever gonna be there, 
unless it gets deleted. The record might sit in there for years. A new 
producer started with the new partitions will fail to delete the record 
in the correct partition. Th record will be there forever and one can 
not reliable bootstrap new consumers. I cannot see how linear hashing 
can solve this.

Regarding your skipping of userland copying:
100%, copying the data across in userland is, as far as i can see, only 
a usecase for log compacted topics. Even for logcompaction + retentions 
it should only be opt-in. Why did I bring it up? I think log compaction 
is a very important feature to really embrace kafka as a "data 
plattform". The point I also want to make is that copying data this way 
is completely inline with the kafka architecture. it only consists of 
reading and writing to topics.

I hope it clarifies more why I think we should aim for more than the 
current KIP. I fear that once the KIP is done not much more effort will 
be taken.

On 04.03.2018 02:28, Dong Lin wrote:
> Hey Jan,
> In the current proposal, the consumer will be blocked on waiting for other
> consumers of the group to consume up to a given offset. In most cases, all
> consumers should be close to the LEO of the partitions when the partition
> expansion happens. Thus the time waiting should not be long e.g. on the
> order of seconds. On the other hand, it may take a long time to wait for
> the entire partition to be copied -- the amount of time is proportional to
> the amount of existing data in the partition, which can take tens of
> minutes. So the amount of time that we stop consumers may not be on the
> same order of magnitude.
> If we can implement this suggestion without copying data over in purse
> userland, it will be much more valuable. Do you have ideas on how this can
> be done?
> Not sure why the current KIP not help people who depend on log compaction.
> Could you elaborate more on this point?
> Thanks,
> Dong
> On Wed, Feb 28, 2018 at 10:55 PM, Jan Filipiak<Jan.Filipiak@trivago.com>
> wrote:
>> Hi Dong,
>> I tried to focus on what the steps are one can currently perform to expand
>> or shrink a keyed topic while maintaining a top notch semantics.
>> I can understand that there might be confusion about "stopping the
>> consumer". It is exactly the same as proposed in the KIP. there needs to be
>> a time the producers agree on the new partitioning. The extra semantics I
>> want to put in there is that we have a possibility to wait until all the
>> existing data
>> is copied over into the new partitioning scheme. When I say stopping I
>> think more of having a memory barrier that ensures the ordering. I am still
>> aming for latencies  on the scale of leader failovers.
>> Consumers have to explicitly adapt the new partitioning scheme in the
>> above scenario. The reason is that in these cases where you are dependent
>> on a particular partitioning scheme, you also have other topics that have
>> co-partition enforcements or the kind -frequently. Therefore all your other
>> input topics might need to grow accordingly.
>> What I was suggesting was to streamline all these operations as best as
>> possible to have "real" partition grow and shrinkage going on. Migrating
>> the producers to a new partitioning scheme can be much more streamlined
>> with proper broker support for this. Migrating consumer is a step that
>> might be made completly unnecessary if - for example streams - takes the
>> gcd as partitioning scheme instead of enforcing 1 to 1. Connect consumers
>> and other consumers should be fine anyways.
>> I hope this makes more clear where I was aiming at. The rest needs to be
>> figured out. The only danger i see is that when we are introducing this
>> feature as supposed in the KIP, it wont help any people depending on  log
>> compaction.
>> The other thing I wanted to mention is that I believe the current
>> suggestion (without copying data over) can be implemented in pure userland
>> with a custom partitioner and a small feedbackloop from ProduceResponse =>
>> Partitionier in coorporation with a change management system.
>> Best Jan
>> On 28.02.2018 07:13, Dong Lin wrote:
>>> Hey Jan,
>>> I am not sure if it is acceptable for producer to be stopped for a while,
>>> particularly for online application which requires low latency. I am also
>>> not sure how consumers can switch to a new topic. Does user application
>>> needs to explicitly specify a different topic for producer/consumer to
>>> subscribe to? It will be helpful for discussion if you can provide more
>>> detail on the interface change for this solution.
>>> Thanks,
>>> Dong
>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak<Jan.Filipiak@trivago.com>
>>> wrote:
>>> Hi,
>>>> just want to throw my though in. In general the functionality is very
>>>> usefull, we should though not try to find the architecture to hard while
>>>> implementing.
>>>> The manual steps would be to
>>>> create a new topic
>>>> the mirrormake from the new old topic to the new topic
>>>> wait for mirror making to catch up.
>>>> then put the consumers onto the new topic
>>>>       (having mirrormaker spit out a mapping from old offsets to new
>>>> offsets:
>>>>           if topic is increased by factor X there is gonna be a clean
>>>> mapping from 1 offset in the old topic to X offsets in the new topic,
>>>>           if there is no factor then there is no chance to generate a
>>>> mapping that can be reasonable used for continuing)
>>>>       make consumers stop at appropriate points and continue consumption
>>>> with offsets from the mapping.
>>>> have the producers stop for a minimal time.
>>>> wait for mirrormaker to finish
>>>> let producer produce with the new metadata.
>>>> Instead of implementing the approach suggest in the KIP which will leave
>>>> log compacted topic completely crumbled and unusable.
>>>> I would much rather try to build infrastructure to support the mentioned
>>>> above operations more smoothly.
>>>> Especially having producers stop and use another topic is difficult and
>>>> it would be nice if one can trigger "invalid metadata" exceptions for
>>>> them
>>>> and
>>>> if one could give topics aliases so that their produces with the old
>>>> topic
>>>> will arrive in the new topic.
>>>> The downsides are obvious I guess ( having the same data twice for the
>>>> transition period, but kafka tends to scale well with datasize). So its a
>>>> nicer fit into the architecture.
>>>> I further want to argument that the functionality by the KIP can
>>>> completely be implementing in "userland" with a custom partitioner that
>>>> handles the transition as needed. I would appreciate if someone could
>>>> point
>>>> out what a custom partitioner couldn't handle in this case?
>>>> With the above approach, shrinking a topic becomes the same steps.
>>>> Without
>>>> loosing keys in the discontinued partitions.
>>>> Would love to hear what everyone thinks.
>>>> Best Jan
>>>> On 11.02.2018 00:35, Dong Lin wrote:
>>>> Hi all,
>>>>> I have created KIP-253: Support in-order message delivery with partition
>>>>> expansion. See
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%
>>>>> 3A+Support+in-order+message+delivery+with+partition+expansion
>>>>> .
>>>>> This KIP provides a way to allow messages of the same key from the same
>>>>> producer to be consumed in the same order they are produced even if we
>>>>> expand partition of the topic.
>>>>> Thanks,
>>>>> Dong

View raw message