kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kyle Winkelman <winkelman.k...@gmail.com>
Subject Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup
Date Tue, 09 May 2017 15:07:58 GMT
Eno, is there anyone else that is an expert in the kafka streams realm that
I should reach out to for input?

I believe Damian Guy is still planning on reviewing this more in depth so I
will wait for his inputs before continuing.

On May 9, 2017 7:30 AM, "Eno Thereska" <eno.thereska@gmail.com> wrote:

> Thanks Kyle, good arguments.
>
> Eno
>
> > On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.kyle@gmail.com>
> wrote:
> >
> > *- minor: could you add an exact example (similar to what Jay’s example
> is,
> > or like your Spark/Pig pointers had) to make this super concrete?*
> > I have added a more concrete example to the KIP.
> >
> > *- my main concern is that we’re exposing this optimization to the DSL.
> In
> > an ideal world, an optimizer would take the existing DSL and do the right
> > thing under the covers (create just one state store, arrange the nodes
> > etc). The original DSL had a bunch of small, composable pieces (group,
> > aggregate, join) that this proposal groups together. I’d like to hear
> your
> > thoughts on whether it’s possible to do this optimization with the
> current
> > DSL, at the topology builder level.*
> > You would have to make a lot of checks to understand if it is even
> possible
> > to make this optimization:
> > 1. Make sure they are all KTableKTableOuterJoins
> > 2. None of the intermediate KTables are used for anything else.
> > 3. None of the intermediate stores are used. (This may be impossible
> > especially if they use KafkaStreams#store after the topology has already
> > been built.)
> > You would then need to make decisions during the optimization:
> > 1. Your new initializer would the composite of all the individual
> > initializers and the valueJoiners.
> > 2. I am having a hard time thinking about how you would turn the
> > aggregators and valueJoiners into an aggregator that would work on the
> > final object, but this may be possible.
> > 3. Which state store would you use? The ones declared would be for the
> > aggregate values. None of the declared ones would be guaranteed to hold
> the
> > final object. This would mean you must created a new state store and not
> > created any of the declared ones.
> >
> > The main argument I have against it is even if it could be done I don't
> > know that we would want to have this be an optimization in the background
> > because the user would still be required to think about all of the
> > intermediate values that they shouldn't need to worry about if they only
> > care about the final object.
> >
> > In my opinion cogroup is a common enough case that it should be part of
> the
> > composable pieces (group, aggregate, join) because we want to allow
> people
> > to join more than 2 or more streams in an easy way. Right now I don't
> think
> > we give them ways of handling this use case easily.
> >
> > *-I think there will be scope for several such optimizations in the
> future
> > and perhaps at some point we need to think about decoupling the 1:1
> mapping
> > from the DSL into the physical topology.*
> > I would argue that cogroup is not just an optimization it is a new way
> for
> > the users to look at accomplishing a problem that requires multiple
> > streams. I may sound like a broken record but I don't think users should
> > have to build the N-1 intermediate tables and deal with their
> initializers,
> > serdes and stores if all they care about is the final object.
> > Now if for example someone uses cogroup but doesn't supply additional
> > streams and aggregators this case is equivalent to a single grouped
> stream
> > making an aggregate call. This case is what I view an optimization as, we
> > could remove the KStreamCogroup and act as if there was just a call to
> > KGroupedStream#aggregate instead of calling KGroupedStream#cogroup. (I
> > would prefer to just write a warning saying that this is not how cogroup
> is
> > to be used.)
> >
> > Thanks,
> > Kyle
> >
> > On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.thereska@gmail.com>
> wrote:
> >
> >> Hi Kyle,
> >>
> >> Thanks for the KIP again. A couple of comments:
> >>
> >> - minor: could you add an exact example (similar to what Jay’s example
> is,
> >> or like your Spark/Pig pointers had) to make this super concrete?
> >>
> >> - my main concern is that we’re exposing this optimization to the DSL.
> In
> >> an ideal world, an optimizer would take the existing DSL and do the
> right
> >> thing under the covers (create just one state store, arrange the nodes
> >> etc). The original DSL had a bunch of small, composable pieces (group,
> >> aggregate, join) that this proposal groups together. I’d like to hear
> your
> >> thoughts on whether it’s possible to do this optimization with the
> current
> >> DSL, at the topology builder level.
> >>
> >> I think there will be scope for several such optimizations in the future
> >> and perhaps at some point we need to think about decoupling the 1:1
> mapping
> >> from the DSL into the physical topology.
> >>
> >> Thanks
> >> Eno
> >>
> >>> On May 5, 2017, at 4:39 PM, Jay Kreps <jay@confluent.io> wrote:
> >>>
> >>> I haven't digested the proposal but the use case is pretty common. An
> >>> example would be the "customer 360" or "unified customer profile" use
> >> case
> >>> we often use. In that use case you have a dozen systems each of which
> has
> >>> some information about your customer (account details, settings,
> billing
> >>> info, customer service contacts, purchase history, etc). Your goal is
> to
> >>> join/munge these into a single profile record for each customer that
> has
> >>> all the relevant info in a usable form and is up-to-date with all the
> >>> source systems. If you implement that with kstreams as a sequence of
> >> joins
> >>> i think today we'd fully materialize N-1 intermediate tables. But
> clearly
> >>> you only need a single stage to group all these things that are already
> >>> co-partitioned. A distributed database would do this under the covers
> >> which
> >>> is arguably better (at least when it does the right thing) and perhaps
> we
> >>> could do the same thing but I'm not sure we know the partitioning so we
> >> may
> >>> need an explicit cogroup command that impllies they are already
> >>> co-partitioned.
> >>>
> >>> -Jay
> >>>
> >>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <
> winkelman.kyle@gmail.com
> >>>
> >>> wrote:
> >>>
> >>>> Yea thats a good way to look at it.
> >>>> I have seen this type of functionality in a couple other platforms
> like
> >>>> spark and pig.
> >>>> https://spark.apache.org/docs/0.6.2/api/core/spark/
> >> PairRDDFunctions.html
> >>>> https://www.tutorialspoint.com/apache_pig/apache_pig_
> >> cogroup_operator.htm
> >>>>
> >>>>
> >>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian.guy@gmail.com> wrote:
> >>>>
> >>>>> Hi Kyle,
> >>>>>
> >>>>> If i'm reading this correctly it is like an N way outer join? So
an
> >> input
> >>>>> on any stream will always produce a new aggregated value - is that
> >>>> correct?
> >>>>> Effectively, each Aggregator just looks up the current value,
> >> aggregates
> >>>>> and forwards the result.
> >>>>> I need to look into it and think about it a bit more, but it seems
> like
> >>>> it
> >>>>> could be a useful optimization.
> >>>>>
> >>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.kyle@gmail.com
> >
> >>>>> wrote:
> >>>>>
> >>>>>> I sure can. I have added the following description to my KIP.
If
> this
> >>>>>> doesn't help let me know and I will take some more time to build
a
> >>>>> diagram
> >>>>>> and make more of a step by step description:
> >>>>>>
> >>>>>> Example with Current API:
> >>>>>>
> >>>>>> KTable<K, V1> table1 =
> >>>>>> builder.stream("topic1").groupByKey().aggregate(initializer1,
> >>>>> aggregator1,
> >>>>>> aggValueSerde1, storeName1);
> >>>>>> KTable<K, V2> table2 =
> >>>>>> builder.stream("topic2").groupByKey().aggregate(initializer2,
> >>>>> aggregator2,
> >>>>>> aggValueSerde2, storeName2);
> >>>>>> KTable<K, V3> table3 =
> >>>>>> builder.stream("topic3").groupByKey().aggregate(initializer3,
> >>>>> aggregator3,
> >>>>>> aggValueSerde3, storeName3);
> >>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2,
> >>>>>> joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);
> >>>>>>
> >>>>>> As you can see this creates 3 StateStores, requires 3 initializers,
> >>>> and 3
> >>>>>> aggValueSerdes. This also adds the pressure to user to define
what
> the
> >>>>>> intermediate values are going to be (V1, V2, V3). They are left
> with a
> >>>>>> couple choices, first to make V1, V2, and V3 all the same as
CG and
> >> the
> >>>>> two
> >>>>>> joiners are more like mergers, or second make them intermediate
> states
> >>>>> such
> >>>>>> as Topic1Map, Topic2Map, and Topic3Map and the joiners use those
to
> >>>> build
> >>>>>> the final aggregate CG value. This is something the user could
avoid
> >>>>>> thinking about with this KIP.
> >>>>>>
> >>>>>> When a new input arrives lets say at "topic1" it will first
go
> through
> >>>> a
> >>>>>> KStreamAggregate grabbing the current aggregate from storeName1.
It
> >>>> will
> >>>>>> produce this in the form of the first intermediate value and
get
> sent
> >>>>>> through a KTableKTableOuterJoin where it will look up the current
> >> value
> >>>>> of
> >>>>>> the key in storeName2. It will use the first joiner to calculate
the
> >>>>> second
> >>>>>> intermediate value, which will go through an additional
> >>>>>> KTableKTableOuterJoin. Here it will look up the current value
of the
> >>>> key
> >>>>> in
> >>>>>> storeName3 and use the second joiner to build the final aggregate
> >>>> value.
> >>>>>>
> >>>>>> If you think through all possibilities for incoming topics you
will
> >> see
> >>>>>> that no matter which topic it comes in through all three stores
are
> >>>>> queried
> >>>>>> and all of the joiners must get used.
> >>>>>>
> >>>>>> Topology wise for N incoming streams this creates N
> >>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1
> >>>>>> KTableKTableJoinMergers.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> Example with Proposed API:
> >>>>>>
> >>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").
> >>>> groupByKey();
> >>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").
> >>>> groupByKey();
> >>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").
> >>>> groupByKey();
> >>>>>> KTable<K, CG> cogrouped = grouped1.cogroup(initializer1,
> aggregator1,
> >>>>>> aggValueSerde1, storeName1)
> >>>>>>       .cogroup(grouped2, aggregator2)
> >>>>>>       .cogroup(grouped3, aggregator3)
> >>>>>>       .aggregate();
> >>>>>>
> >>>>>> As you can see this creates 1 StateStore, requires 1 initializer,
> and
> >> 1
> >>>>>> aggValueSerde. The user no longer has to worry about the
> intermediate
> >>>>>> values and the joiners. All they have to think about is how
each
> >> stream
> >>>>>> impacts the creation of the final CG object.
> >>>>>>
> >>>>>> When a new input arrives lets say at "topic1" it will first
go
> through
> >>>> a
> >>>>>> KStreamAggreagte and grab the current aggregate from storeName1.
It
> >>>> will
> >>>>>> add its incoming object to the aggregate, update the store and
pass
> >> the
> >>>>> new
> >>>>>> aggregate on. This new aggregate goes through the KStreamCogroup
> which
> >>>> is
> >>>>>> pretty much just a pass through processor and you are done.
> >>>>>>
> >>>>>> Topology wise for N incoming streams the new api will only every
> >>>> create N
> >>>>>> KStreamAggregates and 1 KStreamCogroup.
> >>>>>>
> >>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <
> >> matthias@confluent.io
> >>>>>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Kyle,
> >>>>>>>
> >>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but
I could not
> >>>>>>> follow completely. Could you maybe add a more concrete example,
> like
> >>>> 3
> >>>>>>> streams with 3 records each (plus expected result), and
show the
> >>>>>>> difference between current way to to implement it and the
proposed
> >>>> API?
> >>>>>>> This could also cover the internal processing to see what
store
> calls
> >>>>>>> would be required for both approaches etc.
> >>>>>>>
> >>>>>>> I think, it's pretty advanced stuff you propose, and it
would help
> to
> >>>>>>> understand it better.
> >>>>>>>
> >>>>>>> Thanks a lot!
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
> >>>>>>>> I have made a pull request. It can be found here.
> >>>>>>>>
> >>>>>>>> https://github.com/apache/kafka/pull/2975
> >>>>>>>>
> >>>>>>>> I plan to write some more unit tests for my classes
and get around
> >>>> to
> >>>>>>>> writing documentation for the public api additions.
> >>>>>>>>
> >>>>>>>> One thing I was curious about is during the
> >>>>>>> KCogroupedStreamImpl#aggregate
> >>>>>>>> method I pass null to the KGroupedStream#repartitionIfRequired
> >>>>> method.
> >>>>>> I
> >>>>>>>> can't supply the store name because if more than one
grouped
> stream
> >>>>>>>> repartitions an error is thrown. Is there some name
that someone
> >>>> can
> >>>>>>>> recommend or should I leave the null and allow it to
fall back to
> >>>> the
> >>>>>>>> KGroupedStream.name?
> >>>>>>>>
> >>>>>>>> Should this be expanded to handle grouped tables? This
would be
> >>>>> pretty
> >>>>>>> easy
> >>>>>>>> for a normal aggregate but one allowing session stores
and
> windowed
> >>>>>>> stores
> >>>>>>>> would required KTableSessionWindowAggregate and
> >>>> KTableWindowAggregate
> >>>>>>>> implementations.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Kyle
> >>>>>>>>
> >>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <eno.thereska@gmail.com>
> >>>>> wrote:
> >>>>>>>>
> >>>>>>>>> I’ll look as well asap, sorry, been swamped.
> >>>>>>>>>
> >>>>>>>>> Eno
> >>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <damian.guy@gmail.com>
> >>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi Kyle,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the KIP. I apologize that i haven't
had the chance to
> >>>>> look
> >>>>>>> at
> >>>>>>>>>> the KIP yet, but will schedule some time to
look into it
> >>>> tomorrow.
> >>>>>> For
> >>>>>>>>> the
> >>>>>>>>>> implementation, can you raise a PR against kafka
trunk and mark
> >>>> it
> >>>>> as
> >>>>>>>>> WIP?
> >>>>>>>>>> It will be easier to review what you have done.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Damian
> >>>>>>>>>>
> >>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <
> >>>>> winkelman.kyle@gmail.com
> >>>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> I am replying to this in hopes it will draw
some attention to
> my
> >>>>> KIP
> >>>>>>> as
> >>>>>>>>> I
> >>>>>>>>>>> haven't heard from anyone in a couple days.
This is my first
> KIP
> >>>>> and
> >>>>>>> my
> >>>>>>>>>>> first large contribution to the project
so I'm sure I did
> >>>>> something
> >>>>>>>>> wrong.
> >>>>>>>>>>> ;)
> >>>>>>>>>>>
> >>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman"
<
> >>>>> winkelman.kyle@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hello all,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I have created KIP-150 to facilitate
discussion about adding
> >>>>>> cogroup
> >>>>>>> to
> >>>>>>>>>>>> the streams DSL.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Please find the KIP here:
> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>>>>>>>> 150+-+Kafka-Streams+Cogroup
> >>>>>>>>>>>>
> >>>>>>>>>>>> Please find my initial implementation
here:
> >>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Kyle Winkelman
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> >>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message