kafka-dev mailing list archives

From Jan Filipiak <Jan.Filip...@trivago.com>
Subject Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion
Date Sat, 10 Mar 2018 09:40:19 GMT
Hi Jun, thanks for your mail.

Thank you for your questions!
I think they are really good and tackle the core of the problem I see.

I will mostly answer inline, but I still want to set the tone here.

The core strength of Kafka is what Martin once called the
kappa architecture. How does this work?
You have everything as a log, as in Kafka. When you need to change
something, you create the new version of your application and leave it
running in parallel. Once the new version is good, you switch your users
over to the new version.

The online reshuffling effectively breaks this architecture, and I think
the switch in thinking here is more harmful
than any details about the partitioning function that allows such a change.
I feel that with my suggestion we stay closest to
the original, battle-proven architecture, and I can only warn against moving
away from it.

I might have forgotten something; sometimes it's hard for me to get
all my thoughts captured in a mail, but I hope the comments inline will
make my concern clearer and put some emphasis on why I prefer my
solution ;)

One thing we should all be aware of when discussing this, and I think 
Dong should have mentioned it (maybe he did).
We are not discussing all of this out of thin air but there is an effort 
in the Samza project.


To be clear: I think SEP-5 (state of last week, I don't know if it has been
adapted to this discussion) is on a way better path than KIP-253, and I can't
really explain why.

Best Jan,

nice weekend everyone

On 09.03.2018 03:36, Jun Rao wrote:
> Hi, Jan,
> Thanks for the feedback. Just some comments on the earlier points that you
> mentioned.
> 50. You brought up the question of whether existing data needs to be copied
> during partition expansion. My understanding of your view is that avoiding
> copying existing data will be more efficient, but it doesn't work well with
> compacted topics since some keys in the original partitions will never be
> cleaned. It would be useful to understand your use case of compacted topics
> a bit more. In the common use case, the data volume in a compacted topic
> may not be large. So, I am not sure if there is a strong need to expand
> partitions in a compacted topic, at least initially.
I do agree. State is usually smaller. Update rates might also be
comparatively high.
Doing log compaction (even being very efficient and configurable) is
also a more expensive operation than
just discarding old segments. Further, if you want to use more consumers
processing the events,
you also have to grow the number of partitions. Especially for the use cases
we have (KIP-213), a tiny stateful
table might be very expensive to process if it joins against a huge table.

I can just say we have been in the spot of needing to grow log compacted
topics, mainly for the processing power we can bring to the table.

Further, I am not at all concerned about the extra space used by
"garbage keys". I am more concerned about the correctness of innocent
consumers. The logic becomes complicated. Say, for Streams one would need
to load the record into state but not forward it to the topology (to have
it available for shuffling). I would rather have it simple and a topic clean,
regardless of whether it still has its old partition count. Especially with
multiple partition growths, I think it becomes insanely hard to do this
shuffle correctly. Maybe Streams and Samza can do it. Especially if you do
"hipster stream processing"

This makes Kafka way too complicated. With my approach I think it's way
simpler, because the topic has no "history" in terms of partitioning but
is always clean.
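
To make the correctness concern concrete, here is a minimal sketch in plain
Java (not Kafka code). The class CompactedTopic and its methods are made up
for illustration, and "key hash % partition count" is only an assumed
stand-in for the real partitioner. It shows what can happen when a compacted
topic is expanded without copying data: the tombstone for an old key lands in
a different partition than the record it is meant to delete.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a log compacted topic (illustration only).
class CompactedTopic {
    // One map per partition: key hash -> latest value, i.e. what compaction retains.
    private final List<Map<Integer, String>> partitions = new ArrayList<>();

    CompactedTopic(int numPartitions) {
        expandTo(numPartitions);
    }

    // Add partitions without moving any existing data.
    void expandTo(int numPartitions) {
        while (partitions.size() < numPartitions) {
            partitions.add(new HashMap<>());
        }
    }

    // Write a value, or a tombstone (null) that deletes the key from the
    // partition it hashes to under the CURRENT partition count.
    void produce(int keyHash, String value) {
        Map<Integer, String> p = partitions.get(Math.floorMod(keyHash, partitions.size()));
        if (value == null) {
            p.remove(keyHash);
        } else {
            p.put(keyHash, value);
        }
    }

    // What a freshly bootstrapped consumer sees when it reads every partition.
    List<String> bootstrap() {
        List<String> values = new ArrayList<>();
        for (Map<Integer, String> p : partitions) {
            values.addAll(p.values());
        }
        return values;
    }

    public static void main(String[] args) {
        CompactedTopic topic = new CompactedTopic(2);
        topic.produce(6, "v1");   // 6 % 2 = 0, stored in partition 0
        topic.expandTo(4);        // grow without copying
        topic.produce(6, null);   // tombstone goes to 6 % 4 = 2, not partition 0
        System.out.println(topic.bootstrap()); // prints [v1]
    }
}
```

A consumer bootstrapping from this topic still sees v1, although the key was
deleted after the expansion.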

> 51. "Growing the topic by an integer factor does not require any state
> redistribution at all." Could you clarify this a bit more? Let's say you
> have a consumer app that computes the windowed count per key. If you double
> the number of partitions from 1 to 2 and grow the consumer instances from 1
> to 2, we would need to redistribute some of the counts to the new consumer
> instance. Regarding to linear hashing, it's true that it won't solve the
> problem with compacted topics. The main benefit is that it redistributes
> the keys in one partition to no more than two partitions, which could help
> redistribute the state.
You don't need to spin up a new consumer in this case. Each partition
would simply be read by task number (partition % num_task). That task
still has the previous data right there and can go on.

This sounds contradictory to what I said before, but please bear with me.
> 52. Good point on coordinating the expansion of 2 topics that need to be
> joined together. This is where the 2-phase partition expansion could
> potentially help. In the first phase, we could add new partitions to the 2
> topics one at a time but without publishing to the new partitions. Then, we
> can add new consumer instances to pick up the new partitions. In this
> transition phase, no reshuffling is needed since no data is coming from the
> new partitions. Finally, we can enable the publishing to the new
> partitions.
I think it's even worse than you think. I would like to introduce the
term transitive copartitioning. Imagine
2 Streams applications. One joins (A,B), the other (B,C); then there is a
transitive copartitioning requirement for
(A,C) to be copartitioned as well. This can spread significantly and
require many consumers to adapt at the same time.

It is also not entirely clear to me how you can avoid reshuffling in this
case. Say A has a record that never gets updated after the expansion and
the corresponding B record moves to a new partition. How shall they meet
w/o a shuffle?
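
As a rough sketch of how far transitive copartitioning can spread, the
following Java snippet groups topics with a small union-find structure. The
class CoPartitionGroups and the topic names are hypothetical and only serve
as an illustration; nothing here is an existing Kafka or Streams API. Every
join between two topics merges their groups, and all topics in one group
would have to be expanded together.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (illustration only): tracks which topics are tied
// together by joins, directly or transitively.
class CoPartitionGroups {
    private final Map<String, String> parent = new HashMap<>();

    // Find the representative of the group a topic belongs to.
    private String find(String topic) {
        parent.putIfAbsent(topic, topic);
        String root = parent.get(topic);
        if (!root.equals(topic)) {
            root = find(root);
            parent.put(topic, root); // path compression
        }
        return root;
    }

    // Record that some application joins these two topics.
    void join(String a, String b) {
        parent.put(find(a), find(b));
    }

    // All topics that must keep the same partitioning as the given one.
    Set<String> groupOf(String topic) {
        String root = find(topic);
        Set<String> group = new TreeSet<>();
        for (String t : new ArrayList<>(parent.keySet())) {
            if (find(t).equals(root)) {
                group.add(t);
            }
        }
        return group;
    }

    public static void main(String[] args) {
        CoPartitionGroups groups = new CoPartitionGroups();
        groups.join("A", "B"); // first Streams application
        groups.join("B", "C"); // second Streams application
        System.out.println(groups.groupOf("A")); // prints [A, B, C]
    }
}
```

Expanding A therefore drags in C, even though no single application joins
A and C directly.
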
> 53. "Migrating consumer is a step that might be made completely unnecessary
> if - for example streams - takes the gcd as partitioning scheme instead of
> enforcing 1 to 1." Not sure that I fully understand this. I think you mean
> that a consumer application can run more instances than the number of
> partitions. In that case, the consumer can just repartition the input
> data according to the number of instances. This is possible, but just has
> the overhead of reshuffling the data.
No, what I meant (I think that is also your question, Mathias) is that if
you grow a topic by an integer factor,
then even if your processor is stateful you can just assign all the
multiples of the previous partition to
this consumer, and the state needed to keep processing correctly will be
present w/o any shuffling.

Say you have an assignment
Stateful consumer => partition
0 => 0
1 => 1
2 => 2

and you grow your topic by a factor of 4; you get

0 => 0,3,6,9
1 => 1,4,7,10
2 => 2,5,8,11

Say your hashcode is 8. 8%3 => 2 before, so the consumer for partition 2 has it.
Now you have 12 partitions, so 8%12 => 8, and it goes into partition 8,
which is assigned to the same consumer
that had 2 before and therefore knows the key.
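
The arithmetic above can be checked with a few lines of Java. This is only a
sketch under the assumption of plain modulo partitioning on a non-negative
key hash; the class FactorGrowth and its method names are invented for the
example and are not Kafka APIs. It rebuilds the assignment table for 3
consumers and 12 partitions and verifies that no key changes its consumer
when the partition count grows from 3 to 12.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (illustration only) for growing a topic by an integer factor.
class FactorGrowth {
    // Partition a key hash lands in under plain modulo partitioning (an assumption).
    static int partitionFor(int keyHash, int numPartitions) {
        return Math.floorMod(keyHash, numPartitions);
    }

    // Consumer (task) that owns a partition: partition % numConsumers.
    static int ownerOf(int partition, int numConsumers) {
        return partition % numConsumers;
    }

    // Consumer -> partitions, after the topic has grown to numPartitions.
    static Map<Integer, List<Integer>> assignment(int numPartitions, int numConsumers) {
        Map<Integer, List<Integer>> result = new TreeMap<>();
        for (int p = 0; p < numPartitions; p++) {
            result.computeIfAbsent(ownerOf(p, numConsumers), c -> new ArrayList<>()).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        int before = 3;
        int after = 12; // grown by a factor of 4
        System.out.println(assignment(after, before));
        // prints {0=[0, 3, 6, 9], 1=[1, 4, 7, 10], 2=[2, 5, 8, 11]}

        for (int hash = 0; hash < 1000; hash++) {
            int oldOwner = ownerOf(partitionFor(hash, before), before);
            int newOwner = ownerOf(partitionFor(hash, after), before);
            if (oldOwner != newOwner) {
                throw new AssertionError("key moved to another consumer: " + hash);
            }
        }
        System.out.println("no key changed its consumer");
    }
}
```

This works because (h % (n*k)) % n equals h % n for any non-negative h, so the
state a consumer already holds stays valid. It does not hold when the new
partition count is not a multiple of the old one.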

Userland reshuffling is there as an option, and it does exactly what I
suggest. I think it is the perfect strategy. All I am suggesting is
broker-side support to switch the producers to the newly partitioned
topic. Then the old topic (with too few partitions) can go away. Remember the
list of steps at the beginning of this thread. If one has broker support
for all the steps where it is required, and Streams support for those where
it isn't necessarily required, then one has solved the problem.
I repeat it because I think it's important. I am really happy that you
brought that up, because it's 100% what I want, just with the difference
of having an option to discard the too-small topic later (after all
consumers have adapted), and to have the order correct there. I need broker
support for managing the copy process and the producers, and for fencing
them against each other. I also repeat: the copy process can run for weeks
in the worst case. Copying the data is not the longest task; migrating
consumers might very well be.
Once all consumers have switched and the copying is really up to date (think
ISR-like up to date), only then do we stop the producers, wait for the copy
to finish, and use the new topic for producing.
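
The cutover just described can be sketched as a small single-threaded
simulation in Java. Everything here is hypothetical: the class TopicCutover,
the maxLag threshold and the in-memory lists are stand-ins for illustration
and do not correspond to any broker API. Because the simulation is
single-threaded, fencing is implicit; a real implementation would need the
broker to block producers during the final drain.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation (illustration only) of copy, fence, drain, switch.
class TopicCutover {
    // A record is {keyHash, sequenceNumber}; the sequence number exists only
    // so that ordering can be checked afterwards.
    private final List<int[]> oldTopic = new ArrayList<>();
    private final List<List<int[]>> newTopic = new ArrayList<>();
    private int copied = 0;           // how far the copy process has read the old topic
    private boolean switched = false; // true once producers write to the new topic

    TopicCutover(int newPartitions) {
        for (int i = 0; i < newPartitions; i++) {
            newTopic.add(new ArrayList<>());
        }
    }

    private void appendToNew(int[] record) {
        newTopic.get(Math.floorMod(record[0], newTopic.size())).add(record);
    }

    void produce(int keyHash, int seq) {
        int[] record = {keyHash, seq};
        if (switched) {
            appendToNew(record);
        } else {
            oldTopic.add(record);
        }
    }

    // Copy at most maxRecords (rate limiting); returns the remaining lag.
    int copy(int maxRecords) {
        int n = Math.min(maxRecords, oldTopic.size() - copied);
        for (int i = 0; i < n; i++) {
            appendToNew(oldTopic.get(copied++));
        }
        return oldTopic.size() - copied;
    }

    // One swap attempt. Refuses unless the copy is nearly caught up, so the
    // caller can simply retry later.
    boolean trySwitch(int maxLag) {
        if (oldTopic.size() - copied > maxLag) {
            return false;
        }
        // Producers count as fenced from here on: drain the tail, then flip.
        copy(Integer.MAX_VALUE);
        switched = true;
        return true;
    }

    // True if every new partition holds its records in produce order.
    boolean isOrdered() {
        for (List<int[]> partition : newTopic) {
            for (int i = 1; i < partition.size(); i++) {
                if (partition.get(i - 1)[1] > partition.get(i)[1]) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        TopicCutover t = new TopicCutover(4);
        for (int seq = 0; seq < 10; seq++) {
            t.produce(seq % 3, seq);
        }
        t.copy(5);                          // rate-limited copy, lag is now 5
        System.out.println(t.trySwitch(2)); // false: too far behind, retry later
        t.copy(4);                          // lag is now 1
        System.out.println(t.trySwitch(2)); // true: tail drained, producers switched
        t.produce(1, 10);                   // goes straight to the new topic
        System.out.println(t.isOrdered());  // true
    }
}
```

The point of the maxLag check is that the fenced window only has to cover the
last few records, no matter how long the bulk copy took.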

After this, the topic is in perfect shape, and no one needs to worry
about complicated stuff (old keys hanging around that might arrive in some
other topic later...). I can only imagine how many tricky bugs are going to
arrive after someone has grown and shrunk their topic 10 times.

> 54. "The other thing I wanted to mention is that I believe the current
> suggestion (without copying data over) can be implemented in pure userland
> with a custom partitioner and a small feedbackloop from ProduceResponse =>
> Partitioner in cooperation with a change management system." I am not
> sure a customized partitioner itself solves the problem. We probably need
> some broker side support to enforce when the new partitions can be used. We
> also need some support on the consumer/kstream side to preserve the per key
> ordering and potentially migrate the processing state. This is not trivial
> and I am not sure if it's ideal to fully push to the application space.
Broker support is definitely the preferred way here. I have nothing
against broker support.
I tried to say that for what I would prefer (copying the data over, at
least for log compacted topics)
I would require more broker support than the KIP currently offers.

> Jun
> On Tue, Mar 6, 2018 at 10:33 PM, Jan Filipiak <Jan.Filipiak@trivago.com>
> wrote:
>> Hi Dong,
>> are you actually reading my emails, or are you just using the thread I
>> started for general announcements regarding the KIP?
>> I tried to argue really hard against linear hashing. Growing the topic by
>> an integer factor does not require any state redistribution at all. I fail
>> to see completely where linear hashing helps on log compacted topics.
>> If you are not willing to explain to me what I might be overlooking: that
>> is fine.
>> But I ask you to not reply to my emails then. Please understand my
>> frustration with this.
>> Best Jan
>> On 06.03.2018 19:38, Dong Lin wrote:
>>> Hi everyone,
>>> Thanks for all the comments! It appears that everyone prefers linear
>>> hashing because it reduces the amount of state that needs to be moved
>>> between consumers (for stream processing). The KIP has been updated to use
>>> linear hashing.
>>> Regarding the migration endeavor: it seems that migrating producer library
>>> to use linear hashing should be pretty straightforward without
>>> much operational endeavor. If we don't upgrade client library to use this
>>> KIP, we can not support in-order delivery after partition is changed
>>> anyway. Suppose we upgrade client library to use this KIP, if partition
>>> number is not changed, the key -> partition mapping will be exactly the
>>> same as it is now because it is still determined using murmur_hash(key) %
>>> original_partition_num. In other words, this change is backward
>>> compatible.
>>> Regarding the load distribution: if we use linear hashing, the load may be
>>> unevenly distributed because those partitions which are not split may
>>> receive twice as much traffic as other partitions that are split. This
>>> issue can be mitigated by creating topic with partitions that are several
>>> times the number of consumers. And there will be no imbalance if the
>>> partition number is always doubled. So this imbalance seems acceptable.
>>> Regarding storing the partition strategy as per-topic config: It seems not
>>> necessary since we can still use murmur_hash as the default hash function
>>> and additionally apply the linear hashing algorithm if the partition
>>> number
>>> has increased. Not sure if there is any use-case for producer to use a
>>> different hash function. Jason, can you check if there is some use-case
>>> that I missed for using the per-topic partition strategy?
>>> Regarding how to reduce latency (due to state store/load) in stream
>>> processing consumer when partition number changes: I need to read the
>>> Kafka
>>> Stream code to understand how Kafka Stream currently migrate state between
>>> consumers when the application is added/removed for a given job. I will
>>> reply after I finish reading the documentation and code.
>>> Thanks,
>>> Dong
>>> On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <jason@confluent.io>
>>> wrote:
>>>> Great discussion. I think I'm wondering whether we can continue to leave
>>>> Kafka agnostic to the partitioning strategy. The challenge is communicating
>>>> the partitioning logic from producers to consumers so that the dependencies
>>>> between each epoch can be determined. For the sake of discussion, imagine
>>>> you did something like the following:
>>>> 1. The name (and perhaps version) of a partitioning strategy is stored in
>>>> topic configuration when a topic is created.
>>>> 2. The producer looks up the partitioning strategy before writing to a
>>>> topic and includes it in the produce request (for fencing). If it doesn't
>>>> have an implementation for the configured strategy, it fails.
>>>> 3. The consumer also looks up the partitioning strategy and uses it to
>>>> determine dependencies when reading a new epoch. It could either fail or
>>>> make the most conservative dependency assumptions if it doesn't know how to
>>>> implement the partitioning strategy. For the consumer, the new interface
>>>> might look something like this:
>>>> // Return the partition dependencies following an epoch bump
>>>> Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
>>>>                                          int numPartitionsAfterEpochBump)
>>>> The unordered case then is just a particular implementation which never has
>>>> any epoch dependencies. To implement this, we would need some way for the
>>>> consumer to find out how many partitions there were in each epoch, but
>>>> maybe that's not too unreasonable.
>>>> Thanks,
>>>> Jason
>>>> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
>>>> wrote:
>>>>> Hi Dong
>>>>> thank you very much for your questions.
>>>>> regarding the time spent copying data across:
>>>>> It is correct that copying data from a topic with one partition mapping to
>>>>> a topic with a different partition mapping takes way longer than we can
>>>>> stop producers. Tens of minutes is a very optimistic estimate here. Many
>>>>> people cannot afford to copy at full steam and therefore will have some rate
>>>>> limiting in place; this can bump the timespan into days. The good part
>>>>> is that the vast majority of the data can be copied while the producers are
>>>>> still going. One can then piggyback the consumers on top of this timeframe,
>>>>> by the method mentioned (provide them a mapping from their old offsets to
>>>>> new offsets in their repartitioned topics). In that way we separate
>>>>> migration of consumers from migration of producers (decoupling these is
>>>>> what kafka is strongest at). The time to actually swap over the producers
>>>>> should be kept minimal by ensuring that when a swap attempt is started the
>>>>> consumer copying over should be very close to the log end and is expected
>>>>> to finish within the next fetch. The operation should have a time-out and
>>>>> should be "reattemptable".
>>>>> Importance of log compaction:
>>>>> If a producer produces key A to partition 0, it's forever gonna be there,
>>>>> unless it gets deleted. The record might sit in there for years. A new
>>>>> producer started with the new partitions will fail to delete the record in
>>>>> the correct partition. The record will be there forever and one cannot
>>>>> reliably bootstrap new consumers. I cannot see how linear hashing can solve
>>>>> this.
>>>>> Regarding your skipping of userland copying:
>>>>> 100%, copying the data across in userland is, as far as I can see, only a
>>>>> use case for log compacted topics. Even for log compaction + retention it
>>>>> should only be opt-in. Why did I bring it up? I think log compaction is a
>>>>> very important feature to really embrace kafka as a "data platform". The
>>>>> point I also want to make is that copying data this way is completely
>>>>> in line with the kafka architecture. It only consists of reading and writing
>>>>> to topics.
>>>>> I hope it clarifies more why I think we should aim for more than the
>>>>> current KIP. I fear that once the KIP is done not much more effort will be
>>>>> taken.
>>>>> On 04.03.2018 02:28, Dong Lin wrote:
>>>>>> Hey Jan,
>>>>>> In the current proposal, the consumer will be blocked on waiting for other
>>>>>> consumers of the group to consume up to a given offset. In most cases, all
>>>>>> consumers should be close to the LEO of the partitions when the partition
>>>>>> expansion happens. Thus the time waiting should not be long e.g. on the
>>>>>> order of seconds. On the other hand, it may take a long time to wait for
>>>>>> the entire partition to be copied -- the amount of time is proportional to
>>>>>> the amount of existing data in the partition, which can take tens of
>>>>>> minutes. So the amount of time that we stop consumers may not be on the
>>>>>> same order of magnitude.
>>>>>> If we can implement this suggestion without copying data over in
>>>>>> userland, it will be much more valuable. Do you have ideas on how this can
>>>>>> be done?
>>>>>> Not sure why the current KIP does not help people who depend on log
>>>>>> compaction. Could you elaborate more on this point?
>>>>>> Thanks,
>>>>>> Dong
>>>>>> On Wed, Feb 28, 2018 at 10:55 PM, Jan Filipiak <Jan.Filipiak@trivago.com>
>>>>>> wrote:
>>>>>>> Hi Dong,
>>>>>>> I tried to focus on what the steps are one can currently perform to expand
>>>>>>> or shrink a keyed topic while maintaining top notch semantics.
>>>>>>> I can understand that there might be confusion about "stopping
>>>>>>> consumer". It is exactly the same as proposed in the KIP. There has to be
>>>>>>> a time the producers agree on the new partitioning. The extra semantics I
>>>>>>> want to put in there is that we have a possibility to wait until all the
>>>>>>> existing data is copied over into the new partitioning scheme. When I say
>>>>>>> stopping I think more of having a memory barrier that ensures the
>>>>>>> ordering. I am still aiming for latencies on the scale of leader failovers.
>>>>>>> Consumers have to explicitly adapt the new partitioning scheme in the
>>>>>>> above scenario. The reason is that in these cases where you are dependent
>>>>>>> on a particular partitioning scheme, you also have other topics that have
>>>>>>> co-partition enforcements or the kind -frequently. Therefore all your other
>>>>>>> input topics might need to grow accordingly.
>>>>>>> What I was suggesting was to streamline all these operations as best as
>>>>>>> possible to have "real" partition growth and shrinkage going on. Migrating
>>>>>>> the producers to a new partitioning scheme can be much more streamlined
>>>>>>> with proper broker support for this. Migrating consumer is a step that
>>>>>>> might be made completely unnecessary if - for example streams - takes the
>>>>>>> gcd as partitioning scheme instead of enforcing 1 to 1. Connect consumers
>>>>>>> and other consumers should be fine anyways.
>>>>>>> I hope this makes more clear where I was aiming at. The rest needs to be
>>>>>>> figured out. The only danger I see is that when we are introducing this
>>>>>>> feature as supposed in the KIP, it won't help any people depending on log
>>>>>>> compaction.
>>>>>>> The other thing I wanted to mention is that I believe the current
>>>>>>> suggestion (without copying data over) can be implemented in pure userland
>>>>>>> with a custom partitioner and a small feedbackloop from ProduceResponse =>
>>>>>>> Partitioner in cooperation with a change management system.
>>>>>>> Best Jan
>>>>>>> On 28.02.2018 07:13, Dong Lin wrote:
>>>>>>>> Hey Jan,
>>>>>>>> I am not sure if it is acceptable for producer to be stopped for a while,
>>>>>>>> particularly for online application which requires low latency. I am also
>>>>>>>> not sure how consumers can switch to a new topic. Does user application
>>>>>>>> need to explicitly specify a different topic for producer/consumer to
>>>>>>>> subscribe to? It will be helpful for discussion if you can provide more
>>>>>>>> detail on the interface change for this solution.
>>>>>>>> Thanks,
>>>>>>>> Dong
>>>>>>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
>>>>>>>> wrote:
>>>>>>>>> Hi,
>>>>>>>>> just want to throw my thoughts in. In general the functionality is very
>>>>>>>>> useful, we should though not try to find the architecture too hard while
>>>>>>>>> implementing.
>>>>>>>>> The manual steps would be to
>>>>>>>>> create a new topic
>>>>>>>>> then mirrormake from the old topic to the new topic
>>>>>>>>> wait for mirror making to catch up.
>>>>>>>>> then put the consumers onto the new topic
>>>>>>>>>         (having mirrormaker spit out a mapping from old offsets to new
>>>>>>>>>         offsets:
>>>>>>>>>             if topic is increased by factor X there is gonna be a clean
>>>>>>>>>             mapping from 1 offset in the old topic to X offsets in the
>>>>>>>>>             new topic,
>>>>>>>>>             if there is no factor then there is no chance to generate a
>>>>>>>>>             mapping that can be reasonably used for continuing)
>>>>>>>>>         make consumers stop at appropriate points and continue
>>>>>>>>>         consumption with offsets from the mapping.
>>>>>>>>> have the producers stop for a minimal time.
>>>>>>>>> wait for mirrormaker to finish
>>>>>>>>> let producer produce with the new metadata.
>>>>>>>>> Instead of implementing the approach suggested in the KIP which will leave
>>>>>>>>> log compacted topic completely crumbled and unusable.
>>>>>>>>> I would much rather try to build infrastructure to support the
>>>>>>>>> above-mentioned operations more smoothly.
>>>>>>>>> Especially having producers stop and use another topic is difficult and
>>>>>>>>> it would be nice if one can trigger "invalid metadata" exceptions for them
>>>>>>>>> and if one could give topics aliases so that their produces with the old
>>>>>>>>> topic will arrive in the new topic.
>>>>>>>>> The downsides are obvious I guess (having the same data twice for the
>>>>>>>>> transition period, but kafka tends to scale well with datasize). So it's a
>>>>>>>>> nicer fit into the architecture.
>>>>>>>>> I further want to argue that the functionality proposed by the KIP can
>>>>>>>>> completely be implemented in "userland" with a custom partitioner that
>>>>>>>>> handles the transition as needed. I would appreciate if someone could
>>>>>>>>> point out what a custom partitioner couldn't handle in this case.
>>>>>>>>> With the above approach, shrinking a topic becomes the same steps. Without
>>>>>>>>> losing keys in the discontinued partitions.
>>>>>>>>> Would love to hear what everyone thinks.
>>>>>>>>> Best Jan
>>>>>>>>> On 11.02.2018 00:35, Dong Lin wrote:
>>>>>>>>>> Hi all,
>>>>>>>>>> I have created KIP-253: Support in-order message delivery with partition
>>>>>>>>>> expansion. See
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
>>>>>>>>>> This KIP provides a way to allow messages of the same key from the same
>>>>>>>>>> producer to be consumed in the same order they are produced even if we
>>>>>>>>>> expand partition of the topic.
>>>>>>>>>> Thanks,
>>>>>>>>>> Dong
