From: John Roesler
Date: Wed, 17 Jul 2019 17:20:05 -0500
Subject: Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams
To: dev@kafka.apache.org

Yes, I believe that's what I had in mind. Again, not totally sure it
makes sense, but I believe something similar is the rationale for
having the partitioner option in Produced.

Thanks,
-John

On Wed, Jul 17, 2019 at 3:20 PM Levani Kokhreidze wrote:
>
> Hey John,
>
> Oh, that’s an interesting use case.
> Do I understand this correctly: in your example, I would first issue repartition(Repartitioned) with a proper partitioner that would essentially be the same as the one used by the topic I want to join with, and then do the KStream#join with the DSL?
>
> Regards,
> Levani
>
>
> > On Jul 17, 2019, at 11:11 PM, John Roesler wrote:
> >
> > Hey, all, just to chime in,
> >
> > I think it might be useful to have an option to specify the
> > partitioner. The case I have in mind is that some data may get
> > repartitioned and then joined with an input topic. If the right-side
> > input topic uses a custom partitioning strategy, then the
> > repartitioned stream also needs to be partitioned with the same
> > strategy.
> >
> > Does that make sense, or did I maybe miss something important?
> >
> > Thanks,
> > -John
> >
> > On Wed, Jul 17, 2019 at 2:48 PM Levani Kokhreidze wrote:
> >>
> >> Yes, I was thinking about it as well. To be honest, I’m not sure about it yet.
> >> As a Kafka Streams DSL user, I don’t really think I would need control over the partitioner for internal topics.
> >> As a user, I would assume that Kafka Streams knows best how to partition data for internal topics.
> >> In this KIP I wrote that Produced should be used only for topics that are created by the user in advance.
> >> In those cases, maybe it makes sense to have the possibility to specify the partitioner.
> >> I don’t have a clear answer on that yet, but I guess specifying the partitioner can be added as well if there’s agreement on this.
> >>
> >> Regards,
> >> Levani
> >>
> >>> On Jul 17, 2019, at 10:42 PM, Sophie Blee-Goldman wrote:
> >>>
> >>> Thanks for clearing that up. I agree that Repartitioned would be a useful
> >>> addition. I'm wondering if it might also need to have
> >>> a withStreamPartitioner method/field, similar to Produced? I'm not sure how
> >>> widely this feature is really used, but seems it should be available for
> >>> repartition topics.
> >>>
> >>> On Wed, Jul 17, 2019 at 11:26 AM Levani Kokhreidze wrote:
> >>>
> >>>> Hey Sophie,
> >>>>
> >>>> In both cases, KStream#repartition and KStream#repartition(Repartitioned),
> >>>> the topic will be created and managed by Kafka Streams.
> >>>> The idea of Repartitioned is to give the user more control over the topic, such as
> >>>> the number of partitions.
> >>>> I feel like the Repartitioned parameter is something that is missing in
> >>>> the current DSL design.
> >>>> Essentially, it gives the user control over parallelism by configuring the number of
> >>>> partitions for internal topics.
> >>>>
> >>>> Hope this answers your question.
> >>>>
> >>>> Regards,
> >>>> Levani
> >>>>
> >>>>> On Jul 17, 2019, at 9:02 PM, Sophie Blee-Goldman wrote:
> >>>>>
> >>>>> Hey Levani,
> >>>>>
> >>>>> Thanks for the KIP! Can you clarify one thing for me -- for the
> >>>>> KStream#repartition signature taking a Repartitioned, will the topic be
> >>>>> auto-created by Streams (which seems to be the case for the signature
> >>>>> without a Repartitioned) or does it have to be pre-created? The wording in
> >>>>> the KIP makes it seem like one version of the method will auto-create
> >>>>> topics while the other will not.
> >>>>>
> >>>>> Cheers,
> >>>>> Sophie
> >>>>>
> >>>>> On Wed, Jul 17, 2019 at 10:15 AM Levani Kokhreidze <levani.codes@gmail.com> wrote:
> >>>>>
> >>>>>> Hello,
> >>>>>>
> >>>>>> One more bump about KIP-221 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint)
> >>>>>> so it doesn’t get lost in the mailing list :)
> >>>>>> Would love to hear the community’s opinions/concerns about this KIP.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Levani
> >>>>>>
> >>>>>>
> >>>>>>> On Jul 12, 2019, at 5:27 PM, Levani Kokhreidze wrote:
> >>>>>>>
> >>>>>>> Hello,
> >>>>>>>
> >>>>>>> Kind reminder about this KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Levani
> >>>>>>>
> >>>>>>>> On Jul 9, 2019, at 11:38 AM, Levani Kokhreidze <levani.codes@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Hello,
> >>>>>>>>
> >>>>>>>> In order to move this KIP forward, I’ve updated the Confluence page with the new proposal: https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
> >>>>>>>> I’ve also filled in the “Rejected Alternatives” section.
> >>>>>>>>
> >>>>>>>> Looking forward to discussing this KIP :)
> >>>>>>>>
> >>>>>>>> Kind regards,
> >>>>>>>> Levani
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> On Jul 3, 2019, at 1:08 PM, Levani Kokhreidze <levani.codes@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Hello Matthias,
> >>>>>>>>>
> >>>>>>>>> Thanks for the feedback and ideas.
> >>>>>>>>> I like the idea of introducing a dedicated `Topic` class for topic configuration for internal operators like `groupBy`.
> >>>>>>>>> Would be great to hear others’ opinions about this as well.
> >>>>>>>>>
> >>>>>>>>> Kind regards,
> >>>>>>>>> Levani
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> On Jul 3, 2019, at 7:00 AM, Matthias J. Sax wrote:
> >>>>>>>>>>
> >>>>>>>>>> Levani,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for picking up this KIP! And thanks for summarizing everything.
> >>>>>>>>>> Even if some points may have been discussed already (can't really
> >>>>>>>>>> remember), it's helpful to get a good summary to refresh the discussion.
> >>>>>>>>>>
> >>>>>>>>>> I think your reasoning makes sense. With regard to the distinction
> >>>>>>>>>> between operators that manage topics and operators that use user-created
> >>>>>>>>>> topics: Following this argument, it might indicate that leaving
> >>>>>>>>>> `through()` as-is (as an operator that uses user-defined topics) and
> >>>>>>>>>> introducing a new `repartition()` operator (an operator that manages
> >>>>>>>>>> topics itself) might be good. Otherwise, there is one operator
> >>>>>>>>>> `through()` that sometimes manages topics but sometimes not; a different
> >>>>>>>>>> name, i.e., a new operator, would make the distinction clearer.
> >>>>>>>>>>
> >>>>>>>>>> About adding `numOfPartitions` to `Grouped`: I am wondering if the same
> >>>>>>>>>> argument as for `Produced` applies and adding it is semantically
> >>>>>>>>>> questionable? Might be good to get opinions of others on this, too.
> >>>>>>>>>> I am not sure myself what solution I prefer atm.
> >>>>>>>>>>
> >>>>>>>>>> So far, KS uses configuration objects that allow configuring a certain
> >>>>>>>>>> "entity" like a consumer, producer, store. If we assume that a topic is
> >>>>>>>>>> a similar entity, I am wondering if we should have a
> >>>>>>>>>> `Topic#withNumberOfPartitions()` class and method instead of a plain
> >>>>>>>>>> integer? This would allow us to add other configs, like replication
> >>>>>>>>>> factor, retention-time etc, easily, without the need to change the "main
> >>>>>>>>>> API".
> >>>>>>>>>>
> >>>>>>>>>> Just want to give some ideas. Not sure if I like them myself. :)
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 7/1/19 1:04 AM, Levani Kokhreidze wrote:
> >>>>>>>>>>> Actually, giving it more thought, maybe enhancing Produced with a number-of-partitions configuration is not the best approach. Let me explain why:
> >>>>>>>>>>>
> >>>>>>>>>>> 1) If we enhance the Produced class with this configuration, this will also affect the KStream#to operation. Since KStream#to is the final sink of the topology, it seems to me a reasonable assumption that the user needs to manually create the sink topic in advance. And in that case, having a number-of-partitions configuration doesn’t make much sense.
> >>>>>>>>>>>
> >>>>>>>>>>> 2) Looking at the Produced class, based on the API contract, it seems like Produced is designed to be something that is explicitly for the producer (key serializer, value serializer, and partitioner are all producer-specific configurations), whereas the number of partitions is a topic-level configuration. And I don’t think mixing topic-level and producer-level configurations together in one class is a good approach.
> >>>>>>>>>>>
> >>>>>>>>>>> 3) Looking at the KStream interface, it seems like the Produced parameter is for operations that work with non-internal topics (i.e., topics that are not created and managed internally by Kafka Streams), and I think we should leave it as it is in that case.
> >>>>>>>>>>>
> >>>>>>>>>>> Taking all these things into account, I think we should distinguish between DSL operations where Kafka Streams should create and manage internal topics (KStream#groupBy) and those that use topics that should be created in advance (e.g. KStream#to).
> >>>>>>>>>>>
> >>>>>>>>>>> To sum it up, I think adding a numPartitions configuration to Produced will result in mixing topic-level and producer-level configuration in one class, and it’s going to break existing API semantics.
> >>>>>>>>>>>
> >>>>>>>>>>> Regarding making the topic name optional in KStream#through: I think the underlying idea is very useful, and giving users the possibility to specify the number of partitions there is even more useful :) Considering the arguments against adding the number of partitions to the Produced class, I see two options here:
> >>>>>>>>>>> 1) Add the following method overloads
> >>>>>>>>>>> * through() - the topic will be auto-generated and the number of partitions will be taken from the source topic
> >>>>>>>>>>> * through(final int numOfPartitions) - the topic will be auto-generated with the specified number of partitions
> >>>>>>>>>>> * through(final int numOfPartitions, final Produced produced) - the topic will be generated with the specified number of partitions and configuration taken from the produced parameter.
> >>>>>>>>>>> 2) Leave KStream#through as it is and introduce a new method, KStream#repartition (I think Matthias suggested this in one of the threads)
> >>>>>>>>>>>
> >>>>>>>>>>> Considering everything mentioned above, I propose the following plan:
> >>>>>>>>>>>
> >>>>>>>>>>> Option A:
> >>>>>>>>>>> 1) Leave Produced as it is
> >>>>>>>>>>> 2) Add a number-of-partitions configuration to the Grouped class (as mentioned in the KIP)
> >>>>>>>>>>> 3) Add the following method overloads to KStream#through
> >>>>>>>>>>> * through() - the topic will be auto-generated and the number of partitions will be taken from the source topic
> >>>>>>>>>>> * through(final int numOfPartitions) - the topic will be auto-generated with the specified number of partitions
> >>>>>>>>>>> * through(final int numOfPartitions, final Produced produced) - the topic will be generated with the specified number of partitions and configuration taken from the produced parameter.
> >>>>>>>>>>>
> >>>>>>>>>>> Option B:
> >>>>>>>>>>> 1) Leave Produced as it is
> >>>>>>>>>>> 2) Add a number-of-partitions configuration to the Grouped class (as mentioned in the KIP)
> >>>>>>>>>>> 3) Add a new operator, KStream#repartition, for creating and managing internal repartition topics
> >>>>>>>>>>>
> >>>>>>>>>>> P.S. I’m sorry if all of this was already discussed in the mailing list, but I kinda got lost in all the threads that were about this KIP :(
> >>>>>>>>>>>
> >>>>>>>>>>> Kind regards,
> >>>>>>>>>>> Levani
> >>>>>>>>>>>
> >>>>>>>>>>>> On Jul 1, 2019, at 9:56 AM, Levani Kokhreidze <levani.codes@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hello,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I would like to resurrect the discussion around KIP-221. Going through the discussion thread, there seems to be agreement around the usefulness of this feature.
> >>>>>>>>>>>> Regarding the implementation, as far as I understood, the best solution seems to me to be the following:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1) Add two method overloads to the KStream#through method (essentially making the topic name optional)
> >>>>>>>>>>>> 2) Enhance the Produced class with a numOfPartitions configuration field.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Those two changes will allow DSL users to control parallelism and trigger re-partitioning without doing stateful operations.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I will update the KIP with interface changes around KStream#through if these changes sound sensible.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Kind regards,
> >>>>>>>>>>>> Levani