kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
Date Thu, 11 Jul 2019 23:00:09 GMT
Ivan,

did you see my last reply? What do you think about my proposal to mix
both approaches and try to get best-of-both worlds?


-Matthias

On 6/11/19 3:56 PM, Matthias J. Sax wrote:
> Thanks for the input John!
> 
>> under your suggestion, it seems that the name is required
> 
> If you want to get the `KStream` as part of the `Map` back using a
> `Function`, yes. If you follow the "embedded chaining" pattern using a
> `Consumer`, no.
> 
> Allowing for a default name via `split()` can of course be done.
> Similarly, using `Named` instead of `String` is possible.
> 
> I wanted to sketch out a high level proposal to merge both patterns
> only. Your suggestions to align the new API with the existing API make
> totally sense.
> 
> 
> 
> One follow up question: Would `Named` be optional or required in
> `split()` and `branch()`? It's unclear from your example.
> 
> If both are mandatory, what do we gain by it? The returned `Map` only
> contains the corresponding branches, so why should we prefix all of
> them? If only `Named` is mandatory in `branch()`, but optional in
> `split()`, the same question raises?
> 
> Requiring `Named` in `split()` seems only to make sense, if `Named` is
> optional in `branch()` and we generate `-X` suffix using a counter for
> different branch name. However, this might lead to the problem of
> changing names if branches are added/removed. Also, how would the names
> be generated if `Consumer` is mixed in (ie, not all branches are
> returned in the `Map`).
> 
> If `Named` is optional for both, it could happen that a user misses to
> specify a name for a branch what would lead to runtime issues.
> 
> 
> Hence, I am actually in favor to not allow a default name but keep
> `split()` without parameter and make `Named` in `branch()` required if a
> `Function` is used. This makes it explicit to the user that specifying a
> name is required if a `Function` is used.
> 
> 
> 
> About
> 
>> KBranchedStream#branch(BranchConfig)
> 
> I don't think that the branching predicate is a configuration and hence
> would not include it in a configuration object.
> 
>>     withChain(...);
> 
> Similar, `withChain()` (that would only take a `Consumer`?) does not
> seem to be a configuration. We can also not prevent a user to call
> `withName()` in combination of `withChain()` what does not make sense
> IMHO. We could of course throw an RTE but not have a compile time check
> seems less appealing. Also, it could happen that neither `withChain()`
> not `withName()` is called and the branch is missing in the returned
> `Map` what lead to runtime issues, too.
> 
> Hence, I don't think that we should add `BranchConfig`. A config object
> is helpful if each configuration can be set independently of all others,
> but this seems not to be the case here. If we add new configuration
> later, we can also just move forward by deprecating the methods that
> accept `Named` and add new methods that accepted `BranchConfig` (that
> would of course implement `Named`).
> 
> 
> Thoughts?
> 
> 
> @Ivan, what do you think about the general idea to blend the two main
> approaches of returning a `Map` plus an "embedded chaining"?
> 
> 
> 
> -Matthias
> 
> 
> 
> On 6/4/19 10:33 AM, John Roesler wrote:
>> Thanks for the idea, Matthias, it does seem like this would satisfy
>> everyone. Returning the map from the terminal operations also solves
>> the problem of merging/joining the branched streams, if we want to add
>> support for the compliment later on.
>>
>> Under your suggestion, it seems that the name is required. Otherwise,
>> we wouldn't have keys for the map to return. I this this is actually
>> not too bad, since experience has taught us that, although names for
>> operations are not required to define stream processing logic, it does
>> significantly improve the operational experience when you can map the
>> topology, logs, metrics, etc. back to the source code. Since you
>> wouldn't (have to) reference the name to chain extra processing onto
>> the branch (thanks to the second argument), you can avoid the
>> "unchecked name" problem that Ivan pointed out.
>>
>> In the current implementation of Branch, you can name the branch
>> operator itself, and then all the branches get index-suffixed names
>> built from the branch operator name. I guess under this proposal, we
>> could naturally append the branch name to the branching operator name,
>> like this:
>>
>>    stream.split(Named.withName("mysplit")) //creates node "mysplit"
>>               .branch(..., ..., "abranch") // creates node "mysplit-abranch"
>>               .defaultBranch(...) // creates node "mysplit-default"
>>
>> It does make me wonder about the DSL syntax itself, though.
>>
>> We don't have a defined grammar, so there's plenty of room to debate
>> the "best" syntax in the context of each operation, but in general,
>> the KStream DSL operators follow this pattern:
>>
>>     operator(function, config_object?) OR operator(config_object)
>>
>> where config_object is often just Named in the "function" variant.
>> Even when the config_object isn't a Named, but some other config
>> class, that config class _always_ implements NamedOperation.
>>
>> Here, we're introducing a totally different pattern:
>>
>>   operator(function, function, string)
>>
>> where the string is the name.
>> My first question is whether the name should instead be specified with
>> the NamedOperation interface.
>>
>> My second question is whether we should just roll all these arguments
>> up into a config object like:
>>
>>    KBranchedStream#branch(BranchConfig)
>>
>>    interface BranchConfig extends NamedOperation {
>>     withPredicate(...);
>>     withChain(...);
>>     withName(...);
>>   }
>>
>> Although I guess we'd like to call BranchConfig something more like
>> "Branched", even if I don't particularly like that pattern.
>>
>> This makes the source code a little noisier, but it also makes us more
>> future-proof, as we can deal with a wide range of alternatives purely
>> in the config interface, and never have to deal with adding overloads
>> to the KBranchedStream if/when we decide we want the name to be
>> optional, or the KStream->KStream to be optional.
>>
>> WDYT?
>>
>> Thanks,
>> -John
>>
>> On Fri, May 24, 2019 at 5:25 PM Michael Drogalis
>> <michael.drogalis@confluent.io> wrote:
>>>
>>> Matthias: I think that's pretty reasonable from my point of view. Good
>>> suggestion.
>>>
>>> On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax <matthias@confluent.io>
>>> wrote:
>>>
>>>> Interesting discussion.
>>>>
>>>> I am wondering, if we cannot unify the advantage of both approaches:
>>>>
>>>>
>>>>
>>>> KStream#split() -> KBranchedStream
>>>>
>>>> // branch is not easily accessible in current scope
>>>> KBranchedStream#branch(Predicate, Consumer<KStream>)
>>>>   -> KBranchedStream
>>>>
>>>> // assign a name to the branch and
>>>> // return the sub-stream to the current scope later
>>>> //
>>>> // can be simple as `#branch(p, s->s, "name")`
>>>> // or also complex as `#branch(p, s->s.filter(...), "name")`
>>>> KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
>>>>   -> KBranchedStream
>>>>
>>>> // default branch is not easily accessible
>>>> // return map of all named sub-stream into current scope
>>>> KBranchedStream#default(Cosumer<KStream>)
>>>>   -> Map<String,KStream>
>>>>
>>>> // assign custom name to default-branch
>>>> // return map of all named sub-stream into current scope
>>>> KBranchedStream#default(Function<KStream,KStream>, String)
>>>>   -> Map<String,KStream>
>>>>
>>>> // assign a default name for default
>>>> // return map of all named sub-stream into current scope
>>>> KBranchedStream#defaultBranch(Function<KStream,KStream>)
>>>>   -> Map<String,KStream>
>>>>
>>>> // return map of all names sub-stream into current scope
>>>> KBranchedStream#noDefaultBranch()
>>>>   -> Map<String,KStream>
>>>>
>>>>
>>>>
>>>> Hence, for each sub-stream, the user can pick to add a name and return
>>>> the branch "result" to the calling scope or not. The implementation can
>>>> also check at runtime that all returned names are unique. The returned
>>>> Map can be empty and it's also optional to use the Map.
>>>>
>>>> To me, it seems like a good way to get best of both worlds.
>>>>
>>>> Thoughts?
>>>>
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>>
>>>>
>>>> On 5/6/19 5:15 PM, John Roesler wrote:
>>>>> Ivan,
>>>>>
>>>>> That's a very good point about the "start" operator in the dynamic case.
>>>>> I had no problem with "split()"; I was just questioning the necessity.
>>>>> Since you've provided a proof of necessity, I'm in favor of the
>>>>> "split()" start operator. Thanks!
>>>>>
>>>>> Separately, I'm interested to see where the present discussion leads.
>>>>> I've written enough Javascript code in my life to be suspicious of
>>>>> nested closures. You have a good point about using method references (or
>>>>> indeed function literals also work). It should be validating that this
>>>>> was also the JS community's first approach to flattening the logic when
>>>>> their nested closure situation got out of hand. Unfortunately, it's
>>>>> replacing nesting with redirection, both of which disrupt code
>>>>> readability (but in different ways for different reasons). In other
>>>>> words, I agree that function references is *the* first-order solution if
>>>>> the nested code does indeed become a problem.
>>>>>
>>>>> However, the history of JS also tells us that function references aren't
>>>>> the end of the story either, and you can see that by observing that
>>>>> there have been two follow-on eras, as they continue trying to cope with
>>>>> the consequences of living in such a callback-heavy language. First, you
>>>>> have Futures/Promises, which essentially let you convert nested code to
>>>>> method-chained code (Observables/FP is a popular variation on this).
>>>>> Most lately, you have async/await, which is an effort to apply language
>>>>> (not just API) syntax to the problem, and offer the "flattest" possible
>>>>> programming style to solve the problem (because you get back to just one
>>>>> code block per functional unit).
>>>>>
>>>>> Stream-processing is a different domain, and Java+KStreams is nowhere
>>>>> near as callback heavy as JS, so I don't think we have to take the JS
>>>>> story for granted, but then again, I think we can derive some valuable
>>>>> lessons by looking sideways to adjacent domains. I'm just bringing this
>>>>> up to inspire further/deeper discussion. At the same time, just like JS,
>>>>> we can afford to take an iterative approach to the problem.
>>>>>
>>>>> Separately again, I'm interested in the post-branch merge (and I'd also
>>>>> add join) problem that Paul brought up. We can clearly punt on it, by
>>>>> terminating the nested branches with sink operators. But is there a DSL
>>>>> way to do it?
>>>>>
>>>>> Thanks again for your driving this,
>>>>> -John
>>>>>
>>>>> On Thu, May 2, 2019 at 7:39 PM Paul Whalen <pgwhalen@gmail.com
>>>>> <mailto:pgwhalen@gmail.com>> wrote:
>>>>>
>>>>>     Ivan, I’ll definitely forfeit my point on the clumsiness of the
>>>>>     branch(predicate, consumer) solution, I don’t see any real drawbacks
>>>>>     for the dynamic case.
>>>>>
>>>>>     IMO the one trade off to consider at this point is the scope
>>>>>     question. I don’t know if I totally agree that “we rarely need them
>>>>>     in the same scope” since merging the branches back together later
>>>>>     seems like a perfectly plausible use case that can be a lot nicer
>>>>>     when the branched streams are in the same scope. That being said,
>>>>>     for the reasons Ivan listed, I think it is overall the better
>>>>>     solution - working around the scope thing is easy enough if you need
>>>>>     to.
>>>>>
>>>>>     > On May 2, 2019, at 7:00 PM, Ivan Ponomarev
>>>>>     <iponomarev@mail.ru.invalid> wrote:
>>>>>     >
>>>>>     > Hello everyone, thank you all for joining the discussion!
>>>>>     >
>>>>>     > Well, I don't think the idea of named branches, be it a
>>>>>     LinkedHashMap (no other Map will do, because order of definition
>>>>>     matters) or `branch` method  taking name and Consumer has more
>>>>>     advantages than drawbacks.
>>>>>     >
>>>>>     > In my opinion, the only real positive outcome from Michael's
>>>>>     proposal is that all the returned branches are in the same scope.
>>>>>     But 1) we rarely need them in the same scope 2) there is a
>>>>>     workaround for the scope problem, described in the KIP.
>>>>>     >
>>>>>     > 'Inlining the complex logic' is not a problem, because we can use
>>>>>     method references instead of lambdas. In real world scenarios you
>>>>>     tend to split the complex logic to methods anyway, so the code is
>>>>>     going to be clean.
>>>>>     >
>>>>>     > The drawbacks are strong. The cohesion between predicates and
>>>>>     handlers is lost. We have to define predicates in one place, and
>>>>>     handlers in another. This opens the door for bugs:
>>>>>     >
>>>>>     > - what if we forget to define a handler for a name? or a name for
>>>>>     a handler?
>>>>>     > - what if we misspell a name?
>>>>>     > - what if we copy-paste and duplicate a name?
>>>>>     >
>>>>>     > What Michael propose would have been totally OK if we had been
>>>>>     writing the API in Lua, Ruby or Python. In those languages the
>>>>>     "dynamic naming" approach would have looked most concise and
>>>>>     beautiful. But in Java we expect all the problems related to
>>>>>     identifiers to be eliminated in compile time.
>>>>>     >
>>>>>     > Do we have to invent duck-typing for the Java API?
>>>>>     >
>>>>>     > And if we do, what advantage are we supposed to get besides having
>>>>>     all the branches in the same scope? Michael, maybe I'm missing your
>>>>>     point?
>>>>>     >
>>>>>     > ---
>>>>>     >
>>>>>     > Earlier in this discussion John Roesler also proposed to do
>>>>>     without "start branching" operator, and later Paul mentioned that in
>>>>>     the case when we have to add a dynamic number of branches, the
>>>>>     current KIP is 'clumsier' compared to Michael's 'Map' solution. Let
>>>>>     me address both comments here.
>>>>>     >
>>>>>     > 1) "Start branching" operator (I think that *split* is a good name
>>>>>     for it indeed) is critical when we need to do a dynamic branching,
>>>>>     see example below.
>>>>>     >
>>>>>     > 2) No, dynamic branching in current KIP is not clumsy at all.
>>>>>     Imagine a real-world scenario when you need one branch per enum
>>>>>     value (say, RecordType). You can have something like this:
>>>>>     >
>>>>>     > /*John:if we had to start with stream.branch(...) here, it would
>>>>>     have been much messier.*/
>>>>>     > KBranchedStream branched = stream.split();
>>>>>     >
>>>>>     > /*Not clumsy at all :-)*/
>>>>>     > for (RecordType recordType : RecordType.values())
>>>>>     >             branched = branched.branch((k, v) -> v.getRecType() ==
>>>>>     recordType,
>>>>>     >                     recordType::processRecords);
>>>>>     >
>>>>>     > Regards,
>>>>>     >
>>>>>     > Ivan
>>>>>     >
>>>>>     >
>>>>>     > 02.05.2019 14:40, Matthias J. Sax пишет:
>>>>>     >> I also agree with Michael's observation about the core problem of
>>>>>     >> current `branch()` implementation.
>>>>>     >>
>>>>>     >> However, I also don't like to pass in a clumsy Map object. My
>>>>>     thinking
>>>>>     >> was more aligned with Paul's proposal to just add a name to each
>>>>>     >> `branch()` statement and return a `Map<String,KStream>`.
>>>>>     >>
>>>>>     >> It makes the code easier to read, and also make the order of
>>>>>     >> `Predicates` (that is essential) easier to grasp.
>>>>>     >>
>>>>>     >>>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>     >>>>>>    .branch("branchOne", Predicate<K, V>)
>>>>>     >>>>>>    .branch( "branchTwo", Predicate<K, V>)
>>>>>     >>>>>>    .defaultBranch("defaultBranch");
>>>>>     >> An open question is the case for which no defaultBranch() should
>>>> be
>>>>>     >> specified. Atm, `split()` and `branch()` would return
>>>>>     `BranchedKStream`
>>>>>     >> and the call to `defaultBranch()` that returns the `Map` is
>>>> mandatory
>>>>>     >> (what is not the case atm). Or is this actually not a real
>>>> problem,
>>>>>     >> because users can just ignore the branch returned by
>>>>>     `defaultBranch()`
>>>>>     >> in the result `Map` ?
>>>>>     >>
>>>>>     >>
>>>>>     >> About "inlining": So far, it seems to be a matter of personal
>>>>>     >> preference. I can see arguments for both, but no "killer
>>>>>     argument" yet
>>>>>     >> that clearly make the case for one or the other.
>>>>>     >>
>>>>>     >>
>>>>>     >> -Matthias
>>>>>     >>
>>>>>     >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
>>>>>     >>> Perhaps inlining is the wrong terminology. It doesn’t require
>>>>>     that a lambda with the full downstream topology be defined inline -
>>>>>     it can be a method reference as with Ivan’s original suggestion.
>>>>>     The advantage of putting the predicate and its downstream logic
>>>>>     (Consumer) together in branch() is that they are required to be near
>>>>>     to each other.
>>>>>     >>>
>>>>>     >>> Ultimately the downstream code has to live somewhere, and deep
>>>>>     branch trees will be hard to read regardless.
>>>>>     >>>
>>>>>     >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis
>>>>>     <michael.drogalis@confluent.io
>>>>>     <mailto:michael.drogalis@confluent.io>> wrote:
>>>>>     >>>>
>>>>>     >>>> I'm less enthusiastic about inlining the branch logic with its
>>>>>     downstream
>>>>>     >>>> functionality. Programs that have deep branch trees will
>>>>>     quickly become
>>>>>     >>>> harder to read as a single unit.
>>>>>     >>>>
>>>>>     >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen
>>>>>     <pgwhalen@gmail.com <mailto:pgwhalen@gmail.com>> wrote:
>>>>>     >>>>>
>>>>>     >>>>> Also +1 on the issues/goals as Michael outlined them, I think
>>>>>     that sets a
>>>>>     >>>>> great framework for the discussion.
>>>>>     >>>>>
>>>>>     >>>>> Regarding the SortedMap solution, my understanding is that the
>>>>>     current
>>>>>     >>>>> proposal in the KIP is what is in my PR which (pending naming
>>>>>     decisions) is
>>>>>     >>>>> roughly this:
>>>>>     >>>>>
>>>>>     >>>>> stream.split()
>>>>>     >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>>>     >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>>>     >>>>>    .defaultBranch(Consumer<KStream<K, V>>);
>>>>>     >>>>>
>>>>>     >>>>> Obviously some ordering is necessary, since branching as a
>>>>>     construct
>>>>>     >>>>> doesn't work without it, but this solution seems like it
>>>>>     provides as much
>>>>>     >>>>> associativity as the SortedMap solution, because each branch()
>>>>>     call
>>>>>     >>>>> directly associates the "conditional" with the "code block."
>>>>>     The value it
>>>>>     >>>>> provides over the KIP solution is the accessing of streams in
>>>>>     the same
>>>>>     >>>>> scope.
>>>>>     >>>>>
>>>>>     >>>>> The KIP solution is less "dynamic" than the SortedMap solution
>>>>>     in the sense
>>>>>     >>>>> that it is slightly clumsier to add a dynamic number of
>>>>>     branches, but it is
>>>>>     >>>>> certainly possible.  It seems to me like the API should favor
>>>>>     the "static"
>>>>>     >>>>> case anyway, and should make it simple and readable to
>>>>>     fluently declare and
>>>>>     >>>>> access your branches in-line.  It also makes it impossible to
>>>>>     ignore a
>>>>>     >>>>> branch, and it is possible to build an (almost) identical
>>>>>     SortedMap
>>>>>     >>>>> solution on top of it.
>>>>>     >>>>>
>>>>>     >>>>> I could also see a middle ground where instead of a raw
>>>>>     SortedMap being
>>>>>     >>>>> taken in, branch() takes a name and not a Consumer.  Something
>>>>>     like this:
>>>>>     >>>>>
>>>>>     >>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>     >>>>>    .branch("branchOne", Predicate<K, V>)
>>>>>     >>>>>    .branch( "branchTwo", Predicate<K, V>)
>>>>>     >>>>>    .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
>>>>>     >>>>>
>>>>>     >>>>> Pros for that solution:
>>>>>     >>>>> - accessing branched KStreams in same scope
>>>>>     >>>>> - no double brace initialization, hopefully slightly more
>>>>>     readable than
>>>>>     >>>>> SortedMap
>>>>>     >>>>>
>>>>>     >>>>> Cons
>>>>>     >>>>> - downstream branch logic cannot be specified inline which
>>>>>     makes it harder
>>>>>     >>>>> to read top to bottom (like existing API and SortedMap, but
>>>>>     unlike the KIP)
>>>>>     >>>>> - you can forget to "handle" one of the branched streams (like
>>>>>     existing
>>>>>     >>>>> API and SortedMap, but unlike the KIP)
>>>>>     >>>>>
>>>>>     >>>>> (KBranchedStreams could even work *both* ways but perhaps
>>>>>     that's overdoing
>>>>>     >>>>> it).
>>>>>     >>>>>
>>>>>     >>>>> Overall I'm curious how important it is to be able to easily
>>>>>     access the
>>>>>     >>>>> branched KStream in the same scope as the original.  It's
>>>>>     possible that it
>>>>>     >>>>> doesn't need to be handled directly by the API, but instead
>>>>>     left up to the
>>>>>     >>>>> user.  I'm sort of in the middle on it.
>>>>>     >>>>>
>>>>>     >>>>> Paul
>>>>>     >>>>>
>>>>>     >>>>>
>>>>>     >>>>>
>>>>>     >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman
>>>>>     <sophie@confluent.io <mailto:sophie@confluent.io>>
>>>>>     >>>>> wrote:
>>>>>     >>>>>
>>>>>     >>>>>> I'd like to +1 what Michael said about the issues with the
>>>>>     existing
>>>>>     >>>>> branch
>>>>>     >>>>>> method, I agree with what he's outlined and I think we should
>>>>>     proceed by
>>>>>     >>>>>> trying to alleviate these problems. Specifically it seems
>>>>>     important to be
>>>>>     >>>>>> able to cleanly access the individual branches (eg by mapping
>>>>>     >>>>>> name->stream), which I thought was the original intention of
>>>>>     this KIP.
>>>>>     >>>>>>
>>>>>     >>>>>> That said, I don't think we should so easily give in to the
>>>>>     double brace
>>>>>     >>>>>> anti-pattern or force ours users into it if at all possible to
>>>>>     >>>>> avoid...just
>>>>>     >>>>>> my two cents.
>>>>>     >>>>>>
>>>>>     >>>>>> Cheers,
>>>>>     >>>>>> Sophie
>>>>>     >>>>>>
>>>>>     >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
>>>>>     >>>>>> michael.drogalis@confluent.io
>>>>>     <mailto:michael.drogalis@confluent.io>> wrote:
>>>>>     >>>>>>
>>>>>     >>>>>>> I’d like to propose a different way of thinking about this.
>>>>>     To me,
>>>>>     >>>>> there
>>>>>     >>>>>>> are three problems with the existing branch signature:
>>>>>     >>>>>>>
>>>>>     >>>>>>> 1. If you use it the way most people do, Java raises unsafe
>>>> type
>>>>>     >>>>>> warnings.
>>>>>     >>>>>>> 2. The way in which you use the stream branches is
>>>>>     positionally coupled
>>>>>     >>>>>> to
>>>>>     >>>>>>> the ordering of the conditionals.
>>>>>     >>>>>>> 3. It is brittle to extend existing branch calls with
>>>>>     additional code
>>>>>     >>>>>>> paths.
>>>>>     >>>>>>>
>>>>>     >>>>>>> Using associative constructs instead of relying on ordered
>>>>>     constructs
>>>>>     >>>>>> would
>>>>>     >>>>>>> be a stronger approach. Consider a signature that instead
>>>>>     looks like
>>>>>     >>>>>> this:
>>>>>     >>>>>>> Map<String, KStream<K,V>> KStream#branch(SortedMap<String,
>>>>>     Predicate<?
>>>>>     >>>>>>> super K,? super V>>);
>>>>>     >>>>>>>
>>>>>     >>>>>>> Branches are given names in a map, and as a result, the API
>>>>>     returns a
>>>>>     >>>>>>> mapping of names to streams. The ordering of the
>>>> conditionals is
>>>>>     >>>>>> maintained
>>>>>     >>>>>>> because it’s a sorted map. Insert order determines the order
>>>> of
>>>>>     >>>>>> evaluation.
>>>>>     >>>>>>> This solves problem 1 because there are no more varargs. It
>>>>>     solves
>>>>>     >>>>>> problem
>>>>>     >>>>>>> 2 because you no longer lean on ordering to access the
>>>>>     branch you’re
>>>>>     >>>>>>> interested in. It solves problem 3 because you can introduce
>>>>>     another
>>>>>     >>>>>>> conditional by simply attaching another name to the
>>>>>     structure, rather
>>>>>     >>>>>> than
>>>>>     >>>>>>> messing with the existing indices.
>>>>>     >>>>>>>
>>>>>     >>>>>>> One of the drawbacks is that creating the map inline is
>>>>>     historically
>>>>>     >>>>>>> awkward in Java. I know it’s an anti-pattern to use
>>>>>     voluminously, but
>>>>>     >>>>>>> double brace initialization would clean up the aesthetics.
>>>>>     >>>>>>>
>>>>>     >>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler
>>>>>     <john@confluent.io <mailto:john@confluent.io>>
>>>>>     >>>>> wrote:
>>>>>     >>>>>>>> Hi Ivan,
>>>>>     >>>>>>>>
>>>>>     >>>>>>>> Thanks for the update.
>>>>>     >>>>>>>>
>>>>>     >>>>>>>> FWIW, I agree with Matthias that the current "start
>>>> branching"
>>>>>     >>>>> operator
>>>>>     >>>>>>> is
>>>>>     >>>>>>>> confusing when named the same way as the actual branches.
>>>>>     "Split"
>>>>>     >>>>> seems
>>>>>     >>>>>>>> like a good name. Alternatively, we can do without a "start
>>>>>     >>>>> branching"
>>>>>     >>>>>>>> operator at all, and just do:
>>>>>     >>>>>>>>
>>>>>     >>>>>>>> stream
>>>>>     >>>>>>>>      .branch(Predicate)
>>>>>     >>>>>>>>      .branch(Predicate)
>>>>>     >>>>>>>>      .defaultBranch();
>>>>>     >>>>>>>>
>>>>>     >>>>>>>> Tentatively, I think that this branching operation should be
>>>>>     >>>>> terminal.
>>>>>     >>>>>>> That
>>>>>     >>>>>>>> way, we don't create ambiguity about how to use it. That
>>>>>     is, `branch`
>>>>>     >>>>>>>> should return `KBranchedStream`, while `defaultBranch` is
>>>>>     `void`, to
>>>>>     >>>>>>>> enforce that it comes last, and that there is only one
>>>>>     definition of
>>>>>     >>>>>> the
>>>>>     >>>>>>>> default branch. Potentially, we should log a warning if
>>>>>     there's no
>>>>>     >>>>>>> default,
>>>>>     >>>>>>>> and additionally log a warning (or throw an exception) if a
>>>>>     record
>>>>>     >>>>>> falls
>>>>>     >>>>>>>> though with no default.
>>>>>     >>>>>>>>
>>>>>     >>>>>>>> Thoughts?
>>>>>     >>>>>>>>
>>>>>     >>>>>>>> Thanks,
>>>>>     >>>>>>>> -John
>>>>>     >>>>>>>>
>>>>>     >>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <
>>>>>     >>>>> matthias@confluent.io <mailto:matthias@confluent.io>
>>>>>     >>>>>>>> wrote:
>>>>>     >>>>>>>>
>>>>>     >>>>>>>>> Thanks for updating the KIP and your answers.
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>>> this is to make the name similar to String#split
>>>>>     >>>>>>>>>>> that also returns an array, right?
>>>>>     >>>>>>>>> The intend was to avoid name duplication. The return type
>>>>>     should
>>>>>     >>>>>> _not_
>>>>>     >>>>>>>>> be an array.
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>> The current proposal is
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>> stream.branch()
>>>>>     >>>>>>>>>      .branch(Predicate)
>>>>>     >>>>>>>>>      .branch(Predicate)
>>>>>     >>>>>>>>>      .defaultBranch();
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>> IMHO, this reads a little odd, because the first
>>>>>     `branch()` does
>>>>>     >>>>> not
>>>>>     >>>>>>>>> take any parameters and has different semantics than the
>>>> later
>>>>>     >>>>>>>>> `branch()` calls. Note, that from the code snippet above,
>>>> it's
>>>>>     >>>>> hidden
>>>>>     >>>>>>>>> that the first call is `KStream#branch()` while the others
>>>> are
>>>>>     >>>>>>>>> `KBranchedStream#branch()` what makes reading the code
>>>> harder.
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>> Because I suggested to rename `addBranch()` -> `branch()`,
>>>>>     I though
>>>>>     >>>>>> it
>>>>>     >>>>>>>>> might be better to also rename `KStream#branch()` to avoid
>>>> the
>>>>>     >>>>> naming
>>>>>     >>>>>>>>> overlap that seems to be confusing. The following reads
>>>> much
>>>>>     >>>>> cleaner
>>>>>     >>>>>> to
>>>>>     >>>>>>>> me:
>>>>>     >>>>>>>>> stream.split()
>>>>>     >>>>>>>>>      .branch(Predicate)
>>>>>     >>>>>>>>>      .branch(Predicate)
>>>>>     >>>>>>>>>      .defaultBranch();
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>> Maybe there is a better alternative to `split()` though to
>>>>>     avoid
>>>>>     >>>>> the
>>>>>     >>>>>>>>> naming overlap.
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>>> 'default' is, however, a reserved word, so unfortunately
>>>> we
>>>>>     >>>>> cannot
>>>>>     >>>>>>> have
>>>>>     >>>>>>>>> a method with such name :-)
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>> Bummer. Didn't consider this. Maybe we can still come up
>>>>>     with a
>>>>>     >>>>> short
>>>>>     >>>>>>>> name?
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>> Can you add the interface `KBranchedStream` to the KIP
>>>>>     with all
>>>>>     >>>>> it's
>>>>>     >>>>>>>>> methods? It will be part of public API and should be
>>>>>     contained in
>>>>>     >>>>> the
>>>>>     >>>>>>>>> KIP. For example, it's unclear atm, what the return type of
>>>>>     >>>>>>>>> `defaultBranch()` is.
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>> You did not comment on the idea to add a
>>>>>     `KBranchedStream#get(int
>>>>>     >>>>>>> index)
>>>>>     >>>>>>>>> -> KStream` method to get the individually
>>>>>     branched-KStreams. Would
>>>>>     >>>>>> be
>>>>>     >>>>>>>>> nice to get your feedback about it. It seems you suggest
>>>>>     that users
>>>>>     >>>>>>>>> would need to write custom utility code otherwise, to
>>>>>     access them.
>>>>>     >>>>> We
>>>>>     >>>>>>>>> should discuss the pros and cons of both approaches. It
>>>> feels
>>>>>     >>>>>>>>> "incomplete" to me atm, if the API has no built-in support
>>>>>     to get
>>>>>     >>>>> the
>>>>>     >>>>>>>>> branched-KStreams directly.
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>> -Matthias
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>>
>>>>>     >>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
>>>>>     >>>>>>>>>> Hi all!
>>>>>     >>>>>>>>>>
>>>>>     >>>>>>>>>> I have updated the KIP-418 according to the new vision.
>>>>>     >>>>>>>>>>
>>>>>     >>>>>>>>>> Matthias, thanks for your comment!
>>>>>     >>>>>>>>>>
>>>>>     >>>>>>>>>>> Renaming KStream#branch() -> #split()
>>>>>     >>>>>>>>>> I can see your point: this is to make the name similar to
>>>>>     >>>>>>> String#split
>>>>>     >>>>>>>>>> that also returns an array, right? But is it worth the
>>>>>     loss of
>>>>>     >>>>>>>> backwards
>>>>>     >>>>>>>>>> compatibility? We can have overloaded branch() as well
>>>>>     without
>>>>>     >>>>>>>> affecting
>>>>>     >>>>>>>>>> the existing code. Maybe the old array-based `branch`
>>>> method
>>>>>     >>>>> should
>>>>>     >>>>>>> be
>>>>>     >>>>>>>>>> deprecated, but this is a subject for discussion.
>>>>>     >>>>>>>>>>
>>>>>     >>>>>>>>>>> Renaming KBranchedStream#addBranch() ->
>>>>>     >>>>> BranchingKStream#branch(),
>>>>>     >>>>>>>>>> KBranchedStream#defaultBranch() ->
>>>> BranchingKStream#default()
>>>>>     >>>>>>>>>>
>>>>>     >>>>>>>>>> Totally agree with 'addBranch->branch' rename. 'default'
>>>> is,
>>>>>     >>>>>>> however, a
>>>>>     >>>>>>>>>> reserved word, so unfortunately we cannot have a method
>>>>>     with such
>>>>>     >>>>>>> name
>>>>>     >>>>>>>>> :-)
>>>>>     >>>>>>>>>>> defaultBranch() does take an `Predicate` as argument,
>>>> but I
>>>>>     >>>>> think
>>>>>     >>>>>>> that
>>>>>     >>>>>>>>>> is not required?
>>>>>     >>>>>>>>>>
>>>>>     >>>>>>>>>> Absolutely! I think that was just copy-paste error or
>>>>>     something.
>>>>>     >>>>>>>>>>
>>>>>     >>>>>>>>>> Dear colleagues,
>>>>>     >>>>>>>>>>
>>>>>     >>>>>>>>>> please revise the new version of the KIP and Paul's PR
>>>>>     >>>>>>>>>> (https://github.com/apache/kafka/pull/6512)
>>>>>     >>>>>>>>>>
>>>>>     >>>>>>>>>> Any new suggestions/objections?
>>>>>     >>>>>>>>>>
>>>>>     >>>>>>>>>> Regards,
>>>>>     >>>>>>>>>>
>>>>>     >>>>>>>>>> Ivan
>>>>>     >>>>>>>>>>
>>>>>     >>>>>>>>>>
>>>>>     >>>>>>>>>> 11.04.2019 11:47, Matthias J. Sax пишет:
>>>>>     >>>>>>>>>>> Thanks for driving the discussion of this KIP. It seems
>>>> that
>>>>>     >>>>>>> everybody
>>>>>     >>>>>>>>>>> agrees that the current branch() method using arrays is
>>>> not
>>>>>     >>>>>> optimal.
>>>>>     >>>>>>>>>>> I had a quick look into the PR and I like the overall
>>>>>     proposal.
>>>>>     >>>>>>> There
>>>>>     >>>>>>>>>>> are some minor things we need to consider. I would
>>>>>     recommend the
>>>>>     >>>>>>>>>>> following renaming:
>>>>>     >>>>>>>>>>>
>>>>>     >>>>>>>>>>> KStream#branch() -> #split()
>>>>>     >>>>>>>>>>> KBranchedStream#addBranch() -> BranchingKStream#branch()
>>>>>     >>>>>>>>>>> KBranchedStream#defaultBranch() ->
>>>>>     BranchingKStream#default()
>>>>>     >>>>>>>>>>>
>>>>>     >>>>>>>>>>> It's just a suggestion to get slightly shorter method
>>>> names.
>>>>>     >>>>>>>>>>>
>>>>>     >>>>>>>>>>> In the current PR, defaultBranch() does take an
>>>>>     `Predicate` as
>>>>>     >>>>>>>> argument,
>>>>>     >>>>>>>>>>> but I think that is not required?
>>>>>     >>>>>>>>>>>
>>>>>     >>>>>>>>>>> Also, we should consider KIP-307, that was recently
>>>>>     accepted and
>>>>>     >>>>>> is
>>>>>     >>>>>>>>>>> currently implemented:
>>>>>     >>>>>>>>>>>
>>>>>     >>>>>
>>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>>     >>>>>>>>>>> Ie, we should add overloads that accepted a `Named`
>>>>>     parameter.
>>>>>     >>>>>>>>>>>
>>>>>     >>>>>>>>>>>
>>>>>     >>>>>>>>>>> For the issue that the created `KStream` object are in
>>>>>     different
>>>>>     >>>>>>>> scopes:
>>>>>     >>>>>>>>>>> could we extend `KBranchedStream` with a `get(int
>>>>>     index)` method
>>>>>     >>>>>>> that
>>>>>     >>>>>>>>>>> returns the corresponding "branched" result `KStream`
>>>>>     object?
>>>>>     >>>>>> Maybe,
>>>>>     >>>>>>>> the
>>>>>     >>>>>>>>>>> second argument of `addBranch()` should not be a
>>>>>     >>>>>> `Consumer<KStream>`
>>>>>     >>>>>>>> but
>>>>>     >>>>>>>>>>> a `Function<KStream,KStream>` and `get()` could return
>>>>>     whatever
>>>>>     >>>>>> the
>>>>>     >>>>>>>>>>> `Function` returns?
>>>>>     >>>>>>>>>>>
>>>>>     >>>>>>>>>>>
>>>>>     >>>>>>>>>>> Finally, I would also suggest to update the KIP with the
>>>>>     current
>>>>>     >>>>>>>>>>> proposal. That makes it easier to review.
>>>>>     >>>>>>>>>>>
>>>>>     >>>>>>>>>>>
>>>>>     >>>>>>>>>>> -Matthias
>>>>>     >>>>>>>>>>>
>>>>>     >>>>>>>>>>>
>>>>>     >>>>>>>>>>>
>>>>>     >>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
>>>>>     >>>>>>>>>>>> Ivan,
>>>>>     >>>>>>>>>>>>
>>>>>     >>>>>>>>>>>> I'm a bit of a novice here as well, but I think it
>>>>>     makes sense
>>>>>     >>>>>> for
>>>>>     >>>>>>>> you
>>>>>     >>>>>>>>> to
>>>>>     >>>>>>>>>>>> revise the KIP and continue the discussion.  Obviously
>>>>>     we'll
>>>>>     >>>>> need
>>>>>     >>>>>>>> some
>>>>>     >>>>>>>>>>>> buy-in from committers that have actual binding votes on
>>>>>     >>>>> whether
>>>>>     >>>>>>> the
>>>>>     >>>>>>>>> KIP
>>>>>     >>>>>>>>>>>> could be adopted.  It would be great to hear if they
>>>>>     think this
>>>>>     >>>>>> is
>>>>>     >>>>>>> a
>>>>>     >>>>>>>>> good
>>>>>     >>>>>>>>>>>> idea overall.  I'm not sure if that happens just by
>>>>>     starting a
>>>>>     >>>>>>> vote,
>>>>>     >>>>>>>>> or if
>>>>>     >>>>>>>>>>>> there is generally some indication of interest
>>>> beforehand.
>>>>>     >>>>>>>>>>>>
>>>>>     >>>>>>>>>>>> That being said, I'll continue the discussion a bit:
>>>>>     assuming
>>>>>     >>>>> we
>>>>>     >>>>>> do
>>>>>     >>>>>>>>> move
>>>>>     >>>>>>>>>>>> forward the solution of "stream.branch() returns
>>>>>     >>>>>> KBranchedStream",
>>>>>     >>>>>>> do
>>>>>     >>>>>>>>> we
>>>>>     >>>>>>>>>>>> deprecate "stream.branch(...) returns KStream[]"?  I
>>>> would
>>>>>     >>>>> favor
>>>>>     >>>>>>>>>>>> deprecating, since having two mutually exclusive APIs
>>>> that
>>>>>     >>>>>>> accomplish
>>>>>     >>>>>>>>> the
>>>>>     >>>>>>>>>>>> same thing is confusing, especially when they're fairly
>>>>>     similar
>>>>>     >>>>>>>>> anyway.  We
>>>>>     >>>>>>>>>>>> just need to be sure we're not making something
>>>>>     >>>>>>> impossible/difficult
>>>>>     >>>>>>>>> that
>>>>>     >>>>>>>>>>>> is currently possible/easy.
>>>>>     >>>>>>>>>>>>
>>>>>     >>>>>>>>>>>> Regarding my PR - I think the general structure would
>>>> work,
>>>>>     >>>>> it's
>>>>>     >>>>>>>> just a
>>>>>     >>>>>>>>>>>> little sloppy overall in terms of naming and clarity. In
>>>>>     >>>>>>> particular,
>>>>>     >>>>>>>>>>>> passing in the "predicates" and "children" lists which
>>>> get
>>>>>     >>>>>> modified
>>>>>     >>>>>>>> in
>>>>>     >>>>>>>>>>>> KBranchedStream but read from all the way
>>>>>     KStreamLazyBranch is
>>>>>     >>>>> a
>>>>>     >>>>>>> bit
>>>>>     >>>>>>>>>>>> complicated to follow.
>>>>>     >>>>>>>>>>>>
>>>>>     >>>>>>>>>>>> Thanks,
>>>>>     >>>>>>>>>>>> Paul
>>>>>     >>>>>>>>>>>>
>>>>>     >>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <
>>>>>     >>>>>> iponomarev@mail.ru <mailto:iponomarev@mail.ru>
>>>>>     >>>>>>>>> wrote:
>>>>>     >>>>>>>>>>>>> Hi Paul!
>>>>>     >>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>> I read your code carefully and now I am fully
>>>>>     convinced: your
>>>>>     >>>>>>>> proposal
>>>>>     >>>>>>>>>>>>> looks better and should work. We just have to document
>>>> the
>>>>>     >>>>>> crucial
>>>>>     >>>>>>>>> fact
>>>>>     >>>>>>>>>>>>> that KStream consumers are invoked as they're added.
>>>>>     And then
>>>>>     >>>>>> it's
>>>>>     >>>>>>>> all
>>>>>     >>>>>>>>>>>>> going to be very nice.
>>>>>     >>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>> What shall we do now? I should re-write the KIP and
>>>>>     resume the
>>>>>     >>>>>>>>>>>>> discussion here, right?
>>>>>     >>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>> Why are you telling that your PR 'should not be even a
>>>>>     >>>>> starting
>>>>>     >>>>>>>> point
>>>>>     >>>>>>>>> if
>>>>>     >>>>>>>>>>>>> we go in this direction'? To me it looks like a good
>>>>>     starting
>>>>>     >>>>>>> point.
>>>>>     >>>>>>>>> But
>>>>>     >>>>>>>>>>>>> as a novice in this project I might miss some important
>>>>>     >>>>> details.
>>>>>     >>>>>>>>>>>>> Regards,
>>>>>     >>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>> Ivan
>>>>>     >>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen пишет:
>>>>>     >>>>>>>>>>>>>> Ivan,
>>>>>     >>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>> Maybe I’m missing the point, but I believe the
>>>>>     >>>>> stream.branch()
>>>>>     >>>>>>>>> solution
>>>>>     >>>>>>>>>>>>> supports this. The couponIssuer::set* consumers will be
>>>>>     >>>>> invoked
>>>>>     >>>>>> as
>>>>>     >>>>>>>>> they’re
>>>>>     >>>>>>>>>>>>> added, not during streamsBuilder.build(). So the user
>>>>>     still
>>>>>     >>>>>> ought
>>>>>     >>>>>>> to
>>>>>     >>>>>>>>> be
>>>>>     >>>>>>>>>>>>> able to call couponIssuer.coupons() afterward and
>>>>>     depend on
>>>>>     >>>>> the
>>>>>     >>>>>>>>> branched
>>>>>     >>>>>>>>>>>>> streams having been set.
>>>>>     >>>>>>>>>>>>>> The issue I mean to point out is that it is hard to
>>>>>     access
>>>>>     >>>>> the
>>>>>     >>>>>>>>> branched
>>>>>     >>>>>>>>>>>>> streams in the same scope as the original stream (that
>>>>>     is, not
>>>>>     >>>>>>>> inside
>>>>>     >>>>>>>>> the
>>>>>     >>>>>>>>>>>>> couponIssuer), which is a problem with both proposed
>>>>>     >>>>> solutions.
>>>>>     >>>>>> It
>>>>>     >>>>>>>>> can be
>>>>>     >>>>>>>>>>>>> worked around though.
>>>>>     >>>>>>>>>>>>>> [Also, great to hear additional interest in 401, I’m
>>>>>     excited
>>>>>     >>>>> to
>>>>>     >>>>>>>> hear
>>>>>     >>>>>>>>>>>>> your thoughts!]
>>>>>     >>>>>>>>>>>>>> Paul
>>>>>     >>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <
>>>>>     >>>>>> iponomarev@mail.ru <mailto:iponomarev@mail.ru>
>>>>>     >>>>>>>>> wrote:
>>>>>     >>>>>>>>>>>>>>> Hi Paul!
>>>>>     >>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>> The idea to postpone the wiring of branches to the
>>>>>     >>>>>>>>>>>>> streamsBuilder.build() also looked great for me at
>>>> first
>>>>>     >>>>> glance,
>>>>>     >>>>>>> but
>>>>>     >>>>>>>>> ---
>>>>>     >>>>>>>>>>>>>>>> the newly branched streams are not available in the
>>>>>     same
>>>>>     >>>>>> scope
>>>>>     >>>>>>> as
>>>>>     >>>>>>>>> each
>>>>>     >>>>>>>>>>>>> other.  That is, if we wanted to merge them back
>>>> together
>>>>>     >>>>> again
>>>>>     >>>>>> I
>>>>>     >>>>>>>>> don't see
>>>>>     >>>>>>>>>>>>> a way to do that.
>>>>>     >>>>>>>>>>>>>>> You just took the words right out of my mouth, I was
>>>>>     just
>>>>>     >>>>>> going
>>>>>     >>>>>>> to
>>>>>     >>>>>>>>>>>>> write in details about this issue.
>>>>>     >>>>>>>>>>>>>>> Consider the example from Bill's book, p. 101: say
>>>>>     we need
>>>>>     >>>>> to
>>>>>     >>>>>>>>> identify
>>>>>     >>>>>>>>>>>>> customers who have bought coffee and made a purchase
>>>>>     in the
>>>>>     >>>>>>>>> electronics
>>>>>     >>>>>>>>>>>>> store to give them coupons.
>>>>>     >>>>>>>>>>>>>>> This is the code I usually write under these
>>>>>     circumstances
>>>>>     >>>>>> using
>>>>>     >>>>>>>> my
>>>>>     >>>>>>>>>>>>> 'brancher' class:
>>>>>     >>>>>>>>>>>>>>> @Setter
>>>>>     >>>>>>>>>>>>>>> class CouponIssuer{
>>>>>     >>>>>>>>>>>>>>>   private KStream<....> coffePurchases;
>>>>>     >>>>>>>>>>>>>>>   private KStream<....> electronicsPurchases;
>>>>>     >>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>   KStream<...> coupons(){
>>>>>     >>>>>>>>>>>>>>>       return
>>>>>     >>>>>>>>> coffePurchases.join(electronicsPurchases...)...whatever
>>>>>     >>>>>>>>>>>>>>>       /*In the real world the code here can be
>>>>>     complex, so
>>>>>     >>>>>>>>> creation of
>>>>>     >>>>>>>>>>>>> a separate CouponIssuer class is fully justified, in
>>>>>     order to
>>>>>     >>>>>>>> separate
>>>>>     >>>>>>>>>>>>> classes' responsibilities.*/
>>>>>     >>>>>>>>>>>>>>>  }
>>>>>     >>>>>>>>>>>>>>> }
>>>>>     >>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>> CouponIssuer couponIssuer = new CouponIssuer();
>>>>>     >>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
>>>>>     >>>>>>>>>>>>>>>     .branch(predicate1,
>>>> couponIssuer::setCoffePurchases)
>>>>>     >>>>>>>>>>>>>>>     .branch(predicate2,
>>>>>     >>>>>> couponIssuer::setElectronicsPurchases)
>>>>>     >>>>>>>>>>>>>>>     .onTopOf(transactionStream);
>>>>>     >>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>> /*Alas, this won't work if we're going to wire up
>>>>>     everything
>>>>>     >>>>>>>> later,
>>>>>     >>>>>>>>>>>>> without the terminal operation!!!*/
>>>>>     >>>>>>>>>>>>>>> couponIssuer.coupons()...
>>>>>     >>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>> Does this make sense?  In order to properly
>>>>>     initialize the
>>>>>     >>>>>>>>> CouponIssuer
>>>>>     >>>>>>>>>>>>> we need the terminal operation to be called before
>>>>>     >>>>>>>>> streamsBuilder.build()
>>>>>     >>>>>>>>>>>>> is called.
>>>>>     >>>>>>>>>>>>>>> [BTW Paul, I just found out that your KIP-401 is
>>>>>     essentially
>>>>>     >>>>>> the
>>>>>     >>>>>>>>> next
>>>>>     >>>>>>>>>>>>> KIP I was going to write here. I have some thoughts
>>>>>     based on
>>>>>     >>>>> my
>>>>>     >>>>>>>>> experience,
>>>>>     >>>>>>>>>>>>> so I will join the discussion on KIP-401 soon.]
>>>>>     >>>>>>>>>>>>>>> Regards,
>>>>>     >>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>> Ivan
>>>>>     >>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen пишет:
>>>>>     >>>>>>>>>>>>>>>> Ivan,
>>>>>     >>>>>>>>>>>>>>>> I tried to make a very rough proof of concept of a
>>>>>     fluent
>>>>>     >>>>> API
>>>>>     >>>>>>>> based
>>>>>     >>>>>>>>>>>>> off of
>>>>>     >>>>>>>>>>>>>>>> KStream here
>>>>>     (https://github.com/apache/kafka/pull/6512),
>>>>>     >>>>>> and
>>>>>     >>>>>>> I
>>>>>     >>>>>>>>> think
>>>>>     >>>>>>>>>>>>> I
>>>>>     >>>>>>>>>>>>>>>> succeeded at removing both cons.
>>>>>     >>>>>>>>>>>>>>>>    - Compatibility: I was incorrect earlier about
>>>>>     >>>>>>> compatibility
>>>>>     >>>>>>>>>>>>> issues,
>>>>>     >>>>>>>>>>>>>>>>    there aren't any direct ones.  I was unaware
>>>>>     that Java
>>>>>     >>>>> is
>>>>>     >>>>>>>> smart
>>>>>     >>>>>>>>>>>>> enough to
>>>>>     >>>>>>>>>>>>>>>>    distinguish between a branch(varargs...)
>>>>>     returning one
>>>>>     >>>>>>> thing
>>>>>     >>>>>>>>> and
>>>>>     >>>>>>>>>>>>> branch()
>>>>>     >>>>>>>>>>>>>>>>    with no arguments returning another thing.
>>>>>     >>>>>>>>>>>>>>>>    - Requiring a terminal method: We don't actually
>>>>>     need
>>>>>     >>>>> it.
>>>>>     >>>>>>> We
>>>>>     >>>>>>>>> can
>>>>>     >>>>>>>>>>>>> just
>>>>>     >>>>>>>>>>>>>>>>    build up the branches in the KBranchedStream who
>>>>>     shares
>>>>>     >>>>>> its
>>>>>     >>>>>>>>> state
>>>>>     >>>>>>>>>>>>> with the
>>>>>     >>>>>>>>>>>>>>>>    ProcessorSupplier that will actually do the
>>>>>     branching.
>>>>>     >>>>>>> It's
>>>>>     >>>>>>>>> not
>>>>>     >>>>>>>>>>>>> terribly
>>>>>     >>>>>>>>>>>>>>>>    pretty in its current form, but I think it
>>>>>     demonstrates
>>>>>     >>>>>> its
>>>>>     >>>>>>>>>>>>> feasibility.
>>>>>     >>>>>>>>>>>>>>>> To be clear, I don't think that pull request should
>>>> be
>>>>>     >>>>> final
>>>>>     >>>>>> or
>>>>>     >>>>>>>>> even a
>>>>>     >>>>>>>>>>>>>>>> starting point if we go in this direction, I just
>>>>>     wanted to
>>>>>     >>>>>> see
>>>>>     >>>>>>>> how
>>>>>     >>>>>>>>>>>>>>>> challenging it would be to get the API working.
>>>>>     >>>>>>>>>>>>>>>> I will say though, that I'm not sure the existing
>>>>>     solution
>>>>>     >>>>>>> could
>>>>>     >>>>>>>> be
>>>>>     >>>>>>>>>>>>>>>> deprecated in favor of this, which I had originally
>>>>>     >>>>> suggested
>>>>>     >>>>>>>> was a
>>>>>     >>>>>>>>>>>>>>>> possibility.  The reason is that the newly branched
>>>>>     streams
>>>>>     >>>>>> are
>>>>>     >>>>>>>> not
>>>>>     >>>>>>>>>>>>>>>> available in the same scope as each other.  That
>>>>>     is, if we
>>>>>     >>>>>>> wanted
>>>>>     >>>>>>>>> to
>>>>>     >>>>>>>>>>>>> merge
>>>>>     >>>>>>>>>>>>>>>> them back together again I don't see a way to do
>>>>>     that.  The
>>>>>     >>>>>> KIP
>>>>>     >>>>>>>>>>>>> proposal
>>>>>     >>>>>>>>>>>>>>>> has the same issue, though - all this means is that
>>>> for
>>>>>     >>>>>> either
>>>>>     >>>>>>>>>>>>> solution,
>>>>>     >>>>>>>>>>>>>>>> deprecating the existing branch(...) is not on the
>>>>>     table.
>>>>>     >>>>>>>>>>>>>>>> Thanks,
>>>>>     >>>>>>>>>>>>>>>> Paul
>>>>>     >>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <
>>>>>     >>>>>>>>> iponomarev@mail.ru <mailto:iponomarev@mail.ru>>
>>>>>     >>>>>>>>>>>>> wrote:
>>>>>     >>>>>>>>>>>>>>>>> OK, let me summarize what we have discussed up to
>>>> this
>>>>>     >>>>>> point.
>>>>>     >>>>>>>>>>>>>>>>> First, it seems that it's commonly agreed that
>>>>>     branch API
>>>>>     >>>>>>> needs
>>>>>     >>>>>>>>>>>>>>>>> improvement. Motivation is given in the KIP.
>>>>>     >>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>> There are two potential ways to do it:
>>>>>     >>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>> 1. (as origianlly proposed)
>>>>>     >>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
>>>>>     >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->..)
>>>>>     >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->..)
>>>>>     >>>>>>>>>>>>>>>>>    .defaultBranch(ks->..) //optional
>>>>>     >>>>>>>>>>>>>>>>>    .onTopOf(stream).mapValues(...).... //onTopOf
>>>>>     returns
>>>>>     >>>>>> its
>>>>>     >>>>>>>>> argument
>>>>>     >>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2) The code
>>>> won't
>>>>>     >>>>> make
>>>>>     >>>>>>>> sense
>>>>>     >>>>>>>>>>>>> until
>>>>>     >>>>>>>>>>>>>>>>> all the necessary ingredients are provided.
>>>>>     >>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>> CONS: The need to create a KafkaStreamsBrancher
>>>>>     instance
>>>>>     >>>>>>>>> contrasts the
>>>>>     >>>>>>>>>>>>>>>>> fluency of other KStream methods.
>>>>>     >>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>> 2. (as Paul proposes)
>>>>>     >>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>> stream
>>>>>     >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->...)
>>>>>     >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->...)
>>>>>     >>>>>>>>>>>>>>>>>    .defaultBranch(ks->...) //or noDefault(). Both
>>>>>     >>>>>>>>> defaultBranch(..)
>>>>>     >>>>>>>>>>>>> and
>>>>>     >>>>>>>>>>>>>>>>> noDefault() return void
>>>>>     >>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>> PROS: Generally follows the way KStreams interface
>>>> is
>>>>>     >>>>>> defined.
>>>>>     >>>>>>>>>>>>>>>>> CONS: We need to define two terminal methods
>>>>>     >>>>>>>> (defaultBranch(ks->)
>>>>>     >>>>>>>>> and
>>>>>     >>>>>>>>>>>>>>>>> noDefault()). And for a user it is very easy to
>>>>>     miss the
>>>>>     >>>>>> fact
>>>>>     >>>>>>>>> that one
>>>>>     >>>>>>>>>>>>>>>>> of the terminal methods should be called. If these
>>>>>     methods
>>>>>     >>>>>> are
>>>>>     >>>>>>>> not
>>>>>     >>>>>>>>>>>>>>>>> called, we can throw an exception in runtime.
>>>>>     >>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can we do
>>>> better?
>>>>>     >>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>> Regards,
>>>>>     >>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>> Ivan
>>>>>     >>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev пишет:
>>>>>     >>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev пишет:
>>>>>     >>>>>>>>>>>>>>>>>>> Paul,
>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>> I see your point when you are talking about
>>>>>     >>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>> Still, I believe that this cannot not be
>>>>>     implemented the
>>>>>     >>>>>>> easy
>>>>>     >>>>>>>>> way.
>>>>>     >>>>>>>>>>>>>>>>>>> Maybe we all should think further.
>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>> user could specify a terminal method that
>>>> assumes
>>>>>     >>>>> nothing
>>>>>     >>>>>>>> will
>>>>>     >>>>>>>>>>>>> reach
>>>>>     >>>>>>>>>>>>>>>>>>>> the default branch,
>>>>>     >>>>>>>>>>>>>>>>>>> throwing an exception if such a case occurs.
>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be the only
>>>> option
>>>>>     >>>>>> besides
>>>>>     >>>>>>>>>>>>>>>>>>> `default`, because there are scenarios when we
>>>>>     want to
>>>>>     >>>>>> just
>>>>>     >>>>>>>>> silently
>>>>>     >>>>>>>>>>>>>>>>>>> drop the messages that didn't match any
>>>>>     predicate. 2)
>>>>>     >>>>>>> Throwing
>>>>>     >>>>>>>>> an
>>>>>     >>>>>>>>>>>>>>>>>>> exception in the middle of data flow processing
>>>>>     looks
>>>>>     >>>>>> like a
>>>>>     >>>>>>>> bad
>>>>>     >>>>>>>>>>>>> idea.
>>>>>     >>>>>>>>>>>>>>>>>>> In stream processing paradigm, I would prefer to
>>>>>     emit a
>>>>>     >>>>>>>> special
>>>>>     >>>>>>>>>>>>>>>>>>> message to a dedicated stream. This is exactly
>>>> where
>>>>>     >>>>>>> `default`
>>>>>     >>>>>>>>> can
>>>>>     >>>>>>>>>>>>> be
>>>>>     >>>>>>>>>>>>>>>>>>> used.
>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>> it would be fairly easily for the
>>>>>     >>>>> InternalTopologyBuilder
>>>>>     >>>>>>> to
>>>>>     >>>>>>>>> track
>>>>>     >>>>>>>>>>>>>>>>>>>> dangling
>>>>>     >>>>>>>>>>>>>>>>>>> branches that haven't been terminated and raise
>>>>>     a clear
>>>>>     >>>>>>> error
>>>>>     >>>>>>>>>>>>> before it
>>>>>     >>>>>>>>>>>>>>>>>>> becomes an issue.
>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>> You mean a runtime exception, when the program is
>>>>>     >>>>> compiled
>>>>>     >>>>>>> and
>>>>>     >>>>>>>>> run?
>>>>>     >>>>>>>>>>>>>>>>>>> Well,  I'd prefer an API that simply won't
>>>>>     compile if
>>>>>     >>>>> used
>>>>>     >>>>>>>>>>>>>>>>>>> incorrectly. Can we build such an API as a
>>>>>     method chain
>>>>>     >>>>>>>> starting
>>>>>     >>>>>>>>>>>>> from
>>>>>     >>>>>>>>>>>>>>>>>>> KStream object? There is a huge cost difference
>>>>>     between
>>>>>     >>>>>>>> runtime
>>>>>     >>>>>>>>> and
>>>>>     >>>>>>>>>>>>>>>>>>> compile-time errors. Even if a failure uncovers
>>>>>     >>>>> instantly
>>>>>     >>>>>> on
>>>>>     >>>>>>>>> unit
>>>>>     >>>>>>>>>>>>>>>>>>> tests, it costs more for the project than a
>>>>>     compilation
>>>>>     >>>>>>>> failure.
>>>>>     >>>>>>>>>>>>>>>>>>> Regards,
>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>> Ivan
>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen пишет:
>>>>>     >>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>     >>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>> Good point about the terminal operation being
>>>>>     required.
>>>>>     >>>>>>> But
>>>>>     >>>>>>>> is
>>>>>     >>>>>>>>>>>>> that
>>>>>     >>>>>>>>>>>>>>>>>>>> really
>>>>>     >>>>>>>>>>>>>>>>>>>> such a bad thing?  If the user doesn't want a
>>>>>     >>>>>> defaultBranch
>>>>>     >>>>>>>>> they
>>>>>     >>>>>>>>>>>>> can
>>>>>     >>>>>>>>>>>>>>>>>>>> call
>>>>>     >>>>>>>>>>>>>>>>>>>> some other terminal method (noDefaultBranch()?)
>>>>>     just as
>>>>>     >>>>>>>>> easily.  In
>>>>>     >>>>>>>>>>>>>>>>>>>> fact I
>>>>>     >>>>>>>>>>>>>>>>>>>> think it creates an opportunity for a nicer API
>>>> - a
>>>>>     >>>>> user
>>>>>     >>>>>>>> could
>>>>>     >>>>>>>>>>>>> specify
>>>>>     >>>>>>>>>>>>>>>>> a
>>>>>     >>>>>>>>>>>>>>>>>>>> terminal method that assumes nothing will reach
>>>> the
>>>>>     >>>>>> default
>>>>>     >>>>>>>>> branch,
>>>>>     >>>>>>>>>>>>>>>>>>>> throwing an exception if such a case occurs.
>>>> That
>>>>>     >>>>> seems
>>>>>     >>>>>>> like
>>>>>     >>>>>>>>> an
>>>>>     >>>>>>>>>>>>>>>>>>>> improvement over the current branch() API,
>>>>>     which allows
>>>>>     >>>>>> for
>>>>>     >>>>>>>> the
>>>>>     >>>>>>>>>>>>> more
>>>>>     >>>>>>>>>>>>>>>>>>>> subtle
>>>>>     >>>>>>>>>>>>>>>>>>>> behavior of records unexpectedly getting
>>>> dropped.
>>>>>     >>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>> The need for a terminal operation certainly has
>>>>>     to be
>>>>>     >>>>>> well
>>>>>     >>>>>>>>>>>>>>>>>>>> documented, but
>>>>>     >>>>>>>>>>>>>>>>>>>> it would be fairly easily for the
>>>>>     >>>>> InternalTopologyBuilder
>>>>>     >>>>>>> to
>>>>>     >>>>>>>>> track
>>>>>     >>>>>>>>>>>>>>>>>>>> dangling
>>>>>     >>>>>>>>>>>>>>>>>>>> branches that haven't been terminated and raise
>>>>>     a clear
>>>>>     >>>>>>> error
>>>>>     >>>>>>>>>>>>> before it
>>>>>     >>>>>>>>>>>>>>>>>>>> becomes an issue.  Especially now that there is
>>>> a
>>>>>     >>>>> "build
>>>>>     >>>>>>>> step"
>>>>>     >>>>>>>>>>>>> where
>>>>>     >>>>>>>>>>>>>>>>> the
>>>>>     >>>>>>>>>>>>>>>>>>>> topology is actually wired up, when
>>>>>     >>>>>> StreamsBuilder.build()
>>>>>     >>>>>>> is
>>>>>     >>>>>>>>>>>>> called.
>>>>>     >>>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its argument, I
>>>> agree
>>>>>     >>>>> that
>>>>>     >>>>>>> it's
>>>>>     >>>>>>>>>>>>>>>>>>>> critical to
>>>>>     >>>>>>>>>>>>>>>>>>>> allow users to do other operations on the input
>>>>>     stream.
>>>>>     >>>>>>> With
>>>>>     >>>>>>>>> the
>>>>>     >>>>>>>>>>>>>>>>> fluent
>>>>>     >>>>>>>>>>>>>>>>>>>> solution, it ought to work the same way all
>>>> other
>>>>>     >>>>>>> operations
>>>>>     >>>>>>>>> do -
>>>>>     >>>>>>>>>>>>> if
>>>>>     >>>>>>>>>>>>>>>>> you
>>>>>     >>>>>>>>>>>>>>>>>>>> want to process off the original KStream
>>>> multiple
>>>>>     >>>>> times,
>>>>>     >>>>>>> you
>>>>>     >>>>>>>>> just
>>>>>     >>>>>>>>>>>>>>>>>>>> need the
>>>>>     >>>>>>>>>>>>>>>>>>>> stream as a variable so you can call as many
>>>>>     operations
>>>>>     >>>>>> on
>>>>>     >>>>>>> it
>>>>>     >>>>>>>>> as
>>>>>     >>>>>>>>>>>>> you
>>>>>     >>>>>>>>>>>>>>>>>>>> desire.
>>>>>     >>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>> Thoughts?
>>>>>     >>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>> Best,
>>>>>     >>>>>>>>>>>>>>>>>>>> Paul
>>>>>     >>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <
>>>>>     >>>>>>>>> iponomarev@mail.ru <mailto:iponomarev@mail.ru>
>>>>>     >>>>>>>>>>>>>>>>>>>> wrote:
>>>>>     >>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>> Hello Paul,
>>>>>     >>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>> I afraid this won't work because we do not
>>>>>     always need
>>>>>     >>>>>> the
>>>>>     >>>>>>>>>>>>>>>>>>>>> defaultBranch. And without a terminal
>>>> operation we
>>>>>     >>>>> don't
>>>>>     >>>>>>>> know
>>>>>     >>>>>>>>>>>>> when to
>>>>>     >>>>>>>>>>>>>>>>>>>>> finalize and build the 'branch switch'.
>>>>>     >>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its argument,
>>>>>     so we
>>>>>     >>>>> can
>>>>>     >>>>>> do
>>>>>     >>>>>>>>>>>>> something
>>>>>     >>>>>>>>>>>>>>>>>>>>> more with the original branch after branching.
>>>>>     >>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>> I understand your point that the need of
>>>> special
>>>>>     >>>>> object
>>>>>     >>>>>>>>>>>>> construction
>>>>>     >>>>>>>>>>>>>>>>>>>>> contrasts the fluency of most KStream methods.
>>>> But
>>>>>     >>>>> here
>>>>>     >>>>>> we
>>>>>     >>>>>>>>> have a
>>>>>     >>>>>>>>>>>>>>>>>>>>> special case: we build the switch to split the
>>>>>     flow,
>>>>>     >>>>> so
>>>>>     >>>>>> I
>>>>>     >>>>>>>>> think
>>>>>     >>>>>>>>>>>>> this
>>>>>     >>>>>>>>>>>>>>>>> is
>>>>>     >>>>>>>>>>>>>>>>>>>>> still idiomatic.
>>>>>     >>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>     >>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>> Ivan
>>>>>     >>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen пишет:
>>>>>     >>>>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve this
>>>>>     API, but I
>>>>>     >>>>>> find
>>>>>     >>>>>>>> the
>>>>>     >>>>>>>>>>>>>>>>>>>>>> onTopOff()
>>>>>     >>>>>>>>>>>>>>>>>>>>>> mechanism a little confusing since it
>>>>>     contrasts the
>>>>>     >>>>>>> fluency
>>>>>     >>>>>>>>> of
>>>>>     >>>>>>>>>>>>> other
>>>>>     >>>>>>>>>>>>>>>>>>>>>> KStream method calls.  Ideally I'd like to
>>>>>     just call
>>>>>     >>>>> a
>>>>>     >>>>>>>>> method on
>>>>>     >>>>>>>>>>>>> the
>>>>>     >>>>>>>>>>>>>>>>>>>>> stream
>>>>>     >>>>>>>>>>>>>>>>>>>>>> so it still reads top to bottom if the branch
>>>>>     cases
>>>>>     >>>>> are
>>>>>     >>>>>>>>> defined
>>>>>     >>>>>>>>>>>>>>>>>>>>>> fluently.
>>>>>     >>>>>>>>>>>>>>>>>>>>>> I think the addBranch(predicate, handleCase)
>>>>>     is very
>>>>>     >>>>>> nice
>>>>>     >>>>>>>>> and the
>>>>>     >>>>>>>>>>>>>>>>>>>>>> right
>>>>>     >>>>>>>>>>>>>>>>>>>>> way
>>>>>     >>>>>>>>>>>>>>>>>>>>>> to do things, but what if we flipped around
>>>>>     how we
>>>>>     >>>>>>> specify
>>>>>     >>>>>>>>> the
>>>>>     >>>>>>>>>>>>> source
>>>>>     >>>>>>>>>>>>>>>>>>>>>> stream.
>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>> Like:
>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>> stream.branch()
>>>>>     >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate1,
>>>> this::handle1)
>>>>>     >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate2,
>>>> this::handle2)
>>>>>     >>>>>>>>>>>>>>>>>>>>>>           .defaultBranch(this::handleDefault);
>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>> Where branch() returns a KBranchedStreams or
>>>>>     >>>>>>>> KStreamBrancher
>>>>>     >>>>>>>>> or
>>>>>     >>>>>>>>>>>>>>>>>>>>> something,
>>>>>     >>>>>>>>>>>>>>>>>>>>>> which is added to by addBranch() and
>>>>>     terminated by
>>>>>     >>>>>>>>>>>>> defaultBranch()
>>>>>     >>>>>>>>>>>>>>>>>>>>>> (which
>>>>>     >>>>>>>>>>>>>>>>>>>>>> returns void).  This is obviously
>>>>>     incompatible with
>>>>>     >>>>> the
>>>>>     >>>>>>>>> current
>>>>>     >>>>>>>>>>>>>>>>>>>>>> API, so
>>>>>     >>>>>>>>>>>>>>>>>>>>> the
>>>>>     >>>>>>>>>>>>>>>>>>>>>> new stream.branch() would have to have a
>>>>>     different
>>>>>     >>>>>> name,
>>>>>     >>>>>>>> but
>>>>>     >>>>>>>>> that
>>>>>     >>>>>>>>>>>>>>>>>>>>>> seems
>>>>>     >>>>>>>>>>>>>>>>>>>>>> like a fairly small problem - we could call it
>>>>>     >>>>>> something
>>>>>     >>>>>>>> like
>>>>>     >>>>>>>>>>>>>>>>>>>>>> branched()
>>>>>     >>>>>>>>>>>>>>>>>>>>> or
>>>>>     >>>>>>>>>>>>>>>>>>>>>> branchedStreams() and deprecate the old API.
>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of your
>>>>>     KIP?  It
>>>>>     >>>>>> seems
>>>>>     >>>>>>>>> like it
>>>>>     >>>>>>>>>>>>>>>>>>>>>> does to
>>>>>     >>>>>>>>>>>>>>>>>>>>>> me, allowing for clear in-line branching
>>>>>     while also
>>>>>     >>>>>>>> allowing
>>>>>     >>>>>>>>> you
>>>>>     >>>>>>>>>>>>> to
>>>>>     >>>>>>>>>>>>>>>>>>>>>> dynamically build of branches off of
>>>>>     KBranchedStreams
>>>>>     >>>>>> if
>>>>>     >>>>>>>>> desired.
>>>>>     >>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>     >>>>>>>>>>>>>>>>>>>>>> Paul
>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
>>>>>     >>>>>>>>>>>>>>>>>>>>> <iponomarev@mail.ru.invalid>
>>>>>     >>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>> void handleFirstCase(KStream<String, String>
>>>>>     ks){
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>           ks.filter(....).mapValues(...)
>>>>>     >>>>>>>>>>>>>>>>>>>>>>> }
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>> void handleSecondCase(KStream<String,
>>>>>     String> ks){
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>           ks.selectKey(...).groupByKey()...
>>>>>     >>>>>>>>>>>>>>>>>>>>>>> }
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>> ......
>>>>>     >>>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate1,
>>>>>     this::handleFirstCase)
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate2,
>>>>>     this::handleSecondCase)
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>      .onTopOf(....)
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>> Ivan
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck пишет:
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> I have one question, the
>>>> KafkaStreamsBrancher
>>>>>     >>>>> takes a
>>>>>     >>>>>>>>> Consumer
>>>>>     >>>>>>>>>>>>> as a
>>>>>     >>>>>>>>>>>>>>>>>>>>>>> second
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> argument which returns nothing, and the
>>>>>     example in
>>>>>     >>>>>> the
>>>>>     >>>>>>>> KIP
>>>>>     >>>>>>>>>>>>> shows
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> each
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> stream from the branch using a terminal node
>>>>>     >>>>>>>>> (KafkaStreams#to()
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> in this
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> case).
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but how would
>>>> we
>>>>>     >>>>> handle
>>>>>     >>>>>>> the
>>>>>     >>>>>>>>> case
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> where the
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> user has created a branch but wants to
>>>> continue
>>>>>     >>>>>>>> processing
>>>>>     >>>>>>>>> and
>>>>>     >>>>>>>>>>>>> not
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> necessarily use a terminal node on the
>>>> branched
>>>>>     >>>>>> stream
>>>>>     >>>>>>>>>>>>> immediately?
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic as is if
>>>>>     we had
>>>>>     >>>>>>>> something
>>>>>     >>>>>>>>> like
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> this:
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> KStream<String, String>[] branches =
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> originalStream.branch(predicate1,
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> predicate2);
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> Thanks!
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> Bill
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck
>>>> <
>>>>>     >>>>>>>>> bbejeck@gmail.com <mailto:bbejeck@gmail.com>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> All,
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the discussion for
>>>> KIP-
>>>>>     >>>>> 418.
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion about
>>>> KIP-418.
>>>>>     >>>>> Please
>>>>>     >>>>>>>> take
>>>>>     >>>>>>>>> a
>>>>>     >>>>>>>>>>>>> look
>>>>>     >>>>>>>>>>>>>>>>> at
>>>>>     >>>>>>>>>>>>>>>>>>>>> the
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> KIP if you can, I would appreciate any
>>>>>     feedback :)
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> KIP-418:
>>>>>     >>>>>
>>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488:
>>>>>     >>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5488
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> PR#6164:
>>>>>     >>>>> https://github.com/apache/kafka/pull/6164
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>     >>>>>>>>>
>>>>>     >
>>>>>
>>>>
> 


Mime
View raw message