Matthias: I think that's pretty reasonable from my point of view. Good
suggestion.
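
For concreteness, here is a minimal, self-contained sketch of how the mixed
named/unnamed style quoted below could behave. It is not Kafka Streams code
and not the KIP's API: a plain List stands in for KStream, and BranchedSketch
with its split/branch/defaultBranch/noDefaultBranch methods are hypothetical
stand-ins that only mirror the shapes in the proposal. It assumes first-match
semantics and a runtime check for duplicate names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical toy model: a "stream" is just a List<V>. Not Kafka Streams API.
final class BranchedSketch<V> {
    // Records that no earlier predicate has matched yet.
    private List<V> remaining;
    // Named results, kept in definition order.
    private final Map<String, List<V>> named = new LinkedHashMap<>();

    private BranchedSketch(List<V> source) {
        this.remaining = source;
    }

    static <V> BranchedSketch<V> split(List<V> source) {
        return new BranchedSketch<>(new ArrayList<>(source));
    }

    // Remove and return the records matching p; the first matching branch wins.
    private List<V> take(Predicate<V> p) {
        List<V> hit = new ArrayList<>();
        List<V> miss = new ArrayList<>();
        for (V v : remaining) {
            (p.test(v) ? hit : miss).add(v);
        }
        remaining = miss;
        return hit;
    }

    // Runtime uniqueness check for branch names.
    private void register(String name, List<V> result) {
        if (named.putIfAbsent(name, result) != null) {
            throw new IllegalArgumentException("duplicate branch name: " + name);
        }
    }

    // Unnamed branch: handled inside the consumer, not returned to the caller.
    BranchedSketch<V> branch(Predicate<V> p, Consumer<List<V>> handler) {
        handler.accept(take(p));
        return this;
    }

    // Named branch: whatever the function returns lands in the result map.
    BranchedSketch<V> branch(Predicate<V> p, Function<List<V>, List<V>> chain, String name) {
        register(name, chain.apply(take(p)));
        return this;
    }

    // Named default branch: receives everything that matched no predicate.
    Map<String, List<V>> defaultBranch(Function<List<V>, List<V>> chain, String name) {
        register(name, chain.apply(remaining));
        return named;
    }

    // No default branch: unmatched records are simply dropped.
    Map<String, List<V>> noDefaultBranch() {
        return named;
    }
}

class BranchSketchDemo {
    public static void main(String[] args) {
        Map<String, List<Integer>> branches = BranchedSketch.split(Arrays.asList(1, 2, 3, 4, 5, 6))
            .branch(v -> v % 2 == 0, s -> s, "even")                      // named, returned in the map
            .branch(v -> v > 4, s -> System.out.println("unnamed: " + s)) // handled in place
            .defaultBranch(s -> s, "rest");
        System.out.println(branches);
    }
}
```

The named branches come back to the calling scope through the map, while the
unnamed one is consumed where it is declared, so both styles can coexist in
one chain.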
On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax <matthias@confluent.io>
wrote:
> Interesting discussion.
>
> I am wondering, if we cannot unify the advantage of both approaches:
>
>
>
> KStream#split() -> KBranchedStream
>
> // branch is not easily accessible in current scope
> KBranchedStream#branch(Predicate, Consumer<KStream>)
> -> KBranchedStream
>
> // assign a name to the branch and
> // return the sub-stream to the current scope later
> //
> // can be as simple as `#branch(p, s->s, "name")`
> // or as complex as `#branch(p, s->s.filter(...), "name")`
> KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
> -> KBranchedStream
>
> // default branch is not easily accessible
> // return map of all named sub-stream into current scope
> KBranchedStream#default(Consumer<KStream>)
> -> Map<String,KStream>
>
> // assign custom name to default-branch
> // return map of all named sub-stream into current scope
> KBranchedStream#default(Function<KStream,KStream>, String)
> -> Map<String,KStream>
>
> // assign a default name for default
> // return map of all named sub-stream into current scope
> KBranchedStream#defaultBranch(Function<KStream,KStream>)
> -> Map<String,KStream>
>
> // return map of all named sub-streams into current scope
> KBranchedStream#noDefaultBranch()
> -> Map<String,KStream>
>
>
>
> Hence, for each sub-stream, the user can pick to add a name and return
> the branch "result" to the calling scope or not. The implementation can
> also check at runtime that all returned names are unique. The returned
> Map can be empty and it's also optional to use the Map.
>
> To me, it seems like a good way to get the best of both worlds.
>
> Thoughts?
>
>
>
> -Matthias
>
>
>
>
> On 5/6/19 5:15 PM, John Roesler wrote:
> > Ivan,
> >
> > That's a very good point about the "start" operator in the dynamic case.
> > I had no problem with "split()"; I was just questioning the necessity.
> > Since you've provided a proof of necessity, I'm in favor of the
> > "split()" start operator. Thanks!
> >
> > Separately, I'm interested to see where the present discussion leads.
> > I've written enough JavaScript code in my life to be suspicious of
> > nested closures. You have a good point about using method references (or
> > indeed function literals also work). It should be validating that this
> > was also the JS community's first approach to flattening the logic when
> > their nested closure situation got out of hand. Unfortunately, it's
> > replacing nesting with redirection, both of which disrupt code
> > readability (but in different ways for different reasons). In other
> > words, I agree that function references are *the* first-order solution if
> > the nested code does indeed become a problem.
> >
> > However, the history of JS also tells us that function references aren't
> > the end of the story either, and you can see that by observing that
> > there have been two follow-on eras, as they continue trying to cope with
> > the consequences of living in such a callback-heavy language. First, you
> > have Futures/Promises, which essentially let you convert nested code to
> > method-chained code (Observables/FP is a popular variation on this).
> > Most lately, you have async/await, which is an effort to apply language
> > (not just API) syntax to the problem, and offer the "flattest" possible
> > programming style to solve the problem (because you get back to just one
> > code block per functional unit).
> >
> > Stream-processing is a different domain, and Java+KStreams is nowhere
> > near as callback heavy as JS, so I don't think we have to take the JS
> > story for granted, but then again, I think we can derive some valuable
> > lessons by looking sideways to adjacent domains. I'm just bringing this
> > up to inspire further/deeper discussion. At the same time, just like JS,
> > we can afford to take an iterative approach to the problem.
> >
> > Separately again, I'm interested in the post-branch merge (and I'd also
> > add join) problem that Paul brought up. We can clearly punt on it, by
> > terminating the nested branches with sink operators. But is there a DSL
> > way to do it?
> >
> > Thanks again for driving this,
> > -John
> >
> > On Thu, May 2, 2019 at 7:39 PM Paul Whalen <pgwhalen@gmail.com> wrote:
> >
> > Ivan, I’ll definitely forfeit my point on the clumsiness of the
> > branch(predicate, consumer) solution, I don’t see any real drawbacks
> > for the dynamic case.
> >
> > IMO the one trade off to consider at this point is the scope
> > question. I don’t know if I totally agree that “we rarely need them
> > in the same scope” since merging the branches back together later
> > seems like a perfectly plausible use case that can be a lot nicer
> > when the branched streams are in the same scope. That being said,
> > for the reasons Ivan listed, I think it is overall the better
> > solution - working around the scope thing is easy enough if you need
> > to.
> >
> > > On May 2, 2019, at 7:00 PM, Ivan Ponomarev
> > <iponomarev@mail.ru.invalid> wrote:
> > >
> > > Hello everyone, thank you all for joining the discussion!
> > >
> > > Well, I don't think the idea of named branches, be it a
> > LinkedHashMap (no other Map will do, because order of definition
> > matters) or `branch` method taking name and Consumer has more
> > advantages than drawbacks.
> > >
> > > In my opinion, the only real positive outcome from Michael's
> > proposal is that all the returned branches are in the same scope.
> > But 1) we rarely need them in the same scope 2) there is a
> > workaround for the scope problem, described in the KIP.
> > >
> > > 'Inlining the complex logic' is not a problem, because we can use
> > method references instead of lambdas. In real world scenarios you
> > tend to split the complex logic to methods anyway, so the code is
> > going to be clean.
> > >
> > > The drawbacks are strong. The cohesion between predicates and
> > handlers is lost. We have to define predicates in one place, and
> > handlers in another. This opens the door for bugs:
> > >
> > > - what if we forget to define a handler for a name? or a name for
> > a handler?
> > > - what if we misspell a name?
> > > - what if we copy-paste and duplicate a name?
> > >
> > > What Michael proposes would have been totally OK if we had been
> > writing the API in Lua, Ruby or Python. In those languages the
> > "dynamic naming" approach would have looked most concise and
> > beautiful. But in Java we expect all the problems related to
> > identifiers to be eliminated at compile time.
> > >
> > > Do we have to invent duck-typing for the Java API?
> > >
> > > And if we do, what advantage are we supposed to get besides having
> > all the branches in the same scope? Michael, maybe I'm missing your
> > point?
> > >
> > > ---
> > >
> > > Earlier in this discussion John Roesler also proposed to do
> > without "start branching" operator, and later Paul mentioned that in
> > the case when we have to add a dynamic number of branches, the
> > current KIP is 'clumsier' compared to Michael's 'Map' solution. Let
> > me address both comments here.
> > >
> > > 1) "Start branching" operator (I think that *split* is a good name
> > for it indeed) is critical when we need to do dynamic branching,
> > see example below.
> > >
> > > 2) No, dynamic branching in current KIP is not clumsy at all.
> > Imagine a real-world scenario when you need one branch per enum
> > value (say, RecordType). You can have something like this:
> > >
> > > /*John:if we had to start with stream.branch(...) here, it would
> > have been much messier.*/
> > > KBranchedStream branched = stream.split();
> > >
> > > /*Not clumsy at all :-)*/
> > > for (RecordType recordType : RecordType.values())
> > > branched = branched.branch((k, v) -> v.getRecType() ==
> > recordType,
> > > recordType::processRecords);
> > >
> > > Regards,
> > >
> > > Ivan
> > >
> > >
> > > 02.05.2019 14:40, Matthias J. Sax wrote:
> > >> I also agree with Michael's observation about the core problem of
> > >> current `branch()` implementation.
> > >>
> > >> However, I also don't like to pass in a clumsy Map object. My
> > thinking
> > >> was more aligned with Paul's proposal to just add a name to each
> > >> `branch()` statement and return a `Map<String,KStream>`.
> > >>
> > >> It makes the code easier to read, and also makes the order of
> > >> `Predicates` (which is essential) easier to grasp.
> > >>
> > >>>>>> Map<String, KStream<K, V>> branches = stream.split()
> > >>>>>> .branch("branchOne", Predicate<K, V>)
> > >>>>>> .branch( "branchTwo", Predicate<K, V>)
> > >>>>>> .defaultBranch("defaultBranch");
> > >> An open question is the case for which no defaultBranch() should
> be
> > >> specified. Atm, `split()` and `branch()` would return
> > `BranchedKStream`
> > >> and the call to `defaultBranch()` that returns the `Map` is
> mandatory
> > >> (which is not the case atm). Or is this actually not a real
> problem,
> > >> because users can just ignore the branch returned by
> > `defaultBranch()`
> > >> in the result `Map` ?
> > >>
> > >>
> > >> About "inlining": So far, it seems to be a matter of personal
> > >> preference. I can see arguments for both, but no "killer
> > argument" yet
> > >> that clearly makes the case for one or the other.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
> > >>> Perhaps inlining is the wrong terminology. It doesn’t require
> > that a lambda with the full downstream topology be defined inline -
> > it can be a method reference as with Ivan’s original suggestion.
> > The advantage of putting the predicate and its downstream logic
> > (Consumer) together in branch() is that they are required to be near
> > to each other.
> > >>>
> > >>> Ultimately the downstream code has to live somewhere, and deep
> > branch trees will be hard to read regardless.
> > >>>
> > >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis
> > <michael.drogalis@confluent.io> wrote:
> > >>>>
> > >>>> I'm less enthusiastic about inlining the branch logic with its
> > downstream
> > >>>> functionality. Programs that have deep branch trees will
> > quickly become
> > >>>> harder to read as a single unit.
> > >>>>
> > >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen
> > <pgwhalen@gmail.com> wrote:
> > >>>>>
> > >>>>> Also +1 on the issues/goals as Michael outlined them, I think
> > that sets a
> > >>>>> great framework for the discussion.
> > >>>>>
> > >>>>> Regarding the SortedMap solution, my understanding is that the
> > current
> > >>>>> proposal in the KIP is what is in my PR which (pending naming
> > decisions) is
> > >>>>> roughly this:
> > >>>>>
> > >>>>> stream.split()
> > >>>>> .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
> > >>>>> .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
> > >>>>> .defaultBranch(Consumer<KStream<K, V>>);
> > >>>>>
> > >>>>> Obviously some ordering is necessary, since branching as a
> > construct
> > >>>>> doesn't work without it, but this solution seems like it
> > provides as much
> > >>>>> associativity as the SortedMap solution, because each branch()
> > call
> > >>>>> directly associates the "conditional" with the "code block."
> > The value it
> > >>>>> provides over the KIP solution is the accessing of streams in
> > the same
> > >>>>> scope.
> > >>>>>
> > >>>>> The KIP solution is less "dynamic" than the SortedMap solution
> > in the sense
> > >>>>> that it is slightly clumsier to add a dynamic number of
> > branches, but it is
> > >>>>> certainly possible. It seems to me like the API should favor
> > the "static"
> > >>>>> case anyway, and should make it simple and readable to
> > fluently declare and
> > >>>>> access your branches in-line. It also makes it impossible to
> > ignore a
> > >>>>> branch, and it is possible to build an (almost) identical
> > SortedMap
> > >>>>> solution on top of it.
> > >>>>>
> > >>>>> I could also see a middle ground where instead of a raw
> > SortedMap being
> > >>>>> taken in, branch() takes a name and not a Consumer. Something
> > like this:
> > >>>>>
> > >>>>> Map<String, KStream<K, V>> branches = stream.split()
> > >>>>> .branch("branchOne", Predicate<K, V>)
> > >>>>> .branch( "branchTwo", Predicate<K, V>)
> > >>>>> .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
> > >>>>>
> > >>>>> Pros for that solution:
> > >>>>> - accessing branched KStreams in same scope
> > >>>>> - no double brace initialization, hopefully slightly more
> > readable than
> > >>>>> SortedMap
> > >>>>>
> > >>>>> Cons
> > >>>>> - downstream branch logic cannot be specified inline which
> > makes it harder
> > >>>>> to read top to bottom (like existing API and SortedMap, but
> > unlike the KIP)
> > >>>>> - you can forget to "handle" one of the branched streams (like
> > existing
> > >>>>> API and SortedMap, but unlike the KIP)
> > >>>>>
> > >>>>> (KBranchedStreams could even work *both* ways but perhaps
> > that's overdoing
> > >>>>> it).
> > >>>>>
> > >>>>> Overall I'm curious how important it is to be able to easily
> > access the
> > >>>>> branched KStream in the same scope as the original. It's
> > possible that it
> > >>>>> doesn't need to be handled directly by the API, but instead
> > left up to the
> > >>>>> user. I'm sort of in the middle on it.
> > >>>>>
> > >>>>> Paul
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman
> > <sophie@confluent.io>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> I'd like to +1 what Michael said about the issues with the
> > existing
> > >>>>> branch
> > >>>>>> method, I agree with what he's outlined and I think we should
> > proceed by
> > >>>>>> trying to alleviate these problems. Specifically it seems
> > important to be
> > >>>>>> able to cleanly access the individual branches (eg by mapping
> > >>>>>> name->stream), which I thought was the original intention of
> > this KIP.
> > >>>>>>
> > >>>>>> That said, I don't think we should so easily give in to the
> > double brace
> > >>>>>> anti-pattern or force our users into it if at all possible to
> > >>>>> avoid...just
> > >>>>>> my two cents.
> > >>>>>>
> > >>>>>> Cheers,
> > >>>>>> Sophie
> > >>>>>>
> > >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
> > >>>>>> michael.drogalis@confluent.io> wrote:
> > >>>>>>
> > >>>>>>> I’d like to propose a different way of thinking about this.
> > To me,
> > >>>>> there
> > >>>>>>> are three problems with the existing branch signature:
> > >>>>>>>
> > >>>>>>> 1. If you use it the way most people do, Java raises unsafe
> type
> > >>>>>> warnings.
> > >>>>>>> 2. The way in which you use the stream branches is
> > positionally coupled
> > >>>>>> to
> > >>>>>>> the ordering of the conditionals.
> > >>>>>>> 3. It is brittle to extend existing branch calls with
> > additional code
> > >>>>>>> paths.
> > >>>>>>>
> > >>>>>>> Using associative constructs instead of relying on ordered
> > constructs
> > >>>>>> would
> > >>>>>>> be a stronger approach. Consider a signature that instead
> > looks like
> > >>>>>> this:
> > >>>>>>> Map<String, KStream<K,V>> KStream#branch(SortedMap<String,
> > Predicate<?
> > >>>>>>> super K,? super V>>);
> > >>>>>>>
> > >>>>>>> Branches are given names in a map, and as a result, the API
> > returns a
> > >>>>>>> mapping of names to streams. The ordering of the
> conditionals is
> > >>>>>> maintained
> > >>>>>>> because it’s a sorted map. Insert order determines the order
> of
> > >>>>>> evaluation.
> > >>>>>>> This solves problem 1 because there are no more varargs. It
> > solves
> > >>>>>> problem
> > >>>>>>> 2 because you no longer lean on ordering to access the
> > branch you’re
> > >>>>>>> interested in. It solves problem 3 because you can introduce
> > another
> > >>>>>>> conditional by simply attaching another name to the
> > structure, rather
> > >>>>>> than
> > >>>>>>> messing with the existing indices.
> > >>>>>>>
> > >>>>>>> One of the drawbacks is that creating the map inline is
> > historically
> > >>>>>>> awkward in Java. I know it’s an anti-pattern to use
> > voluminously, but
> > >>>>>>> double brace initialization would clean up the aesthetics.
> > >>>>>>>
> > >>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler
> > <john@confluent.io>
> > >>>>> wrote:
> > >>>>>>>> Hi Ivan,
> > >>>>>>>>
> > >>>>>>>> Thanks for the update.
> > >>>>>>>>
> > >>>>>>>> FWIW, I agree with Matthias that the current "start
> branching"
> > >>>>> operator
> > >>>>>>> is
> > >>>>>>>> confusing when named the same way as the actual branches.
> > "Split"
> > >>>>> seems
> > >>>>>>>> like a good name. Alternatively, we can do without a "start
> > >>>>> branching"
> > >>>>>>>> operator at all, and just do:
> > >>>>>>>>
> > >>>>>>>> stream
> > >>>>>>>> .branch(Predicate)
> > >>>>>>>> .branch(Predicate)
> > >>>>>>>> .defaultBranch();
> > >>>>>>>>
> > >>>>>>>> Tentatively, I think that this branching operation should be
> > >>>>> terminal.
> > >>>>>>> That
> > >>>>>>>> way, we don't create ambiguity about how to use it. That
> > is, `branch`
> > >>>>>>>> should return `KBranchedStream`, while `defaultBranch` is
> > `void`, to
> > >>>>>>>> enforce that it comes last, and that there is only one
> > definition of
> > >>>>>> the
> > >>>>>>>> default branch. Potentially, we should log a warning if
> > there's no
> > >>>>>>> default,
> > >>>>>>>> and additionally log a warning (or throw an exception) if a
> > record
> > >>>>>> falls
> > >>>>>>>> through with no default.
> > >>>>>>>>
> > >>>>>>>> Thoughts?
> > >>>>>>>>
> > >>>>>>>> Thanks,
> > >>>>>>>> -John
> > >>>>>>>>
> > >>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <
> > >>>>> matthias@confluent.io>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Thanks for updating the KIP and your answers.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>> this is to make the name similar to String#split
> > >>>>>>>>>>> that also returns an array, right?
> > >>>>>>>>> The intent was to avoid name duplication. The return type
> > should
> > >>>>>> _not_
> > >>>>>>>>> be an array.
> > >>>>>>>>>
> > >>>>>>>>> The current proposal is
> > >>>>>>>>>
> > >>>>>>>>> stream.branch()
> > >>>>>>>>> .branch(Predicate)
> > >>>>>>>>> .branch(Predicate)
> > >>>>>>>>> .defaultBranch();
> > >>>>>>>>>
> > >>>>>>>>> IMHO, this reads a little odd, because the first
> > `branch()` does
> > >>>>> not
> > >>>>>>>>> take any parameters and has different semantics than the
> later
> > >>>>>>>>> `branch()` calls. Note, that from the code snippet above,
> it's
> > >>>>> hidden
> > >>>>>>>>> that the first call is `KStream#branch()` while the others
> are
> > >>>>>>>>> `KBranchedStream#branch()` which makes reading the code
> harder.
> > >>>>>>>>>
> > >>>>>>>>> Because I suggested to rename `addBranch()` -> `branch()`,
> > I thought
> > >>>>>> it
> > >>>>>>>>> might be better to also rename `KStream#branch()` to avoid
> the
> > >>>>> naming
> > >>>>>>>>> overlap that seems to be confusing. The following reads
> much
> > >>>>> cleaner
> > >>>>>> to
> > >>>>>>>> me:
> > >>>>>>>>> stream.split()
> > >>>>>>>>> .branch(Predicate)
> > >>>>>>>>> .branch(Predicate)
> > >>>>>>>>> .defaultBranch();
> > >>>>>>>>>
> > >>>>>>>>> Maybe there is a better alternative to `split()` though to
> > avoid
> > >>>>> the
> > >>>>>>>>> naming overlap.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>> 'default' is, however, a reserved word, so unfortunately
> we
> > >>>>> cannot
> > >>>>>>> have
> > >>>>>>>>> a method with such name :-)
> > >>>>>>>>>
> > >>>>>>>>> Bummer. Didn't consider this. Maybe we can still come up
> > with a
> > >>>>> short
> > >>>>>>>> name?
> > >>>>>>>>>
> > >>>>>>>>> Can you add the interface `KBranchedStream` to the KIP
> > with all
> > >>>>> its
> > >>>>>>>>> methods? It will be part of public API and should be
> > contained in
> > >>>>> the
> > >>>>>>>>> KIP. For example, it's unclear atm, what the return type of
> > >>>>>>>>> `defaultBranch()` is.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> You did not comment on the idea to add a
> > `KBranchedStream#get(int
> > >>>>>>> index)
> > >>>>>>>>> -> KStream` method to get the individually
> > branched-KStreams. Would
> > >>>>>> be
> > >>>>>>>>> nice to get your feedback about it. It seems you suggest
> > that users
> > >>>>>>>>> would need to write custom utility code otherwise, to
> > access them.
> > >>>>> We
> > >>>>>>>>> should discuss the pros and cons of both approaches. It
> feels
> > >>>>>>>>> "incomplete" to me atm, if the API has no built-in support
> > to get
> > >>>>> the
> > >>>>>>>>> branched-KStreams directly.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> -Matthias
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> > >>>>>>>>>> Hi all!
> > >>>>>>>>>>
> > >>>>>>>>>> I have updated the KIP-418 according to the new vision.
> > >>>>>>>>>>
> > >>>>>>>>>> Matthias, thanks for your comment!
> > >>>>>>>>>>
> > >>>>>>>>>>> Renaming KStream#branch() -> #split()
> > >>>>>>>>>> I can see your point: this is to make the name similar to
> > >>>>>>> String#split
> > >>>>>>>>>> that also returns an array, right? But is it worth the
> > loss of
> > >>>>>>>> backwards
> > >>>>>>>>>> compatibility? We can have overloaded branch() as well
> > without
> > >>>>>>>> affecting
> > >>>>>>>>>> the existing code. Maybe the old array-based `branch`
> method
> > >>>>> should
> > >>>>>>> be
> > >>>>>>>>>> deprecated, but this is a subject for discussion.
> > >>>>>>>>>>
> > >>>>>>>>>>> Renaming KBranchedStream#addBranch() ->
> > >>>>> BranchingKStream#branch(),
> > >>>>>>>>>> KBranchedStream#defaultBranch() ->
> BranchingKStream#default()
> > >>>>>>>>>>
> > >>>>>>>>>> Totally agree with 'addBranch->branch' rename. 'default'
> is,
> > >>>>>>> however, a
> > >>>>>>>>>> reserved word, so unfortunately we cannot have a method
> > with such
> > >>>>>>> name
> > >>>>>>>>> :-)
> > >>>>>>>>>>> defaultBranch() does take a `Predicate` as argument,
> but I
> > >>>>> think
> > >>>>>>> that
> > >>>>>>>>>> is not required?
> > >>>>>>>>>>
> > >>>>>>>>>> Absolutely! I think that was just a copy-paste error or
> > something.
> > >>>>>>>>>>
> > >>>>>>>>>> Dear colleagues,
> > >>>>>>>>>>
> > >>>>>>>>>> please revise the new version of the KIP and Paul's PR
> > >>>>>>>>>> (https://github.com/apache/kafka/pull/6512)
> > >>>>>>>>>>
> > >>>>>>>>>> Any new suggestions/objections?
> > >>>>>>>>>>
> > >>>>>>>>>> Regards,
> > >>>>>>>>>>
> > >>>>>>>>>> Ivan
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> 11.04.2019 11:47, Matthias J. Sax wrote:
> > >>>>>>>>>>> Thanks for driving the discussion of this KIP. It seems
> that
> > >>>>>>> everybody
> > >>>>>>>>>>> agrees that the current branch() method using arrays is
> not
> > >>>>>> optimal.
> > >>>>>>>>>>> I had a quick look into the PR and I like the overall
> > proposal.
> > >>>>>>> There
> > >>>>>>>>>>> are some minor things we need to consider. I would
> > recommend the
> > >>>>>>>>>>> following renaming:
> > >>>>>>>>>>>
> > >>>>>>>>>>> KStream#branch() -> #split()
> > >>>>>>>>>>> KBranchedStream#addBranch() -> BranchingKStream#branch()
> > >>>>>>>>>>> KBranchedStream#defaultBranch() ->
> > BranchingKStream#default()
> > >>>>>>>>>>>
> > >>>>>>>>>>> It's just a suggestion to get slightly shorter method
> names.
> > >>>>>>>>>>>
> > >>>>>>>>>>> In the current PR, defaultBranch() does take a
> > `Predicate` as
> > >>>>>>>> argument,
> > >>>>>>>>>>> but I think that is not required?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Also, we should consider KIP-307, that was recently
> > accepted and
> > >>>>>> is
> > >>>>>>>>>>> currently implemented:
> > >>>>>>>>>>>
> > >>>>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > >>>>>>>>>>> I.e., we should add overloads that accept a `Named`
> > parameter.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> For the issue that the created `KStream` objects are in
> > different
> > >>>>>>>> scopes:
> > >>>>>>>>>>> could we extend `KBranchedStream` with a `get(int
> > index)` method
> > >>>>>>> that
> > >>>>>>>>>>> returns the corresponding "branched" result `KStream`
> > object?
> > >>>>>> Maybe,
> > >>>>>>>> the
> > >>>>>>>>>>> second argument of `addBranch()` should not be a
> > >>>>>> `Consumer<KStream>`
> > >>>>>>>> but
> > >>>>>>>>>>> a `Function<KStream,KStream>` and `get()` could return
> > whatever
> > >>>>>> the
> > >>>>>>>>>>> `Function` returns?
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Finally, I would also suggest to update the KIP with the
> > current
> > >>>>>>>>>>> proposal. That makes it easier to review.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> -Matthias
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
> > >>>>>>>>>>>> Ivan,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I'm a bit of a novice here as well, but I think it
> > makes sense
> > >>>>>> for
> > >>>>>>>> you
> > >>>>>>>>> to
> > >>>>>>>>>>>> revise the KIP and continue the discussion. Obviously
> > we'll
> > >>>>> need
> > >>>>>>>> some
> > >>>>>>>>>>>> buy-in from committers that have actual binding votes on
> > >>>>> whether
> > >>>>>>> the
> > >>>>>>>>> KIP
> > >>>>>>>>>>>> could be adopted. It would be great to hear if they
> > think this
> > >>>>>> is
> > >>>>>>> a
> > >>>>>>>>> good
> > >>>>>>>>>>>> idea overall. I'm not sure if that happens just by
> > starting a
> > >>>>>>> vote,
> > >>>>>>>>> or if
> > >>>>>>>>>>>> there is generally some indication of interest
> beforehand.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> That being said, I'll continue the discussion a bit:
> > assuming
> > >>>>> we
> > >>>>>> do
> > >>>>>>>>> move
> > >>>>>>>>>>>> forward with the solution of "stream.branch() returns
> > >>>>>> KBranchedStream",
> > >>>>>>> do
> > >>>>>>>>> we
> > >>>>>>>>>>>> deprecate "stream.branch(...) returns KStream[]"? I
> would
> > >>>>> favor
> > >>>>>>>>>>>> deprecating, since having two mutually exclusive APIs
> that
> > >>>>>>> accomplish
> > >>>>>>>>> the
> > >>>>>>>>>>>> same thing is confusing, especially when they're fairly
> > similar
> > >>>>>>>>> anyway. We
> > >>>>>>>>>>>> just need to be sure we're not making something
> > >>>>>>> impossible/difficult
> > >>>>>>>>> that
> > >>>>>>>>>>>> is currently possible/easy.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Regarding my PR - I think the general structure would
> work,
> > >>>>> it's
> > >>>>>>>> just a
> > >>>>>>>>>>>> little sloppy overall in terms of naming and clarity. In
> > >>>>>>> particular,
> > >>>>>>>>>>>> passing in the "predicates" and "children" lists which
> get
> > >>>>>> modified
> > >>>>>>>> in
> > >>>>>>>>>>>> KBranchedStream but read from all the way
> > KStreamLazyBranch is
> > >>>>> a
> > >>>>>>> bit
> > >>>>>>>>>>>> complicated to follow.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>> Paul
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <
> > >>>>>> iponomarev@mail.ru>
> > >>>>>>>>> wrote:
> > >>>>>>>>>>>>> Hi Paul!
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I read your code carefully and now I am fully
> > convinced: your
> > >>>>>>>> proposal
> > >>>>>>>>>>>>> looks better and should work. We just have to document
> the
> > >>>>>> crucial
> > >>>>>>>>> fact
> > >>>>>>>>>>>>> that KStream consumers are invoked as they're added.
> > And then
> > >>>>>> it's
> > >>>>>>>> all
> > >>>>>>>>>>>>> going to be very nice.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> What shall we do now? I should re-write the KIP and
> > resume the
> > >>>>>>>>>>>>> discussion here, right?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Why are you saying that your PR 'should not be even a
> > >>>>> starting
> > >>>>>>>> point
> > >>>>>>>>> if
> > >>>>>>>>>>>>> we go in this direction'? To me it looks like a good
> > starting
> > >>>>>>> point.
> > >>>>>>>>> But
> > >>>>>>>>>>>>> as a novice in this project I might miss some important
> > >>>>> details.
> > >>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Ivan
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen wrote:
> > >>>>>>>>>>>>>> Ivan,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Maybe I’m missing the point, but I believe the
> > >>>>> stream.branch()
> > >>>>>>>>> solution
> > >>>>>>>>>>>>> supports this. The couponIssuer::set* consumers will be
> > >>>>> invoked
> > >>>>>> as
> > >>>>>>>>> they’re
> > >>>>>>>>>>>>> added, not during streamsBuilder.build(). So the user
> > still
> > >>>>>> ought
> > >>>>>>> to
> > >>>>>>>>> be
> > >>>>>>>>>>>>> able to call couponIssuer.coupons() afterward and
> > depend on
> > >>>>> the
> > >>>>>>>>> branched
> > >>>>>>>>>>>>> streams having been set.
> > >>>>>>>>>>>>>> The issue I mean to point out is that it is hard to
> > access
> > >>>>> the
> > >>>>>>>>> branched
> > >>>>>>>>>>>>> streams in the same scope as the original stream (that
> > is, not
> > >>>>>>>> inside
> > >>>>>>>>> the
> > >>>>>>>>>>>>> couponIssuer), which is a problem with both proposed
> > >>>>> solutions.
> > >>>>>> It
> > >>>>>>>>> can be
> > >>>>>>>>>>>>> worked around though.
> > >>>>>>>>>>>>>> [Also, great to hear additional interest in 401, I’m
> > excited
> > >>>>> to
> > >>>>>>>> hear
> > >>>>>>>>>>>>> your thoughts!]
> > >>>>>>>>>>>>>> Paul
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <
> > >>>>>> iponomarev@mail.ru>
> > >>>>>>>>> wrote:
> > >>>>>>>>>>>>>>> Hi Paul!
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> The idea to postpone the wiring of branches to the
> > >>>>>>>>>>>>> streamsBuilder.build() also looked great to me at
> first
> > >>>>> glance,
> > >>>>>>> but
> > >>>>>>>>> ---
> > >>>>>>>>>>>>>>>> the newly branched streams are not available in the
> > same
> > >>>>>> scope
> > >>>>>>> as
> > >>>>>>>>> each
> > >>>>>>>>>>>>> other. That is, if we wanted to merge them back
> together
> > >>>>> again
> > >>>>>> I
> > >>>>>>>>> don't see
> > >>>>>>>>>>>>> a way to do that.
> > >>>>>>>>>>>>>>> You just took the words right out of my mouth, I was
> > just
> > >>>>>> going
> > >>>>>>> to
> > >>>>>>>>>>>>> write in detail about this issue.
> > >>>>>>>>>>>>>>> Consider the example from Bill's book, p. 101: say
> > we need
> > >>>>> to
> > >>>>>>>>> identify
> > >>>>>>>>>>>>> customers who have bought coffee and made a purchase
> > in the
> > >>>>>>>>> electronics
> > >>>>>>>>>>>>> store to give them coupons.
> > >>>>>>>>>>>>>>> This is the code I usually write under these
> > circumstances
> > >>>>>> using
> > >>>>>>>> my
> > >>>>>>>>>>>>> 'brancher' class:
> > >>>>>>>>>>>>>>> @Setter
> > >>>>>>>>>>>>>>> class CouponIssuer{
> > >>>>>>>>>>>>>>> private KStream<....> coffePurchases;
> > >>>>>>>>>>>>>>> private KStream<....> electronicsPurchases;
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> KStream<...> coupons(){
> > >>>>>>>>>>>>>>> return
> > >>>>>>>>> coffePurchases.join(electronicsPurchases...)...whatever
> > >>>>>>>>>>>>>>> /*In the real world the code here can be
> > complex, so
> > >>>>>>>>> creation of
> > >>>>>>>>>>>>> a separate CouponIssuer class is fully justified, in
> > order to
> > >>>>>>>> separate
> > >>>>>>>>>>>>> classes' responsibilities.*/
> > >>>>>>>>>>>>>>> }
> > >>>>>>>>>>>>>>> }
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> CouponIssuer couponIssuer = new CouponIssuer();
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
> > >>>>>>>>>>>>>>> .branch(predicate1,
> couponIssuer::setCoffePurchases)
> > >>>>>>>>>>>>>>> .branch(predicate2,
> > >>>>>> couponIssuer::setElectronicsPurchases)
> > >>>>>>>>>>>>>>> .onTopOf(transactionStream);
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> /*Alas, this won't work if we're going to wire up
> > everything
> > >>>>>>>> later,
> > >>>>>>>>>>>>> without the terminal operation!!!*/
> > >>>>>>>>>>>>>>> couponIssuer.coupons()...
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Does this make sense? In order to properly
> > initialize the
> > >>>>>>>>> CouponIssuer
> > >>>>>>>>>>>>> we need the terminal operation to be called before
> > >>>>>>>>> streamsBuilder.build()
> > >>>>>>>>>>>>> is called.
> > >>>>>>>>>>>>>>> [BTW Paul, I just found out that your KIP-401 is
> > essentially
> > >>>>>> the
> > >>>>>>>>> next
> > >>>>>>>>>>>>> KIP I was going to write here. I have some thoughts
> > based on
> > >>>>> my
> > >>>>>>>>> experience,
> > >>>>>>>>>>>>> so I will join the discussion on KIP-401 soon.]
> > >>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Ivan
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen wrote:
> > >>>>>>>>>>>>>>>> Ivan,
> > >>>>>>>>>>>>>>>> I tried to make a very rough proof of concept of a
> > fluent
> > >>>>> API
> > >>>>>>>> based
> > >>>>>>>>>>>>> off of
> > >>>>>>>>>>>>>>>> KStream here
> > (https://github.com/apache/kafka/pull/6512),
> > >>>>>> and
> > >>>>>>> I
> > >>>>>>>>> think
> > >>>>>>>>>>>>> I
> > >>>>>>>>>>>>>>>> succeeded at removing both cons.
> > >>>>>>>>>>>>>>>> - Compatibility: I was incorrect earlier about
> > >>>>>>> compatibility
> > >>>>>>>>>>>>> issues,
> > >>>>>>>>>>>>>>>> there aren't any direct ones. I was unaware
> > that Java
> > >>>>> is
> > >>>>>>>> smart
> > >>>>>>>>>>>>> enough to
> > >>>>>>>>>>>>>>>> distinguish between a branch(varargs...)
> > returning one
> > >>>>>>> thing
> > >>>>>>>>> and
> > >>>>>>>>>>>>> branch()
> > >>>>>>>>>>>>>>>> with no arguments returning another thing.
> > >>>>>>>>>>>>>>>> - Requiring a terminal method: We don't actually
> > need
> > >>>>> it.
> > >>>>>>> We
> > >>>>>>>>> can
> > >>>>>>>>>>>>> just
> > >>>>>>>>>>>>>>>> build up the branches in the KBranchedStream which
> > shares
> > >>>>>> its
> > >>>>>>>>> state
> > >>>>>>>>>>>>> with the
> > >>>>>>>>>>>>>>>> ProcessorSupplier that will actually do the
> > branching.
> > >>>>>>> It's
> > >>>>>>>>> not
> > >>>>>>>>>>>>> terribly
> > >>>>>>>>>>>>>>>> pretty in its current form, but I think it
> > demonstrates
> > >>>>>> its
> > >>>>>>>>>>>>> feasibility.
> > >>>>>>>>>>>>>>>> To be clear, I don't think that pull request should
> be
> > >>>>> final
> > >>>>>> or
> > >>>>>>>>> even a
> > >>>>>>>>>>>>>>>> starting point if we go in this direction, I just
> > wanted to
> > >>>>>> see
> > >>>>>>>> how
> > >>>>>>>>>>>>>>>> challenging it would be to get the API working.
> > >>>>>>>>>>>>>>>> I will say though, that I'm not sure the existing
> > solution
> > >>>>>>> could
> > >>>>>>>> be
> > >>>>>>>>>>>>>>>> deprecated in favor of this, which I had originally
> > >>>>> suggested
> > >>>>>>>> was a
> > >>>>>>>>>>>>>>>> possibility. The reason is that the newly branched
> > streams
> > >>>>>> are
> > >>>>>>>> not
> > >>>>>>>>>>>>>>>> available in the same scope as each other. That
> > is, if we
> > >>>>>>> wanted
> > >>>>>>>>> to
> > >>>>>>>>>>>>> merge
> > >>>>>>>>>>>>>>>> them back together again I don't see a way to do
> > that. The
> > >>>>>> KIP
> > >>>>>>>>>>>>> proposal
> > >>>>>>>>>>>>>>>> has the same issue, though - all this means is that
> for
> > >>>>>> either
> > >>>>>>>>>>>>> solution,
> > >>>>>>>>>>>>>>>> deprecating the existing branch(...) is not on the
> > table.
> > >>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>> Paul
> > >>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponomarev@mail.ru> wrote:
> > >>>>>>>>>>>>>>>>> OK, let me summarize what we have discussed up to
> this
> > >>>>>> point.
> > >>>>>>>>>>>>>>>>> First, it seems that it's commonly agreed that
> > branch API
> > >>>>>>> needs
> > >>>>>>>>>>>>>>>>> improvement. Motivation is given in the KIP.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> There are two potential ways to do it:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> 1. (as originally proposed)
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
> > >>>>>>>>>>>>>>>>> .branch(predicate1, ks ->..)
> > >>>>>>>>>>>>>>>>> .branch(predicate2, ks->..)
> > >>>>>>>>>>>>>>>>> .defaultBranch(ks->..) //optional
> > >>>>>>>>>>>>>>>>> .onTopOf(stream).mapValues(...).... //onTopOf returns its argument
> > >>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2) The code
> won't
> > >>>>> make
> > >>>>>>>> sense
> > >>>>>>>>>>>>> until
> > >>>>>>>>>>>>>>>>> all the necessary ingredients are provided.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance contrasts with
> > >>>>>>>>>>>>>>>>> the fluency of other KStream methods.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> 2. (as Paul proposes)
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> stream
> > >>>>>>>>>>>>>>>>> .branch(predicate1, ks ->...)
> > >>>>>>>>>>>>>>>>> .branch(predicate2, ks->...)
> > >>>>>>>>>>>>>>>>> .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and
> > >>>>>>>>>>>>>>>>> //noDefault() return void
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> PROS: Generally follows the way KStreams interface
> is
> > >>>>>> defined.
> > >>>>>>>>>>>>>>>>> CONS: We need to define two terminal methods
> > >>>>>>>> (defaultBranch(ks->)
> > >>>>>>>>> and
> > >>>>>>>>>>>>>>>>> noDefault()). And for a user it is very easy to
> > miss the
> > >>>>>> fact
> > >>>>>>>>> that one
> > >>>>>>>>>>>>>>>>> of the terminal methods should be called. If these
> > methods
> > >>>>>> are
> > >>>>>>>> not
> > >>>>>>>>>>>>>>>>> called, we can throw an exception at runtime.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can we do
> better?
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Ivan
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
> > >>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
> > >>>>>>>>>>>>>>>>>>> Paul,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I see your point when you are talking about
> > >>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Still, I believe that this cannot be
> > implemented the
> > >>>>>>> easy
> > >>>>>>>>> way.
> > >>>>>>>>>>>>>>>>>>> Maybe we all should think further.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> user could specify a terminal method that
> assumes
> > >>>>> nothing
> > >>>>>>>> will
> > >>>>>>>>>>>>> reach
> > >>>>>>>>>>>>>>>>>>>> the default branch,
> > >>>>>>>>>>>>>>>>>>> throwing an exception if such a case occurs.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be the only
> option
> > >>>>>> besides
> > >>>>>>>>>>>>>>>>>>> `default`, because there are scenarios when we
> > want to
> > >>>>>> just
> > >>>>>>>>> silently
> > >>>>>>>>>>>>>>>>>>> drop the messages that didn't match any
> > predicate. 2)
> > >>>>>>> Throwing
> > >>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>> exception in the middle of data flow processing
> > looks
> > >>>>>> like a
> > >>>>>>>> bad
> > >>>>>>>>>>>>> idea.
> > >>>>>>>>>>>>>>>>>>> In stream processing paradigm, I would prefer to
> > emit a
> > >>>>>>>> special
> > >>>>>>>>>>>>>>>>>>> message to a dedicated stream. This is exactly
> where
> > >>>>>>> `default`
> > >>>>>>>>> can
> > >>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>> used.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> it would be fairly easy for the
> > >>>>> InternalTopologyBuilder
> > >>>>>>> to
> > >>>>>>>>> track
> > >>>>>>>>>>>>>>>>>>>> dangling
> > >>>>>>>>>>>>>>>>>>> branches that haven't been terminated and raise
> > a clear
> > >>>>>>> error
> > >>>>>>>>>>>>> before it
> > >>>>>>>>>>>>>>>>>>> becomes an issue.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> You mean a runtime exception, when the program is
> > >>>>> compiled
> > >>>>>>> and
> > >>>>>>>>> run?
> > >>>>>>>>>>>>>>>>>>> Well, I'd prefer an API that simply won't
> > compile if
> > >>>>> used
> > >>>>>>>>>>>>>>>>>>> incorrectly. Can we build such an API as a
> > method chain
> > >>>>>>>> starting
> > >>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>>>> KStream object? There is a huge cost difference
> > between
> > >>>>>>>> runtime
> > >>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>> compile-time errors. Even if a failure is uncovered
> > >>>>> instantly
> > >>>>>> on
> > >>>>>>>>> unit
> > >>>>>>>>>>>>>>>>>>> tests, it costs more for the project than a
> > compilation
> > >>>>>>>> failure.
> > >>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Ivan
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen wrote:
> > >>>>>>>>>>>>>>>>>>>> Ivan,
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Good point about the terminal operation being
> > required.
> > >>>>>>> But
> > >>>>>>>> is
> > >>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>> really
> > >>>>>>>>>>>>>>>>>>>> such a bad thing? If the user doesn't want a
> > >>>>>> defaultBranch
> > >>>>>>>>> they
> > >>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>>>> call
> > >>>>>>>>>>>>>>>>>>>> some other terminal method (noDefaultBranch()?)
> > just as
> > >>>>>>>>> easily. In
> > >>>>>>>>>>>>>>>>>>>> fact I
> > >>>>>>>>>>>>>>>>>>>> think it creates an opportunity for a nicer API
> - a
> > >>>>> user
> > >>>>>>>> could
> > >>>>>>>>>>>>> specify
> > >>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>> terminal method that assumes nothing will reach
> the
> > >>>>>> default
> > >>>>>>>>> branch,
> > >>>>>>>>>>>>>>>>>>>> throwing an exception if such a case occurs.
> That
> > >>>>> seems
> > >>>>>>> like
> > >>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>> improvement over the current branch() API,
> > which allows
> > >>>>>> for
> > >>>>>>>> the
> > >>>>>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>>> subtle
> > >>>>>>>>>>>>>>>>>>>> behavior of records unexpectedly getting
> dropped.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> The need for a terminal operation certainly has
> > to be
> > >>>>>> well
> > >>>>>>>>>>>>>>>>>>>> documented, but
> > >>>>>>>>>>>>>>>>>>>> it would be fairly easy for the
> > >>>>> InternalTopologyBuilder
> > >>>>>>> to
> > >>>>>>>>> track
> > >>>>>>>>>>>>>>>>>>>> dangling
> > >>>>>>>>>>>>>>>>>>>> branches that haven't been terminated and raise
> > a clear
> > >>>>>>> error
> > >>>>>>>>>>>>> before it
> > >>>>>>>>>>>>>>>>>>>> becomes an issue. Especially now that there is
> a
> > >>>>> "build
> > >>>>>>>> step"
> > >>>>>>>>>>>>> where
> > >>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> topology is actually wired up, when
> > >>>>>> StreamsBuilder.build()
> > >>>>>>> is
> > >>>>>>>>>>>>> called.
> > >>>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its argument, I
> agree
> > >>>>> that
> > >>>>>>> it's
> > >>>>>>>>>>>>>>>>>>>> critical to
> > >>>>>>>>>>>>>>>>>>>> allow users to do other operations on the input
> > stream.
> > >>>>>>> With
> > >>>>>>>>> the
> > >>>>>>>>>>>>>>>>> fluent
> > >>>>>>>>>>>>>>>>>>>> solution, it ought to work the same way all
> other
> > >>>>>>> operations
> > >>>>>>>>> do -
> > >>>>>>>>>>>>> if
> > >>>>>>>>>>>>>>>>> you
> > >>>>>>>>>>>>>>>>>>>> want to process off the original KStream
> multiple
> > >>>>> times,
> > >>>>>>> you
> > >>>>>>>>> just
> > >>>>>>>>>>>>>>>>>>>> need the
> > >>>>>>>>>>>>>>>>>>>> stream as a variable so you can call as many
> > operations
> > >>>>>> on
> > >>>>>>> it
> > >>>>>>>>> as
> > >>>>>>>>>>>>> you
> > >>>>>>>>>>>>>>>>>>>> desire.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Thoughts?
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>> Paul
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponomarev@mail.ru> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Hello Paul,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I'm afraid this won't work because we do not
> > always need
> > >>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> defaultBranch. And without a terminal
> operation we
> > >>>>> don't
> > >>>>>>>> know
> > >>>>>>>>>>>>> when to
> > >>>>>>>>>>>>>>>>>>>>> finalize and build the 'branch switch'.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its argument,
> > so we
> > >>>>> can
> > >>>>>> do
> > >>>>>>>>>>>>> something
> > >>>>>>>>>>>>>>>>>>>>> more with the original branch after branching.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I understand your point that the need for special object construction
> > >>>>>>>>>>>>>>>>>>>>> contrasts with the fluency of most KStream methods.
> But
> > >>>>> here
> > >>>>>> we
> > >>>>>>>>> have a
> > >>>>>>>>>>>>>>>>>>>>> special case: we build the switch to split the
> > flow,
> > >>>>> so
> > >>>>>> I
> > >>>>>>>>> think
> > >>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>> still idiomatic.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Ivan
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
> > >>>>>>>>>>>>>>>>>>>>>> Ivan,
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve this
> > API, but I
> > >>>>>> find
> > >>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> onTopOf()
> > >>>>>>>>>>>>>>>>>>>>>> mechanism a little confusing since it
> > contrasts the
> > >>>>>>> fluency
> > >>>>>>>>> of
> > >>>>>>>>>>>>> other
> > >>>>>>>>>>>>>>>>>>>>>> KStream method calls. Ideally I'd like to
> > just call
> > >>>>> a
> > >>>>>>>>> method on
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> stream
> > >>>>>>>>>>>>>>>>>>>>>> so it still reads top to bottom if the branch
> > cases
> > >>>>> are
> > >>>>>>>>> defined
> > >>>>>>>>>>>>>>>>>>>>>> fluently.
> > >>>>>>>>>>>>>>>>>>>>>> I think the addBranch(predicate, handleCase)
> > is very
> > >>>>>> nice
> > >>>>>>>>> and the
> > >>>>>>>>>>>>>>>>>>>>>> right
> > >>>>>>>>>>>>>>>>>>>>> way
> > >>>>>>>>>>>>>>>>>>>>>> to do things, but what if we flipped around
> > how we
> > >>>>>>> specify
> > >>>>>>>>> the
> > >>>>>>>>>>>>> source
> > >>>>>>>>>>>>>>>>>>>>>> stream.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Like:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> stream.branch()
> > >>>>>>>>>>>>>>>>>>>>>> .addBranch(predicate1, this::handle1)
> > >>>>>>>>>>>>>>>>>>>>>> .addBranch(predicate2, this::handle2)
> > >>>>>>>>>>>>>>>>>>>>>> .defaultBranch(this::handleDefault);
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Where branch() returns a KBranchedStreams or
> > >>>>>>>> KStreamBrancher
> > >>>>>>>>> or
> > >>>>>>>>>>>>>>>>>>>>> something,
> > >>>>>>>>>>>>>>>>>>>>>> which is added to by addBranch() and
> > terminated by
> > >>>>>>>>>>>>> defaultBranch()
> > >>>>>>>>>>>>>>>>>>>>>> (which
> > >>>>>>>>>>>>>>>>>>>>>> returns void). This is obviously
> > incompatible with
> > >>>>> the
> > >>>>>>>>> current
> > >>>>>>>>>>>>>>>>>>>>>> API, so
> > >>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> new stream.branch() would have to have a
> > different
> > >>>>>> name,
> > >>>>>>>> but
> > >>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>> seems
> > >>>>>>>>>>>>>>>>>>>>>> like a fairly small problem - we could call it
> > >>>>>> something
> > >>>>>>>> like
> > >>>>>>>>>>>>>>>>>>>>>> branched()
> > >>>>>>>>>>>>>>>>>>>>> or
> > >>>>>>>>>>>>>>>>>>>>>> branchedStreams() and deprecate the old API.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of your
> > KIP? It
> > >>>>>> seems
> > >>>>>>>>> like it
> > >>>>>>>>>>>>>>>>>>>>>> does to
> > >>>>>>>>>>>>>>>>>>>>>> me, allowing for clear in-line branching
> > while also
> > >>>>>>>> allowing
> > >>>>>>>>> you
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>> dynamically build branches off of
> > KBranchedStreams
> > >>>>>> if
> > >>>>>>>>> desired.
> > >>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>> Paul
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
> > >>>>>>>>>>>>>>>>>>>>> <iponomarev@mail.ru.invalid>
> > >>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){
> > >>>>>>>>>>>>>>>>>>>>>>> ks.filter(....).mapValues(...)
> > >>>>>>>>>>>>>>>>>>>>>>> }
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){
> > >>>>>>>>>>>>>>>>>>>>>>> ks.selectKey(...).groupByKey()...
> > >>>>>>>>>>>>>>>>>>>>>>> }
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> ......
> > >>>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
> > >>>>>>>>>>>>>>>>>>>>>>> .addBranch(predicate1, this::handleFirstCase)
> > >>>>>>>>>>>>>>>>>>>>>>> .addBranch(predicate2, this::handleSecondCase)
> > >>>>>>>>>>>>>>>>>>>>>>> .onTopOf(....)
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Ivan
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> I have one question, the
> KafkaStreamsBrancher
> > >>>>> takes a
> > >>>>>>>>> Consumer
> > >>>>>>>>>>>>> as a
> > >>>>>>>>>>>>>>>>>>>>>>> second
> > >>>>>>>>>>>>>>>>>>>>>>>> argument which returns nothing, and the
> > example in
> > >>>>>> the
> > >>>>>>>> KIP
> > >>>>>>>>>>>>> shows
> > >>>>>>>>>>>>>>>>>>>>>>>> each
> > >>>>>>>>>>>>>>>>>>>>>>>> stream from the branch using a terminal node
> > >>>>>>>>> (KStream#to()
> > >>>>>>>>>>>>>>>>>>>>>>>> in this
> > >>>>>>>>>>>>>>>>>>>>>>>> case).
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but how would
> we
> > >>>>> handle
> > >>>>>>> the
> > >>>>>>>>> case
> > >>>>>>>>>>>>>>>>>>>>>>>> where the
> > >>>>>>>>>>>>>>>>>>>>>>>> user has created a branch but wants to
> continue
> > >>>>>>>> processing
> > >>>>>>>>> and
> > >>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>>>> necessarily use a terminal node on the
> branched
> > >>>>>> stream
> > >>>>>>>>>>>>> immediately?
> > >>>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic as is if
> > we had
> > >>>>>>>> something
> > >>>>>>>>> like
> > >>>>>>>>>>>>>>>>>>>>>>>> this:
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> KStream<String, String>[] branches =
> > >>>>>>>>>>>>>>>>>>>>>>>> originalStream.branch(predicate1, predicate2);
> > >>>>>>>>>>>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
> > >>>>>>>>>>>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Thanks!
> > >>>>>>>>>>>>>>>>>>>>>>>> Bill
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbejeck@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> All,
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP-418.
> > >>>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Hello,
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418. Please take a look at the
> > >>>>>>>>>>>>>>>>>>>>>>>>> KIP if you can, I would appreciate any feedback :)
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> > >>>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
> > >>>>>>>>>>>>>>>>>>>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
> > >>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>
> > >
> >
>