kafka-dev mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: [DISCUSS] KIP-114: KTable materialization and improved semantics
Date Tue, 24 Jan 2017 19:30:10 GMT
That's not what I meant by "huge impact".

I am referring to the actions involved in materializing a KTable:
creating a RocksDB store and a changelog topic -- users should be aware
of the runtime implications, and this is better expressed by an explicit
method call than implicitly triggered by using a different overload of
a method.
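
To make the contrast concrete, here is a minimal Java sketch of the two
API shapes under discussion. It is a toy model, not the Kafka Streams
API: the class `MaterializeSketch`, its nested `Table` type, and the
`resources` list are hypothetical names introduced only for
illustration, and the list merely records the store and changelog topic
that a real runtime would have to create.

```java
import java.util.ArrayList;
import java.util.List;

public class MaterializeSketch {

    /** Toy stand-in for a table abstraction; this is NOT the Kafka Streams KTable API. */
    static final class Table {
        // Shared log of the resources a real runtime would have to create.
        private final List<String> resources;
        // null means "not materialized".
        private String storeName;

        Table(List<String> resources) {
            this.resources = resources;
        }

        /** Stateless transformation: returns a new, non-materialized table. */
        Table mapValues() {
            return new Table(resources);
        }

        /** Explicit variant: the cost shows up as a separate call in the chain. */
        Table materialize(String name) {
            if (name == null) {
                throw new IllegalArgumentException("store name must not be null");
            }
            if (storeName != null) {
                // Assumption of this toy model: a second call is rejected.
                throw new IllegalStateException("already materialized as " + storeName);
            }
            // Materializing implies a local store plus a changelog topic.
            resources.add("store:" + name);
            resources.add("changelog:" + name);
            storeName = name;
            return this;
        }

        /** Overload variant: the same cost is triggered by one extra argument. */
        Table mapValues(String name) {
            return mapValues().materialize(name);
        }

        boolean isMaterialized() {
            return storeName != null;
        }
    }

    public static void main(String[] args) {
        List<String> explicit = new ArrayList<>();
        new Table(explicit).mapValues().materialize("counts");

        List<String> overloaded = new ArrayList<>();
        new Table(overloaded).mapValues("counts");

        System.out.println("explicit:   " + explicit);
        System.out.println("overloaded: " + overloaded);
    }
}
```

Both chains in `main` record the same two resources; the only
difference is whether a reader of the calling code sees a dedicated
materialize(..) step or just one more argument.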


-Matthias

On 1/24/17 1:35 AM, Damian Guy wrote:
> I think your definition of a huge impact and mine are rather different ;-P
> Overloading a few methods is not really a huge impact IMO. It is also a
> sacrifice worth making for the readability and usability of the API.
> 
> On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax <matthias@confluent.io> wrote:
> 
>> I understand your argument, but do not agree with it.
>>
>> Your first version (even if the "flow" is not as nice) is more explicit
>> than the second version. Adding a stateStoreName parameter is quite
>> implicit but has a huge impact -- thus, I prefer the rather more verbose
>> but explicit version.
>>
>>
>> -Matthias
>>
>> On 1/23/17 1:39 AM, Damian Guy wrote:
>>> I'm not a fan of materialize. I think it interrupts the flow, i.e.,
>>>
>>> table.mapValues(..).materialize().join(..).materialize()
>>> compared to:
>>> table.mapValues(..).join(..)
>>>
>>> I know which one I prefer.
>>> My preference is still to provide overloaded methods where people can
>>> specify the store names if they want, otherwise we just generate them.
>>>
>>> On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax <matthias@confluent.io> wrote:
>>>
>>>> Hi,
>>>>
>>>> thanks for the KIP Eno! Here are my 2 cents:
>>>>
>>>> 1) I like Guozhang's proposal about removing the store name from all
>>>> KTable methods and generating internal names (however, I would do this
>>>> as overloads). Furthermore, I would not force users to call
>>>> .materialize() if they want to query a store, but add one more method
>>>> .stateStoreName() that returns the store name if the KTable is
>>>> materialized. Thus, .materialize() also need not have a storeName
>>>> parameter (i.e., we should have some overloads here).
>>>>
>>>> I would also not allow a null store name to be provided (to indicate
>>>> no materialization if not necessary) but throw an exception instead.
>>>>
>>>> This yields some simplification (see below).
>>>>
>>>>
>>>> 2) I also like Guozhang's proposal about KStream#toTable()
>>>>
>>>>
>>>> 3)
>>>>>
>>>>>>   3. What will happen when you call materialize on KTable that is
>>>>>>   already materialized? Will it create another StateStore (providing
>>>>>>   the name is different), throw an Exception?
>>>>>
>>>>> Currently an exception is thrown, but see below.
>>>>>
>>>>>
>>>>
>>>> If we follow approach (1) from Guozhang, there is no need to worry about
>>>> a second materialization and also no exception needs to be thrown. A call
>>>> to .materialize() basically sets a "materialized flag" (i.e., an
>>>> idempotent operation) and sets a new name.
>>>>
>>>>
>>>> 4)
>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>
>>>>> Not sure whether that is really required. We also use
>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example,
>>>>> and don't care about the "K" prefix.
>>>>
>>>> Eno's reply:
>>>>> I think changing it to `toKStream` would make it absolutely clear what
>>>>> we are converting it to.
>>>>>
>>>>> I'd say we should probably change the KStreamBuilder methods (but not
>>>>> in this KIP).
>>>>
>>>> I would keep #toStream(). (see below)
>>>>
>>>>
>>>> 5) We should not remove any methods but only deprecate them.
>>>>
>>>>
>>>>
>>>> A general note:
>>>>
>>>> I do not understand your comments under "Rejected Alternatives". You say "Have
>>>> the KTable be the materialized view" was rejected. But your KIP actually
>>>> does exactly this -- the changelog abstraction of KTable is secondary
>>>> after those changes and the "view" abstraction is what a KTable is. And
>>>> just to be clear, I like this a lot:
>>>>
>>>>  - it aligns with the name KTable
>>>>  - it aligns with stream-table-duality
>>>>  - it aligns with IQ
>>>>
>>>> I would say that a KTable is a "view abstraction" (as materialization is
>>>> optional).
>>>>
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>>
>>>>
>>>> On 1/22/17 5:05 PM, Guozhang Wang wrote:
>>>>> Thanks for the KIP Eno, I have a few meta comments and a few detailed
>>>>> comments:
>>>>>
>>>>> 1. I like the materialize() function in general, but I would like to
>>>>> see how other KTable functions should be updated accordingly. For
>>>>> example, 1) KStreamBuilder.table(..) has a state store name parameter,
>>>>> and we will always materialize the KTable unless its state store name
>>>>> is set to null; 2) KTable.agg requires the result KTable to be
>>>>> materialized, and hence it also has a state store name; 3) KTable.join
>>>>> requires the joining table to be materialized. And today we do not
>>>>> actually have a mechanism to enforce that, but will only throw an
>>>>> exception at runtime if it is not (e.g. if you have
>>>>> "builder.table("topic", null).join()" an RTE will be thrown).
>>>>>
>>>>> I'd make an extended proposal just to kick off the discussion here:
>>>>> let's remove all the state store params in other KTable functions, and
>>>>> if in some cases a KTable has to be materialized (e.g. a KTable
>>>>> resulting from KXX.agg) and users do not call materialize(), then we
>>>>> treat it as "users are not interested in querying it at all" and hence
>>>>> use an internally generated name for the materialized KTable; i.e.
>>>>> although it is materialized, the state store is not exposed to users.
>>>>> And if users call materialize() afterwards but we have already decided
>>>>> to materialize it, we can replace the internal name with the
>>>>> user-provided name. Then from a user's point of view, if they ever want
>>>>> to query a KTable, they have to call materialize() with a given state
>>>>> store name. This approach has one awkwardness though: the serdes and
>>>>> state store name params are not separated and could overlap (see
>>>>> detailed comment #2 below).
>>>>>
>>>>>
>>>>> 2. This step does not need to be included in this KIP, but just as a
>>>>> reference / future work: as we have discussed before, we may enforce
>>>>> materialization of KTables resulting from KTable.join as well in the
>>>>> future. If we do that, then:
>>>>>
>>>>> a) KTables resulting from KXX.agg are always materialized;
>>>>> b) KTable.agg requires the aggregating KTable to always be materialized
>>>>> (otherwise we would not know the old value);
>>>>> c) KTables resulting from KTable.join are always materialized, and so
>>>>> are the joining KTables;
>>>>> d) materialization of KTables resulting from KTable.filter/mapValues
>>>>> depends on their parent's materialization.
>>>>>
>>>>> By recursive induction all KTables are actually always materialized,
>>>>> and then the effect of "materialize()" is just to specify the state
>>>>> store names. In this scenario, we do not need to send Change<V> in
>>>>> repartition topics within joins any more, but only in repartition
>>>>> topics within aggregations. Instead, we can just send a "tombstone"
>>>>> without the old value and we do not need to calculate joins twice (one
>>>>> more time when the old value is received).
>>>>>
>>>>> 3. I'm wondering if it is worthwhile to add a "KStream#toTable()"
>>>>> function which is interpreted as a dummy aggregation where the new
>>>>> value always replaces the old value. I have seen a couple of use cases
>>>>> of this, for example, users want to read a changelog topic, apply some
>>>>> filters, and then materialize it into a KTable with state stores
>>>>> without creating duplicated changelog topics. With materialize() and
>>>>> toTable I'd imagine users can specify something like:
>>>>>
>>>>> "
>>>>> KStream stream = builder.stream("topic1").filter(..);
>>>>> KTable table = stream.toTable(..);
>>>>> table.materialize("state1");
>>>>> "
>>>>>
>>>>> And the library in this case could set store "state1"'s changelog
>>>>> topic to be "topic1", applying the filter on the fly while
>>>>> (re-)storing its state by reading from this topic, instead of creating
>>>>> a second changelog topic like "appID-state1-changelog" which is a
>>>>> semi-duplicate of "topic1".
>>>>>
>>>>>
>>>>> Detailed:
>>>>>
>>>>> 1. I'm +1 with Michael regarding "#toStream"; actually I was thinking
>>>>> about renaming to "#toChangeLog" but after thinking a bit more I think
>>>>> #toStream is still better, and we can just mention in the javaDoc that
>>>>> it is transforming its underlying changelog stream to a normal stream.
>>>>> 2. As Damian mentioned, there are a few scenarios where the serdes are
>>>>> already specified in a previous operation whereas in others they are
>>>>> not known before calling materialize, for example:
>>>>> stream.groupByKey.agg(serde).materialize(serde) vs.
>>>>> table.mapValues(/*no serde specified*/).materialize(serde). We need to
>>>>> specify what the handling logic is here.
>>>>> 3. We can remove the "KTable#to" call as well, and require users to
>>>>> call "KTable.toStream.to" to be more clear.
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska <eno.thereska@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I think changing it to `toKStream` would make it absolutely clear
>>>>>> what we are converting it to.
>>>>>>
>>>>>> I'd say we should probably change the KStreamBuilder methods (but not
>>>>>> in this KIP).
>>>>>>
>>>>>> Thanks
>>>>>> Eno
>>>>>>
>>>>>>> On 17 Jan 2017, at 13:59, Michael Noll <michael@confluent.io> wrote:
>>>>>>>
>>>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>>>
>>>>>>> Not sure whether that is really required. We also use
>>>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example,
>>>>>>> and don't care about the "K" prefix.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska <eno.thereska@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Damian, answers inline:
>>>>>>>>
>>>>>>>>> On 16 Jan 2017, at 17:17, Damian Guy <damian.guy@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Eno,
>>>>>>>>>
>>>>>>>>> Thanks for the KIP. Some comments:
>>>>>>>>>
>>>>>>>>>  1. I'd probably rename materialized to materialize.
>>>>>>>>
>>>>>>>> Ok.
>>>>>>>>
>>>>>>>>>  2. I don't think the addition of the new Log compaction mechanism
>>>>>>>>>  is necessary for this KIP, i.e., the KIP is useful without it.
>>>>>>>>>  Maybe that should be a different KIP?
>>>>>>>>
>>>>>>>> Agreed, already removed. Will do a separate KIP for that.
>>>>>>>>
>>>>>>>>
>>>>>>>>>  3. What will happen when you call materialize on KTable that is
>>>>>>>>>  already materialized? Will it create another StateStore (providing
>>>>>>>>>  the name is different), throw an Exception?
>>>>>>>>
>>>>>>>> Currently an exception is thrown, but see below.
>>>>>>>>
>>>>>>>>
>>>>>>>>>  4. Have you considered overloading the existing KTable operations
>>>>>>>>>  to add a state store name? So if a state store name is provided,
>>>>>>>>>  then materialize a state store? This would be my preferred approach
>>>>>>>>>  as I don't think materialize is always a valid operation.
>>>>>>>>
>>>>>>>> Ok I can see your point. This will increase the KIP size since I'll
>>>>>>>> need to enumerate all overloaded methods, but it's not a problem.
>>>>>>>>
>>>>>>>>>  5. The materialize method will need a value Serde as some
>>>>>>>>>  operations, i.e., mapValues, join etc. can change the value types.
>>>>>>>>>  6. https://issues.apache.org/jira/browse/KAFKA-4609 - might mean
>>>>>>>>>  that we always need to materialize the StateStore for KTable-KTable
>>>>>>>>>  joins. If that is the case, then the KTable Join operators will
>>>>>>>>>  also need Serde information.
>>>>>>>>
>>>>>>>> I'll update the KIP with the serdes.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Eno
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Damian
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, 16 Jan 2017 at 16:44 Eno Thereska <eno.thereska@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> We created "KIP-114: KTable materialization and improved
>>>>>>>>>> semantics" to solidify the KTable semantics in Kafka Streams:
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
>>>>>>>>>>
>>>>>>>>>> Your feedback is appreciated.
>>>>>>>>>> Thanks
>>>>>>>>>> Eno
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
> 

