kafka-dev mailing list archives

From Damian Guy <damian....@gmail.com>
Subject Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup
Date Thu, 01 Jun 2017 09:03:00 GMT
Hi Kyle,

Thanks for the update. I think just one initializer makes sense as it
should only be called once per key and generally it is just going to create
a new instance of whatever the Aggregate class is.

Cheers,
Damian
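
A minimal sketch of the single-initializer design discussed in this thread, in plain Java with no Kafka Streams dependency. `CogroupSketch`, `Agg`, and the sample keys and values are hypothetical names for illustration, not the KIP-150 API; the only point is that with one shared store and one initializer, the initializer runs once per key, however many input streams feed that key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model of the cogroup idea; none of these types are Kafka Streams classes.
public class CogroupSketch<K, A> {
    /** Hypothetical stand-in for a per-stream aggregator: folds one value into the aggregate. */
    public interface Agg<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    private final Map<K, A> store = new HashMap<>(); // the single shared store
    private final Supplier<A> initializer;           // the single initializer
    private int initializerCalls = 0;

    public CogroupSketch(Supplier<A> initializer) {
        this.initializer = initializer;
    }

    /** Handles one record from any input stream, using that stream's aggregator. */
    public <V> A process(K key, V value, Agg<K, V, A> aggregator) {
        A current = store.get(key);
        if (current == null) {
            // First record seen for this key: create the aggregate exactly once.
            current = initializer.get();
            initializerCalls++;
        }
        A updated = aggregator.apply(key, value, current);
        store.put(key, updated);
        return updated;
    }

    public int initializerCalls() {
        return initializerCalls;
    }

    public A get(K key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        CogroupSketch<String, List<String>> cg = new CogroupSketch<>(ArrayList::new);
        // Two input "streams" with different value types share one aggregate type.
        Agg<String, Integer, List<String>> clicks = (k, v, agg) -> { agg.add("click:" + v); return agg; };
        Agg<String, String, List<String>> views = (k, v, agg) -> { agg.add("view:" + v); return agg; };
        cg.process("user1", 7, clicks);
        cg.process("user1", "home", views);
        System.out.println(cg.get("user1") + " initializer calls: " + cg.initializerCalls());
    }
}
```

Each stream brings its own aggregator and value type, while the aggregate type and the initializer are fixed once, which is the trade-off raised in the quoted message below.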

On Wed, 31 May 2017 at 20:09 Kyle Winkelman <winkelman.kyle@gmail.com>
wrote:

> Hello all,
>
> I have spent some more time on this and the best alternative I have come up
> with is:
> KGroupedStream has a single cogroup call that takes an initializer and an
> aggregator.
> CogroupedKStream has a cogroup call that takes additional groupedStream
> aggregator pairs.
> CogroupedKStream has multiple aggregate methods that create the different
> stores.
>
> I plan on updating the kip but I want people's input on if we should have
> the initializer be passed in once at the beginning or if we should instead
> have the initializer be required for each call to one of the aggregate
> calls. The first makes more sense to me but doesn't allow the user to
> specify different initializers for different tables.
>
> Thanks,
> Kyle
>
> On May 24, 2017 7:46 PM, "Kyle Winkelman" <winkelman.kyle@gmail.com>
> wrote:
>
> > Yea I really like that idea I'll see what I can do to update the kip and
> > my pr when I have some time. I'm not sure how well creating the
> > kstreamaggregates will go though because at that point I will have thrown
> > away the type of the values. It will be type safe I just may need to do a
> > little forcing.
> >
> > Thanks,
> > Kyle
> >
> > On May 24, 2017 3:28 PM, "Guozhang Wang" <wangguoz@gmail.com> wrote:
> >
> >> Kyle,
> >>
> >> Thanks for the explanations, my previous read on the wiki examples was
> >> wrong.
> >>
> >> So I guess my motivation should be "reduced" to: can we move the window
> >> specs param from "KGroupedStream#cogroup(..)" to
> >> "CogroupedKStream#aggregate(..)", and my motivations are:
> >>
> >> 1. minor: we can reduce the #.generics in CogroupedKStream from 3 to 2.
> >> 2. major: this is for extensibility of the APIs, and since we are
> removing
> >> the "Evolving" annotations on Streams it may be harder to change it
> again
> >> in the future. The extended use cases are that people wanted to have
> >> windowed running aggregates on different granularities, e.g. "give me
> the
> >> counts per-minute, per-hour, per-day and per-week", and today in DSL we
> >> need to specify that case in multiple aggregate operators, which gets a
> >> state store / changelog, etc. And it is possible to optimize it as well
> to
> >> a single state store. Its implementation would be tricky as you need to
> >> contain windows of different lengths within your window store but just
> from
> >> the public API point of view, it could be specified as:
> >>
> >> CogroupedKStream stream = stream1.cogroup(stream2, ...
> >> "state-store-name");
> >>
> >> table1 = stream.aggregate(/*per-minute window*/)
> >> table2 = stream.aggregate(/*per-hour window*/)
> >> table3 = stream.aggregate(/*per-day window*/)
> >>
> >> while underlying we are only using a single store "state-store-name" for
> >> it.
> >>
> >>
> >> Although this feature is out of the scope of this KIP, I'd like to
> discuss
> >> if we can "leave the door open" to make such changes without modifying
> the
> >> public APIs .
> >>
> >> Guozhang
> >>
> >>
> >> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <
> winkelman.kyle@gmail.com
> >> >
> >> wrote:
> >>
> >> > I allow defining a single window/sessionwindow one time when you make
> >> the
> >> > cogroup call from a KGroupedStream. From then on you are using the
> >> cogroup
> >> > call from within CogroupedKStream which doesn't accept any additional
> >> > windows/sessionwindows.
> >> >
> >> > Is this what you meant by your question or did I misunderstand?
> >> >
> >> > On May 23, 2017 9:33 PM, "Guozhang Wang" <wangguoz@gmail.com> wrote:
> >> >
> >> > Another question that came to me is on "window alignment": from the
> KIP
> >> it
> >> > seems you are allowing users to specify a (potentially different)
> window
> >> > spec in each co-grouped input stream. So if these window specs are
> >> > different how should we "align" them with different input streams? I
> >> think
> >> > it is more natural to only specify one window spec in the
> >> >
> >> > KTable<RK, V> CogroupedKStream#aggregate(Windows);
> >> >
> >> >
> >> > And remove it from the cogroup() functions. WDYT?
> >> >
> >> >
> >> > Guozhang
> >> >
> >> > On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <wangguoz@gmail.com>
> >> wrote:
> >> >
> >> > > Thanks for the proposal Kyle, this is a quite common use case to
> >> support
> >> > > such multi-way table join (i.e. N source tables with N aggregate
> func)
> >> > with
> >> > > a single store and N+1 serdes, I have seen lots of people using the
> >> > > low-level PAPI to achieve this goal.
> >> > >
> >> > >
> >> > > On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <
> >> > winkelman.kyle@gmail.com
> >> > > > wrote:
> >> > >
> >> > >> I like your point about not handling other cases such as count and
> >> > reduce.
> >> > >>
> >> > >> I think that reduce may not make sense because reduce assumes that
> >> the
> >> > >> input values are the same as the output values. With cogroup there
> >> may
> >> > be
> >> > >> multiple different input types and then your output type can't be
> >> > multiple
> >> > >> different things. In the case where you have all matching value
> types
> >> > you
> >> > >> can do KStreamBuilder#merge followed by the reduce.
> >> > >>
> >> > >> As for count I think it is possible to call count on all the
> >> individual
> >> > >> grouped streams and then do joins. Otherwise we could maybe make a
> >> > special
> >> > >> call in groupedstream for this case. Because in this case we don't
> >> need
> >> > to
> >> > >> do type checking on the values. It could be similar to the current
> >> count
> >> > >> methods but accept a var args of additional grouped streams as well
> >> and
> >> > >> make
> >> > >> sure they have a key type of K.
> >> > >>
> >> > >> The way I have put the kip together is to ensure that we do type
> >> > checking.
> >> > >> I don't see a way we could group them all first and then make a
> call
> >> to
> >> > >> count, reduce, or aggregate because with aggregate they would need
> to
> >> > pass
> >> > >> a list of aggregators and we would have no way of type checking
> that
> >> > they
> >> > >> match the grouped streams.
> >> > >>
> >> > >> Thanks,
> >> > >> Kyle
> >> > >>
> >> > >> On May 19, 2017 11:42 AM, "Xavier Léauté" <xavier@confluent.io>
> >> wrote:
> >> > >>
> >> > >> > Sorry to jump on this thread so late. I agree this is a very
> useful
> >> > >> > addition and wanted to provide an additional use-case and some
> more
> >> > >> > comments.
> >> > >> >
> >> > >> > This is actually a very common analytics use-case in the ad-tech
> >> > >> industry.
> >> > >> > The typical setup will have an auction stream, an impression
> >> stream,
> >> > >> and a
> >> > >> > click stream. Those three streams need to be combined to compute
> >> > >> aggregate
> >> > >> > statistics (e.g. impression statistics, and click-through rates),
> >> > since
> >> > >> > most of the attributes of interest are only present in the auction
> >> > stream.
> >> > >> >
> >> > >> > A simple way to do this is to co-group all the streams by the
> >> auction
> >> > >> key,
> >> > >> > and process updates to the co-group as events for each stream
> come
> >> in,
> >> > >> > keeping only one value from each stream before sending downstream
> >> for
> >> > >> > further processing / aggregation.
> >> > >> >
> >> > >> > One could view the result of that co-group operation as a
> "KTable"
> >> > with
> >> > >> > multiple values per key. The key being the grouping key, and the
> >> > values
> >> > >> > consisting of one value per stream.
> >> > >> >
> >> > >> > What I like about Kyle's approach is that it allows elegant
> >> co-grouping
> >> > of
> >> > >> > multiple streams without having to worry about the number of
> >> streams,
> >> > >> and
> >> > >> > avoids dealing with Tuple types or other generic interfaces that
> >> could
> >> > >> get
> >> > >> > messy if we wanted to preserve all the value types in the
> resulting
> >> > >> > co-grouped stream.
> >> > >> >
> >> > >> > My only concern is that we only allow the cogroup + aggregate
> >> combined
> >> > >> > operation. This forces the user to build their own tuple
> >> serialization
> >> > >> > format if they want to preserve the individual input stream
> values
> >> as
> >> > a
> >> > >> > group. It also deviates quite a bit from our approach in
> >> > KGroupedStream
> >> > >> > which offers other operations, such as count and reduce, which
> >> should
> >> > >> also
> >> > >> > be applicable to a co-grouped stream.
> >> > >> >
> >> > >> > Overall I still think this is a really useful addition, but I
> feel
> >> we
> >> > >> > haven't spent much time trying to explore alternative DSLs that
> >> could
> >> > >> maybe
> >> > >> > generalize better or match our existing syntax more closely.
> >> > >> >
> >> > >> > On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman <
> >> > winkelman.kyle@gmail.com
> >> > >> >
> >> > >> > wrote:
> >> > >> >
> >> > >> > > Eno, is there anyone else that is an expert in the kafka
> streams
> >> > realm
> >> > >> > that
> >> > >> > > I should reach out to for input?
> >> > >> > >
> >> > >> > > I believe Damian Guy is still planning on reviewing this more
> in
> >> > depth
> >> > >> > so I
> >> > >> > > will wait for his inputs before continuing.
> >> > >> > >
> >> > >> > > On May 9, 2017 7:30 AM, "Eno Thereska" <eno.thereska@gmail.com
> >
> >> > >> wrote:
> >> > >> > >
> >> > >> > > > Thanks Kyle, good arguments.
> >> > >> > > >
> >> > >> > > > Eno
> >> > >> > > >
> >> > >> > > > > On May 7, 2017, at 5:06 PM, Kyle Winkelman <
> >> > >> winkelman.kyle@gmail.com
> >> > >> > >
> >> > >> > > > wrote:
> >> > >> > > > >
> >> > >> > > > > *- minor: could you add an exact example (similar to what
> >> Jay’s
> >> > >> > example
> >> > >> > > > is,
> >> > >> > > > > or like your Spark/Pig pointers had) to make this super
> >> > concrete?*
> >> > >> > > > > I have added a more concrete example to the KIP.
> >> > >> > > > >
> >> > >> > > > > *- my main concern is that we’re exposing this optimization
> >> to
> >> > the
> >> > >> > DSL.
> >> > >> > > > In
> >> > >> > > > > an ideal world, an optimizer would take the existing DSL
> and
> >> do
> >> > >> the
> >> > >> > > right
> >> > >> > > > > thing under the covers (create just one state store,
> arrange
> >> the
> >> > >> > nodes
> >> > >> > > > > etc). The original DSL had a bunch of small, composable
> >> pieces
> >> > >> > (group,
> >> > >> > > > > aggregate, join) that this proposal groups together. I’d
> >> like to
> >> > >> hear
> >> > >> > > > your
> >> > >> > > > > thoughts on whether it’s possible to do this optimization
> >> with
> >> > the
> >> > >> > > > current
> >> > >> > > > > DSL, at the topology builder level.*
> >> > >> > > > > You would have to make a lot of checks to understand if it
> is
> >> > even
> >> > >> > > > possible
> >> > >> > > > > to make this optimization:
> >> > >> > > > > 1. Make sure they are all KTableKTableOuterJoins
> >> > >> > > > > 2. None of the intermediate KTables are used for anything
> >> else.
> >> > >> > > > > 3. None of the intermediate stores are used. (This may be
> >> > >> impossible
> >> > >> > > > > especially if they use KafkaStreams#store after the
> topology
> >> has
> >> > >> > > already
> >> > >> > > > > been built.)
> >> > >> > > > > You would then need to make decisions during the
> >> optimization:
> >> > >> > > > > 1. Your new initializer would be the composite of all the
> >> > individual
> >> > >> > > > > initializers and the valueJoiners.
> >> > >> > > > > 2. I am having a hard time thinking about how you would
> turn
> >> the
> >> > >> > > > > aggregators and valueJoiners into an aggregator that would
> >> work
> >> > on
> >> > >> > the
> >> > >> > > > > final object, but this may be possible.
> >> > >> > > > > 3. Which state store would you use? The ones declared would
> >> be
> >> > for
> >> > >> > the
> >> > >> > > > > aggregate values. None of the declared ones would be
> >> guaranteed
> >> > to
> >> > >> > hold
> >> > >> > > > the
> >> > >> > > > > final object. This would mean you must create a new state
> >> > >> store
> >> > >> and
> >> > >> > > not
> >> > >> > > > > create any of the declared ones.
> >> > >> > > > >
> >> > >> > > > > The main argument I have against it is even if it could be
> >> done
> >> > I
> >> > >> > don't
> >> > >> > > > > know that we would want to have this be an optimization in
> >> the
> >> > >> > > background
> >> > >> > > > > because the user would still be required to think about all
> >> of
> >> > the
> >> > >> > > > > intermediate values that they shouldn't need to worry about
> >> if
> >> > >> they
> >> > >> > > only
> >> > >> > > > > care about the final object.
> >> > >> > > > >
> >> > >> > > > > In my opinion cogroup is a common enough case that it
> should
> >> be
> >> > >> part
> >> > >> > of
> >> > >> > > > the
> >> > >> > > > > composable pieces (group, aggregate, join) because we want
> to
> >> > >> allow
> >> > >> > > > people
> >> > >> > > > > to join 2 or more streams in an easy way. Right
> >> now I
> >> > >> don't
> >> > >> > > > think
> >> > >> > > > > we give them ways of handling this use case easily.
> >> > >> > > > >
> >> > >> > > > > *-I think there will be scope for several such
> optimizations
> >> in
> >> > >> the
> >> > >> > > > future
> >> > >> > > > > and perhaps at some point we need to think about decoupling
> >> the
> >> > >> 1:1
> >> > >> > > > mapping
> >> > >> > > > > from the DSL into the physical topology.*
> >> > >> > > > > I would argue that cogroup is not just an optimization it
> is
> >> a
> >> > new
> >> > >> > way
> >> > >> > > > for
> >> > >> > > > > the users to look at accomplishing a problem that requires
> >> > >> multiple
> >> > >> > > > > streams. I may sound like a broken record but I don't think
> >> > users
> >> > >> > > should
> >> > >> > > > > have to build the N-1 intermediate tables and deal with
> their
> >> > >> > > > initializers,
> >> > >> > > > > serdes and stores if all they care about is the final
> object.
> >> > >> > > > > Now if for example someone uses cogroup but doesn't supply
> >> > >> additional
> >> > >> > > > > streams and aggregators this case is equivalent to a single
> >> > >> grouped
> >> > >> > > > stream
> >> > >> > > > > making an aggregate call. This case is what I view an
> >> > optimization
> >> > >> > as,
> >> > >> > > we
> >> > >> > > > > could remove the KStreamCogroup and act as if there was
> just
> >> a
> >> > >> call
> >> > >> > to
> >> > >> > > > > KGroupedStream#aggregate instead of calling
> >> > >> KGroupedStream#cogroup.
> >> > >> > (I
> >> > >> > > > > would prefer to just write a warning saying that this is
> not
> >> how
> >> > >> > > cogroup
> >> > >> > > > is
> >> > >> > > > > to be used.)
> >> > >> > > > >
> >> > >> > > > > Thanks,
> >> > >> > > > > Kyle
> >> > >> > > > >
> >> > >> > > > > On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <
> >> > >> eno.thereska@gmail.com
> >> > >> > >
> >> > >> > > > wrote:
> >> > >> > > > >
> >> > >> > > > >> Hi Kyle,
> >> > >> > > > >>
> >> > >> > > > >> Thanks for the KIP again. A couple of comments:
> >> > >> > > > >>
> >> > >> > > > >> - minor: could you add an exact example (similar to what
> >> Jay’s
> >> > >> > example
> >> > >> > > > is,
> >> > >> > > > >> or like your Spark/Pig pointers had) to make this super
> >> > concrete?
> >> > >> > > > >>
> >> > >> > > > >> - my main concern is that we’re exposing this optimization
> >> to
> >> > the
> >> > >> > DSL.
> >> > >> > > > In
> >> > >> > > > >> an ideal world, an optimizer would take the existing DSL
> >> and do
> >> > >> the
> >> > >> > > > right
> >> > >> > > > >> thing under the covers (create just one state store,
> arrange
> >> > the
> >> > >> > nodes
> >> > >> > > > >> etc). The original DSL had a bunch of small, composable
> >> pieces
> >> > >> > (group,
> >> > >> > > > >> aggregate, join) that this proposal groups together. I’d
> >> like
> >> > to
> >> > >> > hear
> >> > >> > > > your
> >> > >> > > > >> thoughts on whether it’s possible to do this optimization
> >> with
> >> > >> the
> >> > >> > > > current
> >> > >> > > > >> DSL, at the topology builder level.
> >> > >> > > > >>
> >> > >> > > > >> I think there will be scope for several such optimizations
> >> in
> >> > the
> >> > >> > > future
> >> > >> > > > >> and perhaps at some point we need to think about
> decoupling
> >> the
> >> > >> 1:1
> >> > >> > > > mapping
> >> > >> > > > >> from the DSL into the physical topology.
> >> > >> > > > >>
> >> > >> > > > >> Thanks
> >> > >> > > > >> Eno
> >> > >> > > > >>
> >> > >> > > > >>> On May 5, 2017, at 4:39 PM, Jay Kreps <jay@confluent.io>
> >> > wrote:
> >> > >> > > > >>>
> >> > >> > > > >>> I haven't digested the proposal but the use case is
> pretty
> >> > >> common.
> >> > >> > An
> >> > >> > > > >>> example would be the "customer 360" or "unified customer
> >> > >> profile"
> >> > >> > use
> >> > >> > > > >> case
> >> > >> > > > >>> we often use. In that use case you have a dozen systems
> >> each
> >> > of
> >> > >> > which
> >> > >> > > > has
> >> > >> > > > >>> some information about your customer (account details,
> >> > settings,
> >> > >> > > > billing
> >> > >> > > > >>> info, customer service contacts, purchase history, etc).
> >> Your
> >> > >> goal
> >> > >> > is
> >> > >> > > > to
> >> > >> > > > >>> join/munge these into a single profile record for each
> >> > customer
> >> > >> > that
> >> > >> > > > has
> >> > >> > > > >>> all the relevant info in a usable form and is up-to-date
> >> with
> >> > >> all
> >> > >> > the
> >> > >> > > > >>> source systems. If you implement that with kstreams as a
> >> > >> sequence
> >> > >> > of
> >> > >> > > > >> joins
> >> > >> > > > >>> i think today we'd fully materialize N-1 intermediate
> >> tables.
> >> > >> But
> >> > >> > > > clearly
> >> > >> > > > >>> you only need a single stage to group all these things
> that
> >> > are
> >> > >> > > already
> >> > >> > > > >>> co-partitioned. A distributed database would do this
> under
> >> the
> >> > >> > covers
> >> > >> > > > >> which
> >> > >> > > > >>> is arguably better (at least when it does the right
> thing)
> >> and
> >> > >> > > perhaps
> >> > >> > > > we
> >> > >> > > > >>> could do the same thing but I'm not sure we know the
> >> > >> partitioning
> >> > >> > so
> >> > >> > > we
> >> > >> > > > >> may
> >> > >> > > > >>> need an explicit cogroup command that implies they are
> >> > already
> >> > >> > > > >>> co-partitioned.
> >> > >> > > > >>>
> >> > >> > > > >>> -Jay
> >> > >> > > > >>>
> >> > >> > > > >>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <
> >> > >> > > > winkelman.kyle@gmail.com
> >> > >> > > > >>>
> >> > >> > > > >>> wrote:
> >> > >> > > > >>>
> >> > >> > > > >>>> Yea that's a good way to look at it.
> >> > >> > > > >>>> I have seen this type of functionality in a couple other
> >> > >> platforms
> >> > >> > > > like
> >> > >> > > > >>>> spark and pig.
> >> > >> > > > >>>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
> >> > >> > > > >>>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
> >> > >> > > > >>>>
> >> > >> > > > >>>>
> >> > >> > > > >>>> On May 5, 2017 7:43 AM, "Damian Guy" <
> >> damian.guy@gmail.com>
> >> > >> > wrote:
> >> > >> > > > >>>>
> >> > >> > > > >>>>> Hi Kyle,
> >> > >> > > > >>>>>
> >> > >> > > > >>>>> If i'm reading this correctly it is like an N way outer
> >> > join?
> >> > >> So
> >> > >> > an
> >> > >> > > > >> input
> >> > >> > > > >>>>> on any stream will always produce a new aggregated
> value
> >> -
> >> > is
> >> > >> > that
> >> > >> > > > >>>> correct?
> >> > >> > > > >>>>> Effectively, each Aggregator just looks up the current
> >> > value,
> >> > >> > > > >> aggregates
> >> > >> > > > >>>>> and forwards the result.
> >> > >> > > > >>>>> I need to look into it and think about it a bit more,
> >> but it
> >> > >> > seems
> >> > >> > > > like
> >> > >> > > > >>>> it
> >> > >> > > > >>>>> could be a useful optimization.
> >> > >> > > > >>>>>
> >> > >> > > > >>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <
> >> > >> > > winkelman.kyle@gmail.com
> >> > >> > > > >
> >> > >> > > > >>>>> wrote:
> >> > >> > > > >>>>>
> >> > >> > > > >>>>>> I sure can. I have added the following description to
> my
> >> > >> KIP. If
> >> > >> > > > this
> >> > >> > > > >>>>>> doesn't help let me know and I will take some more
> time
> >> to
> >> > >> > build a
> >> > >> > > > >>>>> diagram
> >> > >> > > > >>>>>> and make more of a step by step description:
> >> > >> > > > >>>>>>
> >> > >> > > > >>>>>> Example with Current API:
> >> > >> > > > >>>>>>
> >> > >> > > > >>>>>> KTable<K, V1> table1 =
> >> > >> > > > >>>>>> builder.stream("topic1").groupByKey().aggregate(initializer1,
> >> > >> > > > >>>>>> aggregator1, aggValueSerde1, storeName1);
> >> > >> > > > >>>>>> KTable<K, V2> table2 =
> >> > >> > > > >>>>>> builder.stream("topic2").groupByKey().aggregate(initializer2,
> >> > >> > > > >>>>>> aggregator2, aggValueSerde2, storeName2);
> >> > >> > > > >>>>>> KTable<K, V3> table3 =
> >> > >> > > > >>>>>> builder.stream("topic3").groupByKey().aggregate(initializer3,
> >> > >> > > > >>>>>> aggregator3, aggValueSerde3, storeName3);
> >> > >> > > > >>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2,
> >> > >> > > > >>>>>> joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);
> >> > >> > > > >>>>>>
> >> > >> > > > >>>>>> As you can see this creates 3 StateStores, requires 3
> >> > >> > > initializers,
> >> > >> > > > >>>> and 3
> >> > >> > > > >>>>>> aggValueSerdes. This also adds the pressure to user to
> >> > define
> >> > >> > what
> >> > >> > > > the
> >> > >> > > > >>>>>> intermediate values are going to be (V1, V2, V3). They
> >> are
> >> > >> left
> >> > >> > > > with a
> >> > >> > > > >>>>>> couple choices, first to make V1, V2, and V3 all the
> >> same
> >> > as
> >> > >> CG
> >> > >> > > and
> >> > >> > > > >> the
> >> > >> > > > >>>>> two
> >> > >> > > > >>>>>> joiners are more like mergers, or second make them
> >> > >> intermediate
> >> > >> > > > states
> >> > >> > > > >>>>> such
> >> > >> > > > >>>>>> as Topic1Map, Topic2Map, and Topic3Map and the joiners
> >> use
> >> > >> those
> >> > >> > > to
> >> > >> > > > >>>> build
> >> > >> > > > >>>>>> the final aggregate CG value. This is something the
> user
> >> > >> could
> >> > >> > > avoid
> >> > >> > > > >>>>>> thinking about with this KIP.
> >> > >> > > > >>>>>>
> >> > >> > > > >>>>>> When a new input arrives let's say at "topic1" it will
> >> first
> >> > >> go
> >> > >> > > > through
> >> > >> > > > >>>> a
> >> > >> > > > >>>>>> KStreamAggregate grabbing the current aggregate from
> >> > >> storeName1.
> >> > >> > > It
> >> > >> > > > >>>> will
> >> > >> > > > >>>>>> produce this in the form of the first intermediate
> value
> >> > and
> >> > >> get
> >> > >> > > > sent
> >> > >> > > > >>>>>> through a KTableKTableOuterJoin where it will look up
> >> the
> >> > >> > current
> >> > >> > > > >> value
> >> > >> > > > >>>>> of
> >> > >> > > > >>>>>> the key in storeName2. It will use the first joiner to
> >> > >> calculate
> >> > >> > > the
> >> > >> > > > >>>>> second
> >> > >> > > > >>>>>> intermediate value, which will go through an
> additional
> >> > >> > > > >>>>>> KTableKTableOuterJoin. Here it will look up the
> current
> >> > >> value of
> >> > >> > > the
> >> > >> > > > >>>> key
> >> > >> > > > >>>>> in
> >> > >> > > > >>>>>> storeName3 and use the second joiner to build the
> final
> >> > >> > aggregate
> >> > >> > > > >>>> value.
> >> > >> > > > >>>>>>
> >> > >> > > > >>>>>> If you think through all possibilities for incoming
> >> topics
> >> > >> you
> >> > >> > > will
> >> > >> > > > >> see
> >> > >> > > > >>>>>> that no matter which topic it comes in through all
> three
> >> > >> stores
> >> > >> > > are
> >> > >> > > > >>>>> queried
> >> > >> > > > >>>>>> and all of the joiners must get used.
> >> > >> > > > >>>>>>
> >> > >> > > > >>>>>> Topology wise for N incoming streams this creates N
> >> > >> > > > >>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and
> >> N-1
> >> > >> > > > >>>>>> KTableKTableJoinMergers.
> >> > >> > > > >>>>>>
> >> > >> > > > >>>>>>
> >> > >> > > > >>>>>>
> >> > >> > > > >>>>>> Example with Proposed API:
> >> > >> > > > >>>>>>
> >> > >> > > > >>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> >> > >> > > > >>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> >> > >> > > > >>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
> >> > >> > > > >>>>>> KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1,
> >> > >> > > > >>>>>> aggValueSerde1, storeName1)
> >> > >> > > > >>>>>>       .cogroup(grouped2, aggregator2)
> >> > >> > > > >>>>>>       .cogroup(grouped3, aggregator3)
> >> > >> > > > >>>>>>       .aggregate();
> >> > >> > > > >>>>>>
> >> > >> > > > >>>>>> As you can see this creates 1 StateStore, requires 1
> >> > >> > initializer,
> >> > >> > > > and
> >> > >> > > > >> 1
> >> > >> > > > >>>>>> aggValueSerde. The user no longer has to worry about
> the
> >> > >> > > > intermediate
> >> > >> > > > >>>>>> values and the joiners. All they have to think about
> is
> >> how
> >> > >> each
> >> > >> > > > >> stream
> >> > >> > > > >>>>>> impacts the creation of the final CG object.
> >> > >> > > > >>>>>>
> >> > >> > > > >>>>>> When a new input arrives let's say at "topic1" it will
> >> first
> >> > >> go
> >> > >> > > > through
> >> > >> > > > >>>> a
> >> > >> > > > >>>>>> KStreamAggregate and grab the current aggregate from
> >> > >> storeName1.
> >> > >> > > It
> >> > >> > > > >>>> will
> >> > >> > > > >>>>>> add its incoming object to the aggregate, update the
> >> store
> >> > >> and
> >> > >> > > pass
> >> > >> > > > >> the
> >> > >> > > > >>>>> new
> >> > >> > > > >>>>>> aggregate on. This new aggregate goes through the
> >> > >> KStreamCogroup
> >> > >> > > > which
> >> > >> > > > >>>> is
> >> > >> > > > >>>>>> pretty much just a pass through processor and you are
> >> done.
> >> > >> > > > >>>>>>
> >> > >> > > > >>>>>> Topology wise for N incoming streams the new api will
> >> only
> >> > >> ever
> >> > >> > > > >>>> create N
> >> > >> > > > >>>>>> KStreamAggregates and 1 KStreamCogroup.
> >> > >> > > > >>>>>>
> >> > >> > > > >>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <
> >> > >> > > > >> matthias@confluent.io
> >> > >> > > > >>>>>
> >> > >> > > > >>>>>> wrote:
> >> > >> > > > >>>>>>
> >> > >> > > > >>>>>>> Kyle,
> >> > >> > > > >>>>>>>
> >> > >> > > > >>>>>>> thanks a lot for the KIP. Maybe I am a little slow,
> >> but I
> >> > >> could
> >> > >> > > not
> >> > >> > > > >>>>>>> follow completely. Could you maybe add a more
> concrete
> >> > >> example,
> >> > >> > > > like
> >> > >> > > > >>>> 3
> >> > >> > > > >>>>>>> streams with 3 records each (plus expected result),
> and
> >> > show
> >> > >> > the
> >> > >> > > > >>>>>>> difference between current way to implement it and
> >> the
> >> > >> > > proposed
> >> > >> > > > >>>> API?
> >> > >> > > > >>>>>>> This could also cover the internal processing to see
> >> what
> >> > >> store
> >> > >> > > > calls
> >> > >> > > > >>>>>>> would be required for both approaches etc.
> >> > >> > > > >>>>>>>
> >> > >> > > > >>>>>>> I think, it's pretty advanced stuff you propose, and
> it
> >> > >> would
> >> > >> > > help
> >> > >> > > > to
> >> > >> > > > >>>>>>> understand it better.
> >> > >> > > > >>>>>>>
> >> > >> > > > >>>>>>> Thanks a lot!
> >> > >> > > > >>>>>>>
> >> > >> > > > >>>>>>>
> >> > >> > > > >>>>>>> -Matthias
> >> > >> > > > >>>>>>>
> >> > >> > > > >>>>>>>
> >> > >> > > > >>>>>>>
> >> > >> > > > >>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
> >> > >> > > > >>>>>>>> I have made a pull request. It can be found here.
> >> > >> > > > >>>>>>>>
> >> > >> > > > >>>>>>>> https://github.com/apache/kafka/pull/2975
> >> > >> > > > >>>>>>>>
> >> > >> > > > >>>>>>>> I plan to write some more unit tests for my classes
> >> and
> >> > get
> >> > >> > > around
> >> > >> > > > >>>> to
> >> > >> > > > >>>>>>>> writing documentation for the public api additions.
> >> > >> > > > >>>>>>>>
> >> > >> > > > >>>>>>>> One thing I was curious about is during the
> >> > >> > > > >>>>>>>> KCogroupedStreamImpl#aggregate method I pass null to the
> >> > >> > > > >>>>>>>> KGroupedStream#repartitionIfRequired method. I
> >> > >> > > > >>>>>>>> can't supply the store name because if more than one
> >> > >> grouped
> >> > >> > > > stream
> >> > >> > > > >>>>>>>> repartitions an error is thrown. Is there some name
> >> that
> >> > >> > someone
> >> > >> > > > >>>> can
> >> > >> > > > >>>>>>>> recommend or should I leave the null and allow it to
> >> fall
> >> > >> back
> >> > >> > > to
> >> > >> > > > >>>> the
> >> > >> > > > >>>>>>>> KGroupedStream.name?
> >> > >> > > > >>>>>>>>
> >> > >> > > > >>>>>>>> Should this be expanded to handle grouped tables?
> This
> >> > >> would
> >> > >> > be
> >> > >> > > > >>>>> pretty
> >> > >> > > > >>>>>>> easy
> >> > >> > > > >>>>>>>> for a normal aggregate but one allowing session
> stores
> >> > and
> >> > >> > > > windowed
> >> > >> > > > >>>>>>> stores
> >> > >> > > > >>>>>>>> would require KTableSessionWindowAggregate and
> >> > >> > > > >>>> KTableWindowAggregate
> >> > >> > > > >>>>>>>> implementations.
> >> > >> > > > >>>>>>>>
> >> > >> > > > >>>>>>>> Thanks,
> >> > >> > > > >>>>>>>> Kyle
> >> > >> > > > >>>>>>>>
> >> > >> > > > >>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <
> >> > >> > eno.thereska@gmail.com>
> >> > >> > > > >>>>> wrote:
> >> > >> > > > >>>>>>>>
> >> > >> > > > >>>>>>>>> I’ll look as well asap, sorry, been swamped.
> >> > >> > > > >>>>>>>>>
> >> > >> > > > >>>>>>>>> Eno
> >> > >> > > > >>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <
> >> > >> > damian.guy@gmail.com>
> >> > >> > > > >>>>> wrote:
> >> > >> > > > >>>>>>>>>>
> >> > >> > > > >>>>>>>>>> Hi Kyle,
> >> > >> > > > >>>>>>>>>>
> >> > >> > > > >>>>>>>>>> Thanks for the KIP. I apologize that i haven't had
> >> the
> >> > >> > chance
> >> > >> > > to
> >> > >> > > > >>>>> look
> >> > >> > > > >>>>>>> at
> >> > >> > > > >>>>>>>>>> the KIP yet, but will schedule some time to look
> >> into
> >> > it
> >> > >> > > > >>>> tomorrow.
> >> > >> > > > >>>>>> For
> >> > >> > > > >>>>>>>>> the
> >> > >> > > > >>>>>>>>>> implementation, can you raise a PR against kafka
> >> trunk
> >> > >> and
> >> > >> > > mark
> >> > >> > > > >>>> it
> >> > >> > > > >>>>> as
> >> > >> > > > >>>>>>>>> WIP?
> >> > >> > > > >>>>>>>>>> It will be easier to review what you have done.
> >> > >> > > > >>>>>>>>>>
> >> > >> > > > >>>>>>>>>> Thanks,
> >> > >> > > > >>>>>>>>>> Damian
> >> > >> > > > >>>>>>>>>>
> >> > >> > > > >>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <
> >> > >> > > > >>>>> winkelman.kyle@gmail.com
> >> > >> > > > >>>>>>>
> >> > >> > > > >>>>>>>>> wrote:
> >> > >> > > > >>>>>>>>>>
> >> > >> > > > >>>>>>>>>>> I am replying to this in hopes it will draw some
> >> > >> attention
> >> > >> > to
> >> > >> > > > my
> >> > >> > > > >>>>> KIP
> >> > >> > > > >>>>>>> as
> >> > >> > > > >>>>>>>>> I
> >> > >> > > > >>>>>>>>>>> haven't heard from anyone in a couple days. This
> >> is my
> >> > >> > first
> >> > >> > > > KIP
> >> > >> > > > >>>>> and
> >> > >> > > > >>>>>>> my
> >> > >> > > > >>>>>>>>>>> first large contribution to the project so I'm
> >> sure I
> >> > >> did
> >> > >> > > > >>>>> something
> >> > >> > > > >>>>>>>>> wrong.
> >> > >> > > > >>>>>>>>>>> ;)
> >> > >> > > > >>>>>>>>>>>
> >> > >> > > > >>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <
> >> > >> > > > >>>>> winkelman.kyle@gmail.com>
> >> > >> > > > >>>>>>>>> wrote:
> >> > >> > > > >>>>>>>>>>>
> >> > >> > > > >>>>>>>>>>>> Hello all,
> >> > >> > > > >>>>>>>>>>>>
> >> > >> > > > >>>>>>>>>>>> I have created KIP-150 to facilitate discussion
> >> about
> >> > >> > adding
> >> > >> > > > >>>>>> cogroup
> >> > >> > > > >>>>>>> to
> >> > >> > > > >>>>>>>>>>>> the streams DSL.
> >> > >> > > > >>>>>>>>>>>>
> >> > >> > > > >>>>>>>>>>>> Please find the KIP here:
> >> > >> > > > >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> >> > >> > > > >>>>>>>>>>>>
> >> > >> > > > >>>>>>>>>>>> Please find my initial implementation here:
> >> > >> > > > >>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
> >> > >> > > > >>>>>>>>>>>>
> >> > >> > > > >>>>>>>>>>>> Thanks,
> >> > >> > > > >>>>>>>>>>>> Kyle Winkelman
> >> > >> > > > >>>>>>>>>>>>
> >> > >> > > > >>>>>>>>>>>
> >> > >> > > > >>>>>>>>>
> >> > >> > > > >>>>>>>>>
> >> > >> > > > >>>>>>>>
> >> > >> > > > >>>>>>>
> >> > >> > > > >>>>>>>
> >> > >> > > > >>>>>>
> >> > >> > > > >>>>>
> >> > >> > > > >>>>
> >> > >> > > > >>
> >> > >> > > > >>
> >> > >> > > >
> >> > >> > > >
> >> > >> > >
> >> > >> >
> >> > >>
> >> > >
> >> > >
> >> > >
> >> > > --
> >> > > -- Guozhang
> >> > >
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>
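
As a worked check of the topology counts quoted in the thread (N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins and N-1 KTableKTableJoinMergers with the current API, versus N KStreamAggregates and 1 KStreamCogroup with the proposed one), the arithmetic can be sketched as below. The class and method names are hypothetical, and the formulas are taken from the messages above rather than verified against the Kafka Streams source.

```java
// Processor-node and state-store counts for N co-grouped input streams,
// using the formulas stated in the thread. Names here are illustrative only.
public class TopologyCount {
    /** Current DSL: N aggregates + 2*(N-1) outer joins + (N-1) join mergers. */
    static int currentApiNodes(int n) {
        requirePositive(n);
        return n + 2 * (n - 1) + (n - 1);
    }

    /** Proposed cogroup API: N aggregates + 1 cogroup processor. */
    static int cogroupNodes(int n) {
        requirePositive(n);
        return n + 1;
    }

    /** Current DSL materializes one store per aggregated input stream. */
    static int currentApiStores(int n) {
        requirePositive(n);
        return n;
    }

    /** Proposed cogroup API shares a single store. */
    static int cogroupStores(int n) {
        requirePositive(n);
        return 1;
    }

    private static void requirePositive(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("need at least one input stream, got " + n);
        }
    }

    public static void main(String[] args) {
        for (int n = 2; n <= 5; n++) {
            System.out.println("N=" + n
                    + " current nodes=" + currentApiNodes(n)
                    + " cogroup nodes=" + cogroupNodes(n)
                    + " current stores=" + currentApiStores(n)
                    + " cogroup stores=" + cogroupStores(n));
        }
    }
}
```

For the three-topic example in the thread this gives 9 processor nodes and 3 stores with the current API, against 4 nodes and 1 store with cogroup.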
