kafka-dev mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup
Date Thu, 08 Jun 2017 21:44:22 GMT
Note that although the internal `AbstractStoreSupplier` does maintain the
key-value serdes, we do not enforce the interface of `StateStoreSupplier`
to always retain that information, and hence we cannot assume that
StateStoreSuppliers always retain key / value serdes.
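The sketch below shows why code written against the interface cannot count on serdes. `StoreSupplier`, `SerdeCarryingSupplier` and `PlainSupplier` are hypothetical stand-ins, not the real `StateStoreSupplier` or `AbstractStoreSupplier` classes, and serdes are modelled as plain strings. Code that sees only the interface can at best probe for serdes, and it has to handle the case where they are missing.

```java
import java.util.Optional;

// Hypothetical stand-ins, NOT the Kafka Streams StateStoreSupplier /
// AbstractStoreSupplier types; serdes are modelled as plain strings.
class StoreSupplierSketch {

    // The public contract only promises a name and a way to build the store.
    interface StoreSupplier {
        String name();
        Object get();
    }

    // One implementation happens to carry key/value serdes, as an internal
    // abstract base class might.
    static class SerdeCarryingSupplier implements StoreSupplier {
        private final String name;
        private final String keySerde;
        private final String valueSerde;

        SerdeCarryingSupplier(String name, String keySerde, String valueSerde) {
            this.name = name;
            this.keySerde = keySerde;
            this.valueSerde = valueSerde;
        }

        public String name() { return name; }
        public Object get() { return new Object(); }
        String keySerde() { return keySerde; }
        String valueSerde() { return valueSerde; }
    }

    // A user-written supplier: satisfies the interface but has no serdes at all.
    static class PlainSupplier implements StoreSupplier {
        public String name() { return "custom-store"; }
        public Object get() { return new Object(); }
    }

    // Code that only sees the interface can probe for serdes, but it has to
    // cope with them being absent.
    static Optional<String> valueSerdeOf(StoreSupplier supplier) {
        if (supplier instanceof SerdeCarryingSupplier) {
            return Optional.of(((SerdeCarryingSupplier) supplier).valueSerde());
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(valueSerdeOf(new SerdeCarryingSupplier("agg-store", "String", "Long"))); // prints Optional[Long]
        System.out.println(valueSerdeOf(new PlainSupplier())); // prints Optional.empty
    }
}
```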

On Thu, Jun 8, 2017 at 11:51 AM, Xavier Léauté <xavier@confluent.io> wrote:

> Another reason for the serde not to be in the first cogroup call is that
> the serde should not be required if you pass a StateStoreSupplier to
> aggregate().
>
> Regarding the aggregated type <T>, I don't see why the initializer should be
> favored over the aggregator to define the type. In my mind, moving the
> initializer into the last aggregate call clearly indicates that the
> initializer is independent of any of the aggregators or streams, and that we
> don't wait for grouped1 events to initialize the co-group.
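Xavier's argument that the aggregator alone can define `<T>` can be checked against Java's type inference. This is a minimal sketch that assumes a first `cogroup` step taking only an aggregator. `Initializer` and `Aggregator` are local interfaces that mirror the shape of the Kafka Streams ones, so the code compiles without Kafka. `Cogrouped` and `aggregateOne` are hypothetical. In both calls below, `T` is fixed before any initializer appears: once by an explicitly typed lambda, once by the declared result type.

```java
// Local interfaces shaped like Kafka Streams' Initializer and Aggregator,
// declared here so the sketch compiles on its own.
class TypeInferenceSketch {

    interface Initializer<T> { T apply(); }

    interface Aggregator<K, V, T> { T apply(K key, V value, T aggregate); }

    // Hypothetical result of a first cogroup() call that takes only an aggregator.
    static class Cogrouped<K, V, T> {
        private final Aggregator<? super K, ? super V, T> aggregator;

        Cogrouped(Aggregator<? super K, ? super V, T> aggregator) {
            this.aggregator = aggregator;
        }

        // The initializer arrives last; the compiler checks it against the T
        // that the aggregator already fixed.
        T aggregateOne(Initializer<T> initializer, K key, V value) {
            return aggregator.apply(key, value, initializer.apply());
        }
    }

    static <K, V, T> Cogrouped<K, V, T> cogroup(Aggregator<? super K, ? super V, T> aggregator) {
        return new Cogrouped<>(aggregator);
    }

    public static void main(String[] args) {
        // 1) T = Long comes from the explicitly typed lambda; no initializer yet.
        Long sum = cogroup((String key, Integer value, Long agg) -> agg + value)
                .aggregateOne(() -> 0L, "user-1", 5);

        // 2) T = Long comes from the declared type of the variable.
        Cogrouped<String, Integer, Long> counter = cogroup((key, value, agg) -> agg + 1);
        Long count = counter.aggregateOne(() -> 0L, "user-1", 42);

        System.out.println(sum + " " + count); // prints 5 1
    }
}
```

Guozhang's concern does show up in one case. In a chained call with an implicitly typed lambda and nothing else to constrain it, `T` resolves to `Object` and the lambda body no longer compiles. Users would then need a typed lambda, a declared aggregator variable, or an explicit type witness.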
>
> On Thu, Jun 8, 2017 at 11:14 AM Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > On second thought... This is the currently proposed API:
> >
> > ```
> > <T> CogroupedKStream<K, T> cogroup(final Initializer<T> initializer,
> >                                    final Aggregator<? super K, ? super V, T> aggregator,
> >                                    final Serde<T> aggValueSerde)
> > ```
> >
> > If we do not have the initializer in the first co-group call, it might be a
> > bit awkward for users to specify an aggregator that returns a typed <T>
> > value. Maybe it is still better to put these two functions in the same API?
> >
> >
> >
> > Guozhang
> >
> > On Thu, Jun 8, 2017 at 11:08 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >
> > > This suggestion LGTM. I would vote for the first alternative rather than
> > > adding it to the `KStreamBuilder`, though.
> > >
> > > On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté <xavier@confluent.io> wrote:
> > >
> > >> I have a minor suggestion to make the API a little bit more symmetric.
> > >> I feel it would make more sense to move the initializer and serde to the
> > >> final aggregate statement, since the serde only applies to the state
> > >> store, and the initializer doesn't bear any relation to the first group
> > >> in particular. It would end up looking like this:
> > >>
> > >> KTable<K, CG> cogrouped =
> > >>     grouped1.cogroup(aggregator1)
> > >>             .cogroup(grouped2, aggregator2)
> > >>             .cogroup(grouped3, aggregator3)
> > >>             .aggregate(initializer1, aggValueSerde, storeName1);
> > >>
> > >> Alternatively, we could move the first cogroup() method to
> > >> KStreamBuilder, similar to how we have .merge(),
> > >> and end up with an API that would be even more symmetric:
> > >>
> > >> KStreamBuilder.cogroup(grouped1, aggregator1)
> > >>               .cogroup(grouped2, aggregator2)
> > >>               .cogroup(grouped3, aggregator3)
> > >>               .aggregate(initializer1, aggValueSerde, storeName1);
> > >>
> > >> This doesn't have to be a blocker, but I thought it would make the API
> > >> just a tad cleaner.
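Xavier's first alternative, the one Guozhang prefers, can be prototyped in memory to check that the call shape type-checks. This is a sketch under stated assumptions:

- `Grouped`, `Cogrouped`, `Step` and `Summary` are hypothetical.
- Serdes and store names are left out.
- Each input's records are replayed in turn rather than interleaved by arrival time, as a real topology would do.

Each `cogroup(stream, aggregator)` call pairs a stream with its aggregator while the value type is still known. This is the same type-checking property Kyle describes further down the thread.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of the proposed call shape. Grouped, Cogrouped and Summary
// are hypothetical; Initializer/Aggregator only mirror the Kafka Streams shapes.
class SymmetricCogroupSketch {

    interface Initializer<T> { T apply(); }

    interface Aggregator<K, V, T> { T apply(K key, V value, T aggregate); }

    // Applies one input's records to the shared store; its value type is hidden inside.
    interface Step<K, T> { void applyTo(Map<K, T> store, Initializer<T> initializer); }

    // Stand-in for a grouped stream: an ordered list of (key, value) records.
    static class Grouped<K, V> {
        final List<Map.Entry<K, V>> records = new ArrayList<>();

        Grouped<K, V> add(K key, V value) {
            records.add(new AbstractMap.SimpleEntry<>(key, value));
            return this;
        }

        // First call: only an aggregator; T is fixed by the aggregator's type.
        <T> Cogrouped<K, T> cogroup(Aggregator<? super K, ? super V, T> aggregator) {
            return new Cogrouped<K, T>().cogroup(this, aggregator);
        }
    }

    static class Cogrouped<K, T> {
        private final List<Step<K, T>> steps = new ArrayList<>();

        // Each call pairs a stream with its aggregator while V is still known,
        // so a mismatched pair fails to compile.
        <V> Cogrouped<K, T> cogroup(Grouped<K, V> stream, Aggregator<? super K, ? super V, T> aggregator) {
            steps.add((store, initializer) -> {
                for (Map.Entry<K, V> r : stream.records) {
                    T current = store.containsKey(r.getKey()) ? store.get(r.getKey()) : initializer.apply();
                    store.put(r.getKey(), aggregator.apply(r.getKey(), r.getValue(), current));
                }
            });
            return this;
        }

        // The initializer is supplied once, in the final aggregate call.
        Map<K, T> aggregate(Initializer<T> initializer) {
            Map<K, T> store = new LinkedHashMap<>();
            for (Step<K, T> step : steps) {
                step.applyTo(store, initializer);
            }
            return store;
        }
    }

    static final class Summary {
        final int clicks;
        final int spent;

        Summary(int clicks, int spent) {
            this.clicks = clicks;
            this.spent = spent;
        }

        @Override
        public String toString() { return "clicks=" + clicks + ",spent=" + spent; }
    }

    static Map<String, Summary> demo() {
        Grouped<String, String> clicks = new Grouped<String, String>()
                .add("alice", "/home").add("bob", "/cart").add("alice", "/item");
        Grouped<String, Integer> purchases = new Grouped<String, Integer>()
                .add("alice", 30).add("carol", 12);

        Aggregator<String, String, Summary> onClick = (user, page, s) -> new Summary(s.clicks + 1, s.spent);
        Aggregator<String, Integer, Summary> onPurchase = (user, amount, s) -> new Summary(s.clicks, s.spent + amount);

        return clicks.cogroup(onClick)
                .cogroup(purchases, onPurchase)
                .aggregate(() -> new Summary(0, 0));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```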
> > >>
> > >> On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang <wangguoz@gmail.com> wrote:
> > >>
> > >> > Kyle,
> > >> >
> > >> > Thanks a lot for the updated KIP. It looks good to me.
> > >> >
> > >> >
> > >> > Guozhang
> > >> >
> > >> >
> > >> > On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <jim@jagunet.com> wrote:
> > >> >
> > >> > > This makes much more sense to me. +1
> > >> > >
> > >> > > > On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <winkelman.kyle@gmail.com> wrote:
> > >> > > >
> > >> > > > I have updated the KIP and my PR. Let me know what you think.
> > >> > > > To create a cogrouped stream, just call cogroup on a KGroupedStream and
> > >> > > > supply the initializer, aggValueSerde, and an aggregator. Then continue
> > >> > > > adding KGroupedStreams and aggregators. Then call one of the many
> > >> > > > aggregate calls to create a KTable.
> > >> > > >
> > >> > > > Thanks,
> > >> > > > Kyle
> > >> > > >
> > >> > > > On Jun 1, 2017 4:03 AM, "Damian Guy" <damian.guy@gmail.com> wrote:
> > >> > > >
> > >> > > >> Hi Kyle,
> > >> > > >>
> > >> > > >> Thanks for the update. I think just one initializer makes sense, as it
> > >> > > >> should only be called once per key, and generally it is just going to
> > >> > > >> create a new instance of whatever the Aggregate class is.
> > >> > > >>
> > >> > > >> Cheers,
> > >> > > >> Damian
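Damian's observation that a single initializer is consulted only once per key can be shown with plain Java collections. In this sketch the names are hypothetical and a `Supplier<Integer>` stands in for an `Initializer`. Two inputs share one map as their store. The initializer runs only when a key has no aggregate yet, no matter which input delivers the first record for that key.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class InitializerOncePerKeySketch {

    static Map.Entry<String, Integer> rec(String key, int value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Sums values per key across several inputs that share one store. The
    // initializer is only called when a key has no aggregate yet.
    static Map<String, Integer> sumAcross(Supplier<Integer> initializer,
                                          List<List<Map.Entry<String, Integer>>> inputs) {
        Map<String, Integer> store = new LinkedHashMap<>();
        for (List<Map.Entry<String, Integer>> input : inputs) {
            for (Map.Entry<String, Integer> r : input) {
                int current = store.computeIfAbsent(r.getKey(), k -> initializer.get());
                store.put(r.getKey(), current + r.getValue());
            }
        }
        return store;
    }

    static List<List<Map.Entry<String, Integer>>> sampleInputs() {
        List<Map.Entry<String, Integer>> stream1 = Arrays.asList(rec("a", 1), rec("b", 2));
        List<Map.Entry<String, Integer>> stream2 = Arrays.asList(rec("a", 3), rec("b", 4), rec("c", 5));
        return Arrays.asList(stream1, stream2);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Map<String, Integer> out = sumAcross(() -> { calls.incrementAndGet(); return 0; }, sampleInputs());
        System.out.println(out + ", initializer calls: " + calls.get()); // prints {a=4, b=6, c=5}, initializer calls: 3
    }
}
```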
> > >> > > >>
> > >> > > >> On Wed, 31 May 2017 at 20:09 Kyle Winkelman <winkelman.kyle@gmail.com>
> > >> > > >> wrote:
> > >> > > >>
> > >> > > >>> Hello all,
> > >> > > >>>
> > >> > > >>> I have spent some more time on this, and the best alternative I have
> > >> > > >>> come up with is:
> > >> > > >>> KGroupedStream has a single cogroup call that takes an initializer and
> > >> > > >>> an aggregator.
> > >> > > >>> CogroupedKStream has a cogroup call that takes additional
> > >> > > >>> groupedStream/aggregator pairs.
> > >> > > >>> CogroupedKStream has multiple aggregate methods that create the
> > >> > > >>> different stores.
> > >> > > >>>
> > >> > > >>> I plan on updating the KIP, but I want people's input on whether we
> > >> > > >>> should have the initializer passed in once at the beginning, or
> > >> > > >>> instead require it for each call to one of the aggregate methods. The
> > >> > > >>> first makes more sense to me but doesn't allow the user to specify
> > >> > > >>> different initializers for different tables.
> > >> > > >>>
> > >> > > >>> Thanks,
> > >> > > >>> Kyle
> > >> > > >>>
> > >> > > >>> On May 24, 2017 7:46 PM, "Kyle Winkelman" <winkelman.kyle@gmail.com>
> > >> > > >>> wrote:
> > >> > > >>>
> > >> > > >>>> Yeah, I really like that idea. I'll see what I can do to update the
> > >> > > >>>> KIP and my PR when I have some time. I'm not sure how well creating
> > >> > > >>>> the KStreamAggregates will go, though, because at that point I will
> > >> > > >>>> have thrown away the type of the values. It will be type safe; I just
> > >> > > >>>> may need to do a little forcing.
> > >> > > >>>>
> > >> > > >>>> Thanks,
> > >> > > >>>> Kyle
> > >> > > >>>>
> > >> > > >>>> On May 24, 2017 3:28 PM, "Guozhang Wang" <wangguoz@gmail.com> wrote:
> > >> > > >>>>
> > >> > > >>>>> Kyle,
> > >> > > >>>>>
> > >> > > >>>>> Thanks for the explanations; my previous reading of the wiki examples
> > >> > > >>>>> was wrong.
> > >> > > >>>>>
> > >> > > >>>>> So I guess my motivation should be "reduced" to: can we move the
> > >> > > >>>>> window specs param from "KGroupedStream#cogroup(..)" to
> > >> > > >>>>> "CogroupedKStream#aggregate(..)"? My motivations are:
> > >> > > >>>>>
> > >> > > >>>>> 1. minor: we can reduce the number of generics in CogroupedKStream
> > >> > > >>>>> from 3 to 2.
> > >> > > >>>>> 2. major: this is for extensibility of the APIs, and since we are
> > >> > > >>>>> removing the "Evolving" annotations on Streams, it may be harder to
> > >> > > >>>>> change it again in the future. The extended use case is that people
> > >> > > >>>>> wanted to have windowed running aggregates at different
> > >> > > >>>>> granularities, e.g. "give me the counts per-minute, per-hour, per-day
> > >> > > >>>>> and per-week". Today in the DSL we need to specify that case with
> > >> > > >>>>> multiple aggregate operators, each of which gets its own state store /
> > >> > > >>>>> changelog, etc. It is possible to optimize this into a single state
> > >> > > >>>>> store as well. Its implementation would be tricky, as you need to hold
> > >> > > >>>>> windows of different lengths within your window store, but just from
> > >> > > >>>>> the public API point of view, it could be specified as:
> > >> > > >>>>>
> > >> > > >>>>> CogroupedKStream stream = stream1.cogroup(stream2, ... "state-store-name");
> > >> > > >>>>>
> > >> > > >>>>> table1 = stream.aggregate(/*per-minute window*/)
> > >> > > >>>>> table2 = stream.aggregate(/*per-hour window*/)
> > >> > > >>>>> table3 = stream.aggregate(/*per-day window*/)
> > >> > > >>>>>
> > >> > > >>>>> while underneath we are only using a single store, "state-store-name",
> > >> > > >>>>> for all of them.
> > >> > > >>>>>
> > >> > > >>>>> Although this feature is out of the scope of this KIP, I'd like to
> > >> > > >>>>> discuss whether we can "leave the door open" to make such changes
> > >> > > >>>>> without modifying the public APIs.
> > >> > > >>>>>
> > >> > > >>>>> Guozhang
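Here is a sketch of Guozhang's per-minute/per-hour/per-day example. It assumes epoch-aligned tumbling windows, where the window start is `ts - ts % size`. The single `TreeMap` stands in for the one shared store he describes. It holds windows of different lengths side by side by putting the window size into a composite key. The names and the key format are illustrative only; they do not show how a Kafka Streams window store lays out its data.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: one map stands in for the single shared window store.
class MultiGranularitySketch {

    static final long MINUTE = 60_000L;
    static final long HOUR = 60 * MINUTE;
    static final long DAY = 24 * HOUR;

    // Start of the epoch-aligned tumbling window of length `size` containing ts (ts >= 0).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Counts every event once per granularity. Entries are keyed by
    // "key@size@windowStart", so windows of different lengths share one map.
    static Map<String, Long> countAll(String key, long[] timestamps, long... sizes) {
        Map<String, Long> store = new TreeMap<>();
        for (long ts : timestamps) {
            for (long size : sizes) {
                store.merge(key + "@" + size + "@" + windowStart(ts, size), 1L, Long::sum);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        long[] events = {3_725_000L, 3_790_000L}; // 01:02:05 and 01:03:10 after the epoch
        System.out.println(countAll("user-1", events, MINUTE, HOUR, DAY));
    }
}
```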
> > >> > > >>>>>
> > >> > > >>>>>
> > >> > > >>>>> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <winkelman.kyle@gmail.com>
> > >> > > >>>>> wrote:
> > >> > > >>>>>
> > >> > > >>>>>> I allow defining a single window/session window one time, when you
> > >> > > >>>>>> make the cogroup call from a KGroupedStream. From then on you are
> > >> > > >>>>>> using the cogroup call from within CogroupedKStream, which doesn't
> > >> > > >>>>>> accept any additional windows/session windows.
> > >> > > >>>>>>
> > >> > > >>>>>> Is this what you meant by your question, or did I misunderstand?
> > >> > > >>>>>>
> > >> > > >>>>>> On May 23, 2017 9:33 PM, "Guozhang Wang" <wangguoz@gmail.com> wrote:
> > >> > > >>>>>>
> > >> > > >>>>>> Another question that came to me is on "window alignment": from the
> > >> > > >>>>>> KIP it seems you are allowing users to specify a (potentially
> > >> > > >>>>>> different) window spec in each co-grouped input stream. So if these
> > >> > > >>>>>> window specs are different, how should we "align" them across the
> > >> > > >>>>>> different input streams? I think it is more natural to only specify
> > >> > > >>>>>> one window spec in the
> > >> > > >>>>>>
> > >> > > >>>>>> KTable<RK, V> CogroupedKStream#aggregate(Windows);
> > >> > > >>>>>>
> > >> > > >>>>>>
> > >> > > >>>>>> and remove it from the cogroup() functions. WDYT?
> > >> > > >>>>>>
> > >> > > >>>>>>
> > >> > > >>>>>> Guozhang
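The alignment question can be made concrete under the same epoch-aligned tumbling-window assumption. In this hypothetical sketch, clicks and purchases for one key are bucketed with a single window size passed at aggregation time, so records from both inputs land in the same windows. The last line shows the problem with per-input specs: two different window sizes put the same instant into windows with different start times.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of "one window spec at aggregate time" for two cogrouped inputs.
class WindowAlignmentSketch {

    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Buckets clicks (timestamps) and purchases ({timestamp, amount} pairs) for a
    // single key using ONE window size, so both inputs share the same windows.
    static Map<Long, String> aggregate(long windowSize, long[] clickTimes, long[][] purchases) {
        Map<Long, long[]> acc = new TreeMap<>(); // windowStart -> {clicks, amountSpent}
        for (long ts : clickTimes) {
            acc.computeIfAbsent(windowStart(ts, windowSize), w -> new long[2])[0]++;
        }
        for (long[] p : purchases) {
            acc.computeIfAbsent(windowStart(p[0], windowSize), w -> new long[2])[1] += p[1];
        }
        Map<Long, String> out = new TreeMap<>();
        for (Map.Entry<Long, long[]> e : acc.entrySet()) {
            out.put(e.getKey(), "clicks=" + e.getValue()[0] + ",spent=" + e.getValue()[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] clicks = {30_000L, 70_000L};
        long[][] purchases = {{50_000L, 20L}, {65_000L, 5L}};
        System.out.println(aggregate(60_000L, clicks, purchases)); // prints {0=clicks=1,spent=20, 60000=clicks=1,spent=5}

        // With per-input specs, the same instant falls into differently aligned windows.
        System.out.println(windowStart(130_000L, 60_000L) + " vs " + windowStart(130_000L, 300_000L)); // prints 120000 vs 0
    }
}
```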
> > >> > > >>>>>>
> > >> > > >>>>>> On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <wangguoz@gmail.com>
> > >> > > >>>>>> wrote:
> > >> > > >>>>>>
> > >> > > >>>>>>> Thanks for the proposal, Kyle. Supporting such a multi-way table join
> > >> > > >>>>>>> (i.e. N source tables with N aggregate functions) with a single store
> > >> > > >>>>>>> and N+1 serdes is quite a common use case; I have seen lots of people
> > >> > > >>>>>>> using the low-level PAPI to achieve this goal.
> > >> > > >>>>>>>
> > >> > > >>>>>>>
> > >> > > >>>>>>> On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <winkelman.kyle@gmail.com>
> > >> > > >>>>>>> wrote:
> > >> > > >>>>>>>
> > >> > > >>>>>>>> I like your point about not handling other cases such as count and
> > >> > > >>>>>>>> reduce.
> > >> > > >>>>>>>>
> > >> > > >>>>>>>> I think that reduce may not make sense, because reduce assumes that
> > >> > > >>>>>>>> the input values are the same type as the output values. With cogroup
> > >> > > >>>>>>>> there may be multiple different input types, and your output type
> > >> > > >>>>>>>> can't be multiple different things. In the case where all the value
> > >> > > >>>>>>>> types match, you can do KStreamBuilder#merge followed by the reduce.
> > >> > > >>>>>>>>
> > >> > > >>>>>>>> As for count, I think it is possible to call count on all the
> > >> > > >>>>>>>> individual grouped streams and then do joins. Otherwise we could
> > >> > > >>>>>>>> maybe make a special call in KGroupedStream for this case, because
> > >> > > >>>>>>>> here we don't need to do type checking on the values. It could be
> > >> > > >>>>>>>> similar to the current count methods but also accept varargs of
> > >> > > >>>>>>>> additional grouped streams, making sure they have a key type of K.
> > >> > > >>>>>>>>
> > >> > > >>>>>>>> The way I have put the KIP together is to ensure that we do type
> > >> > > >>>>>>>> checking. I don't see a way we could group them all first and then
> > >> > > >>>>>>>> make a call to count, reduce, or aggregate, because with aggregate
> > >> > > >>>>>>>> they would need to pass a list of aggregators, and we would have no
> > >> > > >>>>>>>> way of type checking that they match the grouped streams.
> > >> > > >>>>>>>>
> > >> > > >>>>>>>> Thanks,
> > >> > > >>>>>>>> Kyle
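Kyle's two options for count and reduce can be sketched with in-memory lists of records. Everything here (`countAcross`, `mergeThenReduce`, `rec`) is hypothetical. Counting needs only a matching key type, so inputs with different value types can be mixed. A reducer has type `V x V -> V`, which is why it only fits the merge-then-reduce case, where all inputs share one value type.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical in-memory sketch of the two options Kyle describes.
class CountAndReduceSketch {

    static <K, V> Map.Entry<K, V> rec(K key, V value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Multi-stream count: values are never inspected, so inputs may carry
    // different value types; only the key type K has to match.
    @SafeVarargs
    static <K> Map<K, Long> countAcross(List<? extends Map.Entry<K, ?>>... inputs) {
        Map<K, Long> counts = new LinkedHashMap<>();
        for (List<? extends Map.Entry<K, ?>> input : inputs) {
            for (Map.Entry<K, ?> r : input) {
                counts.merge(r.getKey(), 1L, Long::sum);
            }
        }
        return counts;
    }

    // When every input has the same value type V, "merge, then reduce" is enough;
    // a reducer is V x V -> V, so it cannot span differing value types.
    @SafeVarargs
    static <K, V> Map<K, V> mergeThenReduce(BinaryOperator<V> reducer, List<Map.Entry<K, V>>... inputs) {
        Map<K, V> out = new LinkedHashMap<>();
        for (List<Map.Entry<K, V>> input : inputs) {
            for (Map.Entry<K, V> r : input) {
                out.merge(r.getKey(), r.getValue(), reducer);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> pageViews = Arrays.asList(rec("alice", "/home"), rec("bob", "/cart"));
        List<Map.Entry<String, Integer>> purchases = Arrays.asList(rec("alice", 30));
        System.out.println(countAcross(pageViews, purchases)); // prints {alice=2, bob=1}

        List<Map.Entry<String, Integer>> webOrders = Arrays.asList(rec("alice", 30), rec("bob", 5));
        List<Map.Entry<String, Integer>> storeOrders = Arrays.asList(rec("alice", 12));
        System.out.println(mergeThenReduce(Integer::sum, webOrders, storeOrders)); // prints {alice=42, bob=5}
    }
}
```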
> > >> > > >>>>>>>>
> > >> > > >>>>>>>> On May 19, 2017 11:42 AM, "Xavier Léauté" <xavier@confluent.io> wrote:
> > >> > > >>>>>>>>
> > >> > > >>>>>>>>> Sorry to jump on this thread so late. I agree this is a very useful
> > >> > > >>>>>>>>> addition and wanted to provide an additional use case and some more
> > >> > > >>>>>>>>> comments.
> > >> > > >>>>>>>>>
> > >> > > >>>>>>>>> This is actually a very common analytics use case in the ad-tech
> > >> > > >>>>>>>>> industry. The typical setup will have an auction stream, an
> > >> > > >>>>>>>>> impression stream, and a click stream. Those three streams need to be
> > >> > > >>>>>>>>> combined to compute aggregate statistics (e.g. impression statistics
> > >> > > >>>>>>>>> and click-through rates), since most of the attributes of interest
> > >> > > >>>>>>>>> are only present in the auction stream.
> > >> > > >>>>>>>>>
> > >> > > >>>>>>>>> A simple way to do this is to co-group all the streams by the auction
> > >> > > >>>>>>>>> key, and process updates to the co-group as events for each stream
> > >> > > >>>>>>>>> come in, keeping only one value from each stream before sending
> > >> > > >>>>>>>>> downstream for further processing / aggregation.
> > >> > > >>>>>>>>>
> > >> > > >>>>>>>>> One could view the result of that co-group operation as a "KTable"
> > >> > > >>>>>>>>> with multiple values per key: the key being the grouping key, and the
> > >> > > >>>>>>>>> values consisting of one value per stream.
> > >> > > >>>>>>>>>
> > >> > > >>>>>>>>> What I like about Kyle's approach is that it allows elegant
> > >> > > >>>>>>>>> co-grouping of multiple streams without having to worry about the
> > >> > > >>>>>>>>> number of streams, and avoids dealing with Tuple types or other
> > >> > > >>>>>>>>> generic interfaces that could get messy if we wanted to preserve all
> > >> > > >>>>>>>>> the value types in the resulting co-grouped stream.
> > >> > > >>>>>>>>>
> > >> > > >>>>>>>>> My only concern is that we only allow the combined cogroup +
> > >> > > >>>>>>>>> aggregate operation. This forces users to build their own tuple
> > >> > > >>>>>>>>> serialization format if they want to preserve the individual input
> > >> > > >>>>>>>>> stream values as a group. It also deviates quite a bit from our
> > >> > > >>>>>>>>> approach in KGroupedStream, which offers other operations, such as
> > >> > > >>>>>>>>> count and reduce, that should also be applicable to a co-grouped
> > >> > > >>>>>>>>> stream.
> > >> > > >>>>>>>>>
> > >> > > >>>>>>>>> Overall I still think this is a really useful addition, but I feel we
> > >> > > >>>>>>>>> haven't spent much time trying to explore alternative DSLs that could
> > >> > > >>>>>>>>> maybe generalize better or match our existing syntax more closely.
> > >> > > >>>>>>>>>
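Here is Xavier's ad-tech case as an in-memory sketch. Auction, impression and click events, all keyed by auction id, are co-grouped into one view per auction, keeping one value per stream. A downstream step then computes the click-through rate per country. The event model (`Event`, `AuctionView`, the `country` attribute) and the data are invented for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of Xavier's ad-tech example with in-memory data.
class AdTechCogroupSketch {

    enum Source { AUCTION, IMPRESSION, CLICK }

    // One event from any of the three streams, keyed by auction id.
    static final class Event {
        final Source source;
        final String auctionId;
        final String country; // only meaningful for AUCTION events

        Event(Source source, String auctionId, String country) {
            this.source = source;
            this.auctionId = auctionId;
            this.country = country;
        }
    }

    // Co-grouped value: keeps one value per input stream.
    static final class AuctionView {
        String country;
        boolean impressed;
        boolean clicked;
    }

    // Step 1: co-group by auction id, applying each stream's update as events arrive.
    static Map<String, AuctionView> cogroup(List<Event> events) {
        Map<String, AuctionView> views = new LinkedHashMap<>();
        for (Event e : events) {
            AuctionView v = views.computeIfAbsent(e.auctionId, id -> new AuctionView());
            switch (e.source) {
                case AUCTION: v.country = e.country; break;
                case IMPRESSION: v.impressed = true; break;
                case CLICK: v.clicked = true; break;
            }
        }
        return views;
    }

    // Step 2: downstream aggregation, click-through rate per country.
    static Map<String, Double> ctrByCountry(Map<String, AuctionView> views) {
        Map<String, int[]> counts = new TreeMap<>(); // country -> {impressions, clicks}
        for (AuctionView v : views.values()) {
            if (v.country == null || !v.impressed) continue; // attributes or impression still missing
            int[] c = counts.computeIfAbsent(v.country, k -> new int[2]);
            c[0]++;
            if (v.clicked) c[1]++;
        }
        Map<String, Double> ctr = new TreeMap<>();
        for (Map.Entry<String, int[]> e : counts.entrySet()) {
            ctr.put(e.getKey(), (double) e.getValue()[1] / e.getValue()[0]);
        }
        return ctr;
    }

    static List<Event> sampleEvents() {
        return Arrays.asList(
                new Event(Source.CLICK, "a1", null), // a click may arrive before its auction
                new Event(Source.AUCTION, "a1", "US"),
                new Event(Source.AUCTION, "a2", "US"),
                new Event(Source.AUCTION, "a3", "DE"),
                new Event(Source.IMPRESSION, "a1", null),
                new Event(Source.IMPRESSION, "a2", null),
                new Event(Source.IMPRESSION, "a3", null));
    }

    public static void main(String[] args) {
        System.out.println(ctrByCountry(cogroup(sampleEvents()))); // prints {DE=0.0, US=0.5}
    }
}
```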
> > >> > > >>>>>>>>> On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman <winkelman.kyle@gmail.com>
> > >> > > >>>>>>>>> wrote:
> > >> > > >>>>>>>>>
> > >> > > >>>>>>>>>> Eno, is there anyone else who is an expert in the Kafka Streams realm
> > >> > > >>>>>>>>>> that I should reach out to for input?
> > >> > > >>>>>>>>>>
> > >> > > >>>>>>>>>> I believe Damian Guy is still planning on reviewing this more in
> > >> > > >>>>>>>>>> depth, so I will wait for his input before continuing.
> > >> > > >>>>>>>>>>
> > >> > > >>>>>>>>>> On May 9, 2017 7:30 AM, "Eno Thereska" <eno.thereska@gmail.com> wrote:
> > >> > > >>>>>>>>>>
> > >> > > >>>>>>>>>>> Thanks Kyle, good arguments.
> > >> > > >>>>>>>>>>>
> > >> > > >>>>>>>>>>> Eno
> > >> > > >>>>>>>>>>>
> > >> > > >>>>>>>>>>>> On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.kyle@gmail.com> wrote:
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>> *- minor: could you add an exact example (similar to what Jay’s
> > >> > > >>>>>>>>>>>> example is, or like your Spark/Pig pointers had) to make this super
> > >> > > >>>>>>>>>>>> concrete?*
> > >> > > >>>>>>>>>>>> I have added a more concrete example to the KIP.
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>> *- my main concern is that we’re exposing this optimization to the
> > >> > > >>>>>>>>>>>> DSL. In an ideal world, an optimizer would take the existing DSL and
> > >> > > >>>>>>>>>>>> do the right thing under the covers (create just one state store,
> > >> > > >>>>>>>>>>>> arrange the nodes etc). The original DSL had a bunch of small,
> > >> > > >>>>>>>>>>>> composable pieces (group, aggregate, join) that this proposal groups
> > >> > > >>>>>>>>>>>> together. I’d like to hear your thoughts on whether it’s possible to
> > >> > > >>>>>>>>>>>> do this optimization with the current DSL, at the topology builder
> > >> > > >>>>>>>>>>>> level.*
> > >> > > >>>>>>>>>>>> You would have to make a lot of checks to understand whether it is
> > >> > > >>>>>>>>>>>> even possible to make this optimization:
> > >> > > >>>>>>>>>>>> 1. Make sure they are all KTableKTableOuterJoins.
> > >> > > >>>>>>>>>>>> 2. None of the intermediate KTables are used for anything else.
> > >> > > >>>>>>>>>>>> 3. None of the intermediate stores are used. (This may be impossible,
> > >> > > >>>>>>>>>>>> especially if they use KafkaStreams#store after the topology has
> > >> > > >>>>>>>>>>>> already been built.)
> > >> > > >>>>>>>>>>>> You would then need to make decisions during the optimization:
> > >> > > >>>>>>>>>>>> 1. Your new initializer would be the composite of all the individual
> > >> > > >>>>>>>>>>>> initializers and the valueJoiners.
> > >> > > >>>>>>>>>>>> 2. I am having a hard time thinking about how you would turn the
> > >> > > >>>>>>>>>>>> aggregators and valueJoiners into an aggregator that would work on
> > >> > > >>>>>>>>>>>> the final object, but this may be possible.
> > >> > > >>>>>>>>>>>> 3. Which state store would you use? The ones declared would be for
> > >> > > >>>>>>>>>>>> the aggregate values. None of the declared ones would be guaranteed
> > >> > > >>>>>>>>>>>> to hold the final object. This would mean you must create a new state
> > >> > > >>>>>>>>>>>> store and not create any of the declared ones.
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>> The main argument I have against it is that, even if it could be
> > >> > > >>>>>>>>>>>> done, I don't know that we would want this to be an optimization in
> > >> > > >>>>>>>>>>>> the background, because the user would still be required to think
> > >> > > >>>>>>>>>>>> about all of the intermediate values that they shouldn't need to
> > >> > > >>>>>>>>>>>> worry about if they only care about the final object.
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>> In my opinion cogroup is a common enough case that it should be part
> > >> > > >>>>>>>>>>>> of the composable pieces (group, aggregate, join), because we want to
> > >> > > >>>>>>>>>>>> allow people to join two or more streams in an easy way. Right now I
> > >> > > >>>>>>>>>>>> don't think we give them ways of handling this use case easily.
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>> *- I think there will be scope for several such optimizations in the
> > >> > > >>>>>>>>>>>> future and perhaps at some point we need to think about decoupling
> > >> > > >>>>>>>>>>>> the 1:1 mapping from the DSL into the physical topology.*
> > >> > > >>>>>>>>>>>> I would argue that cogroup is not just an optimization; it is a new
> > >> > > >>>>>>>>>>>> way for users to approach a problem that requires multiple streams. I
> > >> > > >>>>>>>>>>>> may sound like a broken record, but I don't think users should have
> > >> > > >>>>>>>>>>>> to build the N-1 intermediate tables and deal with their
> > >> > > >>>>>>>>>>>> initializers, serdes and stores if all they care about is the final
> > >> > > >>>>>>>>>>>> object.
> > >> > > >>>>>>>>>>>> Now if, for example, someone uses cogroup but doesn't supply
> > >> > > >>>>>>>>>>>> additional streams and aggregators, this case is equivalent to a
> > >> > > >>>>>>>>>>>> single grouped stream making an aggregate call. This case is what I
> > >> > > >>>>>>>>>>>> would view as an optimization: we could remove the KStreamCogroup and
> > >> > > >>>>>>>>>>>> act as if there was just a call to KGroupedStream#aggregate instead
> > >> > > >>>>>>>>>>>> of KGroupedStream#cogroup. (I would prefer to just write a warning
> > >> > > >>>>>>>>>>>> saying that this is not how cogroup is meant to be used.)
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>> Thanks,
> > >> > > >>>>>>>>>>>> Kyle
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>> On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.thereska@gmail.com>
> > >> > > >>>>>>>>>>>> wrote:
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>> Hi Kyle,
> > >> > > >>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>> Thanks for the KIP again. A couple of comments:
> > >> > > >>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>> - minor: could you add an exact example (similar to what Jay’s
> > >> > > >>>>>>>>>>>>> example is, or like your Spark/Pig pointers had) to make this super
> > >> > > >>>>>>>>>>>>> concrete?
> > >> > > >>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>> - my main concern is that we’re exposing this optimization to the
> > >> > > >>>>>>>>>>>>> DSL. In an ideal world, an optimizer would take the existing DSL and
> > >> > > >>>>>>>>>>>>> do the right thing under the covers (create just one state store,
> > >> > > >>>>>>>>>>>>> arrange the nodes etc). The original DSL had a bunch of small,
> > >> > > >>>>>>>>>>>>> composable pieces (group, aggregate, join) that this proposal groups
> > >> > > >>>>>>>>>>>>> together. I’d like to hear your thoughts on whether it’s possible to
> > >> > > >>>>>>>>>>>>> do this optimization with the current DSL, at the topology builder
> > >> > > >>>>>>>>>>>>> level.
> > >> > > >>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>> I think there will be scope for several such optimizations in the
> > >> > > >>>>>>>>>>>>> future and perhaps at some point we need to think about decoupling
> > >> > > >>>>>>>>>>>>> the 1:1 mapping from the DSL into the physical topology.
> > >> > > >>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>> Thanks
> > >> > > >>>>>>>>>>>>> Eno
> > >> > > >>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>> On May 5, 2017, at 4:39 PM, Jay Kreps <jay@confluent.io> wrote:
> > >> > > >>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>> I haven't digested the proposal, but the use case is pretty common.
> > >> > > >>>>>>>>>>>>>> An example would be the "customer 360" or "unified customer profile"
> > >> > > >>>>>>>>>>>>>> use case we often use. In that use case you have a dozen systems,
> > >> > > >>>>>>>>>>>>>> each of which has some information about your customer (account
> > >> > > >>>>>>>>>>>>>> details, settings, billing info, customer service contacts, purchase
> > >> > > >>>>>>>>>>>>>> history, etc). Your goal is to join/munge these into a single profile
> > >> > > >>>>>>>>>>>>>> record for each customer that has all the relevant info in a usable
> > >> > > >>>>>>>>>>>>>> form and is up-to-date with all the source systems. If you implement
> > >> > > >>>>>>>>>>>>>> that with Kafka Streams as a sequence of joins, I think today we'd
> > >> > > >>>>>>>>>>>>>> fully materialize N-1 intermediate tables. But clearly you only need
> > >> > > >>>>>>>>>>>>>> a single stage to group all these things that are already
> > >> > > >>>>>>>>>>>>>> co-partitioned. A distributed database would do this under the
> > >> > > >>>>>>>>>>>>>> covers, which is arguably better (at least when it does the right
> > >> > > >>>>>>>>>>>>>> thing), and perhaps we could do the same thing, but I'm not sure we
> > >> > > >>>>>>>>>>>>>> know the partitioning, so we may need an explicit cogroup command
> > >> > > >>>>>>>>>>>>>> that implies they are already co-partitioned.
> > >> > > >>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>> -Jay
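Jay's point depends on the inputs being co-partitioned. This is a simplified sketch of what that means. Assume two topics use the same key serialization and partitioner. A key then lands in the same partition number in both only if the topics have the same partition count. `partitionFor` is hypothetical. It uses `String.hashCode()` instead of the murmur2 hash that Kafka's default partitioner applies to the serialized key, so the concrete partition numbers differ from real Kafka; only the comparison is meant to carry over.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of why co-partitioning matters.
class CoPartitioningSketch {

    // Partition chosen for a key. Kafka's default partitioner hashes the
    // serialized key bytes with murmur2; String.hashCode() is used here only
    // to keep the sketch dependency-free.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Two topics are co-partitioned for these keys if every key lands in the
    // same partition number in both, so one task sees all records for the key.
    static boolean coPartitioned(List<String> keys, int partitionsA, int partitionsB) {
        for (String key : keys) {
            if (partitionFor(key, partitionsA) != partitionFor(key, partitionsB)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> customers = Arrays.asList("a", "b", "c");
        System.out.println(coPartitioned(customers, 4, 4)); // prints true
        System.out.println(coPartitioned(customers, 3, 4)); // prints false ("c" -> 0 vs 3)
    }
}
```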
> > >> > > >>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <winkelman.kyle@gmail.com>
> > >> > > >>>>>>>>>>>>>> wrote:
> > >> > > >>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>> Yeah, that's a good way to look at it.
> > >> > > >>>>>>>>>>>>>>> I have seen this type of functionality in a couple of other
> > >> > > >>>>>>>>>>>>>>> platforms, like Spark and Pig.
> > >> > > >>>>>>>>>>>>>>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
> > >> > > >>>>>>>>>>>>>>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
> > >> > > >>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian.guy@gmail.com> wrote:
> > >> > > >>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>> Hi Kyle,
> > >> > > >>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>> If I'm reading this correctly, it is like an N-way outer join? So
> > >> > > >>>>>>>>>>>>>>>> an input on any stream will always produce a new aggregated value;
> > >> > > >>>>>>>>>>>>>>>> is that correct? Effectively, each Aggregator just looks up the
> > >> > > >>>>>>>>>>>>>>>> current value, aggregates, and forwards the result.
> > >> > > >>>>>>>>>>>>>>>> I need to look into it and think about it a bit more, but it seems
> > >> > > >>>>>>>>>>>>>>>> like it could be a useful optimization.
> > >> > > >>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.kyle@gmail.com>
> > >> > > >>>>>>>>>>>>>>>> wrote:
> > >> > > >>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>> I sure can. I have added the following description to my KIP. If this
> > >> > > >>>>>>>>>>>>>>>>> doesn't help, let me know and I will take some more time to build a
> > >> > > >>>>>>>>>>>>>>>>> diagram and make more of a step-by-step description:
> > >> > > >>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>> Example with Current API:
> > >> > > >>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>> KTable<K, V1> table1 = builder.stream("topic1").groupByKey()
> > >> > > >>>>>>>>>>>>>>>>>      .aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
> > >> > > >>>>>>>>>>>>>>>>> KTable<K, V2> table2 = builder.stream("topic2").groupByKey()
> > >> > > >>>>>>>>>>>>>>>>>      .aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
> > >> > > >>>>>>>>>>>>>>>>> KTable<K, V3> table3 = builder.stream("topic3").groupByKey()
> > >> > > >>>>>>>>>>>>>>>>>      .aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
> > >> > > >>>>>>>>>>>>>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo)
> > >> > > >>>>>>>>>>>>>>>>>      .outerJoin(table3, joinerOneTwoAndThree);
> > >> > > >>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>> As you can see, this creates 3 StateStores and requires 3 initializers
> > >> > > >>>>>>>>>>>>>>>>> and 3 aggValueSerdes. It also puts pressure on the user to define what
> > >> > > >>>>>>>>>>>>>>>>> the intermediate values are going to be (V1, V2, V3). They are left with
> > >> > > >>>>>>>>>>>>>>>>> a couple of choices: first, make V1, V2, and V3 all the same as CG, so
> > >> > > >>>>>>>>>>>>>>>>> the two joiners are more like mergers; or second, make them
> > >> > > >>>>>>>>>>>>>>>>> intermediate states such as Topic1Map, Topic2Map, and Topic3Map and have
> > >> > > >>>>>>>>>>>>>>>>> the joiners use those to build the final aggregate CG value. This is
> > >> > > >>>>>>>>>>>>>>>>> something the user could avoid thinking about with this KIP.
> > >> > > >>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>> When a new input arrives, let's say at "topic1", it will first go
> > >> > > >>>>>>>>>>>>>>>>> through a KStreamAggregate, grabbing the current aggregate from
> > >> > > >>>>>>>>>>>>>>>>> storeName1. It will produce this in the form of the first intermediate
> > >> > > >>>>>>>>>>>>>>>>> value and get sent through a KTableKTableOuterJoin, where it will look up
> > >> > > >>>>>>>>>>>>>>>>> the current value of the key in storeName2. It will use the first joiner
> > >> > > >>>>>>>>>>>>>>>>> to calculate the second intermediate value, which will go through an
> > >> > > >>>>>>>>>>>>>>>>> additional KTableKTableOuterJoin. Here it will look up the current value
> > >> > > >>>>>>>>>>>>>>>>> of the key in storeName3 and use the second joiner to build the final
> > >> > > >>>>>>>>>>>>>>>>> aggregate value.
> > >> > > >>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>> If you think through all possibilities for incoming topics, you will see
> > >> > > >>>>>>>>>>>>>>>>> that no matter which topic a record comes in through, all three stores
> > >> > > >>>>>>>>>>>>>>>>> are queried and all of the joiners must be used.
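The store-access claim above can be checked with a toy model. The sketch uses in-memory maps as stand-ins for storeName1..storeName3. String concatenation plays the role of the aggregators and joiners. `CurrentApiSimulation` and its read counter are hypothetical and are not Kafka Streams classes. For simplicity, the outer-join chain is modelled as one lookup per other store. That matches the flow described for a record arriving on "topic1".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of the current-API topology: one aggregate store per
 * input topic, followed by a chain of outer joins that must read every other
 * store to assemble the final CG value. Not Kafka Streams code.
 */
final class CurrentApiSimulation {
    private final List<Map<String, String>> stores = new ArrayList<>();
    private int storeReads = 0;

    CurrentApiSimulation(int numTopics) {
        for (int i = 0; i < numTopics; i++) {
            stores.add(new HashMap<>()); // stands in for storeName1..storeNameN
        }
    }

    /** Handles one record arriving on input {@code topic}; returns the joined CG value. */
    String process(int topic, String key, String value) {
        // KStreamAggregate: read-modify-write on this topic's own store.
        Map<String, String> own = stores.get(topic);
        storeReads++;
        String old = own.get(key);
        String updated = (old == null ? "" : old) + value;
        own.put(key, updated);

        // Outer-join chain: look up the same key in every other topic's store.
        StringBuilder cg = new StringBuilder();
        for (int i = 0; i < stores.size(); i++) {
            String part;
            if (i == topic) {
                part = updated;
            } else {
                storeReads++;
                part = stores.get(i).get(key);
            }
            if (i > 0) {
                cg.append('|'); // joiner: separate the per-topic intermediate values
            }
            cg.append(part == null ? "" : part); // outer join: a missing side contributes nothing
        }
        return cg.toString();
    }

    int storeReads() {
        return storeReads;
    }

    public static void main(String[] args) {
        CurrentApiSimulation sim = new CurrentApiSimulation(3);
        System.out.println(sim.process(0, "k", "a")); // a||
        System.out.println(sim.process(1, "k", "b")); // a|b|
        System.out.println(sim.process(2, "k", "c")); // a|b|c
        System.out.println(sim.storeReads());         // 9: three reads per input record
    }
}
```

Whichever topic a record arrives on, the model reads all three stores. That matches the claim that every store and every joiner is involved in each update.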
> > >> > > >>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>> Topology-wise, for N incoming streams this creates N KStreamAggregates,
> > >> > > >>>>>>>>>>>>>>>>> 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers.
> > >> > > >>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>> Example with Proposed API:
> > >> > > >>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> > >> > > >>>>>>>>>>>>>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> > >> > > >>>>>>>>>>>>>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
> > >> > > >>>>>>>>>>>>>>>>> KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1,
> > >> > > >>>>>>>>>>>>>>>>>              aggValueSerde1, storeName1)
> > >> > > >>>>>>>>>>>>>>>>>      .cogroup(grouped2, aggregator2)
> > >> > > >>>>>>>>>>>>>>>>>      .cogroup(grouped3, aggregator3)
> > >> > > >>>>>>>>>>>>>>>>>      .aggregate();
> > >> > > >>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>> As you can see, this creates 1 StateStore and requires 1 initializer and
> > >> > > >>>>>>>>>>>>>>>>> 1 aggValueSerde. The user no longer has to worry about the intermediate
> > >> > > >>>>>>>>>>>>>>>>> values and the joiners. All they have to think about is how each stream
> > >> > > >>>>>>>>>>>>>>>>> impacts the creation of the final CG object.
> > >> > > >>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>> When a new input arrives, let's say at "topic1", it will first go
> > >> > > >>>>>>>>>>>>>>>>> through a KStreamAggregate and grab the current aggregate from
> > >> > > >>>>>>>>>>>>>>>>> storeName1. It will add its incoming object to the aggregate, update the
> > >> > > >>>>>>>>>>>>>>>>> store and pass the new aggregate on. This new aggregate goes through the
> > >> > > >>>>>>>>>>>>>>>>> KStreamCogroup, which is pretty much just a pass-through processor, and
> > >> > > >>>>>>>>>>>>>>>>> you are done.
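Here is a similar toy model for the proposed flow. It has a single shared store, a single initializer, and one aggregator per input stream, and it treats the cogroup step as a pass-through. `CogroupSimulation` and its fields are hypothetical names. The aggregators are arbitrary string functions chosen for illustration, and the sketch does not reflect the classes in the actual pull request.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/**
 * Hypothetical model of the proposed cogroup topology: every input updates
 * the same CG value in one shared store. Not Kafka Streams code.
 */
final class CogroupSimulation {
    private final Map<String, String> store = new HashMap<>(); // the single aggregate store
    private final Supplier<String> initializer;                // the single initializer
    // One aggregator per input stream: (value, aggregate) -> new aggregate.
    private final List<BiFunction<String, String, String>> aggregators;
    private int storeReads = 0;

    CogroupSimulation(Supplier<String> initializer,
                      List<BiFunction<String, String, String>> aggregators) {
        this.initializer = initializer;
        this.aggregators = aggregators;
    }

    /** Handles one record arriving on input {@code stream}; returns the forwarded CG value. */
    String process(int stream, String key, String value) {
        storeReads++;
        String current = store.get(key);
        if (current == null) {
            current = initializer.get(); // first record for this key on any stream
        }
        String updated = aggregators.get(stream).apply(value, current);
        store.put(key, updated);
        return updated; // the cogroup processor just passes this downstream
    }

    int storeReads() {
        return storeReads;
    }

    public static void main(String[] args) {
        CogroupSimulation sim = new CogroupSimulation(
                () -> "",
                List.of((v, agg) -> agg + "1" + v,
                        (v, agg) -> agg + "2" + v,
                        (v, agg) -> agg + "3" + v));
        System.out.println(sim.process(0, "k", "a")); // 1a
        System.out.println(sim.process(1, "k", "b")); // 1a2b
        System.out.println(sim.process(2, "k", "c")); // 1a2b3c
        System.out.println(sim.storeReads());         // 3: one read per input record
    }
}
```

Each input record costs one store read, whichever stream it arrives on. In the join-based flow, each input reads one store per input topic.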
> > >> > > >>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>> Topology-wise, for N incoming streams the new API will only ever create
> > >> > > >>>>>>>>>>>>>>>>> N KStreamAggregates and 1 KStreamCogroup.
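The processor counts quoted in the two topology paragraphs can be tabulated with two small helpers. This sketch counts only the processor types named in the message. It ignores source nodes, repartition topics and anything else a real topology would contain. `CogroupTopologySize` is a hypothetical helper.

```java
/** Counts only the processor types named in the KIP discussion, for N input streams. */
final class CogroupTopologySize {
    private CogroupTopologySize() {}

    /** Current API: N KStreamAggregates + 2*(N-1) KTableKTableOuterJoins + (N-1) KTableKTableJoinMergers. */
    static int currentApiNodes(int n) {
        requirePositive(n);
        return n + 2 * (n - 1) + (n - 1); // simplifies to 4N - 3
    }

    /** Proposed API: N KStreamAggregates + 1 KStreamCogroup. */
    static int proposedApiNodes(int n) {
        requirePositive(n);
        return n + 1;
    }

    private static void requirePositive(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("need at least one input stream, got " + n);
        }
    }

    public static void main(String[] args) {
        for (int n = 2; n <= 5; n++) {
            System.out.printf("N=%d: current=%d, proposed=%d%n",
                    n, currentApiNodes(n), proposedApiNodes(n));
        }
    }
}
```

For the three-topic example, this gives 9 nodes for the current API versus 4 for the proposed one. The gap grows linearly, as 3N - 4.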
> > >> > > >>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matthias@confluent.io>
> > >> > > >>>>>>>>>>>>>>>>> wrote:
> > >> > > >>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>> Kyle,
> > >> > > >>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I could not
> > >> > > >>>>>>>>>>>>>>>>>> follow completely. Could you maybe add a more concrete example, like 3
> > >> > > >>>>>>>>>>>>>>>>>> streams with 3 records each (plus expected result), and show the
> > >> > > >>>>>>>>>>>>>>>>>> difference between the current way to implement it and the proposed API?
> > >> > > >>>>>>>>>>>>>>>>>> This could also cover the internal processing to see what store calls
> > >> > > >>>>>>>>>>>>>>>>>> would be required for both approaches etc.
> > >> > > >>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>> I think it's pretty advanced stuff you propose, and it would help to
> > >> > > >>>>>>>>>>>>>>>>>> understand it better.
> > >> > > >>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>> Thanks a lot!
> > >> > > >>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>> -Matthias
> > >> > > >>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
> > >> > > >>>>>>>>>>>>>>>>>>> I have made a pull request. It can be found here.
> > >> > > >>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/2975
> > >> > > >>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>> I plan to write some more unit tests for my classes and get around to
> > >> > > >>>>>>>>>>>>>>>>>>> writing documentation for the public API additions.
> > >> > > >>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>> One thing I was curious about: in the KCogroupedStreamImpl#aggregate
> > >> > > >>>>>>>>>>>>>>>>>>> method I pass null to the KGroupedStream#repartitionIfRequired method.
> > >> > > >>>>>>>>>>>>>>>>>>> I can't supply the store name because if more than one grouped stream
> > >> > > >>>>>>>>>>>>>>>>>>> repartitions, an error is thrown. Is there some name that someone can
> > >> > > >>>>>>>>>>>>>>>>>>> recommend, or should I leave the null and allow it to fall back to the
> > >> > > >>>>>>>>>>>>>>>>>>> KGroupedStream.name?
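Here is one way to picture the naming problem Kyle describes. Suppose each repartition topic's name is derived from the name passed to repartitionIfRequired. Then passing the same store name for several grouped streams produces the same topic name twice. The sketch below assumes a `<base>-repartition` naming scheme and a registry that rejects duplicate names. `RepartitionNaming` and its methods are hypothetical. They only illustrate why falling back to each KGroupedStream's own name avoids the clash.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical model of repartition-topic naming: each derived topic name
 * must be unique within a topology. Not Kafka Streams code.
 */
final class RepartitionNaming {
    private final Set<String> registeredTopics = new HashSet<>();

    /** Registers "<baseName>-repartition" and fails if that name is already taken. */
    String register(String baseName) {
        String topic = baseName + "-repartition";
        if (!registeredTopics.add(topic)) {
            throw new IllegalStateException("repartition topic already registered: " + topic);
        }
        return topic;
    }

    /** A null store name falls back to the grouped stream's own (unique) name. */
    String registerFor(String groupedStreamName, String storeNameOrNull) {
        return register(storeNameOrNull != null ? storeNameOrNull : groupedStreamName);
    }

    public static void main(String[] args) {
        RepartitionNaming fallback = new RepartitionNaming();
        System.out.println(fallback.registerFor("grouped1", null)); // grouped1-repartition
        System.out.println(fallback.registerFor("grouped2", null)); // grouped2-repartition

        RepartitionNaming shared = new RepartitionNaming();
        shared.registerFor("grouped1", "storeName1");
        try {
            shared.registerFor("grouped2", "storeName1"); // same base name a second time
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // repartition topic already registered: storeName1-repartition
        }
    }
}
```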
> > >> > > >>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>> Should this be expanded to handle grouped tables? This would be pretty
> > >> > > >>>>>>>>>>>>>>>>>>> easy for a normal aggregate, but one allowing session stores and windowed
> > >> > > >>>>>>>>>>>>>>>>>>> stores would require KTableSessionWindowAggregate and
> > >> > > >>>>>>>>>>>>>>>>>>> KTableWindowAggregate implementations.
> > >> > > >>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>> Thanks,
> > >> > > >>>>>>>>>>>>>>>>>>> Kyle
> > >> > > >>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <eno.thereska@gmail.com>
> > >> > > >>>>>>>>>>>>>>>>>>> wrote:
> > >> > > >>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>>> I’ll look as well asap, sorry, been swamped.
> > >> > > >>>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>>> Eno
> > >> > > >>>>>>>>>>>>>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <damian.guy@gmail.com>
> > >> > > >>>>>>>>>>>>>>>>>>>>> wrote:
> > >> > > >>>>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>>>> Hi Kyle,
> > >> > > >>>>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>>>> Thanks for the KIP. I apologize that I haven't had the chance to look at
> > >> > > >>>>>>>>>>>>>>>>>>>>> the KIP yet, but I will schedule some time to look into it tomorrow. For
> > >> > > >>>>>>>>>>>>>>>>>>>>> the implementation, can you raise a PR against Kafka trunk and mark it as
> > >> > > >>>>>>>>>>>>>>>>>>>>> WIP? It will be easier to review what you have done.
> > >> > > >>>>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>>>> Thanks,
> > >> > > >>>>>>>>>>>>>>>>>>>>> Damian
> > >> > > >>>>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.kyle@gmail.com>
> > >> > > >>>>>>>>>>>>>>>>>>>>> wrote:
> > >> > > >>>>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>>>>> I am replying to this in hopes it will draw some attention to my KIP, as
> > >> > > >>>>>>>>>>>>>>>>>>>>>> I haven't heard from anyone in a couple of days. This is my first KIP and
> > >> > > >>>>>>>>>>>>>>>>>>>>>> my first large contribution to the project, so I'm sure I did something
> > >> > > >>>>>>>>>>>>>>>>>>>>>> wrong. ;)
> > >> > > >>>>>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.kyle@gmail.com>
> > >> > > >>>>>>>>>>>>>>>>>>>>>> wrote:
> > >> > > >>>>>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>>>>>> Hello all,
> > >> > > >>>>>>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>>>>>> I have created KIP-150 to facilitate discussion about adding cogroup to
> > >> > > >>>>>>>>>>>>>>>>>>>>>>> the streams DSL.
> > >> > > >>>>>>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>>>>>> Please find the KIP here:
> > >> > > >>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> > >> > > >>>>>>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>>>>>> Please find my initial implementation
> here:
> > >> > > >>>>>>>>>>>>>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
> > >> > > >>>>>>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >> > > >>>>>>>>>>>>>>>>>>>>>>> Kyle Winkelman
> > >> > > >>>>>>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>
> > >> > > >>>>>>>>>>>
> > >> > > >>>>>>>>>>
> > >> > > >>>>>>>>>
> > >> > > >>>>>>>>
> > >> > > >>>>>>>
> > >> > > >>>>>>>
> > >> > > >>>>>>>
> > >> > > >>>>>>> --
> > >> > > >>>>>>> -- Guozhang
> > >> > > >>>>>>>
> > >> > > >>>>>>
> > >> > > >>>>>>
> > >> > > >>>>>>
> > >> > > >>>>>> --
> > >> > > >>>>>> -- Guozhang
> > >> > > >>>>>>
> > >> > > >>>>>
> > >> > > >>>>>
> > >> > > >>>>>
> > >> > > >>>>> --
> > >> > > >>>>> -- Guozhang
> > >> > > >>>>>
> > >> > > >>>>
> > >> > > >>>
> > >> > > >>
> > >> > >
> > >> > >
> > >> >
> > >> >
> > >> > --
> > >> > -- Guozhang
> > >> >
> > >>
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
