kafka-dev mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams
Date Sun, 04 Sep 2016 15:37:34 GMT
I think a main limitation would be that you cannot mix the 4 patterns
within a single application anymore (iff you use a "cached state"). If
you have a processor with a "cached state", this disables direct usage of
context.forward() completely -- if I understand the design correctly.
Thus, if a "cached state" is used, forwarding is only possible via state
updates.

The approach described above is fine from a DSL point of view. The main
question is whether a "cached state" should be a DSL-internal implementation
detail or should be exposed to the user for Processor API reuse. For the
former, the design is fine; for the latter, IMHO it imposes a limitation
and a hard-to-understand usage pattern on a regular user of the Processor API.

-Matthias


On 09/04/2016 05:28 PM, Matthias J. Sax wrote:
> We had a recent discussion about KIP-63, and I just c&p from the JIRA
> discussion:
> 
> Damian:
>> During the code walk-through, Matthias raised a very good point about the use of
>> context().forward being coupled to whether or not caching is enabled. Now that I've had the
>> chance to think about it, I have one potential solution for making this transparent to users
>> of the Processor API.
>>
>> We can add another method, boolean isCachingEnabled(), to the new interface ForwardingStateStoreSupplier.
>> We also add 2 new methods to ProcessorNode:
>> boolean isStateStoreCachingEnabled() and void setStateStoreCachingEnabled(boolean enabled)
>>
>> In TopologyBuilder, when we are creating the ProcessorNodeCacheFlushListener to attach
>> to the ForwardingStateStoreSupplier, we can call ProcessorNode.setStateStoreCachingEnabled(supplier.isCachingEnabled()).
>>
>> We add an extra boolean parameter, forward, to ProcessorRecordContextImpl. It
>> will be set to false when constructed from StreamTask and set to true when constructed
>> from ProcessorNodeCacheFlushListener. Then, in ProcessorRecordContextImpl.forward(..), we add
>> a guard if (shouldForward()), where shouldForward() is return forward || !node.stateStoreCachingEnabled();
>>
>> Now Processors are free to call context().forward(..) whether caching is enabled
>> or not. If it is enabled, the values just won't get forwarded until the cache evicts/flushes
>> them.
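A minimal sketch of the guard described above (class and field names here are simplified stand-ins for illustration, not the actual Kafka internals):

```java
import java.util.ArrayList;
import java.util.List;

class Node {
    boolean stateStoreCachingEnabled;          // set by the topology builder
    final List<String> forwarded = new ArrayList<>();
}

class RecordContext {
    private final Node node;
    private final boolean forward; // true when constructed by the cache flush listener

    RecordContext(Node node, boolean forward) {
        this.node = node;
        this.forward = forward;
    }

    private boolean shouldForward() {
        // forward unconditionally from the flush listener,
        // or immediately when caching is disabled for this node
        return forward || !node.stateStoreCachingEnabled;
    }

    void forward(String value) {
        if (shouldForward()) {
            node.forwarded.add(value);
        }
        // otherwise the record is held back until the cache evicts/flushes it
    }
}
```

With this guard, a processor's own forward() calls become no-ops while caching is on, and only the flush listener's context actually emits records.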
> 
> 
> Matthias:
>> I guess this is a good solution/workaround. I had something like this in my mind
>> during the call, too.
>>
>> However, thinking about the root cause of this issue again, I am not sure if the
>> (overall) design of this KIP is optimal. My new concern is that with this caching
>> strategy we "merge" two concepts into one, and I am not sure if we should do this.
>>
>> Currently, message flow and state are decoupled and independent of each other. Thus,
>> if there is a state, updates to the state are completely independent from emitting output
>> records. With the new design, we merge state updates and record emits, limiting the overall
>> flexibility. I guess, from a DSL point of view, this would not be problematic, because for
>> an aggregation and changelog output, each update to the state should result in a downstream
>> record. However, from a Processor API point of view, there are other patterns we want to be
>> able to support, too.
>>
>> Basically, for each input record, there are four different patterns that could be applied
>> by the user:
>>
>>     no state updates, no output records
>>     only state update
>>     only output records
>>     state updates and output records
>>
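The four patterns can be made concrete with a toy processor (a deliberately simplified stand-in, not the real Processor API; the key prefixes are only there to select a pattern per record):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MiniProcessor {
    final Map<String, Integer> store = new HashMap<>(); // the "state"
    final List<String> downstream = new ArrayList<>();  // records emitted via forward()

    void process(String key, int value) {
        if (key.startsWith("drop")) {
            return;                                     // 1: no state update, no output
        } else if (key.startsWith("count")) {
            store.merge(key, value, Integer::sum);      // 2: only state update
        } else if (key.startsWith("pass")) {
            downstream.add(key + ":" + value);          // 3: only output record
        } else {
            store.merge(key, value, Integer::sum);      // 4: state update AND output
            downstream.add(key + ":" + store.get(key));
        }
    }
}
```

Pre-KIP, one Processor could freely pick a different branch per record; the concern above is that coupling forwarding to cache flushes makes pattern 4 (and mixes of it with the others) the only shape the "cached state" supports.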
>> Right now, we go with a design that allows using one of the patterns within a Processor.
>> However, all 4 patterns could be mixed within a single Processor (pre-KIP design), and this
>> mixture would not be possible anymore. If we want to support all four cases, we might not
>> want to merge both into "a single abstraction" as we do in the design of this PR. What if
>> a user just wants to send a record downstream (without any state manipulation)?
>>
>> Going back to the KIP design: we move the cache from RocksDB into the processor.
>> However, what we actually wanted to do was de-duplicate output records. Thus, the newly
>> introduced cache could actually go "after the processor" and could be completely independent
>> from the state. Thus, on each call to forward(), the record is put into the cache, and if the
>> cache is full, an actual cache eviction and record forwarding happens. This would make the
>> de-duplication cache independent from the state.
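A rough sketch of such a post-processor de-duplication cache (hypothetical names, built on a plain LRU map rather than Kafka code): forward() only buffers by key, later values for a key overwrite earlier ones, and records reach downstream only on eviction or flush.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DedupForwarder {
    private final int maxEntries;
    private final List<String> downstream = new ArrayList<>();
    private final LinkedHashMap<String, String> cache;

    DedupForwarder(int maxEntries) {
        this.maxEntries = maxEntries;
        // access-ordered map so the least-recently-touched key is evicted first
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                if (size() > DedupForwarder.this.maxEntries) {
                    downstream.add(eldest.getKey() + "=" + eldest.getValue());
                    return true; // evict the eldest entry, forwarding it downstream
                }
                return false;
            }
        };
    }

    void forward(String key, String value) {
        cache.put(key, value); // repeated updates to one key collapse in place
    }

    void flush() {
        cache.forEach((k, v) -> downstream.add(k + "=" + v));
        cache.clear();
    }

    List<String> downstream() { return downstream; }
}
```

Because this sits after the processor, it de-duplicates output regardless of whether the processor touched any state, which is the independence argued for above.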
> 
> 
> Eno:
>> It's not entirely true that the flexibility is limited. For example, what's next
>> in implementation is https://issues.apache.org/jira/browse/KAFKA-3779, where we add the dedup
>> cache to the to() operator. That is not implemented yet.
> 
> 
> Damian:
>> I think, of the 4 patterns you mentioned, only the last one changes, i.e., state updates
>> and output records.
>> context.forward() still exists, so you can just send a record downstream without any
>> state manipulation; that behaviour hasn't changed.
> 
> On 08/24/2016 03:35 PM, Eno Thereska wrote:
>> Hi folks,
>>
>> We've been working on a proof-of-concept for KIP-63 and that can now be
>> found at the main JIRA (https://issues.apache.org/jira/browse/KAFKA-3776)
>> under PR https://github.com/apache/kafka/pull/1752. It is still work in
>> progress; however, we are confident that the basic structure is there.
>>
>> As part of this work, we've also updated the KIP to clarify several things,
>> listed here for convenience:
>>
>> - Clarify that the optimization is applicable to aggregations and to the
>> to() operator. It is not applicable to joins.
>> - Clarify that for the low-level Processor API, we propose to allow users
>> to disable caching on a store-by-store basis using a new
>> .enableCaching() call.
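A tiny illustration of what a store-by-store toggle in the spirit of .enableCaching() could look like (class and method names here are hypothetical, not the actual KIP API):

```java
// Hypothetical builder-style store descriptor with an opt-in caching flag.
class StoreSpec {
    final String name;
    private boolean cachingEnabled = false; // caching stays off unless opted in

    StoreSpec(String name) { this.name = name; }

    StoreSpec enableCaching() {
        this.cachingEnabled = true;
        return this; // builder style, so the call can be chained
    }

    boolean isCachingEnabled() { return cachingEnabled; }
}
```

The point is only that the decision is carried per store, so a low-level Processor API user can keep some stores uncached while caching others.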
>>
>> We'll start the voting process shortly for this KIP.
>>
>> Thanks
>> Eno
>>
>>
>> On Thu, Jun 2, 2016 at 11:30 AM, Eno Thereska <eno.thereska@gmail.com>
>> wrote:
>>
>>> Hi there,
>>>
>>> I have created KIP-63: Unify store and downstream caching in streams
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
>>>
>>>
>>> Feedback is appreciated.
>>>
>>> Thank you
>>> Eno
>>>
>>
> 

