kafka-dev mailing list archives

From Xavier Léauté <xav...@confluent.io>
Subject Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup
Date Fri, 19 May 2017 15:36:10 GMT
Sorry to jump on this thread so late. I agree this is a very useful
addition and wanted to provide an additional use-case and some more
comments.

This is actually a very common analytics use-case in the ad-tech industry.
The typical setup will have an auction stream, an impression stream, and a
click stream. Those three streams need to be combined to compute aggregate
statistics (e.g. impression statistics, and click-through rates), since
most of the attributes of interest are only present in the auction stream.

A simple way to do this is to co-group all the streams by the auction key,
and process updates to the co-group as events for each stream come in,
keeping only one value from each stream before sending downstream for
further processing / aggregation.

One could view the result of that co-group operation as a "KTable" with
multiple values per key: the key is the grouping key, and the values
consist of one value per stream.
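
To make that concrete, here is a minimal sketch of the co-group step in
plain Java, with no Kafka Streams dependency. The names CogroupSketch,
AuctionView, onAuction, onImpression and onClick are made up for
illustration and are not part of the KIP; a HashMap stands in for the
single state store, and each event overwrites only its own slot for the key.

```java
import java.util.HashMap;
import java.util.Map;

public class CogroupSketch {

    /** Hypothetical value type: the latest value seen from each stream for one key. */
    public static final class AuctionView {
        public String auction;     // null until an auction event arrives
        public String impression;  // null until an impression event arrives
        public String click;       // null until a click event arrives

        @Override
        public String toString() {
            return "AuctionView(" + auction + ", " + impression + ", " + click + ")";
        }
    }

    // Stand-in for the single state store: auction key -> co-grouped values.
    private final Map<String, AuctionView> store = new HashMap<>();

    private AuctionView viewFor(String key) {
        return store.computeIfAbsent(key, k -> new AuctionView());
    }

    public AuctionView onAuction(String key, String value) {
        AuctionView view = viewFor(key);
        view.auction = value;      // keep only the latest auction value
        return view;
    }

    public AuctionView onImpression(String key, String value) {
        AuctionView view = viewFor(key);
        view.impression = value;   // keep only the latest impression value
        return view;
    }

    public AuctionView onClick(String key, String value) {
        AuctionView view = viewFor(key);
        view.click = value;        // keep only the latest click value
        return view;
    }

    public static void main(String[] args) {
        CogroupSketch cogroup = new CogroupSketch();
        // Each update returns the co-grouped view that would be sent downstream.
        System.out.println(cogroup.onAuction("a1", "bid=0.42"));
        System.out.println(cogroup.onImpression("a1", "shown"));
        System.out.println(cogroup.onClick("a1", "clicked"));
    }
}
```

Every update, whichever stream it comes from, touches the one store and
yields the updated view for that key.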

What I like about Kyle's approach is that it allows elegant co-grouping of
multiple streams without having to worry about the number of streams, and
avoids dealing with Tuple types or other generic interfaces that could get
messy if we wanted to preserve all the value types in the resulting
co-grouped stream.

My only concern is that we only allow the cogroup + aggregate combined
operation. This forces the user to build their own tuple serialization
format if they want to preserve the individual input stream values as a
group. It also deviates quite a bit from our approach in KGroupedStream
which offers other operations, such as count and reduce, which should also
be applicable to a co-grouped stream.
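
As a rough illustration of why count should carry over, the sketch below
(plain Java, no Kafka Streams dependency; Cogrouped, update and counting
are hypothetical names, not the API proposed in the KIP) treats count as
an aggregate with an initializer of 0 and an aggregator that ignores the
value, so the same helper works for every input stream regardless of its
value type.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

public class CogroupCountSketch {

    /** One shared store; every input stream updates it through its own aggregator. */
    public static final class Cogrouped<K, A> {
        private final Map<K, A> store = new HashMap<>();
        private final Supplier<A> initializer;

        public Cogrouped(Supplier<A> initializer) {
            this.initializer = initializer;
        }

        /** Applies one record from any input stream and returns the new aggregate. */
        public <V> A update(K key, V value, BiFunction<V, A, A> aggregator) {
            A current = store.containsKey(key) ? store.get(key) : initializer.get();
            A next = aggregator.apply(value, current);
            store.put(key, next);
            return next;
        }
    }

    /** count() expressed as an aggregator: ignore the value, add one. */
    public static <V> BiFunction<V, Long, Long> counting() {
        return (value, aggregate) -> aggregate + 1L;
    }

    public static void main(String[] args) {
        Cogrouped<String, Long> counts = new Cogrouped<>(() -> 0L);
        // Records from streams with different value types share one counter per key.
        counts.update("a1", "auction-event", CogroupCountSketch.<String>counting());
        counts.update("a1", 17, CogroupCountSketch.<Integer>counting());
        System.out.println(counts.update("a1", "click-event", CogroupCountSketch.<String>counting()));
    }
}
```

A reduce would need more thought, since the input streams do not share a
value type.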

Overall I still think this is a really useful addition, but I feel we
haven't spent much time trying to explore alternative DSLs that could maybe
generalize better or match our existing syntax more closely.

On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman <winkelman.kyle@gmail.com>
wrote:

> Eno, is there anyone else that is an expert in the kafka streams realm that
> I should reach out to for input?
>
> I believe Damian Guy is still planning on reviewing this more in depth so I
> will wait for his inputs before continuing.
>
> On May 9, 2017 7:30 AM, "Eno Thereska" <eno.thereska@gmail.com> wrote:
>
> > Thanks Kyle, good arguments.
> >
> > Eno
> >
> > > On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.kyle@gmail.com>
> > > wrote:
> > >
> > > *- minor: could you add an exact example (similar to what Jay’s example
> > > is, or like your Spark/Pig pointers had) to make this super concrete?*
> > > I have added a more concrete example to the KIP.
> > >
> > > *- my main concern is that we’re exposing this optimization to the DSL.
> > > In an ideal world, an optimizer would take the existing DSL and do the
> > > right thing under the covers (create just one state store, arrange the
> > > nodes etc). The original DSL had a bunch of small, composable pieces
> > > (group, aggregate, join) that this proposal groups together. I’d like to
> > > hear your thoughts on whether it’s possible to do this optimization with
> > > the current DSL, at the topology builder level.*
> > > You would have to make a lot of checks to understand if it is even
> > > possible to make this optimization:
> > > 1. Make sure they are all KTableKTableOuterJoins.
> > > 2. None of the intermediate KTables are used for anything else.
> > > 3. None of the intermediate stores are used. (This may be impossible
> > > especially if they use KafkaStreams#store after the topology has already
> > > been built.)
> > > You would then need to make decisions during the optimization:
> > > 1. Your new initializer would be the composite of all the individual
> > > initializers and the valueJoiners.
> > > 2. I am having a hard time thinking about how you would turn the
> > > aggregators and valueJoiners into an aggregator that would work on the
> > > final object, but this may be possible.
> > > 3. Which state store would you use? The ones declared would be for the
> > > aggregate values. None of the declared ones would be guaranteed to hold
> > > the final object. This would mean you must create a new state store and
> > > not create any of the declared ones.
> > >
> > > The main argument I have against it is that even if it could be done I
> > > don't know that we would want to have this be an optimization in the
> > > background, because the user would still be required to think about all
> > > of the intermediate values that they shouldn't need to worry about if
> > > they only care about the final object.
> > >
> > > In my opinion cogroup is a common enough case that it should be part of
> > > the composable pieces (group, aggregate, join) because we want to allow
> > > people to join more than 2 streams in an easy way. Right now I don't
> > > think we give them ways of handling this use case easily.
> > >
> > > *- I think there will be scope for several such optimizations in the
> > > future and perhaps at some point we need to think about decoupling the
> > > 1:1 mapping from the DSL into the physical topology.*
> > > I would argue that cogroup is not just an optimization; it is a new way
> > > for the users to look at accomplishing a problem that requires multiple
> > > streams. I may sound like a broken record but I don't think users should
> > > have to build the N-1 intermediate tables and deal with their
> > > initializers, serdes and stores if all they care about is the final
> > > object.
> > > Now if for example someone uses cogroup but doesn't supply additional
> > > streams and aggregators this case is equivalent to a single grouped
> > > stream making an aggregate call. This case is what I would view as an
> > > optimization: we could remove the KStreamCogroup and act as if there was
> > > just a call to KGroupedStream#aggregate instead of calling
> > > KGroupedStream#cogroup. (I would prefer to just write a warning saying
> > > that this is not how cogroup is to be used.)
> > >
> > > Thanks,
> > > Kyle
> > >
> > > On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.thereska@gmail.com>
> > > wrote:
> > >
> > >> Hi Kyle,
> > >>
> > >> Thanks for the KIP again. A couple of comments:
> > >>
> > >> - minor: could you add an exact example (similar to what Jay’s example
> > >> is, or like your Spark/Pig pointers had) to make this super concrete?
> > >>
> > >> - my main concern is that we’re exposing this optimization to the DSL.
> > >> In an ideal world, an optimizer would take the existing DSL and do the
> > >> right thing under the covers (create just one state store, arrange the
> > >> nodes etc). The original DSL had a bunch of small, composable pieces
> > >> (group, aggregate, join) that this proposal groups together. I’d like to
> > >> hear your thoughts on whether it’s possible to do this optimization with
> > >> the current DSL, at the topology builder level.
> > >>
> > >> I think there will be scope for several such optimizations in the
> > >> future and perhaps at some point we need to think about decoupling the
> > >> 1:1 mapping from the DSL into the physical topology.
> > >>
> > >> Thanks
> > >> Eno
> > >>
> > >>> On May 5, 2017, at 4:39 PM, Jay Kreps <jay@confluent.io> wrote:
> > >>>
> > >>> I haven't digested the proposal but the use case is pretty common. An
> > >>> example would be the "customer 360" or "unified customer profile" use
> > >>> case we often use. In that use case you have a dozen systems each of
> > >>> which has some information about your customer (account details,
> > >>> settings, billing info, customer service contacts, purchase history,
> > >>> etc). Your goal is to join/munge these into a single profile record for
> > >>> each customer that has all the relevant info in a usable form and is
> > >>> up-to-date with all the source systems. If you implement that with
> > >>> kstreams as a sequence of joins I think today we'd fully materialize N-1
> > >>> intermediate tables. But clearly you only need a single stage to group
> > >>> all these things that are already co-partitioned. A distributed database
> > >>> would do this under the covers which is arguably better (at least when
> > >>> it does the right thing) and perhaps we could do the same thing but I'm
> > >>> not sure we know the partitioning so we may need an explicit cogroup
> > >>> command that implies they are already co-partitioned.
> > >>>
> > >>> -Jay
> > >>>
> > >>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <winkelman.kyle@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Yeah, that's a good way to look at it.
> > >>>> I have seen this type of functionality in a couple of other platforms
> > >>>> like Spark and Pig.
> > >>>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
> > >>>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
> > >>>>
> > >>>>
> > >>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian.guy@gmail.com> wrote:
> > >>>>
> > >>>>> Hi Kyle,
> > >>>>>
> > >>>>> If I'm reading this correctly it is like an N-way outer join? So an
> > >>>>> input on any stream will always produce a new aggregated value - is
> > >>>>> that correct? Effectively, each Aggregator just looks up the current
> > >>>>> value, aggregates and forwards the result.
> > >>>>> I need to look into it and think about it a bit more, but it seems
> > >>>>> like it could be a useful optimization.
> > >>>>>
> > >>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.kyle@gmail.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> I sure can. I have added the following description to my KIP. If
> > >>>>>> this doesn't help let me know and I will take some more time to build
> > >>>>>> a diagram and make more of a step by step description:
> > >>>>>>
> > >>>>>> Example with Current API:
> > >>>>>>
> > >>>>>> KTable<K, V1> table1 = builder.stream("topic1").groupByKey()
> > >>>>>>     .aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
> > >>>>>> KTable<K, V2> table2 = builder.stream("topic2").groupByKey()
> > >>>>>>     .aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
> > >>>>>> KTable<K, V3> table3 = builder.stream("topic3").groupByKey()
> > >>>>>>     .aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
> > >>>>>> KTable<K, CG> cogrouped = table1
> > >>>>>>     .outerJoin(table2, joinerOneAndTwo)
> > >>>>>>     .outerJoin(table3, joinerOneTwoAndThree);
> > >>>>>>
> > >>>>>> As you can see this creates 3 StateStores, requires 3 initializers,
> > >>>>>> and 3 aggValueSerdes. This also puts pressure on the user to define
> > >>>>>> what the intermediate values are going to be (V1, V2, V3). They are
> > >>>>>> left with a couple of choices: first, to make V1, V2, and V3 all the
> > >>>>>> same as CG, so that the two joiners are more like mergers; or second,
> > >>>>>> to make them intermediate states such as Topic1Map, Topic2Map, and
> > >>>>>> Topic3Map, and have the joiners use those to build the final
> > >>>>>> aggregate CG value. This is something the user could avoid thinking
> > >>>>>> about with this KIP.
> > >>>>>>
> > >>>>>> When a new input arrives, let's say at "topic1", it will first go
> > >>>>>> through a KStreamAggregate, grabbing the current aggregate from
> > >>>>>> storeName1. It will produce this in the form of the first
> > >>>>>> intermediate value, which is sent through a KTableKTableOuterJoin
> > >>>>>> where it will look up the current value of the key in storeName2. It
> > >>>>>> will use the first joiner to calculate the second intermediate value,
> > >>>>>> which will go through an additional KTableKTableOuterJoin. Here it
> > >>>>>> will look up the current value of the key in storeName3 and use the
> > >>>>>> second joiner to build the final aggregate value.
> > >>>>>>
> > >>>>>> If you think through all possibilities for incoming topics you will
> > >>>>>> see that no matter which topic it comes in through, all three stores
> > >>>>>> are queried and all of the joiners must get used.
> > >>>>>>
> > >>>>>> Topology-wise, for N incoming streams this creates N
> > >>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1
> > >>>>>> KTableKTableJoinMergers.
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> Example with Proposed API:
> > >>>>>>
> > >>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> > >>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> > >>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
> > >>>>>> KTable<K, CG> cogrouped = grouped1
> > >>>>>>     .cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
> > >>>>>>     .cogroup(grouped2, aggregator2)
> > >>>>>>     .cogroup(grouped3, aggregator3)
> > >>>>>>     .aggregate();
> > >>>>>>
> > >>>>>> As you can see this creates 1 StateStore, requires 1 initializer,
> > >>>>>> and 1 aggValueSerde. The user no longer has to worry about the
> > >>>>>> intermediate values and the joiners. All they have to think about is
> > >>>>>> how each stream impacts the creation of the final CG object.
> > >>>>>>
> > >>>>>> When a new input arrives, let's say at "topic1", it will first go
> > >>>>>> through a KStreamAggregate and grab the current aggregate from
> > >>>>>> storeName1. It will add its incoming object to the aggregate, update
> > >>>>>> the store and pass the new aggregate on. This new aggregate goes
> > >>>>>> through the KStreamCogroup, which is pretty much just a pass-through
> > >>>>>> processor, and you are done.
> > >>>>>>
> > >>>>>> Topology-wise, for N incoming streams the new API will only ever
> > >>>>>> create N KStreamAggregates and 1 KStreamCogroup.
> > >>>>>>
> > >>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matthias@confluent.io>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Kyle,
> > >>>>>>>
> > >>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I could not
> > >>>>>>> follow completely. Could you maybe add a more concrete example,
> > >>>>>>> like 3 streams with 3 records each (plus expected result), and show
> > >>>>>>> the difference between the current way to implement it and the
> > >>>>>>> proposed API? This could also cover the internal processing to see
> > >>>>>>> what store calls would be required for both approaches etc.
> > >>>>>>>
> > >>>>>>> I think it's pretty advanced stuff you propose, and it would help
> > >>>>>>> to understand it better.
> > >>>>>>>
> > >>>>>>> Thanks a lot!
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> -Matthias
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
> > >>>>>>>> I have made a pull request. It can be found here.
> > >>>>>>>>
> > >>>>>>>> https://github.com/apache/kafka/pull/2975
> > >>>>>>>>
> > >>>>>>>> I plan to write some more unit tests for my classes and get around
> > >>>>>>>> to writing documentation for the public API additions.
> > >>>>>>>>
> > >>>>>>>> One thing I was curious about is that during the
> > >>>>>>>> KCogroupedStreamImpl#aggregate method I pass null to the
> > >>>>>>>> KGroupedStream#repartitionIfRequired method. I can't supply the
> > >>>>>>>> store name because if more than one grouped stream repartitions an
> > >>>>>>>> error is thrown. Is there some name that someone can recommend or
> > >>>>>>>> should I leave the null and allow it to fall back to the
> > >>>>>>>> KGroupedStream.name?
> > >>>>>>>>
> > >>>>>>>> Should this be expanded to handle grouped tables? This would be
> > >>>>>>>> pretty easy for a normal aggregate but one allowing session stores
> > >>>>>>>> and windowed stores would require KTableSessionWindowAggregate and
> > >>>>>>>> KTableWindowAggregate implementations.
> > >>>>>>>>
> > >>>>>>>> Thanks,
> > >>>>>>>> Kyle
> > >>>>>>>>
> > >>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <eno.thereska@gmail.com>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> I’ll look as well asap, sorry, been swamped.
> > >>>>>>>>>
> > >>>>>>>>> Eno
> > >>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <damian.guy@gmail.com>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>> Hi Kyle,
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks for the KIP. I apologize that I haven't had the chance to
> > >>>>>>>>>> look at the KIP yet, but will schedule some time to look into it
> > >>>>>>>>>> tomorrow. For the implementation, can you raise a PR against
> > >>>>>>>>>> kafka trunk and mark it as WIP? It will be easier to review what
> > >>>>>>>>>> you have done.
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks,
> > >>>>>>>>>> Damian
> > >>>>>>>>>>
> > >>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman
> > >>>>>>>>>> <winkelman.kyle@gmail.com> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> I am replying to this in hopes it will draw some attention to
> > >>>>>>>>>>> my KIP as I haven't heard from anyone in a couple days. This is
> > >>>>>>>>>>> my first KIP and my first large contribution to the project so
> > >>>>>>>>>>> I'm sure I did something wrong. ;)
> > >>>>>>>>>>>
> > >>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman"
> > >>>>>>>>>>> <winkelman.kyle@gmail.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hello all,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I have created KIP-150 to facilitate discussion about adding
> > >>>>>>>>>>>> cogroup to the streams DSL.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Please find the KIP here:
> > >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Please find my initial implementation here:
> > >>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>> Kyle Winkelman
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>
> > >>
> >
> >
>
