kafka-dev mailing list archives

From Jun Rao <...@confluent.io>
Subject Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion
Date Tue, 27 Mar 2018 05:12:44 GMT
Hi, John,

Thanks for the comments.

For your thought 1, I agree that it's better to decouple the design of core
and Streams with respect to re-partitioning. From the state management
perspective, a streaming application can be (1) stateless, (2) stateful and
maintaining the states in a global data store, (3) stateful and maintaining
the states in a local store. If an application does (3), it probably will
be using a stream processing library such as KStreams since managing the
local state in the application can be tricky. When using KStreams to manage
local states, currently the main issue is that the number of tasks is
coupled with the number of partitions. Since each task has a separate local
state, if the number of input partitions changes, one needs to rebuild the
local state immediately, which affects latency. So, we need a solution in
KStreams to decouple the two somehow. From the core's perspective, it seems
that it first needs to support per-key ordering since it's required in all
3 use cases above. Next, it would be ideal for it to support some kind of
notification to the consumer when a key switches partitions. Then, for (2),
the application can use this opportunity to do things like flushing the
value for a given key to the global store.
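For case (2), a rough sketch of what that notification-driven flush could look like (illustrative Python; the on_keys_migrating() callback is hypothetical, since core exposes no such notification today):

```python
# Sketch of case (2): a consumer keeps per-key values buffered locally and
# maintains its state in a global data store. The on_keys_migrating()
# callback is hypothetical; core does not expose such a notification today.

class GlobalStoreConsumer:
    def __init__(self, global_store):
        self.global_store = global_store  # e.g. a remote key-value store client
        self.pending = {}                 # key -> locally buffered latest value

    def on_record(self, key, value):
        # Buffer updates locally and flush lazily for throughput.
        self.pending[key] = value

    def on_keys_migrating(self, migrating_keys):
        # Hypothetical notification that these keys will next appear in a
        # different partition: flush their buffered values now so the global
        # store is consistent across the switch.
        for key in migrating_keys:
            if key in self.pending:
                self.global_store[key] = self.pending.pop(key)
```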

The rest of your thoughts are mostly about the specific implementation of
backfilling existing data to new partitions. To me, backfilling existing
data mainly helps new applications in case (3). So, if we want to do
that, it seems it's better done in KStreams instead of core. We also need
a solution for the existing consumers, and should then consider whether
new applications can just use that same approach.

Jun


On Mon, Mar 26, 2018 at 11:16 AM, John Roesler <john@confluent.io> wrote:

> Hi all,
>
> I hope you don't mind if I throw some thoughts in...
>
> For some reason, Jan's proposal wasn't really clear to me until this last
> message, and now that (I think) I understand it, I actually like it quite a
> lot.
>
> Thought 1:
> As much as I like Streams, I don't think it should become a requirement to
> pull off something like partition expansion. It would be a bummer if folks
> who otherwise have no use case for Streams have to run a trivial app
> *just* to get partition expansion.
>
> Assuming we get a nice partition-expansion design out of this conversation,
> it may or may not be the case that Streams will still decide to
> re-partition its inputs, but I'd like to keep those two designs (Core and
> Streams) independent (personal opinion, fwiw). It also may or may not be
> the case that we would try to split the local state versus re-building the
> affected operators from the new (post-split) topics. It would be nice to be
> able to optimize the process of scaling Streams independently of Core.
>
> Thought 2:
> Regarding who might manage this special backfill consumer, I wonder if the
> answer might actually be that that logic should be in the producer.
>
> Consider that only the producer knows the partition function, so locating
> backfill anywhere else requires sharing that logic, which would be a
> bummer.
>
> Also, if you think about the "producer" in the broader sense, as the
> organization that produces the data in a particular topic, that "producer"
> controls everything about the topic: the name, the number of partitions,
> the schema, and all of the configs including compaction and retention. This
> "producer" will be the person responsible for splitting/merging partitions
> anyway, so introducing a new component is asking that "producer" to own and
> operate more pieces, whereas placing it inside the KafkaProducer keeps their
> operational burden the same.
>
> Thought 3:
> I don't think we want to think about actually creating a new topic, but
> instead something like a "generation" or "version" of a topic.
>
> Different generations of a topic can have different numbers of partitions,
> and also different offsets. Consumers can currently start with the earliest
> available offset. Similarly, with generations, consumers could start off
> with the earliest available in the earliest available generation. The
> retention policy still applies, so when the log cleaner deletes the last
> segment from a generation, that whole generation becomes unavailable, and
> new consumers start off on the next generation.
>
> This also makes the backfill job well defined: the range it's copying
> *from* has a start and an end; the range it's copying *to* is out of bounds
> for new writes. When backfill finishes copying all the records from
> generation 0 to the beginning of generation 1, it instructs the broker that
> generation 1, offset x is now the earliest available offset and all of
> generation 0's files can be deleted. The metadata tracking how to
> transition from gen0 to gen1 can also be deleted, as it's no longer
> meaningful.
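The generation bookkeeping above could be modeled roughly like this (illustrative Python; none of these names exist in Kafka, it is just a sketch of the idea):

```python
# Toy model of "topic generations": each generation has its own partition
# count, new consumers start on the earliest generation that still has data,
# and a finished backfill retires the old generation (files and transition
# metadata deleted). All names are hypothetical.

class Generation:
    def __init__(self, gen, num_partitions):
        self.gen = gen
        self.num_partitions = num_partitions
        self.retired = False

class TopicWithGenerations:
    def __init__(self):
        self.generations = []

    def add_generation(self, num_partitions):
        g = Generation(len(self.generations), num_partitions)
        self.generations.append(g)
        return g

    def earliest_available(self):
        # New consumers start from the earliest non-retired generation.
        for g in self.generations:
            if not g.retired:
                return g
        return None

    def complete_backfill(self, old_gen):
        # Backfill finished copying old_gen forward: its files and the
        # gen-to-gen transition metadata can be deleted, so new consumers
        # begin on the next generation.
        self.generations[old_gen].retired = True
```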
>
> Note that for compacted topics, you really want to think of the number of
> partitions in the topic as the number of partitions in the first available
> generation. I.e., I think it's simplifying to assume that consumers may not
> be able to split their state. If I'm a consumer, and I don't have enough
> parallelism, I think I'd go to the producer and request an expansion. The
> producer would do the expansion and kick off the backfill. My existing
> consumers get assigned the new generation's partitions such that each
> consumer gets the same set of records. When the backfill completes, *then*
> I can start up my new crop of more granular consumers. Being able to split
> and merge state when I hit topic generation barriers would obviously be
> nice, but I don't think it's universally available, so it's important to
> have a solid design around async backfill with notifications.
>
> Thought 4:
> I *think* Jan's approach works fine even if you want to split just one
> partition, instead of increasing the number of partitions by a factor.
> Plus, you could split the partition into any number of new partitions. It
> all depends on the partition function, which again comes back to it being
> nice to place the responsibility for all these decisions squarely with the
> producer. Maybe the default hash function requires them to double the
> partition count, but if they want to be able to split single partitions,
> they can use a hash function that makes that possible.
>
> We also need to remember that some topics don't use hashing at all to
> assign partitions, but round-robin instead. In this case, the partition
> assignment for a record is clearly arbitrary. In this case, I could see
> wanting to just increase the partition count, say from 2 to 3, and achieve
> a uniform re-distribution. This is the only use case that isn't well served
> by the "partition split" mentality. If you wanted a uniform re-distribution
> in this case, you would have to go from 2 to 4. Or, you could always create
> a truly new topic and get all your consumers to start consuming that one
> instead.
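One hedged sketch of such a split-friendly partition function (illustrative Python, not Kafka's default partitioner): hash keys into a fixed set of fine-grained slots and keep a slot-to-partition map, so splitting a single partition just reassigns some of its slots:

```python
import hashlib

# Sketch of a partitioner that can split a *single* partition rather than
# forcing a doubling: keys hash into a fixed number of fine-grained "slots",
# and a slot -> partition map decides placement. Splitting partition p just
# reassigns some of p's slots to a new partition; keys in untouched slots
# never move. NUM_SLOTS and all helpers are illustrative assumptions.

NUM_SLOTS = 1024  # fixed forever

def slot_of(key: bytes) -> int:
    return int.from_bytes(hashlib.sha256(key).digest()[:4], "big") % NUM_SLOTS

def make_initial_map(num_partitions):
    # Round-robin the slots over the initial partitions.
    return [s % num_partitions for s in range(NUM_SLOTS)]

def split_partition(slot_map, old_partition, new_partition):
    # Move every other slot of old_partition to new_partition.
    moved = 0
    for s, p in enumerate(slot_map):
        if p == old_partition:
            if moved % 2 == 1:
                slot_map[s] = new_partition
            moved += 1
    return slot_map

def partition_for(key: bytes, slot_map) -> int:
    return slot_map[slot_of(key)]
```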
>
> Thought 5:
> This whole operation can be really straightforward if we're a little
> careful with the offsets. I think for expansion, you want to start the new
> generation off with the end offset of the old generation, and for merging,
> you want to add the end offsets of the two old-gen parent partitions. This
> way, you know that there will be enough room at the beginning of the
> partition for a potential backfill either way.
>
> In other words:
> Let's assume MyTopic has 3 partitions, and we're splitting partition 0 into
> two:
> MyTopic[gen=0,part=0,endOffset=50] -> (
> MyTopic[gen=1,part=0,endOffset=50], MyTopic[gen=1,part=3,endOffset=50]
> )
> MyTopic[gen=0,part=1,endOffset=41] -> ( MyTopic[gen=1,part=1,endOffset=41]
> )
> MyTopic[gen=0,part=2,endOffset=63] -> ( MyTopic[gen=1,part=2,endOffset=63]
> )
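The offset rules above reduce to two tiny helpers (a sketch matching the numbers in the example):

```python
# Sketch of the offset bookkeeping: on a split, each child partition starts
# the new generation at the parent's end offset; on a merge, the merged
# partition starts at the sum of the parents' end offsets. Either way there
# is room below those offsets for an async backfill.

def split_offsets(parent_end_offset, num_children):
    # Every child inherits the parent's end offset.
    return [parent_end_offset] * num_children

def merge_offsets(parent_end_offsets):
    # The merged partition starts at the sum of the parents' end offsets.
    return sum(parent_end_offsets)
```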
>
> *Maybe* the producer of MyTopic decides to backfill the split of part=0:
> 1) For each record, they run it though their partition function to decide
> between part=0 and part=3, instructing the broker to copy that record to
> the new generation's partition with the *same* offset.
> 2) When they reach the end of [gen=0,part=0], they instruct the broker that
> MyTopic[gen=1,part=0] now begins at offset=0 and MyTopic[gen=1,part=3] now
> begins at offset=1 (for example), and they also mark gen=0,part=0 as
> defunct.
>
> Assuming 4 Consumers: if they started reading during gen=0, they might have
> the following assignment:
> * consumer1: in gen=0, [part=0], transitioning to [part=0,part=3] at
> offset=50
> * consumer2: [part=1]
> * consumer3: [part=2]
> * consumer4: []
>
> Now, if they started reading during gen=1 after gen=0 becomes defunct, they
> might have this assignment:
> * consumer1: [part=0]
> * consumer2: [part=1]
> * consumer3: [part=2]
> * consumer4: [part=3]
>
> Just a quick note about the possibility of splitting without starting
> over... if consumer1 knows how to split its task, either because it's
> stateless or because it has splittable state, it certainly can do so and "send"
> part=3 to consumer4. So this avenue is still open.
>
> Thought 5.5:
> I think it's worth also sketching out the merge operation, as I'm picturing
> it.
> Let's assume we've got 4 partitions, and we want to merge part=1 and
> part=2:
> MyTopic[gen=0,part=0,endOffset=50] -> MyTopic[gen=1,part=0,endOffset=50]
> MyTopic[gen=0,part=1,endOffset=12] -> MyTopic[gen=1,part=1,endOffset=(12 +
> 23 = 35)]
> MyTopic[gen=0,part=2,endOffset=23] -> (see gen=1,part=1)
> MyTopic[gen=0,part=3,endOffset=41] -> MyTopic[gen=1,part=3,endOffset=41]
>
> Now, I think no matter how you cut it, this is going to be a tough
> transition for stateful consumers to handle. The good news is that the
> brokers can pull off a merge backfill very efficiently, by just copying
> part=2's segments into part=1 and using a little metadata to remember to
> add 12 to all the offsets it reads out of these segments. So, it might just
> have to be good enough to make merge super fast and then stateful consumers
> will have to rebuild their state for the affected partitions. Producers can
> be warned that while splitting is fine, merging may break their consumers.
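A toy model of that merge trick (illustrative Python): keep the surviving partition's records as-is, reuse the merged partition's records unchanged, and rebase its offsets with a single remembered delta (12 in the example). It assumes dense offsets starting at 0, purely for illustration:

```python
# Sketch of how a broker could serve a merged partition cheaply: part=1's
# segments stay as-is, part=2's segments are reused unchanged, and only a
# delta (part=1's end offset at merge time, 12 in the example) is remembered
# to rebase part=2's offsets on reads. Assumes dense offsets from 0.

class MergedPartitionView:
    def __init__(self, surviving_records, merged_records):
        # surviving_records / merged_records: lists of (offset, value)
        self.surviving = surviving_records
        self.delta = len(surviving_records)  # end offset of the survivor
        self.merged = merged_records

    def read(self, offset):
        if offset < self.delta:
            return self.surviving[offset][1]
        # Rebase into the copied partition's original offset space.
        return self.merged[offset - self.delta][1]
```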
>
> Thought 6:
> To pull off the split backfill, the broker will have to stream all the
> records to the producer's backfill worker, which is a bummer, but note that
> the backfill worker doesn't have to stream them back. It is sufficient just
> to send back the new partition assignment and offset for each (old
> partition,offset), since the records are going to keep their old timestamp,
> key, and value. The broker can then do its best to merge consecutive ranges
> and do efficient filesystem copies. Hopefully, the fact that backfill is
> async and producers and consumers can just continue working in the new
> generation while it happens, in combination with the fact that we only have
> to stream the data out and not back into the broker, makes this tolerable.
>
> Likewise, partitions not undergoing a split can trivially backfill into
> the new generation by copying, moving, or hardlinking the segment files
> over, depending on the implementer's preference. The point is, it can be
> done without streaming that data back to the producer.
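The exchange in Thought 6 could be sketched like this (illustrative Python; assign() stands in for the producer-side backfill worker and coalesce() for the broker-side range merging, both hypothetical):

```python
# Sketch of the split-backfill exchange: the broker streams (offset, key)
# pairs out to the producer's backfill worker, which only sends back the new
# partition for each offset; the broker then merges consecutive offsets with
# the same target into ranges it can copy with cheap filesystem operations.

def assign(records, partition_fn):
    # records: iterable of (offset, key) from the old partition.
    return [(offset, partition_fn(key)) for offset, key in records]

def coalesce(assignments):
    # Merge consecutive offsets with the same target partition into
    # (start_offset, end_offset_exclusive, target) ranges.
    ranges = []
    for offset, target in assignments:
        if ranges and ranges[-1][2] == target and ranges[-1][1] == offset:
            ranges[-1] = (ranges[-1][0], offset + 1, target)
        else:
            ranges.append((offset, offset + 1, target))
    return ranges
```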
>
>
> All done!
> Now that (I think) I understand Jan's proposal, I agree it's a better way
> forward for the project than pulling Streams in to perform the repartition
> operation. Hopefully, my comments add a little to the conversation.
>
> Thanks,
> John
>
> On Fri, Mar 23, 2018 at 9:15 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
> wrote:
>
> > Hi Jun,
> >
> > thanks for your great reply, even though last time I only managed to throw
> > in a few sentences because of time constraints. I think you are starting to
> > understand me pretty well. There are still some minor things that need to
> > be flattened out. I already made these points previously, but I will try to
> > rephrase them as answers to your questions for clarity.
> >
> > On 23.03.2018 01:43, Jun Rao wrote:
> >
> >> Hi, Jan,
> >>
> >> Thanks for the reply.
> >>
> >> Ok. So your idea is to have a special consumer reshuffle the data to a
> new
> >> set of partitions. Each consumer group will not increase its tasks
> >> immediately. We just need to make sure that each consumer instance is
> >> assigned the set of new partitions covering the same set of keys. At
> some
> >> point later, each consumer group can decide independently when to
> increase
> >> the number of tasks. At that point, the consumer will need to rebuild
> the
> >> local state for the new tasks. Is that right?
> >>
> > This has multiple aspects. The middle part
> >
> >  We just need to make sure that each consumer instance is
> > assigned the set of new partitions covering the same set of keys
> >
> > is probably the most important one here. This is essentially what the
> > second part of my idea is about.
> > We need a protocol that allows consumers to switch from the old topic (old
> > partition count) to the new topic (new partition count). (Whether it really
> > is a new topic or not depends on the implementation; I like to model it as
> > such since it's easier in my head.) The protocol I envision is something
> > like this:
> > Per partition there is information from the old offset to the new offset.
> >
> > topicOldMapping-0 : 50 => [0:25,2:25]
> > topicOldMapping-1 : 60 => [1:15,3:45]
> >
> > A consumer can get hold of this information, stop at 50 and 60, and
> > continue at 25, 25, 15, and 45.
> >
> > The same transition works when the data is not copied. A mapping would
> > then look like:
> >
> > topicOldMapping-0 : 50 => [0:0,2:0]
> > topicOldMapping-1 : 60 => [1:0,3:0]
> >
> > No data would have been copied; 50 and 60 are the end offsets, and we
> > would start at the very beginning of the new topic: 0, 0, 0, 0.
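A sketch of that protocol (illustrative Python; the mapping values are taken from the example above, and next_positions() is a hypothetical helper):

```python
# Sketch of the transition protocol: per old partition, a "stop offset" plus
# the new-partition start offsets to continue from. The same structure covers
# the copy case (non-zero starts) and the no-copy case (all starts are 0).

# topicOldMapping-0 : 50 => [0:25, 2:25]
# topicOldMapping-1 : 60 => [1:15, 3:45]
MAPPING = {
    0: (50, {0: 25, 2: 25}),
    1: (60, {1: 15, 3: 45}),
}

def next_positions(old_partition, current_offset):
    # Once a consumer of old_partition reaches the stop offset, it picks up
    # the listed new partitions at the listed offsets.
    stop, starts = MAPPING[old_partition]
    if current_offset < stop:
        return None  # keep consuming the old partition
    return starts
```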
> >
> >
> >> I agree that decoupling the number of tasks in a consumer group from the
> >> number of partitions in the input topic is a good idea. This allows each
> >> consumer group to change its degree of parallelism independently. There
> >> are
> >> different ways to achieve such decoupling. You suggested one approach,
> >> which works when splitting partitions. Not sure if this works in
> general,
> >> for example when merging partitions.
> >>
> > Merging is the same.
> >
> > topicOldMapping-0 : 25 => [0:50]
> > topicOldMapping-1 : 25 => [1:50]
> > topicOldMapping-2 : 15 => [0:50]
> > topicOldMapping-3 : 45 => [1:50]
> >
> > Generating this mapping is again trivial if we do not copy data (end
> > offsets point to start offsets).
> > If we do copy data, the consumer will create the mapping.
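Generating the trivial no-copy mapping can be sketched in a couple of lines (illustrative Python; children_of is a hypothetical description of which new partitions carry each old partition's keys):

```python
# Sketch: the no-copy mapping is trivial to generate, since each old
# partition's end offset simply maps to offset 0 of the new partitions that
# will carry its keys.

def no_copy_mapping(end_offsets, children_of):
    # end_offsets: old partition -> end offset
    # children_of: old partition -> list of new partitions for its keys
    return {p: (end, {c: 0 for c in children_of[p]})
            for p, end in end_offsets.items()}
```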
> >
> >
> >> Once a consumer group decides to change its degree of parallelism, it
> >> needs
> >> to rebuild the local state for the new tasks. We can either rebuild the
> >> states from the input topic or from the local state (through change
> >> capture
> >> topic). I think we agreed that rebuilding from the local state is more
> >> efficient. It also seems that it's better to let the KStreams library do
> >> the state rebuilding, instead of each application. Would you agree?
> >>
> > If you want to do it by RPC, you are changing the running application
> > without having a new, known-good one. This is against the kappa
> > architecture; I would not recommend that.
> >
> > If you replay the changelog and only poll records that are for your
> > partition, you have the problem of knowing which offset from the input
> > topic your current state relates to.
> >
> > If you rebuild, you could leave the old application running, just wait for
> > the new one to be _good_, and then change whose output you show to the
> > customers.
> >
> >
> >> About the special consumer. I guess that's only needed if one wants to
> >> recopy the existing data? However, if we can truly decouple the tasks in
> >> the consumer group from the partitions in the input topic, it seems
> there
> >> is no need for copying existing data? It's also not clear to me who will
> >> start/stop this special consumer.
> >>
> > Who starts and stops it is also not very clear to me. I do not have strong
> > opinions.
> > The thing is that I am looking for an explanation of how you can have a
> > log-compacted topic working without copying.
> > I agree that for running consumers it's no problem, as they are already
> > past the history. But the whole purpose of log compaction is to be able to
> > bootstrap new consumers. They are completely lost with a topic expanded
> > without repartitioning. The topic would be broken forever, and this is not
> > acceptable.
> >
> > This is why I am so intrigued to model the problem as described: it has no
> > overhead for the no-copy path while it also allows performing a copy.
> >
> > State-handling-wise one also has all the options, exactly the 3 you
> > mentioned, I guess. It's just that this state-store RPC is a bad idea;
> > it was only invented to allow for optimisations that are better not
> > done. Not to say they are premature ;)
> >
> > I hope it makes it clear
> >
> > best jan
> >
> >
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Wed, Mar 21, 2018 at 6:57 AM, Jan Filipiak <Jan.Filipiak@trivago.com
> >
> >> wrote:
> >>
> >> Hi Jun,
> >>>
> >> I was really seeing progress in our conversation, but your latest reply
> >> is just devastating.
> >> I thought we were getting close to being on the same page; now it feels
> >> like we are in different libraries.
> >>>
> >>> I will just quickly slam my answers in here. If they are too brief, I am
> >>> sorry; give me a ping and I will try to go into more detail.
> >>> I just want to show that your pros/cons listing is broken.
> >>>
> >>> Best Jan
> >>>
> >>> and want to get rid of this horrible compromise
> >>>
> >>>
> >>> On 19.03.2018 05:48, Jun Rao wrote:
> >>>
> >>> Hi, Jan,
> >>>>
> >>>> Thanks for the discussion. Great points.
> >>>>
> >>>> Let me try to summarize the approach that you are proposing. On the
> >>>> broker
> >>>> side, we reshuffle the existing data in a topic from current
> partitions
> >>>> to
> >>>> the new partitions. Once the reshuffle fully catches up, switch the
> >>>> consumers to start consuming from the new partitions. If a consumer
> >>>> needs
> >>>> to rebuild its local state (due to partition changes), let the
> consumer
> >>>> rebuild its state by reading all existing data from the new
> partitions.
> >>>> Once all consumers have switched over, cut over the producer to the
> new
> >>>> partitions.
> >>>>
> >>>> The pros for this approach are:
> >>>> 1. There is just one way to rebuild the local state, which is simpler.
> >>>>
> >>>> true thanks
> >>>
> >>> The cons for this approach are:
> >>>> 1. Need to copy existing data.
> >>>>
> >>>> Very unfair and not correct. It does not require you to copy over
> >>> existing
> >>> data. It _allows_ you to copy all existing data.
> >>>
> >>> 2. The cutover of the producer is a bit complicated since it needs to
> >>>
> >>>> coordinate with all consumer groups.
> >>>>
> >>> Also not true. I explicitly tried to make clear that there is only one
> >>> special consumer (in the case of actually copying data) with which
> >>> coordination is required.
> >>>
> >>> 3. The rebuilding of the state in the consumer is from the input topic,
> >>>> which can be more expensive than rebuilding from the existing state.
> >>>>
> >>>> true, but rebuilding state is only required if you want to increase
> >>> processing power, so we assume this is at hand.
> >>>
> >>> 4. The broker potentially has to know the partitioning function. If
> this
> >>>> needs to be customized at the topic level, it can be a bit messy.
> >>>>
> >>> I would argue against having the operation performed by the broker.
> >>> This was not discussed yet, but if you look at my original email, I
> >>> suggested otherwise from the beginning.
> >>>
> >>> Here is an alternative approach by applying your idea not in the
> broker,
> >>>> but in the consumer. When new partitions are added, we don't move
> >>>> existing
> >>>> data. In KStreams, we first reshuffle the new input data to a new
> topic
> >>>> T1
> >>>> with the old number of partitions and feed T1's data to the rest of
> the
> >>>> pipeline. In the meantime, KStreams reshuffles all existing data of
> the
> >>>> change capture topic to another topic C1 with the new number of
> >>>> partitions.
> >>>> We can then build the state of the new tasks from C1. Once the new
> >>>> states
> >>>> have been fully built, we can cut over the consumption to the input
> >>>> topic
> >>>> and delete T1. This approach works with compacted topic too. If an
> >>>> application reads from the beginning of a compacted topic, the
> consumer
> >>>> will reshuffle the portion of the input when the number of partitions
> >>>> doesn't match the number of tasks.
> >>>>
> >>> Let's all wipe this idea from our heads instantly. Mixing ideas from an
> >>> argument is not a resolution strategy; it just leads to horrible
> >>> software.
> >>>
> >>>
> >>> The pros of this approach are:
> >>>> 1. No need to copy existing data.
> >>>> 2. Each consumer group can cut over to the new partitions
> independently.
> >>>> 3. The state is rebuilt from the change capture topic, which is
> cheaper
> >>>> than rebuilding from the input topic.
> >>>> 4. Only the KStreams job needs to know the partitioning function.
> >>>>
> >>>> The cons of this approach are:
> >>>> 1. Potentially the same input topic needs to be reshuffled more than
> >>>> once
> >>>> in different consumer groups during the transition phase.
> >>>>
> >>>> What do you think?
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Jun
> >>>>
> >>>>
> >>>>
> >>>> On Thu, Mar 15, 2018 at 1:04 AM, Jan Filipiak <
> Jan.Filipiak@trivago.com
> >>>> >
> >>>> wrote:
> >>>>
> >>>> Hi Jun,
> >>>>
> >>>>> thank you for following me on these thoughts. It was important to me
> >>>>> to feel that kind of understanding of my arguments.
> >>>>>
> >>>>> What I was hoping for (I mentioned this earlier) is that we can model
> >>>>> the
> >>>>> case where we do not want to copy the data the exact same way as the
> >>>>> case
> >>>>> when we do copy the data. Maybe you can peek into the earlier mails
> >>>>> to see more details on this.
> >>>>>
> >>>>> This means we have the same mechanism to switch consumer groups to the
> >>>>> new topic. The offset mapping that would be generated would be even
> >>>>> simpler: end offset of the old topic => offset 0 of all the partitions
> >>>>> of the new topic. Then we could model the transition of a non-copy
> >>>>> expansion the exact same way as a copy expansion.
> >>>>>
> >>>>> I know this only works when topics grow by a factor. But the benefits
> >>>>> of only growing by a factor are too strong anyway. See Clemens's hint,
> >>>>> and remember that state reshuffling is not needed at all if one doesn't
> >>>>> want to grow processing power.
> >>>>>
> >>>>> I think these benefits should be clear, and that there is basically no
> >>>>> downside compared to what is currently at hand; it just makes
> >>>>> everything easy.
> >>>>>
> >>>>> One thing you need to know is that if you do not offer rebuilding a
> >>>>> log-compacted topic like I suggest, then even if you have consumer
> >>>>> state reshuffling, the topic is broken and cannot be used to bootstrap
> >>>>> new consumers. They don't know whether they need to apply a key from an
> >>>>> old partition or not. This is a horrible downside I haven't seen a
> >>>>> solution for in this email conversation.
> >>>>>
> >>>>> I argue to:
> >>>>>
> >>>>> Only ever grow topics by a factor.
> >>>>> Have the "no copy consumer" transition as the trivial case of the
> "copy
> >>>>> consumer transition".
> >>>>> If processors need to be scaled, let them rebuild from the new topic
> >>>>> and leave the old ones running in the meantime.
> >>>>> Do not implement key shuffling in streams.
> >>>>>
> >>>>> I hope I can convince you especially with how I want to handle the
> >>>>> consumer transition. I think you didn't quite understand me there
> >>>>> before; I think the term "new topic" intimidated you a little.
> >>>>> How we solve this on disk doesn't really matter, whether the data goes
> >>>>> into the same dir or a different dir or anything. I do think that it
> >>>>> needs to involve at least rolling a new segment for the existing
> >>>>> partitions. But most of the transitions should work without restarting
> >>>>> consumers (newer consumers with support for this). By "new topic" I
> >>>>> just meant the topic that now has a different partition count. There
> >>>>> are plenty of ways to handle that (versions, aliases).
> >>>>>
> >>>>> Hope I can further get my idea across.
> >>>>>
> >>>>> Best Jan
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 14.03.2018 02:45, Jun Rao wrote:
> >>>>>
> >>>>> Hi, Jan,
> >>>>>
> >>>>>> Thanks for sharing your view.
> >>>>>>
> >>>>>> I agree with you that recopying the data potentially makes the state
> >>>>>> management easier since the consumer can just rebuild its state from
> >>>>>> scratch (i.e., no need for state reshuffling).
> >>>>>>
> >>>>>> On the flip side, I saw a few disadvantages of the approach that
> you
> >>>>>> suggested. (1) Building the state from the input topic from scratch
> is
> >>>>>> in
> >>>>>> general less efficient than state reshuffling. Let's say one
> computes
> >>>>>> a
> >>>>>> count per key from an input topic. The former requires reading all
> >>>>>> existing
> >>>>>> records in the input topic whereas the latter only requires reading
> >>>>>> data
> >>>>>> proportional to the number of unique keys. (2) The switching of the
> >>>>>> topic
> >>>>>> needs modification to the application. If there are many
> applications
> >>>>>> on a
> >>>>>> topic, coordinating such an effort may not be easy. Also, it's not
> >>>>>> clear
> >>>>>> how to enforce exactly-once semantic during the switch. (3) If a
> topic
> >>>>>> doesn't need any state management, recopying the data seems
> wasteful.
> >>>>>> In
> >>>>>> that case, in place partition expansion seems more desirable.
> >>>>>>
> >>>>>> I understand your concern about adding complexity in KStreams. But,
> >>>>>> perhaps
> >>>>>> we could iterate on that a bit more to see if it can be simplified.
> >>>>>>
> >>>>>> Jun
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Mar 12, 2018 at 11:21 PM, Jan Filipiak <
> >>>>>> Jan.Filipiak@trivago.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>> Hi Jun,
> >>>>>>
> >>>>>>> I will focus on point 61, as I think it's _the_ fundamental part
> >>>>>>> that I can't get across at the moment.
> >>>>>>>
> >>>>>>> Kafka is the platform to have state materialized multiple times
> from
> >>>>>>> one
> >>>>>>> input. I emphasize this: It is the building block in architectures
> >>>>>>> that
> >>>>>>> allow you to
> >>>>>>> have your state maintained multiple times. You put a message in
> once,
> >>>>>>> and
> >>>>>>> you have it pop out as often as you like. I believe you understand
> >>>>>>> this.
> >>>>>>>
> >>>>>>> Now! The line of thinking goes as follows: I am using Apache Kafka
> >>>>>>> and I _want_ my state multiple times. What am I going to do?
> >>>>>>>
> >>>>>>> A) Am I going to take the state that I built up, plonk some sort of
> >>>>>>> RPC layer on top of it, and use that RPC layer to throw my records
> >>>>>>> across instances?
> >>>>>>> B) Am I just going to read the damn message twice?
> >>>>>>>
> >>>>>>> Approach A is fundamentally flawed and a violation of all that is
> >>>>>>> good and holy in Kafka deployments. I cannot understand how this idea
> >>>>>>> can come up in the first place.
> >>>>>>> (I do understand: IQ in Streams. They polluted the Kafka Streams
> >>>>>>> codebase really badly already. It is not funny! I think they are
> >>>>>>> equally flawed as A.)
> >>>>>>>
> >>>>>>> I say, we do what Kafka is good at. We repartition the topic once.
> We
> >>>>>>> switch the consumers.
> >>>>>>> (Those that need more partitions are going to rebuild their state
> in
> >>>>>>> multiple partitions by reading the new topic, those that don't just
> >>>>>>> assign
> >>>>>>> the new partitions properly)
> >>>>>>> We switch producers. Done!
> >>>>>>>
> >>>>>>> The best thing! It is trivial; hipster stream processors will have
> >>>>>>> an easy time with that as well. It's so super simple. And simple IS
> >>>>>>> good!
> >>>>>>> It is what Kafka was built to do. It is how we do it today. All I am
> >>>>>>> saying is that a little broker help doing the producer swap is super
> >>>>>>> useful.
> >>>>>>>
> >>>>>>> For everyone interested in why kafka is so powerful with approach
> B,
> >>>>>>> please watch https://youtu.be/bEbeZPVo98c?t=1633
> >>>>>>> I already looked up a good point in time; I think after 5 minutes
> >>>>>>> the "state" topic is handled and you should be able to understand me
> >>>>>>> an inch better.
> >>>>>>>
> >>>>>>> Please do not do A to the project, it deserves better!
> >>>>>>>
> >>>>>>> Best Jan
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On 13.03.2018 02:40, Jun Rao wrote:
> >>>>>>>
> >>>>>>> Hi, Jan,
> >>>>>>>
> >>>>>>> Thanks for the reply. A few more comments below.
> >>>>>>>>
> >>>>>>>> 50. Ok, we can think a bit harder for supporting compacted topics.
> >>>>>>>>
> >>>>>>>> 51. This is a fundamental design question. In the more common
> case,
> >>>>>>>> the
> >>>>>>>> reason why someone wants to increase the number of partitions is
> >>>>>>>> that
> >>>>>>>> the
> >>>>>>>> consumer application is slow and one wants to run more consumer
> >>>>>>>> instances
> >>>>>>>> to increase the degree of parallelism. So, fixing the number of
> >>>>>>>> running
> >>>>>>>> consumer instances when expanding the partitions won't help this
> >>>>>>>> case.
> >>>>>>>> If
> >>>>>>>> we do need to increase the number of consumer instances, we need
> to
> >>>>>>>> somehow
> >>>>>>>> reshuffle the state of the consumer across instances. What we have
> >>>>>>>> been
> >>>>>>>> discussing in this KIP is whether we can do this more effectively
> >>>>>>>> through
> >>>>>>>> the KStream library (e.g. through a 2-phase partition expansion).
> >>>>>>>> This
> >>>>>>>> will
> >>>>>>>> add some complexity, but it's probably better than everyone doing
> >>>>>>>> this
> >>>>>>>> in
> >>>>>>>> the application space. The recopying approach that you mentioned
> >>>>>>>> doesn't
> >>>>>>>> seem to address the consumer state management issue when the
> >>>>>>>> consumer
> >>>>>>>> switches from an old to a new topic.
> >>>>>>>>
> >>>>>>>> 52. As for your example, it depends on whether the join key is the
> >>>>>>>> same
> >>>>>>>> between (A,B) and (B,C). If the join key is the same, we can do a
> >>>>>>>> 2-phase
> >>>>>>>> partition expansion of A, B, and C together. If the join keys are
> >>>>>>>> different, one would need to repartition the data on a different
> key
> >>>>>>>> for
> >>>>>>>> the second join, then the partition expansion can be done
> >>>>>>>> independently
> >>>>>>>> between (A,B) and (B,C).
> >>>>>>>>
> >>>>>>>> 53. If you always fix the number of consumer instances, what you
> >>>>>>>> described works. However, as I mentioned in #51, I am not sure how
> >>>>>>>> your
> >>>>>>>> proposal
> >>>>>>>> deals with consumer states when the number of consumer instances
> >>>>>>>> grows.
> >>>>>>>> Also, it just seems that it's better to avoid re-copying the
> >>>>>>>> existing
> >>>>>>>> data.
> >>>>>>>>
> >>>>>>>> 60. "just want to throw in my question from the longer email in
> the
> >>>>>>>> other
> >>>>>>>> Thread here. How will the bloom filter help a new consumer to
> decide
> >>>>>>>> to
> >>>>>>>> apply the key or not?" Not sure that I fully understood your
> >>>>>>>> question.
> >>>>>>>> The
> >>>>>>>> consumer just reads whatever key is in the log. The bloom filter
> >>>>>>>> just
> >>>>>>>> helps
> >>>>>>>> clean up the old keys.
> >>>>>>>>
> >>>>>>>> 61. "Why can we afford having a topic where it's apparently not
> >>>>>>>> possible
> >>>>>>>> to
> >>>>>>>> start a new application on? I think this is an overall flaw of the
> >>>>>>>> discussed idea here. Not paying attention to the overall
> >>>>>>>> architecture."
> >>>>>>>> Could you explain a bit more when one can't start a new
> application?
> >>>>>>>>
> >>>>>>>> Jun
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sat, Mar 10, 2018 at 1:40 AM, Jan Filipiak <
> >>>>>>>> Jan.Filipiak@trivago.com
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>> Hi Jun, thanks for your mail.
> >>>>>>>>
> >>>>>>>> Thank you for your questions!
> >>>>>>>>
> >>>>>>>>> I think they are really good and tackle the core of the problem I
> >>>>>>>>> see.
> >>>>>>>>>
> >>>>>>>>> I will answer inline, mostly but still want to set the tone here.
> >>>>>>>>>
> >>>>>>>>> The core strength of Kafka is what Martin once called the
> >>>>>>>>> kappa architecture. How does this work?
> >>>>>>>>> You have everything as a log, as in Kafka. When you need to change
> >>>>>>>>> something,
> >>>>>>>>> you create the new version of your application and leave it
> >>>>>>>>> running
> >>>>>>>>> in
> >>>>>>>>> parallel.
> >>>>>>>>> Once the new version is good, you switch your users to the new
> >>>>>>>>> application.
> >>>>>>>>>
> >>>>>>>>> The online reshuffling effectively breaks this architecture, and I
> >>>>>>>>> think
> >>>>>>>>> the switch in thinking here is more harmful
> >>>>>>>>> than any details about the partitioning function to allow such a
> >>>>>>>>> change.
> >>>>>>>>> I
> >>>>>>>>> feel that with my suggestion we stay the closest to
> >>>>>>>>> the original and battle-proven architecture, and I can only warn
> >>>>>>>>> against
> >>>>>>>>> moving
> >>>>>>>>> away from it.
> >>>>>>>>>
> >>>>>>>>> I might have forgotten something; sometimes it's hard for me to
> >>>>>>>>> get
> >>>>>>>>> all
> >>>>>>>>> the thoughts captured in a mail, but I hope the comments inline
> >>>>>>>>> will
> >>>>>>>>> further make my concern clear, and put some emphasis on why I
> >>>>>>>>> prefer
> >>>>>>>>> my
> >>>>>>>>> solution ;)
> >>>>>>>>>
> >>>>>>>>> One thing we should all be aware of when discussing this, and I
> >>>>>>>>> think
> >>>>>>>>> Dong
> >>>>>>>>> should have mentioned it (maybe he did):
> >>>>>>>>> we are not discussing all of this out of thin air; there is an
> >>>>>>>>> effort
> >>>>>>>>> in the Samza project.
> >>>>>>>>>
> >>>>>>>>> https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+
> >>>>>>>>> Enable+partition+expansion+of+input+streams
> >>>>>>>>> https://issues.apache.org/jira/browse/SAMZA-1293
> >>>>>>>>>
> >>>>>>>>> To be clear: I think SEP-5 (state of last week; don't know if it
> >>>>>>>>> adapted
> >>>>>>>>> to
> >>>>>>>>> this discussion) is on a way better path than KIP-253, and I
> can't
> >>>>>>>>> really
> >>>>>>>>> explain why.
> >>>>>>>>>
> >>>>>>>>> Best Jan,
> >>>>>>>>>
> >>>>>>>>> nice weekend everyone
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 09.03.2018 03:36, Jun Rao wrote:
> >>>>>>>>>
> >>>>>>>>> Hi, Jan,
> >>>>>>>>>
> >>>>>>>>> Thanks for the feedback. Just some comments on the earlier points
> >>>>>>>>>
> >>>>>>>>>> that
> >>>>>>>>>> you
> >>>>>>>>>> mentioned.
> >>>>>>>>>>
> >>>>>>>>>> 50. You brought up the question of whether existing data needs
> to
> >>>>>>>>>> be
> >>>>>>>>>> copied
> >>>>>>>>>> during partition expansion. My understanding of your view is that
> >>>>>>>>>> avoiding
> >>>>>>>>>> copying existing data will be more efficient, but it doesn't
> work
> >>>>>>>>>> well
> >>>>>>>>>> with
> >>>>>>>>>> compacted topics since some keys in the original partitions will
> >>>>>>>>>> never
> >>>>>>>>>> be
> >>>>>>>>>> cleaned. It would be useful to understand your use case of
> >>>>>>>>>> compacted
> >>>>>>>>>> topics
> >>>>>>>>>> a bit more. In the common use case, the data volume in a
> compacted
> >>>>>>>>>> topic
> >>>>>>>>>> may not be large. So, I am not sure if there is a strong need to
> >>>>>>>>>> expand
> >>>>>>>>>> partitions in a compacted topic, at least initially.
> >>>>>>>>>>
> >>>>>>>>>> I do agree. State is usually smaller. Update rates might also be
> >>>>>>>>>>
> >>>>>>>>>> comparatively high.
> >>>>>>>>>>
> >>>>>>>>> Doing log compaction (even being very efficient and
> >>>>>>>>> configurable)
> >>>>>>>>> is
> >>>>>>>>> also
> >>>>>>>>> a more expensive operation than
> >>>>>>>>> just discarding old segments. Further, if you want to use more
> >>>>>>>>> consumers
> >>>>>>>>> for processing the events,
> >>>>>>>>> you also have to grow the number of partitions. Especially for
> >>>>>>>>> use cases
> >>>>>>>>> like ours (KIP-213), a tiny stateful
> >>>>>>>>> table might be very expensive to process if it joins against a
> >>>>>>>>> huge
> >>>>>>>>> table.
> >>>>>>>>>
> >>>>>>>>> I can just say we have been in the spot of needing to grow log
> >>>>>>>>> compacted
> >>>>>>>>> topics. Mainly for processing power we can bring to the table.
> >>>>>>>>>
> >>>>>>>>> Further, I am not at all concerned about the extra space used by
> >>>>>>>>> "garbage
> >>>>>>>>> keys". I am more concerned about the correctness of innocent
> >>>>>>>>> consumers.
> >>>>>>>>> The
> >>>>>>>>> logic becomes complicated. Say, for Streams, one would need to load
> >>>>>>>>> the
> >>>>>>>>> record into state but not forward it to the topology (to have it
> >>>>>>>>> available
> >>>>>>>>> for shuffling). I would rather have it simple and a topic clean
> >>>>>>>>> regardless
> >>>>>>>>> of whether
> >>>>>>>>> it
> >>>>>>>>> still has its old partition count. Especially with multiple
> >>>>>>>>> partition
> >>>>>>>>> growths I think it becomes insanely hard to do this shuffle
> >>>>>>>>> correctly.
> >>>>>>>>> Maybe
> >>>>>>>>> Streams and Samza can do it. Especially if you do "hipster stream
> >>>>>>>>> processing" <https://www.confluent.io/blog
> >>>>>>>>> /introducing-kafka-streams-
> >>>>>>>>> stream-processing-made-simple/>. This makes Kafka way too
> >>>>>>>>> complicated.
> >>>>>>>>> With my approach I think it's way simpler, because the topic has no
> >>>>>>>>> "history"
> >>>>>>>>> in terms of partitioning but is always clean.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 51. "Growing the topic by an integer factor does not require any
> >>>>>>>>> state
> >>>>>>>>>
> >>>>>>>>> redistribution at all." Could you clarify this a bit more? Let's
> >>>>>>>>> say
> >>>>>>>>>
> >>>>>>>>>> you
> >>>>>>>>>> have a consumer app that computes the windowed count per key. If
> >>>>>>>>>> you
> >>>>>>>>>> double
> >>>>>>>>>> the number of partitions from 1 to 2 and grow the consumer
> >>>>>>>>>> instances
> >>>>>>>>>> from
> >>>>>>>>>> 1
> >>>>>>>>>> to 2, we would need to redistribute some of the counts to the
> new
> >>>>>>>>>> consumer
> >>>>>>>>>> instance. Regarding linear hashing, it's true that it won't
> >>>>>>>>>> solve
> >>>>>>>>>> the
> >>>>>>>>>> problem with compacted topics. The main benefit is that it
> >>>>>>>>>> redistributes
> >>>>>>>>>> the keys in one partition to no more than two partitions, which
> >>>>>>>>>> could
> >>>>>>>>>> help
> >>>>>>>>>> redistribute the state.
> >>>>>>>>>>
> >>>>>>>>>> You don't need to spin up a new consumer in this case. Every
> >>>>>>>>>> consumer
> >>>>>>>>>>
> >>>>>>>>>> would just read every partition with the (partition % num_task)
> >>>>>>>>>>
> >>>>>>>>> task.
> >>>>>>>>> It
> >>>>>>>>> will still have the previous data right there and can go on.
> >>>>>>>>>
> >>>>>>>>> This sounds contradictory to what I said before, but please bear
> >>>>>>>>> with
> >>>>>>>>> me.
> >>>>>>>>>
> >>>>>>>>> 52. Good point on coordinating the expansion of 2 topics that
> need
> >>>>>>>>> to
> >>>>>>>>> be
> >>>>>>>>>
> >>>>>>>>> joined together. This is where the 2-phase partition expansion
> >>>>>>>>> could
> >>>>>>>>>
> >>>>>>>>>> potentially help. In the first phase, we could add new
> partitions
> >>>>>>>>>> to
> >>>>>>>>>> the 2
> >>>>>>>>>> topics one at a time but without publishing to the new
> partitions.
> >>>>>>>>>> Then,
> >>>>>>>>>> we
> >>>>>>>>>> can add new consumer instances to pick up the new partitions. In
> >>>>>>>>>> this
> >>>>>>>>>> transition phase, no reshuffling is needed since no data is
> coming
> >>>>>>>>>> from
> >>>>>>>>>> the
> >>>>>>>>>> new partitions. Finally, we can enable the publishing to the new
> >>>>>>>>>> partitions.
> >>>>>>>>>>
> >>>>>>>>>> I think it's even worse than you think. I would like to introduce
> >>>>>>>>>> the
> >>>>>>>>>>
> >>>>>>>>>> term
> >>>>>>>>>>
> >>>>>>>>> transitive copartitioning. Imagine
> >>>>>>>>> two Streams applications: one joins (A,B), the other (B,C). Then
> >>>>>>>>> there
> >>>>>>>>> is
> >>>>>>>>> a
> >>>>>>>>> transitive copartitioning requirement for
> >>>>>>>>> (A,C) to be copartitioned as well. This can spread significantly
> >>>>>>>>> and
> >>>>>>>>> require many consumers to adapt at the same time.
> >>>>>>>>>
> >>>>>>>>> It is also not entirely clear to me how you avoid reshuffling
> >>>>>>>>> in
> >>>>>>>>> this
> >>>>>>>>> case. If A has a record that never gets updated after the
> >>>>>>>>> expansion
> >>>>>>>>> and
> >>>>>>>>> the
> >>>>>>>>> corresponding B record moves to a new partition, how shall they
> >>>>>>>>> meet
> >>>>>>>>> without a
> >>>>>>>>> shuffle?
> >>>>>>>>>
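The spread of such transitive copartitioning requirements can be computed mechanically with a tiny union-find over topics. This is a sketch of my own (the class, method names, and topic letters are illustrative, not anything from the KIP):

```java
import java.util.HashMap;
import java.util.Map;

public class CopartitionGroups {
    private final Map<String, String> parent = new HashMap<>();

    // Find the representative of a topic's copartition group (with path compression).
    String find(String topic) {
        parent.putIfAbsent(topic, topic);
        String p = parent.get(topic);
        if (!p.equals(topic)) {
            p = find(p);
            parent.put(topic, p);
        }
        return p;
    }

    // Declaring a join between two topics merges their copartition groups.
    void join(String a, String b) {
        parent.put(find(a), find(b));
    }

    boolean mustBeCopartitioned(String a, String b) {
        return find(a).equals(find(b));
    }
}
```

With join("A","B") and join("B","C") declared, mustBeCopartitioned("A","C") already holds, which is exactly the transitive requirement described above: expanding any one topic in the group forces all of them, and all of their consumers, to move in lockstep.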
> >>>>>>>>> 53. "Migrating consumer is a step that might be made completely
> >>>>>>>>>
> >>>>>>>>> unnecessary
> >>>>>>>>>
> >>>>>>>>>> if - for example streams - takes the gcd as partitioning scheme
> >>>>>>>>>> instead
> >>>>>>>>>> of
> >>>>>>>>>> enforcing 1 to 1." Not sure that I fully understand this. I
> think
> >>>>>>>>>> you
> >>>>>>>>>> mean
> >>>>>>>>>> that a consumer application can run more instances than the
> number
> >>>>>>>>>> of
> >>>>>>>>>> partitions. In that case, the consumer can just repartition
> the
> >>>>>>>>>> input
> >>>>>>>>>> data according to the number of instances. This is possible, but
> >>>>>>>>>> just
> >>>>>>>>>> has
> >>>>>>>>>> the overhead of reshuffling the data.
> >>>>>>>>>>
> >>>>>>>>>> No, what I meant (that is also your question, I think, Matthias) is
> >>>>>>>>>> that
> >>>>>>>>>> if
> >>>>>>>>>>
> >>>>>>>>>> you grow a topic by an integer factor,
> >>>>>>>>>>
> >>>>>>>>> even if your processor is stateful you can just assign all
> >>>>>>>>> the
> >>>>>>>>> new partitions congruent to the previous partition (mod the old count) to
> >>>>>>>>> this consumer, and the state needed to keep processing correctly will be
> >>>>>>>>> present
> >>>>>>>>> without any shuffling.
> >>>>>>>>>
> >>>>>>>>> Say you have an assignment
> >>>>>>>>> stateful consumer => partitions
> >>>>>>>>> 0 => 0
> >>>>>>>>> 1 => 1
> >>>>>>>>> 2 => 2
> >>>>>>>>>
> >>>>>>>>> and you grow your topic by a factor of 4, you get:
> >>>>>>>>>
> >>>>>>>>> 0 => 0,3,6,9
> >>>>>>>>> 1 => 1,4,7,10
> >>>>>>>>> 2 => 2,5,8,11
> >>>>>>>>>
> >>>>>>>>> Say your hashcode is 8. 8 % 3 => 2 before, so the consumer for
> >>>>>>>>> partition
> >>>>>>>>> 2
> >>>>>>>>> has
> >>>>>>>>> it.
> >>>>>>>>> Now you have 12 partitions, so 8 % 12 => 8 and the key goes into
> >>>>>>>>> partition
> >>>>>>>>> 8,
> >>>>>>>>> which is assigned to the same consumer
> >>>>>>>>> who had 2 before and therefore knows the key.
> >>>>>>>>>
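The arithmetic behind this example can be checked mechanically. A minimal sketch in plain Java (class and method names are mine, for illustration): for any key hash h, old partition count P, and integer growth factor k, the new partition h % (kP) is congruent to the old partition h % P modulo P, so a consumer that owns every new partition congruent to its old one keeps seeing exactly the keys it already has state for.

```java
import java.util.stream.IntStream;

public class FactorGrowth {
    // A consumer that owned old partition p (out of oldCount) now owns
    // every new partition congruent to p mod oldCount.
    static int owningConsumer(int partition, int oldCount) {
        return partition % oldCount;
    }

    // Verify: for every hash, the owning consumer is unchanged after
    // growing the partition count by an integer factor.
    static boolean staysWithSameConsumer(int oldCount, int factor) {
        int newCount = oldCount * factor;
        return IntStream.range(0, 10_000).allMatch(h ->
            owningConsumer(h % newCount, oldCount) == h % oldCount);
    }
}
```

For the example above: hash 8 lands in partition 8 after growing 3 -> 12 partitions, and 8 % 3 == 2, so the consumer that owned partition 2 (and therefore owns partitions 2, 5, 8, 11) still has the key's state.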
> >>>>>>>>> Userland reshuffling is there as an option. And it does exactly
> >>>>>>>>> what
> >>>>>>>>> I
> >>>>>>>>> suggest. And I think it's the perfect strategy. All I am
> >>>>>>>>> suggesting
> >>>>>>>>> is
> >>>>>>>>> broker-side support to switch the producers to the newly
> >>>>>>>>> partitioned
> >>>>>>>>> topic.
> >>>>>>>>> Then the old topic (with too few partitions) can go away. Remember
> >>>>>>>>> the list
> >>>>>>>>> of
> >>>>>>>>> steps at the beginning of this thread. If one has broker support
> >>>>>>>>> for
> >>>>>>>>> all steps
> >>>>>>>>> where it's required, and Streams support for those where it isn't
> >>>>>>>>> necessary,
> >>>>>>>>> then one has solved the problem.
> >>>>>>>>> I repeat it because I think it's important. I am really happy that
> >>>>>>>>> you
> >>>>>>>>> brought that up, because it's 100% what I want, just with the
> >>>>>>>>> difference
> >>>>>>>>> that there is
> >>>>>>>>> an option to discard the too-small topic later (after all
> >>>>>>>>> consumers
> >>>>>>>>> adapted), and to have ordering correct there. I need broker support
> >>>>>>>>> managing
> >>>>>>>>> the copy process plus the producers, and fencing them against each
> >>>>>>>>> other.
> >>>>>>>>> I
> >>>>>>>>> also
> >>>>>>>>> repeat: the copy process can run for weeks in the worst case.
> >>>>>>>>> Copying
> >>>>>>>>> the
> >>>>>>>>> data is not the longest task; migrating consumers might very well
> >>>>>>>>> be.
> >>>>>>>>> Once all consumers have switched and copying is really up to date
> >>>>>>>>> (think
> >>>>>>>>> ISR-
> >>>>>>>>> like up to date), only then do we stop the producer, wait for the
> >>>>>>>>> copy
> >>>>>>>>> to
> >>>>>>>>> finish, and use the new topic for producing.
> >>>>>>>>>
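The consumer-migration half of this plan relies on the old-offset-to-new-offset mapping mentioned earlier in the thread. A hypothetical sketch of that bookkeeping (all names and the packed-key representation are mine; nothing here is an existing Kafka API):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetTranslation {
    // One entry per (oldPartition, oldOffset) the copy job re-produces,
    // recording the offset the record landed at in the new topic.
    private final Map<Long, Long> mapping = new HashMap<>();

    // Pack partition and offset into one key, purely for illustration.
    private static long key(int partition, long offset) {
        return ((long) partition << 48) | offset;
    }

    void recordCopy(int oldPartition, long oldOffset, long newOffset) {
        mapping.put(key(oldPartition, oldOffset), newOffset);
    }

    // A consumer switching topics resumes from the translated offset
    // corresponding to its last committed position on the old topic.
    long translate(int oldPartition, long committedOffset) {
        return mapping.get(key(oldPartition, committedOffset));
    }
}
```

This is why consumer migration decouples cleanly from producer migration: consumers can switch at their own pace, each looking up where its committed position landed in the repartitioned topic.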
> >>>>>>>>> After this the topic is in perfect shape, and no one needs to
> >>>>>>>>> worry
> >>>>>>>>> about
> >>>>>>>>> complicated stuff (old keys hanging around might otherwise arrive
> >>>>>>>>> in some
> >>>>>>>>> other topic later...). I can only imagine how many tricky bugs are
> >>>>>>>>> going to
> >>>>>>>>> arrive
> >>>>>>>>> after
> >>>>>>>>> someone has grown and shrunk his topic 10 times.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 54. "The other thing I wanted to mention is that I believe the
> >>>>>>>>> current
> >>>>>>>>>
> >>>>>>>>> suggestion (without copying data over) can be implemented in pure
> >>>>>>>>>
> >>>>>>>>>> userland
> >>>>>>>>>> with a custom partitioner and a small feedback loop from
> >>>>>>>>>> ProduceResponse
> >>>>>>>>>> =>
> >>>>>>>>>> Partitioner in cooperation with a change management system." I
> >>>>>>>>>> am
> >>>>>>>>>> not
> >>>>>>>>>> sure a customized partitioner itself solves the problem. We
> >>>>>>>>>> probably
> >>>>>>>>>> need
> >>>>>>>>>> some broker side support to enforce when the new partitions can
> be
> >>>>>>>>>> used.
> >>>>>>>>>> We
> >>>>>>>>>> also need some support on the consumer/kstream side to preserve
> >>>>>>>>>> the
> >>>>>>>>>> per
> >>>>>>>>>> key
> >>>>>>>>>> ordering and potentially migrate the processing state. This is
> not
> >>>>>>>>>> trivial
> >>>>>>>>>> and I am not sure if it's ideal to fully push to the application
> >>>>>>>>>> space.
> >>>>>>>>>>
> >>>>>>>>>> Broker support is definitely the preferred way here. I have
> >>>>>>>>>> nothing
> >>>>>>>>>>
> >>>>>>>>>> against
> >>>>>>>>>>
> >>>>>>>>> broker support.
> >>>>>>>>> I tried to say that for what I would prefer - copying the data
> >>>>>>>>> over,
> >>>>>>>>> at
> >>>>>>>>> least for log-compacted topics -
> >>>>>>>>> I would require more broker support than the KIP currently
> >>>>>>>>> offers.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Jun
> >>>>>>>>>
> >>>>>>>>> On Tue, Mar 6, 2018 at 10:33 PM, Jan Filipiak <
> >>>>>>>>>
> >>>>>>>>>> Jan.Filipiak@trivago.com
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi Dong,
> >>>>>>>>>>
> >>>>>>>>>> are you actually reading my emails, or are you just using the
> >>>>>>>>>> thread I
> >>>>>>>>>>
> >>>>>>>>>> started for general announcements regarding the KIP?
> >>>>>>>>>>>
> >>>>>>>>>>> I tried to argue really hard against linear hashing. Growing
> the
> >>>>>>>>>>> topic
> >>>>>>>>>>> by
> >>>>>>>>>>> an integer factor does not require any state redistribution at
> >>>>>>>>>>> all. I
> >>>>>>>>>>> fail
> >>>>>>>>>>> to see where linear hashing helps at all on log-compacted
> >>>>>>>>>>> topics.
> >>>>>>>>>>>
> >>>>>>>>>>> If you are not willing to explain to me what I might be
> >>>>>>>>>>> overlooking:
> >>>>>>>>>>> that
> >>>>>>>>>>> is fine.
> >>>>>>>>>>> But I ask you to not reply to my emails then. Please understand
> >>>>>>>>>>> my
> >>>>>>>>>>> frustration with this.
> >>>>>>>>>>>
> >>>>>>>>>>> Best Jan
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 06.03.2018 19:38, Dong Lin wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for all the comments! It appears that everyone prefers
> >>>>>>>>>>> linear
> >>>>>>>>>>>
> >>>>>>>>>>> hashing because it reduces the amount of state that needs to be
> >>>>>>>>>>>> moved
> >>>>>>>>>>>> between consumers (for stream processing). The KIP has been
> >>>>>>>>>>>> updated
> >>>>>>>>>>>> to
> >>>>>>>>>>>> use
> >>>>>>>>>>>> linear hashing.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regarding the migration effort: it seems that migrating the
> >>>>>>>>>>>> producer
> >>>>>>>>>>>> library
> >>>>>>>>>>>> to use linear hashing should be pretty straightforward without
> >>>>>>>>>>>> much operational effort. If we don't upgrade the client library
> >>>>>>>>>>>> to
> >>>>>>>>>>>> use
> >>>>>>>>>>>> this
> >>>>>>>>>>>> KIP, we cannot support in-order delivery after the partition count is
> >>>>>>>>>>>> changed
> >>>>>>>>>>>> anyway. Suppose we upgrade client library to use this KIP, if
> >>>>>>>>>>>> partition
> >>>>>>>>>>>> number is not changed, the key -> partition mapping will be
> >>>>>>>>>>>> exactly
> >>>>>>>>>>>> the
> >>>>>>>>>>>> same as it is now because it is still determined using
> >>>>>>>>>>>> murmur_hash(key)
> >>>>>>>>>>>> %
> >>>>>>>>>>>> original_partition_num. In other words, this change is
> backward
> >>>>>>>>>>>> compatible.
> >>>>>>>>>>>>
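As a sanity check of the backward-compatibility claim, here is a minimal sketch of a linear-hashing partition function (plain Java; the class name and the restriction to at most one doubling round are my own simplifications, not necessarily the KIP's exact algorithm):

```java
public class LinearHashing {
    // Linear hashing with `original` partitions initially and `current`
    // partitions now, assuming original <= current <= 2 * original.
    // Partitions [0, current - original) have been split so far.
    static int partition(int keyHash, int original, int current) {
        int p = Math.floorMod(keyHash, original);
        if (p < current - original)
            p = Math.floorMod(keyHash, 2 * original); // key may have moved to the split image
        return p;
    }
}
```

When current == original, no partition has been split and the result is exactly keyHash % original, which is the backward-compatibility point made above. It also shows the redistribution bound mentioned earlier in the thread: each old partition p's keys end up in at most the two partitions p and p + original.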
> >>>>>>>>>>>> Regarding the load distribution: if we use linear hashing, the
> >>>>>>>>>>>> load
> >>>>>>>>>>>> may
> >>>>>>>>>>>> be
> >>>>>>>>>>>> unevenly distributed because those partitions which are not
> >>>>>>>>>>>> split
> >>>>>>>>>>>> may
> >>>>>>>>>>>> receive twice as much traffic as other partitions that are
> >>>>>>>>>>>> split.
> >>>>>>>>>>>> This
> >>>>>>>>>>>> issue can be mitigated by creating the topic with partitions that
> >>>>>>>>>>>> are
> >>>>>>>>>>>> several
> >>>>>>>>>>>> times the number of consumers. And there will be no imbalance
> if
> >>>>>>>>>>>> the
> >>>>>>>>>>>> partition number is always doubled. So this imbalance seems
> >>>>>>>>>>>> acceptable.
> >>>>>>>>>>>>
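The imbalance being described can be quantified with a short simulation (a sketch that assumes a single doubling round of linear hashing; all names are illustrative):

```java
public class SplitLoad {
    // Linear hashing with `original` partitions, `current` in [original, 2*original].
    static int partition(int keyHash, int original, int current) {
        int p = Math.floorMod(keyHash, original);
        if (p < current - original)
            p = Math.floorMod(keyHash, 2 * original);
        return p;
    }

    // Count how many of `keys` uniformly distributed hashes land in each partition.
    static int[] load(int keys, int original, int current) {
        int[] counts = new int[2 * original];
        for (int h = 0; h < keys; h++)
            counts[partition(h, original, current)]++;
        return counts;
    }
}
```

With 3 original partitions grown to 4 and 6000 uniform keys, the split pair (partitions 0 and 3) gets about 1000 keys each while the unsplit partitions 1 and 2 keep about 2000 each, i.e. twice the traffic; doubling straight to 6 would split every partition and restore balance, which is the "no imbalance if the partition number is always doubled" point.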
> >>>>>>>>>>>> Regarding storing the partition strategy as per-topic config:
> It
> >>>>>>>>>>>> seems
> >>>>>>>>>>>> not
> >>>>>>>>>>>> necessary since we can still use murmur_hash as the default
> hash
> >>>>>>>>>>>> function
> >>>>>>>>>>>> and additionally apply the linear hashing algorithm if the
> >>>>>>>>>>>> partition
> >>>>>>>>>>>> number
> >>>>>>>>>>>> has increased. Not sure if there is any use-case for producer
> to
> >>>>>>>>>>>> use a
> >>>>>>>>>>>> different hash function. Jason, can you check if there is some
> >>>>>>>>>>>> use-case
> >>>>>>>>>>>> that I missed for using the per-topic partition strategy?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regarding how to reduce latency (due to state store/load) in
> >>>>>>>>>>>> stream
> >>>>>>>>>>>> processing consumer when partition number changes: I need to
> >>>>>>>>>>>> read
> >>>>>>>>>>>> the
> >>>>>>>>>>>> Kafka
> >>>>>>>>>>>> Streams code to understand how Kafka Streams currently migrates
> >>>>>>>>>>>> state
> >>>>>>>>>>>> between
> >>>>>>>>>>>> consumers when the application is added/removed for a given
> >>>>>>>>>>>> job. I
> >>>>>>>>>>>> will
> >>>>>>>>>>>> reply after I finish reading the documentation and code.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Dong
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <
> >>>>>>>>>>>> jason@confluent.io>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Great discussion. I think I'm wondering whether we can
> continue
> >>>>>>>>>>>> to
> >>>>>>>>>>>> leave
> >>>>>>>>>>>>
> >>>>>>>>>>>> Kafka agnostic to the partitioning strategy. The challenge is
> >>>>>>>>>>>>
> >>>>>>>>>>>> communicating
> >>>>>>>>>>>>
> >>>>>>>>>>>>> the partitioning logic from producers to consumers so that
> the
> >>>>>>>>>>>>> dependencies
> >>>>>>>>>>>>> between each epoch can be determined. For the sake of
> >>>>>>>>>>>>> discussion,
> >>>>>>>>>>>>> imagine
> >>>>>>>>>>>>> you did something like the following:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. The name (and perhaps version) of a partitioning strategy
> is
> >>>>>>>>>>>>> stored
> >>>>>>>>>>>>> in
> >>>>>>>>>>>>> topic configuration when a topic is created.
> >>>>>>>>>>>>> 2. The producer looks up the partitioning strategy before
> >>>>>>>>>>>>> writing
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>> a
> >>>>>>>>>>>>> topic and includes it in the produce request (for fencing).
> If
> >>>>>>>>>>>>> it
> >>>>>>>>>>>>> doesn't
> >>>>>>>>>>>>> have an implementation for the configured strategy, it fails.
> >>>>>>>>>>>>> 3. The consumer also looks up the partitioning strategy and
> >>>>>>>>>>>>> uses
> >>>>>>>>>>>>> it
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>> determine dependencies when reading a new epoch. It could
> >>>>>>>>>>>>> either
> >>>>>>>>>>>>> fail
> >>>>>>>>>>>>> or
> >>>>>>>>>>>>> make the most conservative dependency assumptions if it
> doesn't
> >>>>>>>>>>>>> know
> >>>>>>>>>>>>> how
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>> implement the partitioning strategy. For the consumer, the
> new
> >>>>>>>>>>>>> interface
> >>>>>>>>>>>>> might look something like this:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> // Return the partition dependencies following an epoch bump
> >>>>>>>>>>>>> Map<Integer, List<Integer>> dependencies(int
> >>>>>>>>>>>>> numPartitionsBeforeEpochBump,
> >>>>>>>>>>>>> int numPartitionsAfterEpochBump)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The unordered case then is just a particular implementation
> >>>>>>>>>>>>> which
> >>>>>>>>>>>>> never
> >>>>>>>>>>>>> has
> >>>>>>>>>>>>> any epoch dependencies. To implement this, we would need some
> >>>>>>>>>>>>> way
> >>>>>>>>>>>>> for
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>> consumer to find out how many partitions there were in each
> >>>>>>>>>>>>> epoch,
> >>>>>>>>>>>>> but
> >>>>>>>>>>>>> maybe that's not too unreasonable.
> >>>>>>>>>>>>>
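To make the proposed interface concrete, here is one possible implementation sketch for the linear-hashing case (plain Java; the class name and the assumption that each new partition draws keys from exactly one old partition are mine, not part of the proposal):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LinearHashingStrategy {
    // Return the partition dependencies following an epoch bump: each
    // post-bump partition maps to the pre-bump partition(s) its keys came
    // from. Under linear hashing up to one doubling (and under mod-hashing
    // with integer-factor growth) that is exactly one source partition.
    static Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
                                                    int numPartitionsAfterEpochBump) {
        Map<Integer, List<Integer>> deps = new HashMap<>();
        for (int p = 0; p < numPartitionsAfterEpochBump; p++)
            deps.put(p, Collections.singletonList(p % numPartitionsBeforeEpochBump));
        return deps;
    }
}
```

A consumer reading the new epoch would then know that, for example, new partition 3 after a 3-to-4 expansion depends only on old partition 0 and can wait for that partition's consumers to reach the bump offset before proceeding; the unordered case mentioned above would simply return empty dependency lists.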
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Jason
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <
> >>>>>>>>>>>>> Jan.Filipiak@trivago.com
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Dong
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> thank you very much for your questions.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> regarding the time spend copying data across:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> It is correct that copying data from a topic with one
> >>>>>>>>>>>>>> partition
> >>>>>>>>>>>>>> mapping
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> a topic with a different partition mapping takes way longer
> >>>>>>>>>>>>>> than
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> we
> >>>>>>>>>>>>> can
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> stop producers. Tens of minutes is a very optimistic estimate
> >>>>>>>>>>>>> here.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Many
> >>>>>>>>>>>>>> people can not afford copy full steam and therefore will
> have
> >>>>>>>>>>>>>> some
> >>>>>>>>>>>>>> rate
> >>>>>>>>>>>>>> limiting in place, this can bump the timespan into the
> day's.
> >>>>>>>>>>>>>> The
> >>>>>>>>>>>>>> good
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> part
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> is that the vast majority of the data can be copied while
> the
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> producers
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> are
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> still going. One can then piggyback the consumers on top of
> >>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> timeframe,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> by the method mentioned (provide them a mapping from their old
> old
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> offsets
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> new offsets in their repartitioned topics. In that way we
> >>>>>>>>>>>>>> separate
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> migration of consumers from migration of producers
> (decoupling
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> these
> >>>>>>>>>>>>>> is
> >>>>>>>>>>>>>> what kafka is strongest at). The time to actually swap over
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>> producers
> >>>>>>>>>>>>>> should be kept minimal by ensuring that when a swap attempt
> is
> >>>>>>>>>>>>>> started
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> consumer copying over should be very close to the log end
> and
> >>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> expected
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> to finish within the next fetch. The operation should have a
> >>>>>>>>>>>>>> time-out
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>> should be "reattemtable".
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Importance of log compaction:
> >>>>>>>>>>>>>> If a producer produces key A to partition 0, it is forever
> >>>>>>>>>>>>>> going to be
> >>>>>>>>>>>>>> there,
> >>>>>>>>>>>>>> unless it gets deleted. The record might sit in there for
> >>>>>>>>>>>>>> years. A
> >>>>>>>>>>>>>> new
> >>>>>>>>>>>>>> producer started with the new partitions will fail to delete
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>> record
> >>>>>>>>>>>>>> in
> >>>>>>>>>>>>>> the correct partition. The record will be there forever, and
> >>>>>>>>>>>>>> one
> >>>>>>>>>>>>>> cannot
> >>>>>>>>>>>>>> reliably bootstrap new consumers. I cannot see how linear
> >>>>>>>>>>>>>> hashing
> >>>>>>>>>>>>>> can
> >>>>>>>>>>>>>> solve
> >>>>>>>>>>>>>> this.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regarding your skipping of userland copying:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> 100%: copying the data across in userland is, as far as I can
> >>>>>>>>>>>>>> see,
> >>>>>>>>>>>>>> only
> >>>>>>>>>>>>>> a
> >>>>>>>>>>>>>> use case for log-compacted topics. Even for log compaction plus
> >>>>>>>>>>>>>> retention,
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>> should only be opt-in. Why did I bring it up? I think log
> >>>>>>>>>>>>>> compaction
> >>>>>>>>>>>>>> is
> >>>>>>>>>>>>>> a
> >>>>>>>>>>>>>> very important feature to really embrace Kafka as a "data
> >>>>>>>>>>>>>> platform".
> >>>>>>>>>>>>>> The
> >>>>>>>>>>>>>> point I also want to make is that copying data this way is
> >>>>>>>>>>>>>> completely
> >>>>>>>>>>>>>> in line with the Kafka architecture: it only consists of
> >>>>>>>>>>>>>> reading
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> writing
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> to topics.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I hope it clarifies more why I think we should aim for more
> >>>>>>>>>>>>> than
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>> current KIP. I fear that once the KIP is done not much more
> >>>>>>>>>>>>>> effort
> >>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> taken.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 04.03.2018 02:28, Dong Lin wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hey Jan,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> In the current proposal, the consumer will be blocked on
> >>>>>>>>>>>>>> waiting
> >>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> other
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> consumers of the group to consume up to a given offset. In
> >>>>>>>>>>>>>>> most
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> cases,
> >>>>>>>>>>>>>> all
> >>>>>>>>>>>>>> consumers should be close to the LEO of the partitions when
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>> partition
> >>>>>>>>>>>>>> expansion happens. Thus the time waiting should not be long
> >>>>>>>>>>>>>> e.g.
> >>>>>>>>>>>>>> on
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> order of seconds. On the other hand, it may take a long time
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>> wait
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> the entire partition to be copied -- the amount of time is
> >>>>>>>>>>>>>>> proportional
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> the amount of existing data in the partition, which can
> take
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> tens of
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> minutes. So the amount of time that we stop consumers may
> not
> >>>>>>>>>>>>>> be
> >>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> same order of magnitude.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> If we can implement this suggestion without copying data
> over
> >>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>> pure
> >>>>>>>>>>>>>>> userland, it will be much more valuable. Do you have ideas
> on
> >>>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> be done?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Not sure why the current KIP does not help people who depend on
> >>>>>>>>>>>>>> log
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> compaction.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Could you elaborate more on this point?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Dong
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Wed, Feb 28, 2018 at 10:55 PM, Jan
> >>>>>>>>>>>>>>> Filipiak<Jan.Filipiak@trivago.
> >>>>>>>>>>>>>>> com
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Dong,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I tried to focus on what the steps are one can currently
> >>>>>>>>>>>>>>> perform
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> expand
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> or shrink a keyed topic while maintaining top-notch
> >>>>>>>>>>>>>>>> semantics.
> >>>>>>>>>>>>>>>> I can understand that there might be confusion about
> >>>>>>>>>>>>>>>> "stopping
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> consumer". It is exactly the same as proposed in the KIP.
> >>>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>> needs
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> a time the producers agree on the new partitioning. The
> >>>>>>>>>>>>>>> extra
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>> semantics I
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> want to put in there is that we have a possibility to wait
> >>>>>>>>>>>>>>>> until
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> existing data
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> is copied over into the new partitioning scheme. When I say
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>> stopping
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> think more of having a memory barrier that ensures the
> >>>>>>>>>>>>>>>> ordering. I
> >>>>>>>>>>>>>>>> am
> >>>>>>>>>>>>>>>> still
> >>>>>>>>>>>>>>>> aiming for latencies on the scale of leader failovers.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Consumers have to explicitly adapt the new partitioning
> >>>>>>>>>>>>>>>> scheme
> >>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> above scenario. The reason is that in these cases where
> you
> >>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> dependent
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> on a particular partitioning scheme, you also have other
> >>>>>>>>>>>>>>>> topics
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>> co-partitioning enforcement or the like - frequently. Therefore
> >>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> your
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>> other
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> input topics might need to grow accordingly.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> What I was suggesting was to streamline all these
> operations
> >>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>> best
> >>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>> possible to have "real" partition growth and shrinkage going
> >>>>>>>>>>>>>>>> on.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Migrating
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> the producers to a new partitioning scheme can be much
> more
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> streamlined
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>> with proper broker support for this. Migrating consumer is a
> >>>>>>>>>>>>>> step
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> might be made completly unnecessary if - for example
> streams
> >>>>>>>>>>>>>>>> -
> >>>>>>>>>>>>>>>> takes
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> gcd as partitioning scheme instead of enforcing 1 to 1.
> >>>>>>>>>>>>>>>> Connect
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> consumers
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>> and other consumers should be fine anyways.
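The gcd idea above can be made concrete. The following is a minimal sketch (not Kafka Streams' actual assignment logic; `key_hash` is a stand-in for the murmur2-based default): assign work by partition modulo the gcd of the input topics' partition counts, so that records with the same key from topics with different but commensurate partition counts still meet in the same task.

```python
from math import gcd

def key_hash(key: bytes) -> int:
    # Stand-in for Kafka's murmur2-based default partitioner hash.
    return int.from_bytes(key, "big")

def partition_for(key: bytes, num_partitions: int) -> int:
    return key_hash(key) % num_partitions

def task_for(partition: int, num_tasks: int) -> int:
    # Assign a partition to a task by taking it modulo the task count.
    return partition % num_tasks

# With num_tasks = gcd(n_a, n_b), the same key from topic A (n_a
# partitions) and topic B (n_b partitions) reaches the same task,
# because the gcd divides both partition counts.
n_a, n_b = 12, 8
num_tasks = gcd(n_a, n_b)  # 4
key = b"user-42"
assert task_for(partition_for(key, n_a), num_tasks) == \
       task_for(partition_for(key, n_b), num_tasks)
```

This works because `(h % n) % g == h % g` whenever `g` divides `n`, which holds for `g = gcd(n_a, n_b)` by definition.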
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I hope this makes it clearer what I was aiming at. The rest
> >>>>>>>>>>>>>>>> needs to be figured out. The only danger I see is that if we
> >>>>>>>>>>>>>>>> introduce this feature as proposed in the KIP, it won't help
> >>>>>>>>>>>>>>>> any people depending on log compaction.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The other thing I wanted to mention is that I believe the
> >>>>>>>>>>>>>>>> current suggestion (without copying data over) can be
> >>>>>>>>>>>>>>>> implemented in pure userland with a custom partitioner and a
> >>>>>>>>>>>>>>>> small feedback loop from ProduceResponse => Partitioner, in
> >>>>>>>>>>>>>>>> cooperation with a change-management system.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Best Jan
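One way to read that feedback loop is sketched below. Everything here is hypothetical: the `"STALE_PARTITIONING"` error name, the change-management lookup, and the hash are illustrative; Kafka's producer does not expose exactly this hook.

```python
def key_hash(key: bytes) -> int:
    # Stand-in hash; Kafka's default partitioner uses murmur2.
    return int.from_bytes(key, "big")

def produce(records, send, current_partition_count):
    """Re-partition and retry when the broker's ProduceResponse signals
    that the partitioning scheme has changed (hypothetical signal)."""
    n = current_partition_count()
    for key, value in records:
        while True:
            response = send(key_hash(key) % n, key, value)
            if response != "STALE_PARTITIONING":
                break
            # Feedback loop: refresh the scheme from the
            # change-management system and retry with the new count.
            n = current_partition_count()
```

The point of the sketch is only that the producer, not the broker, owns the partitioning decision, so a scheme change can be coordinated entirely in userland.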
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 28.02.2018 07:13, Dong Lin wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hey Jan,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I am not sure if it is acceptable for the producer to be
> >>>>>>>>>>>>>>>>> stopped for a while, particularly for online applications
> >>>>>>>>>>>>>>>>> which require low latency. I am also not sure how consumers
> >>>>>>>>>>>>>>>>> can switch to a new topic. Does the user application need
> >>>>>>>>>>>>>>>>> to explicitly specify a different topic for the
> >>>>>>>>>>>>>>>>> producer/consumer to subscribe to? It will be helpful for
> >>>>>>>>>>>>>>>>> the discussion if you can provide more detail on the
> >>>>>>>>>>>>>>>>> interface change for this solution.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>> Dong
> >>>>>>>>>>>>>>>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak
> >>>>>>>>>>>>>>>>> <Jan.Filipiak@trivago.com> wrote:
> >>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> just want to throw my thought in. In general the
> >>>>>>>>>>>>>>>>>> functionality is very useful; we should, though, not tie
> >>>>>>>>>>>>>>>>>> the architecture down too hard while implementing.
> >>>>>>>>>>>>>>>>>> The manual steps would be to:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 1. create a new topic
> >>>>>>>>>>>>>>>>>> 2. mirror-make from the old topic to the new topic
> >>>>>>>>>>>>>>>>>> 3. wait for mirror making to catch up
> >>>>>>>>>>>>>>>>>> 4. then put the consumers onto the new topic
> >>>>>>>>>>>>>>>>>>    (having MirrorMaker spit out a mapping from old offsets
> >>>>>>>>>>>>>>>>>>    to new offsets: if the topic is increased by a factor X,
> >>>>>>>>>>>>>>>>>>    there is going to be a clean mapping from 1 offset in
> >>>>>>>>>>>>>>>>>>    the old topic to X offsets in the new topic; if there
> >>>>>>>>>>>>>>>>>>    is no such factor, there is no chance to generate a
> >>>>>>>>>>>>>>>>>>    mapping that can reasonably be used for continuing);
> >>>>>>>>>>>>>>>>>>    make consumers stop at appropriate points and continue
> >>>>>>>>>>>>>>>>>>    consumption with offsets from the mapping
> >>>>>>>>>>>>>>>>>> 5. have the producers stop for a minimal time
> >>>>>>>>>>>>>>>>>> 6. wait for MirrorMaker to finish
> >>>>>>>>>>>>>>>>>> 7. let producers produce with the new metadata
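The "clean mapping" for a factor-X expansion in step 4 can be sketched explicitly. One assumption not stated in the thread: that MirrorMaker distributes old partition p's records round-robin over child partitions p*X .. p*X+X-1 in order. Under that assumption, a consumer position in the old topic translates to one resume offset per child partition.

```python
def resume_offsets(old_partition: int, next_old_offset: int, factor: int):
    """Given a consumer about to read `next_old_offset` in the old
    partition, return the offset to resume from in each of the
    `factor` child partitions of the expanded topic.

    Assumes the record at old offset r was copied to child partition
    old_partition * factor + (r % factor), at child offset r // factor.
    """
    return {
        old_partition * factor + i:
            (next_old_offset - i + factor - 1) // factor
        for i in range(factor)
    }

# Example: old partition 0 doubled (factor 2); the consumer has
# consumed old offsets 0..4 and was about to read offset 5.
# Child 0 received old offsets 0, 2, 4; child 1 received 1, 3.
print(resume_offsets(0, 5, 2))  # {0: 3, 1: 2}
```

If the new partition count is not an integer multiple of the old one, no such per-offset formula exists, which is exactly the "no chance to generate a mapping" caveat above.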
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Instead of implementing the approach suggested in the KIP,
> >>>>>>>>>>>>>>>>>> which will leave log-compacted topics completely crumbled
> >>>>>>>>>>>>>>>>>> and unusable, I would much rather try to build
> >>>>>>>>>>>>>>>>>> infrastructure to support the operations mentioned above
> >>>>>>>>>>>>>>>>>> more smoothly. Especially having producers stop and use
> >>>>>>>>>>>>>>>>>> another topic is difficult, and it would be nice if one
> >>>>>>>>>>>>>>>>>> could trigger "invalid metadata" exceptions for them, and
> >>>>>>>>>>>>>>>>>> if one could give topics aliases so that producers using
> >>>>>>>>>>>>>>>>>> the old topic name will arrive in the new topic.
> >>>>>>>>>>>>>>>>>> The downsides are obvious, I guess (having the same data
> >>>>>>>>>>>>>>>>>> twice for the transition period, but Kafka tends to scale
> >>>>>>>>>>>>>>>>>> well with data size). So it is a nicer fit into the
> >>>>>>>>>>>>>>>>>> architecture.
> >>>>>>>>>>>>>>>>>> I further want to argue that the functionality of the KIP
> >>>>>>>>>>>>>>>>>> can be implemented completely in "userland" with a custom
> >>>>>>>>>>>>>>>>>> partitioner that handles the transition as needed. I would
> >>>>>>>>>>>>>>>>>> appreciate it if someone could point out what a custom
> >>>>>>>>>>>>>>>>>> partitioner couldn't handle in this case.
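A minimal sketch of such a userland transition, assuming the change-management system publishes (effective-from-timestamp, partition-count) epochs. The class name and hash are illustrative; this is not Kafka's actual `Partitioner` API.

```python
from bisect import bisect_right

def key_hash(key: bytes) -> int:
    # Stand-in hash; Kafka's default partitioner uses murmur2.
    return int.from_bytes(key, "big")

class TransitionPartitioner:
    """Choose the partition count based on which epoch a record's
    timestamp falls into, so the old and new schemes can coexist
    during a rollout driven by a change-management system."""

    def __init__(self, epochs):
        # epochs: ascending list of (effective_from_ms, partition_count)
        self.starts = [start for start, _ in epochs]
        self.counts = [count for _, count in epochs]

    def partition(self, key: bytes, timestamp_ms: int) -> int:
        idx = bisect_right(self.starts, timestamp_ms) - 1
        return key_hash(key) % self.counts[max(idx, 0)]

# Grow from 4 to 8 partitions at t = 1000 ms.
p = TransitionPartitioner([(0, 4), (1000, 8)])
assert p.partition(b"\x0d", 500) == 13 % 4   # old scheme
assert p.partition(b"\x0d", 2000) == 13 % 8  # after the cutover
```

What a purely client-side partitioner cannot do by itself is reshuffle already-written data, which is why log-compacted topics remain the hard case either way.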
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> With the above approach, shrinking a topic becomes the
> >>>>>>>>>>>>>>>>>> same steps, without losing keys in the discontinued
> >>>>>>>>>>>>>>>>>> partitions.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Would love to hear what everyone thinks.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Best Jan
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 11.02.2018 00:35, Dong Lin wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I have created KIP-253: Support in-order message delivery
> >>>>>>>>>>>>>>>>>>> with partition expansion. See
> >>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> This KIP provides a way to allow messages of the same key
> >>>>>>>>>>>>>>>>>>> from the same producer to be consumed in the same order
> >>>>>>>>>>>>>>>>>>> they are produced, even if we expand the partitions of
> >>>>>>>>>>>>>>>>>>> the topic.
> >>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>> Dong