kafka-dev mailing list archives

From Ray Chiang <rchi...@apache.org>
Subject Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion
Date Mon, 09 Apr 2018 18:18:56 GMT
My notes from today's meeting.  Sorry if I got anyone's name wrong. Plus 
I missed a few moments with noise at home and/or dropped video.

-Ray

=====

KIP-253 Discussion

- Currently, adding partitions can cause keys to be read out-of-order.
   This KIP is trying to preserve the key ordering when adding partitions.

- State management in applications (e.g. Kafka Streams) can maintain
   local state via caching.  If the number of partitions changes, how
   would those applications update their local state?  This is the current
   point of discussion/disagreement.

- Jan Filipiak is mostly worried about log compacted topics.  Not as
   concerned about producer swapping.  Worried that the consumer design is
   a bit contradictory compared to the architecture.

   Current design is to start up a new consumer in parallel with old
   topic/consumer.  Run until consumer finishes "copying" to the new topic.
   Once the consumer is caught up, point the producer at the new topic.

   Would like to have this technique as a "core primitive" to Kafka.
   - Is this a useful primitive?
   - What's the best way to support it?

   - Topic expansion as it currently exists just "adds partitions". But
     how does this affect bootstrapping applications?  How to deal with
     "moved" (from "old partition" to "new expanded partition") keys?

   - Dong's proposal example.  10 partitions growing to 15.  5 of the
     first 10 partitions are split into 2 each.  Say Kafka remembers
     parent->child relationship.  Then for each parent partition, there
     are two child partitions.  Initially, there were 10 states to
     manage.  Then bootstrapping new application would have 15 states.
     Need to know which "generation" of partition you are consuming
     from.  Until you get to the "newer" generation of data, the keys
     will be fine (i.e. reading from old partition).

   - Scheme works well for transient events.  Any stateful processor will
     likely break.

   - Tracking can become extremely complicated, since each split requires
     potentially more and more offset/partition combos.

   - Need to support continuation for consumers to read the new partitions.

   - With linear hashing, integral multiple increase (2x, 3x, 4x, etc).
     Easier mapping from old partition sets to new partition sets.
     Keys end up with a clean hierarchy instead of a major reshuffling.

   - Dong's approach piggybacks on the existing leader epoch.  Log segment
     could be tagged with version in linear hashing case.

   - In Jan's case, existing consumers bootstrap from the beginning.

   - James' use case.  Using Kafka as a long term persistent data store.
     Putting "source of truth" information into Kafka.  Bootstrap case
     is very important.  New applications could be bootstrapping as they
     come up.

     - Increasing partitions will help with load from producer and
       increasing consumer parallelism.
     - How does Kinesis handle partition splits?  They don't have
       compacted logs, so no issue with bootstrapping.  Kinesis uses
       MD5 and splits results based on md5sum into bucket ranges.
     - Is it useful for the server to know the partitioning function?
       Consumer has some implicit assumptions about keyed partitions,
       but not strongly enforced on server side.

     - KIP-213 (one to many joins in Kafka Streams)

       - MySQL case.  Primary key forced to be used as Kafka key.

         (Sorry had some audio and video drop at this point)

       - Mirror Maker.  Source cluster has custom partitioning function.
         Producer won't duplicate to same partitioning setup as source.
         Need to provide same partitioning function to producer.
         Would need to determine partitioning function based on topic.

         - Should server validate partitioning?
         - Who does actual determination of which key goes to which
           partition?

       - How to implement backfill?

         - Who will do it?  In producer?  Hard to do.  Every client would
           need to add this functionality.  Better to do on server side.
         - Add a type of "copy consumer"?  Add backoff to producer?
           Benefit of doing in consumer vs. producer?

   - Still TBD
     - How to dedupe control messages?
     - How to deal with subtle cases during transition?
     - Is it useful for the server to have the option to validate the key
       distribution?
     - Jan concerned about how a consumer application would look with the
       new "split partition" design.
     - KIP introduced callback.  Jan doesn't think it is useful. Callback
       for switching "between Partition 1 and can start on Partition 11".
       Rely on marker in Partition 1 instead.  Intent for callback is
       for possibility that delivery of messages for given key is moved
       to a different consumer instance.
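
To illustrate the linear hashing point and the 10 -> 15 example above,
here is a minimal sketch.  It was not presented in the meeting and is not
the KIP's implementation.  The function names (key_hash,
linear_hash_partition, parent_of) are hypothetical, and MD5 is used only
as a stable stand-in hash; Kafka's default partitioner uses murmur2.

```python
import hashlib


def key_hash(key: bytes) -> int:
    # Stand-in stable hash for illustration only (assumption, not Kafka's).
    return int.from_bytes(hashlib.md5(key).digest()[:8], "big")


def linear_hash_partition(key: bytes, base: int, total: int) -> int:
    """Partition for `key` after a topic grew from `base` to `total`
    partitions, where base <= total <= 2 * base.

    Partitions 0 .. (total - base - 1) have each been split in two;
    the remaining original partitions are untouched.
    """
    if not base <= total <= 2 * base:
        raise ValueError("total must be between base and 2 * base")
    h = key_hash(key)
    p = h % base                 # partition before any split
    already_split = total - base
    if p < already_split:
        # h % (2 * base) is either p (stays in the parent) or
        # p + base (moves to the child).
        p = h % (2 * base)
    return p


def parent_of(partition: int, base: int) -> int:
    # Child partition (p + base) descends from parent partition p.
    return partition - base if partition >= base else partition


if __name__ == "__main__":
    for k in (b"user-1", b"user-2", b"user-3"):
        before = linear_hash_partition(k, 10, 10)
        after = linear_hash_partition(k, 10, 15)
        print(k, before, "->", after)
```

With base=10 and total=15, only keys from partitions 0-4 can move, and a
key from partition p can only move to partition p + 10.  That is the
"clean hierarchy" property: a consumer only needs the parent->child
relationship to know where a key's older records live.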



On 4/6/18 9:44 AM, Dong Lin wrote:
> Hey John,
>
> Thanks much for your super-detailed explanation. This is very helpful.
>
> Now that I have finished reading through your email, I think the proposed
> solution in my previous email probably meets the requirement #6 without
> requiring additional coordination (w.r.t. partition function) among
> clients. My understanding of requirement #6 is that, after partition
> expansion, messages with the given key will go to the same consumer before
> and after the partition expansion such that stream processing jobs won't be
> affected. Thus this approach seems to be better than backfilling since it
> does not require data copy for input topics.
>
> In order for the proposed solution to meet requirements #6, we need two
> extra requirements in addition to what has been described in the previous
> email: 1) stream processing job starts with the same number of processors
> as the initial number of partitions of the input topics; and 2) at any
> given time the number of partitions of the input topic >= the number of
> processors of the given stream processing job.
>
> Could you take a look at the proposed solution and see if any of the claims
> above is false?
>
>
> Hey Jan,
>
> Maybe it is more efficient for us to discuss your concern in the KIP
> Meeting.
>
>
> Thanks,
> Dong
>
>
> On Thu, Mar 29, 2018 at 2:05 PM, John Roesler <john@confluent.io> wrote:
>
>> Hi Jun,
>>
>> Thanks for the response. I'm very new to this project, but I will share my
>> perspective. I'm going to say a bunch of stuff that I know you know
>> already, but just so we're on the same page...
>>
>> This may also be a good time to get feedback from the other KStreams folks.
>>
>> Using KStreams as a reference implementation for how stream processing
>> frameworks may interact with Kafka, I think it's important to eschew
>> knowledge about how KStreams currently handles internal communication,
>> making state durable, etc. Both because these details may change, and
>> because they won't be shared with other stream processors.
>>
>> =================================
>> Background
>>
>> We are looking at a picture like this:
>>
>>       input input input
>>           \   |   /
>>        +-------------+
>> +-----+ Consumer(s) +-------+
>> |     +-------------+       |
>> |                           |
>> |    KStreams Application   |
>> |                           |
>> |     +-------------+       |
>> +-----+ Producer(s) +-------+
>>        +-------------+
>>             /    \
>>          output output
>>
>> The inputs and outputs are Kafka topics (and therefore have 1 or more
>> partitions). We'd have at least 1 input and 0 or more outputs. The
>> Consumers and Producers are both the official KafkaConsumer and
>> KafkaProducer.
>>
>> In general, we'll assume that the input topics are provided by actors over
>> which we have no control, although we may as well assume they are friendly
>> and amenable to requests, and also that their promises are trustworthy.
>> This is important because we must depend on them to uphold some promises:
>> * That they tell us the schema of the data they publish, and abide by that
>> schema. Without this, the inputs are essentially garbage.
>> * That they tell us some defining characteristics of the topics (more on
>> this in a sec.) and again strictly abide by that promise.
>>
>> What are the topic characteristics we care about?
>> 1. The name (or name pattern)
>> 2. How the messages are keyed (if at all)
>> 3. Whether the message timestamps are meaningful, and if so, what their
>> meaning is
>> 4. Assuming the records have identity, whether the partitions partition the
>> records' identity space
>> 5. Whether the topic completely contains the data set
>> 6. Whether the messages in the topic are ordered
>>
>> #1 is obvious: without this information, we cannot access the data at all.
>>
>> For #2, #3, #4, and #6, we may or may not need this information, depending
>> on the logic of the application. For example, a trivial application that
>> simply counts all events it sees doesn't care about #2, #3, #4, or #6. But
>> an application that groups by some attribute can take advantage of #2 and
>> #4 if the topic data is already keyed and partitioned over that attribute.
>> Likewise, if the application includes some temporal semantics on a temporal
>> dimension that is already captured in #3, it may take advantage of that
>> fact.
>>
>> Note that #2, #3, #4, and #6 are all optional. If they are not promised, we
>> can do extra work inside the application to accomplish what we need.
>> However, if they are promised (and if we depend on that promise), it is
>> essential that the topic providers uphold those promises, as we may not be
>> in a position to verify them.
>>
>> Note also that if they make a promise, but it doesn't happen to line up
>> with our needs (data is keyed by attr1, but we need it by attr2, or
>> timestamp is produce-time, but we need it by event-time, etc.), then we
>> will have to go ahead and do that extra work internally anyway. This also
>> captures the situation in which two inputs are produced by different
>> providers, one of which meets our needs, and the other does not. The fact
>> that we can cope with this situation is the basis for my statement that we
>> do not require coordination among producers.
>>
>> (Key Point A): In terms of optimization, #4 and #6 are the most valuable.
>> If these characteristics happen to line up with our needs, then KStreams
>> can be incredibly efficient in both time and computational resources.
>>
>> #5 is similar to knowing the schema in that it tells us whether the
>> computation we want to do is possible or not. For example, suppose we have
>> a topic of "users", and we want to construct a table for querying. If the
>> user topic doesn't completely contain the dataset, we cannot construct the
>> table. Note that it doesn't matter whether the topic is compacted or not.
>> If the topic is complete, I can consume it starting at "earliest" and build
>> my table. If it is not complete, I can do other computations on it. In both
>> cases, it may or may not be compacted; it just doesn't matter.
>>
>> On the output side, the roles are reversed. We provide (or not) exactly the
>> same set of guarantees to consumers of our outputs, and we likewise must
>> abide by the promises we make.
>>
>>
>> =================================
>> Partition Expansion
>>
>> With this formation in place, let's talk about partition expansion.
>>
>> Why do we have partitions in the first place? (let me know if I miss
>> something here)
>> * For logical data streams that are themselves partitionable, it allows
>> producers to operate concurrently without coordination. For example,
>> streaming data from a sensor in a particle accelerator, the sensor can be
>> subdivided into a grid and each grid square can produce independently to a
>> different topic. This may be valuable because the total rate of data
>> exceeds the throughput to a single broker node or just because it allows
>> for failure of a single producer to cause the loss of only part of the
>> data.
>> * The brokers can offer linearly scaling throughput on the number of
>> partitions by hosting each partition on a separate broker node
>> * The brokers can host topics that are too large to fit on a single
>> broker's storage by hosting some partitions on separate broker nodes
>> * In cases where the use case permits handling partitions independently,
>> consumers can have algorithmic simplicity by processing the data for
>> separate partitions in separate threads, avoiding costly and error-prone
>> concurrent coordination code
>> * In cases where the use case permits handling partitions independently,
>> consumers can exceed the total throughput of a single broker-consumer pair
>> * Just to throw this in as well, in cases where some network links are less
>> costly than others (or lower latency or more reliable), such as when
>> brokers, producers, and consumers are running in racks: producer and
>> consumers can both benefit (independently) by locating work on each
>> partition in the same rack as the broker hosting that partition.
>>
>> In other words, we have three actors in this system: producers, brokers,
>> and consumers, and they all benefit from partitioning for different (but
>> sometimes related) reasons.
>>
>> This leads naturally to the conclusion that any of these actors may find
>> themselves in a sub-optimal or even dangerous situation in which partition
>> expansion would be the solution. For example, the producer may find that
>> the existing throughput to the brokers is insufficient to match the data
>> rate, forcing them to drop data. Or a broker hosting a single partition may
>> be running out of disk space. Or a consumer node handling a single
>> partition cannot match the rate of production for that partition, causing
>> it to fall behind.
>>
>> I think it's reasonable to assume that all the actors in the system can't
>> just arbitrarily expand a topic's partition. I think it's reasonable to
>> align this responsibility with the provider of the data, namely the
>> producer (the logical producer, not the KafkaProducer class). Therefore,
>> the producer who finds themselves in trouble can unilaterally expand
>> partitions to solve their problem.
>>
>> For the broker or a consumer in trouble, they have only one resort: to
>> request the producer to expand partitions. This is where it's helpful to
>> assume the producer is friendly.
>>
>>
>> Now, let's look at how a KStreams application fits into this scenario.
>>
>> (Key Point B1): As a consumer, we may find that the producer expands the
>> partitions of a topic, either for their own benefit or for the brokers. In
>> this situation, the expand operation MUST NOT violate any promises that
>> have previously been made to us. This is the essence of KIP-253, to ensure
>> the maintenance of promises #6 and #4. It would be great if the mechanics
>> of the expansion required no major disruption to processing or human
>> intervention.
>>
>> Specifically, let's say that input partition X splits into X1 and X2. #6
>> requires that the same old ordering guarantees of Kafka continue to hold.
>> Obviously, this is what KIP-253's title is about. #4 requires that we
>> either ensure that X1 and X2 are assigned to the same thread that was
>> previously assigned X OR that we immediately pause processing and split any
>> state such that it appears X1 and X2 were *always* separate partitions.
>>
>> In other words, Option 1 is we treat X1 and X2 as still logically one
>> partition, equal to X. This is ideal, since in this scenario, partitions
>> are expanding for external reasons. We don't need to expand our processing
>> to match. Option 2 requires a major disruption, since we'd have to pause
>> processing while we split our state. Clearly, KStreams or any other
>> stateful consumer would advocate for Option 1.
>>
>>
>> (Corollary to Key Point A): Still on the consumer side, we may find that we
>> ourselves can benefit from partition expansion of an input. Since we can
>> cope with the absence of promise #4, partition expansion is not a hard
>> requirement for us, but assuming we were already benefiting from the major
>> performance optimizations afforded by #4, it would be nice to be able to
>> request the producer satisfy our request for partition expansion **and to
>> be able to benefit from it**.
>>
>> What does it mean to be able to benefit from partition expansion? Assuming
>> input topic partition X splits into X1 and X2, in this scenario, we *would*
>> wish to be able to split our state such that it appears X1 and X2 were
>> *always* separate partitions. Of course, the conclusion of Key Point B1
>> still applies: we should be able to continue operating on (X1+X2 = X) as
>> one partition while asynchronously building the state of X1 and X2
>> separately.
>>
>> When it comes to the mechanics of building the state of X1 and X2
>> separately, we have really just two high-level options. Either this problem
>> is solved by Kafka itself, giving me a view in which X1 and X2 were always
>> separate partitions, or I have to do it myself. The latter means that I
>> have to take on substantially more complexity than I do today:
>> Bummer 1: My state has to be splittable to begin with, implying at the
>> least that I need to be able to scan every record in my state, a
>> requirement that otherwise does not exist.
>> Bummer 2: After splitting the state of X1 and X2, I need to be able to
>> send at least one of those tasks, state included, to another application
>> node (in order to realize the benefit of the split). This is also a
>> requirement that does not currently exist.
>> Bummer 3: In order to actually perform the split, I must know and be able
>> to execute the exact same partition function the producer of my topic uses.
>> This introduces a brand-new a priori commitment from my input producers:
>> (would-be #7: convey the partition function and abide by it). This is a big
>> restriction over #4, which only requires them to guarantee *that there is a
>> partition function*. Now they actually have to share the function with me.
>> And I have to be able to implement and execute it myself. And if the
>> producer wishes to refine the partition function for an existing topic, we
>> have another round of coordination, as they have to be sure that I, and all
>> other consumers, begin using the new function *before* they do. This is
>> similar to the schema problem, with a similar set of solutions. We would
>> likely need a partition function registry and another magic byte on every
>> record to be sure we do this right. Not to mention some way to express
>> arbitrary partitioning logic over arbitrary data in a way that is portable
>> across programming languages.
>>
>> Alternatively, if Kafka gives me a view in which X1 and X2 were always
>> separate, then I can create tasks for X1 and X2 and allow them to bootstrap
>> while I continue to process X. Once they are ready, I can coordinate a
>> transition to stop X's task and switch to X1 and X2. None of those bummers
>> are present, so this is a significantly better option for me.
>>
>> (Key Point B2): As a (friendly) producer, we may once again want on our own
>> to expand partitions, or we may want to satisfy a request from the broker
>> or our consumers to do so. Again, we MUST NOT violate any promises we have
>> previously given, and it would be great if the expansion required no major
>> disruption to processing or human intervention. Additionally, since we are
>> actually driving the expansion, it would also be great if we could avoid
>> Bummer 3's coordination problems from the producer's perspective.
>>
>>
>> ======================================
>> Briefly: KStreams internals
>>
>> I'm pretty sure you were asking me to comment on the implementation details
>> of KStreams, so I'll say a few words about it. The most important thing is
>> that KStreams is still very early in its development. Maybe "early-middle
>> maturity" is a good way to put it. We are actively discussing more-or-less
>> major implementation changes to improve performance, footprint,
>> scalability, and ergonomics. So it may actually be misleading to discuss
>> deeply how KStreams internally uses Kafka topics.
>>
>> Nevertheless, it is currently true that KStreams uses Kafka topics for
>> communication between some internal computation nodes. We partition these
>> topics as the base unit of concurrency granularity, so it would potentially
>> be beneficial to be able to expand partitions for some of these internal
>> topics at some point. However, we can alternatively just overpartition
>> these internal topics, creating in the low 100s of partitions instead of
>> the low 10s, for example. (Side note: if Kafka were someday to support
>> higher numbers of partitions, we could expand this scheme to overpartition
>> in the 1000s of partitions.) With the option to overpartition, we don't
>> have a strong need for partition expansion internally.
>>
>> It is also currently true that KStreams uses Kafka to store a durable
>> changelog for some of our internal state stores. But we *only* read from
>> this topic *if* we need to restore a state store after node loss (or to
>> maintain a hot mirror of the state store), so I think it's unlikely that we
>> would ever make use of partition expansion on the changelog topics.
>>
>> But once again, I'd like to emphasize that we may choose an alternative
>> implementation for either interprocess communication or state durability.
>>
>>
>> ======================================
>> Concluding thoughts
>>
>> I know this is a very long email, and I really appreciate you sticking with
>> me this long. I hope it was useful for syncing our mental picture of this
>> system. Also, you're far more knowledgeable than I am about this system and
>> this domain, so please correct me if I've said anything wrong.
>>
>> To me the key takeaways are that:
>> - KIP-253 satisfies all we need for correctness, since it contains
>> solutions to guarantee producers can abide by their promises w.r.t. #4 and
>> #6.
>> - From Key Point A: #4 is actually optional for KIP-253, but without it, we
>> lose a potentially valuable optimization in KStreams (and all other
>> consumer applications)
>> - From Corollary to Point A: Without low-level support for partition
>> expansion with backfill, we cannot employ requesting partition expansion as
>> a consumer to improve application performance. In that case, to ensure
>> performance scalability, we would have to discard for all KStreams
>> applications the performance optimization afforded by #4.
>> - From Key Point B1: After a partition split, we really need to be able to
>> seamlessly continue operating as if it had not split.
>> - From Key Point B2: Since KIP-253 allows us to maintain all our promises,
>> we have the option of expanding partitions in the topics we produce.
>> Without a backfill operation, though, our consumers may not be able to
>> realize the benefits of that split, if they were hoping to.
>>
>> In general, faced with the possibility of having to coordinate the
>> partition function with our inputs' producers or with our outputs'
>> consumers, I would personally lean toward overprovisioning and completely
>> avoiding resize for our use case. This doesn't mean that it's not useful in
>> the ecosystem at large without backfill, just that it loses its luster for
>> me. It also means that we can no longer take advantage of some of our
>> current optimizations, and in fact that we must introduce an extra hop of
>> repartitioning on every single input.
>>
>> I think this is actually a pretty good picture of the opportunities and
>> challenges that other consumers and producers in the Kafka ecosystem will
>> face.
>>
>> I hope this helps!
>>
>> Thanks,
>> -John
>>
>> On Wed, Mar 28, 2018 at 11:51 AM, Jun Rao <jun@confluent.io> wrote:
>>
>>> Hi, John,
>>>
>>> I actually think it's important to think through how KStreams handles
>>> partition expansion in this KIP. If we do decide that we truly need
>>> backfilling, it's much better to think through how to add it now, instead
>>> of retrofitting it later. It would be useful to outline how both existing
>>> KStreams jobs and new KStreams jobs work to see if backfilling is really
>>> needed.
>>>
>>> If we can figure out how KStreams works, at least we have one reference
>>> implementation for other stream processing frameworks that face the same
>>> issue.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>>
>>> On Tue, Mar 27, 2018 at 4:56 PM, John Roesler <john@confluent.io> wrote:
>>>
>>>> Hi Jun,
>>>>
>>>> That's a good point.
>>>>
>>>> Yeah, I don't think it would work too well for existing consumers in the
>>>> middle of gen 0 to try and switch to a newly backfilled prefix of gen 1.
>>>> They probably just need to finish up until they get to the end of gen 0 and
>>>> transition just as if there were no backfill available yet.
>>>>
>>>> This isn't terrible, since consumer applications that care about scaling up
>>>> to match a freshly split partition would wait until after the backfill is
>>>> available to scale up. The consumer that starts out in gen=0, part=0 is
>>>> going to be stuck with part=0 and part=3 in gen=1 in my example regardless
>>>> of whether they finish scanning gen=0 before or after the backfill is
>>>> available.
>>>>
>>>> The broker knowing when it's ok to delete gen 0, including its offset
>>>> mappings, is a big issue, though. I don't have any immediate ideas for
>>>> solving it, but it doesn't feel impossible. Hopefully, you agree this is
>>>> outside of KIP-253's scope, so maybe we don't need to worry about it right
>>>> now.
>>>>
>>>> I do agree that reshuffling in KStreams effectively solves the scalability
>>>> problem as well, as it decouples the partition count (and the partition
>>>> scheme) upstream from the parallelism of the streams application. Likely,
>>>> we will do this in any case. I'm predominantly advocating for follow-on
>>>> work to enable backfill for the *other* Kafka users that are not KStreams.
>>>>
>>>> Thanks for your consideration,
>>>> -John
>>>>
>>>> On Tue, Mar 27, 2018 at 6:19 PM, Jun Rao <jun@confluent.io> wrote:
>>>>
>>>>> Hi, John,
>>>>>
>>>>> Thanks for the reply. I agree that the backfill approach works cleaner for
>>>>> newly started consumers. I am just not sure if it's a good primitive to
>>>>> support for existing consumers. One of the challenges that I see is the
>>>>> remapping of the offsets. In your approach, we need to copy the existing
>>>>> records from the partitions in generation 0 to generation 1. Those records
>>>>> will get different offsets in the new generation. The broker will have to
>>>>> store those offset mappings somewhere. When the backfill completes, you can
>>>>> delete generation 0's data. However, the broker can't throw away the offset
>>>>> mappings immediately since it doesn't know if there is any existing
>>>>> consumer still consuming generation 0's records. In a compacted topic, the
>>>>> broker probably can only safely remove the offset mappings when all records
>>>>> in generation 0 are removed by the cleaner. This may never happen though.
>>>>> If we reshuffle the input inside a KStreams job, it obviates the need for
>>>>> offset remapping on the broker.
>>>>>
>>>>> Jun
>>>>>
>>>>> On Tue, Mar 27, 2018 at 11:34 AM, John Roesler <john@confluent.io> wrote:
>>>>>
>>>>>> Hey Dong and Jun,
>>>>>>
>>>>>> Thanks for the thoughtful responses. If you don't mind, I'll mix my replies
>>>>>> together to try for a coherent response. I'm not too familiar with
>>>>>> mailing-list etiquette, though.
>>>>>>
>>>>>> I'm going to keep numbering my points because it makes it easy for you all
>>>>>> to respond.
>>>>>>
>>>>>> Point 1:
>>>>>> As I read it, KIP-253 is *just* about properly fencing the producers and
>>>>>> consumers so that you preserve the correct ordering of records during
>>>>>> partition expansion. This is clearly necessary regardless of anything else
>>>>>> we discuss. I think this whole discussion about backfill, consumers,
>>>>>> streams, etc., is beyond the scope of KIP-253. But it would be cumbersome
>>>>>> to start a new thread at this point.
>>>>>>
>>>>>> I had missed KIP-253's Proposed Change #9 among all the details... I think
>>>>>> this is a nice addition to the proposal. One thought is that it's actually
>>>>>> irrelevant whether the hash function is linear. This is simply an algorithm
>>>>>> for moving a key from one partition to another, so the type of hash
>>>>>> function need not be a precondition. In fact, it also doesn't matter
>>>>>> whether the topic is compacted or not, the algorithm works regardless.
>>>>>> I think this is a good algorithm to keep in mind, as it might solve a
>>>>>> variety of problems, but it does have a downside: that the producer won't
>>>>>> know whether or not K1 was actually in P1, it just knows that K1 was in
>>>>>> P1's keyspace before the new epoch. Therefore, it will have to
>>>>>> pessimistically send (K1,null) to P1 just in case. But the next time K1
>>>>>> comes along, the producer *also* won't remember that it already retracted
>>>>>> K1 from P1, so it will have to send (K1,null) *again*. By extension, every
>>>>>> time the producer sends to P2, it will also have to send a tombstone to P1,
>>>>>> which is a pretty big burden. To make the situation worse, if there is a
>>>>>> second split, say P2 becomes P2 and P3, then any key Kx belonging to P3
>>>>>> will also have to be retracted from P2 *and* P1, since the producer can't
>>>>>> know whether Kx had been last written to P2 or P1. Over a long period of
>>>>>> time, this clearly becomes an issue, as the producer must send an arbitrary
>>>>>> number of retractions along with every update.
>>>>>>
>>>>>> In contrast, the proposed backfill operation has an end, and after it ends,
>>>>>> everyone can afford to forget that there ever was a different partition
>>>>>> layout.
>>>>>>
>>>>>> Really, though, figuring out how to split compacted topics is beyond the
>>>>>> scope of KIP-253, so I'm not sure #9 really even needs to be in this KIP...
>>>>>> We do need in-order delivery during partition expansion. It would be fine
>>>>>> by me to say that you *cannot* expand partitions of a log-compacted topic
>>>>>> and call it a day. I think it would be better to tackle that in another
>>>>>> KIP.
>>>>>>
>>>>>>
>>>>>> Point 2:
>>>>>> Regarding whether the consumer re-shuffles its inputs, this is
>> always
>>>> on
>>>>>> the table; any consumer who wants to re-shuffle its input is free
>> to
>>> do
>>>>> so.
>>>>>> But this is currently not required. It's just that the current
>>>> high-level
>>>>>> story with Kafka encourages the use of partitions as a unit of
>>>>> concurrency.
>>>>>> As long as consumers are single-threaded, they can happily consume
>> a
>>>>> single
>>>>>> partition without concurrency control of any kind. This is a key
>>> aspect
>>>>> to
>>>>>> this system that lets folks design high-throughput systems on top
>> of
>>> it
>>>>>> surprisingly easily. If all consumers were instead
>>> encouraged/required
>>>> to
>>>>>> implement a repartition of their own, then the consumer becomes
>>>>>> significantly more complex, requiring either the consumer to first
>>>>> produce
>>>>>> to its own intermediate repartition topic or to ensure that
>> consumer
>>>>>> threads have a reliable, high-bandwidth channel of communication
>> with
>>>>> every
>>>>>> other consumer thread.
>>>>>>
>>>>>> Either of those tradeoffs may be reasonable for a particular user
>> of
>>>>> Kafka,
>>>>>> but I don't know if we're in a position to say that they are
>>> reasonable
>>>>> for
>>>>>> *every* user of Kafka.
>>>>>>
>>>>>>
>>>>>> Point 3:
>>>>>> Regarding Jun's point about this use case, "(3) stateful and
>>>> maintaining
>>>>>> the
>>>>>> states in a local store", I agree that they may use a framework
>>> *like*
>>>>>> Kafka Streams, but that is not the same as using Kafka Streams.
>> This
>>> is
>>>>> why
>>>>>> I think it's better to solve it in Core: because it is then solved
>>> for
>>>>>> KStreams and also for everything else that facilitates local state
>>>>>> maintenance. To me, Streams is a member of the category of "stream
>>>>>> processing frameworks", which is itself a subcategory of "things
>>>>> requiring
>>>>>> local state maintenance". I'm not sure if it makes sense to assert
>>> that
>>>>>> Streams is a sufficient and practical replacement for everything in
>>>>> "things
>>>>>> requiring local state maintenance".
>>>>>>
>>>>>> But, yes, I do agree that per-key ordering is an absolute
>>> requirement,
>>>>>> therefore I think that KIP-253 itself is a necessary step.
>> Regarding
>>>> the
>>>>>> coupling of the state store partitioning to the topic partitioning,
>>>> yes,
>>>>>> this is an issue we are discussing solutions to right now. We may
>> go
>>>>> ahead
>>>>>> and introduce an overpartition layer on our inputs to solve it, but
>>>> then
>>>>>> again, if we get the ability to split partitions with backfill, we
>>> may
>>>>> not
>>>>>> need to!
>>>>>>
>>>>>>
>>>>>> Point 4:
>>>>>> On this:
>>>>>>
>>>>>>> Regarding thought 2: If we don't care about the stream use-case,
>>> then
>>>>> the
>>>>>>> current KIP probably has already addressed problem without
>>> requiring
>>>>>>> consumer to know the partition function. If we care about the
>>> stream
>>>>>>> use-case, we already need coordination across producers of
>>> different
>>>>>>> topics, i.e. the same partition function needs to be used by
>>>> producers
>>>>> of
>>>>>>> topics A and B in order to join topics A and B. Thus, it might be
>>>>>>> reasonable to extend coordination a bit and say we need
>>> coordination
>>>>>> across
>>>>>>> clients (i.e. producer and consumer), such that consumer knows
>> the
>>>>>>> partition function used by producer. If we do so, then we can let
>>>>>> consumer
>>>>>>> re-copy data for the change log topic using the same partition
>>>> function
>>>>>> as
>>>>>>> producer. This approach has lower overhead as compared to having
>>>>> producer
>>>>>>> re-copy data of the input topic.
>>>>>>> Also, producer currently does not need to know the data already
>>>>> produced
>>>>>> to
>>>>>>> the topic. If we let producer split/merge partition, it would
>>> require
>>>>>>> producer to consume the existing data, which intuitively is the
>>> task
>>>> of
>>>>>>> consumer.
>>>>>>
>>>>>> I think we do care about use cases *like* Streams, I just don't
>> think
>>>> we
>>>>>> should rely on Streams to implement a feature of Core like
>> partition
>>>>>> expansion.
>>>>>>
>>>>>> Note, though, that we (Streams) do not require coordination across
>>>>>> producers. If two topics are certified to be co-partitioned, then
>>>> Streams
>>>>>> apps can make use of that knowledge to optimize their topology
>>>> (skipping
>>>>> a
>>>>>> repartition). But if they don't know whether they are
>> co-partitioned,
>>>>> then
>>>>>> they'd better go ahead and repartition within the topology. This is
>>> the
>>>>>> current state.
>>>>>>
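A rough sketch of what "co-partitioned" means in the paragraph above: two topics have the same partition count and route every key to the same partition number. The CRC32-based `partition_for` is an illustrative stand-in (Kafka's default partitioner hashes with murmur2), and checking a sample of keys is only a spot check, not a certification.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    # Illustrative partitioner (assumption): CRC32 of the key, modulo the
    # partition count. This is NOT Kafka's default partitioner.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def co_partitioned(keys, parts_a, parts_b,
                   fn_a=partition_for, fn_b=partition_for) -> bool:
    """Spot-check that two topics route each sample key to the same
    partition number, which a partition-wise join relies on."""
    if parts_a != parts_b:
        return False
    return all(fn_a(k, parts_a) == fn_b(k, parts_b) for k in keys)


sample_keys = [f"user-{i}" for i in range(100)]
same_layout = co_partitioned(sample_keys, 10, 10)
different_layout = co_partitioned(sample_keys, 10, 15)
```

When the check fails, or when nothing is known about how the topics were produced, the fallback described above is to repartition inside the topology.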
>>>>>> A huge selling point of Kafka is enabling different parts of
>> loosely
>>>>>> coupled organizations to produce and consume data independently.
>> Some
>>>>>> coordination between producers and consumers is necessary, like
>>>>>> coordinating on the names of topics and their schemas. But Kafka's
>>>> value
>>>>>> proposition w.r.t. ESBs, etc. is inversely proportional to the
>> amount
>>>> of
>>>>>> coordination required. I think it behooves us to be extremely
>>> skeptical
>>>>>> about introducing any coordination beyond correctness protocols.
>>>>>>
>>>>>> Asking producers and consumers, or even two different producers, to
>>>> share
>>>>>> code like the partition function is a pretty huge ask. What if they
>>> are
>>>>>> using different languages?
>>>>>>
>>>>>> Comparing organizational overhead vs computational overhead, there
>>> are
>>>>>> maybe two orders of magnitude difference between them. In other
>>> words,
>>>> I
>>>>>> would happily take on the (linear) overhead of having the producer
>>>>> re-copy
>>>>>> the data once during a re-partition in order to save the
>>> organizational
>>>>>> overhead of tying all the producers and consumers together across
>>>>> multiple
>>>>>> boundaries.
>>>>>>
>>>>>> On that last paragraph: note that the producer *did* know the data
>> it
>>>>>> already produced. It handled it the first time around. Asking it to
>>>>>> re-produce it into a new partition layout is squarely within its
>>> scope
>>>> of
>>>>>> capabilities. Contrast this with the alternative, asking the
>> consumer
>>>> to
>>>>>> re-partition the data. I think this is even less intuitive, when
>> the
>>>>>> partition function belongs to the producer.
>>>>>>
>>>>>>
>>>>>> Point 5:
>>>>>> Dong asked this:
>>>>>>
>>>>>>> For stream use-case that needs to increase consumer number, the
>>>>>>> existing consumer can backfill the existing data in the change
>> log
>>>>> topic
>>>>>> to
>>>>>>> the same change log topic with the new partition number, before
>> the
>>>> new
>>>>>> set
>>>>>>> of consumers bootstrap state from the new partitions of the
>> change
>>>> log
>>>>>>> topic, right?
>>>>>>
>>>>>> In this sense, the "consumer" is actually the producer of the
>>> changelog
>>>>>> topic, so if we support partition expansion + backfill as a
>>>>> producer/broker
>>>>>> operation, then it would be very straightforward for Streams to
>>> split a
>>>>>> state store. As you say, they would simply instruct the broker to
>>> split
>>>>> the
>>>>>> changelog topic's partitions, then backfill. Once the backfill is
>>>> ready,
>>>>>> they can create a new crop of StandbyTasks to bootstrap the more
>>>> granular
>>>>>> state stores and finally switch over to them when they are ready.
>>>>>>
>>>>>> But this actually seems to be an argument in favor of
>> split+backfill,
>>>> so
>>>>>> maybe I missed the point.
>>>>>>
>>>>>> You also asked me to explain why copying the "input" topic is
>> better
>>>> than
>>>>>> copying the "changelog" topic. I think they are totally
>> independent,
>>>>>> actually. For one thing, you can't depend on the existence of a
>>>>> "changelog"
>>>>>> topic in general, only within Streams, but Kafka's user base
>> clearly
>>>>>> exceeds Streams's user base. Plus, you actually also can't depend
>> on
>>>> the
>>>>>> existence of a changelog topic within Streams, since that is an
>>>> optional
>>>>>> feature of *some* state store implementations. Even in the
>> situation
>>>>> where
>>>>>> you do have a changelog topic in Streams, there may be use cases
>>> where
>>>> it
>>>>>> makes sense to expand the partitions of just the input, or just the
>>>>>> changelog.
>>>>>>
>>>>>> The ask for a Core feature of split+backfill is really about
>>> supporting
>>>>> the
>>>>>> use case of splitting partitions in log-compacted topics,
>> regardless
>>> of
>>>>>> whether that topic is an "input" or a "changelog" or anything else
>>> for
>>>>> that
>>>>>> matter.
>>>>>>
>>>>>>
>>>>>> Point 6:
>>>>>> On the concern about the performance overhead of copying data
>> between
>>>> the
>>>>>> brokers, I think it's actually a bit overestimated. Splitting a
>>> topic's
>>>>>> partition is probably rare, certainly rarer in general than
>>>> bootstrapping
>>>>>> new consumers on that topic. If "bootstrapping new consumers" means
>>>> that
>>>>>> they have to re-shuffle the data before they consume it, then you
>>> wind
>>>> up
>>>>>> copying the same record multiple times:
>>>>>>
>>>>>> (broker: input topic) -> (initial consumer) -> (broker: repartition
>>>>> topic)
>>>>>> -> (real consumer)
>>>>>>
>>>>>> That's 3x, and it's also 3x for every new record after the split as
>>>> well,
>>>>>> since you don't get to stop repartitioning/reshuffling once you
>>> start.
>>>>>> Whereas if you do a backfill in something like the procedure I
>>>> outlined,
>>>>>> you only copy the prefix of the partition before the split, and you
>>>> send
>>>>> it
>>>>>> once to the producer and then once to the new generation partition.
>>>> Plus,
>>>>>> assuming we're splitting the partition for the benefit of
>> consumers,
>>>>>> there's no reason we can't co-locate the post-split partitions on
>> the
>>>>> same
>>>>>> host as the pre-split partition, making the second copy a local
>>>>> filesystem
>>>>>> operation.
>>>>>>
>>>>>> Even if you follow these two copies up with bootstrapping a new
>>>> consumer,
>>>>>> it's still rare for this to occur, so you get to amortize these
>>> copies
>>>>> over
>>>>>> the lifetime of the topic, whereas a reshuffle just keeps making
>>> copies
>>>>> for
>>>>>> every new event.
>>>>>>
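The copy-count argument in Point 6 can be written as simple arithmetic. This is a back-of-the-envelope model for a single consumer group, not a measurement: it counts one transfer per hop in the chain (broker: input topic) -> (initial consumer) -> (broker: repartition topic) -> (real consumer), and for backfill it counts the two one-time copies of the pre-split prefix plus one ordinary read per record. The function names and record counts are made up for illustration.

```python
def reshuffle_transfers(prefix_records: int, new_records: int) -> int:
    # Every record, old or new, crosses three hops for as long as the
    # consumer keeps repartitioning.
    return 3 * (prefix_records + new_records)


def backfill_transfers(prefix_records: int, new_records: int) -> int:
    # One-time cost: the pre-split prefix is sent to the producer and then
    # to the new-generation partition (two copies).
    one_time = 2 * prefix_records
    # Ongoing cost: each record is read once by the real consumer.
    ongoing = prefix_records + new_records
    return one_time + ongoing


prefix, later = 1_000_000, 10_000_000
reshuffle_total = reshuffle_transfers(prefix, later)
backfill_total = backfill_transfers(prefix, later)
```

In this model the two approaches cost the same for the prefix alone; the gap comes entirely from records written after the split, which is the amortization point made above.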
>>>>>> And finally, I really do think that regardless of any performance
>>>>> concerns
>>>>>> about this operation, if it preserves loose organizational
>> coupling,
>>> it
>>>>> is
>>>>>> certainly worth it.
>>>>>>
>>>>>>
>>>>>> In conclusion:
>>>>>> It might actually be a good idea for us to clarify the scope of
>>>> KIP-253.
>>>>> If
>>>>>> we're all agreed that it's a good algorithm for allowing in-order
>>>> message
>>>>>> delivery during partition expansion, then we can continue this
>>>> discussion
>>>>>> as a new KIP, something like "backfill with partition expansion".
>>> This
>>>>>> would let Dong proceed with KIP-253. On the other hand, if it seems
>>>> like
>>>>>> this conversation may alter the design of KIP-253, then maybe we
>>>> *should*
>>>>>> just finish working it out.
>>>>>>
>>>>>> For my part, my only concern about KIP-253 is the one I raised
>>> earlier.
>>>>>> Thanks again, all, for considering these points,
>>>>>> -John
>>>>>>
>>>>>>
>>>>>> On Tue, Mar 27, 2018 at 2:10 AM, Dong Lin <lindong28@gmail.com>
>>> wrote:
>>>>>>> On Tue, Mar 27, 2018 at 12:04 AM, Dong Lin <lindong28@gmail.com>
>>>>> wrote:
>>>>>>>> Hey Jan,
>>>>>>>>
>>>>>>>> Thanks for the enthusiasm in improving Kafka's design. Now
>> that I
>>>>> have
>>>>>>>> read through your discussion with Jun, here are my thoughts:
>>>>>>>>
>>>>>>>> - The latest proposal should work with log compacted topics by
>>> properly
>>>>>>>> deleting old messages after a new message with the same key is
>>>>>> produced.
>>>>>>> So
>>>>>>>> it is probably not a concern anymore. Could you comment if
>> there
>>> is
>>>>>> still
>>>>>>>> issue?
>>>>>>>>
>>>>>>>> - I wrote the SEP-5 and I am pretty familiar with the
>> motivation
>>>> and
>>>>>> the
>>>>>>>> design of SEP-5. SEP-5 is probably orthogonal to the motivation
>> of
>>>>> this
>>>>>>> KIP.
>>>>>>>> The goal of SEP-5 is to allow user to increase task number of
>> an
>>>>>> existing
>>>>>>>> Samza job. But if we increase the partition number of input
>>> topics,
>>>>>>>> messages may still be consumed out-of-order by tasks in Samza
>>> which
>>>>>> cause
>>>>>>>> incorrect result. Similarly, the approach you proposed does not
>>>> seem
>>>>> to
>>>>>>>> ensure that the messages can be delivered in order, even if we
>>> can
>>>>> make
>>>>>>>> sure that each consumer instance is assigned the set of new
>>>>> partitions
>>>>>>>> covering the same set of keys.
>>>>>>>>
>>>>>>> Let me correct this comment. The approach of copying data to a
>> new
>>>>> topic
>>>>>>> can ensure in-order message delivery provided we properly migrate
>>>>> offsets
>>>>>>> from old topic to new topic.
>>>>>>>
>>>>>>>
>>>>>>>> - I am trying to understand why it is better to copy the data
>>>> instead
>>>>>> of
>>>>>>>> copying the change log topic for streaming use-case. For core
>>> Kafka
>>>>>>>> use-case, and for the stream use-case that does not need to
>>>> increase
>>>>>>>> consumers, the current KIP already supports in-order delivery
>>>> without
>>>>>> the
>>>>>>>> overhead of copying the data. For stream use-case that needs to
>>>>>> increase
>>>>>>>> consumer number, the existing consumer can backfill the
>> existing
>>>> data
>>>>>> in
>>>>>>>> the change log topic to the same change log topic with the new
>>>>>> partition
>>>>>>>> number, before the new set of consumers bootstrap state from
>> the
>>>> new
>>>>>>>> partitions of the change log topic. If this solution works,
>> then
>>>>> could
>>>>>>> you
>>>>>>>> summarize the advantage of copying the data of input topic as
>>>>> compared
>>>>>> to
>>>>>>>> copying the change log topic? For example, does it enable more
>>>>>> use-case,
>>>>>>>> simplify the implementation of Kafka library, or reduce the
>>>> operation
>>>>>>>> overhead etc?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Dong
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Mar 21, 2018 at 6:57 AM, Jan Filipiak <
>>>>>> Jan.Filipiak@trivago.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Jun,
>>>>>>>>>
>>>>>>>>> I was really seeing progress in our conversation but your
>> latest
>>>>> reply
>>>>>>> is
>>>>>>>>> just devastating.
>>>>>>>>> I thought we were getting close to being on the same page; now it
>>> feels
>>>>>> like
>>>>>>>>> we are in different libraries.
>>>>>>>>>
>>>>>>>>> I just quickly slam my answers in here. If they are too brief, I
>>> am
>>>>>> sorry;
>>>>>>>>> give me a ping and I will try to go into more detail.
>>>>>>>>> Just want to show that your pro/cons listing is broken.
>>>>>>>>>
>>>>>>>>> Best Jan
>>>>>>>>>
>>>>>>>>> and want to get rid of this horrible compromise
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 19.03.2018 05:48, Jun Rao wrote:
>>>>>>>>>
>>>>>>>>>> Hi, Jan,
>>>>>>>>>>
>>>>>>>>>> Thanks for the discussion. Great points.
>>>>>>>>>>
>>>>>>>>>> Let me try to summarize the approach that you are proposing.
>> On
>>>> the
>>>>>>>>>> broker
>>>>>>>>>> side, we reshuffle the existing data in a topic from current
>>>>>> partitions
>>>>>>>>>> to
>>>>>>>>>> the new partitions. Once the reshuffle fully catches up,
>> switch
>>>> the
>>>>>>>>>> consumers to start consuming from the new partitions. If a
>>>> consumer
>>>>>>> needs
>>>>>>>>>> to rebuild its local state (due to partition changes), let
>> the
>>>>>> consumer
>>>>>>>>>> rebuild its state by reading all existing data from the new
>>>>>> partitions.
>>>>>>>>>> Once all consumers have switched over, cut over the producer
>> to
>>>> the
>>>>>> new
>>>>>>>>>> partitions.
>>>>>>>>>>
>>>>>>>>>> The pros for this approach are that :
>>>>>>>>>> 1. There is just one way to rebuild the local state, which is
>>>>>> simpler.
>>>>>>>>> true thanks
>>>>>>>>>
>>>>>>>>>> The cons for this approach are:
>>>>>>>>>> 1. Need to copy existing data.
>>>>>>>>>>
>>>>>>>>> Very unfair and not correct. It does not require you to copy
>>> over
>>>>>>>>> existing data. It _allows_ you to copy all existing data.
>>>>>>>>>
>>>>>>>>> 2. The cutover of the producer is a bit complicated since it
>>> needs
>>>>> to
>>>>>>>>>> coordinate with all consumer groups.
>>>>>>>>>>
>>>>>>>>> Also not true. I explicitly tried to make clear that there is
>>> only
>>>>> one
>>>>>>>>> special consumer (in the case of actually copying data)
>>>> coordination
>>>>>> is
>>>>>>>>> required.
>>>>>>>>>
>>>>>>>>>> 3. The rebuilding of the state in the consumer is from the
>>> input
>>>>>> topic,
>>>>>>>>>> which can be more expensive than rebuilding from the existing
>>>>> state.
>>>>>>>>> true, but rebuilding state is only required if you want to
>>>> increase
>>>>>>>>> processing power, so we assume this is at hand.
>>>>>>>>>
>>>>>>>>>> 4. The broker potentially has to know the partitioning
>>> function.
>>>> If
>>>>>>> this
>>>>>>>>>> needs to be customized at the topic level, it can be a bit
>>> messy.
>>>>>>>>> I would argue against having the operation being performed by
>>> the
>>>>>>> broker.
>>>>>>>>> This was not discussed yet but if you see my original email i
>>>>>> suggested
>>>>>>>>> otherwise from the beginning.
>>>>>>>>>
>>>>>>>>>> Here is an alternative approach by applying your idea not in
>>> the
>>>>>>> broker,
>>>>>>>>>> but in the consumer. When new partitions are added, we don't
>>> move
>>>>>>>>>> existing
>>>>>>>>>> data. In KStreams, we first reshuffle the new input data to a
>>> new
>>>>>> topic
>>>>>>>>>> T1
>>>>>>>>>> with the old number of partitions and feed T1's data to the
>>> rest
>>>> of
>>>>>> the
>>>>>>>>>> pipeline. In the meantime, KStreams reshuffles all existing
>>> data
>>>> of
>>>>>> the
>>>>>>>>>> change capture topic to another topic C1 with the new number
>> of
>>>>>>>>>> partitions.
>>>>>>>>>> We can then build the state of the new tasks from C1. Once
>> the
>>>> new
>>>>>>> states
>>>>>>>>>> have been fully built, we can cut over the consumption to the
>>>> input
>>>>>>> topic
>>>>>>>>>> and delete T1. This approach works with compacted topic too.
>> If
>>>> an
>>>>>>>>>> application reads from the beginning of a compacted topic,
>> the
>>>>>> consumer
>>>>>>>>>> will reshuffle the portion of the input when the number of
>>>>> partitions
>>>>>>>>>> doesn't match the number of tasks.
>>>>>>>>>>
>>>>>>>>> We all wipe this idea from our heads instantly. Mixing Ideas
>>> from
>>>> an
>>>>>>>>> argument is not a resolution strategy;
>>>>>>>>> it just leads to horrible, horrible software.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> The pros of this approach are:
>>>>>>>>>> 1. No need to copy existing data.
>>>>>>>>>> 2. Each consumer group can cut over to the new partitions
>>>>>>> independently.
>>>>>>>>>> 3. The state is rebuilt from the change capture topic, which
>> is
>>>>>> cheaper
>>>>>>>>>> than rebuilding from the input topic.
>>>>>>>>>> 4. Only the KStreams job needs to know the partitioning
>>> function.
>>>>>>>>>> The cons of this approach are:
>>>>>>>>>> 1. Potentially the same input topic needs to be reshuffled
>> more
>>>>> than
>>>>>>> once
>>>>>>>>>> in different consumer groups during the transition phase.
>>>>>>>>>>
>>>>>>>>>> What do you think?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Jun
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 15, 2018 at 1:04 AM, Jan Filipiak <
>>>>>>> Jan.Filipiak@trivago.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Jun,
>>>>>>>>>>> thank you for following me on these thoughts. It was
>> important
>>>> to
>>>>> me
>>>>>>> to
>>>>>>>>>>> feel that kind of understanding for my arguments.
>>>>>>>>>>>
>>>>>>>>>>> What I was hoping for (I mentioned this earlier) is that we
>>> can
>>>>>> model
>>>>>>>>>>> the
>>>>>>>>>>> case where we do not want to copy the data the exact same
>> way
>>> as
>>>>> the
>>>>>>>>>>> case
>>>>>>>>>>> when we do copy the data. Maybe you can peek into the mails
>>>> before
>>>>>> to
>>>>>>>>>>> see
>>>>>>>>>>> more details for this.
>>>>>>>>>>>
>>>>>>>>>>> This means we have the same mechanism to transfer consumer
>>>> groups
>>>>> to
>>>>>>>>>>> switch topic. The offset mapping that would be generated
>> would
>>>>> even
>>>>>> be
>>>>>>>>>>> simpler: end offset of the old topic => offset 0 of all the
>>>>>> partitions
>>>>>>>>>>> of
>>>>>>>>>>> the new topic. Then we could model the transition of a
>>> non-copy
>>>>>>>>>>> expansion
>>>>>>>>>>> the exact same way as a copy-expansion.
>>>>>>>>>>>
>>>>>>>>>>> I know this only works when the topic grows by a factor. But
>> the
>>>>>> benefits
>>>>>>>>>>> of
>>>>>>>>>>> only growing by a factor are too strong anyway. See
>> Clemens's
>>>> hint
>>>>>> and
>>>>>>>>>>> remember that state reshuffling is entirely not needed if
>> one
>>>>>> doesn't
>>>>>>>>>>> want
>>>>>>>>>>> to grow processing power.
>>>>>>>>>>>
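A small sketch of why growing by an integer factor is convenient. It assumes plain modulo partitioning (`hash % partition_count`) with non-negative hashes, which is a simplification and not a statement about any specific partitioner. When the new count is a multiple of the old one, every new partition draws its keys from exactly one old partition; otherwise keys from one old partition can end up scattered.

```python
def has_single_parent(old_count: int, new_count: int, hashes) -> bool:
    """True if, for every hash, the old partition can be recovered from the
    new partition alone, i.e. each new partition has exactly one parent."""
    return all(
        (h % new_count) % old_count == h % old_count
        for h in hashes
    )


sample_hashes = range(1000)
grow_by_factor = has_single_parent(10, 20, sample_hashes)   # 10 -> 20
grow_by_half = has_single_parent(10, 15, sample_hashes)     # 10 -> 15
```

With 10 -> 20, new partition `q` always maps back to old partition `q % 10`, so a consumer that keeps its old key set simply takes both children and needs no state reshuffling. With 10 -> 15 under this modulo scheme, that property no longer holds.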
>>>>>>>>>>> I think these benefits should be clear, and that there is
>>>>> basically
>>>>>> no
>>>>>>>>>>> downside to what is currently at hand but just makes
>>> everything
>>>>>> easy.
>>>>>>>>>>> One thing you need to know is that if you do not offer
>>>>> rebuilding a
>>>>>>> log
>>>>>>>>>>> compacted topic like I suggest, then even if you have
>> consumer
>>>>> state
>>>>>>>>>>> reshuffling, the topic is broken and cannot be used to
>>>> bootstrap
>>>>>> new
>>>>>>>>>>> consumers. They don't know if they need to apply a key from
>>> an
>>>>> old
>>>>>>>>>>> partition or not. This is a horrible downside I haven't
>> seen a
>>>>>>> solution
>>>>>>>>>>> for
>>>>>>>>>>> in the email conversation.
>>>>>>>>>>>
>>>>>>>>>>> I argue to:
>>>>>>>>>>>
>>>>>>>>>>> Only grow topic by a factor always.
>>>>>>>>>>> Have the "no copy consumer" transition as the trivial case
>> of
>>>> the
>>>>>>> "copy
>>>>>>>>>>> consumer transition".
>>>>>>>>>>> If processors needs to be scaled, let them rebuild from the
>>> new
>>>>>> topic
>>>>>>>>>>> and
>>>>>>>>>>> leave the old running in the mean time.
>>>>>>>>>>> Do not implement key shuffling in streams.
>>>>>>>>>>>
>>>>>>>>>>> I hope I can convince you especially with the fact how I
>> want
>>> to
>>>>>>> handle
>>>>>>>>>>> consumer transition. I think
>>>>>>>>>>> you didn't quite understand me there before. I think the
>> term
>>>> "new
>>>>>>>>>>> topic"
>>>>>>>>>>> intimidated you a little.
>>>>>>>>>>> How we solve this on disc doesn't really matter, If the data
>>>> goes
>>>>>> into
>>>>>>>>>>> the
>>>>>>>>>>> same Dir or a different Dir or anything. I do think that it
>>>> needs
>>>>> to
>>>>>>>>>>> involve at least rolling a new segment for the existing
>>>>> partitions.
>>>>>>>>>>> But most of the transitions should work without restarting
>>>>>> consumers.
>>>>>>>>>>> (newer consumers with support for this). But with new topic
>> i
>>>> just
>>>>>>> meant
>>>>>>>>>>> the topic that now has a different partition count. Plenty
>> of
>>>> ways
>>>>>> to
>>>>>>>>>>> handle that (versions, aliases)
>>>>>>>>>>>
>>>>>>>>>>> Hope I can further get my idea across.
>>>>>>>>>>>
>>>>>>>>>>> Best Jan
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 14.03.2018 02:45, Jun Rao wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi, Jan,
>>>>>>>>>>>> Thanks for sharing your view.
>>>>>>>>>>>>
>>>>>>>>>>>> I agree with you that recopying the data potentially makes
>>> the
>>>>>> state
>>>>>>>>>>>> management easier since the consumer can just rebuild its
>>> state
>>>>>> from
>>>>>>>>>>>> scratch (i.e., no need for state reshuffling).
>>>>>>>>>>>>
>>>>>>>>>>>> On the flip side, I saw a few disadvantages of the
>> approach
>>>> that
>>>>>> you
>>>>>>>>>>>> suggested. (1) Building the state from the input topic from
>>>>> scratch
>>>>>>> is
>>>>>>>>>>>> in
>>>>>>>>>>>> general less efficient than state reshuffling. Let's say
>> one
>>>>>>> computes a
>>>>>>>>>>>> count per key from an input topic. The former requires
>>> reading
>>>>> all
>>>>>>>>>>>> existing
>>>>>>>>>>>> records in the input topic whereas the latter only requires
>>>>> reading
>>>>>>>>>>>> data
>>>>>>>>>>>> proportional to the number of unique keys. (2) The
>> switching
>>> of
>>>>> the
>>>>>>>>>>>> topic
>>>>>>>>>>>> needs modification to the application. If there are many
>>>>>> applications
>>>>>>>>>>>> on a
>>>>>>>>>>>> topic, coordinating such an effort may not be easy. Also,
>>> it's
>>>>> not
>>>>>>>>>>>> clear
>>>>>>>>>>>> how to enforce exactly-once semantic during the switch. (3)
>>> If
>>>> a
>>>>>>> topic
>>>>>>>>>>>> doesn't need any state management, recopying the data seems
>>>>>> wasteful.
>>>>>>>>>>>> In
>>>>>>>>>>>> that case, in place partition expansion seems more
>> desirable.
>>>>>>>>>>>> I understand your concern about adding complexity in
>>> KStreams.
>>>>> But,
>>>>>>>>>>>> perhaps
>>>>>>>>>>>> we could iterate on that a bit more to see if it can be
>>>>> simplified.
>>>>>>>>>>>> Jun
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Mar 12, 2018 at 11:21 PM, Jan Filipiak <
>>>>>>>>>>>> Jan.Filipiak@trivago.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Jun,
>>>>>>>>>>>>
>>>>>>>>>>>>> I will focus on point 61 as I think it's _the_ fundamental
>>> part
>>>>>> that
>>>>>>> I
>>>>>>>>>>>>> can't
>>>>>>>>>>>>> get across at the moment.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Kafka is the platform to have state materialized multiple
>>>> times
>>>>>> from
>>>>>>>>>>>>> one
>>>>>>>>>>>>> input. I emphasize this: It is the building block in
>>>>> architectures
>>>>>>>>>>>>> that
>>>>>>>>>>>>> allow you to
>>>>>>>>>>>>> have your state maintained multiple times. You put a
>> message
>>>> in
>>>>>>> once,
>>>>>>>>>>>>> and
>>>>>>>>>>>>> you have it pop out as often as you like. I believe you
>>>>> understand
>>>>>>>>>>>>> this.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Now! The path of thinking goes the following: I am using
>>>> apache
>>>>>>> kafka
>>>>>>>>>>>>> and
>>>>>>>>>>>>> I _want_ my state multiple times. What am I going to do?
>>>>>>>>>>>>>
>>>>>>>>>>>>> A) Am I going to take my state that I build up, plunge
>> some
>>>> sort
>>>>>> of
>>>>>>>>>>>>> RPC
>>>>>>>>>>>>> layer on top of it, use that RPC layer to throw my records
>>>> across
>>>>>>>>>>>>> instances?
>>>>>>>>>>>>> B) Am I just going to read the damn message twice?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Approach A is fundamentally flawed and a violation of all
>>> that
>>>>> is
>>>>>>> good
>>>>>>>>>>>>> and
>>>>>>>>>>>>> holy in kafka deployments. I can not understand how this
>>> Idea
>>>>> can
>>>>>>>>>>>>> come in
>>>>>>>>>>>>> the first place.
>>>>>>>>>>>>> (I do understand: IQ in streams, they polluted the kafka
>>>> streams
>>>>>>>>>>>>> codebase
>>>>>>>>>>>>> really bad already. It is not funny! I think they are
>>> equally
>>>>>> flawed
>>>>>>>>>>>>> as
>>>>>>>>>>>>> A)
>>>>>>>>>>>>>
>>>>>>>>>>>>> I say, we do what Kafka is good at. We repartition the
>> topic
>>>>> once.
>>>>>>> We
>>>>>>>>>>>>> switch the consumers.
>>>>>>>>>>>>> (Those that need more partitions are going to rebuild
>> their
>>>>> state
>>>>>> in
>>>>>>>>>>>>> multiple partitions by reading the new topic, those that
>>> don't
>>>>>> just
>>>>>>>>>>>>> assign
>>>>>>>>>>>>> the new partitions properly)
>>>>>>>>>>>>> We switch producers. Done!
>>>>>>>>>>>>>
>>>>>>>>>>>>> The best thing! It is trivial, hipster stream processor
>> will
>>>>> have
>>>>>> an
>>>>>>>>>>>>> easy
>>>>>>>>>>>>> time with that as well. It's so super simple. And simple IS
>>>> good!
>>>>>>>>>>>>> It is what kafka was built to do. It is how we do it today.
>>>> All I
>>>>>> am
>>>>>>>>>>>>> saying
>>>>>>>>>>>>> is that a little broker help doing the producer swap is
>>> super
>>>>>>> useful.
>>>>>>>>>>>>> For everyone interested in why kafka is so powerful with
>>>>> approach
>>>>>> B,
>>>>>>>>>>>>> please watch https://youtu.be/bEbeZPVo98c?t=1633
>>>>>>>>>>>>> I already looked up a good point in time, I think after 5
>>>>> minutes
>>>>>>> the
>>>>>>>>>>>>> "state" topic is handled and you should be able to
>>> understand
>>>> me
>>>>>>>>>>>>> an inch better.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please do not do A to the project, it deserves better!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 13.03.2018 02:40, Jun Rao wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi, Jan,
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the reply. A few more comments below.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 50. Ok, we can think a bit harder for supporting
>> compacted
>>>>>> topics.
>>>>>>>>>>>>>> 51. This is a fundamental design question. In the more
>>> common
>>>>>> case,
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> reason why someone wants to increase the number of
>>> partitions
>>>>> is
>>>>>>> that
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> consumer application is slow and one wants to run more
>>>> consumer
>>>>>>>>>>>>>> instances
>>>>>>>>>>>>>> to increase the degree of parallelism. So, fixing the
>>> number
>>>> of
>>>>>>>>>>>>>> running
>>>>>>>>>>>>>> consumer instances when expanding the partitions won't
>> help
>>>>> this
>>>>>>>>>>>>>> case.
>>>>>>>>>>>>>> If
>>>>>>>>>>>>>> we do need to increase the number of consumer instances,
>> we
>>>>> need
>>>>>> to
>>>>>>>>>>>>>> somehow
>>>>>>>>>>>>>> reshuffle the state of the consumer across instances.
>> What
>>> we
>>>>>> have
>>>>>>>>>>>>>> been
>>>>>>>>>>>>>> discussing in this KIP is whether we can do this more
>>>>> effectively
>>>>>>>>>>>>>> through
>>>>>>>>>>>>>> the KStream library (e.g. through a 2-phase partition
>>>>> expansion).
>>>>>>>>>>>>>> This
>>>>>>>>>>>>>> will
>>>>>>>>>>>>>> add some complexity, but it's probably better than
>> everyone
>>>>> doing
>>>>>>>>>>>>>> this
>>>>>>>>>>>>>> in
>>>>>>>>>>>>>> the application space. The recopying approach that you
>>>>> mentioned
>>>>>>>>>>>>>> doesn't
>>>>>>>>>>>>>> seem to address the consumer state management issue when
>>> the
>>>>>>> consumer
>>>>>>>>>>>>>> switches from an old to a new topic.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 52. As for your example, it depends on whether the join
>> key
>>>> is
>>>>>> the
>>>>>>>>>>>>>> same
>>>>>>>>>>>>>> between (A,B) and (B,C). If the join key is the same, we
>>> can
>>>>> do a
>>>>>>>>>>>>>> 2-phase
>>>>>>>>>>>>>> partition expansion of A, B, and C together. If the join
>>> keys
>>>>> are
>>>>>>>>>>>>>> different, one would need to repartition the data on a
>>>>> different
>>>>>>> key
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>> the second join, then the partition expansion can be done
>>>>>>>>>>>>>> independently
>>>>>>>>>>>>>> between (A,B) and (B,C).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 53. If you always fix the number of consumer instances,
>> what
>>>> you
>>>>>>>>>>>>>> described
>>>>>>>>>>>>>> works. However, as I mentioned in #51, I am not sure how
>>> your
>>>>>>>>>>>>>> proposal
>>>>>>>>>>>>>> deals with consumer states when the number of consumer
>>>>> instances
>>>>>>>>>>>>>> grows.
>>>>>>>>>>>>>> Also, it just seems that it's better to avoid re-copying
>>> the
>>>>>>> existing
>>>>>>>>>>>>>> data.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 60. "just want to throw in my question from the longer
>>> email
>>>> in
>>>>>> the
>>>>>>>>>>>>>> other
>>>>>>>>>>>>>> Thread here. How will the bloom filter help a new
>> consumer
>>> to
>>>>>>> decide
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>> apply the key or not?" Not sure that I fully understood
>>> your
>>>>>>>>>>>>>> question.
>>>>>>>>>>>>>> The
>>>>>>>>>>>>>> consumer just reads whatever key is in the log. The bloom
>>>>> filter
>>>>>>> just
>>>>>>>>>>>>>> helps
>>>>>>>>>>>>>> clean up the old keys.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 61. "Why can we afford having a topic where its
>> apparently
>>>> not
>>>>>>>>>>>>>> possible
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>> start a new application on? I think this is an overall
>> flaw
>>>> of
>>>>>> the
>>>>>>>>>>>>>> discussed idea here. Not paying attention to the overall
>>>>>>>>>>>>>> architecture."
>>>>>>>>>>>>>> Could you explain a bit more when one can't start a new
>>>>>>> application?
>>>>>>>>>>>>>> Jun
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Mar 10, 2018 at 1:40 AM, Jan Filipiak <
>>>>>>>>>>>>>> Jan.Filipiak@trivago.com
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Jun, thanks for your mail.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you for your questions!
>>>>>>>>>>>>>>> I think they are really good and tackle the core of the
>>>>> problem
>>>>>> I
>>>>>>>>>>>>>>> see.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I will answer inline, mostly but still want to set the
>>> tone
>>>>>> here.
>>>>>>>>>>>>>>> The core strength of kafka is what Martin once called
>> the
>>>>>>>>>>>>>>> kappa-Architecture. How does this work?
>>>>>>>>>>>>>>> You have everything as a log as in kafka. When you need
>> to
>>>>>> change
>>>>>>>>>>>>>>> something.
>>>>>>>>>>>>>>> You create the new version of your application and leave
>>> it
>>>>>>> running
>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>> parallel.
>>>>>>>>>>>>>>> Once the new version is good you switch your users to
>> use
>>>> the
>>>>>> new
>>>>>>>>>>>>>>> application.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The online reshuffling effectively breaks this
>>> architecture
>>>>> and
>>>>>> I
>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>> the switch in thinking here is more harmful
>>>>>>>>>>>>>>> than any details about the partitioning function to
>> allow
>>>>> such a
>>>>>>>>>>>>>>> change.
>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>> feel with my suggestion we are the closest to
>>>>>>>>>>>>>>> the original and battle proven architecture and I can
>> only
>>>>> warn
>>>>>> to
>>>>>>>>>>>>>>> move
>>>>>>>>>>>>>>> away from it.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I might have forgotten something, sometimes it's hard for
>>> me
>>>> to
>>>>>>>>>>>>>>> get
>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>> the thoughts captured in a mail, but I hope the comments
>>>>> inline
>>>>>>> will
>>>>>>>>>>>>>>> further make my concern clear, and put some emphasis on
>>> why
>>>> I
>>>>>>>>>>>>>>> prefer my
>>>>>>>>>>>>>>> solution ;)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> One thing we should all be aware of when discussing
>> this,
>>>> and
>>>>> I
>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>> should have mentioned it (maybe he did).
>>>>>>>>>>>>>>> We are not discussing all of this out of thin air but
>>> there
>>>> is
>>>>>> an
>>>>>>>>>>>>>>> effort
>>>>>>>>>>>>>>> in the Samza project.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/SAMZA/SEP-
>>> 5%3A+
>>>>>>>>>>>>>>> Enable+partition+expansion+of+input+streams
>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/SAMZA-1293
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> To be clear. I think SEP-5 (state of last week, don't
>> know
>>> if
>>>>> it
>>>>>>>>>>>>>>> adapted
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> this discussion) is on a way better path than KIP-253,
>>> and I
>>>>>> can't
>>>>>>>>>>>>>>> really
>>>>>>>>>>>>>>> explain why.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best Jan,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> nice weekend everyone
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 09.03.2018 03:36, Jun Rao wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi, Jan,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the feedback. Just some comments on the
>> earlier
>>>>>> points
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>> mentioned.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 50. You brought up the question of whether existing
>> data
>>>>> needs
>>>>>> to
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>> copied
>>>>>>>>>>>>>>>> during partition expansion. My understanding of your view
>> is
>>>>> that
>>>>>>>>>>>>>>>> avoiding
>>>>>>>>>>>>>>>> copying existing data will be more efficient, but it
>>>> doesn't
>>>>>> work
>>>>>>>>>>>>>>>> well
>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>> compacted topics since some keys in the original
>>> partitions
>>>>>> will
>>>>>>>>>>>>>>>> never
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>> cleaned. It would be useful to understand your use case
>>> of
>>>>>>>>>>>>>>>> compacted
>>>>>>>>>>>>>>>> topics
>>>>>>>>>>>>>>>> a bit more. In the common use case, the data volume in
>> a
>>>>>>> compacted
>>>>>>>>>>>>>>>> topic
>>>>>>>>>>>>>>>> may not be large. So, I am not sure if there is a
>> strong
>>>> need
>>>>>> to
>>>>>>>>>>>>>>>> expand
>>>>>>>>>>>>>>>> partitions in a compacted topic, at least initially.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I do agree. State is usually smaller. Update rates
>> might
>>> be
>>>>>> also
>>>>>>>>>>>>>>>> competitively high.
>>>>>>>>>>>>>>> Doing Log-compaction (even being very efficient and
>>>>>> configurable)
>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>> also
>>>>>>>>>>>>>>> a more expensive operation than
>>>>>>>>>>>>>>> just discarding old segments. Further if you want to use
>>>> more
>>>>>>>>>>>>>>> consumers
>>>>>>>>>>>>>>> processing the events
>>>>>>>>>>>>>>> you also have to grow the number of partitions.
>> Especially
>>>> for
>>>>>>>>>>>>>>> use-cases
>>>>>>>>>>>>>>> we do (KIP-213) a tiny stateful
>>>>>>>>>>>>>>> table might be very expensive to process if it joins
>>>> against a
>>>>>>> huge
>>>>>>>>>>>>>>> table.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I can just say we have been in the spot of needing to
>> grow
>>>> log
>>>>>>>>>>>>>>> compacted
>>>>>>>>>>>>>>> topics. Mainly for processing power we can bring to the
>>>> table.
>>>>>>>>>>>>>>> Further, I am not at all concerned about the extra space
>>>> used
>>>>> by
>>>>>>>>>>>>>>> "garbage
>>>>>>>>>>>>>>> keys". I am more concerned about the correctness of
>>> innocent
>>>>>>>>>>>>>>> consumers.
>>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>> logic becomes complicated. Say for streams one would
>> need
>>> to
>>>>>> load
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> record into state but not forward it to the topology (to
>>> have
>>>> it
>>>>>>>>>>>>>>> available
>>>>>>>>>>>>>>> for shuffling). I would rather have it simple and a topic
>> clean
>>>>>>>>>>>>>>> regardless
>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>> still has its old partition count. Especially with
>>> multiple
>>>>>>>>>>>>>>> partitions
>>>>>>>>>>>>>>> growths I think it becomes insanely hard to do this
>>> shuffle
>>>>>>>>>>>>>>> correct.
>>>>>>>>>>>>>>> Maybe
>>>>>>>>>>>>>>> Streams and Samza can do it. Especially if you do
>> "hipster
>>>>>> stream
>>>>>>>>>>>>>>> processing" <https://www.confluent.io/blog
>>>>>>>>>>>>>>> /introducing-kafka-streams-
>>>>>>>>>>>>>>> stream-processing-made-simple/>. This makes kafka way
>> too
>>>>>>>>>>>>>>> complicated.
>>>>>>>>>>>>>>> With my approach I think it's way simpler because the
>> topic
>>>> has
>>>>>> no
>>>>>>>>>>>>>>> "history"
>>>>>>>>>>>>>>> in terms of partitioning but is always clean.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 51. "Growing the topic by an integer factor does not
>>> require
>>>>> any
>>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> redistribution at all." Could you clarify this a bit
>> more?
>>>>> Let's
>>>>>>> say
>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>> have a consumer app that computes the windowed count
>> per
>>>> key.
>>>>>> If
>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>> double
>>>>>>>>>>>>>>>> the number of partitions from 1 to 2 and grow the
>>> consumer
>>>>>>>>>>>>>>>> instances
>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>> 1
>>>>>>>>>>>>>>>> to 2, we would need to redistribute some of the counts
>> to
>>>> the
>>>>>> new
>>>>>>>>>>>>>>>> consumer
>>>>>>>>>>>>>>>> instance. Regarding to linear hashing, it's true that
>> it
>>>>> won't
>>>>>>>>>>>>>>>> solve
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> problem with compacted topics. The main benefit is that
>>> it
>>>>>>>>>>>>>>>> redistributes
>>>>>>>>>>>>>>>> the keys in one partition to no more than two
>> partitions,
>>>>> which
>>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>> help
>>>>>>>>>>>>>>>> redistribute the state.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> You don't need to spin up a new consumer in this case.
>>>> every
>>>>>>>>>>>>>>>> consumer
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> would just read every partition with the (partition %
>>>>> num_task)
>>>>>>>>>>>>>>> task.
>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>> will still have the previous data right there and can go
>>> on.
>>>>>>>>>>>>>>> This sounds contradictory to what I said before, but
>>> please
>>>>> bear
>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>> me.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 52. Good point on coordinating the expansion of 2 topics
>>>> that
>>>>>> need
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> joined together. This is where the 2-phase partition
>>>> expansion
>>>>>>> could
>>>>>>>>>>>>>>>> potentially help. In the first phase, we could add new
>>>>>> partitions
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> the 2
>>>>>>>>>>>>>>>> topics one at a time but without publishing to the new
>>>>>> partitions.
>>>>>>>>>>>>>>>> Then,
>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>> can add new consumer instances to pick up the new
>>>> partitions.
>>>>>> In
>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>> transition phase, no reshuffling is needed since no
>> data
>>> is
>>>>>>> coming
>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> new partitions. Finally, we can enable the publishing
>> to
>>>> the
>>>>>> new
>>>>>>>>>>>>>>>> partitions.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think it's even worse than you think. I would like to
>>>>>> introduce
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Term
>>>>>>>>>>>>>>> transitive copartitioning. Imagine
>>>>>>>>>>>>>>> 2 streams application. One joins (A,B) the other (B,C)
>>> then
>>>>>> there
>>>>>>>>>>>>>>> is a
>>>>>>>>>>>>>>> transitive copartition requirement for
>>>>>>>>>>>>>>> (A,C) to be copartitioned as well. This can spread
>>>>> significantly
>>>>>>> and
>>>>>>>>>>>>>>> require many consumers to adapt at the same time.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It is also not entirely clear to me how you do not need
>>>>> reshuffling
>>>>>>> in
>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>> case. If A has a record that never gets updated after
>> the
>>>>>>> expansion
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> corresponding B record moves to a new partition. How
>> shall
>>>> they
>>>>>>> meet
>>>>>>>>>>>>>>> w/o
>>>>>>>>>>>>>>> shuffle?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 53. "Migrating consumer is a step that might be made
>>>> completely
>>>>>>>>>>>>>>> unnecessary
>>>>>>>>>>>>>>>> if - for example streams - takes the gcd as
>> partitioning
>>>>> scheme
>>>>>>>>>>>>>>>> instead
>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>> enforcing 1 to 1." Not sure that I fully understand
>>> this. I
>>>>>> think
>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>> mean
>>>>>>>>>>>>>>>> that a consumer application can run more instances than
>>> the
>>>>>>> number
>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>> partitions. In that case, the consumer can just
>>>>> repartitioning
>>>>>>> the
>>>>>>>>>>>>>>>> input
>>>>>>>>>>>>>>>> data according to the number of instances. This is
>>>> possible,
>>>>>> but
>>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>> has
>>>>>>>>>>>>>>>> the overhead of reshuffling the data.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> No, what I meant is (that is also your question, I think,
>>>>>> Mathias)
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> you grow a topic by a factor.
>>>>>>>>>>>>>>> Even if your processor is stateful you can just
>>> assign
>>>>> all
>>>>>>> the
>>>>>>>>>>>>>>> multiples of the previous partition to
>>>>>>>>>>>>>>> this consumer and the state to keep processing correctly
>>>> will
>>>>> be
>>>>>>>>>>>>>>> present
>>>>>>>>>>>>>>> w/o any shuffling.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Say you have an assignment
>>>>>>>>>>>>>>> Stateful consumer => partition
>>>>>>>>>>>>>>> 0 => 0
>>>>>>>>>>>>>>> 1 => 1
>>>>>>>>>>>>>>> 2 => 2
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> and you grow your topic by a factor of 4, you get
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 0 => 0,3,6,9
>>>>>>>>>>>>>>> 1 => 1,4,7,10
>>>>>>>>>>>>>>> 2 => 2,5,8,11
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Say your hashcode is 8. 8%3 => 2 before, so the consumer for
>>>>>>> partition 2
>>>>>>>>>>>>>>> has
>>>>>>>>>>>>>>> it.
>>>>>>>>>>>>>>> Now you have 12 partitions, so 8%12 => 8, so it goes
>>> into
>>>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>> 8
>>>>>>>>>>>>>>> which is assigned to the same consumer
>>>>>>>>>>>>>>> who had 2 before and therefore knows the key.
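
The arithmetic in the example above can be checked with a small sketch. It assumes a plain modulo partitioner (partition = hash % numPartitions, with a non-negative hash) and the "partition % num_task" assignment rule mentioned earlier in the thread. The class and method names are hypothetical and are not part of Kafka or of the KIP.

```java
// Hypothetical sketch: growing a topic by an integer factor keeps every key
// on the same stateful task, assuming partition = hash % numPartitions and
// task = partition % numTasks. Hashes are assumed to be non-negative.
final class FactorGrowthDemo {

    // Partition chosen by a plain modulo partitioner.
    static int partitionFor(int hash, int numPartitions) {
        return hash % numPartitions;
    }

    // Task that owns a partition under the "partition % num_task" rule.
    static int taskFor(int partition, int numTasks) {
        return partition % numTasks;
    }

    public static void main(String[] args) {
        int oldPartitions = 3;
        int factor = 4;
        int newPartitions = oldPartitions * factor; // 12 partitions after growth
        int numTasks = oldPartitions;               // one stateful task per original partition

        for (int hash = 0; hash < 1000; hash++) {
            int before = taskFor(partitionFor(hash, oldPartitions), numTasks);
            int after = taskFor(partitionFor(hash, newPartitions), numTasks);
            if (before != after) {
                throw new IllegalStateException("key moved to another task: hash=" + hash);
            }
        }
        System.out.println("every key stayed on its original task");
    }
}
```

The check passes because (h % 12) % 3 equals h % 3 whenever 3 divides 12. The same argument does not hold for a non-integer growth such as 10 to 15 partitions.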
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Userland reshuffling is there as an option. And it
>> does
>>>>>> exactly
>>>>>>>>>>>>>>> what
>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>> suggest. And I think it's the perfect strategy. All I am
>>>>>>> suggesting
>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>> broker side support to switch the producers to the newly
>>>>>>> partitioned
>>>>>>>>>>>>>>> topic.
>>>>>>>>>>>>>>> Then the old topic (too few partitions) can go away.
>>> Remember
>>>>> the
>>>>>>>>>>>>>>> list
>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>> steps in the beginning of this thread. If one has broker
>>>>> support
>>>>>>> for
>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>> where it's required and streams support for those that
>>> aren’t
>>>>>>>>>>>>>>> necessarily.
>>>>>>>>>>>>>>> Then one has solved the problem.
>>>>>>>>>>>>>>> I repeat it because I think it's important. I am really
>>> happy
>>>>>> that
>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>> brought that up, because it's 100% what I want, just with
>>> the
>>>>>>>>>>>>>>> differences
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> have an option to discard the too small topic later
>> (after
>>>> all
>>>>>>>>>>>>>>> consumers
>>>>>>>>>>>>>>> adapted). And to have order correct there. I need broker
>>>>> support
>>>>>>>>>>>>>>> managing
>>>>>>>>>>>>>>> the copy process + the producers and fence them against
>>> each
>>>>>>> other. I
>>>>>>>>>>>>>>> also
>>>>>>>>>>>>>>> repeat. the copy process can run for weeks in the worst
>>>> case.
>>>>>>>>>>>>>>> Copying
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> data is not the longest task migrating consumers might
>>> very
>>>>> well
>>>>>>> be.
>>>>>>>>>>>>>>> Once all consumers switched and copying is really up to
>>> date
>>>>>>> (think
>>>>>>>>>>>>>>> ISR
>>>>>>>>>>>>>>> like up to date) only then we stop the producer, wait
>> for
>>>> the
>>>>>> copy
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> finish and use the new topic for producing.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> After this the topic is perfect in shape. and no one
>> needs
>>>> to
>>>>>>> worry
>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>> complicated stuff. (old keys hanging around might arrive
>>> in
>>>>> some
>>>>>>>>>>>>>>> other
>>>>>>>>>>>>>>> topic later...). I can only imagine how many tricky bugs
>>>> gonna
>>>>>>>>>>>>>>> arrive
>>>>>>>>>>>>>>> after
>>>>>>>>>>>>>>> someone has grown and shrunk their topic 10 times.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 54. "The other thing I wanted to mention is that I
>> believe
>>>> the
>>>>>>>>>>>>>>> current
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> suggestion (without copying data over) can be
>> implemented
>>> in
>>>>>> pure
>>>>>>>>>>>>>>>> userland
>>>>>>>>>>>>>>>> with a custom partitioner and a small feedback loop from
>>>>>>>>>>>>>>>> ProduceResponse
>>>>>>>>>>>>>>>> =>
>>>>>>>>>>>>>>>> Partitioner in cooperation with a change management
>>>>> system."
>>>>>> I
>>>>>>> am
>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>> sure a customized partitioner itself solves the
>> problem.
>>> We
>>>>>>>>>>>>>>>> probably
>>>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>> some broker side support to enforce when the new
>>> partitions
>>>>> can
>>>>>>> be
>>>>>>>>>>>>>>>> used.
>>>>>>>>>>>>>>>> We
>>>>>>>>>>>>>>>> also need some support on the consumer/kstream side to
>>>>> preserve
>>>>>>> the
>>>>>>>>>>>>>>>> per
>>>>>>>>>>>>>>>> key
>>>>>>>>>>>>>>>> ordering and potentially migrate the processing state.
>>> This
>>>>> is
>>>>>>> not
>>>>>>>>>>>>>>>> trivial
>>>>>>>>>>>>>>>> and I am not sure if it's ideal to fully push to the
>>>>>> application
>>>>>>>>>>>>>>>> space.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Broker support is definitely the preferred way here. I
>>> have
>>>>>>> nothing
>>>>>>>>>>>>>>>> against
>>>>>>>>>>>>>>> broker support.
>>>>>>>>>>>>>>> I tried to say that for what I would prefer - copying
>> the
>>>>> data
>>>>>>>>>>>>>>> over,
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> least for log compacted topics -
>>>>>>>>>>>>>>> I would require more broker support than the KIP
>> currently
>>>>>> offers.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jun
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Mar 6, 2018 at 10:33 PM, Jan Filipiak <
>>>>>>>>>>>>>>>> Jan.Filipiak@trivago.com
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> are you actually reading my emails, or are you just
>> using
>>>> the
>>>>>>>>>>>>>>>> thread I
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> started for general announcements regarding the KIP?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I tried to argue really hard against linear hashing.
>>>> Growing
>>>>>> the
>>>>>>>>>>>>>>>>> topic
>>>>>>>>>>>>>>>>> by
>>>>>>>>>>>>>>>>> an integer factor does not require any state
>>>> redistribution
>>>>> at
>>>>>>>>>>>>>>>>> all. I
>>>>>>>>>>>>>>>>> fail
>>>>>>>>>>>>>>>>> to see completely where linear hashing helps on log
>>>>> compacted
>>>>>>>>>>>>>>>>> topics.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If you are not willing to explain to me what I might
>> be
>>>>>>>>>>>>>>>>> overlooking:
>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>> is fine.
>>>>>>>>>>>>>>>>> But I ask you to not reply to my emails then. Please
>>>>>> understand
>>>>>>> my
>>>>>>>>>>>>>>>>> frustration with this.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 06.03.2018 19:38, Dong Lin wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for all the comments! It appears that everyone
>>>>> prefers
>>>>>>>>>>>>>>>>> linear
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> hashing because it reduces the amount of state that
>>> needs
>>>>> to
>>>>>> be
>>>>>>>>>>>>>>>>>> moved
>>>>>>>>>>>>>>>>>> between consumers (for stream processing). The KIP
>> has
>>>> been
>>>>>>>>>>>>>>>>>> updated
>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>> use
>>>>>>>>>>>>>>>>>> linear hashing.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regarding the migration endeavor: it seems that
>>> migrating
>>>>>>>>>>>>>>>>>> producer
>>>>>>>>>>>>>>>>>> library
>>>>>>>>>>>>>>>>>> to use linear hashing should be pretty
>> straightforward
>>>>>> without
>>>>>>>>>>>>>>>>>> much operational endeavor. If we don't upgrade client
>>>>> library
>>>>>>> to
>>>>>>>>>>>>>>>>>> use
>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>> KIP, we can not support in-order delivery after
>>> partition
>>>>> is
>>>>>>>>>>>>>>>>>> changed
>>>>>>>>>>>>>>>>>> anyway. Suppose we upgrade client library to use this
>>>> KIP,
>>>>> if
>>>>>>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>>>> number is not changed, the key -> partition mapping
>>> will
>>>> be
>>>>>>>>>>>>>>>>>> exactly
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> same as it is now because it is still determined
>> using
>>>>>>>>>>>>>>>>>> murmur_hash(key)
>>>>>>>>>>>>>>>>>> %
>>>>>>>>>>>>>>>>>> original_partition_num. In other words, this change
>> is
>>>>>> backward
>>>>>>>>>>>>>>>>>> compatible.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regarding the load distribution: if we use linear
>>>> hashing,
>>>>>> the
>>>>>>>>>>>>>>>>>> load
>>>>>>>>>>>>>>>>>> may
>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>> unevenly distributed because those partitions which
>> are
>>>> not
>>>>>>> split
>>>>>>>>>>>>>>>>>> may
>>>>>>>>>>>>>>>>>> receive twice as much traffic as other partitions
>> that
>>>> are
>>>>>>> split.
>>>>>>>>>>>>>>>>>> This
>>>>>>>>>>>>>>>>>> issue can be mitigated by creating topic with
>>> partitions
>>>>> that
>>>>>>> are
>>>>>>>>>>>>>>>>>> several
>>>>>>>>>>>>>>>>>> times the number of consumers. And there will be no
>>>>> imbalance
>>>>>>> if
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> partition number is always doubled. So this imbalance
>>>> seems
>>>>>>>>>>>>>>>>>> acceptable.
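
To make the backward-compatibility and load-distribution points above concrete, here is a minimal sketch of textbook linear hashing. It is an assumption that the KIP's algorithm matches this form exactly; the class name, method name, and parameters are hypothetical, and the hash is assumed to be non-negative (for example, a murmur hash with the sign bit masked off).

```java
// Hypothetical sketch of textbook linear hashing, not necessarily the exact
// algorithm in the KIP. The hash is assumed to be non-negative.
final class LinearHashingSketch {

    // Maps a key hash to a partition, given the partition count the topic was
    // created with and the partition count it has now.
    static int partitionFor(int hash, int originalPartitions, int currentPartitions) {
        // Largest round size originalPartitions * 2^k that does not exceed
        // currentPartitions.
        int round = originalPartitions;
        while (round * 2 <= currentPartitions) {
            round *= 2;
        }
        // Partitions [0, splitCount) of this round have already been split in two.
        int splitCount = currentPartitions - round;
        int bucket = hash % round;
        if (bucket < splitCount) {
            // This bucket was split: half of its keys stay, half move to bucket + round.
            bucket = hash % (round * 2);
        }
        return bucket;
    }

    public static void main(String[] args) {
        // Unchanged partition count: identical to hash % originalPartitions.
        System.out.println(partitionFor(13, 10, 10));
        // 10 -> 15 partitions: partitions 0..4 are split, 5..9 are not.
        System.out.println(partitionFor(13, 10, 15));
        System.out.println(partitionFor(18, 10, 15));
    }
}
```

With 10 partitions grown to 15 in this sketch, partitions 5 to 9 each keep a tenth of the key space while partitions 0 to 4 and 10 to 14 each hold a twentieth, which is the two-to-one imbalance described above. When the count is exactly doubled, every partition is split and the imbalance disappears.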
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regarding storing the partition strategy as per-topic
>>>>> config:
>>>>>>> It
>>>>>>>>>>>>>>>>>> seems
>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>> necessary since we can still use murmur_hash as the
>>>> default
>>>>>>> hash
>>>>>>>>>>>>>>>>>> function
>>>>>>>>>>>>>>>>>> and additionally apply the linear hashing algorithm
>> if
>>>> the
>>>>>>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>>>> number
>>>>>>>>>>>>>>>>>> has increased. Not sure if there is any use-case for
>>>>> producer
>>>>>>> to
>>>>>>>>>>>>>>>>>> use a
>>>>>>>>>>>>>>>>>> different hash function. Jason, can you check if
>> there
>>> is
>>>>>> some
>>>>>>>>>>>>>>>>>> use-case
>>>>>>>>>>>>>>>>>> that I missed for using the per-topic partition
>>> strategy?
>>>>>>>>>>>>>>>>>> Regarding how to reduce latency (due to state
>>> store/load)
>>>>> in
>>>>>>>>>>>>>>>>>> stream
>>>>>>>>>>>>>>>>>> processing consumer when partition number changes: I
>>> need
>>>>> to
>>>>>>> read
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> Kafka
>>>>>>>>>>>>>>>>>> Stream code to understand how Kafka Stream currently
>>>>> migrates
>>>>>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>>>> between
>>>>>>>>>>>>>>>>>> consumers when the application is added/removed for a
>>>> given
>>>>>>> job.
>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>> reply after I finish reading the documentation and
>>> code.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <
>>>>>>>>>>>>>>>>>> jason@confluent.io>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Great discussion. I think I'm wondering whether we
>> can
>>>>>> continue
>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>> leave
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Kafka agnostic to the partitioning strategy. The
>>>> challenge
>>>>> is
>>>>>>>>>>>>>>>>>> communicating
>>>>>>>>>>>>>>>>>>> the partitioning logic from producers to consumers
>> so
>>>> that
>>>>>> the
>>>>>>>>>>>>>>>>>>> dependencies
>>>>>>>>>>>>>>>>>>> between each epoch can be determined. For the sake
>> of
>>>>>>>>>>>>>>>>>>> discussion,
>>>>>>>>>>>>>>>>>>> imagine
>>>>>>>>>>>>>>>>>>> you did something like the following:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1. The name (and perhaps version) of a partitioning
>>>>> strategy
>>>>>>> is
>>>>>>>>>>>>>>>>>>> stored
>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>> topic configuration when a topic is created.
>>>>>>>>>>>>>>>>>>> 2. The producer looks up the partitioning strategy
>>>> before
>>>>>>>>>>>>>>>>>>> writing
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>> topic and includes it in the produce request (for
>>>>> fencing).
>>>>>> If
>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>> doesn't
>>>>>>>>>>>>>>>>>>> have an implementation for the configured strategy,
>> it
>>>>>> fails.
>>>>>>>>>>>>>>>>>>> 3. The consumer also looks up the partitioning
>>> strategy
>>>>> and
>>>>>>>>>>>>>>>>>>> uses it
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> determine dependencies when reading a new epoch. It
>>>> could
>>>>>>> either
>>>>>>>>>>>>>>>>>>> fail
>>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>> make the most conservative dependency assumptions if
>>> it
>>>>>>> doesn't
>>>>>>>>>>>>>>>>>>> know
>>>>>>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> implement the partitioning strategy. For the
>> consumer,
>>>> the
>>>>>> new
>>>>>>>>>>>>>>>>>>> interface
>>>>>>>>>>>>>>>>>>> might look something like this:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> // Return the partition dependencies following an
>>> epoch
>>>>> bump
>>>>>>>>>>>>>>>>>>> Map<Integer, List<Integer>> dependencies(int
>>>>>>>>>>>>>>>>>>> numPartitionsBeforeEpochBump,
>>>>>>>>>>>>>>>>>>> int numPartitionsAfterEpochBump)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The unordered case then is just a particular
>>>>> implementation
>>>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>> never
>>>>>>>>>>>>>>>>>>> has
>>>>>>>>>>>>>>>>>>> any epoch dependencies. To implement this, we would
>>> need
>>>>>> some
>>>>>>>>>>>>>>>>>>> way
>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> consumer to find out how many partitions there were
>> in
>>>>> each
>>>>>>>>>>>>>>>>>>> epoch,
>>>>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>> maybe that's not too unreasonable.
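
As an illustration of the interface sketched above, here is one possible implementation for a plain modulo partitioner (partition = hash % numPartitions, non-negative hash). It is an assumption that the map goes from each partition in the new epoch to the partitions of the previous epoch that may hold earlier records for the same keys; the class name and the gcd helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical implementation of the proposed dependencies(...) method for a
// modulo partitioner. Assumed meaning of the result: new partition -> old
// partitions that may contain earlier records for keys now in that partition.
final class ModuloDependencies {

    static int gcd(int a, int b) {
        return b == 0 ? a : gcd(b, a % b);
    }

    // Return the partition dependencies following an epoch bump.
    static Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
                                                    int numPartitionsAfterEpochBump) {
        int g = gcd(numPartitionsBeforeEpochBump, numPartitionsAfterEpochBump);
        Map<Integer, List<Integer>> result = new HashMap<>();
        for (int newP = 0; newP < numPartitionsAfterEpochBump; newP++) {
            // A hash h with h % after == newP and h % before == oldP exists
            // exactly when oldP and newP agree modulo gcd(before, after).
            List<Integer> parents = new ArrayList<>();
            for (int oldP = newP % g; oldP < numPartitionsBeforeEpochBump; oldP += g) {
                parents.add(oldP);
            }
            result.put(newP, parents);
        }
        return result;
    }

    public static void main(String[] args) {
        // Growth by an integer factor: each new partition has one parent.
        System.out.println(dependencies(3, 12).get(8));
        // Growth from 10 to 15: each new partition has two possible parents.
        System.out.println(dependencies(10, 15).get(7));
    }
}
```

Under these assumptions the list for a partition that already existed includes the partition itself. The unordered case mentioned above would simply return an empty list for every partition.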
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Jason
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <
>>>>>>>>>>>>>>>>>>> Jan.Filipiak@trivago.com
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Dong
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> thank you very much for your questions.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regarding the time spent copying data across:
>>>>>>>>>>>>>>>>>>>> It is correct that copying data from a topic with
>> one
>>>>>>> partition
>>>>>>>>>>>>>>>>>>>> mapping
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> a topic with a different partition mapping takes
>> way
>>>>> longer
>>>>>>>>>>>>>>>>>>>> than
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> stop producers. Tens of minutes is a very optimistic
>>>>>> estimate
>>>>>>>>>>>>>>>>>>> here.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Many
>>>>>>>>>>>>>>>>>>>> people cannot afford to copy at full steam and therefore
>>>> will
>>>>>> have
>>>>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>> rate
>>>>>>>>>>>>>>>>>>>> limiting in place, this can bump the timespan into
>>> the
>>>>>> days.
>>>>>>>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>>>>>> good
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> part
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> is that the vast majority of the data can be copied
>>>> while
>>>>>> the
>>>>>>>>>>>>>>>>>>> producers
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> still going. One can then, piggyback the consumers
>>>> on top
>>>>> of
>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> timeframe,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> by the method mentioned (provide them a mapping
>> from
>>>>> their
>>>>>>> old
>>>>>>>>>>>>>>>>>>> offsets
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> new offsets in their repartitioned topics. In that
>>> way
>>>> we
>>>>>>>>>>>>>>>>>>>> separate
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> migration of consumers from migration of producers
>>>>>> (decoupling
>>>>>>>>>>>>>>>>>>>> these
>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>> what kafka is strongest at). The time to actually
>>> swap
>>>>> over
>>>>>>> the
>>>>>>>>>>>>>>>>>>>> producers
>>>>>>>>>>>>>>>>>>>> should be kept minimal by ensuring that when a swap
>>>>> attempt
>>>>>>> is
>>>>>>>>>>>>>>>>>>>> started
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> consumer copying over should be very close to the
>> log
>>>> end
>>>>>> and
>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> expected
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> to finish within the next fetch. The operation
>> should
>>>>> have
>>>>>> a
>>>>>>>>>>>>>>>>>>>> time-out
>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> should be "reattemptable".
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Importance of logcompaction:
>>>>>>>>>>>>>>>>>>>> If a producer produces key A to partition 0, it's
>>>> forever
>>>>>>> gonna
>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>> there,
>>>>>>>>>>>>>>>>>>>> unless it gets deleted. The record might sit in
>> there
>>>> for
>>>>>>>>>>>>>>>>>>>> years. A
>>>>>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>> producer started with the new partitions will fail
>> to
>>>>>> delete
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> record
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> the correct partition. The record will be there
>>> forever
>>>>> and
>>>>>>> one
>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> reliably bootstrap new consumers. I cannot see how
>>>> linear
>>>>>>>>>>>>>>>>>>> hashing
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> solve
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> this.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regarding your skipping of userland copying:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 100%, copying the data across in userland is, as
>> far
>>>> as i
>>>>>> can
>>>>>>>>>>>>>>>>>>>> see,
>>>>>>>>>>>>>>>>>>>> only
>>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>> usecase for log compacted topics. Even for
>>>> logcompaction
>>>>> +
>>>>>>>>>>>>>>>>>>>> retentions
>>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>> should only be opt-in. Why did I bring it up? I
>> think
>>>> log
>>>>>>>>>>>>>>>>>>>> compaction
>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>> very important feature to really embrace kafka as a
>>>> "data
>>>>>>>>>>>>>>>>>>>> platform".
>>>>>>>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>>>>>> point I also want to make is that copying data this
>>> way
>>>>> is
>>>>>>>>>>>>>>>>>>>> completely
>>>>>>>>>>>>>>>>>>>> in line with the kafka architecture. It only
>> consists
>>> of
>>>>>>> reading
>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> writing
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> to topics.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I hope it clarifies more why I think we should aim
>> for
>>>>> more
>>>>>>> than
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> current KIP. I fear that once the KIP is done not
>>> much
>>>>> more
>>>>>>>>>>>>>>>>>>>> effort
>>>>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> taken.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 04.03.2018 02:28, Dong Lin wrote:
>>>>>>>>>>>>>>>>>>>> Hey Jan,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> In the current proposal, the consumer will be
>> blocked
>>>> on
>>>>>>>>>>>>>>>>>>>> waiting
>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> other
>>>>>>>>>>>>>>>>>>>>> consumers of the group to consume up to a given
>>>> offset.
>>>>> In
>>>>>>>>>>>>>>>>>>>>> most
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> cases,
>>>>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>> consumers should be close to the LEO of the
>>> partitions
>>>>> when
>>>>>>> the
>>>>>>>>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>>>>>> expansion happens. Thus the time waiting should not
>>> be
>>>>> long
>>>>>>>>>>>>>>>>>>>> e.g.
>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> order of seconds. On the other hand, it may take a
>>> long
>>>>>> time
>>>>>>> to
>>>>>>>>>>>>>>>>>>>> wait
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>> the entire partition to be copied -- the amount of
>>>> time
>>>>> is
>>>>>>>>>>>>>>>>>>>>> proportional
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> the amount of existing data in the partition,
>> which
>>>> can
>>>>>> take
>>>>>>>>>>>>>>>>>>>> tens of
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> minutes. So the amount of time that we stop
>> consumers
>>>> may
>>>>>> not
>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> same order of magnitude.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> If we can implement this suggestion without
>> copying
>>>> data
>>>>>>> over
>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>> pure
>>>>>>>>>>>>>>>>>>>>> userland, it will be much more valuable. Do you
>> have
>>>>> ideas
>>>>>>> on
>>>>>>>>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> be done?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Not sure why the current KIP does not help people who
>>> depend
>>>>> on
>>>>>>> log
>>>>>>>>>>>>>>>>>>>> compaction.
>>>>>>>>>>>>>>>>>>>>> Could you elaborate more on this point?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>>>>>>>> On Wed, Feb 28, 2018 at 10:55 PM, Jan
>>>>>>>>>>>>>>>>>>>>> Filipiak<Jan.Filipiak@trivago.
>>>>>>>>>>>>>>>>>>>>> com
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I tried to focus on what the steps are one can
>>>> currently
>>>>>>>>>>>>>>>>>>>>> perform
>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> expand
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> or shrink a keyed topic while maintaining a top
>>> notch
>>>>>>>>>>>>>>>>>>>>>> semantics.
>>>>>>>>>>>>>>>>>>>>>> I can understand that there might be confusion
>>> about
>>>>>>>>>>>>>>>>>>>>>> "stopping
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> consumer". It is exactly the same as proposed in
>>> the
>>>>> KIP.
>>>>>>>>>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>>>>>>>> needs
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> a time the producers agree on the new
>> partitioning.
>>>> The
>>>>>>> extra
>>>>>>>>>>>>>>>>>>>> semantics I
>>>>>>>>>>>>>>>>>>>>>> want to put in there is that we have a
>> possibility
>>> to
>>>>>> wait
>>>>>>>>>>>>>>>>>>>>>> until
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> existing data
>>>>>>>>>>>>>>>>>>>>> is copied over into the new partitioning scheme.
>>> When
>>>> I
>>>>>> say
>>>>>>>>>>>>>>>>>>>> stopping
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>> think more of having a memory barrier that
>> ensures
>>>> the
>>>>>>>>>>>>>>>>>>>>>> ordering. I
>>>>>>>>>>>>>>>>>>>>>> am
>>>>>>>>>>>>>>>>>>>>>> still
>>>>>>>>>>>>>>>>>>>>>> aiming for latencies on the scale of leader
>>>> failovers.
>>>>>>>>>>>>>>>>>>>>>> Consumers have to explicitly adapt to the new
>>>> partitioning
>>>>>>>>>>>>>>>>>>>>>> scheme
>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> above scenario. The reason is that in these cases
>>>> where
>>>>>> you
>>>>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> dependent
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> on a particular partitioning scheme, you also
>> have
>>>>> other
>>>>>>>>>>>>>>>>>>>>>> topics
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> co-partition enforcements or the like, frequently.
>>>>>> Therefore
>>>>>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> your
>>>>>>>>>>>>>>>>>>>> other
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> input topics might need to grow accordingly.
>>>>>>>>>>>>>>>>>>>>>> What I was suggesting was to streamline all these
>>>>>>> operations
>>>>>>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>>>> best
>>>>>>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>>>> possible to have "real" partition growth and
>>> shrinkage
>>>>>> going
>>>>>>>>>>>>>>>>>>>>>> on.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Migrating
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> the producers to a new partitioning scheme can be
>>>> much
>>>>>> more
>>>>>>>>>>>>>>>>>>>>> streamlined
>>>>>>>>>>>>>>>>>>>> with proper broker support for this. Migrating
>>> consumer
>>>>> is
>>>>>> a
>>>>>>>>>>>>>>>>>>>> step
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>> might be made completely unnecessary if - for
>>> example
>>>>>>> streams
>>>>>>>>>>>>>>>>>>>>>> -
>>>>>>>>>>>>>>>>>>>>>> takes
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> gcd as partitioning scheme instead of enforcing 1
>>> to
>>>> 1.
>>>>>>>>>>>>>>>>>>>>>> Connect
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> consumers
>>>>>>>>>>>>>>>>>>>> and other consumers should be fine anyways.
>>>>>>>>>>>>>>>>>>>>> I hope this makes more clear where I was aiming
>> at.
>>>> The
>>>>>> rest
>>>>>>>>>>>>>>>>>>>> needs
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> figured out. The only danger I see is that when
>> we
>>>> are
>>>>>>>>>>>>>>>>>>>>> introducing
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>> feature as proposed in the KIP, it won't help any
>>> people
>>>>>>>>>>>>>>>>>>>> depending
>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> log
>>>>>>>>>>>>>>>>>>>>>> compaction.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The other thing I wanted to mention is that I
>>> believe
>>>>> the
>>>>>>>>>>>>>>>>>>>> current
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> suggestion (without copying data over) can be
>>>> implemented
>>>>>> in
>>>>>>>>>>>>>>>>>>>>> pure
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> userland
>>>>>>>>>>>>>>>>>>>>>> with a custom partitioner and a small
>> feedback loop
>>>> from
>>>>>>>>>>>>>>>>>>>>>> ProduceResponse
>>>>>>>>>>>>>>>>>>>>>> =>
>>>>>>>>>>>>>>>>>>>>>> Partitioner in cooperation with a change
>>> management
>>>>>>> system.
>>>>>>>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On 28.02.2018 07:13, Dong Lin wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hey Jan,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I am not sure if it is acceptable for producer to
>>> be
>>>>>>> stopped
>>>>>>>>>>>>>>>>>>>>>> for a
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> while,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> particularly for online application which
>> requires
>>>> low
>>>>>>>>>>>>>>>>>>>>>>> latency. I
>>>>>>>>>>>>>>>>>>>>>>> am
>>>>>>>>>>>>>>>>>>>>>>> also
>>>>>>>>>>>>>>>>>>>>>>> not sure how consumers can switch to a new
>> topic.
>>>> Does
>>>>>>> user
>>>>>>>>>>>>>>>>>>>>>>> application
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> need to explicitly specify a different topic
>> for
>>>>>>>>>>>>>>>>>>>>>> producer/consumer
>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> subscribe to? It will be helpful for discussion if
>>> you
>>>>> can
>>>>>>>>>>>>>>>>>>>> provide
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>>>>>> detail on the interface change for this solution.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>> just want to throw my thoughts in. In general the
>>>>>>> functionality
>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> very
>>>>>>>>>>>>>>>>>>>>>>> useful, though we should not try to find the
>>>>>> architecture
>>>>>>>>>>>>>>>>>>>>>>> too
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> hard
>>>>>>>>>>>>>>>>>>>>>>>> while
>>>>>>>>>>>>>>>>>>>>>>>> implementing.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> The manual steps would be to
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> create a new topic
>>>>>>>>>>>>>>>>>>>>>>>> then mirrormake from the old topic to the
>> new
>>>>> topic
>>>>>>>>>>>>>>>>>>>>>>>> wait for mirror making to catch up.
>>>>>>>>>>>>>>>>>>>>>>>> then put the consumers onto the new topic
>>>>>>>>>>>>>>>>>>>>>>>>             (having mirrormaker spit out a
>> mapping
>>>>> from
>>>>>>> old
>>>>>>>>>>>>>>>>>>>>>>>> offsets to
>>>>>>>>>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>>>>>> offsets:
>>>>>>>>>>>>>>>>>>>>>>>>                 if topic is increased by factor
>> X
>>>>> there
>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>> gonna
>>>>>>>>>>>>>>>>>>>>>>>> be a
>>>>>>>>>>>>>>>>>>>>>>>> clean
>>>>>>>>>>>>>>>>>>>>>>>> mapping from 1 offset in the old topic to X
>>> offsets
>>>>> in
>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>>>>>> topic,
>>>>>>>>>>>>>>>>>>>>>>>>                 if there is no factor then there
>>> is
>>>> no
>>>>>>>>>>>>>>>>>>>>>>>> chance to
>>>>>>>>>>>>>>>>>>>>>>>> generate a
>>>>>>>>>>>>>>>>>>>>>>>> mapping that can be reasonably used for
>>> continuing)
>>>>>>>>>>>>>>>>>>>>>>>>             make consumers stop at appropriate
>>>> points
>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>> continue
>>>>>>>>>>>>>>>>>>>>>>>> consumption
>>>>>>>>>>>>>>>>>>>>>>>> with offsets from the mapping.
>>>>>>>>>>>>>>>>>>>>>>>> have the producers stop for a minimal time.
>>>>>>>>>>>>>>>>>>>>>>>> wait for mirrormaker to finish
>>>>>>>>>>>>>>>>>>>>>>>> let producer produce with the new metadata.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Instead of implementing the approach suggested in
>>> the
>>>>> KIP
>>>>>>>>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>>>>>> leave
>>>>>>>>>>>>>>>>>>>>>>>> log compacted topic completely crumbled and
>>>> unusable.
>>>>>>>>>>>>>>>>>>>>>>>> I would much rather try to build infrastructure
>>> to
>>>>>>> support
>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> mentioned
>>>>>>>>>>>>>>>>>>>>>>>> above operations more smoothly.
>>>>>>>>>>>>>>>>>>>>>>>> Especially having producers stop and use
>> another
>>>>> topic
>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>> difficult
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> it would be nice if one can trigger "invalid
>>>>> metadata"
>>>>>>>>>>>>>>>>>>>>>>> exceptions
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>> them
>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>> if one could give topics aliases so that their
>>>> produces
>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> old
>>>>>>>>>>>>>>>>>>>>>>>> topic
>>>>>>>>>>>>>>>>>>>>>>>> will arrive in the new topic.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> The downsides are obvious I guess ( having the
>>> same
>>>>>> data
>>>>>>>>>>>>>>>>>>>>>>>> twice
>>>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> transition period, but kafka tends to scale
>> well
>>>> with
>>>>>>>>>>>>>>>>>>>>>>> datasize).
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> So
>>>>>>>>>>>>>>>>>>>>>> it's a
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> nicer fit into the architecture.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I further want to argue that the functionality
>>> by
>>>>> the
>>>>>>> KIP
>>>>>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>>>> completely be implemented in "userland" with a
>>>>> custom
>>>>>>>>>>>>>>>>>>>>>>>> partitioner
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> handles the transition as needed. I would
>>>> appreciate
>>>>> if
>>>>>>>>>>>>>>>>>>>>>>> someone
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>>>> point
>>>>>>>>>>>>>>>>>>>> out what a custom partitioner couldn't handle in
>> this
>>>>> case?
>>>>>>>>>>>>>>>>>>>>>> With the above approach, shrinking a topic
>> becomes
>>>> the
>>>>>> same
>>>>>>>>>>>>>>>>>>>>>>>> steps.
>>>>>>>>>>>>>>>>>>>>>>>> Without
>>>>>>>>>>>>>>>>>>>>>>>> losing keys in the discontinued partitions.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Would love to hear what everyone thinks.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On 11.02.2018 00:35, Dong Lin wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I have created KIP-253: Support in-order
>> message
>>>>>> delivery
>>>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> expansion. See
>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
>>>>>>>>>>>>>>>>>>>>>>>>> .
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> This KIP provides a way to allow messages of
>> the
>>>>> same
>>>>>>> key
>>>>>>>>>>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>>>>>>>>> producer to be consumed in the same order they
>>> are
>>>>>>>>>>>>>>>>>>>>>>>>> produced
>>>>>>>>>>>>>>>>>>>>>>>>> even
>>>>>>>>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> expand partition of the topic.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>
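
A small illustration of the "clean mapping" point raised in the thread, for the key-to-partition side only. This is a sketch under stated assumptions, not what KIP-253 or Kafka implements: it assumes plain hash-modulo partitioning, uses zlib.crc32 as a stand-in hash (Kafka's default partitioner uses a different hash), and the names key_hash, partition_of and maps_cleanly are hypothetical. Under those assumptions, growing a topic by an integer factor sends every key from old partition p to a new partition congruent to p modulo the old count, whereas a non-integer growth such as 10 to 15 does not guarantee that.

```python
import zlib


def key_hash(key: bytes) -> int:
    # Stand-in hash for illustration only (assumption: not Kafka's actual hash).
    return zlib.crc32(key)


def partition_of(h: int, num_partitions: int) -> int:
    # Plain hash-modulo partitioning (assumed scheme).
    return h % num_partitions


def maps_cleanly(h: int, old_count: int, new_count: int) -> bool:
    # True if the key's new partition is congruent to its old partition
    # modulo old_count, i.e. it stays within the old partition's "family".
    return partition_of(h, new_count) % old_count == partition_of(h, old_count)


hashes = [key_hash(f"key-{i}".encode()) for i in range(1000)]

# Integer factor (10 -> 20): every key stays within its old partition's family.
all_clean_2x = all(maps_cleanly(h, 10, 20) for h in hashes)

# Non-integer growth (10 -> 15): a hash value of 15 moves from old partition 5
# to new partition 0, which is not in partition 5's family.
counterexample_clean = maps_cleanly(15, 10, 15)
```

The integer-factor case holds for any hash value because (h mod kN) mod N equals h mod N; the 10 to 15 case needs extra bookkeeping (for example the parent-to-child relationship mentioned in the meeting notes) to recover a usable mapping.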

