kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guozhang Wang <wangg...@gmail.com>
Subject Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams
Date Tue, 06 Sep 2016 23:39:27 GMT
Hi Matthias,

I agree with your concerns of coupling with record forwarding with record
storing in the state store, and my understanding is that this can (and
should) be resolved with the current interface. Here are my thoughts:

1. The global cache, MemoryLRUCacheBytes, although is currently defined as
internal class, since it is exposed in ProcessorContext anyways, should
really be a public class anyways that users can access to (I have some
other comments about the names, but will rather leave them in the PR).

2. In the processor API, the users can choose to use the cache to store the
intermediate results in the cache, and register the flush listener via
addDirtyEntryFlushListener (again some naming suggestions in PR but use it
for discussion for now). And as a result, if the old processor code looks
like this:

----------------

process(...) {

  state.put(...);
  context.forward(...);
}
----------------

Users can now leverage the cache on some of the processors by modifying the
code as:

----------------

init(...) {

  context.getCache().addDirtyEntyFlushLisener(processorName,
{state.put(...); context.forward(...)})
}

process(...) {

  context.getCache().put(processorName, ..);
}

----------------

3. Note whether or not to apply caching is optional for each processor node
now, and is decoupled with its logic of forwarding / storing in persistent
state stores.

One may argue that now if users want to make use of the cache, he will need
to make code changes; but I think this is a reasonable requirement to users
actually, since that 1) currently we do one update-per-incoming-record, and
without code changes this behavior will be preserved, and 2) for DSL
implementation, we can just follow the above pattern to abstract it from
users, so they can pick up these changes automatically.


Guozhang


On Tue, Sep 6, 2016 at 7:41 AM, Eno Thereska <eno.thereska@gmail.com> wrote:

> A small update to the KIP: the deduping of records using the cache does
> not affect the .to operator since we'd have already deduped the KTable
> before the operator. Adjusting KIP.
>
> Thanks
> Eno
>
> > On 5 Sep 2016, at 12:43, Eno Thereska <eno.thereska@gmail.com> wrote:
> >
> > Hi Matthias,
> >
> > The motivation for KIP-63 was primarily aggregates and reducing the load
> on "both" state stores and downstream. I think there is agreement that for
> the DSL the motivation and design make sense.
> >
> > For the Processor API: caching is a major component in any system, and
> it is difficult to continue to operate as before, without fully
> understanding the consequences. Hence, I think this is mostly a case of
> educating users to understand the boundaries of the solution.
> >
> > Introducing a cache, either for the state store only, or for downstream
> forwarding only, or for both, leads to moving from a model where we process
> each request end-to-end (today) to one where a request is temporarily
> buffered in a cache. In all the cases, this opens up the question of what
> to do next once the request then leaves the cache, and how to express that
> (future) behaviour. E.g., even when the cache is just for downstream
> forwarding (i.e., decoupled from any state store), the processor API user
> might be surprised that context.forward() does not immediately do anything.
> >
> > I agree that for ultra-flexibility, a processor API user should be able
> to choose whether the dedup cache is put 1) on top of a store only, 2) on
> forward only, 3) on both store and forward, but given the motivation for
> KIP-63 (aggregates), I believe a decoupled store-forward dedup cache is a
> reasonable choice that provides good default behaviour, without prodding
> the user to specify the combinations.
> >
> > We need to educate users that if a cache is used in the Processor API,
> the forwarding will happen in the future.
> >
> > -Eno
> >
> >
> >
> >> On 4 Sep 2016, at 19:11, Matthias J. Sax <matthias@confluent.io> wrote:
> >>
> >>> Processor code should always work; independently if caching is enabled
> >> or not.
> >>
> >> If we want to get this, I guess we need a quite different design (see
> (1)).
> >>
> >> The point is, that we want to dedup the output, and not state updates.
> >>
> >> It just happens that our starting point was KTable, for which state
> >> updates and downstream changelog output is the same thing. Thus, we can
> >> just use the internal KTable state to do the deduplication for the
> >> downstream changelog.
> >>
> >> However, from a general point of view (Processor API view), if we dedup
> >> the output, we want dedup/caching for the processor (and not for a state
> >> store). Of course, we need a state to do the dedup. For KTable, both
> >> things merge into a single abstraction, and we use only a single state
> >> instead of two. From a general point of view, we would need two states
> >> though (one for the actual state, and one for dedup -- think Processor
> >> API -- not DSL).
> >>
> >>
> >> Alternative proposal 1:
> >> (see also (2) -- which might be better than this one)
> >>
> >> Thus, it might be a cleaner design to decouple user-states and
> >> dedup-state from each other. If a user enables dedup/caching (for a
> >> processor) we add an additional state to do the dedup and this
> >> dedup-state is independent from all user states and context.forward()
> >> works as always. The dedup state could be hidden from the user and could
> >> be a pure in-memory state (no need for any recovery -- only flush on
> >> commit). Internally, a context.forward() would call dedupState.put() and
> >> trigger actual output if dedup state needs to evict records.
> >>
> >> The disadvantage would be, that we end up with two states for KTable.
> >> The advantage is, that deduplication can be switched off/on without any
> >> Processor code change.
> >>
> >>
> >> Alternative proposal 2:
> >>
> >> We basically keep the current KIP design, including not to disable
> >> context.forward() if a cached state is used. Additionally, for cached
> >> state, we rename put() into putAndForward() which is only available for
> >> cached states. Thus, in processor code, a state must be explicitly cast
> >> into a cached state. We also make the user aware, that an update/put to
> >> a state result in downstream output and that context.forward() would be
> >> a "direct/non-cached" output.
> >>
> >> The disadvantage of this is, that processor code is not independent from
> >> caching and thus, caching cannot just be switched on/off (ie, we do not
> >> follow the initial statement of this mail). The advantage is, we can
> >> keep a single state for KTable and this design is just small changes to
> >> the current KIP.
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 09/04/2016 07:10 PM, Matthias J. Sax wrote:
> >>> Sure, you can use a non-cached state. However, if you write code like
> >>> below for a non-cached state, and learn about caching later on, and
> >>> think, caching is a cool feature, I want to use it, you would simply
> >>> want to enable caching (without breaking your code).
> >>>
> >>> Processor code should always work independently if caching is enabled
> or
> >>> not.
> >>>
> >>> -Matthias
> >>>
> >>> On 09/04/2016 06:56 PM, Eno Thereska wrote:
> >>>> Hi Matthias,
> >>>>
> >>>> Thanks for the good questions.
> >>>>
> >>>> There is still the option of not using cached state. If one uses
> cached state it will dedup for stores and forwarding further. But you can
> always disable caching and do what you say.
> >>>>
> >>>> Eno
> >>>>
> >>>>> On 4 Sep 2016, at 17:36, Matthias J. Sax <matthias@confluent.io>
> wrote:
> >>>>>
> >>>>> Sorry for not being precise. What I meant be "completely" is for
a
> >>>>> single processor. Assume I want to have the following pattern:
> >>>>>
> >>>>> process(...) {
> >>>>>  if (someCondition) {
> >>>>>    state.put(...)
> >>>>>    context.forward(...);
> >>>>>  } else {
> >>>>>    context.forward(...);
> >>>>> }
> >>>>>
> >>>>> Ie, for some record I do update the state and emit output records,
> for
> >>>>> other records I only emit output records. This work in current
> design.
> >>>>> However, if a "cached state" would be used, it would not work any
> more.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 09/04/2016 05:58 PM, Damian Guy wrote:
> >>>>>> Hi Matthias,
> >>>>>>
> >>>>>> Thanks for bringing the conversation across to the thread.
> >>>>>>
> >>>>>> I think a main limitation would be, that you cannot mix the
4
> patterns
> >>>>>>> within a single application anymore (iff you use a "caches
> state"). If
> >>>>>>> you have processor with a "cached state" this disables direct
> usage of
> >>>>>>> context.forward() completely -- if I understand the design
> correctly.
> >>>>>>> Thus, if a "cached state" is used, forwarding is only possible
via
> state
> >>>>>>> updates.
> >>>>>>>
> >>>>>>>
> >>>>>> The above statement is not correct. Caching doesn't completely
> disable
> >>>>>> forwarding, it only disables it for Processors that are using
State
> Stores.
> >>>>>> In all other cases context.forward() works as it does now.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Damian
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
>


-- 
-- Guozhang

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message