From: "Matthias J. Sax"
Organization: Confluent Inc
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved semantics
Date: Tue, 24 Jan 2017 11:30:10 -0800

That's not what I meant by "huge impact".

I refer to the actions related to materializing a KTable: creating a
RocksDB store and a changelog topic -- users should be aware of the
runtime implications, and this is better expressed by an explicit method
call, rather than implicitly triggered by using a different overload of
a method.

-Matthias

On 1/24/17 1:35 AM, Damian Guy wrote:
> I think your definition of a huge impact and mine are rather different ;-P
> Overloading a few methods is not really a huge impact IMO. It is also a
> sacrifice worth making for readability, usability of the API.
>
> On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax wrote:
>
>> I understand your argument, but do not agree with it.
>>
>> Your first version (even if the "flow" is not as nice) is more explicit
>> than the second version. Adding a stateStoreName parameter is quite
>> implicit but has a huge impact -- thus, I prefer the rather more verbose
>> but explicit version.
>>
>> -Matthias
>>
>> On 1/23/17 1:39 AM, Damian Guy wrote:
>>> I'm not a fan of materialize. I think it interrupts the flow, i.e.,
>>>
>>> table.mapValues(..).materialize().join(..).materialize()
>>> compared to:
>>> table.mapValues(..).join(..)
>>>
>>> I know which one I prefer.
>>> My preference is still to provide overloaded methods where people can
>>> specify the store names if they want, otherwise we just generate them.
>>>
>>> On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax wrote:
>>>
>>>> Hi,
>>>>
>>>> thanks for the KIP Eno! Here are my 2 cents:
>>>>
>>>> 1) I like Guozhang's proposal about removing the store name from all
>>>> KTable methods and generating internal names (however, I would do this
>>>> as overloads). Furthermore, I would not force users to call
>>>> .materialize() if they want to query a store, but add one more method
>>>> .stateStoreName() that returns the store name if the KTable is
>>>> materialized. Thus, .materialize() does not necessarily need a
>>>> parameter storeName either (ie, we should have some overloads here).
>>>>
>>>> I would also not allow a null store name (to indicate no
>>>> materialization if not necessary) but throw an exception.
>>>>
>>>> This yields some simplification (see below).
>>>>
>>>> 2) I also like Guozhang's proposal about KStream#toTable()
>>>>
>>>> 3)
>>>>>
>>>>>> 3. What will happen when you call materialize on KTable that is
>>>>>> already materialized? Will it create another StateStore (providing
>>>>>> the name is different), throw an Exception?
>>>>>
>>>>> Currently an exception is thrown, but see below.
>>>>>
>>>>
>>>> If we follow approach (1) from Guozhang, there is no need to worry about
>>>> a second materialization and also no exception needs to be thrown. A
>>>> call to .materialize() basically sets a "materialized flag" (ie,
>>>> idempotent operation) and sets a new name.
>>>>
>>>> 4)
>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>
>>>>> Not sure whether that is really required. We also use
>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example,
>>>>> and don't care about the "K" prefix.
>>>>
>>>> Eno's reply:
>>>>> I think changing it to `toKStream` would make it absolutely clear what
>>>>> we are converting it to.
>>>>>
>>>>> I'd say we should probably change the KStreamBuilder methods (but not
>>>>> in this KIP).
>>>>
>>>> I would keep #toStream(). (see below)
>>>>
>>>> 5) We should not remove any methods but only deprecate them.
>>>>
>>>> A general note:
>>>>
>>>> I do not understand your comments "Rejected Alternatives". You say "Have
>>>> the KTable be the materialized view" was rejected. But your KIP actually
>>>> does exactly this -- the changelog abstraction of KTable is secondary
>>>> after those changes and the "view" abstraction is what a KTable is. And
>>>> just to be clear, I like this a lot:
>>>>
>>>> - it aligns with the name KTable
>>>> - it aligns with stream-table-duality
>>>> - it aligns with IQ
>>>>
>>>> I would say that a KTable is a "view abstraction" (as materialization is
>>>> optional).
>>>>
>>>> -Matthias
>>>>
>>>> On 1/22/17 5:05 PM, Guozhang Wang wrote:
>>>>> Thanks for the KIP Eno, I have a few meta comments and a few detailed
>>>>> comments:
>>>>>
>>>>> 1. I like the materialize() function in general, but I would like to
>>>>> see how other KTable functions should be updated accordingly. For
>>>>> example, 1) KStreamBuilder.table(..)
>>>>> has a state store name parameter, and we will always materialize the
>>>>> KTable unless its state store name is set to null; 2) KTable.agg
>>>>> requires the result KTable to be materialized, and hence it also has a
>>>>> state store name; 3) KTable.join requires the joining table to be
>>>>> materialized. And today we do not actually have a mechanism to enforce
>>>>> that, but will only throw an exception at runtime if it is not (e.g. if
>>>>> you have "builder.table("topic", null).join()" a RTE will be thrown).
>>>>>
>>>>> I'd make an extended proposal just to kick off the discussion here:
>>>>> let's remove all the state store params in other KTable functions, and
>>>>> if in some cases a KTable has to be materialized (e.g. KTable resulted
>>>>> from KXX.agg) and users do not call materialize(), then we treat it as
>>>>> "users are not interested in querying it at all" and hence use an
>>>>> internal name generated for the materialized KTable; i.e. although it
>>>>> is materialized the state store is not exposed to users. And if users
>>>>> call materialize() afterwards but we have already decided to
>>>>> materialize it, we can replace the internal name with the user's
>>>>> provided names. Then from a user's point of view, if they ever want to
>>>>> query a KTable, they have to call materialize() with a given state
>>>>> store name. This approach has one awkwardness though, that serdes and
>>>>> state store names param are not separated and could be overlapped (see
>>>>> detailed comment #2 below).
>>>>>
>>>>> 2. This step does not need to be included in this KIP, but just as a
>>>>> reference / future work: as we have discussed before, we may enforce
>>>>> materializing KTable.join resulted KTables as well in the future.
>>>>> If we do that, then:
>>>>>
>>>>> a) KXX.agg resulted KTables are always materialized;
>>>>> b) KTable.agg requires the aggregating KTable to always be materialized
>>>>> (otherwise we would not know the old value);
>>>>> c) KTable.join resulted KTables are always materialized, and so are the
>>>>> joining KTables;
>>>>> d) KTable.filter/mapValues resulted KTables materialization depends on
>>>>> its parent's materialization;
>>>>>
>>>>> By recursive induction all KTables are actually always materialized,
>>>>> and then the effect of the "materialize()" is just for specifying the
>>>>> state store names. In this scenario, we do not need to send Change in
>>>>> repartition topics within joins any more, but only for repartition
>>>>> topics within aggregations. Instead, we can just send a "tombstone"
>>>>> without the old value and we do not need to calculate joins twice (one
>>>>> more time when old value is received).
>>>>>
>>>>> 3. I'm wondering if it is worthwhile to add a "KStream#toTable()"
>>>>> function which is interpreted as a dummy-aggregation where the new
>>>>> value always replaces the old value. I have seen a couple of use cases
>>>>> of this, for example, users want to read a changelog topic, apply some
>>>>> filters, and then materialize it into a KTable with state stores
>>>>> without creating duplicated changelog topics. With materialize() and
>>>>> toTable I'd imagine users can specify something
>>>>> like:
>>>>>
>>>>> "
>>>>> KStream stream = builder.stream("topic1").filter(..);
>>>>> KTable table = stream.toTable(..);
>>>>> table.materialize("state1");
>>>>> "
>>>>>
>>>>> And the library in this case could set store "state1"'s changelog topic
>>>>> to be "topic1", applying the filter on the fly while (re-)storing its
>>>>> state by reading from this topic, instead of creating a second
>>>>> changelog topic like "appID-state1-changelog" which is a semi-duplicate
>>>>> of "topic1".
>>>>>
>>>>> Detailed:
>>>>>
>>>>> 1. I'm +1 with Michael regarding "#toStream"; actually I was thinking
>>>>> about renaming to "#toChangeLog" but after thinking a bit more I think
>>>>> #toStream is still better, and we can just mention in the javaDoc that
>>>>> it is transforming its underlying changelog stream to a normal stream.
>>>>> 2. As Damian mentioned, there are a few scenarios where the serdes are
>>>>> already specified in a previous operation whereas it is not known
>>>>> before calling materialize, for example:
>>>>> stream.groupByKey.agg(serde).materialize(serde) vs.
>>>>> table.mapValues(/*no serde specified*/).materialize(serde). We need to
>>>>> specify what the handling logic is here.
>>>>> 3. We can remove the "KTable#to" call as well, and require users to
>>>>> call "KTable.toStream.to" to be more clear.
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska wrote:
>>>>>
>>>>>> I think changing it to `toKStream` would make it absolutely clear what
>>>>>> we are converting it to.
>>>>>>
>>>>>> I'd say we should probably change the KStreamBuilder methods (but not
>>>>>> in this KIP).
>>>>>>
>>>>>> Thanks
>>>>>> Eno
>>>>>>
>>>>>>> On 17 Jan 2017, at 13:59, Michael Noll wrote:
>>>>>>>
>>>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>>>
>>>>>>> Not sure whether that is really required.
>>>>>>> We also use `KStreamBuilder#stream()` and `KStreamBuilder#table()`,
>>>>>>> for example, and don't care about the "K" prefix.
>>>>>>>
>>>>>>> On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska <eno.thereska@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Damian, answers inline:
>>>>>>>>
>>>>>>>>> On 16 Jan 2017, at 17:17, Damian Guy wrote:
>>>>>>>>>
>>>>>>>>> Hi Eno,
>>>>>>>>>
>>>>>>>>> Thanks for the KIP. Some comments:
>>>>>>>>>
>>>>>>>>> 1. I'd probably rename materialized to materialize.
>>>>>>>>
>>>>>>>> Ok.
>>>>>>>>
>>>>>>>>> 2. I don't think the addition of the new Log compaction mechanism
>>>>>>>>> is necessary for this KIP, i.e., the KIP is useful without it.
>>>>>>>>> Maybe that should be a different KIP?
>>>>>>>>
>>>>>>>> Agreed, already removed. Will do a separate KIP for that.
>>>>>>>>
>>>>>>>>> 3. What will happen when you call materialize on KTable that is
>>>>>>>>> already materialized? Will it create another StateStore (providing
>>>>>>>>> the name is different), throw an Exception?
>>>>>>>>
>>>>>>>> Currently an exception is thrown, but see below.
>>>>>>>>
>>>>>>>>> 4. Have you considered overloading the existing KTable operations
>>>>>>>>> to add a state store name? So if a state store name is provided,
>>>>>>>>> then materialize a state store? This would be my preferred approach
>>>>>>>>> as I don't think materialize is always a valid operation.
>>>>>>>>
>>>>>>>> Ok I can see your point. This will increase the KIP size since I'll
>>>>>>>> need to enumerate all overloaded methods, but it's not a problem.
>>>>>>>>
>>>>>>>>> 5. The materialize method will need a value Serde as some
>>>>>>>>> operations, i.e., mapValues, join etc can change the value types
>>>>>>>>> 6.
>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-4609 - might mean that
>>>>>>>>> we always need to materialize the StateStore for KTable-KTable
>>>>>>>>> joins. If that is the case, then the KTable Join operators will
>>>>>>>>> also need Serde information.
>>>>>>>>
>>>>>>>> I'll update the KIP with the serdes.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Eno
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Damian
>>>>>>>>>
>>>>>>>>> On Mon, 16 Jan 2017 at 16:44 Eno Thereska wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> We created "KIP-114: KTable materialization and improved
>>>>>>>>>> semantics" to solidify the KTable semantics in Kafka Streams:
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
>>>>>>>>>>
>>>>>>>>>> Your feedback is appreciated.
>>>>>>>>>> Thanks
>>>>>>>>>> Eno
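
The materialize() semantics proposed in the thread (an idempotent call
that sets a "materialized flag", internal names generated when the user
gives none, a user-provided name replacing an internal one, a null name
rejected with an exception, and a stateStoreName() accessor) can be
sketched with a toy class. This is only a minimal sketch under stated
assumptions, not the Kafka Streams API: SketchTable, the
"internal-store-" prefix, and the Optional return type are hypothetical
names chosen for illustration, and no RocksDB store or changelog topic is
created.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy stand-in for a KTable (hypothetical; NOT the Kafka Streams API). */
final class SketchTable {
    // Counter used to generate internal, non-user-facing store names.
    private static final AtomicInteger NEXT_ID = new AtomicInteger();

    private boolean materialized; // the "materialized flag" from the thread
    private String storeName;     // null until the table is materialized

    /** Materialize with a generated internal name; calling it again changes nothing. */
    SketchTable materialize() {
        if (storeName == null) {
            storeName = "internal-store-" + NEXT_ID.getAndIncrement();
        }
        materialized = true;
        return this;
    }

    /** Materialize under a user-provided name, replacing any earlier name. */
    SketchTable materialize(String name) {
        if (name == null) {
            // The thread suggests throwing rather than treating null as "do not materialize".
            throw new IllegalArgumentException("store name must not be null");
        }
        storeName = name;
        materialized = true;
        return this;
    }

    /** Returns the store name only if the table is materialized. */
    Optional<String> stateStoreName() {
        return materialized ? Optional.of(storeName) : Optional.empty();
    }

    public static void main(String[] args) {
        SketchTable table = new SketchTable();
        System.out.println(table.stateStoreName().isPresent()); // prints "false"
        table.materialize().materialize();  // second call is a no-op
        table.materialize("state1");        // user name replaces the internal one
        System.out.println(table.stateStoreName().get());       // prints "state1"
    }
}
```

Whether the generated name should be visible through stateStoreName() at
all is one of the open questions in the discussion; the sketch returns it
only to make the idempotence observable.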