kafka-dev mailing list archives

From Bill Bejeck <bbej...@gmail.com>
Subject Re: [DISCUSS] KIP-479: Add Materialized to Join
Date Tue, 17 Sep 2019 15:52:58 GMT
Bumping this discussion as we need to re-vote before the KIP deadline.

On Fri, Sep 13, 2019 at 10:29 AM Bill Bejeck <bbejeck@gmail.com> wrote:

> Hi All,
>
> While working on the implementation of KIP-479, it became clear
> that the KIP as written won't work.  I have updated the KIP with a solution
> I believe will solve the original problem as well as address the
> impediment to the initial approach.
>
> This update is a significant change, so please review the updated KIP
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join and
> provide feedback.  After we conclude the discussion, there will be a
> re-vote.
>
> Thanks!
> Bill
>
> On Wed, Jul 17, 2019 at 7:01 PM Guozhang Wang <wangguoz@gmail.com> wrote:
>
>> Hi Bill, thanks for your explanations. I'm on board with your decision
>> too.
>>
>>
>> Guozhang
>>
>> On Wed, Jul 17, 2019 at 10:20 AM Bill Bejeck <bbejeck@gmail.com> wrote:
>>
>> > Thanks for the response, John.
>> >
>> > > If I can offer my thoughts, it seems better to just document on the
>> > > Stream join javadoc for the Materialized parameter that it will not
>> > > make the join result queriable. I'm not opposed to the queriable flag
>> > > in general, but introducing it is a much larger consideration that has
>> > > previously derailed this KIP discussion. In the interest of just
>> > > closing the gap and keeping the API change small, it seems better to
>> > > just go with documentation for now.
>> >
>> > I agree with your statement here.  IMHO the most important goal of this
>> KIP
>> > is to not break existing users and gain some consistency in the API.
>> >
>> > I'll update the KIP accordingly.
>> >
>> > -Bill
>> >
>> > On Tue, Jul 16, 2019 at 11:55 AM John Roesler <john@confluent.io>
>> wrote:
>> >
>> > > Hi Bill,
>> > >
>> > > Thanks for driving this KIP toward a conclusion. I'm on board with
>> > > your decision.
>> > >
>> > > You didn't mention whether you're still proposing to add the
>> > > "queriable" flag to the Materialized config object, or just document
>> > > that a Stream join is never queriable. Both options have come up
>> > > earlier in the discussion, so it would be good to pin this down.
>> > >
>> > > If I can offer my thoughts, it seems better to just document on the
>> > > Stream join javadoc for the Materialized parameter that it will not
>> > > make the join result queriable. I'm not opposed to the queriable flag
>> > > in general, but introducing it is a much larger consideration that has
>> > > previously derailed this KIP discussion. In the interest of just
>> > > closing the gap and keeping the API change small, it seems better to
>> > > just go with documentation for now.
>> > >
>> > > Thanks again,
>> > > -John
>> > >
>> > > On Thu, Jul 11, 2019 at 2:45 PM Bill Bejeck <bbejeck@gmail.com>
>> wrote:
>> > > >
>> > > > Thanks all for the great discussion so far.
>> > > >
>> > > > Everyone has made excellent points, and I appreciate the detail
>> > everyone
>> > > > has put into their arguments.
>> > > >
>> > > > However, after carefully evaluating all the points made so far,
>> > creating
>> > > an
>> > > > overload with Materialized is still my #1 option.
>> > > > My reasoning for saying so is two-fold:
>> > > >
>> > > >    1. It's a small change, and IMHO since it's consistent with our
>> > > current
>> > > >    API concerning state store usage, the cognitive load on users
>> will
>> > be
>> > > >    minimal.
>> > > >    2. It achieves the most important goal of this KIP, namely to
>> close
>> > > the
>> > > >    gap of naming state stores independently of the join operator
>> name.
>> > > >
>> > > > Additionally, I agree with the points made by Matthias earlier (I
>> > realize
>> > > > there is some overlap here).
>> > > >
>> > > > >  - the main purpose of this KIP is to close the naming gap, which we
>> > > achieve
>> > > > >  - we can allow people to use the new in-memory store
>> > > > >  - we allow people to enable/disable caching
>> > > > >  - we unify the API
>> > > > >  - we decouple querying from naming
>> > > > >  - it's a small API change
>> > > >
>> > > > Although it's not a perfect solution,  IMHO the positives of using
>> > > > Materialized far outweigh the negatives, and from what we've
>> discussed
>> > so
>> > > > far, anything we implement seems to involve an additional change
>> down
>> > the
>> > > > road.
>> > > >
>> > > > If others are still strongly opposed to using Materialized, my other
>> > > > preferences would be
>> > > >
>> > > >    1. Add a "withStoreName" to Joined.  Although I agree with
>> Guozhang
>> > > that
>> > > >    having a parameter that only applies to one use-case would be
>> > clumsy.
>> > > >    2. Add a String overload for naming the store, but this would be
>> my
>> > > >    least favorite option as IMHO this seems to be a step backward
>> from
>> > > why we
>> > > >    introduced configuration objects in the first place.
>> > > >
>> > > > Thanks,
>> > > > Bill
>> > > >
>> > > > On Thu, Jun 27, 2019 at 4:45 PM Matthias J. Sax <
>> matthias@confluent.io
>> > >
>> > > > wrote:
>> > > >
>> > > > > Thanks for the KIP Bill!
>> > > > >
>> > > > > Great discussion so far.
>> > > > >
>> > > > > About John's idea about querying upstream stores and not
>> > materialize
>> > > a
>> > > > > store: I agree with Bill that this seems to be an orthogonal
>> > question,
>> > > > > and it might be better to treat it as an independent optimization
>> and
>> > > > > exclude it from this KIP.
>> > > > >
>> > > > > > What should be the behavior if there is no store
>> > > > > > configured (e.g., if Materialized with only serdes) and
>> querying is
>> > > > > > enabled?
>> > > > >
>> > > > > IMHO, this could be an error case. If one wants to query a store,
>> > they
>> > > > > need to provide a name -- if you don't know the name, how would
>> you
>> > > > > actually query the store (even if it would be possible to get the
>> > name
>> > > > > from the `TopologyDescription`, it seems clumsy).
>> > > > >
>> > > > > If we don't want to throw an error, materializing seems to be the
>> > right
>> > > > > option, to exclude "query optimization" from this KIP. I would be
>> ok
>> > > > > with this option, even if it's clumsy to get the name from
>> > > > > `TopologyDescription`; hence, I would prefer to treat it as an
>> error.
>> > > > >
>> > > > > > To get back to the current behavior, users would have to
>> > > > > > add a "bytes store supplier" to the Materialized to indicate
>> that,
>> > > > > > yes, they really want a state store there.
>> > > > >
>> > > > > This sounds like quite a subtle semantic difference in how to use
>> the
>> > > > > API. Might be hard to explain to users. I would prefer to not
>> > > introduce it.
>> > > > >
>> > > > >
>> > > > >
>> > > > > About Guozhang's points:
>> > > > >
>> > > > > 1a) That is actually a good point. However, I believe we cannot
>> get
>> > > > > around this issue easily, and it seems ok to me, to expose the
>> actual
>> > > > > store type we are using. (More thoughts later.)
>> > > > >
>> > > > > 1b) I don't see an issue with allowing users to query all stores?
>> > What
>> > > > > is the rationale behind it? What do we gain by not allowing it?
>> > > > >
>> > > > > 2) While I understand what you are saying, we also want/need to
>> have
>> > a
>> > > > > way in the PAPI to allow users adding "internal/private"
>> > non-queryable
>> > > > > stores to a topology. That's possible via
>> > > > > `Materialized#withQueryingDisabled()`. We could also update
>> > > > > `Topology#addStateStore(StoreBuilder, boolean isQueryable,
>> > String...)`
>> > > > > to address this. Again, I agree with Bill that the current API is
>> > built
>> > > > > in a certain way, and if we want to change it, it should be a
>> > separate
>> > > > > KIP, as it seems to be an orthogonal concern.
>> > > > >
>> > > > > > Instead, we just restrict KIP-307 to NOT
>> > > > > > use the Joined.name for state store names and always use
>> internal
>> > > names
>> > > > > as
>> > > > > > well, which admittedly indeed leaves a hole of not being able to
>> > > cover
>> > > > > all
>> > > > > > internal names here
>> > > > >
>> > > > > I think it's important to close this gap. Naming entities seems
>> to be a
>> > > > > binary feature: if there is a gap, the feature is more or less
>> > useless,
>> > > > > rendering KIP-307 void.
>> > > > >
>> > > > >
>> > > > >
>> > > > > I like John's detailed list of required features and what
>> > > > > Materialized/WindowBytesStoreSupplier offer. My take is that
>> adding
>> > > > > Materialized including the required run-time checks is the best
>> > option
>> > > > > we have, for the following reasons:
>> > > > >
>> > > > >  - the main purpose of this KIP is to close the naming gap, which we
>> > > achieve
>> > > > >  - we can allow people to use the new in-memory store
>> > > > >  - we allow people to enable/disable caching
>> > > > >  - we unify the API
>> > > > >  - we decouple querying from naming
>> > > > >  - it's a small API change
>> > > > >
>> > > > > Adding an overload and only passing in a name, would address the
>> main
>> > > > > purpose of the KIP. However, it falls short on all the other
>> > "goodies".
>> > > > > As you mentioned, passing in `Materialized` might not be perfect
>> and
>> > > > > maybe we need to deprecate it at some point; but this is also true
>> > for
>> > > > > passing in just a name.
>> > > > >
>> > > > > I am also not convinced, that a `StreamJoinStore` would resolve
>> all
>> > the
>> > > > > issues. In the end, as long as we are using a `WindowedStore`
>> > > > > internally, we need to expose this "implementation detail" to
>> users to
>> > > > > allow them to plug in a custom store. Adding `Materialized` seems
>> to
>> > be
>> > > > > the best short-term fix from my point of view.
>> > > > >
>> > > > >
>> > > > > -Matthias
>> > > > >
>> > > > >
>> > > > > On 6/27/19 9:56 AM, Guozhang Wang wrote:
>> > > > > > Hi John,
>> > > > > >
>> > > > > > I actually feel better about a new interface but I'm not sure
>> if
>> > we
>> > > > > would
>> > > > > > need the full configuration of store / log / cache, now or in
>> the
>> > > future
>> > > > > > ever for stream-stream join.
>> > > > > >
>> > > > > > Right now I feel that 1) we want to improve our implementation
>> of
>> > > > > > stream-stream join, and potentially also allow users to
>> customize
>> > > this
>> > > > > > implementation but with a more suitable interface than the
>> current
>> > > > > > WindowStore interface, how to do that is less clear and
>> > > execution-wise
>> > > > > it's
>> > > > > > (arguably..) not urgent; 2) we want to close the last gap
>> > > (Stream-stream
>> > > > > > join) of allowing users to specify all internal names to help on
>> > > backward
>> > > > > > compatibility, which is urgent.
>> > > > > >
>> > > > > > Therefore if we want to unblock 2) from 1) in the near term, I
>> feel
>> > > > > > slightly inclined to just add overload functions that take in a
>> > > store
>> > > > > name
>> > > > > > for stream-stream joins only -- and admittedly, in the future
>> this
>> > > > > function
>> > > > > > may be deprecated -- i.e. if we have to do something that we "may
>> > > regret"
>> > > > > in
>> > > > > > the future, I'd like to pick the least intrusive option.
>> > > > > >
>> > > > > > About `Joined#withStoreName`: since the Joined class itself is
>> also
>> > > used
>> > > > > in
>> > > > > > other join types, I feel less comfortable to have a
>> > > > > `Joined#withStoreName`
>> > > > > > which is only going to be used by stream-stream join. Or maybe I
>> > am missing
>> > > > > > something here about the "latter" case that you are referring
>> to?
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > Guozhang
>> > > > > >
>> > > > > > On Mon, Jun 24, 2019 at 12:16 PM John Roesler <
>> john@confluent.io>
>> > > wrote:
>> > > > > >
>> > > > > >> Thanks Guozhang,
>> > > > > >>
>> > > > > >> Yep. Maybe we can consider just exactly what the join needs:
>> > > > > >>
>> > > > > >>> the WindowStore<Bytes, byte[]> itself is fine, if overly
>> broad,
>> > > > > >>> since the only two methods we need are `window.put(key, value,
>> > > > > >>> context().timestamp())` and `WindowStoreIterator<V2> iter =
>> > > > > >>> window.fetch(key, timeFrom, timeTo)`.
>> > > > > >>
>> > > > > >> One "middle ground" would be to extract _this_ into a new store
>> > > > > >> interface, which only supports these API calls, like
>> > > > > >> StreamJoinStore<K, V>. This would give us the latitude we need
>> to
>> > > > > >> efficiently support the exact operation without concerning
>> > ourselves
>> > > > > >> with all the other things a WindowStore can do (which are
>> > > unreachable
>> > > > > >> for the join use case). It would also let us drop "store
>> > duplicates"
>> > > > > >> from the main WindowStore interface, since it only exists to
>> > support
>> > > > > >> the join use case.
>> > > > > >>
>> > > > > >> If we were to add a new StreamJoinStore interface, then it'd be
>> > > > > >> straightforward how we could add also
>> > > > > >> `Materialized.as(StreamJoinBytesStoreSupplier)` and use
>> > > Materialized,
>> > > > > >> or alternatively add the ability to set the bytes store on
>> Joined.
>> > > > > >>
>> > > > > >> Personally, I'm kind of leaning toward the latter (and also
>> doing
>> > > > > >> `Joined#withStoreName`), since adding the new interface to
>> > > > > >> Materialized then also pollutes the interface for its _actual_
>> use
>> > > > > >> case of materializing a table view. Of course, to solve the
>> > > immediate
>> > > > > >> problem, all we need is the store name, but we might feel
>> better
>> > > about
>> > > > > >> adding the store name to Joined if we _also_ feel like in the
>> > > future,
>> > > > > >> we would add store/log/cache configuration to Joined as well.
>> > > > > >>
>> > > > > >> -John
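The narrower store interface John proposes above can be sketched roughly as follows. This is only an illustration under stated assumptions: `StreamJoinStore` is the name floated in the thread, but the method signatures and the `InMemoryStreamJoinStore` class are hypothetical and not part of Kafka Streams. The sketch keeps duplicates for a key and supports only the two operations the join needs, a timestamped put and a time-bounded fetch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical interface: only the two calls the stream-stream join relies on.
interface StreamJoinStore<K, V> {
    void put(K key, V value, long timestampMs);

    // Returns all values for the key with timeFromMs <= timestamp <= timeToMs.
    List<V> fetch(K key, long timeFromMs, long timeToMs);
}

// Hypothetical in-memory implementation that retains duplicates per key and timestamp.
final class InMemoryStreamJoinStore<K, V> implements StreamJoinStore<K, V> {
    private final Map<K, TreeMap<Long, List<V>>> data = new HashMap<>();

    @Override
    public void put(K key, V value, long timestampMs) {
        data.computeIfAbsent(key, k -> new TreeMap<>())
            .computeIfAbsent(timestampMs, t -> new ArrayList<>())
            .add(value);
    }

    @Override
    public List<V> fetch(K key, long timeFromMs, long timeToMs) {
        List<V> result = new ArrayList<>();
        TreeMap<Long, List<V>> byTime = data.get(key);
        if (byTime != null) {
            // subMap with both bounds inclusive; assumes timeFromMs <= timeToMs.
            for (List<V> values : byTime.subMap(timeFromMs, true, timeToMs, true).values()) {
                result.addAll(values);
            }
        }
        return result;
    }
}
```

Purging of old data, which the join also needs, is left out of this sketch.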
>> > > > > >>
>> > > > > >> On Mon, Jun 24, 2019 at 12:56 PM Guozhang Wang <
>> > wangguoz@gmail.com>
>> > > > > wrote:
>> > > > > >>>
>> > > > > >>> Hello John,
>> > > > > >>>
>> > > > > >>> My main concern is exactly the first point at the bottom of
>> your
>> > > > > analysis
>> > > > > >>> here: "* configure the bytes store". I'm not sure if using a
>> > window
>> > > > > bytes
>> > > > > >>> store would be ideal for stream-stream windowed join; e.g. we
>> > could
>> > > > > >>> consider two dimensional list sorted by timestamps and then by
>> > > keys to
>> > > > > do
>> > > > > >>> the join, whereas a windowed bytes store is basically sorted
>> by
>> > key
>> > > > > >> first,
>> > > > > >>> then by timestamp. If we expose the Materialized to let user
>> pass
>> > > in a
>> > > > > >>> windowed bytes store, then we would need to change that if we
>> > want
>> > > to
>> > > > > >>> replace it with a different implementation interface.
>> > > > > >>>
>> > > > > >>>
>> > > > > >>> Guozhang
>> > > > > >>>
>> > > > > >>> On Mon, Jun 24, 2019 at 8:59 AM John Roesler <
>> john@confluent.io>
>> > > > > wrote:
>> > > > > >>>
>> > > > > >>>> Hey Guozhang and Bill,
>> > > > > >>>>
>> > > > > >>>> For what it's worth, I agree with you both!
>> > > > > >>>>
>> > > > > >>>> I think it might help the discussion to look concretely at
>> what
>> > > > > >>>> Materialized does:
>> > > > > >>>> * set a WindowBytesStoreSupplier
>> > > > > >>>> * set a name
>> > > > > >>>> * set the key/value serdes
>> > > > > >>>> * disable/enable/configure change-logging
>> > > > > >>>> * disable/enable caching
>> > > > > >>>> * configure retention
>> > > > > >>>>
>> > > > > >>>> Further, looking into the WindowBytesStoreSupplier, the
>> > interface
>> > > lets
>> > > > > >> you:
>> > > > > >>>> * get the segment interval
>> > > > > >>>> * get the window size
>> > > > > >>>> * get whether "duplicates" are enabled
>> > > > > >>>> * get the retention period
>> > > > > >>>> * (obviously) get a WindowStore<Bytes, byte[]>
>> > > > > >>>>
>> > > > > >>>> We know that Materialized isn't exactly what we need for
>> stream
>> > > joins,
>> > > > > >>>> but we can see how close Materialized is to what we need. If
>> it
>> > is
>> > > > > >>>> close, maybe we can use it and document the gaps, and if it
>> is
>> > not
>> > > > > >>>> close, then maybe we should just add what we need to Joined.
>> > > > > >>>> Stream Join's requirements for its stores:
>> > > > > >>>> * a multimap store (i.e., it keeps duplicates) for storing
>> > general
>> > > > > >>>> (not windowed) keyed records associated with their insertion
>> > > time, and
>> > > > > >>>> allows efficient time-bounded lookups and also efficient
>> purges
>> > > of old
>> > > > > >>>> data.
>> > > > > >>>> ** Note, a properly configured WindowBytesStoreSupplier
>> > satisfies
>> > > this
>> > > > > >>>> requirement, and the interface supports the queries we need
>> to
>> > > verify
>> > > > > >>>> the configuration at run-time
>> > > > > >>>> * set a name for the store
>> > > > > >>>> * do _not_ set the serdes (they are already set in Joined)
>> > > > > >>>> * logging could be configurable (set to enabled now)
>> > > > > >>>> * caching could be configurable (set to enabled now)
>> > > > > >>>> * do _not_ configure retention (determined by JoinWindows)
>> > > > > >>>>
>> > > > > >>>> So, out of six capabilities for Materialized, there are two
>> we
>> > > don't
>> > > > > >>>> want (serdes and retention). These would become run-time
>> checks
>> > > if we
>> > > > > >>>> use it.
>> > > > > >>>>
>> > > > > >>>> A third questionable capability is to provide a
>> > > > > >>>> WindowBytesStoreSupplier. Looking at whether the
>> > > > > >>>> WindowBytesStoreSupplier is the right interface for Stream
>> Join:
>> > > > > >>>> * configuring segment interval is fine
>> > > > > >>>> * should _not_ configure window size (it's determined by
>> > > JoinWindows)
>> > > > > >>>> * duplicates _must_ be enabled
>> > > > > >>>> * retention should be _at least_ windowSize + gracePeriod,
>> but
>> > > note
>> > > > > >>>> that (unlike for Table window stores) there is no utility in
>> > > having a
>> > > > > >>>> longer retention time.
>> > > > > >>>> * the WindowStore<Bytes, byte[]> itself is fine, if overly
>> > broad,
>> > > > > >>>> since the only two methods we need are `window.put(key,
>> value,
>> > > > > >>>> context().timestamp())` and `WindowStoreIterator<V2> iter =
>> > > > > >>>> window.fetch(key, timeFrom, timeTo)`.
>> > > > > >>>>
>> > > > > >>>> Thus, flattening out the overlap for WindowBytesStoreSupplier
>> > > onto the
>> > > > > >>>> overlap for Materialized, we have 9 capabilities total (note
>> > > retention
>> > > > > >>>> is duplicated), we have 4 that we don't want:
>> > > > > >>>> * do _not_ set the serdes (they are already set in Joined)
>> > > > > >>>> * do _not_ configure retention (determined by JoinWindows)
>> > > > > >>>> * should _not_ configure window size (it's determined by
>> > > JoinWindows)
>> > > > > >>>> * duplicates _must_ be enabled
>> > > > > >>>>
>> > > > > >>>> These gaps would have to be covered with run-time checks if
>> we
>> > > re-use
>> > > > > >>>> Materialized and WindowBytesStoreSupplier both. Maybe
>> this
>> > > sounds
>> > > > > >>>> bad, but consider the other side, that we get 5 new
>> capabilities
>> > > we
>> > > > > >>>> don't require, but are still pretty nice:
>> > > > > >>>> * configure the bytes store
>> > > > > >>>> * set a name for the store
>> > > > > >>>> * configure caching
>> > > > > >>>> * configure logging
>> > > > > >>>> * configure segment interval
>> > > > > >>>>
>> > > > > >>>> Not sure where this nets us out, but it's food for thought.
>> > > > > >>>> -John
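The run-time checks John lists above could look something like the sketch below. It is a minimal illustration, assuming the store supplier's settings and the join window settings are available as plain values; `JoinStoreChecks` and its `validate` method are hypothetical names, not Kafka Streams API. It encodes three of the constraints from the analysis: the window size must match the join window, duplicates must be enabled, and retention must be at least window size plus grace period.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams: models the run-time checks
// a stream-stream join could apply to a user-supplied window store.
final class JoinStoreChecks {

    private JoinStoreChecks() { }

    // Returns the violated constraints; an empty list means the supplier
    // settings are compatible with the join.
    static List<String> validate(long joinWindowSizeMs,
                                 long joinGraceMs,
                                 long storeWindowSizeMs,
                                 long storeRetentionMs,
                                 boolean storeRetainsDuplicates) {
        List<String> errors = new ArrayList<>();
        if (storeWindowSizeMs != joinWindowSizeMs) {
            errors.add("window size must match the join window size");
        }
        if (!storeRetainsDuplicates) {
            errors.add("duplicates must be enabled");
        }
        if (storeRetentionMs < joinWindowSizeMs + joinGraceMs) {
            errors.add("retention must be at least window size + grace period");
        }
        return errors;
    }
}
```

Returning a list rather than throwing on the first violation lets a caller report every misconfiguration at once; whether the real implementation would throw instead is left open here.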
>> > > > > >>>>
>> > > > > >>>> On Sun, Jun 23, 2019 at 7:52 PM Guozhang Wang <
>> > wangguoz@gmail.com
>> > > >
>> > > > > >> wrote:
>> > > > > >>>>>
>> > > > > >>>>> Hi Bill,
>> > > > > >>>>>
>> > > > > >>>>> I think by giving a Materialized param into stream-stream
>> join,
>> > > it's
>> > > > > >> okay
>> > > > > >>>>> (though still not ideal) to say "we still would not expose the
>> > store
>> > > for
>> > > > > >>>>> queries", but it would sound a bit awkward to say "we would
>> > also
>> > > > > >> ignore
>> > > > > >>>>> whatever the passed in store supplier but just use our
>> default
>> > > ones"
>> > > > > >> --
>> > > > > >>>>> again the concern is that, if in the future we'd want to
>> change
>> > > the
>> > > > > >>>> default
>> > > > > >>>>> implementation of the join algorithm so that it no longer relies on a
>> > window
>> > > > > >> store
>> > > > > >>>>> with deduping enabled, then we need to change this API
>> again by
>> > > > > >> changing
>> > > > > >>>>> the store supplier type.
>> > > > > >>>>>
>> > > > > >>>>> If we do want to fill this hole for stream-stream join, I
>> feel
>> > > just
>> > > > > >>>> adding
>> > > > > >>>>> a String typed store-name would even be less
>> future-intrusive
>> > if
>> > > we
>> > > > > >>>> expect
>> > > > > >>>>> this parameter to be modified later.
>> > > > > >>>>>
>> > > > > >>>>> Does that make sense?
>> > > > > >>>>>
>> > > > > >>>>>
>> > > > > >>>>> Guozhang
>> > > > > >>>>>
>> > > > > >>>>> On Sat, Jun 22, 2019 at 12:51 PM Bill Bejeck <
>> > bbejeck@gmail.com>
>> > > > > >> wrote:
>> > > > > >>>>>
>> > > > > >>>>>> Thanks for the comments John and Guozhang, I'll address
>> each
>> > > one of
>> > > > > >>>> your
>> > > > > >>>>>> comments in turn.
>> > > > > >>>>>>
>> > > > > >>>>>> John,
>> > > > > >>>>>>
>> > > > > >>>>>>> I'm wondering about a missing quadrant from the truth
>> table
>> > > > > >> involving
>> > > > > >>>>>>> whether a Materialized is stored or not and querying is
>> > > > > >>>>>>> enabled/disabled... What should be the behavior if there
>> is
>> > no
>> > > > > >> store
>> > > > > >>>>>>> configured (e.g., if Materialized with only serdes) and
>> > > querying
>> > > > > >> is
>> > > > > >>>>>> enabled?
>> > > > > >>>>>>
>> > > > > >>>>>>> It seems we have two choices:
>> > > > > >>>>>>> 1. we can force creation of a state store in this case, so
>> > the
>> > > > > >> store
>> > > > > >>>>>>> can be used to serve the queries
>> > > > > >>>>>>> 2. we can provide just a queriable view, basically
>> letting IQ
>> > > > > >> query
>> > > > > >>>>>>> into the "KTableValueGetter", which would transparently
>> > > > > >> construct the
>> > > > > >>>>>>> query response by applying the operator logic to the
>> upstream
>> > > > > >> state
>> > > > > >>>> if
>> > > > > >>>>>>> the operator state isn't already stored.
>> > > > > >>>>>>
>> > > > > >>>>>>
>> > > > > >>>>>> I agree with your assertion about a missing quadrant from
>> the
>> > > truth
>> > > > > >>>> table.
>> > > > > >>>>>> Additionally, I too like the concept of a queriable view.
>> > But I
>> > > > > >> think
>> > > > > >>>> that
>> > > > > >>>>>> goes a bit beyond the scope of this KIP and would like to
>> > pursue
>> > > > > >> that
>> > > > > >>>>>> feature as follow-on work.  Also thinking about this KIP
>> some
>> > > > > >> more, I'm
>> > > > > >>>>>> thinking the changes to Materialized might be a reach as
>> > > well.
>> > > > > >>>>>> Separating the naming from a store and its queryable state
>> > seems
>> > > > > >> like a
>> > > > > >>>>>> complex issue in and of itself and should be treated
>> > > accordingly.
>> > > > > >>>>>>
>> > > > > >>>>>> So here's what I'm thinking now.  We add Materialized to
>> Join,
>> > > but
>> > > > > >> for
>> > > > > >>>> now,
>> > > > > >>>>>> we internally disable querying.  I know this breaks our
>> > current
>> > > > > >>>> semantic
>> > > > > >>>>>> approach, but I think it's crucial that we do two things in
>> > this
>> > > > > >> KIP
>> > > > > >>>>>>
>> > > > > >>>>>>    1. Break the naming of the state stores from Joined to
>> > > > > >>>> Materialized, so
>> > > > > >>>>>>    the naming of state stores follows our current pattern
>> and
>> > > > > >> enables
>> > > > > >>>>>> upgrades
>> > > > > >>>>>>    from 2.3 to 2.4
>> > > > > >>>>>>    2. Offer the ability to configure the state stores of
>> the
>> > > join,
>> > > > > >> even
>> > > > > >>>>>>    providing a different implementation (i.e. in-memory) if
>> > > > > >> desired.
>> > > > > >>>>>>
>> > > > > >>>>>> With that in mind I'm considering changing the KIP to
>> remove
>> > the
>> > > > > >>>> changes to
>> > > > > >>>>>> Materialized, and we document very clearly that
>> providing a
>> > > > > >>>> Materialized
>> > > > > >>>>>> object with a name is only for naming the state store,
>> hence
>> > the
>> > > > > >>>> changelog
>> > > > > >>>>>> topics and any possible configurations of the store, but
>> this
>> > > store
>> > > > > >>>> *will
>> > > > > >>>>>> not be available for IQ.*
>> > > > > >>>>>>
>> > > > > >>>>>> WDYT?
>> > > > > >>>>>>
>> > > > > >>>>>> Guozhang,
>> > > > > >>>>>>
>> > > > > >>>>>>> 1. About not breaking compatibility of stream-stream join
>> > > > > >>>> materialized
>> > > > > >>>>>>> stores: I think this is a valid issue to tackle, but after
>> > > > > >> thinking
>> > > > > >>>> about
>> > > > > >>>>>>> it once more I'm not sure if exposing Materialized would
>> be a
>> > > > > >> good
>> > > > > >>>>>> solution
>> > > > > >>>>>>> here. My rationales:
>> > > > > >>>>>>>
>> > > > > >>>>>>> 1.a For stream-stream join, our current usage of
>> window-store
>> > > is
>> > > > > >> not
>> > > > > >>>>>> ideal,
>> > > > > >>>>>>> and we want to modify it in the near future to be more
>> > > > > >> efficient. Not
>> > > > > >>>>>>> allowing users to override such state store backend gives
>> us
>> > > such
>> > > > > >>>> freedom
>> > > > > >>>>>>> (which was also considered in the original DSL design),
>> > whereas
>> > > > > >>>> getting a
>> > > > > >>>>>>> Materialized<WindowStore> basically kicks that freedom
>> > out
>> > > > > >> of the
>> > > > > >>>>>>> window.
>> > > > > >>>>>>> 1.b For stream-stream join, in our original design we
>> intend
>> > to
>> > > > > >>>> "never"
>> > > > > >>>>>>> want users to query the state, since it is just for
>> buffering
>> > > the
>> > > > > >>>>>> upcoming
>> > > > > >>>>>>> records from the stream. Now I know that some users may
>> > indeed
>> > > > > >> want
>> > > > > >>>> to
>> > > > > >>>>>>> query it from the debugging perspective, but I'm still
>> > concerned
>> > > > > >> about
>> > > > > >>>>>>> whether leveraging IQ for debugging purposes would be the
>> > right
>> > > > > >>>> solution
>> > > > > >>>>>>> here. And adding Materialized object opens the door to let
>> > > users
>> > > > > >>>> query
>> > > > > >>>>>>> about it (unless we did something intentionally to still
>> > > forbid
>> > > > > >> it),
>> > > > > >>>>>> which
>> > > > > >>>>>>> also restricts us in the future.
>> > > > > >>>>>>>
>> > > > > >>>>>>> 2. About the coupling between Materialized.name() and
>> > > queryable:
>> > > > > >>>> again I
>> > > > > >>>>>>> think this is a valid issue. But I'm not sure if the
>> current
>> > > > > >>>>>>> "withQueryingDisabled / Enabled" at Materialized is the
>> best
>> > > > > >>>> approach.
>> > > > > >>>>>>> Here I think I agree with John, that generally speaking
>> it's
>> > > > > >> better
>> > > > > >>>> to be a
>> > > > > >>>>>>> control function on the `KTable` itself, rather than on
>> > > > > >>>> `Materialized`,
>> > > > > >>>>>> so
>> > > > > >>>>>>> fixing it via adding functions through `Materialized`
>> seems
>> > > not a
>> > > > > >>>> natural
>> > > > > >>>>>> approach either.
>> > > > > >>>>>>
>> > > > > >>>>>> I understand your thoughts here, and up to a point, I agree
>> > with
>> > > > > >> you.
>> > > > > >>>>>> But concerning not providing Materialized as it may
>> restrict
>> > us
>> > > in
>> > > > > >> the
>> > > > > >>>>>> future for delivering different implementations, I'm
>> wondering
>> > > if
>> > > > > >> we
>> > > > > >>>> are
>> > > > > >>>>>> doing some premature optimization here.
>> > > > > >>>>>> My rationale for saying so
>> > > > > >>>>>>
>> > > > > >>>>>>    1. I think the cost of not allowing the naming of state
>> > > stores
>> > > > > >> for
>> > > > > >>>> joins
>> > > > > >>>>>>    is too big of a gap to leave.   IMHO for joins to follow
>> > the
>> > > > > >> current
>> > > > > >>>>>>    pattern of using Materialized for naming state stores
>> would
>> > > be
>> > > > > >> what
>> > > > > >>>> most
>> > > > > >>>>>>    users would expect to use.  As I said in my comments
>> > above, I
>> > > > > >> think
>> > > > > >>>> we
>> > > > > >>>>>>    should *not include* the changes to Materialized and
>> > enforce
>> > > > > >> named
>> > > > > >>>>>>    stores for joins as unavailable for IQ.
>> > > > > >>>>>>    2. We'll still have the join methods available without a
>> > > > > >>>> Materialized
>> > > > > >>>>>>    allowing us to do something different internally if a
>> > > > > >> Materialized
>> > > > > >>>> is
>> > > > > >>>>>> not
>> > > > > >>>>>>    provided.
>> > > > > >>>>>>
>> > > > > >>>>>>
>> > > > > >>>>>>> Overall, I'm thinking maybe we should still use two stones
>> > > rather
>> > > > > >>>> than
>> > > > > >>>>>> one
>> > > > > >>>>>>> to kill these two birds, and probably for this KIP we just
>> > > focus
>> > > > > >> on
>> > > > > >>>> 1)
>> > > > > >>>>>>> above. And for that I'd like to not expose the
>> Materialized
>> > > > > >> either
>> > > > > >>>> for
>> > > > > >>>>>>> rationales that I've listed above. Instead, we just
>> restrict
>> > > > > >> KIP-307
>> > > > > >>>> to
>> > > > > >>>>>> NOT
>> > > > > >>>>>>> use the Joined.name for state store names and always use
>> > > internal
>> > > > > >>>> names
>> > > > > >>>>>> as
>> > > > > >>>>>>> well, which admittedly indeed leaves a hole of not being
>> able
>> > > to
>> > > > > >>>> cover
>> > > > > >>>>>> all
>> > > > > >>>>>>> internal names here, but now I feel this `hole` may
>> better be
>> > > > > >> filled
>> > > > > >>>> by,
>> > > > > >>>>>>> e.g. not creating changelog topics but just use the
>> upstream
>> > to
>> > > > > >>>>>>> re-bootstrap the materialized store, more concretely: when
>> > > > > >>>> materializing
>> > > > > >>>>>>> the store, try to piggy-back the changelog topic on an
>> > existing
>> > > > > >>>> topic,
>> > > > > >>>>>> e.g.
>> > > > > >>>>>>> a) if the stream is coming directly from some source topic
>> > > > > >> (including
>> > > > > >>>>>>> repartition topic), make that as changelog topic and if
>> it is
>> > > > > >>>> repartition
>> > > > > >>>>>>> topic change the retention / data purging policy
>> necessarily
>> > as
>> > > > > >>>> well; b)
>> > > > > >>>>>> if
>> > > > > >>>>>>> the stream is coming from some stateless operators,
>> delegate
>> > > that
>> > > > > >>>>>> stateless
>> > > > > >>>>>>> operator to the parent stream similar as a); if the
>> stream is
>> > > > > >> coming
>> > > > > >>>> from
>> > > > > >>>>>> a
>> > > > > >>>>>>> stream-stream join which is the only stateful operator
>> that
>> > can
>> > > > > >>>> result in
>> > > > > >>>>>> a
>> > > > > >>>>>>> stream, consider merging the join into multi-way joins
>> (yes,
>> > > > > >> this is
>> > > > > >>>> a
>> > > > > >>>>>> very
>> > > > > >>>>>>> hand-wavy thought, but the point here is that we do not
>> try
>> > to
>> > > > > >>>> tackle it
>> > > > > >>>>>>> now but leave it for a better solution :).
>> > > > > >>>>>>
>> > > > > >>>>>> I really like this idea!  I agree with you in that this
>> > approach
>> > > > > >> is too
>> > > > > >>>>>> much for adding in this KIP, but we could pick it up later
>> and
>> > > > > >>>> leverage the
>> > > > > >>>>>> Optimization framework to accomplish this re-use.
>> > > > > >>>>>> Again, while I agree we should break the naming of join
>> state
>> > > > > >> stores
>> > > > > >>>> from
>> > > > > >>>>>> KIP-307, IMHO it's something we should fix now as it will
>> be
>> > the
>> > > > > >> last
>> > > > > >>>> piece
>> > > > > >>>>>> we can provide to give users the ability to completely make
>> > > their
>> > > > > >>>>>> topologies "upgrade proof" when adding additional
>> operations.
>> > > > > >>>>>>
>> > > > > >>>>>> Thanks again to both of you for comments and I look
>> forward to
>> > > > > >> hearing
>> > > > > >>>> back
>> > > > > >>>>>> from you.
>> > > > > >>>>>>
>> > > > > >>>>>> Regards,
>> > > > > >>>>>> Bill
>> > > > > >>>>>>
>> > > > > >>>>>>
>> > > > > >>>>>>
>> > > > > >>>>>>
>> > > > > >>>>>>
>> > > > > >>>>>>
>> > > > > >>>>>>
>> > > > > >>>>>> On Thu, Jun 20, 2019 at 2:33 PM Guozhang Wang <
>> > > wangguoz@gmail.com>
>> > > > > >>>> wrote:
>> > > > > >>>>>>
>> > > > > >>>>>>> Hello Bill,
>> > > > > >>>>>>>
>> > > > > >>>>>>> Thanks for the KIP. Glad to see that we can likely
>> shoot
>> > two
>> > > > > >> birds
>> > > > > >>>>>> with
>> > > > > >>>>>>> one stone. I have some concerns though about those "two
>> > birds"
>> > > > > >>>>>> themselves:
>> > > > > >>>>>>>
>> > > > > >>>>>>> 1. About not breaking compatibility of stream-stream join
>> > > > > >>>> materialized
>> > > > > >>>>>>> stores: I think this is a valid issue to tackle, but after
>> > > > > >> thinking
>> > > > > >>>> about
>> > > > > >>>>>>> it once more I'm not sure if exposing Materialized would
>> be a
>> > > > > >> good
>> > > > > >>>>>> solution
>> > > > > >>>>>>> here. My rationales:
>> > > > > >>>>>>>
>> > > > > >>>>>>> 1.a For stream-stream join, our current usage of
>> window-store
>> > > is
>> > > > > >> not
>> > > > > >>>>>> ideal,
>> > > > > >>>>>>> and we want to modify it in the near future to be more
>> > > > > >> efficient. Not
>> > > > > >>>>>>> allowing users to override such state store backend gives
>> us
>> > > such
>> > > > > >>>> freedom
>> > > > > >>>>>>> (which was also considered in the original DSL design),
>> > whereas
>> > > > > >>>> getting a
>> > > > > >>>>>>> Materialized<WindowStore> basically kicks that freedom
>> > out
>> > > > > >> of the
>> > > > > >>>>>>> window.
>> > > > > >>>>>>> 1.b For stream-stream join, in our original design we
>> intended
>> > to
>> > > > > >>>> "never"
>> > > > > >>>>>>> want users to query the state, since it is just for
>> buffering
>> > > the
>> > > > > >>>>>> upcoming
>> > > > > >>>>>>> records from the stream. Now I know that some users may
>> > indeed
>> > > > > >> want
>> > > > > >>>> to
>> > > > > >>>>>>> query it from the debugging perspective, but I am still
>> > concerned
>> > > > > >> about
>> > > > > >>>>>>> whether leveraging IQ for debugging purposes would be the
>> > right
>> > > > > >>>> solution
>> > > > > >>>>>>> here. And adding Materialized object opens the door to let
>> > > users
>> > > > > >>>> query
>> > > > > >>>>>>> about it (unless we did something intentionally to still
>> > > forbid
>> > > > > >> it),
>> > > > > >>>>>> which
>> > > > > >>>>>>> also restricts us in the future.
>> > > > > >>>>>>>
>> > > > > >>>>>>> 2. About the coupling between Materialized.name() and
>> > > queryable:
>> > > > > >>>> again I
>> > > > > >>>>>>> think this is a valid issue. But I'm not sure if the
>> current
>> > > > > >>>>>>> "withQueryingDisabled / Enabled" at Materialized is the
>> best
>> > > > > >>>> approach.
>> > > > > >>>>>>> Here I think I agree with John, that generally speaking
>> it's
>> > > > > >> better
>> > > > > >>>> as a
>> > > > > >>>>>>> control function on the `KTable` itself, rather than on
>> > > > > >>>> `Materialized`,
>> > > > > >>>>>> so
>> > > > > >>>>>>> fixing it via adding functions through `Materialized`
>> seems
>> > > not a
>> > > > > >>>> natural
>> > > > > >>>>>>> approach either.
>> > > > > >>>>>>>
>> > > > > >>>>>>>
>> > > > > >>>>>>> Overall, I'm thinking maybe we should still use two stones
>> > > rather
>> > > > > >>>> than
>> > > > > >>>>>> one
>> > > > > >>>>>>> to kill these two birds, and probably for this KIP we just
>> > > focus
>> > > > > >> on
>> > > > > >>>> 1)
>> > > > > >>>>>>> above. And for that I'd like to not expose the
>> Materialized
>> > > > > >> either
>> > > > > >>>> for
>> > > > > >>>>>>> rationales that I've listed above. Instead, we just
>> restrict
>> > > > > >> KIP-307
>> > > > > >>>> to
>> > > > > >>>>>> NOT
>> > > > > >>>>>>> use the Joined.name for state store names and always use
>> > > internal
>> > > > > >>>> names
>> > > > > >>>>>> as
>> > > > > >>>>>>> well, which admittedly indeed leaves a hole of not being
>> able
>> > > to
>> > > > > >>>> cover
>> > > > > >>>>>> all
>> > > > > >>>>>>> internal names here, but now I feel this `hole` may
>> better be
>> > > > > >> filled
>> > > > > >>>> by,
>> > > > > >>>>>>> e.g. not creating changelog topics but just using the
>> upstream
>> > to
>> > > > > >>>>>>> re-bootstrap the materialized store, more concretely: when
>> > > > > >>>> materializing
>> > > > > >>>>>>> the store, try to piggy-back the changelog topic on an
>> > existing
>> > > > > >>>> topic,
>> > > > > >>>>>> e.g.
>> > > > > >>>>>>> a) if the stream is coming directly from some source topic
>> > > > > >> (including
>> > > > > >>>>>>> repartition topic), make that as changelog topic and if
>> it is
>> > > > > >>>> repartition
>> > > > > >>>>>>> topic change the retention / data purging policy
>> necessarily
>> > as
>> > > > > >>>> well; b)
>> > > > > >>>>>> if
>> > > > > >>>>>>> the stream is coming from some stateless operators,
>> delegate
>> > > that
>> > > > > >>>>>> stateless
>> > > > > >>>>>>> operator to the parent stream similar to a); c) if the
>> stream is
>> > > > > >> coming
>> > > > > >>>>>> from a
>> > > > > >>>>>>> stream-stream join which is the only stateful operator
>> that
>> > can
>> > > > > >>>> result
>> > > > > >>>>>> in a
>> > > > > >>>>>>> stream, consider merging the join into multi-way joins
>> (yes,
>> > > > > >> this is
>> > > > > >>>> a
>> > > > > >>>>>> very
>> > > > > >>>>>>> hand-wavy thought, but the point here is that we do not
>> try
>> > to
>> > > > > >>>> tackle it
>> > > > > >>>>>>> now but leave it for a better solution :).
>> > > > > >>>>>>>
>> > > > > >>>>>>>
>> > > > > >>>>>>> Guozhang
>> > > > > >>>>>>>
>> > > > > >>>>>>>
>> > > > > >>>>>>>
>> > > > > >>>>>>> On Wed, Jun 19, 2019 at 11:41 AM John Roesler <
>> > > john@confluent.io
>> > > > > >>>
>> > > > > >>>> wrote:
>> > > > > >>>>>>>
>> > > > > >>>>>>>> Hi Bill,
>> > > > > >>>>>>>>
>> > > > > >>>>>>>> Thanks for the KIP! Awesome job catching this unexpected
>> > > > > >>>> consequence
>> > > > > >>>>>>>> of the prior KIPs before it was released.
>> > > > > >>>>>>>>
>> > > > > >>>>>>>> The proposal looks good to me. On top of just fixing the
>> > > > > >> problem,
>> > > > > >>>> it
>> > > > > >>>>>>>> seems to address two other pain points:
>> > > > > >>>>>>>> * that naming a state store automatically causes it to
>> > become
>> > > > > >>>>>> queriable.
>> > > > > >>>>>>>> * that there's currently no way to configure the bytes
>> store
>> > > > > >> for
>> > > > > >>>> join
>> > > > > >>>>>>>> windows.
>> > > > > >>>>>>>>
>> > > > > >>>>>>>> It's awesome that we can fix this issue and two others
>> with
>> > > one
>> > > > > >>>>>> feature.
>> > > > > >>>>>>>>
>> > > > > >>>>>>>> I'm wondering about a missing quadrant from the truth
>> table
>> > > > > >>>> involving
>> > > > > >>>>>>>> whether a Materialized is stored or not and querying is
>> > > > > >>>>>>>> enabled/disabled... What should be the behavior if there
>> is
>> > no
>> > > > > >>>> store
>> > > > > >>>>>>>> configured (e.g., if Materialized with only serdes) and
>> > > > > >> querying is
>> > > > > >>>>>>>> enabled?
>> > > > > >>>>>>>>
>> > > > > >>>>>>>> It seems we have two choices:
>> > > > > >>>>>>>> 1. we can force creation of a state store in this case,
>> so
>> > the
>> > > > > >>>> store
>> > > > > >>>>>>>> can be used to serve the queries
>> > > > > >>>>>>>> 2. we can provide just a queriable view, basically
>> letting
>> > IQ
>> > > > > >> query
>> > > > > >>>>>>>> into the "KTableValueGetter", which would transparently
>> > > > > >> construct
>> > > > > >>>> the
>> > > > > >>>>>>>> query response by applying the operator logic to the
>> > upstream
>> > > > > >>>> state if
>> > > > > >>>>>>>> the operator state isn't already stored.
>> > > > > >>>>>>>>
>> > > > > >>>>>>>> Offhand, it seems like the second is actually a pretty
>> > awesome
>> > > > > >>>>>>>> capability. But it might have an awkward interaction with
>> > the
>> > > > > >>>> current
>> > > > > >>>>>>>> semantics. Presently, if I provide a
>> Materialized.withName,
>> > it
>> > > > > >>>> implies
>> > > > > >>>>>>>> that querying should be enabled AND that the view should
>> > > > > >> actually
>> > > > > >>>> be
>> > > > > >>>>>>>> stored in a state store. Under option 2 above, this
>> behavior
>> > > > > >> would
>> > > > > >>>>>>>> change to NOT provision a state store and instead just
>> > consult
>> > > > > >> the
>> > > > > >>>>>>>> ValueGetter. To get back to the current behavior, users
>> > would
>> > > > > >> have
>> > > > > >>>> to
>> > > > > >>>>>>>> add a "bytes store supplier" to the Materialized to
>> indicate
>> > > > > >> that,
>> > > > > >>>>>>>> yes, they really want a state store there.
>> > > > > >>>>>>>>
>> > > > > >>>>>>>> Behavior changes are always kind of scary, but I think in
>> > this
>> > > > > >>>> case,
>> > > > > >>>>>>>> it might actually be preferable. In the event where only
>> the
>> > > > > >> name
>> > > > > >>>> is
>> > > > > >>>>>>>> provided, it means that people just wanted to make the
>> > > > > >> operation
>> > > > > >>>>>>>> result queriable. If we automatically convert this to a
>> > > > > >> non-stored
>> > > > > >>>>>>>> view, then simply upgrading results in the same
>> observable
>> > > > > >> behavior
>> > > > > >>>>>>>> and semantics, but a linear reduction in local storage
>> > > > > >> requirements
>> > > > > >>>>>>>> and disk i/o, as well as a corresponding linear
>> reduction in
>> > > > > >> memory
>> > > > > >>>>>>>> usage both on and off heap.
>> > > > > >>>>>>>>
>> > > > > >>>>>>>> What do you think?
>> > > > > >>>>>>>> -John
>> > > > > >>>>>>>>
>> > > > > >>>>>>>> On Tue, Jun 18, 2019 at 9:21 PM Bill Bejeck <
>> > > bbejeck@gmail.com
>> > > > > >>>
>> > > > > >>>> wrote:
>> > > > > >>>>>>>>>
>> > > > > >>>>>>>>> All,
>> > > > > >>>>>>>>>
>> > > > > >>>>>>>>> I'd like to start a discussion for adding a Materialized
>> > > > > >>>>>> configuration
>> > > > > >>>>>>>>> object to KStream.join for naming state stores involved
>> in
>> > > > > >> joins.
>> > > > > >>>>>>>>>
>> > > > > >>>>>>>>>
>> > > > > >>>>>>>>
>> > > > > >>>>>>>
>> > > > > >>>>>>
>> > > > > >>>>
>> > > > > >>
>> > > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+Materialized+to+Join
>> > > > > >>>>>>>>>
>> > > > > >>>>>>>>> Your comments and suggestions are welcome.
>> > > > > >>>>>>>>>
>> > > > > >>>>>>>>> Thanks,
>> > > > > >>>>>>>>> Bill
>> > > > > >>>>>>>>
>> > > > > >>>>>>>
>> > > > > >>>>>>>
>> > > > > >>>>>>> --
>> > > > > >>>>>>> -- Guozhang
>> > > > > >>>>>>>
>> > > > > >>>>>>
>> > > > > >>>>>
>> > > > > >>>>>
>> > > > > >>>>> --
>> > > > > >>>>> -- Guozhang
>> > > > > >>>>
>> > > > > >>>
>> > > > > >>>
>> > > > > >>> --
>> > > > > >>> -- Guozhang
>> > > > > >>
>> > > > > >
>> > > > > >
>> > > > >
>> > > > >
>> > >
>> >
>>
>>
>> --
>> -- Guozhang
>>
>
