kafka-dev mailing list archives

From Michal Borowiecki <michal.borowie...@openbet.com>
Subject Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup
Date Tue, 13 Jun 2017 10:15:22 GMT
Actually, just had a thought. It started with naming.

Are we actually co-grouping these streams or are we co-aggregating them?

After all, in each of the cogroup calls we are providing an Aggregator 
implementation.


If they are really co-aggregated, why don't we turn this around:

    KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
    KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
    KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();

    KTable<K, CG> coagg = grouped1
        .aggregate(initializer1, aggregator1, aggValueSerde1) // this is the unchanged aggregate method
        .aggregate(grouped2, aggregator2)  // this is a new method
        .aggregate(grouped3, aggregator3); // ditto

This means that instead of adding cogroup methods to the KGroupedStream
interface, we would add an aggregate method to the KTable interface.
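
A minimal in-memory sketch of the call shape proposed above, assuming nothing about how Kafka Streams would implement it. `Grouped` and `Table` are hypothetical stand-ins for KGroupedStream and KTable (they are not Kafka classes), the serde is left out, and each input is folded as a batch rather than record by record. It only illustrates that the table returned by the unchanged aggregate() could carry the initializer forward, so that later aggregate(grouped, aggregator) calls need just one more stream and aggregator:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Toy model only: Grouped and Table are hypothetical stand-ins, not Kafka Streams classes.
class CoAggregateSketch {

    /** Same shape as a Kafka Streams Aggregator: (key, value, aggregate) -> new aggregate. */
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    /** Stand-in for KGroupedStream: a plain list of keyed records. */
    static final class Grouped<K, V> {
        final List<Map.Entry<K, V>> records = new ArrayList<>();

        Grouped<K, V> add(K key, V value) {
            records.add(new AbstractMap.SimpleEntry<>(key, value));
            return this;
        }

        /** Shaped like the existing aggregate(), minus the serde. */
        <A> Table<K, A> aggregate(Supplier<A> initializer, Aggregator<K, V, A> aggregator) {
            return new Table<K, A>(initializer).aggregate(this, aggregator);
        }
    }

    /** Stand-in for KTable that remembers the initializer for later co-aggregation. */
    static final class Table<K, A> {
        final Map<K, A> state = new HashMap<>();
        private final Supplier<A> initializer;

        Table(Supplier<A> initializer) {
            this.initializer = initializer;
        }

        /** The proposed new method: fold another grouped stream into the same aggregate. */
        <V> Table<K, A> aggregate(Grouped<K, V> other, Aggregator<K, V, A> aggregator) {
            for (Map.Entry<K, V> entry : other.records) {
                K key = entry.getKey();
                // The initializer runs once per key, whichever stream sees the key first.
                A current = state.containsKey(key) ? state.get(key) : initializer.get();
                state.put(key, aggregator.apply(key, entry.getValue(), current));
            }
            return this;
        }
    }

    /** Co-aggregates three differently typed inputs into one Integer total per key. */
    static Map<String, Integer> demo() {
        Grouped<String, Integer> grouped1 = new Grouped<String, Integer>().add("a", 1).add("b", 2);
        Grouped<String, String> grouped2 = new Grouped<String, String>().add("a", "xyz");
        Grouped<String, Long> grouped3 = new Grouped<String, Long>().add("a", 10L).add("c", 5L);

        Supplier<Integer> initializer1 = () -> 0;
        Aggregator<String, Integer, Integer> aggregator1 = (key, value, agg) -> agg + value;
        Aggregator<String, String, Integer> aggregator2 = (key, value, agg) -> agg + value.length();
        Aggregator<String, Long, Integer> aggregator3 = (key, value, agg) -> agg + value.intValue();

        Table<String, Integer> coagg = grouped1
            .aggregate(initializer1, aggregator1)  // unchanged aggregate shape
            .aggregate(grouped2, aggregator2)      // proposed new method
            .aggregate(grouped3, aggregator3);     // ditto
        return coagg.state;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this toy version the initializer is no longer tied to any one stream at the call site of the later aggregate() calls, although it is still supplied alongside the first stream. Whether a real KTable could retain the initializer and store in this way is the open question.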

Is that feasible?

Cheers,
Michał

On 13/06/17 10:56, Michal Borowiecki wrote:
>
> Also, I still feel that putting initializer on the first cogroup can 
> mislead users into thinking the first stream is in some way special.
>
> Just my 5c.
> Michał
>
> On 13/06/17 09:54, Michal Borowiecki wrote:
>>
>> Agree completely with the argument for serdes belonging in the same 
>> place as the state store name, which is in the aggregate method.
>>
>> Cheers,
>>
>> Michał
>>
>>
>> On 12/06/17 18:20, Xavier Léauté wrote:
>>> I think we are discussing two separate things here, so it might be 
>>> worth clarifying:
>>>
>>> 1) the position of the initializer with respect to the aggregators. 
>>> If I understand correctly, Guozhang seems to think it is more 
>>> natural to specify the initializer first, despite it not bearing any 
>>> relation to the first aggregator. I can see the argument for 
>>> specifying the initializer first, but I think it is debatable 
>>> whether mixing it into the first cogroup call leads to a cleaner API 
>>> or not.
>>>
>>> 2) where the serde should be defined (if necessary). Looking at our 
>>> existing APIs in KGroupedStream, we always offer two aggregate() 
>>> methods. The first one takes the name of the store and associated 
>>> aggregate value serde e.g. KGroupedStream.aggregate(Initializer<VR> 
>>> initializer, Aggregator<? super K, ? super V, VR> aggregator, 
>>> Serde<VR> aggValueSerde, String queryableStoreName)
>>> The second one only takes a state store supplier, and does not 
>>> specify any serde, e.g. KGroupedStream.aggregate(Initializer<VR> 
>>> initializer, Aggregator<? super K, ? super V, VR> aggregator, final 
>>> StateStoreSupplier<KeyValueStore> storeSupplier)
>>> Presumably, when specifying a state store supplier it shouldn't be 
>>> necessary to specify an aggregate value serde, since the provided 
>>> statestore might not need to serialize the values (e.g. it may just 
>>> keep them as regular objects in heap) or it may have its own 
>>> internal serialization format.
>>>
>>> For consistency I think it would be valuable to preserve the same 
>>> two aggregate methods for cogroup as well. Since the serde is only 
>>> required in one of the two cases, I believe the serde has no place 
>>> in the first cogroup() call and should only have to be specified as 
>>> part of the aggregate() method that takes a state store name. In the 
>>> case of a state store supplier, no serde would be necessary.
>>>
>>> On Sat, Jun 10, 2017 at 4:09 PM Guozhang Wang <wangguoz@gmail.com 
>>> <mailto:wangguoz@gmail.com>> wrote:
>>>
>>>     I'd agree that the aggregate value serde and the initializer
>>>     do not bear a direct relationship with the first `cogroup`
>>>     call, but after I tried to write some example code with these
>>>     two different sets of APIs I felt the current APIs just program
>>>     more naturally.
>>>
>>>     I know it is kinda subjective, but I do think that user
>>>     experience may be more important as a deciding factor than the
>>>     logical argument for public interfaces. So I'd recommend that
>>>     people also try writing some example lines, and we can circle
>>>     back and discuss which one feels more natural to write code with.
>>>
>>>
>>>     Guozhang
>>>
>>>     On Fri, Jun 9, 2017 at 1:59 AM, Michal Borowiecki
>>>     <michal.borowiecki@openbet.com
>>>     <mailto:michal.borowiecki@openbet.com>> wrote:
>>>
>>>>         I feel it would make more sense to move the initializer and
>>>>         serde to the
>>>>         final aggregate statement, since the serde only applies to
>>>>         the state store,
>>>>         and the initializer doesn't bear any relation to the first
>>>>         group in
>>>>         particular.
>>>         +1 for moving initializer and serde from cogroup() to the
>>>         aggregate() for the reasons mentioned above.
>>>
>>>         Cheers,
>>>
>>>         Michał
>>>
>>>
>>>         On 08/06/17 22:44, Guozhang Wang wrote:
>>>
>>>>         Note that although the internal `AbstractStoreSupplier` does maintain the
>>>>         key-value serdes, we do not enforce the interface of `StateStoreSupplier`
>>>>         to always retain that information, and hence we cannot assume that
>>>>         StateStoreSuppliers always retain key / value serdes.
>>>>
>>>>         On Thu, Jun 8, 2017 at 11:51 AM, Xavier Léauté<xavier@confluent.io> <mailto:xavier@confluent.io>  wrote:
>>>>
>>>>>         Another reason for the serde not to be in the first cogroup call, is that
>>>>>         the serde should not be required if you pass a StateStoreSupplier to
>>>>>         aggregate()
>>>>>
>>>>>         Regarding the aggregated type <T>, I don't see why the initializer should be
>>>>>         favored over the aggregator to define the type. In my mind separating the
>>>>>         initializer into the last aggregate call clearly indicates that the
>>>>>         initializer is independent of any of the aggregators or streams and that we
>>>>>         don't wait for grouped1 events to initialize the co-group.
>>>>>
>>>>>         On Thu, Jun 8, 2017 at 11:14 AM Guozhang Wang<wangguoz@gmail.com> <mailto:wangguoz@gmail.com>  wrote:
>>>>>
>>>>>>         On a second thought... This is the current proposal API
>>>>>>
>>>>>>
>>>>>>         ```
>>>>>>
>>>>>>         <T> CogroupedKStream<K, T> cogroup(final Initializer<T> initializer,
>>>>>>                 final Aggregator<? super K, ? super V, T> aggregator,
>>>>>>                 final Serde<T> aggValueSerde)
>>>>>>
>>>>>>         ```
>>>>>>
>>>>>>
>>>>>>         If we do not have the initializer in the first co-group it might be a bit
>>>>>>         awkward for users to specify the aggregator that returns a typed <T>
>>>>>>         value? Maybe it is still better to put these two functions in the same api?
>>>>>>
>>>>>>
>>>>>>
>>>>>>         Guozhang
>>>>>>
>>>>>>         On Thu, Jun 8, 2017 at 11:08 AM, Guozhang Wang<wangguoz@gmail.com> <mailto:wangguoz@gmail.com>
>>>>>         wrote:
>>>>>>>         This suggestion lgtm. I would vote for the first alternative rather than
>>>>>>>         adding it to the `KStreamBuilder` though.
>>>>>>>
>>>>>>>         On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté<xavier@confluent.io> <mailto:xavier@confluent.io>
>>>>>>>         wrote:
>>>>>>>
>>>>>>>>         I have a minor suggestion to make the API a little bit more symmetric.
>>>>>>>>         I feel it would make more sense to move the initializer and serde to
>>>>>>>>         the final aggregate statement, since the serde only applies to the
>>>>>>>>         state store, and the initializer doesn't bear any relation to the
>>>>>>>>         first group in particular. It would end up looking like this:
>>>>>>>>
>>>>>>>>         KTable<K, CG> cogrouped =
>>>>>>>>              grouped1.cogroup(aggregator1)
>>>>>>>>                      .cogroup(grouped2, aggregator2)
>>>>>>>>                      .cogroup(grouped3, aggregator3)
>>>>>>>>                      .aggregate(initializer1, aggValueSerde, storeName1);
>>>>>>>>
>>>>>>>>         Alternatively, we could move the first cogroup() method to
>>>>>>>>         KStreamBuilder, similar to how we have .merge()
>>>>>>>>         and end up with an api that would be even more symmetric.
>>>>>>>>
>>>>>>>>         KStreamBuilder.cogroup(grouped1, aggregator1)
>>>>>>>>                        .cogroup(grouped2, aggregator2)
>>>>>>>>                        .cogroup(grouped3, aggregator3)
>>>>>>>>                        .aggregate(initializer1, aggValueSerde, storeName1);
>>>>>>>>
>>>>>>>>         This doesn't have to be a blocker, but I thought it would make the API
>>>>>>>>         just a tad cleaner.
>>>>>>>>
>>>>>>>>         On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang<wangguoz@gmail.com> <mailto:wangguoz@gmail.com>
>>>>>>         wrote:
>>>>>>>>>         Kyle,
>>>>>>>>>
>>>>>>>>>         Thanks a lot for the updated KIP. It looks good to me.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>         Guozhang
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>         On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski<jim@jagunet.com> <mailto:jim@jagunet.com>
>>>>>>         wrote:
>>>>>>>>>>         This makes much more sense to me. +1
>>>>>>>>>>
>>>>>>>>>>>         On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <
>>>>>>>>         winkelman.kyle@gmail.com <mailto:winkelman.kyle@gmail.com>>
>>>>>>>>>>         wrote:
>>>>>>>>>>>         I have updated the KIP and my PR. Let me know what you think.
>>>>>>>>>>>         To create a cogrouped stream just call cogroup on a KGroupedStream
>>>>>>>>>>>         and supply the initializer, aggValueSerde, and an aggregator. Then
>>>>>>>>>>>         continue adding KGroupedStreams and aggregators. Then call one of
>>>>>>>>>>>         the many aggregate calls to create a KTable.
>>>>>>>>>>>
>>>>>>>>>>>         Thanks,
>>>>>>>>>>>         Kyle
>>>>>>>>>>>
>>>>>>>>>>>         On Jun 1, 2017 4:03 AM, "Damian Guy"<damian.guy@gmail.com> <mailto:damian.guy@gmail.com>
>>>>>>         wrote:
>>>>>>>>>>>>         Hi Kyle,
>>>>>>>>>>>>
>>>>>>>>>>>>         Thanks for the update. I think just one initializer makes sense as
>>>>>>>>>>>>         it should only be called once per key and generally it is just
>>>>>>>>>>>>         going to create a new instance of whatever the Aggregate class is.
>>>>>>>>>>>>
>>>>>>>>>>>>         Cheers,
>>>>>>>>>>>>         Damian
>>>>>>>>>>>>
>>>>>>>>>>>>         On Wed, 31 May 2017 at 20:09 Kyle Winkelman <
>>>>>>>>         winkelman.kyle@gmail.com <mailto:winkelman.kyle@gmail.com>
>>>>>>>>>>>>         wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>         Hello all,
>>>>>>>>>>>>>
>>>>>>>>>>>>>         I have spent some more time on this and the best alternative I have
>>>>>>>>>>>>>         come up with is:
>>>>>>>>>>>>>         KGroupedStream has a single cogroup call that takes an initializer and
>>>>>>>>>>>>>         an aggregator.
>>>>>>>>>>>>>         CogroupedKStream has a cogroup call that takes additional groupedStream
>>>>>>>>>>>>>         aggregator pairs.
>>>>>>>>>>>>>         CogroupedKStream has multiple aggregate methods that create the
>>>>>>>>>>>>>         different stores.
>>>>>>>>>>>>>
>>>>>>>>>>>>>         I plan on updating the kip but I want people's input on if we should
>>>>>>>>>>>>>         have the initializer be passed in once at the beginning or if we should
>>>>>>>>>>>>>         instead have the initializer be required for each call to one of the
>>>>>>>>>>>>>         aggregate calls. The first makes more sense to me but doesn't allow the
>>>>>>>>>>>>>         user to specify different initializers for different tables.
>>>>>>>>>>>>>
>>>>>>>>>>>>>         Thanks,
>>>>>>>>>>>>>         Kyle
>>>>>>>>>>>>>
>>>>>>>>>>>>>         On May 24, 2017 7:46 PM, "Kyle Winkelman" <
>>>>>>>>         winkelman.kyle@gmail.com <mailto:winkelman.kyle@gmail.com>>
>>>>>>>>>>>>>         wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>         Yea I really like that idea. I'll see what I can do to update the kip
>>>>>>>>>>>>>>         and my pr when I have some time. I'm not sure how well creating the
>>>>>>>>>>>>>>         kstreamaggregates will go though because at that point I will have
>>>>>>>>>>>>>>         thrown away the type of the values. It will be type safe, I just may
>>>>>>>>>>>>>>         need to do a little forcing.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>         Thanks,
>>>>>>>>>>>>>>         Kyle
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>         On May 24, 2017 3:28 PM, "Guozhang Wang" <wangguoz@gmail.com <mailto:wangguoz@gmail.com>
>>>>>>>>>         wrote:
>>>>>>>>>>>>>>>         Kyle,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         Thanks for the explanations, my previous read on the wiki examples was
>>>>>>>>>>>>>>>         wrong.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         So I guess my motivation should be "reduced" to: can we move the window
>>>>>>>>>>>>>>>         specs param from "KGroupedStream#cogroup(..)" to
>>>>>>>>>>>>>>>         "CogroupedKStream#aggregate(..)", and my motivations are:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         1. minor: we can reduce the number of generics in CogroupedKStream from
>>>>>>>>>>>>>>>         3 to 2.
>>>>>>>>>>>>>>>         2. major: this is for extensibility of the APIs, and since we are
>>>>>>>>>>>>>>>         removing the "Evolving" annotations on Streams it may be harder to
>>>>>>>>>>>>>>>         change it again in the future. The extended use cases are that people
>>>>>>>>>>>>>>>         wanted to have windowed running aggregates on different granularities,
>>>>>>>>>>>>>>>         e.g. "give me the counts per-minute, per-hour, per-day and per-week",
>>>>>>>>>>>>>>>         and today in DSL we need to specify that case in multiple aggregate
>>>>>>>>>>>>>>>         operators, each of which gets a state store / changelog, etc. And it is
>>>>>>>>>>>>>>>         possible to optimize it as well to a single state store. Its
>>>>>>>>>>>>>>>         implementation would be tricky as you need to contain windows of
>>>>>>>>>>>>>>>         different lengths within your window store, but just from the public
>>>>>>>>>>>>>>>         API point of view, it could be specified as:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         CogroupedKStream stream = stream1.cogroup(stream2, ... "state-store-name");
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         table1 = stream.aggregate(/*per-minute window*/)
>>>>>>>>>>>>>>>         table2 = stream.aggregate(/*per-hour window*/)
>>>>>>>>>>>>>>>         table3 = stream.aggregate(/*per-day window*/)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         while underneath we are only using a single store "state-store-name"
>>>>>>>>>>>>>>>         for it.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         Although this feature is out of the scope of this KIP, I'd like to
>>>>>>>>>>>>>>>         discuss if we can "leave the door open" to make such changes without
>>>>>>>>>>>>>>>         modifying the public APIs.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <
>>>>>>>>>>>>>         winkelman.kyle@gmail.com
>>>>>>>>>>>>>         <mailto:winkelman.kyle@gmail.com>
>>>>>>>>>>>>>>>         wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>         I allow defining a single window/sessionwindow one time when you make
>>>>>>>>>>>>>>>>         the cogroup call from a KGroupedStream. From then on you are using the
>>>>>>>>>>>>>>>>         cogroup call from within CogroupedKStream which doesn't accept any
>>>>>>>>>>>>>>>>         additional windows/sessionwindows.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>         Is this what you meant by your question or did I misunderstand?
>>>>>>>>>>>>>>>>         On May 23, 2017 9:33 PM, "Guozhang Wang" <
>>>>>         wangguoz@gmail.com <mailto:wangguoz@gmail.com>
>>>>>>>>>>>>         wrote:
>>>>>>>>>>>>>>>>         Another question that came to me is on "window alignment": from the KIP
>>>>>>>>>>>>>>>>         it seems you are allowing users to specify a (potentially different)
>>>>>>>>>>>>>>>>         window spec in each co-grouped input stream. So if these window specs
>>>>>>>>>>>>>>>>         are different how should we "align" them with different input streams?
>>>>>>>>>>>>>>>>         I think it is more natural to only specify one window spec in the
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>         KTable<RK, V> CogroupedKStream#aggregate(Windows);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>         And remove it from the cogroup() functions. WDYT?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>         Guozhang
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>         On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <
>>>>>>>>>         wangguoz@gmail.com <mailto:wangguoz@gmail.com>>
>>>>>>>>>>>>>>>         wrote:
>>>>>>>>>>>>>>>>>         Thanks for the proposal Kyle, this is a quite common use case to
>>>>>>>>>>>>>>>>>         support such multi-way table join (i.e. N source tables with N
>>>>>>>>>>>>>>>>>         aggregate func) with a single store and N+1 serdes, I have seen lots
>>>>>>>>>>>>>>>>>         of people using the low-level PAPI to achieve this goal.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <
>>>>>>>>>>>>>>>>         winkelman.kyle@gmail.com
>>>>>>>>>>>>>>>>         <mailto:winkelman.kyle@gmail.com>
>>>>>>>>>>>>>>>>>>         wrote:
>>>>>>>>>>>>>>>>>>         I like your point about not handling other cases such as count and
>>>>>>>>>>>>>>>>>>         reduce.
>>>>>>>>>>>>>>>>>>         I think that reduce may not make sense because reduce assumes that the
>>>>>>>>>>>>>>>>>>         input values are the same as the output values. With cogroup there may
>>>>>>>>>>>>>>>>>>         be multiple different input types and then your output type can't be
>>>>>>>>>>>>>>>>>>         multiple different things. In the case where you have all matching
>>>>>>>>>>>>>>>>>>         value types you can do KStreamBuilder#merge followed by the reduce.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>         As for count I think it is possible to call count on all the
>>>>>>>>>>>>>>>>>>         individual grouped streams and then do joins. Otherwise we could maybe
>>>>>>>>>>>>>>>>>>         make a special call in groupedstream for this case, because in this
>>>>>>>>>>>>>>>>>>         case we don't need to do type checking on the values. It could be
>>>>>>>>>>>>>>>>>>         similar to the current count methods but accept a var args of
>>>>>>>>>>>>>>>>>>         additional grouped streams as well and make sure they have a key type
>>>>>>>>>>>>>>>>>>         of K.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>         The way I have put the kip together is to ensure that we do type
>>>>>>>>>>>>>>>>>>         checking. I don't see a way we could group them all first and then
>>>>>>>>>>>>>>>>>>         make a call to count, reduce, or aggregate because with aggregate they
>>>>>>>>>>>>>>>>>>         would need to pass a list of aggregators and we would have no way of
>>>>>>>>>>>>>>>>>>         type checking that they match the grouped streams.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>         Thanks,
>>>>>>>>>>>>>>>>>>         Kyle
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>         On May 19, 2017 11:42 AM, "Xavier Léauté" <
>>>>>>>>         xavier@confluent.io <mailto:xavier@confluent.io>>
>>>>>>>>>>>>>>>         wrote:
>>>>>>>>>>>>>>>>>>>         Sorry to jump on this thread so late. I agree this is a very useful
>>>>>>>>>>>>>>>>>>>         addition and wanted to provide an additional use-case and some more
>>>>>>>>>>>>>>>>>>>         comments.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>         This is actually a very common analytics use-case in the ad-tech
>>>>>>>>>>>>>>>>>>>         industry. The typical setup will have an auction stream, an impression
>>>>>>>>>>>>>>>>>>>         stream, and a click stream. Those three streams need to be combined to
>>>>>>>>>>>>>>>>>>>         compute aggregate statistics (e.g. impression statistics, and
>>>>>>>>>>>>>>>>>>>         click-through rates), since most of the attributes of interest are
>>>>>>>>>>>>>>>>>>>         only present in the auction stream.
>>>>>>>>>>>>>>>>>>>         A simple way to do this is to co-group all the streams by the auction
>>>>>>>>>>>>>>>>>>>         key, and process updates to the co-group as events for each stream
>>>>>>>>>>>>>>>>>>>         come in, keeping only one value from each stream before sending
>>>>>>>>>>>>>>>>>>>         downstream for further processing / aggregation.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>         One could view the result of that co-group operation as a "KTable"
>>>>>>>>>>>>>>>>>>>         with multiple values per key. The key being the grouping key, and the
>>>>>>>>>>>>>>>>>>>         values consisting of one value per stream.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>         What I like about Kyle's approach is that it allows elegant
>>>>>>>>>>>>>>>>>>>         co-grouping of multiple streams without having to worry about the
>>>>>>>>>>>>>>>>>>>         number of streams, and avoids dealing with Tuple types or other
>>>>>>>>>>>>>>>>>>>         generic interfaces that could get messy if we wanted to preserve all
>>>>>>>>>>>>>>>>>>>         the value types in the resulting co-grouped stream.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>         My only concern is that we only allow the cogroup + aggregate combined
>>>>>>>>>>>>>>>>>>>         operation. This forces the user to build their own tuple serialization
>>>>>>>>>>>>>>>>>>>         format if they want to preserve the individual input stream values as
>>>>>>>>>>>>>>>>>>>         a group. It also deviates quite a bit from our approach in
>>>>>>>>>>>>>>>>>>>         KGroupedStream which offers other operations, such as count and
>>>>>>>>>>>>>>>>>>>         reduce, which should also be applicable to a co-grouped stream.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>         Overall I still think this is a really useful addition, but I feel we
>>>>>>>>>>>>>>>>>>>         haven't spent much time trying to explore alternative DSLs that could
>>>>>>>>>>>>>>>>>>>         maybe generalize better or match our existing syntax more closely.
>>>>>>>>>>>>>>>>>>>         On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman <
>>>>>>>>>>>>>>>>         winkelman.kyle@gmail.com
>>>>>>>>>>>>>>>>         <mailto:winkelman.kyle@gmail.com>
>>>>>>>>>>>>>>>>>>>         wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>         Eno, is there anyone else that is an expert in the kafka streams realm
>>>>>>>>>>>>>>>>>>>>         that I should reach out to for input?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>         I believe Damian Guy is still planning on reviewing this more in depth
>>>>>>>>>>>>>>>>>>>>         so I
>>>
>>
>> -- 
>> Signature
>> <http://www.openbet.com/> 	Michal Borowiecki
>> Senior Software Engineer L4
>> 	T: 	+44 208 742 1600
>>
>> 	
>> 	+44 203 249 8448
>>
>> 	
>> 	
>> 	E: 	michal.borowiecki@openbet.com
>> 	W: 	www.openbet.com <http://www.openbet.com/>
>>
>> 	
>> 	OpenBet Ltd
>>
>> 	Chiswick Park Building 9
>>
>> 	566 Chiswick High Rd
>>
>> 	London
>>
>> 	W4 5XT
>>
>> 	UK
>>
>> 	
>> <https://www.openbet.com/email_promo>
>>
>> This message is confidential and intended only for the addressee. If 
>> you have received this message in error, please immediately notify 
>> the postmaster@openbet.com <mailto:postmaster@openbet.com> and delete 
>> it from your system as well as any copies. The content of e-mails as 
>> well as traffic data may be monitored by OpenBet for employment and 
>> security purposes. To protect the environment please do not print 
>> this e-mail unless necessary. OpenBet Ltd. Registered Office: 
>> Chiswick Park Building 9, 566 Chiswick High Road, London, W4 5XT, 
>> United Kingdom. A company registered in England and Wales. Registered 
>> no. 3134634. VAT no. GB927523612
>>
>


