kafka-dev mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion
Date Fri, 09 Mar 2018 21:23:36 GMT
Thanks for your comment, Clemens. What you are saying makes sense.
However, the pattern you describe is to split partitions and use linear
hashing to avoid random key distribution. But this is what Jan thinks we
should not do...

Also, I just picked an example with 2 -> 3 partitions, but if you don't
use linear hashing I think the same issue occurs if you double the
number of partitions.

I am in favor of using linear hashing. I still think it is also useful to
split single partitions in case load is not balanced and some
partitions are hot spots while others are "idle".
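
For illustration only (this is not code from the KIP): a minimal sketch of
how linear hashing could map a key hash to a partition while a topic grows
one partition at a time. The class and method names are hypothetical, and a
plain int stands in for the murmur hash of the key.

```java
// Hypothetical sketch of linear hashing for partition selection.
// Nothing here is taken from the KIP or from the Kafka client code.
final class LinearHashingSketch {

    // Maps a key hash to a partition, given the partition count the topic
    // was created with and the partition count it has now.
    static int partition(int hash, int initialPartitions, int currentPartitions) {
        int h = hash & 0x7fffffff; // force a non-negative value

        // Largest "round" size initialPartitions * 2^k that fits into the
        // current partition count.
        int base = initialPartitions;
        while (base * 2 <= currentPartitions) {
            base *= 2;
        }

        // Partitions below the split pointer have already been split in
        // the current round.
        int splitPointer = currentPartitions - base;

        int p = h % base;
        if (p < splitPointer) {
            // Already split: use the finer-grained modulus.
            p = h % (base * 2);
        }
        return p;
    }

    public static void main(String[] args) {
        // Show where hashes 0..7 land as a topic grows from 2 to 4 partitions.
        for (int partitions = 2; partitions <= 4; partitions++) {
            StringBuilder line = new StringBuilder(partitions + " partitions:");
            for (int hash = 0; hash < 8; hash++) {
                line.append(' ').append(hash).append("->")
                    .append(partition(hash, 2, partitions));
            }
            System.out.println(line);
        }
    }
}
```

In this sketch, going from 2 to 3 partitions only touches partition 0: keys
with hash % 4 == 2 move to the new partition 2, and partition 1 is left
alone. If the partition count is unchanged, the result is simply
hash % initialPartitions.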


On 3/9/18 5:41 AM, Clemens Valiente wrote:
> I think it's fair to assume that topics will always be increased by an integer factor
> - e.g. from 2 partitions to 4 partitions. Then the mapping is much easier.
> Why anyone would increase partitions by less than x2 is a mystery to me. If your two
> partitions cannot handle the load, then with three partitions each one will still get 67%
> of that load, which is still way too dangerous.
> So in your case we go from
> part1: A B C D
> part2: E F G H
> to
> part1: A C
> part2: B D
> part3: E F
> part4: G H
> ________________________________
> From: Matthias J. Sax <matthias@confluent.io>
> Sent: 09 March 2018 07:53
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion
> @Jan: You suggest to copy the data from one topic to a new topic, and
> provide an "offset mapping" from the old to the new topic for the
> consumers. I don't quite understand how this would work.
> Let's say there are 2 partitions in the original topic and 3 partitions
> in the new topic. If we assume that we don't use linear hashing as you
> suggest, there is no guarantee how data will be distributed in the new
> topic and also no guarantee about ordering of records in the new topic.
> Example (I hope I got it right -- please correct me if it's wrong)
> A B C D
> E F G H
> could be copied to:
> A C H
> B E F
> D G
> If the consumer was at offset 1 and 2 in the first topic how would the
> mapping be computed? We need to ensure that B C D as well as G H are
> read after the switch. Thus, offset would need to be 1 0 0. I am not
> sure how this would be computed?
> Furthermore, I want to point out that the new offsets would imply that E
> is consumed a second time by the consumer. E and F were consumed
> originally, but E is copied after B that was not yet consumed.
> Or is there a way that we can ensure that this "flip" never happens
> while we copy the data?
> -Matthias
> On 3/8/18 10:32 PM, Matthias J. Sax wrote:
>> As I just mentioned joins:
>> For Kafka Streams it might also be required to change the partition
>> count for multiple topics in a coordinated way that allows us to maintain
>> the co-partitioning property that Kafka Streams uses to compute joins.
>> Any thoughts how this could be handled?
>> -Matthias
>> On 3/8/18 10:08 PM, Matthias J. Sax wrote:
>>> Jun,
>>> There is one more case: non-windowed aggregations. For windowed
>>> aggregation, the changelog topic will be compact+delete. However, for
>>> non-windowed aggregation the policy is compact only.
>>> Even if we assume that windowed aggregations are dominant and
>>> non-windowed aggregation are used rarely, it seems to be bad to not
>>> support the feature if a non-windowed aggregation is used. Also,
>>> non-windowed aggregation volume depends on input-stream volume that
>>> might be high.
>>> Furthermore, we support stream-table join and this requires that the
>>> stream and the table are co-partitioned. Thus, even if the table would
>>> have lower volume but the stream must be scaled out, we also need to
>>> scale out the table to preserve co-partitioning.
>>> -Matthias
>>> On 3/8/18 6:44 PM, Jun Rao wrote:
>>>> Hi, Matthias,
>>>> My understanding is that in KStream, the only case when a changelog topic
>>>> needs to be compacted is when the corresponding input is a KTable. In all
>>>> other cases, the changelog topics are of compacted + deletion. So, if most
>>>> KTables are not high volume, there may not be a need to expand its
>>>> partitions and therefore the partitions of the corresponding changelog
>>>> topic.
>>>> Thanks,
>>>> Jun
>>>> On Wed, Mar 7, 2018 at 2:34 PM, Matthias J. Sax <matthias@confluent.io>
>>>> wrote:
>>>>> Jun,
>>>>> thanks for your comment. This should actually work for Streams, because
>>>>> we don't rely on producer "hashing" but specify the partition number
>>>>> explicitly on send().
>>>>> About not allowing changes to the number of partitions for changelog
>>>>> topics: for Streams, this seems to imply that we need to create a second
>>>>> changelog topic for each store with the new partition count. However,
>>>>> it would be unclear when/if we can delete the old topic. Thus, it moves
>>>>> the "problem" into the application layer. It's hard for me to judge atm
>>>>> what the impact would be, but it's something we should pay attention to.
>>>>> -Matthias
>>>>> On 3/6/18 3:45 PM, Jun Rao wrote:
>>>>>> Hi, Matthias,
>>>>>> Regarding your comment "If it would be time-delay based, it might be
>>>>>> problematic for Kafka Streams: if we get the information that the new
>>>>>> input partitions are available for producing, we need to enable the new
>>>>>> changelog partitions for producing, too. If those would not be available
>>>>>> yet, because time-delay did not trigger yet, it would be problematic to
>>>>>> avoid crashing.", could you just enable the changelog topic to write to
>>>>>> its new partitions immediately? The input topic can be configured with a
>>>>>> delay in writing to the new partitions. Initially, there won't be new
>>>>>> data produced into the newly added partitions in the input topic.
>>>>>> However, we could prebuild the state for the new input partition and
>>>>>> write the state changes to the corresponding new partitions in the
>>>>>> changelog topic.
>>>>>> Hi, Jan,
>>>>>> For a compacted topic, garbage collecting the old keys in the existing
>>>>>> partitions after partition expansion can be tricky, as you pointed out. A
>>>>>> few options here. (a) Let brokers exchange keys across brokers during
>>>>>> compaction. This will add complexity on the broker side. (b) Build an
>>>>>> external tool that scans the compacted topic and drops the prefix of a
>>>>>> partition if all records in the prefix are removable. The admin can
>>>>>> run this tool when the unneeded space needs to be reclaimed. (c) Do not
>>>>>> support partition change in a compacted topic. This might be ok since
>>>>>> most compacted topics are not high volume.
>>>>>> Thanks,
>>>>>> Jun
>>>>>> On Tue, Mar 6, 2018 at 10:38 AM, Dong Lin <lindong28@gmail.com> wrote:
>>>>>>> Hi everyone,
>>>>>>> Thanks for all the comments! It appears that everyone prefers linear
>>>>>>> hashing because it reduces the amount of state that needs to be moved
>>>>>>> between consumers (for stream processing). The KIP has been updated to
>>>>>>> use linear hashing.
>>>>>>> Regarding the migration endeavor: it seems that migrating the producer
>>>>>>> library to use linear hashing should be pretty straightforward without
>>>>>>> much operational endeavor. If we don't upgrade the client library to use
>>>>>>> this KIP, we cannot support in-order delivery after the partition is
>>>>>>> expanded anyway. Suppose we upgrade the client library to use this KIP:
>>>>>>> if the partition number is not changed, the key -> partition mapping
>>>>>>> will be exactly the same as it is now because it is still determined
>>>>>>> using murmur_hash(key) % original_partition_num. In other words, this
>>>>>>> change is backward compatible.
>>>>>>> Regarding the load distribution: if we use linear hashing, the load may
>>>>>>> be unevenly distributed because those partitions which are not split
>>>>>>> receive twice as much traffic as other partitions that are split. This
>>>>>>> issue can be mitigated by creating the topic with partitions that are
>>>>>>> several times the number of consumers. And there will be no imbalance
>>>>>>> if the partition number is always doubled. So this imbalance seems
>>>>>>> acceptable.
>>>>>>> Regarding storing the partition strategy as per-topic config: it seems
>>>>>>> not necessary since we can still use murmur_hash as the default hash
>>>>>>> function and additionally apply the linear hashing algorithm if the
>>>>>>> partition number has increased. Not sure if there is any use-case for
>>>>>>> the producer to use a different hash function. Jason, can you check if
>>>>>>> there is some use-case that I missed for using the per-topic partition
>>>>>>> strategy?
>>>>>>> Regarding how to reduce latency (due to state store/load) in the stream
>>>>>>> processing consumer when the partition number changes: I need to read
>>>>>>> the Kafka Streams code to understand how Kafka Streams currently
>>>>>>> migrates state between consumers when the application is added/removed
>>>>>>> for a given job. I will reply after I finish reading the documentation
>>>>>>> and code.
>>>>>>> Thanks,
>>>>>>> Dong
>>>>>>> On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <jason@confluent.io>
>>>>>>> wrote:
>>>>>>>> Great discussion. I think I'm wondering whether we can continue to leave
>>>>>>>> Kafka agnostic to the partitioning strategy. The challenge is
>>>>>>>> communicating the partitioning logic from producers to consumers so that
>>>>>>>> dependencies between each epoch can be determined. For the sake of
>>>>>>>> discussion, imagine you did something like the following:
>>>>>>>> 1. The name (and perhaps version) of a partitioning strategy is stored
>>>>>>>> in topic configuration when a topic is created.
>>>>>>>> 2. The producer looks up the partitioning strategy before writing to a
>>>>>>>> topic and includes it in the produce request (for fencing). If it
>>>>>>>> doesn't have an implementation for the configured strategy, it fails.
>>>>>>>> 3. The consumer also looks up the partitioning strategy and uses it to
>>>>>>>> determine dependencies when reading a new epoch. It could either fail
>>>>>>>> or make the most conservative dependency assumptions if it doesn't know
>>>>>>>> how to implement the partitioning strategy. For the consumer, the
>>>>>>>> interface might look something like this:
>>>>>>>> // Return the partition dependencies following an epoch bump
>>>>>>>> Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
>>>>>>>>                                          int numPartitionsAfterEpochBump)
>>>>>>>> The unordered case then is just a particular implementation which never
>>>>>>>> has any epoch dependencies. To implement this, we would need some way for
>>>>>>>> the consumer to find out how many partitions there were in each epoch,
>>>>>>>> but maybe that's not too unreasonable.
>>>>>>>> Thanks,
>>>>>>>> Jason
>>>>>>>> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
>>>>>>>> wrote:
>>>>>>>>> Hi Dong
>>>>>>>>> thank you very much for your questions.
>>>>>>>>> regarding the time spent copying data across:
>>>>>>>>> It is correct that copying data from a topic with one partition mapping
>>>>>>>>> to a topic with a different partition mapping takes way longer than we
>>>>>>>>> can stop producers. Tens of minutes is a very optimistic estimate here.
>>>>>>>>> Many people cannot afford to copy at full steam and therefore will have
>>>>>>>>> some rate limiting in place; this can bump the timespan into days. The
>>>>>>>>> good part is that the vast majority of the data can be copied while the
>>>>>>>>> producers are still going. One can then piggyback the consumers on top
>>>>>>>>> of this timeframe, by the method mentioned (provide them a mapping from
>>>>>>>>> their old offsets to new offsets in their repartitioned topics). In that
>>>>>>>>> way we separate migration of consumers from migration of producers
>>>>>>>>> (decoupling is what kafka is strongest at). The time to actually swap
>>>>>>>>> over the producers should be kept minimal by ensuring that when a swap
>>>>>>>>> attempt is started the consumer copying over is very close to the log
>>>>>>>>> end and is expected to finish within the next fetch. The operation
>>>>>>>>> should have a time-out and should be "reattemptable".
>>>>>>>>> Importance of logcompaction:
>>>>>>>>> If a producer produces key A to partition 0, it's forever gonna be
>>>>>>>>> there, unless it gets deleted. The record might sit in there for years.
>>>>>>>>> A new producer started with the new partitions will fail to delete the
>>>>>>>>> record in the correct partition. The record will be there forever and
>>>>>>>>> one cannot reliably bootstrap new consumers. I cannot see how linear
>>>>>>>>> hashing can solve this.
>>>>>>>>> Regarding your skipping of userland copying:
>>>>>>>>> 100%, copying the data across in userland is, as far as I can see, only
>>>>>>>>> a usecase for log compacted topics. Even for logcompaction + retentions
>>>>>>>>> it should only be opt-in. Why did I bring it up? I think log compaction
>>>>>>>>> is a very important feature to really embrace kafka as a "data
>>>>>>>>> The point I also want to make is that copying data this way is
>>>>>>>>> completely in line with the kafka architecture. It only consists of
>>>>>>>>> reading and writing to topics.
>>>>>>>>> I hope it clarifies more why I think we should aim for more than the
>>>>>>>>> current KIP. I fear that once the KIP is done not much more effort will
>>>>>>>>> be taken.
>>>>>>>>> On 04.03.2018 02:28, Dong Lin wrote:
>>>>>>>>>> Hey Jan,
>>>>>>>>>> In the current proposal, the consumer will be blocked on waiting for
>>>>>>>>>> other consumers of the group to consume up to a given offset. In most
>>>>>>>>>> cases, all consumers should be close to the LEO of the partitions when
>>>>>>>>>> the partition expansion happens. Thus the time waiting should not be
>>>>>>>>>> long, e.g. on the order of seconds. On the other hand, it may take a
>>>>>>>>>> long time to wait for the entire partition to be copied -- the amount
>>>>>>>>>> of time is proportional to the amount of existing data in the
>>>>>>>>>> partition, which can take tens of minutes. So the amount of time that
>>>>>>>>>> we stop consumers may not be on the same order of magnitude.
>>>>>>>>>> If we can implement this suggestion without copying data over in pure
>>>>>>>>>> userland, it will be much more valuable. Do you have ideas on how this
>>>>>>>>>> can be done?
>>>>>>>>>> Not sure why the current KIP does not help people who depend on log
>>>>>>>>>> compaction. Could you elaborate more on this point?
>>>>>>>>>> Thanks,
>>>>>>>>>> Dong
>>>>>>>>>> On Wed, Feb 28, 2018 at 10:55 PM, Jan Filipiak <Jan.Filipiak@trivago.com>
>>>>>>>>>> wrote:
>>>>>>>>>>> Hi Dong,
>>>>>>>>>>> I tried to focus on what the steps are one can currently perform to
>>>>>>>>>>> expand or shrink a keyed topic while maintaining top notch semantics.
>>>>>>>>>>> I can understand that there might be confusion about "stopping the
>>>>>>>>>>> consumer". It is exactly the same as proposed in the KIP. There needs
>>>>>>>>>>> to be a time the producers agree on the new partitioning. The extra
>>>>>>>>>>> semantics I want to put in there is that we have a possibility to wait
>>>>>>>>>>> until all the existing data is copied over into the new partitioning
>>>>>>>>>>> scheme. When I say stopping I think more of having a memory barrier
>>>>>>>>>>> that ensures the ordering. I am still aiming for latencies on the
>>>>>>>>>>> scale of leader failovers.
>>>>>>>>>>> Consumers have to explicitly adapt to the new partitioning scheme in
>>>>>>>>>>> the above scenario. The reason is that in these cases where you are
>>>>>>>>>>> dependent on a particular partitioning scheme, you also have other
>>>>>>>>>>> topics that have co-partition enforcements or the like, frequently.
>>>>>>>>>>> Therefore all your other input topics might need to grow accordingly.
>>>>>>>>>>> What I was suggesting was to streamline all these operations as best
>>>>>>>>>>> as possible to have "real" partition growth and shrinkage going on.
>>>>>>>>>>> Migrating the producers to a new partitioning scheme can be much more
>>>>>>>>>>> streamlined with proper broker support for this. Migrating consumers
>>>>>>>>>>> is a step that might be made completely unnecessary if, for example,
>>>>>>>>>>> Streams takes the gcd as partitioning scheme instead of enforcing
>>>>>>>>>>> 1 to 1. Connect consumers and other consumers should be fine anyways.
>>>>>>>>>>> I hope this makes it more clear what I was aiming at. The rest needs
>>>>>>>>>>> to be figured out. The only danger I see is that when we are
>>>>>>>>>>> introducing this feature as proposed in the KIP, it won't help any
>>>>>>>>>>> people depending on log compaction.
>>>>>>>>>>> The other thing I wanted to mention is that I believe the current
>>>>>>>>>>> suggestion (without copying data over) can be implemented in pure
>>>>>>>>>>> userland with a custom partitioner and a small feedback loop
>>>>>>>>>>> ProduceResponse => Partitioner in cooperation with a change management
>>>>>>>>>>> Best Jan
>>>>>>>>>>> On 28.02.2018 07:13, Dong Lin wrote:
>>>>>>>>>>>> Hey Jan,
>>>>>>>>>>>> I am not sure if it is acceptable for the producer to be stopped for
>>>>>>>>>>>> a while, particularly for an online application which requires low
>>>>>>>>>>>> latency. I am also not sure how consumers can switch to a new topic.
>>>>>>>>>>>> Does the user application need to explicitly specify a different
>>>>>>>>>>>> topic for the producer/consumer to subscribe to? It will be helpful
>>>>>>>>>>>> for discussion if you can provide more detail on the interface change
>>>>>>>>>>>> for this solution.
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Dong
>>>>>>>>>>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> just want to throw my thoughts in. In general the functionality is
>>>>>>>>>>>>> very useful; we should though not try to find the architecture too
>>>>>>>>>>>>> hard while implementing.
>>>>>>>>>>>>> The manual steps would be to
>>>>>>>>>>>>> create a new topic
>>>>>>>>>>>>> then mirrormake from the old topic to the new topic
>>>>>>>>>>>>> wait for mirror making to catch up.
>>>>>>>>>>>>> then put the consumers onto the new topic
>>>>>>>>>>>>>       (having mirrormaker spit out a mapping from old offsets to
>>>>>>>>>>>>>       new offsets:
>>>>>>>>>>>>>           if topic is increased by factor X there is gonna be a
>>>>>>>>>>>>>           clean mapping from 1 offset in the old topic to X offsets
>>>>>>>>>>>>>           in the new topic,
>>>>>>>>>>>>>           if there is no factor then there is no chance to generate
>>>>>>>>>>>>>           a mapping that can be reasonably used for
>>>>>>>>>>>>>       make consumers stop at appropriate points and continue
>>>>>>>>>>>>>       consumption with offsets from the mapping.
>>>>>>>>>>>>> have the producers stop for a minimal time
>>>>>>>>>>>>> wait for mirrormaker to finish
>>>>>>>>>>>>> let producer produce with the new metadata.
>>>>>>>>>>>>> Instead of implementing the approach suggested in the KIP, which
>>>>>>>>>>>>> will leave log compacted topics completely crumbled and unusable,
>>>>>>>>>>>>> I would much rather try to build infrastructure to support the
>>>>>>>>>>>>> above-mentioned operations more smoothly.
>>>>>>>>>>>>> Especially having producers stop and use another topic is difficult,
>>>>>>>>>>>>> and it would be nice if one could trigger "invalid metadata"
>>>>>>>>>>>>> exceptions for them and if one could give topics aliases so that
>>>>>>>>>>>>> their produces with the old topic will arrive in the new topic.
>>>>>>>>>>>>> The downsides are obvious I guess (having the same data twice for
>>>>>>>>>>>>> the transition period, but kafka tends to scale well with datasize).
>>>>>>>>>>>>> So it's a nicer fit into the architecture.
>>>>>>>>>>>>> I further want to argue that the functionality of the KIP can
>>>>>>>>>>>>> completely be implemented in "userland" with a custom partitioner
>>>>>>>>>>>>> that handles the transition as needed. I would appreciate it if
>>>>>>>>>>>>> someone could point out what a custom partitioner couldn't handle in
>>>>>>>>>>>>> this case.
>>>>>>>>>>>>> With the above approach, shrinking a topic becomes the same steps,
>>>>>>>>>>>>> without losing keys in the discontinued partitions.
>>>>>>>>>>>>> Would love to hear what everyone thinks.
>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>> On 11.02.2018 00:35, Dong Lin wrote:
>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>> I have created KIP-253: Support in-order message delivery with
>>>>>>>>>>>>>> partition expansion. See
>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
>>>>>>>>>>>>>> .
>>>>>>>>>>>>>> This KIP provides a way to allow messages of the same key from the
>>>>>>>>>>>>>> same producer to be consumed in the same order they are produced
>>>>>>>>>>>>>> even if we expand partition of the topic.
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Dong
