kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Bill Bejeck <bbej...@gmail.com>
Subject Re: [DISCUSS] KIP-479: Add Materialized to Join
Date Tue, 17 Sep 2019 21:09:27 GMT
Thanks for the comments, Matthias.

I like both suggestions regarding the names and I'll update the KIP
accordingly.

On Tue, Sep 17, 2019 at 11:22 AM Matthias J. Sax <matthias@confluent.io>
wrote:

> Thanks Bill!
>
> Using a new configuration object was suggested by John in the original
> DISCUSS thread already. We rejected it because we wanted to avoid a new
> configuration class. However, given your analysis, it seems it's
> actually the right choice to introduce a new configuration class.
>
> Hence, overall I am +1 on the proposal.
>
>
> Some nits about names (as always :))
>
> - `StreamJoined.as()` does not sound right for the `StoreSupplier`
> overload. Maybe it's better to call that static method `with` (not 100%
> sure)
>
> - Should we use plural instead of singular -> `Stream{s}Joined`
>   -> or keep singular but call it `StreamJoin`
>   `Joined` seems to refer to the input stream(s) while `Join` would
> refer to the join-operator
>
> Thoughts?
>
>
> You suggest to deprecate existing overload what I support -- can you
> list deprecated method in the "Public Interface" section?
>
>
>
> -Matthias
>
>
>
> On 9/17/19 9:39 AM, Guozhang Wang wrote:
> > Hello Bill,
> >
> > Thanks for the updated KIP. I made a pass on the StreamJoined section.
> Just
> > a quick question from user's perspective: if a user wants to provide a
> > customized store-supplier, she is forced to also provide a name via the
> > store-supplier. So there's no way to say "I want to provide my own store
> > engine but let the library decide its name", is that right?
> >
> >
> > Guozhang
> >
> >
> > On Tue, Sep 17, 2019 at 8:53 AM Bill Bejeck <bbejeck@gmail.com> wrote:
> >
> >> Bumping this discussion as we need to re-vote before the KIP deadline.
> >>
> >> On Fri, Sep 13, 2019 at 10:29 AM Bill Bejeck <bbejeck@gmail.com> wrote:
> >>
> >>> Hi All,
> >>>
> >>> While working on the implementation of KIP-479, some issues came to
> light
> >>> that the KIP as written won't work.  I have updated the KIP with a
> >> solution
> >>> I believe will solve the original problem as well as address the
> >>> impediment to the initial approach.
> >>>
> >>> This update is a significant change, so please review the updated KIP
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join
> >> and
> >>> provide feedback.  After we conclude the discussion, there will be a
> >>> re-vote.
> >>>
> >>> Thanks!
> >>> Bill
> >>>
> >>> On Wed, Jul 17, 2019 at 7:01 PM Guozhang Wang <wangguoz@gmail.com>
> >> wrote:
> >>>
> >>>> Hi Bill, thanks for your explanations. I'm on board with your decision
> >>>> too.
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Wed, Jul 17, 2019 at 10:20 AM Bill Bejeck <bbejeck@gmail.com>
> wrote:
> >>>>
> >>>>> Thanks for the response, John.
> >>>>>
> >>>>>> If I can offer my thoughts, it seems better to just document on the
> >>>>>> Stream join javadoc for the Materialized parameter that it will not
> >>>>>> make the join result queriable. I'm not opposed to the queriable
> >> flag
> >>>>>> in general, but introducing it is a much larger consideration that
> >> has
> >>>>>> previously derailed this KIP discussion. In the interest of just
> >>>>>> closing the gap and keeping the API change small, it seems better to
> >>>>>> just go with documentation for now.
> >>>>>
> >>>>> I agree with your statement here.  IMHO the most important goal of
> >> this
> >>>> KIP
> >>>>> is to not breaking existing users and gain some consistency of the
> >> API.
> >>>>>
> >>>>> I'll update the KIP accordingly.
> >>>>>
> >>>>> -Bill
> >>>>>
> >>>>> On Tue, Jul 16, 2019 at 11:55 AM John Roesler <john@confluent.io>
> >>>> wrote:
> >>>>>
> >>>>>> Hi Bill,
> >>>>>>
> >>>>>> Thanks for driving this KIP toward a conclusion. I'm on board with
> >>>>>> your decision.
> >>>>>>
> >>>>>> You didn't mention whether you're still proposing to add the
> >>>>>> "queriable" flag to the Materialized config object, or just document
> >>>>>> that a Stream join is never queriable. Both options have come up
> >>>>>> earlier in the discussion, so it would be good to pin this down.
> >>>>>>
> >>>>>> If I can offer my thoughts, it seems better to just document on the
> >>>>>> Stream join javadoc for the Materialized parameter that it will not
> >>>>>> make the join result queriable. I'm not opposed to the queriable
> >> flag
> >>>>>> in general, but introducing it is a much larger consideration that
> >> has
> >>>>>> previously derailed this KIP discussion. In the interest of just
> >>>>>> closing the gap and keeping the API change small, it seems better to
> >>>>>> just go with documentation for now.
> >>>>>>
> >>>>>> Thanks again,
> >>>>>> -John
> >>>>>>
> >>>>>> On Thu, Jul 11, 2019 at 2:45 PM Bill Bejeck <bbejeck@gmail.com>
> >>>> wrote:
> >>>>>>>
> >>>>>>> Thanks all for the great discussion so far.
> >>>>>>>
> >>>>>>> Everyone has made excellent points, and I appreciate the detail
> >>>>> everyone
> >>>>>>> has put into their arguments.
> >>>>>>>
> >>>>>>> However, after carefully evaluating all the points made so far,
> >>>>> creating
> >>>>>> an
> >>>>>>> overload with Materialized is still my #1 option.
> >>>>>>> My reasoning for saying so is two-fold:
> >>>>>>>
> >>>>>>>    1. It's a small change, and IMHO since it's consistent with our
> >>>>>> current
> >>>>>>>    API concerning state store usage, the cognitive load on users
> >>>> will
> >>>>> be
> >>>>>>>    minimal.
> >>>>>>>    2. It achieves the most important goal of this KIP, namely to
> >>>> close
> >>>>>> the
> >>>>>>>    gap of naming state stores independently of the join operator
> >>>> name.
> >>>>>>>
> >>>>>>> Additionally, I agree with the points made by Matthias earlier (I
> >>>>> realize
> >>>>>>> there is some overlap here).
> >>>>>>>
> >>>>>>>>  - the main purpose of this KIP is to close the naming gap what
> >> we
> >>>>>> achieve
> >>>>>>>>  - we can allow people to use the new in-memory store
> >>>>>>>>  - we allow people to enable/disable caching
> >>>>>>>>  - we unify the API
> >>>>>>>>  - we decouple querying from naming
> >>>>>>>>  - it's a small API change
> >>>>>>>
> >>>>>>> Although it's not a perfect solution,  IMHO the positives of using
> >>>>>>> Materialize far outweigh the negatives, and from what we've
> >>>> discussed
> >>>>> so
> >>>>>>> far, anything we implement seems to involve an additional change
> >>>> down
> >>>>> the
> >>>>>>> road.
> >>>>>>>
> >>>>>>> If others are still strongly opposed to using Materialized, my
> >> other
> >>>>>>> preferences would be
> >>>>>>>
> >>>>>>>    1. Add a "withStoreName" to Joined.  Although I agree with
> >>>> Guozhang
> >>>>>> that
> >>>>>>>    having a parameter that only applies to one use-case would be
> >>>>> clumsy.
> >>>>>>>    2. Add a String overload for naming the store, but this would
> >> be
> >>>> my
> >>>>>>>    least favorite option as IMHO this seems to be a step backward
> >>>> from
> >>>>>> why we
> >>>>>>>    introduced configuration objects in the first place.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Bill
> >>>>>>>
> >>>>>>> On Thu, Jun 27, 2019 at 4:45 PM Matthias J. Sax <
> >>>> matthias@confluent.io
> >>>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Thanks for the KIP Bill!
> >>>>>>>>
> >>>>>>>> Great discussion to far.
> >>>>>>>>
> >>>>>>>> About John's idea about querying upstream stores and don't
> >>>>> materialize
> >>>>>> a
> >>>>>>>> store: I agree with Bill that this seems to be an orthogonal
> >>>>> question,
> >>>>>>>> and it might be better to treat it as an independent
> >> optimization
> >>>> and
> >>>>>>>> exclude from this KIP.
> >>>>>>>>
> >>>>>>>>> What should be the behavior if there is no store
> >>>>>>>>> configured (e.g., if Materialized with only serdes) and
> >>>> querying is
> >>>>>>>>> enabled?
> >>>>>>>>
> >>>>>>>> IMHO, this could be an error case. If one wants to query a
> >> store,
> >>>>> they
> >>>>>>>> need to provide a name -- if you don't know the name, how would
> >>>> you
> >>>>>>>> actually query the store (even if it would be possible to get
> >> the
> >>>>> name
> >>>>>>>> from the `TopologyDescription`, it seems clumsy).
> >>>>>>>>
> >>>>>>>> If we don't want to throw an error, materializing seems to be
> >> the
> >>>>> right
> >>>>>>>> option, to exclude "query optimization" from this KIP. I would
> >> be
> >>>> ok
> >>>>>>>> with this option, even if it's clumsy to get the name from
> >>>>>>>> `TopologyDescription`; hence, I would prefer to treat it as an
> >>>> error.
> >>>>>>>>
> >>>>>>>>> To get back to the current behavior, users would have to
> >>>>>>>>> add a "bytes store supplier" to the Materialized to indicate
> >>>> that,
> >>>>>>>>> yes, they really want a state store there.
> >>>>>>>>
> >>>>>>>> This sound like a quite subtle semantic difference on how to use
> >>>> the
> >>>>>>>> API. Might be hard to explain to users. I would prefer to not
> >>>>>> introduce it.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> About Guozhang's points:
> >>>>>>>>
> >>>>>>>> 1a) That is actually a good point. However, I believe we cannot
> >>>> get
> >>>>>>>> around this issue easily, and it seems ok to me, to expose the
> >>>> actual
> >>>>>>>> store type we are using. (More thoughts later.)
> >>>>>>>>
> >>>>>>>> 1b) I don't see an issue with allowing users to query all
> >> stores?
> >>>>> What
> >>>>>>>> is the rational behind it? What do we gain by not allowing it?
> >>>>>>>>
> >>>>>>>> 2) While I understand what you are saying, we also want/need to
> >>>> have
> >>>>> a
> >>>>>>>> way in the PAPI to allow users adding "internal/private"
> >>>>> non-queryable
> >>>>>>>> stores to a topology. That's possible via
> >>>>>>>> `Materialized#withQueryingDisabled()`. We could also update
> >>>>>>>> `Topology#addStateStore(StoreBuilder, boolean isQueryable,
> >>>>> String...)`
> >>>>>>>> to address this. Again, I agree with Bill that the current API
> >> is
> >>>>> built
> >>>>>>>> in a certain way, and if we want to change it, it should be a
> >>>>> separate
> >>>>>>>> KIP, as it seems to be an orthogonal concern.
> >>>>>>>>
> >>>>>>>>> Instead, we just restrict KIP-307 to NOT
> >>>>>>>>> use the Joined.name for state store names and always use
> >>>> internal
> >>>>>> names
> >>>>>>>> as
> >>>>>>>>> well, which admittedly indeed leaves a hole of not being able
> >> to
> >>>>>> cover
> >>>>>>>> all
> >>>>>>>>> internal names here
> >>>>>>>>
> >>>>>>>> I think it's important to close this gap. Naming entities seems
> >>>> to a
> >>>>>>>> binary feature: if there is a gap, the feature is more or less
> >>>>> useless,
> >>>>>>>> rendering KIP-307 void.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> I like John's detailed list of required features and what
> >>>>>>>> Materialized/WindowByteStoreSuppliers offers. My take is, that
> >>>> adding
> >>>>>>>> Materialized including the required run-time checks is the best
> >>>>> option
> >>>>>>>> we have, for the following reasons:
> >>>>>>>>
> >>>>>>>>  - the main purpose of this KIP is to close the naming gap what
> >> we
> >>>>>> achieve
> >>>>>>>>  - we can allow people to use the new in-memory store
> >>>>>>>>  - we allow people to enable/disable caching
> >>>>>>>>  - we unify the API
> >>>>>>>>  - we decouple querying from naming
> >>>>>>>>  - it's a small API change
> >>>>>>>>
> >>>>>>>> Adding an overload and only passing in a name, would address the
> >>>> main
> >>>>>>>> purpose of the KIP. However, it falls short on all the other
> >>>>> "goodies".
> >>>>>>>> As you mentioned, passing in `Materialized` might not be perfect
> >>>> and
> >>>>>>>> maybe we need to deprecate is at some point; but this is also
> >> true
> >>>>> for
> >>>>>>>> passing in just a name.
> >>>>>>>>
> >>>>>>>> I am also not convinced, that a `StreamJoinStore` would resolve
> >>>> all
> >>>>> the
> >>>>>>>> issues. In the end, as long as we are using a `WindowedStore`
> >>>>>>>> internally, we need to expose this "implemenation detail" to
> >>>> users to
> >>>>>>>> allow them to plug in a custom store. Adding `Materialized` seem
> >>>> to
> >>>>> be
> >>>>>>>> the best short-term fix from my point of view.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 6/27/19 9:56 AM, Guozhang Wang wrote:
> >>>>>>>>> Hi John,
> >>>>>>>>>
> >>>>>>>>> I actually feels better about a new interface but I'm not sure
> >>>> if
> >>>>> we
> >>>>>>>> would
> >>>>>>>>> need the full configuration of store / log / cache, now or in
> >>>> the
> >>>>>> future
> >>>>>>>>> ever for stream-stream join.
> >>>>>>>>>
> >>>>>>>>> Right now I feel that 1) we want to improve our implementation
> >>>> of
> >>>>>>>>> stream-stream join, and potentially also allow users to
> >>>> customize
> >>>>>> this
> >>>>>>>>> implementation but with a more suitable interface than the
> >>>> current
> >>>>>>>>> WindowStore interface, how to do that is less clear and
> >>>>>> execution-wise
> >>>>>>>> it's
> >>>>>>>>> (arguably..) not urgent; 2) we want to close the last gap
> >>>>>> (Stream-stream
> >>>>>>>>> join) of allowing users to specify all internal names to help
> >> on
> >>>>>> backward
> >>>>>>>>> compatibility, which is urgent.
> >>>>>>>>>
> >>>>>>>>> Therefore if we want to unblock 2) from 1) in the near term, I
> >>>> feel
> >>>>>>>>> slightly inclined to just add overload functions that takes
> >> in a
> >>>>>> store
> >>>>>>>> name
> >>>>>>>>> for stream-stream joins only -- and admittedly, in the future
> >>>> this
> >>>>>>>> function
> >>>>>>>>> maybe deprecated -- i.e. if we have to do something that we
> >> "may
> >>>>>> regret"
> >>>>>>>> in
> >>>>>>>>> the future, I'd like to pick the least intrusive option.
> >>>>>>>>>
> >>>>>>>>> About `Joined#withStoreName`: since the Joined class itself is
> >>>> also
> >>>>>> used
> >>>>>>>> in
> >>>>>>>>> other join types, I feel less comfortable to have a
> >>>>>>>> `Joined#withStoreName`
> >>>>>>>>> which is only going to be used by stream-stream join. Or
> >> maybe I
> >>>>> miss
> >>>>>>>>> something here about the "latter" case that you are referring
> >>>> to?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>> On Mon, Jun 24, 2019 at 12:16 PM John Roesler <
> >>>> john@confluent.io>
> >>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Thanks Guozhang,
> >>>>>>>>>>
> >>>>>>>>>> Yep. Maybe we can consider just exactly what the join needs:
> >>>>>>>>>>
> >>>>>>>>>>> the WindowStore<Bytes, byte[]> itself is fine, if overly
> >>>> broad,
> >>>>>>>>>>> since the only two methods we need are `window.put(key,
> >> value,
> >>>>>>>>>>> context().timestamp())` and `WindowStoreIterator<V2> iter =
> >>>>>>>>>>> window.fetch(key, timeFrom, timeTo)`.
> >>>>>>>>>>
> >>>>>>>>>> One "middle ground" would be to extract _this_ into a new
> >> store
> >>>>>>>>>> interface, which only supports these API calls, like
> >>>>>>>>>> StreamJoinStore<K, V>. This would give us the latitude we
> >> need
> >>>> to
> >>>>>>>>>> efficiently support the exact operation without concerning
> >>>>> ourselves
> >>>>>>>>>> with all the other things a WindowStore can do (which are
> >>>>>> unreachable
> >>>>>>>>>> for the join use case). It would also let us drop "store
> >>>>> duplicates"
> >>>>>>>>>> from the main WindowStore interface, since it only exists to
> >>>>> support
> >>>>>>>>>> the join use case.
> >>>>>>>>>>
> >>>>>>>>>> If we were to add a new StreamJoinStore interface, then it'd
> >> be
> >>>>>>>>>> straightforward how we could add also
> >>>>>>>>>> `Materialized.as(StreamJoinBytesStoreSupplier)` and use
> >>>>>> Materialized,
> >>>>>>>>>> or alternatively add the ability to set the bytes store on
> >>>> Joined.
> >>>>>>>>>>
> >>>>>>>>>> Personally, I'm kind of leaning toward the latter (and also
> >>>> doing
> >>>>>>>>>> `Joined#withStoreName`), since adding the new interface to
> >>>>>>>>>> Materialized then also pollutes the interface for its
> >> _actual_
> >>>> use
> >>>>>>>>>> case of materializing a table view. Of course, to solve the
> >>>>>> immediate
> >>>>>>>>>> problem, all we need is the store name, but we might feel
> >>>> better
> >>>>>> about
> >>>>>>>>>> adding the store name to Joined if we _also_ feel like in the
> >>>>>> future,
> >>>>>>>>>> we would add store/log/cache configuration to Joined as well.
> >>>>>>>>>>
> >>>>>>>>>> -John
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Jun 24, 2019 at 12:56 PM Guozhang Wang <
> >>>>> wangguoz@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hello John,
> >>>>>>>>>>>
> >>>>>>>>>>> My main concern is exactly the first point at the bottom of
> >>>> your
> >>>>>>>> analysis
> >>>>>>>>>>> here: "* configure the bytes store". I'm not sure if using a
> >>>>> window
> >>>>>>>> bytes
> >>>>>>>>>>> store would be ideal for stream-stream windowed join; e.g.
> >> we
> >>>>> could
> >>>>>>>>>>> consider two dimensional list sorted by timestamps and then
> >> by
> >>>>>> keys to
> >>>>>>>> do
> >>>>>>>>>>> the join, whereas a windowed bytes store is basically sorted
> >>>> by
> >>>>> key
> >>>>>>>>>> first,
> >>>>>>>>>>> then by timestamp. If we expose the Materialized to let user
> >>>> pass
> >>>>>> in a
> >>>>>>>>>>> windowed bytes store, then we would need to change that if
> >> we
> >>>>> want
> >>>>>> to
> >>>>>>>>>>> replace it with a different implementation interface.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Jun 24, 2019 at 8:59 AM John Roesler <
> >>>> john@confluent.io>
> >>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hey Guozhang and Bill,
> >>>>>>>>>>>>
> >>>>>>>>>>>> For what it's worth, I agree with you both!
> >>>>>>>>>>>>
> >>>>>>>>>>>> I think it might help the discussion to look concretely at
> >>>> what
> >>>>>>>>>>>> Materialized does:
> >>>>>>>>>>>> * set a WindowBytesStoreSupplier
> >>>>>>>>>>>> * set a name
> >>>>>>>>>>>> * set the key/value serdes
> >>>>>>>>>>>> * disable/enable/configure change-logging
> >>>>>>>>>>>> * disable/enable caching
> >>>>>>>>>>>> * configure retention
> >>>>>>>>>>>>
> >>>>>>>>>>>> Further, looking into the WindowBytesStoreSupplier, the
> >>>>> interface
> >>>>>> lets
> >>>>>>>>>> you:
> >>>>>>>>>>>> * get the segment interval
> >>>>>>>>>>>> * get the window size
> >>>>>>>>>>>> * get whether "duplicates" are enabled
> >>>>>>>>>>>> * get the retention period
> >>>>>>>>>>>> * (obviously) get a WindowStore<Bytes, byte[]>
> >>>>>>>>>>>>
> >>>>>>>>>>>> We know that Materialized isn't exactly what we need for
> >>>> stream
> >>>>>> joins,
> >>>>>>>>>>>> but we can see how close Materialized is to what we need.
> >> If
> >>>> it
> >>>>> is
> >>>>>>>>>>>> close, maybe we can use it and document the gaps, and if it
> >>>> is
> >>>>> not
> >>>>>>>>>>>> close, then maybe we should just add what we need to
> >> Joined.
> >>>>>>>>>>>> Stream Join's requirements for its stores:
> >>>>>>>>>>>> * a multimap store (i.e., it keeps duplicates) for storing
> >>>>> general
> >>>>>>>>>>>> (not windowed) keyed records associated with their
> >> insertion
> >>>>>> time, and
> >>>>>>>>>>>> allows efficient time-bounded lookups and also efficient
> >>>> purges
> >>>>>> of old
> >>>>>>>>>>>> data.
> >>>>>>>>>>>> ** Note, a properly configured WindowBytesStoreSupplier
> >>>>> satisfies
> >>>>>> this
> >>>>>>>>>>>> requirement, and the interface supports the queries we need
> >>>> to
> >>>>>> verify
> >>>>>>>>>>>> the configuration at run-time
> >>>>>>>>>>>> * set a name for the store
> >>>>>>>>>>>> * do _not_ set the serdes (they are already set in Joined)
> >>>>>>>>>>>> * logging could be configurable (set to enabled now)
> >>>>>>>>>>>> * caching could be configurable (set to enabled now)
> >>>>>>>>>>>> * do _not_ configure retention (determined by JoinWindows)
> >>>>>>>>>>>>
> >>>>>>>>>>>> So, out of six capabilities for Materialized, there are two
> >>>> we
> >>>>>> don't
> >>>>>>>>>>>> want (serdes and retention). These would become run-time
> >>>> checks
> >>>>>> if we
> >>>>>>>>>>>> use it.
> >>>>>>>>>>>>
> >>>>>>>>>>>> A third questionable capability is to provide a
> >>>>>>>>>>>> WindowBytesStoreSupplier. Looking at whether the
> >>>>>>>>>>>> WindowBytesStoreSupplier is the right interface for Stream
> >>>> Join:
> >>>>>>>>>>>> * configuring segment interval is fine
> >>>>>>>>>>>> * should _not_ configure window size (it's determined by
> >>>>>> JoinWindows)
> >>>>>>>>>>>> * duplicates _must_ be enabled
> >>>>>>>>>>>> * retention should be _at least_ windowSize + gracePeriod,
> >>>> but
> >>>>>> note
> >>>>>>>>>>>> that (unlike for Table window stores) there is no utility
> >> in
> >>>>>> having a
> >>>>>>>>>>>> longer retention time.
> >>>>>>>>>>>> * the WindowStore<Bytes, byte[]> itself is fine, if overly
> >>>>> broad,
> >>>>>>>>>>>> since the only two methods we need are `window.put(key,
> >>>> value,
> >>>>>>>>>>>> context().timestamp())` and `WindowStoreIterator<V2> iter =
> >>>>>>>>>>>> window.fetch(key, timeFrom, timeTo)`.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thus, flattening out the overlap for
> >> WindowBytesStoreSupplier
> >>>>>> onto the
> >>>>>>>>>>>> overlap for Materialized, we have 9 capabilities total
> >> (note
> >>>>>> retention
> >>>>>>>>>>>> is duplicated), we have 4 that we don't want:
> >>>>>>>>>>>> * do _not_ set the serdes (they are already set in Joined)
> >>>>>>>>>>>> * do _not_ configure retention (determined by JoinWindows)
> >>>>>>>>>>>> * should _not_ configure window size (it's determined by
> >>>>>> JoinWindows)
> >>>>>>>>>>>> * duplicates _must_ be enabled
> >>>>>>>>>>>>
> >>>>>>>>>>>> These gaps would have to be covered with run-time checks if
> >>>> we
> >>>>>> re-use
> >>>>>>>>>>>> Materialized and WindowStoreBytesStoreSupplier both. Maybe
> >>>> this
> >>>>>> sounds
> >>>>>>>>>>>> bad, but consider the other side, that we get 5 new
> >>>> capabilities
> >>>>>> we
> >>>>>>>>>>>> don't require, but are still pretty nice:
> >>>>>>>>>>>> * configure the bytes store
> >>>>>>>>>>>> * set a name for the store
> >>>>>>>>>>>> * configure caching
> >>>>>>>>>>>> * configure logging
> >>>>>>>>>>>> * configure segment interval
> >>>>>>>>>>>>
> >>>>>>>>>>>> Not sure where this nets us out, but it's food for thought.
> >>>>>>>>>>>> -John
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Sun, Jun 23, 2019 at 7:52 PM Guozhang Wang <
> >>>>> wangguoz@gmail.com
> >>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Bill,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I think by giving a Materialized param into stream-stream
> >>>> join,
> >>>>>> it's
> >>>>>>>>>> okay
> >>>>>>>>>>>>> (though still ideal) to say "we still would not expose the
> >>>>> store
> >>>>>> for
> >>>>>>>>>>>>> queries", but it would sound a bit awkward to say "we
> >> would
> >>>>> also
> >>>>>>>>>> ignore
> >>>>>>>>>>>>> whatever the passed in store supplier but just use our
> >>>> default
> >>>>>> ones"
> >>>>>>>>>> --
> >>>>>>>>>>>>> again the concern is that, if in the future we'd want to
> >>>> change
> >>>>>> the
> >>>>>>>>>>>> default
> >>>>>>>>>>>>> implementation of join algorithm which no longer rely on a
> >>>>> window
> >>>>>>>>>> store
> >>>>>>>>>>>>> with deduping enabled, then we need to change this API
> >>>> again by
> >>>>>>>>>> changing
> >>>>>>>>>>>>> the store supplier type.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If we do want to fill this hole for stream-stream join, I
> >>>> feel
> >>>>>> just
> >>>>>>>>>>>> adding
> >>>>>>>>>>>>> a String typed store-name would even be less
> >>>> future-intrusive
> >>>>> if
> >>>>>> we
> >>>>>>>>>>>> expect
> >>>>>>>>>>>>> this parameter to be modified later.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Does that makes sense?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Sat, Jun 22, 2019 at 12:51 PM Bill Bejeck <
> >>>>> bbejeck@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the comments John and Guozhang, I'll address
> >>>> each
> >>>>>> one of
> >>>>>>>>>>>> your
> >>>>>>>>>>>>>> comments in turn.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> John,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I'm wondering about a missing quadrant from the truth
> >>>> table
> >>>>>>>>>> involving
> >>>>>>>>>>>>>>> whether a Materialized is stored or not and querying is
> >>>>>>>>>>>>>>> enabled/disabled... What should be the behavior if there
> >>>> is
> >>>>> no
> >>>>>>>>>> store
> >>>>>>>>>>>>>>> configured (e.g., if Materialized with only serdes) and
> >>>>>> querying
> >>>>>>>>>> is
> >>>>>>>>>>>>>> enabled?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> It seems we have two choices:
> >>>>>>>>>>>>>>> 1. we can force creation of a state store in this case,
> >> so
> >>>>> the
> >>>>>>>>>> store
> >>>>>>>>>>>>>>> can be used to serve the queries
> >>>>>>>>>>>>>>> 2. we can provide just a queriable view, basically
> >>>> letting IQ
> >>>>>>>>>> query
> >>>>>>>>>>>>>>> into the "KTableValueGetter", which would transparently
> >>>>>>>>>> construct the
> >>>>>>>>>>>>>>> query response by applying the operator logic to the
> >>>> upstream
> >>>>>>>>>> state
> >>>>>>>>>>>> if
> >>>>>>>>>>>>>>> the operator state isn't already stored.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I agree with your assertion about a missing quadrant from
> >>>> the
> >>>>>> truth
> >>>>>>>>>>>> table.
> >>>>>>>>>>>>>> Additionally, I too like the concept of a queriable view.
> >>>>> But I
> >>>>>>>>>> think
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>> goes a bit beyond the scope of this KIP and would like to
> >>>>> pursue
> >>>>>>>>>> that
> >>>>>>>>>>>>>> feature as follow-on work.  Also thinking about this KIP
> >>>> some
> >>>>>>>>>> more, I'm
> >>>>>>>>>>>>>> thinking of the changes to Materialized might be a reach
> >> as
> >>>>>> well.
> >>>>>>>>>>>>>> Separating the naming from a store and its queryable
> >> state
> >>>>> seems
> >>>>>>>>>> like a
> >>>>>>>>>>>>>> complex issue in and of itself and should be treated
> >>>>>> accordingly.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> So here's what I'm thinking now.  We add Materialzied to
> >>>> Join,
> >>>>>> but
> >>>>>>>>>> for
> >>>>>>>>>>>> now,
> >>>>>>>>>>>>>> we internally disable querying.  I know this breaks our
> >>>>> current
> >>>>>>>>>>>> semantic
> >>>>>>>>>>>>>> approach, but I think it's crucial that we do two things
> >> in
> >>>>> this
> >>>>>>>>>> KIP
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>    1. Break the naming of the state stores from Joined to
> >>>>>>>>>>>> Materialized, so
> >>>>>>>>>>>>>>    the naming of state stores follows our current pattern
> >>>> and
> >>>>>>>>>> enables
> >>>>>>>>>>>>>> upgrades
> >>>>>>>>>>>>>>    from 2.3 to 2.4
> >>>>>>>>>>>>>>    2. Offer the ability to configure the state stores of
> >>>> the
> >>>>>> join,
> >>>>>>>>>> even
> >>>>>>>>>>>>>>    providing a different implementation (i.e. in-memory)
> >> if
> >>>>>>>>>> desired.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> With that in mind I'm considering changing the KIP to
> >>>> remove
> >>>>> the
> >>>>>>>>>>>> changes to
> >>>>>>>>>>>>>> Materialized, and we document very clearly that by
> >>>> providing a
> >>>>>>>>>>>> Materialized
> >>>>>>>>>>>>>> object with a name is only for naming the state store,
> >>>> hence
> >>>>> the
> >>>>>>>>>>>> changelog
> >>>>>>>>>>>>>> topics and any possible configurations of the store, but
> >>>> this
> >>>>>> store
> >>>>>>>>>>>> *will
> >>>>>>>>>>>>>> not be available for IQ.*
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> WDYT?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Guozhang,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1. About not breaking compatibility of stream-stream
> >> join
> >>>>>>>>>>>> materialized
> >>>>>>>>>>>>>>> stores: I think this is a valid issue to tackle, but
> >> after
> >>>>>>>>>> thinking
> >>>>>>>>>>>> about
> >>>>>>>>>>>>>>> it once more I'm not sure if exposing Materialized would
> >>>> be a
> >>>>>>>>>> good
> >>>>>>>>>>>>>> solution
> >>>>>>>>>>>>>>> here. My rationles:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1.a For stream-stream join, our current usage of
> >>>> window-store
> >>>>>> is
> >>>>>>>>>> not
> >>>>>>>>>>>>>> ideal,
> >>>>>>>>>>>>>>> and we want to modify it in the near future to be more
> >>>>>>>>>> efficient. Not
> >>>>>>>>>>>>>>> allowing users to override such state store backend
> >> gives
> >>>> us
> >>>>>> such
> >>>>>>>>>>>> freedom
> >>>>>>>>>>>>>>> (which was also considered in the original DSL design),
> >>>>> whereas
> >>>>>>>>>>>> getting a
> >>>>>>>>>>>>>>> Materialized<WindowStore> basically kicks out that
> >> freedom
> >>>>> out
> >>>>>>>>>> of the
> >>>>>>>>>>>>>>> window.
> >>>>>>>>>>>>>>> 1.b For strema-stream join, in our original design we
> >>>> intend
> >>>>> to
> >>>>>>>>>>>> "never"
> >>>>>>>>>>>>>>> want users to query the state, since it is just for
> >>>> buffering
> >>>>>> the
> >>>>>>>>>>>>>> upcoming
> >>>>>>>>>>>>>>> records from the stream. Now I know that some users may
> >>>>> indeed
> >>>>>>>>>> want
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>> query it from the debugging perspective, but still I
> >>>>> concerned
> >>>>>>>>>> about
> >>>>>>>>>>>>>>> whether leveraging IQ for debugging purposes would be
> >> the
> >>>>> right
> >>>>>>>>>>>> solution
> >>>>>>>>>>>>>>> here. And adding Materialized object opens the door to
> >> let
> >>>>>> users
> >>>>>>>>>>>> query
> >>>>>>>>>>>>>>> about it (unless we did something intentionally to still
> >>>>>> forbids
> >>>>>>>>>> it),
> >>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>> also restricts us in the future.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2. About the coupling between Materialized.name() and
> >>>>>> queryable:
> >>>>>>>>>>>> again I
> >>>>>>>>>>>>>>> think this is a valid issue. But I'm not sure if the
> >>>> current
> >>>>>>>>>>>>>>> "withQuerryingDisabled / Enabled" at Materialized is the
> >>>> best
> >>>>>>>>>>>> approach.
> >>>>>>>>>>>>>>> Here I think I agree with John, that generally speaking
> >>>> it's
> >>>>>>>>>> better
> >>>>>>>>>>>> be a
> >>>>>>>>>>>>>>> control function on the `KTable` itself, rather than on
> >>>>>>>>>>>> `Materialized`,
> >>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>> fixing it via adding functions through `Materialized`
> >>>> seems
> >>>>>> not a
> >>>>>>>>>>>> natural
> >>>>>>>>>>>>>> approach either.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I understand your thoughts here, and up to a point, I
> >> agree
> >>>>> with
> >>>>>>>>>> you.
> >>>>>>>>>>>>>> But concerning not providing Materialized as it may
> >>>> restrict
> >>>>> us
> >>>>>> in
> >>>>>>>>>> the
> >>>>>>>>>>>>>> future for delivering different implementations, I'm
> >>>> wondering
> >>>>>> if
> >>>>>>>>>> we
> >>>>>>>>>>>> are
> >>>>>>>>>>>>>> doing some premature optimization here.
> >>>>>>>>>>>>>> My rationale for saying so
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>    1. I think the cost of not allowing the naming of
> >> state
> >>>>>> stores
> >>>>>>>>>> for
> >>>>>>>>>>>> joins
> >>>>>>>>>>>>>>    is too big of a gap to leave.   IMHO for joins to
> >> follow
> >>>>> the
> >>>>>>>>>> current
> >>>>>>>>>>>>>>    pattern of using Materialized for naming state stores
> >>>> would
> >>>>>> be
> >>>>>>>>>> what
> >>>>>>>>>>>> most
> >>>>>>>>>>>>>>    users would expect to use.  As I said in my comments
> >>>>> above, I
> >>>>>>>>>> think
> >>>>>>>>>>>> we
> >>>>>>>>>>>>>>    should *not include* the changes to Materialized and
> >>>>> enforce
> >>>>>>>>>> named
> >>>>>>>>>>>>>>    stores for joins as unavailable for IQ.
> >>>>>>>>>>>>>>    2. We'll still have the join methods available
> >> without a
> >>>>>>>>>>>> Materialized
> >>>>>>>>>>>>>>    allowing us to do something different internally if a
> >>>>>>>>>> Materialized
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>    provided.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Overall, I'm thinking maybe we should still use two
> >> stones
> >>>>>> rather
> >>>>>>>>>>>> than
> >>>>>>>>>>>>>> one
> >>>>>>>>>>>>>>> to kill these two birds, and probably for this KIP we
> >> just
> >>>>>> focus
> >>>>>>>>>> on
> >>>>>>>>>>>> 1)
> >>>>>>>>>>>>>>> above. And for that I'd like to not expose the
> >>>> Materialized
> >>>>>>>>>> either
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>> rationales that I've listed above. Instead, we just
> >>>> restrict
> >>>>>>>>>> KIP-307
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>> NOT
> >>>>>>>>>>>>>>> use the Joined.name for state store names and always use
> >>>>>> internal
> >>>>>>>>>>>> names
> >>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>> well, which admittedly indeed leaves a hole of not being
> >>>> able
> >>>>>> to
> >>>>>>>>>>>> cover
> >>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>> internal names here, but now I feel this `hole` may
> >>>> better be
> >>>>>>>>>> filled
> >>>>>>>>>>>> by,
> >>>>>>>>>>>>>>> e.g. not creating changelog topics but just use the
> >>>> upstream
> >>>>> to
> >>>>>>>>>>>>>>> re-bootstrap the materialized store, more concretely:
> >> when
> >>>>>>>>>>>> materializing
> >>>>>>>>>>>>>>> the store, try to piggy-back the changelog topic on an
> >>>>> existing
> >>>>>>>>>>>> topic,
> >>>>>>>>>>>>>> e.g.
> >>>>>>>>>>>>>>> a) if the stream is coming directly from some source
> >> topic
> >>>>>>>>>> (including
> >>>>>>>>>>>>>>> repartition topic), make that as changelog topic and if
> >>>> it is
> >>>>>>>>>>>> repartition
> >>>>>>>>>>>>>>> topic change the retention / data purging policy
> >>>> necessarily
> >>>>> as
> >>>>>>>>>>>> well; b)
> >>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>> the stream is coming from some stateless operators,
> >>>> delegate
> >>>>>> that
> >>>>>>>>>>>>>> stateless
> >>>>>>>>>>>>>>> operator to the parent stream similar as a); if the
> >>>> stream is
> >>>>>>>>>> coming
> >>>>>>>>>>>> from
> >>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>> stream-stream join which is the only stateful operator
> >>>> that
> >>>>> can
> >>>>>>>>>>>> result in
> >>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>> stream, consider merging the join into multi-way joins
> >>>> (yes,
> >>>>>>>>>> this is
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>> very
> >>>>>>>>>>>>>>> hand-wavy thought, but the point here is that we do not
> >>>> try
> >>>>> to
> >>>>>>>>>>>> tackle it
> >>>>>>>>>>>>>>> now but leave it for a better solution :).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I really like this idea!  I agree with you in that this
> >>>>> approach
> >>>>>>>>>> to too
> >>>>>>>>>>>>>> much for adding in this KIP, but we could pick it up
> >> later
> >>>> and
> >>>>>>>>>>>> leverage the
> >>>>>>>>>>>>>> Optimization framework to accomplish this re-use.
> >>>>>>>>>>>>>> Again, while I agree we should break the naming of join
> >>>> state
> >>>>>>>>>> stores
> >>>>>>>>>>>> from
> >>>>>>>>>>>>>> KIP-307, IMHO it's something we should fix now as it will
> >>>> be
> >>>>> the
> >>>>>>>>>> last
> >>>>>>>>>>>> piece
> >>>>>>>>>>>>>> we can provide to give users the ability to completely
> >> make
> >>>>>> their
> >>>>>>>>>>>>>> topologies "upgrade proof" when adding additional
> >>>> operations.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks again to both of you for comments and I look
> >>>> forward to
> >>>>>>>>>> hearing
> >>>>>>>>>>>> back
> >>>>>>>>>>>>>> from you.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>> Bill
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Thu, Jun 20, 2019 at 2:33 PM Guozhang Wang <
> >>>>>> wangguoz@gmail.com>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hello Bill,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the KIP. Glad to see that we can likely
> >>>> shooting
> >>>>> two
> >>>>>>>>>> birds
> >>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>> one stone. I have some concerns though about those "two
> >>>>> birds"
> >>>>>>>>>>>>>> themselves:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1. About not breaking compatibility of stream-stream
> >> join
> >>>>>>>>>>>> materialized
> >>>>>>>>>>>>>>> stores: I think this is a valid issue to tackle, but
> >> after
> >>>>>>>>>> thinking
> >>>>>>>>>>>> about
> >>>>>>>>>>>>>>> it once more I'm not sure if exposing Materialized would
> >>>> be a
> >>>>>>>>>> good
> >>>>>>>>>>>>>> solution
> >>>>>>>>>>>>>>> here. My rationles:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1.a For stream-stream join, our current usage of
> >>>> window-store
> >>>>>> is
> >>>>>>>>>> not
> >>>>>>>>>>>>>> ideal,
> >>>>>>>>>>>>>>> and we want to modify it in the near future to be more
> >>>>>>>>>> efficient. Not
> >>>>>>>>>>>>>>> allowing users to override such state store backend
> >> gives
> >>>> us
> >>>>>> such
> >>>>>>>>>>>> freedom
> >>>>>>>>>>>>>>> (which was also considered in the original DSL design),
> >>>>> whereas
> >>>>>>>>>>>> getting a
> >>>>>>>>>>>>>>> Materialized<WindowStore> basically kicks out that
> >> freedom
> >>>>> out
> >>>>>>>>>> of the
> >>>>>>>>>>>>>>> window.
> >>>>>>>>>>>>>>> 1.b For strema-stream join, in our original design we
> >>>> intend
> >>>>> to
> >>>>>>>>>>>> "never"
> >>>>>>>>>>>>>>> want users to query the state, since it is just for
> >>>> buffering
> >>>>>> the
> >>>>>>>>>>>>>> upcoming
> >>>>>>>>>>>>>>> records from the stream. Now I know that some users may
> >>>>> indeed
> >>>>>>>>>> want
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>> query it from the debugging perspective, but still I
> >>>>> concerned
> >>>>>>>>>> about
> >>>>>>>>>>>>>>> whether leveraging IQ for debugging purposes would be
> >> the
> >>>>> right
> >>>>>>>>>>>> solution
> >>>>>>>>>>>>>>> here. And adding Materialized object opens the door to
> >> let
> >>>>>> users
> >>>>>>>>>>>> query
> >>>>>>>>>>>>>>> about it (unless we did something intentionally to still
> >>>>>> forbids
> >>>>>>>>>> it),
> >>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>> also restricts us in the future.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2. About the coupling between Materialized.name() and
> >>>>>> queryable:
> >>>>>>>>>>>> again I
> >>>>>>>>>>>>>>> think this is a valid issue. But I'm not sure if the
> >>>> current
> >>>>>>>>>>>>>>> "withQuerryingDisabled / Enabled" at Materialized is the
> >>>> best
> >>>>>>>>>>>> approach.
> >>>>>>>>>>>>>>> Here I think I agree with John, that generally speaking
> >>>> it's
> >>>>>>>>>> better
> >>>>>>>>>>>> be a
> >>>>>>>>>>>>>>> control function on the `KTable` itself, rather than on
> >>>>>>>>>>>> `Materialized`,
> >>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>> fixing it via adding functions through `Materialized`
> >>>> seems
> >>>>>> not a
> >>>>>>>>>>>> natural
> >>>>>>>>>>>>>>> approach either.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Overall, I'm thinking maybe we should still use two
> >> stones
> >>>>>> rather
> >>>>>>>>>>>> than
> >>>>>>>>>>>>>> one
> >>>>>>>>>>>>>>> to kill these two birds, and probably for this KIP we
> >> just
> >>>>>> focus
> >>>>>>>>>> on
> >>>>>>>>>>>> 1)
> >>>>>>>>>>>>>>> above. And for that I'd like to not expose the
> >>>> Materialized
> >>>>>>>>>> either
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>> rationales that I've listed above. Instead, we just
> >>>> restrict
> >>>>>>>>>> KIP-307
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>> NOT
> >>>>>>>>>>>>>>> use the Joined.name for state store names and always use
> >>>>>> internal
> >>>>>>>>>>>> names
> >>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>> well, which admittedly indeed leaves a hole of not being
> >>>> able
> >>>>>> to
> >>>>>>>>>>>> cover
> >>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>> internal names here, but now I feel this `hole` may
> >>>> better be
> >>>>>>>>>> filled
> >>>>>>>>>>>> by,
> >>>>>>>>>>>>>>> e.g. not creating changelog topics but just use the
> >>>> upstream
> >>>>> to
> >>>>>>>>>>>>>>> re-bootstrap the materialized store, more concretely:
> >> when
> >>>>>>>>>>>> materializing
> >>>>>>>>>>>>>>> the store, try to piggy-back the changelog topic on an
> >>>>> existing
> >>>>>>>>>>>> topic,
> >>>>>>>>>>>>>> e.g.
> >>>>>>>>>>>>>>> a) if the stream is coming directly from some source
> >> topic
> >>>>>>>>>> (including
> >>>>>>>>>>>>>>> repartition topic), make that as changelog topic and if
> >>>> it is
> >>>>>>>>>>>> repartition
> >>>>>>>>>>>>>>> topic change the retention / data purging policy
> >>>> necessarily
> >>>>> as
> >>>>>>>>>>>> well; b)
> >>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>> the stream is coming from some stateless operators,
> >>>> delegate
> >>>>>> that
> >>>>>>>>>>>>>> stateless
> >>>>>>>>>>>>>>> operator to the parent stream similar as a); if the
> >>>> stream is
> >>>>>>>>>> coming
> >>>>>>>>>>>>>> from a
> >>>>>>>>>>>>>>> stream-stream join which is the only stateful operator
> >>>> that
> >>>>> can
> >>>>>>>>>>>> result
> >>>>>>>>>>>>>> in a
> >>>>>>>>>>>>>>> stream, consider merging the join into multi-way joins
> >>>> (yes,
> >>>>>>>>>> this is
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>> very
> >>>>>>>>>>>>>>> hand-wavy thought, but the point here is that we do not
> >>>> try
> >>>>> to
> >>>>>>>>>>>> tackle it
> >>>>>>>>>>>>>>> now but leave it for a better solution :).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Wed, Jun 19, 2019 at 11:41 AM John Roesler <
> >>>>>> john@confluent.io
> >>>>>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Bill,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for the KIP! Awesome job catching this
> >> unexpected
> >>>>>>>>>>>> consequence
> >>>>>>>>>>>>>>>> of the prior KIPs before it was released.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The proposal looks good to me. On top of just fixing
> >> the
> >>>>>>>>>> problem,
> >>>>>>>>>>>> it
> >>>>>>>>>>>>>>>> seems to address two other pain points:
> >>>>>>>>>>>>>>>> * that naming a state store automatically causes it to
> >>>>> become
> >>>>>>>>>>>>>> queriable.
> >>>>>>>>>>>>>>>> * that there's currently no way to configure the bytes
> >>>> store
> >>>>>>>>>> for
> >>>>>>>>>>>> join
> >>>>>>>>>>>>>>>> windows.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> It's awesome that we can fix this issue and two others
> >>>> with
> >>>>>> one
> >>>>>>>>>>>>>> feature.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I'm wondering about a missing quadrant from the truth
> >>>> table
> >>>>>>>>>>>> involving
> >>>>>>>>>>>>>>>> whether a Materialized is stored or not and querying is
> >>>>>>>>>>>>>>>> enabled/disabled... What should be the behavior if
> >> there
> >>>> is
> >>>>> no
> >>>>>>>>>>>> store
> >>>>>>>>>>>>>>>> configured (e.g., if Materialized with only serdes) and
> >>>>>>>>>> querying is
> >>>>>>>>>>>>>>>> enabled?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> It seems we have two choices:
> >>>>>>>>>>>>>>>> 1. we can force creation of a state store in this case,
> >>>> so
> >>>>> the
> >>>>>>>>>>>> store
> >>>>>>>>>>>>>>>> can be used to serve the queries
> >>>>>>>>>>>>>>>> 2. we can provide just a queriable view, basically
> >>>> letting
> >>>>> IQ
> >>>>>>>>>> query
> >>>>>>>>>>>>>>>> into the "KTableValueGetter", which would transparently
> >>>>>>>>>> construct
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> query response by applying the operator logic to the
> >>>>> upstream
> >>>>>>>>>>>> state if
> >>>>>>>>>>>>>>>> the operator state isn't already stored.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Offhand, it seems like the second is actually a pretty
> >>>>> awesome
> >>>>>>>>>>>>>>>> capability. But it might have an awkward interaction
> >> with
> >>>>> the
> >>>>>>>>>>>> current
> >>>>>>>>>>>>>>>> semantics. Presently, if I provide a
> >>>> Materialized.withName,
> >>>>> it
> >>>>>>>>>>>> implies
> >>>>>>>>>>>>>>>> that querying should be enabled AND that the view
> >> should
> >>>>>>>>>> actually
> >>>>>>>>>>>> be
> >>>>>>>>>>>>>>>> stored in a state store. Under option 2 above, this
> >>>> behavior
> >>>>>>>>>> would
> >>>>>>>>>>>>>>>> change to NOT provision a state store and instead just
> >>>>> consult
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>> ValueGetter. To get back to the current behavior, users
> >>>>> would
> >>>>>>>>>> have
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> add a "bytes store supplier" to the Materialized to
> >>>> indicate
> >>>>>>>>>> that,
> >>>>>>>>>>>>>>>> yes, they really want a state store there.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Behavior changes are always kind of scary, but I think
> >> in
> >>>>> this
> >>>>>>>>>>>> case,
> >>>>>>>>>>>>>>>> it might actually be preferable. In the event where
> >> only
> >>>> the
> >>>>>>>>>> name
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>>>> provided, it means that people just wanted to make the
> >>>>>>>>>> operation
> >>>>>>>>>>>>>>>> result queriable. If we automatically convert this to a
> >>>>>>>>>> non-stored
> >>>>>>>>>>>>>>>> view, then simply upgrading results in the same
> >>>> observable
> >>>>>>>>>> behavior
> >>>>>>>>>>>>>>>> and semantics, but a linear reduction in local storage
> >>>>>>>>>> requirements
> >>>>>>>>>>>>>>>> and disk i/o, as well as a corresponding linear
> >>>> reduction in
> >>>>>>>>>> memory
> >>>>>>>>>>>>>>>> usage both on and off heap.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>>> -John
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Tue, Jun 18, 2019 at 9:21 PM Bill Bejeck <
> >>>>>> bbejeck@gmail.com
> >>>>>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> All,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I'd like to start a discussion for adding a
> >> Materialized
> >>>>>>>>>>>>>> configuration
> >>>>>>>>>>>>>>>>> object to KStream.join for naming state stores
> >> involved
> >>>> in
> >>>>>>>>>> joins.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+Materialized+to+Join
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Your comments and suggestions are welcome.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>> Bill
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> --
> >>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> -- Guozhang
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>
> >
> >
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message