kafka-dev mailing list archives

From Eno Thereska <eno.there...@gmail.com>
Subject Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams
Date Tue, 06 Sep 2016 14:41:26 GMT
A small update to the KIP: deduping records with the cache does not affect the .to
operator, since the KTable has already been deduped before that operator. I am adjusting
the KIP accordingly.

Thanks
Eno

> On 5 Sep 2016, at 12:43, Eno Thereska <eno.thereska@gmail.com> wrote:
> 
> Hi Matthias,
> 
> The motivation for KIP-63 was primarily aggregates and reducing the load on "both" state
> stores and downstream. I think there is agreement that for the DSL the motivation and design
> make sense.
> 
> For the Processor API: caching is a major component in any system, and it is difficult
> to continue to operate as before, without fully understanding the consequences. Hence, I think
> this is mostly a case of educating users to understand the boundaries of the solution.
> 
> Introducing a cache, either for the state store only, or for downstream forwarding only,
> or for both, means moving from a model where we process each request end-to-end (today)
> to one where a request is temporarily buffered in a cache. In all cases, this raises
> the question of what to do once the request leaves the cache, and how to express
> that (future) behaviour. E.g., even when the cache is just for downstream forwarding (i.e.,
> decoupled from any state store), the Processor API user might be surprised that
> context.forward() does not immediately do anything.
> 
> I agree that for ultra-flexibility, a processor API user should be able to choose whether
> the dedup cache is put 1) on top of a store only, 2) on forward only, 3) on both store and
> forward, but given the motivation for KIP-63 (aggregates), I believe a decoupled store-forward
> dedup cache is a reasonable choice that provides good default behaviour, without prodding
> the user to specify the combinations.
> 
> We need to educate users that if a cache is used in the Processor API, the forwarding
> will happen in the future.
> 
> -Eno
> 
> 
> 
>> On 4 Sep 2016, at 19:11, Matthias J. Sax <matthias@confluent.io> wrote:
>> 
>>> Processor code should always work, regardless of whether caching is
>>> enabled or not.
>> 
>> If we want to get this, I guess we need a quite different design (see (1)).
>> 
>> The point is, that we want to dedup the output, and not state updates.
>> 
>> It just happens that our starting point was KTable, for which state
>> updates and downstream changelog output are the same thing. Thus, we can
>> just use the internal KTable state to do the deduplication for the
>> downstream changelog.
>> 
>> However, from a general point of view (Processor API view), if we dedup
>> the output, we want dedup/caching for the processor (and not for a state
>> store). Of course, we need a state to do the dedup. For KTable, both
>> things merge into a single abstraction, and we use only a single state
>> instead of two. From a general point of view, we would need two states
>> though (one for the actual state, and one for dedup -- think Processor
>> API -- not DSL).
>> 
>> 
>> Alternative proposal 1:
>> (see also (2) -- which might be better than this one)
>> 
>> Thus, it might be a cleaner design to decouple user-states and
>> dedup-state from each other. If a user enables dedup/caching (for a
>> processor) we add an additional state to do the dedup and this
>> dedup-state is independent from all user states and context.forward()
>> works as always. The dedup state could be hidden from the user and could
>> be a pure in-memory state (no need for any recovery -- only flush on
>> commit). Internally, a context.forward() would call dedupState.put() and
>> trigger actual output if dedup state needs to evict records.
>> 
>> The disadvantage would be, that we end up with two states for KTable.
>> The advantage is, that deduplication can be switched off/on without any
>> Processor code change.
>> 
>> 
>> Alternative proposal 2:
>> 
>> We basically keep the current KIP design, including not to disable
>> context.forward() if a cached state is used. Additionally, for cached
>> state, we rename put() into putAndForward() which is only available for
>> cached states. Thus, in processor code, a state must be explicitly cast
>> into a cached state. We also make the user aware that an update/put to
>> a state results in downstream output and that context.forward() would be
>> a "direct/non-cached" output.
>> 
>> The disadvantage of this is, that processor code is not independent from
>> caching and thus, caching cannot just be switched on/off (ie, we do not
>> follow the initial statement of this mail). The advantage is, we can
>> keep a single state for KTable and this design is just small changes to
>> the current KIP.
>> 
>> 
>> 
>> -Matthias
>> 
>> 
>> On 09/04/2016 07:10 PM, Matthias J. Sax wrote:
>>> Sure, you can use a non-cached state. However, if you write code like
>>> below for a non-cached state, and learn about caching later on, and
>>> think, caching is a cool feature, I want to use it, you would simply
>>> want to enable caching (without breaking your code).
>>> 
>>> Processor code should always work, regardless of whether caching is
>>> enabled or not.
>>> 
>>> -Matthias
>>> 
>>> On 09/04/2016 06:56 PM, Eno Thereska wrote:
>>>> Hi Matthias,
>>>> 
>>>> Thanks for the good questions. 
>>>> 
>>>> There is still the option of not using cached state. If one uses cached state
>>>> it will dedup for stores and forwarding further. But you can always disable caching and do
>>>> what you say.
>>>> 
>>>> Eno
>>>> 
>>>>> On 4 Sep 2016, at 17:36, Matthias J. Sax <matthias@confluent.io> wrote:
>>>>> 
>>>>> Sorry for not being precise. What I meant by "completely" is for a
>>>>> single processor. Assume I want to have the following pattern:
>>>>> 
>>>>> process(...) {
>>>>>   if (someCondition) {
>>>>>     state.put(...);
>>>>>     context.forward(...);
>>>>>   } else {
>>>>>     context.forward(...);
>>>>>   }
>>>>> }
>>>>> 
>>>>> I.e., for some records I update the state and emit output records; for
>>>>> other records I only emit output records. This works in the current design.
>>>>> However, if a "cached state" were used, it would not work any more.
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> On 09/04/2016 05:58 PM, Damian Guy wrote:
>>>>>> Hi Matthias,
>>>>>> 
>>>>>> Thanks for bringing the conversation across to the thread.
>>>>>> 
>>>>>>> I think a main limitation would be, that you cannot mix the 4 patterns
>>>>>>> within a single application anymore (iff you use a "cached state"). If
>>>>>>> you have a processor with a "cached state" this disables direct usage of
>>>>>>> context.forward() completely -- if I understand the design correctly.
>>>>>>> Thus, if a "cached state" is used, forwarding is only possible via state
>>>>>>> updates.
>>>>>>> 
>>>>>>> 
>>>>>> The above statement is not correct. Caching doesn't completely disable
>>>>>> forwarding, it only disables it for Processors that are using State
>>>>>> Stores. In all other cases context.forward() works as it does now.
>>>>>> 
>>>>>> Thanks,
>>>>>> Damian
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 
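
For illustration only, the first alternative discussed in the thread (a hidden, in-memory dedup state sitting behind context.forward(), emitting downstream on eviction and flushing on commit) could be sketched roughly as below. This is a minimal sketch, not the KIP-63 implementation and not the Kafka Streams API: the class DedupForwarder, its capacity parameter, the oldest-first eviction policy, and the BiConsumer used as the "downstream" are all hypothetical names and assumptions chosen for this example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical dedup buffer in front of a downstream forward call (not a Kafka Streams class). */
final class DedupForwarder<K, V> {
    private final int capacity;                 // assumed bound on buffered keys
    private final BiConsumer<K, V> downstream;  // stands in for the real forwarding step
    // Insertion-ordered map: the least recently updated key is iterated first.
    private final LinkedHashMap<K, V> pending = new LinkedHashMap<>();

    DedupForwarder(int capacity, BiConsumer<K, V> downstream) {
        this.capacity = capacity;
        this.downstream = downstream;
    }

    /** Buffers the record; a newer value for the same key overwrites the older one. */
    void forward(K key, V value) {
        pending.remove(key);        // re-insert so the key moves to the newest position
        pending.put(key, value);
        if (pending.size() > capacity) {
            // Evict the oldest entry and only now emit it downstream.
            Iterator<Map.Entry<K, V>> it = pending.entrySet().iterator();
            Map.Entry<K, V> eldest = it.next();
            it.remove();
            downstream.accept(eldest.getKey(), eldest.getValue());
        }
    }

    /** Emits everything still buffered, e.g. when a commit happens. */
    void flush() {
        pending.forEach(downstream);
        pending.clear();
    }
}

final class DedupForwarderDemo {
    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        DedupForwarder<String, Integer> forwarder =
                new DedupForwarder<>(2, (k, v) -> emitted.add(k + "=" + v));
        forwarder.forward("a", 1);
        forwarder.forward("a", 2);  // replaces a=1, nothing emitted yet
        forwarder.forward("b", 1);
        forwarder.forward("c", 1);  // exceeds capacity, so a=2 is emitted
        forwarder.flush();          // emits b=1 and c=1
        System.out.println(emitted); // prints [a=2, b=1, c=1]
    }
}
```

In this sketch the processor code calls forward() in the same way whether or not buffering is active, which is the property argued for in the thread; the cost is that a record may reach downstream only on eviction or flush rather than immediately.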

