From: Michael Drogalis
Date: Fri, 24 May 2019 15:25:14 -0700
Subject: Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
To: dev@kafka.apache.org

Matthias: I think that's pretty reasonable from my point of view. Good
suggestion.

On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax wrote:

> Interesting discussion.
>
> I am wondering whether we can unify the advantages of both approaches:
>
>
>
> KStream#split() -> KBranchedStream
>
> // branch is not easily accessible in current scope
> KBranchedStream#branch(Predicate, Consumer)
>   -> KBranchedStream
>
> // assign a name to the branch and
> // return the sub-stream to the current scope later
> //
> // can be as simple as `#branch(p, s->s, "name")`
> // or as complex as `#branch(p, s->s.filter(...), "name")`
> KBranchedStream#branch(Predicate, Function, String)
>   -> KBranchedStream
>
> // default branch is not easily accessible
> // return map of all named sub-streams into current scope
> KBranchedStream#default(Consumer)
>   -> Map
>
> // assign custom name to default-branch
> // return map of all named sub-streams into current scope
> KBranchedStream#default(Function, String)
>   -> Map
>
> // assign a default name for default
> // return map of all named sub-streams into current scope
> KBranchedStream#defaultBranch(Function)
>   -> Map
>
> // return map of all named sub-streams into current scope
> KBranchedStream#noDefaultBranch()
>   -> Map
>
>
>
> Hence, for each sub-stream, the user can choose whether to add a name
> and return the branch "result" to the calling scope. The implementation
> can also check at runtime that all returned names are unique. The
> returned Map can be empty and it's also optional to use the Map.
>
> To me, it seems like a good way to get the best of both worlds.
>
> Thoughts?
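A minimal sketch of how the unified proposal quoted above might behave, assuming a plain `List<T>` stands in for a KStream. The class `BranchedSketch` and all of its method bodies are hypothetical illustrations and not Kafka Streams code; only the method shapes (`split`, two `branch` overloads, `defaultBranch`, `noDefaultBranch`) follow the proposal.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for the proposed KBranchedStream: a List<T> plays the KStream.
final class BranchedSketch<T> {
    private final List<T> remaining;  // records not yet claimed by an earlier branch
    private final Map<String, List<T>> named = new LinkedHashMap<>();

    private BranchedSketch(List<T> records) {
        this.remaining = new ArrayList<>(records);
    }

    // Plays the role of KStream#split().
    static <T> BranchedSketch<T> split(List<T> records) {
        return new BranchedSketch<>(records);
    }

    // Unnamed branch: the sub-stream goes to the consumer and is not returned.
    BranchedSketch<T> branch(Predicate<? super T> predicate, Consumer<List<T>> handler) {
        handler.accept(take(predicate));
        return this;
    }

    // Named branch: the function's result is returned later under the given name.
    BranchedSketch<T> branch(Predicate<? super T> predicate,
                             Function<List<T>, List<T>> chain, String name) {
        register(name, chain.apply(take(predicate)));
        return this;
    }

    // Named default branch: takes whatever is left and returns all named sub-streams.
    Map<String, List<T>> defaultBranch(Function<List<T>, List<T>> chain, String name) {
        register(name, chain.apply(take(r -> true)));
        return named;
    }

    // No default branch: leftover records are dropped.
    Map<String, List<T>> noDefaultBranch() {
        return named;
    }

    // Removes and returns the matching records, so the first matching branch wins.
    private List<T> take(Predicate<? super T> predicate) {
        List<T> matched = new ArrayList<>();
        for (Iterator<T> it = remaining.iterator(); it.hasNext(); ) {
            T r = it.next();
            if (predicate.test(r)) {
                matched.add(r);
                it.remove();
            }
        }
        return matched;
    }

    // Runtime check that all branch names are unique.
    private void register(String name, List<T> stream) {
        if (named.containsKey(name)) {
            throw new IllegalArgumentException("duplicate branch name: " + name);
        }
        named.put(name, stream);
    }

    public static void main(String[] args) {
        List<Integer> handledInline = new ArrayList<>();
        Map<String, List<Integer>> branches = BranchedSketch.split(List.of(1, 2, 3, 4, 5, 6))
            .branch(n -> n % 2 == 0, s -> s, "even")    // named, returned in the Map
            .branch(n -> n == 1, handledInline::addAll) // unnamed, handled in place
            .defaultBranch(s -> s, "rest");
        System.out.println(branches);       // prints {even=[2, 4, 6], rest=[3, 5]}
        System.out.println(handledInline);  // prints [1]
    }
}
```

In this toy model the duplicate-name check happens when the branch is added; whether a real implementation would check at that point or later is left open in the proposal.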
>
>
>
> -Matthias
>
>
>
>
> On 5/6/19 5:15 PM, John Roesler wrote:
> > Ivan,
> >
> > That's a very good point about the "start" operator in the dynamic case.
> > I had no problem with "split()"; I was just questioning the necessity.
> > Since you've provided a proof of necessity, I'm in favor of the
> > "split()" start operator. Thanks!
> >
> > Separately, I'm interested to see where the present discussion leads.
> > I've written enough JavaScript code in my life to be suspicious of
> > nested closures. You have a good point about using method references (or
> > indeed function literals also work). It should be validating that this
> > was also the JS community's first approach to flattening the logic when
> > their nested closure situation got out of hand. Unfortunately, it's
> > replacing nesting with redirection, both of which disrupt code
> > readability (but in different ways for different reasons). In other
> > words, I agree that function references are *the* first-order solution if
> > the nested code does indeed become a problem.
> >
> > However, the history of JS also tells us that function references aren't
> > the end of the story either, and you can see that by observing that
> > there have been two follow-on eras, as they continue trying to cope with
> > the consequences of living in such a callback-heavy language. First, you
> > have Futures/Promises, which essentially let you convert nested code to
> > method-chained code (Observables/FP is a popular variation on this).
> > Most recently, you have async/await, which is an effort to apply language
> > (not just API) syntax to the problem, and offer the "flattest" possible
> > programming style to solve the problem (because you get back to just one
> > code block per functional unit).
> >
> > Stream-processing is a different domain, and Java+KStreams is nowhere
> > near as callback heavy as JS, so I don't think we have to take the JS
> > story for granted, but then again, I think we can derive some valuable
> > lessons by looking sideways to adjacent domains. I'm just bringing this
> > up to inspire further/deeper discussion. At the same time, just like JS,
> > we can afford to take an iterative approach to the problem.
> >
> > Separately again, I'm interested in the post-branch merge (and I'd also
> > add join) problem that Paul brought up. We can clearly punt on it, by
> > terminating the nested branches with sink operators. But is there a DSL
> > way to do it?
> >
> > Thanks again for your driving this,
> > -John
> >
> > On Thu, May 2, 2019 at 7:39 PM Paul Whalen wrote:
> >
> >     Ivan, I’ll definitely forfeit my point on the clumsiness of the
> >     branch(predicate, consumer) solution, I don’t see any real drawbacks
> >     for the dynamic case.
> >
> >     IMO the one trade off to consider at this point is the scope
> >     question. I don’t know if I totally agree that “we rarely need them
> >     in the same scope” since merging the branches back together later
> >     seems like a perfectly plausible use case that can be a lot nicer
> >     when the branched streams are in the same scope. That being said,
> >     for the reasons Ivan listed, I think it is overall the better
> >     solution - working around the scope thing is easy enough if you need
> >     to.
> >
> > > On May 2, 2019, at 7:00 PM, Ivan Ponomarev wrote:
> > >
> > > Hello everyone, thank you all for joining the discussion!
> > >
> > > Well, I don't think the idea of named branches, be it a
> > > LinkedHashMap (no other Map will do, because order of definition
> > > matters) or a `branch` method taking a name and a Consumer, has more
> > > advantages than drawbacks.
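The remark that order of definition matters can be illustrated with a short sketch. It assumes branch evaluation means "the first predicate that matches wins" and compares a `LinkedHashMap` (insertion order) with a `TreeMap` (a `SortedMap`, ordered by key). The class name, the branch names, and the predicates are made up for the illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

final class PredicateOrderSketch {
    // Returns the name of the first predicate, in the map's iteration order, that matches.
    static String firstMatch(Map<String, Predicate<Integer>> branches, int value) {
        for (Map.Entry<String, Predicate<Integer>> entry : branches.entrySet()) {
            if (entry.getValue().test(value)) {
                return entry.getKey();
            }
        }
        return "unmatched";
    }

    // Defines the same two branches, in the same order, in whatever map is passed in.
    static void define(Map<String, Predicate<Integer>> branches) {
        branches.put("small", v -> v < 10); // defined first, meant to win for v < 10
        branches.put("any", v -> true);     // catch-all, defined last
    }

    public static void main(String[] args) {
        Map<String, Predicate<Integer>> insertionOrdered = new LinkedHashMap<>();
        Map<String, Predicate<Integer>> keyOrdered = new TreeMap<>();
        define(insertionOrdered);
        define(keyOrdered);

        // LinkedHashMap iterates in definition order: "small" is tested first.
        System.out.println(firstMatch(insertionOrdered, 5)); // prints small
        // TreeMap iterates in key order: "any" sorts before "small" and shadows it.
        System.out.println(firstMatch(keyOrdered, 5));       // prints any
    }
}
```

With a key-ordered map the catch-all branch is evaluated first simply because of how its name sorts, which is the kind of surprise the insertion-ordered map avoids.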
> > >
> > > In my opinion, the only real positive outcome from Michael's
> > > proposal is that all the returned branches are in the same scope.
> > > But 1) we rarely need them in the same scope 2) there is a
> > > workaround for the scope problem, described in the KIP.
> > >
> > > 'Inlining the complex logic' is not a problem, because we can use
> > > method references instead of lambdas. In real world scenarios you
> > > tend to split the complex logic to methods anyway, so the code is
> > > going to be clean.
> > >
> > > The drawbacks are strong. The cohesion between predicates and
> > > handlers is lost. We have to define predicates in one place, and
> > > handlers in another. This opens the door for bugs:
> > >
> > > - what if we forget to define a handler for a name? or a name for
> > >   a handler?
> > > - what if we misspell a name?
> > > - what if we copy-paste and duplicate a name?
> > >
> > > What Michael proposes would have been totally OK if we had been
> > > writing the API in Lua, Ruby or Python. In those languages the
> > > "dynamic naming" approach would have looked most concise and
> > > beautiful. But in Java we expect all the problems related to
> > > identifiers to be eliminated at compile time.
> > >
> > > Do we have to invent duck-typing for the Java API?
> > >
> > > And if we do, what advantage are we supposed to get besides having
> > > all the branches in the same scope? Michael, maybe I'm missing your
> > > point?
> > >
> > > ---
> > >
> > > Earlier in this discussion John Roesler also proposed to do
> > > without the "start branching" operator, and later Paul mentioned that
> > > in the case when we have to add a dynamic number of branches, the
> > > current KIP is 'clumsier' compared to Michael's 'Map' solution. Let
> > > me address both comments here.
> > >
> > > 1) The "start branching" operator (I think that *split* is a good
> > > name for it indeed) is critical when we need to do dynamic branching,
> > > see example below.
> > >
> > > 2) No, dynamic branching in the current KIP is not clumsy at all.
> > > Imagine a real-world scenario when you need one branch per enum
> > > value (say, RecordType). You can have something like this:
> > >
> > > /*John: if we had to start with stream.branch(...) here, it would
> > > have been much messier.*/
> > > KBranchedStream branched = stream.split();
> > >
> > > /*Not clumsy at all :-)*/
> > > for (RecordType recordType : RecordType.values())
> > >     branched = branched.branch((k, v) -> v.getRecType() == recordType,
> > >             recordType::processRecords);
> > >
> > > Regards,
> > >
> > > Ivan
> > >
> > >
> > > 02.05.2019 14:40, Matthias J. Sax wrote:
> > >> I also agree with Michael's observation about the core problem of
> > >> the current `branch()` implementation.
> > >>
> > >> However, I also don't like to pass in a clumsy Map object. My thinking
> > >> was more aligned with Paul's proposal to just add a name to each
> > >> `branch()` statement and return a `Map`.
> > >>
> > >> It makes the code easier to read, and also makes the order of
> > >> `Predicates` (that is essential) easier to grasp.
> > >>
> > >>>>>> Map<String, KStream<K, V>> branches = stream.split()
> > >>>>>>     .branch("branchOne", Predicate)
> > >>>>>>     .branch( "branchTwo", Predicate)
> > >>>>>>     .defaultBranch("defaultBranch");
> > >> An open question is the case for which no defaultBranch() should be
> > >> specified. Atm, `split()` and `branch()` would return `BranchedKStream`
> > >> and the call to `defaultBranch()` that returns the `Map` is mandatory
> > >> (what is not the case atm). Or is this actually not a real problem,
> > >> because users can just ignore the branch returned by `defaultBranch()`
> > >> in the result `Map` ?
> > >>
> > >>
> > >> About "inlining": So far, it seems to be a matter of personal
> > >> preference. I can see arguments for both, but no "killer argument" yet
> > >> that clearly makes the case for one or the other.
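The enum-driven loop from Ivan's message above can be tried out with a toy splitter. This is only a sketch under assumptions: a `List` stands in for the stream, and `Splitter`, `Rec`, `RecordType` and `route` are hypothetical names invented here. It shows why an explicit start object makes the loop uniform: every iteration calls the same `branch` method on the same type.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class DynamicBranchSketch {
    enum RecordType { ORDER, REFUND, OTHER }

    // Hypothetical record carrying its type and a payload.
    static final class Rec {
        final RecordType type;
        final String payload;

        Rec(RecordType type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    // Toy stand-in for the proposed KBranchedStream; constructing it plays the role of split().
    static final class Splitter<T> {
        private final List<T> remaining;

        Splitter(List<T> records) {
            this.remaining = new ArrayList<>(records);
        }

        // Hands the matching records to the handler and removes them from the remainder.
        Splitter<T> branch(Predicate<T> predicate, Consumer<List<T>> handler) {
            List<T> matched = new ArrayList<>();
            for (Iterator<T> it = remaining.iterator(); it.hasNext(); ) {
                T r = it.next();
                if (predicate.test(r)) {
                    matched.add(r);
                    it.remove();
                }
            }
            handler.accept(matched);
            return this;
        }
    }

    // One branch per enum value, added in a loop.
    static Map<RecordType, List<String>> route(List<Rec> records) {
        Map<RecordType, List<String>> out = new EnumMap<>(RecordType.class);
        Splitter<Rec> branched = new Splitter<>(records); // the explicit "start" step
        for (RecordType type : RecordType.values()) {
            branched = branched.branch(
                r -> r.type == type,
                matched -> {
                    List<String> payloads = new ArrayList<>();
                    for (Rec rec : matched) {
                        payloads.add(rec.payload);
                    }
                    out.put(type, payloads);
                });
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(route(List.of(
            new Rec(RecordType.ORDER, "a"),
            new Rec(RecordType.REFUND, "b"),
            new Rec(RecordType.ORDER, "c"))));
    }
}
```

Without the start object, the first iteration would have to call a different method on a different type than the remaining iterations, which is the messiness the comment in the quoted code refers to.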
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
> > >>> Perhaps inlining is the wrong terminology. It doesn’t require
> > >>> that a lambda with the full downstream topology be defined inline -
> > >>> it can be a method reference as with Ivan’s original suggestion.
> > >>> The advantage of putting the predicate and its downstream logic
> > >>> (Consumer) together in branch() is that they are required to be near
> > >>> to each other.
> > >>>
> > >>> Ultimately the downstream code has to live somewhere, and deep
> > >>> branch trees will be hard to read regardless.
> > >>>
> > >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis wrote:
> > >>>>
> > >>>> I'm less enthusiastic about inlining the branch logic with its
> > >>>> downstream functionality. Programs that have deep branch trees will
> > >>>> quickly become harder to read as a single unit.
> > >>>>
> > >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen wrote:
> > >>>>>
> > >>>>> Also +1 on the issues/goals as Michael outlined them, I think that
> > >>>>> sets a great framework for the discussion.
> > >>>>>
> > >>>>> Regarding the SortedMap solution, my understanding is that the
> > >>>>> current proposal in the KIP is what is in my PR which (pending naming
> > >>>>> decisions) is roughly this:
> > >>>>>
> > >>>>> stream.split()
> > >>>>>     .branch(Predicate, Consumer<KStream<K, V>>)
> > >>>>>     .branch(Predicate, Consumer<KStream<K, V>>)
> > >>>>>     .defaultBranch(Consumer<KStream<K, V>>);
> > >>>>>
> > >>>>> Obviously some ordering is necessary, since branching as a construct
> > >>>>> doesn't work without it, but this solution seems like it provides as
> > >>>>> much associativity as the SortedMap solution, because each branch()
> > >>>>> call directly associates the "conditional" with the "code block." The
> > >>>>> value it provides over the KIP solution is the accessing of streams in
> > >>>>> the same scope.
> > >>>>>
> > >>>>> The KIP solution is less "dynamic" than the SortedMap solution in the
> > >>>>> sense that it is slightly clumsier to add a dynamic number of
> > >>>>> branches, but it is certainly possible. It seems to me like the API
> > >>>>> should favor the "static" case anyway, and should make it simple and
> > >>>>> readable to fluently declare and access your branches in-line. It also
> > >>>>> makes it impossible to ignore a branch, and it is possible to build an
> > >>>>> (almost) identical SortedMap solution on top of it.
> > >>>>>
> > >>>>> I could also see a middle ground where instead of a raw SortedMap
> > >>>>> being taken in, branch() takes a name and not a Consumer. Something
> > >>>>> like this:
> > >>>>>
> > >>>>> Map<String, KStream<K, V>> branches = stream.split()
> > >>>>>     .branch("branchOne", Predicate)
> > >>>>>     .branch( "branchTwo", Predicate)
> > >>>>>     .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
> > >>>>>
> > >>>>> Pros for that solution:
> > >>>>> - accessing branched KStreams in same scope
> > >>>>> - no double brace initialization, hopefully slightly more readable
> > >>>>>   than SortedMap
> > >>>>>
> > >>>>> Cons
> > >>>>> - downstream branch logic cannot be specified inline which makes it
> > >>>>>   harder to read top to bottom (like existing API and SortedMap, but
> > >>>>>   unlike the KIP)
> > >>>>> - you can forget to "handle" one of the branched streams (like
> > >>>>>   existing API and SortedMap, but unlike the KIP)
> > >>>>>
> > >>>>> (KBranchedStreams could even work *both* ways but perhaps that's
> > >>>>> overdoing it).
> > >>>>>
> > >>>>> Overall I'm curious how important it is to be able to easily access
> > >>>>> the branched KStream in the same scope as the original. It's possible
> > >>>>> that it doesn't need to be handled directly by the API, but instead
> > >>>>> left up to the user. I'm sort of in the middle on it.
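One possible reading of "build an (almost) identical SortedMap solution on top of it" is that user code collects the branches into its own map from inside the consumers. The sketch below assumes that reading; `Splitter` is a hypothetical list-based stand-in for the consumer-style API and none of these names come from Kafka Streams.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class MapOnTopSketch {
    // Toy consumer-style splitter: branch(predicate, consumer) and defaultBranch(consumer).
    static final class Splitter<T> {
        private final List<T> remaining;

        Splitter(List<T> records) {
            this.remaining = new ArrayList<>(records);
        }

        Splitter<T> branch(Predicate<T> predicate, Consumer<List<T>> handler) {
            List<T> matched = new ArrayList<>();
            for (Iterator<T> it = remaining.iterator(); it.hasNext(); ) {
                T r = it.next();
                if (predicate.test(r)) {
                    matched.add(r);
                    it.remove();
                }
            }
            handler.accept(matched);
            return this;
        }

        void defaultBranch(Consumer<List<T>> handler) {
            handler.accept(new ArrayList<>(remaining));
            remaining.clear();
        }
    }

    // User-side workaround: each consumer stores its branch under a name in the caller's map.
    static Map<String, List<Integer>> bySign(List<Integer> input) {
        Map<String, List<Integer>> branches = new LinkedHashMap<>();
        new Splitter<Integer>(input)
            .branch(n -> n < 0, s -> branches.put("negative", s))
            .branch(n -> n == 0, s -> branches.put("zero", s))
            .defaultBranch(s -> branches.put("positive", s));
        return branches; // all branches are now available in the caller's scope
    }

    public static void main(String[] args) {
        System.out.println(bySign(List.of(3, -1, 0, 7, -5)));
    }
}
```

The names are still plain strings chosen by the caller, so this workaround keeps the branches in one scope at the cost of the naming pitfalls discussed elsewhere in the thread.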
> > >>>>>
> > >>>>> Paul
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman
> > >>>>> wrote:
> > >>>>>
> > >>>>>> I'd like to +1 what Michael said about the issues with the existing
> > >>>>>> branch method, I agree with what he's outlined and I think we should
> > >>>>>> proceed by trying to alleviate these problems. Specifically it seems
> > >>>>>> important to be able to cleanly access the individual branches (eg by
> > >>>>>> mapping name->stream), which I thought was the original intention of
> > >>>>>> this KIP.
> > >>>>>>
> > >>>>>> That said, I don't think we should so easily give in to the double
> > >>>>>> brace anti-pattern or force our users into it if at all possible to
> > >>>>>> avoid...just my two cents.
> > >>>>>>
> > >>>>>> Cheers,
> > >>>>>> Sophie
> > >>>>>>
> > >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis
> > >>>>>> <michael.drogalis@confluent.io> wrote:
> > >>>>>>
> > >>>>>>> I’d like to propose a different way of thinking about this. To me,
> > >>>>>>> there are three problems with the existing branch signature:
> > >>>>>>>
> > >>>>>>> 1. If you use it the way most people do, Java raises unsafe type
> > >>>>>>>    warnings.
> > >>>>>>> 2. The way in which you use the stream branches is positionally
> > >>>>>>>    coupled to the ordering of the conditionals.
> > >>>>>>> 3. It is brittle to extend existing branch calls with additional
> > >>>>>>>    code paths.
> > >>>>>>>
> > >>>>>>> Using associative constructs instead of relying on ordered
> > >>>>>>> constructs would be a stronger approach. Consider a signature that
> > >>>>>>> instead looks like this:
> > >>>>>>>
> > >>>>>>> Map<String, KStream<K, V>> KStream#branch(SortedMap<String, Predicate<? super K, ? super V>>);
> > >>>>>>>
> > >>>>>>> Branches are given names in a map, and as a result, the API returns
> > >>>>>>> a mapping of names to streams.
> > >>>>>>> The ordering of the conditionals is maintained because it’s a
> > >>>>>>> sorted map. Insert order determines the order of evaluation.
> > >>>>>>>
> > >>>>>>> This solves problem 1 because there are no more varargs. It solves
> > >>>>>>> problem 2 because you no longer lean on ordering to access the
> > >>>>>>> branch you’re interested in. It solves problem 3 because you can
> > >>>>>>> introduce another conditional by simply attaching another name to
> > >>>>>>> the structure, rather than messing with the existing indices.
> > >>>>>>>
> > >>>>>>> One of the drawbacks is that creating the map inline is historically
> > >>>>>>> awkward in Java. I know it’s an anti-pattern to use voluminously,
> > >>>>>>> but double brace initialization would clean up the aesthetics.
> > >>>>>>>
> > >>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler wrote:
> > >>>>>>>> Hi Ivan,
> > >>>>>>>>
> > >>>>>>>> Thanks for the update.
> > >>>>>>>>
> > >>>>>>>> FWIW, I agree with Matthias that the current "start branching"
> > >>>>>>>> operator is confusing when named the same way as the actual
> > >>>>>>>> branches. "Split" seems like a good name. Alternatively, we can do
> > >>>>>>>> without a "start branching" operator at all, and just do:
> > >>>>>>>>
> > >>>>>>>> stream
> > >>>>>>>>     .branch(Predicate)
> > >>>>>>>>     .branch(Predicate)
> > >>>>>>>>     .defaultBranch();
> > >>>>>>>>
> > >>>>>>>> Tentatively, I think that this branching operation should be
> > >>>>>>>> terminal. That way, we don't create ambiguity about how to use it.
> > >>>>>>>> That is, `branch` should return `KBranchedStream`, while
> > >>>>>>>> `defaultBranch` is `void`, to enforce that it comes last, and that
> > >>>>>>>> there is only one definition of the default branch.
> > >>>>>>>> Potentially, we should log a warning if there's no default, and
> > >>>>>>>> additionally log a warning (or throw an exception) if a record
> > >>>>>>>> falls through with no default.
> > >>>>>>>>
> > >>>>>>>> Thoughts?
> > >>>>>>>>
> > >>>>>>>> Thanks,
> > >>>>>>>> -John
> > >>>>>>>>
> > >>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax
> > >>>>>>>> <matthias@confluent.io> wrote:
> > >>>>>>>>
> > >>>>>>>>> Thanks for updating the KIP and your answers.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>> this is to make the name similar to String#split
> > >>>>>>>>>> that also returns an array, right?
> > >>>>>>>>>
> > >>>>>>>>> The intent was to avoid name duplication. The return type should
> > >>>>>>>>> _not_ be an array.
> > >>>>>>>>>
> > >>>>>>>>> The current proposal is
> > >>>>>>>>>
> > >>>>>>>>> stream.branch()
> > >>>>>>>>>     .branch(Predicate)
> > >>>>>>>>>     .branch(Predicate)
> > >>>>>>>>>     .defaultBranch();
> > >>>>>>>>>
> > >>>>>>>>> IMHO, this reads a little odd, because the first `branch()` does
> > >>>>>>>>> not take any parameters and has different semantics than the later
> > >>>>>>>>> `branch()` calls. Note, that from the code snippet above, it's
> > >>>>>>>>> hidden that the first call is `KStream#branch()` while the others
> > >>>>>>>>> are `KBranchedStream#branch()` what makes reading the code harder.
> > >>>>>>>>>
> > >>>>>>>>> Because I suggested to rename `addBranch()` -> `branch()`, I
> > >>>>>>>>> thought it might be better to also rename `KStream#branch()` to
> > >>>>>>>>> avoid the naming overlap that seems to be confusing.
> > >>>>>>>>> The following reads much cleaner to me:
> > >>>>>>>>>
> > >>>>>>>>> stream.split()
> > >>>>>>>>>     .branch(Predicate)
> > >>>>>>>>>     .branch(Predicate)
> > >>>>>>>>>     .defaultBranch();
> > >>>>>>>>>
> > >>>>>>>>> Maybe there is a better alternative to `split()` though to avoid
> > >>>>>>>>> the naming overlap.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>> 'default' is, however, a reserved word, so unfortunately we
> > >>>>>>>>>> cannot have a method with such name :-)
> > >>>>>>>>>
> > >>>>>>>>> Bummer. Didn't consider this. Maybe we can still come up with a
> > >>>>>>>>> short name?
> > >>>>>>>>>
> > >>>>>>>>> Can you add the interface `KBranchedStream` to the KIP with all
> > >>>>>>>>> its methods? It will be part of public API and should be contained
> > >>>>>>>>> in the KIP. For example, it's unclear atm, what the return type of
> > >>>>>>>>> `defaultBranch()` is.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> You did not comment on the idea to add a
> > >>>>>>>>> `KBranchedStream#get(int index) -> KStream` method to get the
> > >>>>>>>>> individually branched-KStreams. Would be nice to get your feedback
> > >>>>>>>>> about it. It seems you suggest that users would need to write
> > >>>>>>>>> custom utility code otherwise, to access them. We should discuss
> > >>>>>>>>> the pros and cons of both approaches. It feels "incomplete" to me
> > >>>>>>>>> atm, if the API has no built-in support to get the
> > >>>>>>>>> branched-KStreams directly.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> -Matthias
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> > >>>>>>>>>> Hi all!
> > >>>>>>>>>>
> > >>>>>>>>>> I have updated the KIP-418 according to the new vision.
> > >>>>>>>>>>
> > >>>>>>>>>> Matthias, thanks for your comment!
> > >>>>>>>>>>
> > >>>>>>>>>>> Renaming KStream#branch() -> #split()
> > >>>>>>>>>> I can see your point: this is to make the name similar to
> > >>>>>>>>>> String#split that also returns an array, right? But is it worth
> > >>>>>>>>>> the loss of backwards compatibility? We can have overloaded
> > >>>>>>>>>> branch() as well without affecting the existing code. Maybe the
> > >>>>>>>>>> old array-based `branch` method should be deprecated, but this is
> > >>>>>>>>>> a subject for discussion.
> > >>>>>>>>>>
> > >>>>>>>>>>> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
> > >>>>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> > >>>>>>>>>>
> > >>>>>>>>>> Totally agree with 'addBranch->branch' rename. 'default' is,
> > >>>>>>>>>> however, a reserved word, so unfortunately we cannot have a method
> > >>>>>>>>>> with such name :-)
> > >>>>>>>>>>
> > >>>>>>>>>>> defaultBranch() does take an `Predicate` as argument, but I
> > >>>>>>>>>>> think that is not required?
> > >>>>>>>>>>
> > >>>>>>>>>> Absolutely! I think that was just a copy-paste error or something.
> > >>>>>>>>>>
> > >>>>>>>>>> Dear colleagues,
> > >>>>>>>>>>
> > >>>>>>>>>> please review the new version of the KIP and Paul's PR
> > >>>>>>>>>> (https://github.com/apache/kafka/pull/6512)
> > >>>>>>>>>>
> > >>>>>>>>>> Any new suggestions/objections?
> > >>>>>>>>>>
> > >>>>>>>>>> Regards,
> > >>>>>>>>>>
> > >>>>>>>>>> Ivan
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> 11.04.2019 11:47, Matthias J. Sax wrote:
> > >>>>>>>>>>> Thanks for driving the discussion of this KIP. It seems that
> > >>>>>>>>>>> everybody agrees that the current branch() method using arrays
> > >>>>>>>>>>> is not optimal.
> > >>>>>>>>>>>
> > >>>>>>>>>>> I had a quick look into the PR and I like the overall proposal.
> > >>>>>>>>>>> There are some minor things we need to consider. I would
> > >>>>>>>>>>> recommend the following renaming:
> > >>>>>>>>>>>
> > >>>>>>>>>>> KStream#branch() -> #split()
> > >>>>>>>>>>> KBranchedStream#addBranch() -> BranchingKStream#branch()
> > >>>>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> > >>>>>>>>>>>
> > >>>>>>>>>>> It's just a suggestion to get slightly shorter method names.
> > >>>>>>>>>>>
> > >>>>>>>>>>> In the current PR, defaultBranch() does take an `Predicate` as
> > >>>>>>>>>>> argument, but I think that is not required?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Also, we should consider KIP-307, that was recently accepted and
> > >>>>>>>>>>> is currently implemented:
> > >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > >>>>>>>>>>>
> > >>>>>>>>>>> Ie, we should add overloads that accepted a `Named` parameter.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> For the issue that the created `KStream` object are in different
> > >>>>>>>>>>> scopes: could we extend `KBranchedStream` with a `get(int index)`
> > >>>>>>>>>>> method that returns the corresponding "branched" result `KStream`
> > >>>>>>>>>>> object? Maybe, the second argument of `addBranch()` should not be
> > >>>>>>>>>>> a `Consumer` but a `Function` and `get()` could return whatever
> > >>>>>>>>>>> the `Function` returns?
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Finally, I would also suggest to update the KIP with the current
> > >>>>>>>>>>> proposal. That makes it easier to review.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> -Matthias
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
> > >>>>>>>>>>>> Ivan,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I'm a bit of a novice here as well, but I think it makes sense
> > >>>>>>>>>>>> for you to revise the KIP and continue the discussion.
> > >>>>>>>>>>>> Obviously we'll need some buy-in from committers that have
> > >>>>>>>>>>>> actual binding votes on whether the KIP could be adopted. It
> > >>>>>>>>>>>> would be great to hear if they think this is a good idea
> > >>>>>>>>>>>> overall. I'm not sure if that happens just by starting a vote,
> > >>>>>>>>>>>> or if there is generally some indication of interest beforehand.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> That being said, I'll continue the discussion a bit: assuming
> > >>>>>>>>>>>> we do move forward the solution of "stream.branch() returns
> > >>>>>>>>>>>> KBranchedStream", do we deprecate "stream.branch(...) returns
> > >>>>>>>>>>>> KStream[]"? I would favor deprecating, since having two
> > >>>>>>>>>>>> mutually exclusive APIs that accomplish the same thing is
> > >>>>>>>>>>>> confusing, especially when they're fairly similar anyway. We
> > >>>>>>>>>>>> just need to be sure we're not making something
> > >>>>>>>>>>>> impossible/difficult that is currently possible/easy.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Regarding my PR - I think the general structure would work,
> > >>>>>>>>>>>> it's just a little sloppy overall in terms of naming and
> > >>>>>>>>>>>> clarity.
> > >>>>>>>>>>>> In particular, passing in the "predicates" and "children"
> > >>>>>>>>>>>> lists which get modified in KBranchedStream but read from all
> > >>>>>>>>>>>> the way in KStreamLazyBranch is a bit complicated to follow.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>> Paul
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev
> > >>>>>>>>>>>> <iponomarev@mail.ru> wrote:
> > >>>>>>>>>>>>> Hi Paul!
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I read your code carefully and now I am fully convinced: your
> > >>>>>>>>>>>>> proposal looks better and should work. We just have to
> > >>>>>>>>>>>>> document the crucial fact that KStream consumers are invoked
> > >>>>>>>>>>>>> as they're added. And then it's all going to be very nice.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> What shall we do now? I should re-write the KIP and resume the
> > >>>>>>>>>>>>> discussion here, right?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Why are you saying that your PR 'should not be even a
> > >>>>>>>>>>>>> starting point if we go in this direction'? To me it looks
> > >>>>>>>>>>>>> like a good starting point. But as a novice in this project I
> > >>>>>>>>>>>>> might miss some important details.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Ivan
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen wrote:
> > >>>>>>>>>>>>>> Ivan,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Maybe I’m missing the point, but I believe the
> > >>>>>>>>>>>>>> stream.branch() solution supports this. The
> > >>>>>>>>>>>>>> couponIssuer::set* consumers will be invoked as they’re
> > >>>>>>>>>>>>>> added, not during streamsBuilder.build().
> > >>>>>>>>>>>>>> So the user still ought to be able to call
> > >>>>>>>>>>>>>> couponIssuer.coupons() afterward and depend on the branched
> > >>>>>>>>>>>>>> streams having been set.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> The issue I mean to point out is that it is hard to access the
> > >>>>>>>>>>>>>> branched streams in the same scope as the original stream
> > >>>>>>>>>>>>>> (that is, not inside the couponIssuer), which is a problem
> > >>>>>>>>>>>>>> with both proposed solutions. It can be worked around though.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> [Also, great to hear additional interest in 401, I'm excited
> > >>>>>>>>>>>>>> to hear your thoughts!]
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Paul
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev
> > >>>>>>>>>>>>>>> <iponomarev@mail.ru> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hi Paul!
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> The idea to postpone the wiring of branches to the
> > >>>>>>>>>>>>>>> streamsBuilder.build() also looked great to me at first
> > >>>>>>>>>>>>>>> glance, but ---
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> the newly branched streams are not available in the same
> > >>>>>>>>>>>>>>>> scope as each other. That is, if we wanted to merge them
> > >>>>>>>>>>>>>>>> back together again I don't see a way to do that.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> You just took the words right out of my mouth, I was just
> > >>>>>>>>>>>>>>> going to write in detail about this issue.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Consider the example from Bill's book, p. 101: say we need
> > >>>>>>>>>>>>>>> to identify customers who have bought coffee and made a
> > >>>>>>>>>>>>>>> purchase in the electronics store to give them coupons.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> This is the code I usually write under these circumstances
> > >>>>>>>>>>>>>>> using my 'brancher' class:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> @Setter
> > >>>>>>>>>>>>>>> class CouponIssuer{
> > >>>>>>>>>>>>>>>    private KStream<....> coffePurchases;
> > >>>>>>>>>>>>>>>    private KStream<....> electronicsPurchases;
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>    KStream<...> coupons(){
> > >>>>>>>>>>>>>>>        return coffePurchases.join(electronicsPurchases...)...whatever
> > >>>>>>>>>>>>>>>        /*In the real world the code here can be complex, so
> > >>>>>>>>>>>>>>>        creation of a separate CouponIssuer class is fully
> > >>>>>>>>>>>>>>>        justified, in order to separate classes'
> > >>>>>>>>>>>>>>>        responsibilities.*/
> > >>>>>>>>>>>>>>>    }
> > >>>>>>>>>>>>>>> }
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> CouponIssuer couponIssuer = new CouponIssuer();
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
> > >>>>>>>>>>>>>>>     .branch(predicate1, couponIssuer::setCoffePurchases)
> > >>>>>>>>>>>>>>>     .branch(predicate2, couponIssuer::setElectronicsPurchases)
> > >>>>>>>>>>>>>>>     .onTopOf(transactionStream);
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> /*Alas, this won't work if we're going to wire up everything
> > >>>>>>>>>>>>>>> later, without the terminal operation!!!*/
> > >>>>>>>>>>>>>>> couponIssuer.coupons()...
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Does this make sense? In order to properly initialize the
> > >>>>>>>>>>>>>>> CouponIssuer we need the terminal operation to be called
> > >>>>>>>>>>>>>>> before streamsBuilder.build() is called.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> [BTW Paul, I just found out that your KIP-401 is essentially
> > >>>>>>>>>>>>>>> the next KIP I was going to write here. I have some thoughts
> > >>>>>>>>>>>>>>> based on my experience, so I will join the discussion on
> > >>>>>>>>>>>>>>> KIP-401 soon.]
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Ivan
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen wrote:
> > >>>>>>>>>>>>>>>> Ivan,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I tried to make a very rough proof of concept of a fluent
> > >>>>>>>>>>>>>>>> API based off of KStream here
> > >>>>>>>>>>>>>>>> (https://github.com/apache/kafka/pull/6512), and I think I
> > >>>>>>>>>>>>>>>> succeeded at removing both cons.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>  - Compatibility: I was incorrect earlier about
> > >>>>>>>>>>>>>>>>    compatibility issues, there aren't any direct ones. I was
> > >>>>>>>>>>>>>>>>    unaware that Java is smart enough to distinguish between
> > >>>>>>>>>>>>>>>>    a branch(varargs...) returning one thing and branch()
> > >>>>>>>>>>>>>>>>    with no arguments returning another thing.
> > >>>>>>>>>>>>>>>>  - Requiring a terminal method: We don't actually need it.
> > >>>>>>>>>>>>>>>>    We can just build up the branches in the KBranchedStream,
> > >>>>>>>>>>>>>>>>    which shares its state with the ProcessorSupplier that
> > >>>>>>>>>>>>>>>>    will actually do the branching. It's not terribly pretty
> > >>>>>>>>>>>>>>>>    in its current form, but I think it demonstrates its
> > >>>>>>>>>>>>>>>>    feasibility.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> To be clear, I don't think that pull request should be final
> > >>>>>>>>>>>>>>>> or even a starting point if we go in this direction, I just
> > >>>>>>>>>>>>>>>> wanted to see how challenging it would be to get the API
> > >>>>>>>>>>>>>>>> working.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I will say though, that I'm not sure the existing solution
> > >>>>>>>>>>>>>>>> could be deprecated in favor of this, which I had originally
> > >>>>>>>>>>>>>>>> suggested was a possibility. The reason is that the newly
> > >>>>>>>>>>>>>>>> branched streams are not available in the same scope as each
> > >>>>>>>>>>>>>>>> other. That is, if we wanted to merge them back together
> > >>>>>>>>>>>>>>>> again I don't see a way to do that. The KIP proposal has the
> > >>>>>>>>>>>>>>>> same issue, though - all this means is that for either
> > >>>>>>>>>>>>>>>> solution, deprecating the existing branch(...) is not on the
> > >>>>>>>>>>>>>>>> table.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>> Paul
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev
> > >>>>>>>>>>>>>>>>> <iponomarev@mail.ru> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> OK, let me summarize what we have discussed up to this
> > >>>>>>>>>>>>>>>>> point.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> First, it seems that it's commonly agreed that the branch
> > >>>>>>>>>>>>>>>>> API needs improvement. Motivation is given in the KIP.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> There are two potential ways to do it:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> 1. (as originally proposed)
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
> > >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->..)
> > >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->..)
> > >>>>>>>>>>>>>>>>>    .defaultBranch(ks->..) //optional
> > >>>>>>>>>>>>>>>>>    .onTopOf(stream).mapValues(...).... //onTopOf returns its argument
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2) The code won't make
> > >>>>>>>>>>>>>>>>> sense until all the necessary ingredients are provided.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance
> > >>>>>>>>>>>>>>>>> contrasts with the fluency of other KStream methods.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> 2. (as Paul proposes)
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> stream
> > >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->...)
> > >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->...)
> > >>>>>>>>>>>>>>>>>    .defaultBranch(ks->...) //or noDefault(). Both
> > >>>>>>>>>>>>>>>>>    //defaultBranch(..) and noDefault() return void
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> PROS: Generally follows the way the KStreams interface is
> > >>>>>>>>>>>>>>>>> defined.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> CONS: We need to define two terminal methods
> > >>>>>>>>>>>>>>>>> (defaultBranch(ks->) and noDefault()). And for a user it is
> > >>>>>>>>>>>>>>>>> very easy to miss the fact that one of the terminal methods
> > >>>>>>>>>>>>>>>>> should be called. If these methods are not called, we can
> > >>>>>>>>>>>>>>>>> throw an exception at runtime.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can we do better?
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Ivan
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
> > >>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
> > >>>>>>>>>>>>>>>>>>> Paul,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I see your point when you are talking about
> > >>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Still, I believe that this cannot be implemented the easy
> > >>>>>>>>>>>>>>>>>>> way. Maybe we all should think further.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> user could specify a terminal method that assumes
> > >>>>>>>>>>>>>>>>>>>> nothing will reach the default branch, throwing an
> > >>>>>>>>>>>>>>>>>>>> exception if such a case occurs.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be the only option
> > >>>>>>>>>>>>>>>>>>> besides `default`, because there are scenarios when we
> > >>>>>>>>>>>>>>>>>>> want to just silently drop the messages that didn't match
> > >>>>>>>>>>>>>>>>>>> any predicate. 2) Throwing an exception in the middle of
> > >>>>>>>>>>>>>>>>>>> data flow processing looks like a bad idea. In the stream
> > >>>>>>>>>>>>>>>>>>> processing paradigm, I would prefer to emit a special
> > >>>>>>>>>>>>>>>>>>> message to a dedicated stream. This is exactly where
> > >>>>>>>>>>>>>>>>>>> `default` can be used.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> it would be fairly easy for the InternalTopologyBuilder
> > >>>>>>>>>>>>>>>>>>>> to track dangling branches that haven't been terminated
> > >>>>>>>>>>>>>>>>>>>> and raise a clear error before it becomes an issue.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> You mean a runtime exception, when the program is
> > >>>>>>>>>>>>>>>>>>> compiled and run? Well, I'd prefer an API that simply
> > >>>>>>>>>>>>>>>>>>> won't compile if used incorrectly. Can we build such an
> > >>>>>>>>>>>>>>>>>>> API as a method chain starting from a KStream object?
There is a huge cost difference > > between > > >>>>>>>> runtime > > >>>>>>>>> and > > >>>>>>>>>>>>>>>>>>> compile-time errors. Even if a failure uncovers > > >>>>> instantly > > >>>>>> on > > >>>>>>>>> unit > > >>>>>>>>>>>>>>>>>>> tests, it costs more for the project than a > > compilation > > >>>>>>>> failure. > > >>>>>>>>>>>>>>>>>>> Regards, > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Ivan > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen =D0=BF=D0=B8=D1=88= =D0=B5=D1=82: > > >>>>>>>>>>>>>>>>>>>> Ivan, > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Good point about the terminal operation being > > required. > > >>>>>>> But > > >>>>>>>> is > > >>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>>>>> really > > >>>>>>>>>>>>>>>>>>>> such a bad thing? If the user doesn't want a > > >>>>>> defaultBranch > > >>>>>>>>> they > > >>>>>>>>>>>>> can > > >>>>>>>>>>>>>>>>>>>> call > > >>>>>>>>>>>>>>>>>>>> some other terminal method (noDefaultBranch()?= ) > > just as > > >>>>>>>>> easily. In > > >>>>>>>>>>>>>>>>>>>> fact I > > >>>>>>>>>>>>>>>>>>>> think it creates an opportunity for a nicer AP= I > - a > > >>>>> user > > >>>>>>>> could > > >>>>>>>>>>>>> specify > > >>>>>>>>>>>>>>>>> a > > >>>>>>>>>>>>>>>>>>>> terminal method that assumes nothing will reac= h > the > > >>>>>> default > > >>>>>>>>> branch, > > >>>>>>>>>>>>>>>>>>>> throwing an exception if such a case occurs. > That > > >>>>> seems > > >>>>>>> like > > >>>>>>>>> an > > >>>>>>>>>>>>>>>>>>>> improvement over the current branch() API, > > which allows > > >>>>>> for > > >>>>>>>> the > > >>>>>>>>>>>>> more > > >>>>>>>>>>>>>>>>>>>> subtle > > >>>>>>>>>>>>>>>>>>>> behavior of records unexpectedly getting > dropped. 
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> The need for a terminal operation certainly has to be
> > >>>>>>>>>>>>>>>>>>>> well documented, but it would be fairly easy for the
> > >>>>>>>>>>>>>>>>>>>> InternalTopologyBuilder to track dangling branches that
> > >>>>>>>>>>>>>>>>>>>> haven't been terminated and raise a clear error before
> > >>>>>>>>>>>>>>>>>>>> it becomes an issue. Especially now that there is a
> > >>>>>>>>>>>>>>>>>>>> "build step" where the topology is actually wired up,
> > >>>>>>>>>>>>>>>>>>>> when StreamsBuilder.build() is called.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its argument, I agree that
> > >>>>>>>>>>>>>>>>>>>> it's critical to allow users to do other operations on
> > >>>>>>>>>>>>>>>>>>>> the input stream. With the fluent solution, it ought to
> > >>>>>>>>>>>>>>>>>>>> work the same way all other operations do - if you want
> > >>>>>>>>>>>>>>>>>>>> to process off the original KStream multiple times, you
> > >>>>>>>>>>>>>>>>>>>> just need the stream as a variable so you can call as
> > >>>>>>>>>>>>>>>>>>>> many operations on it as you desire.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Thoughts?
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>> Paul
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev
> > >>>>>>>>>>>>>>>>>>>> <iponomarev@mail.ru> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Hello Paul,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I'm afraid this won't work because we do not always
> > >>>>>>>>>>>>>>>>>>>>> need the defaultBranch. And without a terminal
> > >>>>>>>>>>>>>>>>>>>>> operation we don't know when to finalize and build the
> > >>>>>>>>>>>>>>>>>>>>> 'branch switch'.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its argument, so we can
> > >>>>>>>>>>>>>>>>>>>>> do something more with the original branch after
> > >>>>>>>>>>>>>>>>>>>>> branching.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I understand your point that the need for special
> > >>>>>>>>>>>>>>>>>>>>> object construction contrasts with the fluency of most
> > >>>>>>>>>>>>>>>>>>>>> KStream methods. But here we have a special case: we
> > >>>>>>>>>>>>>>>>>>>>> build the switch to split the flow, so I think this is
> > >>>>>>>>>>>>>>>>>>>>> still idiomatic.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Ivan
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
> > >>>>>>>>>>>>>>>>>>>>>> Ivan,
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve this API, but I
> > >>>>>>>>>>>>>>>>>>>>>> find the onTopOf() mechanism a little confusing since
> > >>>>>>>>>>>>>>>>>>>>>> it contrasts with the fluency of other KStream method
> > >>>>>>>>>>>>>>>>>>>>>> calls. Ideally I'd like to just call a method on the
> > >>>>>>>>>>>>>>>>>>>>>> stream so it still reads top to bottom if the branch
> > >>>>>>>>>>>>>>>>>>>>>> cases are defined fluently. I think the
> > >>>>>>>>>>>>>>>>>>>>>> addBranch(predicate, handleCase) is very nice and the
> > >>>>>>>>>>>>>>>>>>>>>> right way to do things, but what if we flipped around
> > >>>>>>>>>>>>>>>>>>>>>> how we specify the source stream.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Like:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> stream.branch()
> > >>>>>>>>>>>>>>>>>>>>>>         .addBranch(predicate1, this::handle1)
> > >>>>>>>>>>>>>>>>>>>>>>         .addBranch(predicate2, this::handle2)
> > >>>>>>>>>>>>>>>>>>>>>>         .defaultBranch(this::handleDefault);
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Where branch() returns a KBranchedStreams or
> > >>>>>>>>>>>>>>>>>>>>>> KStreamBrancher or something, which is added to by
> > >>>>>>>>>>>>>>>>>>>>>> addBranch() and terminated by defaultBranch() (which
> > >>>>>>>>>>>>>>>>>>>>>> returns void). This is obviously incompatible with the
> > >>>>>>>>>>>>>>>>>>>>>> current API, so the new stream.branch() would have to
> > >>>>>>>>>>>>>>>>>>>>>> have a different name, but that seems like a fairly
> > >>>>>>>>>>>>>>>>>>>>>> small problem - we could call it something like
> > >>>>>>>>>>>>>>>>>>>>>> branched() or branchedStreams() and deprecate the old
> > >>>>>>>>>>>>>>>>>>>>>> API.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of your KIP? It
> > >>>>>>>>>>>>>>>>>>>>>> seems like it does to me, allowing for clear in-line
> > >>>>>>>>>>>>>>>>>>>>>> branching while also allowing you to dynamically build
> > >>>>>>>>>>>>>>>>>>>>>> branches off of KBranchedStreams if desired.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>> Paul
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev wrote:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){
> > >>>>>>>>>>>>>>>>>>>>>>>         ks.filter(....).mapValues(...)
> > >>>>>>>>>>>>>>>>>>>>>>> }
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){
> > >>>>>>>>>>>>>>>>>>>>>>>         ks.selectKey(...).groupByKey()...
> > >>>>>>>>>>>>>>>>>>>>>>> }
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> ......
> > >>>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher()
> > >>>>>>>>>>>>>>>>>>>>>>>    .addBranch(predicate1, this::handleFirstCase)
> > >>>>>>>>>>>>>>>>>>>>>>>    .addBranch(predicate2, this::handleSecondCase)
> > >>>>>>>>>>>>>>>>>>>>>>>    .onTopOf(....)
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Ivan
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> I have one question, the KafkaStreamsBrancher takes a
> > >>>>>>>>>>>>>>>>>>>>>>>> Consumer as a second argument which returns nothing,
> > >>>>>>>>>>>>>>>>>>>>>>>> and the example in the KIP shows each stream from the
> > >>>>>>>>>>>>>>>>>>>>>>>> branch using a terminal node (KStream#to() in this
> > >>>>>>>>>>>>>>>>>>>>>>>> case).
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but how would we handle
> > >>>>>>>>>>>>>>>>>>>>>>>> the case where the user has created a branch but
> > >>>>>>>>>>>>>>>>>>>>>>>> wants to continue processing and not necessarily use
> > >>>>>>>>>>>>>>>>>>>>>>>> a terminal node on the branched stream immediately?
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic as is, if we had
> > >>>>>>>>>>>>>>>>>>>>>>>> something like this:
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> KStream[] branches =
> > >>>>>>>>>>>>>>>>>>>>>>>>     originalStream.branch(predicate1, predicate2);
> > >>>>>>>>>>>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
> > >>>>>>>>>>>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Thanks!
> > >>>>>>>>>>>>>>>>>>>>>>>> Bill
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck
> > >>>>>>>>>>>>>>>>>>>>>>>> <bbejeck@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> All,
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP-418.
> > >>>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Hello,
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418.
> > >>>>>>>>>>>>>>>>>>>>>>>>> Please take a look at the KIP if you can, I would
> > >>>>>>>>>>>>>>>>>>>>>>>>> appreciate any feedback :)
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> KIP-418:
> > >>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> > >>>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488:
> > >>>>>>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5488
> > >>>>>>>>>>>>>>>>>>>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev

--000000000000d449bb0589a9ac4e--
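
For readers following the thread without a Kafka Streams project at hand, the routing behavior being debated (first matching predicate wins, an optional default branch, a terminal onTopOf() that returns its argument) can be sketched in plain JDK code. This is only an illustration under stated assumptions: ListBrancher and BrancherDemo are hypothetical names, a java.util.List stands in for a KStream, and none of this is the KafkaStreamsBrancher from the KIP or any Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical stand-in for the brancher in the thread; a List plays the role of a KStream. */
final class ListBrancher<T> {
    private final List<Predicate<? super T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();
    private Consumer<List<T>> defaultHandler; // null: unmatched elements are silently dropped

    ListBrancher<T> branch(Predicate<? super T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    ListBrancher<T> defaultBranch(Consumer<List<T>> handler) {
        defaultHandler = handler;
        return this;
    }

    /** Terminal operation: splits the source, hands each branch to its handler, returns the source. */
    List<T> onTopOf(List<T> source) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            branches.add(new ArrayList<>());
        }
        List<T> unmatched = new ArrayList<>();
        for (T element : source) {
            boolean routed = false;
            for (int i = 0; i < predicates.size() && !routed; i++) {
                if (predicates.get(i).test(element)) {
                    branches.get(i).add(element); // first matching predicate wins
                    routed = true;
                }
            }
            if (!routed) {
                unmatched.add(element);
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(branches.get(i));
        }
        if (defaultHandler != null) {
            defaultHandler.accept(unmatched);
        }
        return source;
    }
}

final class BrancherDemo {
    static Map<String, List<Integer>> run() {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        new ListBrancher<Integer>()
            .branch(n -> n % 2 == 0, evens -> out.put("even", evens))
            .branch(n -> n > 2, big -> out.put("bigOdd", big))
            .defaultBranch(rest -> out.put("rest", rest))
            .onTopOf(List.of(1, 2, 3, 4, 5));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // {even=[2, 4], bigOdd=[3, 5], rest=[1]}
    }
}
```

In this sketch the handlers only run when onTopOf() is called, which mirrors the "terminal operation" concern raised in the thread: if onTopOf() is never invoked, no handler ever sees its branch. Dropping the defaultBranch() call makes unmatched elements disappear silently, which is the behavior Paul describes for the existing branch() API.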