From: Damian Guy
Date: Thu, 01 Jun 2017 09:03:00 +0000
Subject: Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup
To: dev@kafka.apache.org

Hi Kyle,

Thanks for the update. I think just one initializer makes sense, as it should only be called once per key and generally it is just going to create a new instance of whatever the Aggregate class is.

Cheers,
Damian

On Wed, 31 May 2017 at 20:09 Kyle Winkelman wrote:

> Hello all,

> I have spent some more time on this, and the best alternative I have come up with is:
> KGroupedStream has a single cogroup call that takes an initializer and an aggregator.
> CogroupedKStream has a cogroup call that takes additional groupedStream/aggregator pairs.
> CogroupedKStream has multiple aggregate methods that create the different stores.

> I plan on updating the KIP, but I want people's input on whether we should have the initializer be passed in once at the beginning or instead have the initializer be required for each call to one of the aggregate methods. The first makes more sense to me but doesn't allow the user to specify different initializers for different tables.

> Thanks,
> Kyle

> On May 24, 2017 7:46 PM, "Kyle Winkelman" wrote:

> > Yeah, I really like that idea. I'll see what I can do to update the KIP and my PR when I have some time. I'm not sure how well creating the KStreamAggregates will go, though, because at that point I will have thrown away the type of the values. It will be type safe; I just may need to do a little forcing.

> > Thanks,
> > Kyle

> > On May 24, 2017 3:28 PM, "Guozhang Wang" wrote:

> >> Kyle,

> >> Thanks for the explanations; my previous read on the wiki examples was wrong.
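A minimal plain-Java sketch of the API shape Kyle describes in the 31 May message, to make Damian's point about a single initializer concrete. This is not the Kafka Streams API. `CogroupSketch`, `Agg`, `start`, and `process` are hypothetical names, a `HashMap` stands in for the one state store, and each input is identified by a string tag rather than a `KGroupedStream`. The initializer is supplied once and runs only the first time a key is seen. Each input contributes its own aggregator. Because the inputs have different value types, each typed aggregator is wrapped with a checked cast, which is roughly the "little forcing" Kyle mentions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Same (key, value, aggregate) -> aggregate shape as an aggregator; hypothetical, not Kafka's interface.
interface Agg<K, V, VA> {
    VA apply(K key, V value, VA aggregate);
}

// Hypothetical model of a cogrouped stream: one initializer, one store, one aggregator per input.
class CogroupSketch<K, VA> {
    private final Supplier<VA> initializer;                            // supplied once, shared by every input
    private final Map<String, Agg<K, Object, VA>> aggregators = new HashMap<>();
    private final Map<K, VA> store = new HashMap<>();                  // stands in for the single state store
    private int initializerCalls = 0;

    private CogroupSketch(Supplier<VA> initializer) {
        this.initializer = initializer;
    }

    // Plays the role of KGroupedStream#cogroup(initializer, aggregator): the first input brings the initializer.
    static <K, V, VA> CogroupSketch<K, VA> start(String stream, Class<V> valueType,
                                                 Supplier<VA> initializer, Agg<K, V, VA> aggregator) {
        return new CogroupSketch<K, VA>(initializer).cogroup(stream, valueType, aggregator);
    }

    // Plays the role of CogroupedKStream#cogroup(groupedStream, aggregator). Each input has its own
    // value type, so the typed aggregator is wrapped with a checked cast before it is stored.
    <V> CogroupSketch<K, VA> cogroup(String stream, Class<V> valueType, Agg<K, V, VA> aggregator) {
        aggregators.put(stream, (key, value, agg) -> aggregator.apply(key, valueType.cast(value), agg));
        return this;
    }

    // One record on one input: fetch or initialize the aggregate, apply that input's aggregator,
    // write it back, and return the value that would be forwarded downstream.
    VA process(String stream, K key, Object value) {
        Agg<K, Object, VA> aggregator = aggregators.get(stream);
        if (aggregator == null) {
            throw new IllegalArgumentException("unknown stream: " + stream);
        }
        VA current = store.get(key);
        if (current == null) {
            current = initializer.get();   // only reached the first time a key is seen
            initializerCalls++;
        }
        VA updated = aggregator.apply(key, value, current);
        store.put(key, updated);
        return updated;
    }

    int initializerCalls() {
        return initializerCalls;
    }
}
```

As an example, take an integer input tagged `clicks` and a string input tagged `names` that both fold into one `String` aggregate. Three records for key `k1` and one for key `k2` invoke the initializer only twice, once per key.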

> >> So I guess my motivation should be "reduced" to: can we move the window specs param from "KGroupedStream#cogroup(..)" to "CogroupedKStream#aggregate(..)"? My motivations are:

> >> 1. minor: we can reduce the number of generics in CogroupedKStream from 3 to 2.
> >> 2. major: this is for extensibility of the APIs, and since we are removing the "Evolving" annotations on Streams it may be harder to change it again in the future. The extended use cases are that people wanted to have windowed running aggregates on different granularities, e.g. "give me the counts per-minute, per-hour, per-day and per-week", and today in the DSL we need to specify that case in multiple aggregate operators, each of which gets its own state store / changelog, etc. It is possible to optimize this to a single state store as well. Its implementation would be tricky, as you need to contain windows of different lengths within your window store, but just from the public API point of view it could be specified as:

> >> CogroupedKStream stream = stream1.cogroup(stream2, ... "state-store-name");

> >> table1 = stream.aggregate(/*per-minute window*/)
> >> table2 = stream.aggregate(/*per-hour window*/)
> >> table3 = stream.aggregate(/*per-day window*/)

> >> while underneath we are only using a single store, "state-store-name", for it.

> >> Although this feature is out of the scope of this KIP, I'd like to discuss whether we can "leave the door open" to make such changes without modifying the public APIs.

> >> Guozhang

> >> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <winkelman.kyle@gmail.com> wrote:

> >> > I allow defining a single window/sessionwindow one time, when you make the cogroup call from a KGroupedStream.
> >> > From then on you are using the cogroup call from within CogroupedKStream, which doesn't accept any additional windows/sessionwindows.

> >> > Is this what you meant by your question, or did I misunderstand?

> >> > On May 23, 2017 9:33 PM, "Guozhang Wang" wrote:

> >> > Another question that came to me is on "window alignment": from the KIP it seems you are allowing users to specify a (potentially different) window spec in each co-grouped input stream. So if these window specs are different, how should we "align" them across the different input streams? I think it is more natural to specify only one window spec, in

> >> > KTable CogroupedKStream#aggregate(Windows);

> >> > and remove it from the cogroup() functions. WDYT?

> >> > Guozhang

> >> > On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang wrote:

> >> > > Thanks for the proposal, Kyle. Supporting such a multi-way table join (i.e. N source tables with N aggregate functions) with a single store and N+1 serdes is quite a common use case; I have seen lots of people using the low-level PAPI to achieve this goal.

> >> > > On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <winkelman.kyle@gmail.com> wrote:

> >> > >> I like your point about not handling other cases such as count and reduce.

> >> > >> I think that reduce may not make sense because reduce assumes that the input values are the same type as the output values. With cogroup there may be multiple different input types, and your output type can't be multiple different things. In the case where all the value types match, you can do KStreamBuilder#merge followed by the reduce.
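Kyle's point about `reduce` comes down to its shape. A reducer combines two values of one type into a value of that same type, so it only fits when every input already has the aggregate's type. The sketch below is plain Java rather than Kafka Streams. `MergeThenReduce`, `Rec`, `merge`, and `reduceByKey` are illustrative names, and simple concatenation stands in for interleaving the merged inputs. It shows the merge-then-reduce path with a `BinaryOperator`, using a `HashMap` as the per-key store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class MergeThenReduce {
    // A keyed record; each List<Rec<V>> stands in for one input stream.
    static final class Rec<V> {
        final String key;
        final V value;

        Rec(String key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // "Merge": combine the inputs into one sequence. Every input must carry the same V.
    @SafeVarargs
    static <V> List<Rec<V>> merge(List<Rec<V>>... streams) {
        List<Rec<V>> merged = new ArrayList<>();
        for (List<Rec<V>> stream : streams) {
            merged.addAll(stream);
        }
        return merged;
    }

    // "Reduce": the (V, V) -> V reducer is why input and output types must match.
    // The first value for a key is stored as-is; later values are folded in.
    static <V> Map<String, V> reduceByKey(List<Rec<V>> records, BinaryOperator<V> reducer) {
        Map<String, V> store = new HashMap<>();
        for (Rec<V> r : records) {
            store.merge(r.key, r.value, reducer);
        }
        return store;
    }
}
```

Summing two `Long` inputs this way works. An input carrying a different value type would not compile against the single `BinaryOperator<V>`, and that is the case cogroup's per-stream aggregators are meant to cover.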

> >> > >> As for count, I think it is possible to call count on all the individual grouped streams and then do joins. Otherwise we could maybe make a special call in KGroupedStream for this case, because here we don't need to do type checking on the values. It could be similar to the current count methods but accept varargs of additional grouped streams as well and make sure they have a key type of K.

> >> > >> The way I have put the KIP together is to ensure that we do type checking. I don't see a way we could group them all first and then make a call to count, reduce, or aggregate, because with aggregate they would need to pass a list of aggregators and we would have no way of type checking that they match the grouped streams.

> >> > >> Thanks,
> >> > >> Kyle

> >> > >> On May 19, 2017 11:42 AM, "Xavier Léauté" wrote:

> >> > >> > Sorry to jump on this thread so late. I agree this is a very useful addition and wanted to provide an additional use-case and some more comments.

> >> > >> > This is actually a very common analytics use-case in the ad-tech industry. The typical setup will have an auction stream, an impression stream, and a click stream. Those three streams need to be combined to compute aggregate statistics (e.g. impression statistics and click-through rates), since most of the attributes of interest are only present in the auction stream.

> >> > >> > A simple way to do this is to co-group all the streams by the auction key and process updates to the co-group as events for each stream come in, keeping only one value from each stream before sending downstream for further processing / aggregation.

> >> > >> > One could view the result of that co-group operation as a "KTable" with multiple values per key: the key being the grouping key, and the values consisting of one value per stream.

> >> > >> > What I like about Kyle's approach is that it allows elegant co-grouping of multiple streams without having to worry about the number of streams, and avoids dealing with Tuple types or other generic interfaces that could get messy if we wanted to preserve all the value types in the resulting co-grouped stream.

> >> > >> > My only concern is that we only allow the combined cogroup + aggregate operation. This forces the user to build their own tuple serialization format if they want to preserve the individual input stream values as a group. It also deviates quite a bit from our approach in KGroupedStream, which offers other operations, such as count and reduce, that should also be applicable to a co-grouped stream.

> >> > >> > Overall I still think this is a really useful addition, but I feel we haven't spent much time trying to explore alternative DSLs that could maybe generalize better or match our existing syntax more closely.
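Xavier's ad-tech case can be sketched as a co-group whose aggregate keeps one slot per input stream, keyed by auction id. A downstream step then computes click-through rates from the combined values. This is plain Java, not Kafka Streams. `AuctionGroup`, `AdCogroup`, the handler methods, and the "latest value wins per stream" policy are assumptions made for illustration. Each stream only overwrites its own slot, so a click that arrives before its auction record is still counted once the auction attributes show up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical co-grouped value: one slot per input stream, all keyed by auction id.
class AuctionGroup {
    String campaign;    // attribute carried only by the auction stream
    String impression;  // latest impression event for the auction, if any
    String click;       // latest click event for the auction, if any
}

class AdCogroup {
    // A single store for all three streams, instead of one store per stream.
    final Map<String, AuctionGroup> store = new HashMap<>();

    private AuctionGroup group(String auctionId) {
        return store.computeIfAbsent(auctionId, id -> new AuctionGroup()); // the single initializer
    }

    // Each stream's aggregator overwrites only its own slot ("keep one value per stream"),
    // so events may arrive in any order across the three streams.
    void onAuction(String auctionId, String campaign) { group(auctionId).campaign = campaign; }
    void onImpression(String auctionId, String event) { group(auctionId).impression = event; }
    void onClick(String auctionId, String event)      { group(auctionId).click = event; }

    // Downstream aggregation over the co-grouped values: click-through rate per campaign,
    // counting only auctions that have both their auction attributes and an impression.
    Map<String, Double> clickThroughRateByCampaign() {
        Map<String, long[]> counts = new HashMap<>();   // campaign -> {impressions, clicks}
        for (AuctionGroup g : store.values()) {
            if (g.campaign == null || g.impression == null) {
                continue;
            }
            long[] c = counts.computeIfAbsent(g.campaign, k -> new long[2]);
            c[0]++;
            if (g.click != null) {
                c[1]++;
            }
        }
        Map<String, Double> ctr = new HashMap<>();
        for (Map.Entry<String, long[]> e : counts.entrySet()) {
            ctr.put(e.getKey(), (double) e.getValue()[1] / e.getValue()[0]);
        }
        return ctr;
    }
}
```

The per-campaign rate is only computable because the co-group brings the auction's campaign together with the impression and click events. As Xavier notes, those attributes are only present in the auction stream.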

> >> > >> > On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman <winkelman.kyle@gmail.com> wrote:

> >> > >> > > Eno, is there anyone else who is an expert in the Kafka Streams realm that I should reach out to for input?

> >> > >> > > I believe Damian Guy is still planning on reviewing this more in depth, so I will wait for his input before continuing.

> >> > >> > > On May 9, 2017 7:30 AM, "Eno Thereska" wrote:

> >> > >> > > > Thanks Kyle, good arguments.

> >> > >> > > > Eno

> >> > >> > > > > On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.kyle@gmail.com> wrote:

> >> > >> > > > > *- minor: could you add an exact example (similar to what Jay’s example is, or like your Spark/Pig pointers had) to make this super concrete?*
> >> > >> > > > > I have added a more concrete example to the KIP.

> >> > >> > > > > *- my main concern is that we’re exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together.
> >> > >> > > > > I’d like to hear your thoughts on whether it’s possible to do this optimization with the current DSL, at the topology builder level.*
> >> > >> > > > > You would have to make a lot of checks to understand if it is even possible to make this optimization:
> >> > >> > > > > 1. Make sure they are all KTableKTableOuterJoins.
> >> > >> > > > > 2. None of the intermediate KTables are used for anything else.
> >> > >> > > > > 3. None of the intermediate stores are used. (This may be impossible, especially if they use KafkaStreams#store after the topology has already been built.)
> >> > >> > > > > You would then need to make decisions during the optimization:
> >> > >> > > > > 1. Your new initializer would be the composite of all the individual initializers and the valueJoiners.
> >> > >> > > > > 2. I am having a hard time thinking about how you would turn the aggregators and valueJoiners into an aggregator that would work on the final object, but this may be possible.
> >> > >> > > > > 3. Which state store would you use? The ones declared would be for the aggregate values. None of the declared ones would be guaranteed to hold the final object. This would mean you must create a new state store and not create any of the declared ones.

> >> > >> > > > > The main argument I have against it is that even if it could be done, I don't know that we would want this to be an optimization in the background, because the user would still be required to think about all of the intermediate values that they shouldn't need to worry about if they only care about the final object.

> >> > >> > > > > In my opinion cogroup is a common enough case that it should be part of the composable pieces (group, aggregate, join), because we want to allow people to join two or more streams in an easy way. Right now I don't think we give them ways of handling this use case easily.

> >> > >> > > > > *-I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.*
> >> > >> > > > > I would argue that cogroup is not just an optimization; it is a new way for users to approach a problem that requires multiple streams. I may sound like a broken record, but I don't think users should have to build the N-1 intermediate tables and deal with their initializers, serdes and stores if all they care about is the final object.
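The "N-1 intermediate tables" argument can be put in numbers using the counts Kyle gives in his 4 May message further down this thread. The join-chain version uses N stores, N initializers, and N aggregate serdes, plus N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers. The cogroup version uses one of each, plus N KStreamAggregates and 1 KStreamCogroup. The helper below only encodes those formulas. The class and method names are illustrative, and `processors` simply sums the node counts he lists. It does not inspect a real topology.

```java
// Encodes the counts quoted in this thread for N co-grouped input streams; it does not inspect a real topology.
class TopologyCounts {
    final int stores;
    final int initializers;
    final int aggValueSerdes;
    final int processors;

    private TopologyCounts(int stores, int initializers, int aggValueSerdes, int processors) {
        this.stores = stores;
        this.initializers = initializers;
        this.aggValueSerdes = aggValueSerdes;
        this.processors = processors;
    }

    // Chain of aggregates joined by KTable outer joins:
    // N KStreamAggregates + 2*(N-1) KTableKTableOuterJoins + (N-1) KTableKTableJoinMergers,
    // with one store, initializer and aggregate serde per input.
    static TopologyCounts joinChain(int n) {
        requireAtLeastOne(n);
        return new TopologyCounts(n, n, n, n + 2 * (n - 1) + (n - 1));
    }

    // Cogroup: N KStreamAggregates + 1 KStreamCogroup sharing one store, initializer and serde.
    static TopologyCounts cogroup(int n) {
        requireAtLeastOne(n);
        return new TopologyCounts(1, 1, 1, n + 1);
    }

    private static void requireAtLeastOne(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("need at least one input stream, got " + n);
        }
    }
}
```

For the three-topic example, that is 3 stores and 9 processor nodes for the join chain versus 1 store and 4 nodes for cogroup. At ten inputs it grows to 10 stores and 37 nodes versus 1 store and 11 nodes.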

> >> > >> > > > > Now if, for example, someone uses cogroup but doesn't supply additional streams and aggregators, this case is equivalent to a single grouped stream making an aggregate call. This case is what I view as an optimization: we could remove the KStreamCogroup and act as if there were just a call to KGroupedStream#aggregate instead of KGroupedStream#cogroup. (I would prefer to just write a warning saying that this is not how cogroup is meant to be used.)

> >> > >> > > > > Thanks,
> >> > >> > > > > Kyle

> >> > >> > > > > On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.thereska@gmail.com> wrote:

> >> > >> > > > >> Hi Kyle,

> >> > >> > > > >> Thanks for the KIP again. A couple of comments:

> >> > >> > > > >> - minor: could you add an exact example (similar to what Jay’s example is, or like your Spark/Pig pointers had) to make this super concrete?

> >> > >> > > > >> - my main concern is that we’re exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together.
> >> > >> > > > >> I’d like to hear your thoughts on whether it’s possible to do this optimization with the current DSL, at the topology builder level.

> >> > >> > > > >> I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.

> >> > >> > > > >> Thanks
> >> > >> > > > >> Eno

> >> > >> > > > >>> On May 5, 2017, at 4:39 PM, Jay Kreps wrote:

> >> > >> > > > >>> I haven't digested the proposal but the use case is pretty common. An example would be the "customer 360" or "unified customer profile" use case we often use. In that use case you have a dozen systems, each of which has some information about your customer (account details, settings, billing info, customer service contacts, purchase history, etc). Your goal is to join/munge these into a single profile record for each customer that has all the relevant info in a usable form and is up-to-date with all the source systems. If you implement that with kstreams as a sequence of joins, I think today we'd fully materialize N-1 intermediate tables.
> >> > >> > > > >>> But clearly you only need a single stage to group all these things that are already co-partitioned. A distributed database would do this under the covers, which is arguably better (at least when it does the right thing), and perhaps we could do the same thing, but I'm not sure we know the partitioning, so we may need an explicit cogroup command that implies they are already co-partitioned.

> >> > >> > > > >>> -Jay

> >> > >> > > > >>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <winkelman.kyle@gmail.com> wrote:

> >> > >> > > > >>>> Yeah, that's a good way to look at it.
> >> > >> > > > >>>> I have seen this type of functionality in a couple of other platforms like Spark and Pig.
> >> > >> > > > >>>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
> >> > >> > > > >>>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm

> >> > >> > > > >>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian.guy@gmail.com> wrote:

> >> > >> > > > >>>>> Hi Kyle,

> >> > >> > > > >>>>> If I'm reading this correctly, it is like an N-way outer join? So an input on any stream will always produce a new aggregated value - is that correct?
> >> > >> > > > >>>>> Effectively, each Aggregator just looks up the current value, aggregates and forwards the result.
> >> > >> > > > >>>>> I need to look into it and think about it a bit more, but it seems like it could be a useful optimization.

> >> > >> > > > >>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.kyle@gmail.com> wrote:

> >> > >> > > > >>>>>> I sure can. I have added the following description to my KIP. If this doesn't help, let me know and I will take some more time to build a diagram and make more of a step-by-step description:

> >> > >> > > > >>>>>> Example with Current API:

> >> > >> > > > >>>>>> KTable table1 = builder.stream("topic1").groupByKey().aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
> >> > >> > > > >>>>>> KTable table2 = builder.stream("topic2").groupByKey().aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
> >> > >> > > > >>>>>> KTable table3 = builder.stream("topic3").groupByKey().aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
> >> > >> > > > >>>>>> KTable cogrouped = table1.outerJoin(table2, joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);

> >> > >> > > > >>>>>> As you can see, this creates 3 StateStores and requires 3 initializers and 3 aggValueSerdes. This also puts pressure on the user to define what the intermediate values are going to be (V1, V2, V3). They are left with a couple of choices: first, make V1, V2, and V3 all the same as CG, so the two joiners are more like mergers; or second, make them intermediate states such as Topic1Map, Topic2Map, and Topic3Map and have the joiners use those to build the final aggregate CG value. This is something the user could avoid thinking about with this KIP.

> >> > >> > > > >>>>>> When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate, grabbing the current aggregate from storeName1. It will produce this in the form of the first intermediate value and get sent through a KTableKTableOuterJoin, where it will look up the current value of the key in storeName2. It will use the first joiner to calculate the second intermediate value, which will go through an additional KTableKTableOuterJoin.
> >> > >> > > > >>>>>> Here it will look up the current value of the key in storeName3 and use the second joiner to build the final aggregate value.

> >> > >> > > > >>>>>> If you think through all possibilities for incoming topics, you will see that no matter which topic a record comes in through, all three stores are queried and all of the joiners must get used.

> >> > >> > > > >>>>>> Topology-wise, for N incoming streams this creates N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers.

> >> > >> > > > >>>>>> Example with Proposed API:

> >> > >> > > > >>>>>> KGroupedStream grouped1 = builder.stream("topic1").groupByKey();
> >> > >> > > > >>>>>> KGroupedStream grouped2 = builder.stream("topic2").groupByKey();
> >> > >> > > > >>>>>> KGroupedStream grouped3 = builder.stream("topic3").groupByKey();
> >> > >> > > > >>>>>> KTable cogrouped = grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
> >> > >> > > > >>>>>>         .cogroup(grouped2, aggregator2)
> >> > >> > > > >>>>>>         .cogroup(grouped3, aggregator3)
> >> > >> > > > >>>>>>         .aggregate();

> >> > >> > > > >>>>>> As you can see, this creates 1 StateStore and requires 1 initializer and 1 aggValueSerde. The user no longer has to worry about the intermediate values and the joiners.
> >> > >> > > > >>>>>> All they have to think about is how each stream impacts the creation of the final CG object.

> >> > >> > > > >>>>>> When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate and grab the current aggregate from storeName1. It will add its incoming object to the aggregate, update the store and pass the new aggregate on. This new aggregate goes through the KStreamCogroup, which is pretty much just a pass-through processor, and you are done.

> >> > >> > > > >>>>>> Topology-wise, for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.

> >> > >> > > > >>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matthias@confluent.io> wrote:

> >> > >> > > > >>>>>>> Kyle,

> >> > >> > > > >>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I could not follow completely. Could you maybe add a more concrete example, like 3 streams with 3 records each (plus expected result), and show the difference between the current way to implement it and the proposed API?
> >> > >> > > > >>>>>>> This could also cover the internal processing, to see what store calls would be required for both approaches etc.

> >> > >> > > > >>>>>>> I think it's pretty advanced stuff you propose, and it would help to understand it better.

> >> > >> > > > >>>>>>> Thanks a lot!

> >> > >> > > > >>>>>>> -Matthias

> >> > >> > > > >>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:

> >> > >> > > > >>>>>>>> I have made a pull request. It can be found here:

> >> > >> > > > >>>>>>>> https://github.com/apache/kafka/pull/2975

> >> > >> > > > >>>>>>>> I plan to write some more unit tests for my classes and get around to writing documentation for the public API additions.

> >> > >> > > > >>>>>>>> One thing I was curious about: in the KCogroupedStreamImpl#aggregate method I pass null to the KGroupedStream#repartitionIfRequired method. I can't supply the store name, because if more than one grouped stream repartitions, an error is thrown. Is there some name that someone can recommend, or should I leave the null and allow it to fall back to the KGroupedStream.name?

> >> > >> > > > >>>>>>>> Should this be expanded to handle grouped tables?
> This > >> > >> would > >> > >> > be > >> > >> > > > >>>>> pretty > >> > >> > > > >>>>>>> easy > >> > >> > > > >>>>>>>> for a normal aggregate but one allowing session > stores > >> > and > >> > >> > > > windowed > >> > >> > > > >>>>>>> stores > >> > >> > > > >>>>>>>> would required KTableSessionWindowAggregate and > >> > >> > > > >>>> KTableWindowAggregate > >> > >> > > > >>>>>>>> implementations. > >> > >> > > > >>>>>>>> > >> > >> > > > >>>>>>>> Thanks, > >> > >> > > > >>>>>>>> Kyle > >> > >> > > > >>>>>>>> > >> > >> > > > >>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" < > >> > >> > eno.thereska@gmail.com> > >> > >> > > > >>>>> wrote: > >> > >> > > > >>>>>>>> > >> > >> > > > >>>>>>>>> I=E2=80=99ll look as well asap, sorry, been swamp= ed. > >> > >> > > > >>>>>>>>> > >> > >> > > > >>>>>>>>> Eno > >> > >> > > > >>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy < > >> > >> > damian.guy@gmail.com> > >> > >> > > > >>>>> wrote: > >> > >> > > > >>>>>>>>>> > >> > >> > > > >>>>>>>>>> Hi Kyle, > >> > >> > > > >>>>>>>>>> > >> > >> > > > >>>>>>>>>> Thanks for the KIP. I apologize that i haven't h= ad > >> the > >> > >> > chance > >> > >> > > to > >> > >> > > > >>>>> look > >> > >> > > > >>>>>>> at > >> > >> > > > >>>>>>>>>> the KIP yet, but will schedule some time to look > >> into > >> > it > >> > >> > > > >>>> tomorrow. > >> > >> > > > >>>>>> For > >> > >> > > > >>>>>>>>> the > >> > >> > > > >>>>>>>>>> implementation, can you raise a PR against kafka > >> trunk > >> > >> and > >> > >> > > mark > >> > >> > > > >>>> it > >> > >> > > > >>>>> as > >> > >> > > > >>>>>>>>> WIP? > >> > >> > > > >>>>>>>>>> It will be easier to review what you have done. 

Thanks,
Damian

On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.kyle@gmail.com> wrote:

I am replying to this in hopes it will draw some attention to my KIP as I haven't heard from anyone in a couple days. This is my first KIP and my first large contribution to the project so I'm sure I did something wrong. ;)

On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.kyle@gmail.com> wrote:

Hello all,

I have created KIP-150 to facilitate discussion about adding cogroup to the streams DSL.

Please find the KIP here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup

Please find my initial implementation here:
https://github.com/KyleWinkelman/kafka

Thanks,
Kyle Winkelman

--
-- Guozhang
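Kyle describes the processing as follows. Each input's KStreamAggregate reads the current aggregate from the shared store, folds in the new record, and writes the result back. The KStreamCogroup then passes the result on. That flow can be modelled in a few lines of plain Java.

This is a hypothetical in-memory sketch. It is not the Kafka Streams API and not the code in the PR. `CogroupSketch`, its `Aggregator` interface, `process`, `runExample` and the `t1`/`t2`/`t3` fields are invented for illustration, and a `HashMap` stands in for the state store. `runExample` follows the shape Matthias asked for: three streams with three records each.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

/**
 * Hypothetical in-memory model of the cogroup processing described in the thread.
 * NOT the Kafka Streams API: one aggregate step per input topic, all sharing a
 * single store, followed by a pass-through "cogroup" step.
 */
public class CogroupSketch<A> {

    /** Hypothetical aggregator: (key, value, current aggregate) -> updated aggregate. */
    public interface Aggregator<A> {
        A apply(String key, String value, A aggregate);
    }

    private final Supplier<A> initializer;
    private final Map<String, A> store = new HashMap<>();                // the single shared store
    private final Map<String, Aggregator<A>> perTopic = new HashMap<>(); // one aggregate step per topic
    private final List<String> forwarded = new ArrayList<>();            // output of the pass-through step
    private int storeGets = 0;
    private int storePuts = 0;

    public CogroupSketch(Supplier<A> initializer) {
        this.initializer = initializer;
    }

    /** Registers the aggregator used for records arriving on one input topic. */
    public CogroupSketch<A> cogroup(String topic, Aggregator<A> aggregator) {
        perTopic.put(topic, aggregator);
        return this;
    }

    /** Processes a single record that arrived on the given topic. */
    public void process(String topic, String key, String value) {
        // Aggregate step: fetch the current aggregate for this key from the shared store.
        storeGets++;
        A current = store.get(key);
        if (current == null) {
            current = initializer.get();
        }
        // Fold the record in with this topic's aggregator and write the result back.
        A updated = perTopic.get(topic).apply(key, value, current);
        storePuts++;
        store.put(key, updated);
        // Cogroup step: pure pass-through, forwarding the new aggregate downstream.
        forwarded.add(key + "=" + updated);
    }

    public A get(String key) { return store.get(key); }
    public List<String> forwarded() { return forwarded; }
    public int storeGets() { return storeGets; }
    public int storePuts() { return storePuts; }

    /** Three input topics, three records each, all for key "A". */
    public static CogroupSketch<TreeMap<String, Integer>> runExample() {
        CogroupSketch<TreeMap<String, Integer>> cg =
            new CogroupSketch<TreeMap<String, Integer>>(TreeMap::new)
                .cogroup("topic1", (k, v, agg) -> { agg.merge("t1", Integer.parseInt(v), Integer::sum); return agg; })
                .cogroup("topic2", (k, v, agg) -> { agg.merge("t2", Integer.parseInt(v), Integer::sum); return agg; })
                .cogroup("topic3", (k, v, agg) -> { agg.merge("t3", Integer.parseInt(v), Integer::sum); return agg; });
        String[][] input = {
            {"topic1", "1"}, {"topic2", "10"}, {"topic3", "100"},
            {"topic1", "2"}, {"topic2", "20"}, {"topic3", "200"},
            {"topic1", "3"}, {"topic2", "30"}, {"topic3", "300"},
        };
        for (String[] rec : input) {
            cg.process(rec[0], "A", rec[1]);
        }
        return cg;
    }

    public static void main(String[] args) {
        CogroupSketch<TreeMap<String, Integer>> cg = runExample();
        System.out.println(cg.get("A"));                                              // {t1=6, t2=60, t3=600}
        System.out.println(cg.storeGets() + " gets, " + cg.storePuts() + " puts"); // 9 gets, 9 puts
    }
}
```

In this model, adding a fourth input only adds one more entry to `perTopic`. Every record still costs exactly one read and one write against the single shared store, which mirrors Kyle's "N KStreamAggregates and 1 KStreamCogroup" remark. The sketch does not model repartitioning, windowing, session stores, or caching.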