kafka-dev mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: [DISCUSS] Streams DSL/StateStore Refactoring
Date Fri, 07 Jul 2017 15:23:38 GMT
I messed up the indentation in the GitHub repo; this should be easier to read:

https://codeshare.io/GLWW8K


Guozhang


On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Hi Damian / Kyle,
>
> I think I agree with you guys about the pros / cons of using the builder
> pattern vs. using some "secondary classes". I'm wondering whether we can
> take a middle ground between the two. I spent some time on a slightly
> different approach from Damian's current proposal:
>
> https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/java/org/apache/kafka/streams/RefactoredAPIs.java
>
> The key idea is to let the final "table()" or "stream()" function
> "upgrade" from the secondary classes to the first-class citizen classes,
> while keeping all the specs inside this function. This proposal also
> includes some other refactoring that people have been discussing for the
> builder, to reduce the number of overloaded functions. WDYT?
>
>
> Guozhang
>
>
> On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy <damian.guy@gmail.com> wrote:
>
>> Hi Jan,
>>
>> Thanks very much for the input.
>>
>> On Tue, 4 Jul 2017 at 08:54 Jan Filipiak <Jan.Filipiak@trivago.com>
>> wrote:
>>
>> > Hi Damian,
>> >
>> > I do see your point that something needs to change. But I fully agree
>> > with Guozhang when he says:
>> > ---
>> >
>> > But since this is an incompatible change, and we are going to remove the
>> > compatibility annotations soon, it means we only have one chance and we
>> > really have to make it right.
>> > ----
>> >
>> >
>> I think we all agree on this one! Hence the discussion.
>>
>>
>> > I fear none of the suggestions go far enough to become something that
>> > will carry on for very much longer.
>> > I am currently working on KAFKA-3705 and trying to find the easiest way
>> > for the user to give me all the required functionality. The easiest
>> > interface I could come up with so far can be seen here:
>> >
>> >
>> > https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622
>> >
>> >
>> > And it's already horribly complicated. I am currently unable to find the
>> > right abstraction level to have everything fall into place naturally.
>> > To be honest, I already think introducing
>> >
>> >
>> To be fair that is not a particularly easy problem to solve!
>>
>>
>> >
>> > https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L493
>> >
>> > was unideal and makes everything a mess.
>>
>>
>> I'm not sure I agree that it makes everything a mess, but it could have
>> been done differently.
>>
>> > The JoinType:Whatever is also not really flexible. 2 things come to my
>> > mind:
>> >
>> > 1. I don't think we should rule out config based decisions say configs
>> > like
>> >         streams.$applicationID.joins.$joinname.conf = value
>> >
>>
>> Is this just for config? Or are you suggesting that we could somehow
>> "code"
>> the join in a config file?
>>
>>
>> > This can allow for tremendous changes without a single API change and IMO
>> > it was not considered enough yet.
>> >
>> > 2. Push logic from the DSL to the Callback classes. A ValueJoiner for
>> > example can be used to implement different join types as the user
>> > wishes.
>> >
>>
>> Do you have an example of how this might look?
>>
>>
>> > As Guozhang said: not breaking users is very important.
>>
>>
>> Of course. We want to make it as easy as possible for people to use
>> streams.
>>
>>
>> > especially with these changes + all the plans I sadly only have in my
>> > head, but hopefully the first link can give a glimpse.
>> >
>> > Thanks for preparing the examples; they made it way clearer to me what
>> > exactly we are talking about. I would argue to go a bit slower and more
>> > carefully on this one. At some point we need to get it right. Peeking
>> > over to the Hadoop guys with their huge userbase: config files really
>> > work well for them.
>> >
>> > Best Jan
>> >
>> >
>> >
>> >
>> >
>> > On 30.06.2017 09:31, Damian Guy wrote:
>> > > Thanks Matthias
>> > >
>> > > On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax <matthias@confluent.io>
>> > wrote:
>> > >
>> > >> I am just catching up on this thread, so sorry for the long email in
>> > >> advance... Also, it's to some extent a dump of thoughts and not
>> > >> always a clear proposal. Still need to think about this in more
>> > >> detail. But maybe it helps others to get new ideas :)
>> > >>
>> > >>
>> > >>>> However, I don't understand your argument about putting aggregate()
>> > >>>> after the withXX() -- all the calls to withXX() set optional
>> > >>>> parameters for aggregate() and not for groupBy() -- but a
>> > >>>> groupBy().withXX() indicates that the withXX() belongs to the
>> > >>>> groupBy(). IMHO, this might be quite confusing for developers.
>> > >>>>
>> > >>>>
>> > >>> I see what you are saying, but the grouped stream is effectively a
>> > >>> no-op until you call one of the aggregate/count/reduce etc functions.
>> > >>> So the optional params are ones that are applicable to any of the
>> > >>> operations you can perform on this grouped stream. Then the final
>> > >>> count()/reduce()/aggregate() call has any of the params that are
>> > >>> required/specific to that function.
>> > >>>
>> > >> I understand your argument, but I don't share the conclusion. If we
>> > >> need a "final/terminal" call, the better way might be
>> > >>
>> > >> .groupBy().count().withXX().build()
>> > >>
>> > >> (with a better name for build() though)
>> > >>
>> > >>
>> > > The point is that all the other calls, i.e., withBlah, windowed, etc.,
>> > > apply to all the aggregate functions. The terminal call is the actual
>> > > type of aggregation you want to do. I personally find this more natural
>> > > than groupBy().count().withBlah().build()
>> > >
>> > >
>> > >>> groupedStream.count(/** non windowed count**/)
>> > >>> groupedStream.windowed(TimeWindows.of(10L)).count(...)
>> > >>> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
>> > >>
>> > >> I like this. However, I don't see a reason to have windowed() and
>> > >> sessionWindowed(). We should have one top-level `Windows` interface
>> > >> that both `TimeWindows` and `SessionWindows` implement and just have a
>> > >> single windowed() method that accepts all `Windows`. (I did not like
>> > >> the separation of `SessionWindows` in the first place, and this seems
>> > >> to be an opportunity to clean this up. It was hard to change when we
>> > >> introduced session windows)
>> > >>
>> > > Yes - true we should look into that.
>> > >
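A minimal sketch of the single-entry-point idea discussed just above, in Java. Every name here (WindowSpec, TimeWindowSpec, SessionWindowSpec, GroupedSketch) is hypothetical and only stands in for the real Kafka Streams types; it assumes nothing about how the actual classes are implemented and merely shows how one windowBy() method could accept both window kinds once they share an interface.

```java
// Hypothetical stand-ins for illustration; these are NOT Kafka Streams classes.
interface WindowSpec {
    // Human-readable description of the window definition.
    String describe();
}

final class TimeWindowSpec implements WindowSpec {
    private final long sizeMs;

    TimeWindowSpec(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    @Override
    public String describe() {
        return "time(" + sizeMs + ")";
    }
}

final class SessionWindowSpec implements WindowSpec {
    private final long gapMs;

    SessionWindowSpec(long gapMs) {
        this.gapMs = gapMs;
    }

    @Override
    public String describe() {
        return "session(" + gapMs + ")";
    }
}

final class GroupedSketch {
    // One method takes any WindowSpec, so no separate sessionWindowed() overload is needed.
    String windowBy(WindowSpec windows) {
        return "windowBy " + windows.describe();
    }
}

class WindowByDemo {
    public static void main(String[] args) {
        GroupedSketch grouped = new GroupedSketch();
        System.out.println(grouped.windowBy(new TimeWindowSpec(10L)));
        System.out.println(grouped.windowBy(new SessionWindowSpec(10L)));
    }
}
```

The trade-off is that a shared interface hides which result type each window kind produces, which may be part of why the separation was hard to change.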
>> > >
>> > >> Btw: we do use the imperative groupBy() and groupByKey(), and thus we
>> > >> might also want to use windowBy() (instead of windowed()). Not sure
>> > >> how important this is, but it seems to be inconsistent otherwise.
>> > >>
>> > >>
>> > > Makes sense
>> > >
>> > >
>> > >> About joins:  I don't like .withJoinType(JoinType.LEFT) at all. I
>> > >> think defining an inner/left/outer join is not an optional argument
>> > >> but a first-class concept and should have a proper representation in
>> > >> the API (like the current methods join(), leftJoin(), outerJoin()).
>> > >>
>> > >>
>> > > Yep, I did originally have it as a required param and maybe that is
>> > > what we go with. It could have a default, but maybe that is confusing.
>> > >
>> > >
>> > >
>> > >> About the two join API proposals, the second one has too much
>> > >> boilerplate code for my taste. Also, the actual join() operator has
>> > >> only one argument, which is weird to me: in my thinking, the main
>> > >> operator call should have one parameter per mandatory argument, but
>> > >> your proposal puts the mandatory arguments into the
>> > >> Joins.streamStreamJoin() call. This is far from intuitive IMHO.
>> > >>
>> > >>
>> > > This is the builder pattern; you only need one param as the builder
>> > > has captured all of the required and optional arguments.
>> > >
>> > >
>> > >> The first join proposal also seems to align better with the pattern
>> > >> suggested for aggregations and having the same pattern for all
>> operators
>> > >> is important (as you stated already).
>> > >>
>> > >>
>> > > This is why i offered two alternatives as i started out with. 1 is the
>> > > builder pattern, the other is the more fluent pattern.
>> > >
>> > >
>> > >>
>> > >> Coming back to the config vs optional parameter. What about having a
>> > >> method withConfig[s](...) that allow to put in the configuration?
>> > >>
>> > >>
>> > > Sure, it is currently called withLogConfig() as that is the only thing
>> > that
>> > > is really config.
>> > >
>> > >
>> > >> This also raises the question of whether until() is a window property.
>> > >> Actually, until() seems to be a configuration parameter and thus
>> > >> should not have its own method.
>> > >>
>> > >>
>> > > Hmmm, I don't agree. Until is a property of the window. It is going to
>> > > be potentially different for every window operation you do in a
>> > > streams app.
>> > >
>> > >
>> > >>
>> > >> Browsing through your example DSL branch, I also saw this one:
>> > >>
>> > >>> final KTable<Windowed<String>, Long> windowed =
>> > >>>                   groupedStream.counting()
>> > >>>                   .windowed(TimeWindows.of(10L).until(10))
>> > >>>                   .table();
>> > >> This is an interesting idea, and it reminds me of some feedback:
>> > >> "I wanted to count a stream, but there was no count() method -- I
>> > >> first needed to figure out that I need to group the stream first to
>> > >> be able to count it. It does make sense in hindsight but was not
>> > >> obvious in the beginning". Thus, carrying out this thought, we could
>> > >> also do the following:
>> > >>
>> > >> stream.count().groupedBy().windowedBy().table();
>> > >>
>> > >> -> Note, I use "grouped" and "windowed" instead of imperative here,
>> as
>> > >> it comes after the count()
>> > >>
>> > >> This would be more consistent than your proposal (that has grouping
>> > >> before but windowing after count()). It might even allow us to enrich
>> > >> the API with some syntactic sugar like `stream.count().table()` to
>> > >> get the overall count of all records (this would obviously not scale,
>> > >> but we could support it -- if not now, maybe later).
>> > >>
>> > >>
>> > > I guess i'd prefer
>> > > stream.groupBy().windowBy().count()
>> > > stream.groupBy().windowBy().reduce()
>> > > stream.groupBy().count()
>> > >
>> > > As I said above, everything that happens before the final aggregate
>> > > call can be applied to any of them. So it makes sense to me to do those
>> > > things ahead of the final aggregate call.
>> > >
>> > >
>> > >> Last, about the builder pattern. I am convinced that we need some
>> > >> "terminal" operator/method that tells us when to add the processor to
>> > >> the topology. But I don't see the need for a plain builder pattern,
>> > >> which feels alien to me (see my argument about the second join
>> > >> proposal). Using .stream() / .table() as used in many examples might
>> > >> work. But maybe a more generic name that we can use in all places,
>> > >> like build() or apply(), might also be an option.
>> > >>
>> > >>
>> > > Sure, a generic name might be ok.
>> > >
>> > >
>> > >
>> > >
>> > >> -Matthias
>> > >>
>> > >>
>> > >>
>> > >> On 6/29/17 7:37 AM, Damian Guy wrote:
>> > >>> Thanks Kyle.
>> > >>>
>> > >>> On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman <winkelman.kyle@gmail.com>
>> > >>> wrote:
>> > >>>
>> > >>>> Hi Damian,
>> > >>>>
>> > >>>>>>>> When trying to program in the fluent API that has been discussed
>> > >>>>>>>> most it feels difficult to know when you will actually get an
>> > >>>>>>>> object you can reuse.
>> > >>>>>>>> What if I make one KGroupedStream that I want to reuse, is it
>> > >>>>>>>> legal to reuse it or does this approach expect you to call
>> > >>>>>>>> grouped each time?
>> > >>>>>> I'd anticipate that once you have a KGroupedStream you can re-use
>> > >>>>>> it as you can today.
>> > >>>> You said it yourself in another post that the grouped stream is
>> > >>>> effectively a no-op until a count, reduce, or aggregate. The way I
>> > >>>> see it you wouldn’t be able to reuse anything except KStreams and
>> > >>>> KTables, because most of this fluent api would continue returning
>> > >>>> this (this being the builder object currently being manipulated).
>> > >>>> So, if you ever store a reference to anything but KStreams and
>> > >>>> KTables and you use it in two different ways then it's possible you
>> > >>>> make conflicting withXXX() calls on the same builder.
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>> Not necessarily true. It could return a new instance of the builder,
>> > >>> i.e., the builders being immutable. So if you held a reference to the
>> > >>> builder it would always be the same as it was when it was created.
>> > >>>
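A minimal sketch of the immutable-builder behaviour described in the reply above, in Java. GroupedSpec and its methods are hypothetical names invented for this illustration, with plain strings standing in for serdes; it is not the API from any of the proposal branches. The point it shows is that each withXXX() call returns a copy, so a stored reference never changes under a later call.

```java
// Hypothetical illustration only: GroupedSpec is not a Kafka Streams class.
final class GroupedSpec {
    private final String keySerde;
    private final String valueSerde;

    private GroupedSpec(String keySerde, String valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // Starting point with placeholder defaults.
    static GroupedSpec defaults() {
        return new GroupedSpec("default", "default");
    }

    // Returns a modified copy; the receiver is left untouched.
    GroupedSpec withKeySerde(String serde) {
        return new GroupedSpec(serde, this.valueSerde);
    }

    // Returns a modified copy; the receiver is left untouched.
    GroupedSpec withValueSerde(String serde) {
        return new GroupedSpec(this.keySerde, serde);
    }

    String keySerde() {
        return keySerde;
    }

    String valueSerde() {
        return valueSerde;
    }
}

class ImmutableBuilderDemo {
    public static void main(String[] args) {
        GroupedSpec withDefaults = GroupedSpec.defaults();
        GroupedSpec withDeclared = withDefaults.withKeySerde("long").withValueSerde("string");

        // The first reference still reports the defaults it was created with.
        System.out.println(withDefaults.keySerde() + " / " + withDefaults.valueSerde());
        System.out.println(withDeclared.keySerde() + " / " + withDeclared.valueSerde());
    }
}
```

Copying on every call costs an allocation per withXXX(), and it leaves open the question raised in the thread of how to recognise two equivalent specs so that processors are not created twice.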
>> > >>>
>> > >>>> GroupedStream<K,V> groupedStreamWithDefaultSerdes = kStream.grouped();
>> > >>>> GroupedStream<K,V> groupedStreamWithDeclaredSerdes =
>> > >>>> groupedStreamWithDefaultSerdes.withKeySerde(…).withValueSerde(…);
>> > >>>>
>> > >>>> I’ll admit that this shouldn’t happen but some user is going to do
>> > >>>> it eventually…
>> > >>>> Depending on implementation, uses of groupedStreamWithDefaultSerdes
>> > >>>> would most likely be equivalent to the version withDeclaredSerdes.
>> > >>>> One workaround would be to always make copies of the config objects
>> > >>>> you are building, but this approach has its own problem because now
>> > >>>> we have to identify which configs are equivalent so we don’t create
>> > >>>> repeated processors.
>> > >>>>
>> > >>>> The point of this long-winded example is that we always have to be
>> > >>>> thinking about all of the possible ways it could be misused by a
>> > >>>> user (causing them to see hard-to-diagnose problems).
>> > >>>>
>> > >>> Exactly! That is the point of the discussion really.
>> > >>>
>> > >>>
>> > >>>> In my attempt at a couple methods with builders I feel that I could
>> > >>>> confidently say the user couldn’t really mess it up.
>> > >>>>> // Count
>> > >>>>> KTable<String, Long> count =
>> > >>>>> kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));
>> > >>>> The kGroupedStream is reusable and if they attempted to reuse the
>> > >>>> Count for some reason it would throw an error message saying that a
>> > >>>> store named “my-store” already exists.
>> > >>>>
>> > >>>>
>> > >>> Yes i agree and i think using builders is my preferred pattern.
>> > >>>
>> > >>> Cheers,
>> > >>> Damian
>> > >>>
>> > >>>
>> > >>>> Thanks,
>> > >>>> Kyle
>> > >>>>
>> > >>>> From: Damian Guy
>> > >>>> Sent: Thursday, June 29, 2017 3:59 AM
>> > >>>> To: dev@kafka.apache.org
>> > >>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>> > >>>>
>> > >>>> Hi Kyle,
>> > >>>>
>> > >>>> Thanks for your input. Really appreciated.
>> > >>>>
>> > >>>> On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <winkelman.kyle@gmail.com>
>> > >>>> wrote:
>> > >>>>
>> > >>>>> I like more of a builder pattern even though others have voiced
>> > >>>>> against it. The reason I like it is because it makes it clear to the
>> > >>>>> user that a call to KGroupedStream#count will return a KTable not
>> > >>>>> some intermediate class that I need to understand.
>> > >>>>>
>> > >>>> Yes, that makes sense.
>> > >>>>
>> > >>>>
>> > >>>>> When trying to program in the fluent API that has been discussed
>> > >>>>> most it feels difficult to know when you will actually get an object
>> > >>>>> you can reuse.
>> > >>>>> What if I make one KGroupedStream that I want to reuse, is it legal
>> > >>>>> to reuse it or does this approach expect you to call grouped each
>> > >>>>> time?
>> > >>>>
>> > >>>> I'd anticipate that once you have a KGroupedStream you can re-use it
>> > >>>> as you can today.
>> > >>>>
>> > >>>>
>> > >>>>> This question doesn’t pop into my head at all in the builder pattern;
>> > >>>>> I assume I can reuse everything.
>> > >>>>> Finally, I like .groupByKey and .groupBy(KeyValueMapper); not a big
>> > >>>>> fan of the grouped.
>> > >>>>>
>> > >>>> Yes, grouped() was more for demonstration and because groupBy() and
>> > >>>> groupByKey() were taken! So I'd imagine the api would actually want
>> > >>>> to be groupByKey(/** no required args***/).withOptionalArg() and
>> > >>>> groupBy(KeyValueMapper m).withOptionalArg(...); of course this all
>> > >>>> depends on maintaining backward compatibility.
>> > >>>>
>> > >>>>
>> > >>>>> Unfortunately, the below approach would require at least 2 (probably
>> > >>>>> 3) overloads (one for returning a KTable and one for returning a
>> > >>>>> KTable with Windowed Key, probably would want to split windowed and
>> > >>>>> sessionwindowed for ease of implementation) of each count, reduce,
>> > >>>>> and aggregate.
>> > >>>>> Obviously not exhaustive but enough for you to get the picture.
>> > >>>>> Count, Reduce, and Aggregate supply 3 static methods to initialize
>> > >>>>> the builder:
>> > >>>>> // Count
>> > >>>>> KTable<String, Long> count =
>> > >>>>> groupedStream.count(Count.count().withQueryableStoreName("my-store"));
>> > >>>>>
>> > >>>>> // Windowed Count
>> > >>>>> KTable<Windowed<String>, Long> windowedCount =
>> > >>>>> groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10)).withQueryableStoreName("my-windowed-store"));
>> > >>>>>
>> > >>>>> // Session Count
>> > >>>>> KTable<Windowed<String>, Long> sessionCount =
>> > >>>>> groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L)).withQueryableStoreName("my-session-windowed-store"));
>> > >>>>>
>> > >>>> Above and below, i think i'd prefer it to be:
>> > >>>> groupedStream.count(/** non windowed count**/)
>> > >>>> groupedStream.windowed(TimeWindows.of(10L)).count(...)
>> > >>>> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>>> // Reduce
>> > >>>>> Reducer<Long> reducer;
>> > >>>>> KTable<String, Long> reduce = groupedStream.reduce(reducer,
>> > >>>>> Reduce.reduce().withQueryableStoreName("my-store"));
>> > >>>>>
>> > >>>>> // Aggregate Windowed with Custom Store
>> > >>>>> Initializer<String> initializer;
>> > >>>>> Aggregator<String, Long, String> aggregator;
>> > >>>>> KTable<Windowed<String>, String> aggregate =
>> > >>>>> groupedStream.aggregate(initializer, aggregator,
>> > >>>>> Aggregate.windowed(TimeWindows.of(10L).until(10)).withStateStoreSupplier(stateStoreSupplier));
>> > >>>>>
>> > >>>>> // Cogroup SessionWindowed
>> > >>>>> KTable<String, String> cogrouped = groupedStream1.cogroup(aggregator1)
>> > >>>>>          .cogroup(groupedStream2, aggregator2)
>> > >>>>>          .aggregate(initializer, aggregator,
>> > >>>>> Aggregate.sessionWindowed(SessionWindows.with(10L),
>> > >>>>> sessionMerger).withQueryableStoreName("my-store"));
>> > >>>>>
>> > >>>>>
>> > >>>>>
>> > >>>>> public class Count {
>> > >>>>>
>> > >>>>>      public static class Windowed extends Count {
>> > >>>>>          private Windows windows;
>> > >>>>>      }
>> > >>>>>      public static class SessionWindowed extends Count {
>> > >>>>>          private SessionWindows sessionWindows;
>> > >>>>>      }
>> > >>>>>
>> > >>>>>      public static Count count();
>> > >>>>>      public static Windowed windowed(Windows windows);
>> > >>>>>      public static SessionWindowed sessionWindowed(SessionWindows sessionWindows);
>> > >>>>>
>> > >>>>>      // All withXXX(...) methods.
>> > >>>>> }
>> > >>>>>
>> > >>>>> public class KGroupedStream {
>> > >>>>>      public KTable<K, Long> count(Count count);
>> > >>>>>      public KTable<Windowed<K>, Long> count(Count.Windowed count);
>> > >>>>>      public KTable<Windowed<K>, Long> count(Count.SessionWindowed count);
>> > >>>>> …
>> > >>>>> }
>> > >>>>>
>> > >>>>>
>> > >>>>> Thanks,
>> > >>>>> Kyle
>> > >>>>>
>> > >>>>> From: Guozhang Wang
>> > >>>>> Sent: Wednesday, June 28, 2017 7:45 PM
>> > >>>>> To: dev@kafka.apache.org
>> > >>>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>> > >>>>>
>> > >>>>> I played with the current proposal a bit using
>> > >>>>> https://github.com/dguy/kafka/tree/dsl-experiment,
>> > >>>>> and here are my observations:
>> > >>>>>
>> > >>>>> 1. Personally I prefer
>> > >>>>>
>> > >>>>>      "stream.group(mapper) / stream.groupByKey()"
>> > >>>>>
>> > >>>>> than
>> > >>>>>
>> > >>>>>      "stream.group().withKeyMapper(mapper) / stream.group()"
>> > >>>>>
>> > >>>>> Since 1) withKeyMapper is not enforced programmatically though it is
>> > >>>>> not "really" optional like others, 2) syntax-wise it reads more
>> > >>>>> naturally.
>> > >>>>>
>> > >>>>> I think it is okay to add the APIs in
>> > >>>>> (https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/main/java/org/apache/kafka/streams/kstream/GroupedStream.java)
>> > >>>>> in KGroupedStream.
>> > >>>>>
>> > >>>>>
>> > >>>>> 2. For the "withStateStoreSupplier" API, is the user supposed to pass
>> > >>>>> in the innermost state store supplier (e.g. the one whose get()
>> > >>>>> returns RocksDBStore), or is it supposed to return the outermost
>> > >>>>> supplier with logging / metrics / etc? I think it would be more
>> > >>>>> useful to only require users to pass in the inner state store
>> > >>>>> supplier while specifying caching / logging through other APIs.
>> > >>>>>
>> > >>>>> In addition, the "GroupedWithCustomStore" is a bit suspicious to me:
>> > >>>>> we are allowing users to call other APIs like "withQueryableName"
>> > >>>>> multiple times, but to call "withStateStoreSupplier" only once, at
>> > >>>>> the end. Why is that?
>> > >>>>>
>> > >>>>> 3. The current DSL seems to be only for aggregations, what about
>> > joins?
>> > >>>>>
>> > >>>>>
>> > >>>>> 4. I think it is okay to keep the "withLogConfig": for the
>> > >>>>> StateStoreSupplier it will still be user code specifying the
>> > >>>>> topology so I do not see that there is a big difference.
>> > >>>>>
>> > >>>>>
>> > >>>>> 5. "WindowedGroupedStream" 's withStateStoreSupplier should take
>> the
>> > >>>>> windowed state store supplier to enforce typing?
>> > >>>>>
>> > >>>>>
>> > >>>>> Below are minor ones:
>> > >>>>>
>> > >>>>> 6. "withQueryableName": maybe better "withQueryableStateName"?
>> > >>>>>
>> > >>>>> 7. "withLogConfig": maybe better "withLoggingTopicConfig()"?
>> > >>>>>
>> > >>>>>
>> > >>>>>
>> > >>>>> Guozhang
>> > >>>>>
>> > >>>>>
>> > >>>>>
>> > >>>>> On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax <matthias@confluent.io>
>> > >>>>> wrote:
>> > >>>>>
>> > >>>>>> I see your point about "when to add the processor to the topology".
>> > >>>>>> That is indeed an issue. Not sure if we could allow "updates" to the
>> > >>>>>> topology...
>> > >>>>>> I don't see any problem with having all the withXX() in KTable
>> > >>>>>> interface -- but this might be subjective.
>> > >>>>>>
>> > >>>>>>
>> > >>>>>> However, I don't understand your argument about putting aggregate()
>> > >>>>>> after the withXX() -- all the calls to withXX() set optional
>> > >>>>>> parameters for aggregate() and not for groupBy() -- but a
>> > >>>>>> groupBy().withXX() indicates that the withXX() belongs to the
>> > >>>>>> groupBy(). IMHO, this might be quite confusing for developers.
>> > >>>>>>
>> > >>>>>>
>> > >>>>>> -Matthias
>> > >>>>>>
>> > >>>>>> On 6/28/17 2:55 AM, Damian Guy wrote:
>> > >>>>>>>> I also think that mixing optional parameters with configs is a
>> > >>>>>>>> bad idea. I have no proposal for this atm but just wanted to
>> > >>>>>>>> mention it. Hope to find some time to come up with something.
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>> Yes, I don't like the mix of config either. But the only real
>> > >>>>>>> config here is the logging config - which we don't really need as
>> > >>>>>>> it can already be done via a custom StateStoreSupplier.
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>>> What I don't like in the current proposal is the
>> > >>>>>>>> .grouped().withKeyMapper() -- the current solution with
>> > >>>>>>>> .groupBy(...) and .groupByKey() seems better. For clarity, we could
>> > >>>>>>>> rename to .groupByNewKey(...) and .groupByCurrentKey() (even if we
>> > >>>>>>>> should find some better names).
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>> it could be groupByKey(), groupBy() or something different bt
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>>> The proposed pattern "chains" grouping and aggregation too closely
>> > >>>>>>>> together. I would rather separate both more than less, i.e., go in
>> > >>>>>>>> the opposite direction.
>> > >>>>>>>>
>> > >>>>>>>> I am also wondering if we could do something more "fluent". The
>> > >>>>>>>> initial proposal was like:
>> > >>>>>>>>
>> > >>>>>>>>>> groupedStream.count()
>> > >>>>>>>>>>     .withStoreName("name")
>> > >>>>>>>>>>     .withCachingEnabled(false)
>> > >>>>>>>>>>     .withLoggingEnabled(config)
>> > >>>>>>>>>>     .table()
>> > >>>>>>>> The .table() statement in the end was kinda alien.
>> > >>>>>>>>
>> > >>>>>>> I agree, but then all of the withXXX methods need to be on KTable,
>> > >>>>>>> which is worse in my opinion. You also need something that is going
>> > >>>>>>> to "build" the internal processors and add them to the topology.
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>>> The current proposal puts the count() at the end -- i.e., the
>> > >>>>>>>> optional parameters for count() have to be specified on the
>> > >>>>>>>> .grouped() call -- this does not seem to be the best way either.
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>> I actually prefer this method as you are building a grouped stream
>> > >>>>>>> that you will aggregate. So
>> > >>>>>>> table.grouped(...).withOptionalStuff().aggregate(..) etc
>> > >>>>>>> seems natural to me.
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>>> I did not think this through in detail, but can't we just do
>> the
>> > >>>>> initial
>> > >>>>>>>> proposal with the .table() ?
>> > >>>>>>>>
>> > >>>>>>>> groupedStream.count().withStoreName("name").mapValues(...)
>> > >>>>>>>>
>> > >>>>>>>> Each .withXXX(...) returns the current KTable and all the
>> > >>>>>>>> .withXXX() are just added to the KTable interface. Or do I miss
>> > >>>>>>>> anything why this won't work or any obvious disadvantage?
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>> See above.
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>>> -Matthias
>> > >>>>>>>>
>> > >>>>>>>> On 6/22/17 4:06 AM, Damian Guy wrote:
>> > >>>>>>>>> Thanks everyone. My latest attempt is below. It builds on the
>> > >>>>>>>>> fluent approach, but I think it is slightly nicer.
>> > >>>>>>>>> I agree with some of what Eno said about mixing configy stuff in
>> > >>>>>>>>> the DSL, but I think that enabling caching and enabling logging are
>> > >>>>>>>>> things that aren't actually config. I'd probably not add
>> > >>>>>>>>> withLogConfig(...) (even though it is below) as this is actually
>> > >>>>>>>>> config and we already have a way of doing that, via the
>> > >>>>>>>>> StateStoreSupplier. Arguably we could use the StateStoreSupplier for
>> > >>>>>>>>> disabling caching etc, but as it stands that is a bit of a tedious
>> > >>>>>>>>> process for someone that just wants to use the default storage
>> > >>>>>>>>> engine, but not have caching enabled.
>> > >>>>>>>>>
>> > >>>>>>>>> There is also an orthogonal concern that Guozhang alluded to....
>> > >>>>>>>>> If you want to plug in a custom storage engine and you want it to be
>> > >>>>>>>>> logged etc, you would currently need to implement that yourself.
>> > >>>>>>>>> Ideally we can provide a way where we will wrap the custom store
>> > >>>>>>>>> with logging, metrics, etc. I need to think about where this fits;
>> > >>>>>>>>> it is probably more appropriate on the Stores API.
>> > >>>>>>>>>
>> > >>>>>>>>> final KeyValueMapper<String, String, Long> keyMapper = null;
>> > >>>>>>>>> // count with mapped key
>> > >>>>>>>>> final KTable<Long, Long> count = stream.grouped()
>> > >>>>>>>>>          .withKeyMapper(keyMapper)
>> > >>>>>>>>>          .withKeySerde(Serdes.Long())
>> > >>>>>>>>>          .withValueSerde(Serdes.String())
>> > >>>>>>>>>          .withQueryableName("my-store")
>> > >>>>>>>>>          .count();
>> > >>>>>>>>>
>> > >>>>>>>>> // windowed count
>> > >>>>>>>>> final KTable<Windowed<String>, Long> windowedCount = stream.grouped()
>> > >>>>>>>>>          .withQueryableName("my-window-store")
>> > >>>>>>>>>          .windowed(TimeWindows.of(10L).until(10))
>> > >>>>>>>>>          .count();
>> > >>>>>>>>>
>> > >>>>>>>>> // windowed reduce
>> > >>>>>>>>> final Reducer<String> windowedReducer = null;
>> > >>>>>>>>> final KTable<Windowed<String>, String> windowedReduce = stream.grouped()
>> > >>>>>>>>>          .withQueryableName("my-window-store")
>> > >>>>>>>>>          .windowed(TimeWindows.of(10L).until(10))
>> > >>>>>>>>>          .reduce(windowedReducer);
>> > >>>>>>>>>
>> > >>>>>>>>> final Aggregator<String, String, Long> aggregator = null;
>> > >>>>>>>>> final Initializer<Long> init = null;
>> > >>>>>>>>>
>> > >>>>>>>>> // aggregate
>> > >>>>>>>>> final KTable<String, Long> aggregate = stream.grouped()
>> > >>>>>>>>>          .withQueryableName("my-aggregate-store")
>> > >>>>>>>>>          .aggregate(aggregator, init, Serdes.Long());
>> > >>>>>>>>>
>> > >>>>>>>>> final StateStoreSupplier<KeyValueStore<String, Long>> stateStoreSupplier = null;
>> > >>>>>>>>> // aggregate with custom store
>> > >>>>>>>>> final KTable<String, Long> aggWithCustomStore = stream.grouped()
>> > >>>>>>>>>          .withStateStoreSupplier(stateStoreSupplier)
>> > >>>>>>>>>          .aggregate(aggregator, init);
>> > >>>>>>>>>
>> > >>>>>>>>> // disable caching
>> > >>>>>>>>> stream.grouped()
>> > >>>>>>>>>          .withQueryableName("name")
>> > >>>>>>>>>          .withCachingEnabled(false)
>> > >>>>>>>>>          .count();
>> > >>>>>>>>>
>> > >>>>>>>>> // disable logging
>> > >>>>>>>>> stream.grouped()
>> > >>>>>>>>>          .withQueryableName("q")
>> > >>>>>>>>>          .withLoggingEnabled(false)
>> > >>>>>>>>>          .count();
>> > >>>>>>>>>
>> > >>>>>>>>> // override log config
>> > >>>>>>>>> final Reducer<String> reducer = null;
>> > >>>>>>>>> stream.grouped()
>> > >>>>>>>>>          .withLogConfig(Collections.singletonMap("segment.size", "10"))
>> > >>>>>>>>>          .reduce(reducer);
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>> If anyone wants to play around with this you can find the code here:
>> > >>>>>>>>> https://github.com/dguy/kafka/tree/dsl-experiment
>> > >>>>>>>>>
>> > >>>>>>>>> Note: It won't actually work as most of the methods just return null.
>> > >>>>>>>>>
>> > >>>>>>>>> Thanks,
>> > >>>>>>>>> Damian
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ismael@juma.me.uk> wrote:
>> > >>>>>>>>>> Thanks Damian. I think both options have pros and cons. And both
>> > >>>>>>>>>> are better than overload abuse.
>> > >>>>>>>>>>
>> > >>>>>>>>>> The fluent API approach reads better, no mention of builder or
>> > >>>>>>>>>> build anywhere. The main downside is that the method signatures
>> > >>>>>>>>>> are a little less clear. By reading the method signature, one
>> > >>>>>>>>>> doesn't necessarily know what it returns. Also, one needs to
>> > >>>>>>>>>> figure out the special method (`table()` in this case) that
>> > >>>>>>>>>> gives you what you actually care about (`KTable` in this case).
>> > >>>>>>>>>> Not major issues, but worth mentioning while doing the
>> > >>>>>>>>>> comparison.
>> > >>>>>>>>>>
>> > >>>>>>>>>> The builder approach avoids the issues mentioned above, but it
>> > >>>>>>>>>> doesn't read as well.
>> > >>>>>>>>>>
>> > >>>>>>>>>> Ismael
>> > >>>>>>>>>>
>> > >>>>>>>>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <damian.guy@gmail.com> wrote:
>> > >>>>>>>>>>> Hi,
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> I'd like to get a discussion going around some of the API
>> > >>>>>>>>>>> choices we've made in the DSL. In particular those that relate
>> > >>>>>>>>>>> to stateful operations (though this could expand).
>> > >>>>>>>>>>> As it stands we lean heavily on overloaded methods in the API,
>> > >>>>>>>>>>> i.e., there are 9 overloads for KGroupedStream.count(..)! It is
>> > >>>>>>>>>>> becoming noisy and I feel it is only going to get worse as we
>> > >>>>>>>>>>> add more optional params. In particular we've had some requests
>> > >>>>>>>>>>> to be able to turn caching off, or change log configs, on a per
>> > >>>>>>>>>>> operator basis (note this can be done now if you pass in a
>> > >>>>>>>>>>> StateStoreSupplier, but this can be a bit cumbersome).
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> So this is a bit of an open question. How can we change the DSL
>> > >>>>>>>>>>> overloads so that it flows, is simple to use and understand,
>> > >>>>>>>>>>> and is easily extended in the future?
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> One option would be to use a fluent API approach for providing
>> > >>>>>>>>>>> the optional params, so something like this:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> groupedStream.count()
>> > >>>>>>>>>>>     .withStoreName("name")
>> > >>>>>>>>>>>     .withCachingEnabled(false)
>> > >>>>>>>>>>>     .withLoggingEnabled(config)
>> > >>>>>>>>>>>     .table()
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Another option would be to provide a Builder to the count
>> > >>>>>>>>>>> method, so it would look something like this:
>> > >>>>>>>>>>> groupedStream.count(new CountBuilder("storeName").withCachingEnabled(false).build())
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Another option is to say: Hey we don't need this, what are you
>> > >>>>>>>>>>> on about!
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> The above has focussed on state store related overloads, but
>> > >>>>>>>>>>> the same ideas could be applied to joins etc, where we
>> > >>>>>>>>>>> presently have many join methods and many overloads.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Anyway, I look forward to hearing your opinions.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Thanks,
>> > >>>>>>>>>>> Damian
>> > >>>>>>>>>>>
>> > >>>>>>>>
>> > >>>>>>
>> > >>>>>
>> > >>>>> --
>> > >>>>> -- Guozhang
>> > >>>>>
>> > >>>>>
>> > >>>>
>> > >>
>> >
>> >
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
