kafka-dev mailing list archives

From aishwarya kumar <ash26...@gmail.com>
Subject Re: [DISCUSS] KIP-479: Add Materialized to Join
Date Tue, 17 Sep 2019 23:52:06 GMT
Hi Bill,

Thanks for clarifying, and the KIP looks great!!

Best regards,
Aishwarya

On Tue, Sep 17, 2019, 6:16 PM Bill Bejeck <bbejeck@gmail.com> wrote:

> Hi Aishwarya,
>
> On Tue, Sep 17, 2019 at 1:41 PM aishwarya kumar <ash26389@gmail.com>
> wrote:
>
> > Will this be applicable to KStream-KTable joins as well? Or do users
> always
> > materialize these joins explicitly?
> >
>
> No, this change applies to KStream-KStream joins only.  With KStream-KTable
> joins KafkaStreams has materialized the KTable already, and we don't need
> to do anything with the KStream instance in this case.
>
>
> > I'm not sure if it's even necessary (or makes sense), just trying to
> > understand why the change is applicable to KStream joins only?
> >
>
> The full details are in the KIP, but in a nutshell, we needed to break the
> naming of the StateStore from `Joined.withName` and provide users a way to
> name the StateStore explicitly.  While making the changes, we realized it
> would be beneficial to give users the ability to use different WindowStore
> implementations.  The most likely reason to use different WindowStores
> would be to enable in-memory joins.
>
>
> > Best,
> > Aishwarya
> >
>
> Regards,
> Bill
>
>
> >
> > On Tue, Sep 17, 2019 at 4:05 PM Bill Bejeck <bbejeck@gmail.com> wrote:
> >
> > > Guozhang,
> > >
> > > Thanks for the comments.
> > >
> > > Yes, you are correct in your assessment regarding names, *if* the users
> > > provide their own StoreSuppliers.  When constructing a StoreSupplier,
> the
> > > name can't be null, and additionally, there is no way to update the
> name.
> > > Further downstream, the underlying StateStore instances use the
> provided
> > > name for registration so they must be unique.  If users don't provide
> > > distinct names for the StoreSuppliers, KafkaStreams will throw a
> > > StreamsException when building the topology.
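To illustrate the uniqueness constraint described above, here is a minimal sketch in plain Java. It is not Kafka Streams code: `StoreNameRegistry` is a hypothetical stand-in for the topology builder's store registration, and it throws `IllegalStateException` only as a placeholder for the exception the library raises.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for store registration during topology building.
// Assumption: names come from the user-provided suppliers and cannot be changed later.
class StoreNameRegistry {
    private final Map<String, Object> storesByName = new HashMap<>();

    // Registers a store under its supplier-provided name.
    // Rejects null names and names that are already taken.
    void register(String name, Object store) {
        if (name == null) {
            throw new NullPointerException("store name must not be null");
        }
        if (storesByName.containsKey(name)) {
            // Placeholder exception type; the real library uses its own exception.
            throw new IllegalStateException("StateStore " + name + " is already registered");
        }
        storesByName.put(name, store);
    }

    int size() {
        return storesByName.size();
    }
}
```

With this sketch, registering the two sides of a join under distinct names succeeds, while reusing one name for both fails at registration time rather than at run time.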
> > >
> > > Thanks,
> > > Bill
> > >
> > >
> > >
> > > On Tue, Sep 17, 2019 at 9:39 AM Guozhang Wang <wangguoz@gmail.com>
> > wrote:
> > >
> > > > Hello Bill,
> > > >
> > > > Thanks for the updated KIP. I made a pass on the StreamJoined
> section.
> > > Just
> > > > a quick question from user's perspective: if a user wants to provide
> a
> > > > customized store-supplier, she is forced to also provide a name via
> the
> > > > store-supplier. So there's no way to say "I want to provide my own
> > store
> > > > engine but let the library decide its name", is that right?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Tue, Sep 17, 2019 at 8:53 AM Bill Bejeck <bbejeck@gmail.com>
> wrote:
> > > >
> > > > > Bumping this discussion as we need to re-vote before the KIP
> > deadline.
> > > > >
> > > > > On Fri, Sep 13, 2019 at 10:29 AM Bill Bejeck <bbejeck@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > While working on the implementation of KIP-479, some issues came
> to
> > > > light
> > > > > > that the KIP as written won't work.  I have updated the KIP with
> a
> > > > > solution
> > > > > > I believe will solve the original problem as well as address the
> > > > > > impediment to the initial approach.
> > > > > >
> > > > > > This update is a significant change, so please review the updated
> > KIP
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join
> > > > > and
> > > > > > provide feedback.  After we conclude the discussion, there will
> be
> > a
> > > > > > re-vote.
> > > > > >
> > > > > > Thanks!
> > > > > > Bill
> > > > > >
> > > > > > On Wed, Jul 17, 2019 at 7:01 PM Guozhang Wang <
> wangguoz@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > >> Hi Bill, thanks for your explanations. I'm on board with your
> > > decision
> > > > > >> too.
> > > > > >>
> > > > > >>
> > > > > >> Guozhang
> > > > > >>
> > > > > >> On Wed, Jul 17, 2019 at 10:20 AM Bill Bejeck <bbejeck@gmail.com
> >
> > > > wrote:
> > > > > >>
> > > > > >> > Thanks for the response, John.
> > > > > >> >
> > > > > >> > > If I can offer my thoughts, it seems better to just document
> > on
> > > > the
> > > > > >> > > Stream join javadoc for the Materialized parameter that it
> > will
> > > > not
> > > > > >> > > make the join result queriable. I'm not opposed to the
> > queriable
> > > > > flag
> > > > > >> > > in general, but introducing it is a much larger
> consideration
> > > that
> > > > > has
> > > > > >> > > previously derailed this KIP discussion. In the interest of
> > just
> > > > > >> > > closing the gap and keeping the API change small, it seems
> > > better
> > > > to
> > > > > >> > > just go with documentation for now.
> > > > > >> >
> > > > > >> > I agree with your statement here.  IMHO the most important
> goal
> > of
> > > > > this
> > > > > >> KIP
> > > > > >> > is to not break existing users and gain some consistency of
> > the
> > > > > API.
> > > > > >> >
> > > > > >> > I'll update the KIP accordingly.
> > > > > >> >
> > > > > >> > -Bill
> > > > > >> >
> > > > > >> > On Tue, Jul 16, 2019 at 11:55 AM John Roesler <
> > john@confluent.io>
> > > > > >> wrote:
> > > > > >> >
> > > > > >> > > Hi Bill,
> > > > > >> > >
> > > > > >> > > Thanks for driving this KIP toward a conclusion. I'm on
> board
> > > with
> > > > > >> > > your decision.
> > > > > >> > >
> > > > > >> > > You didn't mention whether you're still proposing to add the
> > > > > >> > > "queriable" flag to the Materialized config object, or just
> > > > document
> > > > > >> > > that a Stream join is never queriable. Both options have
> come
> > up
> > > > > >> > > earlier in the discussion, so it would be good to pin this
> > down.
> > > > > >> > >
> > > > > >> > > If I can offer my thoughts, it seems better to just document
> > on
> > > > the
> > > > > >> > > Stream join javadoc for the Materialized parameter that it
> > will
> > > > not
> > > > > >> > > make the join result queriable. I'm not opposed to the
> > queriable
> > > > > flag
> > > > > >> > > in general, but introducing it is a much larger
> consideration
> > > that
> > > > > has
> > > > > >> > > previously derailed this KIP discussion. In the interest of
> > just
> > > > > >> > > closing the gap and keeping the API change small, it seems
> > > better
> > > > to
> > > > > >> > > just go with documentation for now.
> > > > > >> > >
> > > > > >> > > Thanks again,
> > > > > >> > > -John
> > > > > >> > >
> > > > > >> > > On Thu, Jul 11, 2019 at 2:45 PM Bill Bejeck <
> > bbejeck@gmail.com>
> > > > > >> wrote:
> > > > > >> > > >
> > > > > >> > > > Thanks all for the great discussion so far.
> > > > > >> > > >
> > > > > >> > > > Everyone has made excellent points, and I appreciate the
> > > detail
> > > > > >> > everyone
> > > > > >> > > > has put into their arguments.
> > > > > >> > > >
> > > > > >> > > > However, after carefully evaluating all the points made so
> > > far,
> > > > > >> > creating
> > > > > >> > > an
> > > > > >> > > > overload with Materialized is still my #1 option.
> > > > > >> > > > My reasoning for saying so is two-fold:
> > > > > >> > > >
> > > > > >> > > >    1. It's a small change, and IMHO since it's consistent
> > with
> > > > our
> > > > > >> > > current
> > > > > >> > > >    API concerning state store usage, the cognitive load on
> > > users
> > > > > >> will
> > > > > >> > be
> > > > > >> > > >    minimal.
> > > > > >> > > >    2. It achieves the most important goal of this KIP,
> > namely
> > > to
> > > > > >> close
> > > > > >> > > the
> > > > > >> > > >    gap of naming state stores independently of the join
> > > operator
> > > > > >> name.
> > > > > >> > > >
> > > > > >> > > > Additionally, I agree with the points made by Matthias
> > earlier
> > > > (I
> > > > > >> > realize
> > > > > >> > > > there is some overlap here).
> > > > > >> > > >
> > > > > >> > > > >  - the main purpose of this KIP is to close the naming
> gap
> > > > what
> > > > > we
> > > > > >> > > achieve
> > > > > >> > > > >  - we can allow people to use the new in-memory store
> > > > > >> > > > >  - we allow people to enable/disable caching
> > > > > >> > > > >  - we unify the API
> > > > > >> > > > >  - we decouple querying from naming
> > > > > >> > > > >  - it's a small API change
> > > > > >> > > >
> > > > > >> > > > Although it's not a perfect solution,  IMHO the positives
> of
> > > > using
> > > > > >> > > > Materialized far outweigh the negatives, and from what
> we've
> > > > > >> discussed
> > > > > >> > so
> > > > > >> > > > far, anything we implement seems to involve an additional
> > > change
> > > > > >> down
> > > > > >> > the
> > > > > >> > > > road.
> > > > > >> > > >
> > > > > >> > > > If others are still strongly opposed to using
> Materialized,
> > my
> > > > > other
> > > > > >> > > > preferences would be
> > > > > >> > > >
> > > > > >> > > >    1. Add a "withStoreName" to Joined.  Although I agree
> > with
> > > > > >> Guozhang
> > > > > >> > > that
> > > > > >> > > >    having a parameter that only applies to one use-case
> > would
> > > be
> > > > > >> > clumsy.
> > > > > >> > > >    2. Add a String overload for naming the store, but this
> > > would
> > > > > be
> > > > > >> my
> > > > > >> > > >    least favorite option as IMHO this seems to be a step
> > > > backward
> > > > > >> from
> > > > > >> > > why we
> > > > > >> > > >    introduced configuration objects in the first place.
> > > > > >> > > >
> > > > > >> > > > Thanks,
> > > > > >> > > > Bill
> > > > > >> > > >
> > > > > >> > > > On Thu, Jun 27, 2019 at 4:45 PM Matthias J. Sax <
> > > > > >> matthias@confluent.io
> > > > > >> > >
> > > > > >> > > > wrote:
> > > > > >> > > >
> > > > > >> > > > > Thanks for the KIP Bill!
> > > > > >> > > > >
> > > > > >> > > > > Great discussion so far.
> > > > > >> > > > >
> > > > > >> > > > > About John's idea about querying upstream stores and
> don't
> > > > > >> > materialize
> > > > > >> > > a
> > > > > >> > > > > store: I agree with Bill that this seems to be an
> > orthogonal
> > > > > >> > question,
> > > > > >> > > > > and it might be better to treat it as an independent
> > > > > optimization
> > > > > >> and
> > > > > >> > > > > exclude from this KIP.
> > > > > >> > > > >
> > > > > >> > > > > > What should be the behavior if there is no store
> > > > > >> > > > > > configured (e.g., if Materialized with only serdes)
> and
> > > > > >> querying is
> > > > > >> > > > > > enabled?
> > > > > >> > > > >
> > > > > >> > > > > IMHO, this could be an error case. If one wants to
> query a
> > > > > store,
> > > > > >> > they
> > > > > >> > > > > need to provide a name -- if you don't know the name,
> how
> > > > would
> > > > > >> you
> > > > > >> > > > > actually query the store (even if it would be possible
> to
> > > get
> > > > > the
> > > > > >> > name
> > > > > >> > > > > from the `TopologyDescription`, it seems clumsy).
> > > > > >> > > > >
> > > > > >> > > > > If we don't want to throw an error, materializing seems
> to
> > > be
> > > > > the
> > > > > >> > right
> > > > > >> > > > > option, to exclude "query optimization" from this KIP. I
> > > would
> > > > > be
> > > > > >> ok
> > > > > >> > > > > with this option, even if it's clumsy to get the name
> from
> > > > > >> > > > > `TopologyDescription`; hence, I would prefer to treat it
> > as
> > > an
> > > > > >> error.
> > > > > >> > > > >
> > > > > >> > > > > > To get back to the current behavior, users would have
> to
> > > > > >> > > > > > add a "bytes store supplier" to the Materialized to
> > > indicate
> > > > > >> that,
> > > > > >> > > > > > yes, they really want a state store there.
> > > > > >> > > > >
> > > > > >> > > > > This sounds like a quite subtle semantic difference on
> how
> > to
> > > > use
> > > > > >> the
> > > > > >> > > > > API. Might be hard to explain to users. I would prefer
> to
> > > not
> > > > > >> > > introduce it.
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > > About Guozhang's points:
> > > > > >> > > > >
> > > > > >> > > > > 1a) That is actually a good point. However, I believe we
> > > > cannot
> > > > > >> get
> > > > > >> > > > > around this issue easily, and it seems ok to me, to
> expose
> > > the
> > > > > >> actual
> > > > > >> > > > > store type we are using. (More thoughts later.)
> > > > > >> > > > >
> > > > > >> > > > > 1b) I don't see an issue with allowing users to query
> all
> > > > > stores?
> > > > > >> > What
> > > > > >> > > > > is the rationale behind it? What do we gain by not
> allowing
> > > it?
> > > > > >> > > > >
> > > > > >> > > > > 2) While I understand what you are saying, we also
> > want/need
> > > > to
> > > > > >> have
> > > > > >> > a
> > > > > >> > > > > way in the PAPI to allow users adding "internal/private"
> > > > > >> > non-queryable
> > > > > >> > > > > stores to a topology. That's possible via
> > > > > >> > > > > `Materialized#withQueryingDisabled()`. We could also
> > update
> > > > > >> > > > > `Topology#addStateStore(StoreBuilder, boolean
> isQueryable,
> > > > > >> > String...)`
> > > > > >> > > > > to address this. Again, I agree with Bill that the
> current
> > > API
> > > > > is
> > > > > >> > built
> > > > > >> > > > > in a certain way, and if we want to change it, it should
> > be
> > > a
> > > > > >> > separate
> > > > > >> > > > > KIP, as it seems to be an orthogonal concern.
> > > > > >> > > > >
> > > > > >> > > > > > Instead, we just restrict KIP-307 to NOT
> > > > > >> > > > > > use the Joined.name for state store names and always
> use
> > > > > >> internal
> > > > > >> > > names
> > > > > >> > > > > as
> > > > > >> > > > > > well, which admittedly indeed leaves a hole of not
> being
> > > > able
> > > > > to
> > > > > >> > > cover
> > > > > >> > > > > all
> > > > > >> > > > > > internal names here
> > > > > >> > > > >
> > > > > >> > > > > I think it's important to close this gap. Naming
> entities
> > > > seems
> > > > > >> to be a
> > > > > >> > > > > binary feature: if there is a gap, the feature is more
> or
> > > less
> > > > > >> > useless,
> > > > > >> > > > > rendering KIP-307 void.
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > > I like John's detailed list of required features and
> what
> > > > > >> > > > > Materialized/WindowByteStoreSuppliers offers. My take
> is,
> > > that
> > > > > >> adding
> > > > > >> > > > > Materialized including the required run-time checks is
> the
> > > > best
> > > > > >> > option
> > > > > >> > > > > we have, for the following reasons:
> > > > > >> > > > >
> > > > > >> > > > >  - the main purpose of this KIP is to close the naming
> gap
> > > > what
> > > > > we
> > > > > >> > > achieve
> > > > > >> > > > >  - we can allow people to use the new in-memory store
> > > > > >> > > > >  - we allow people to enable/disable caching
> > > > > >> > > > >  - we unify the API
> > > > > >> > > > >  - we decouple querying from naming
> > > > > >> > > > >  - it's a small API change
> > > > > >> > > > >
> > > > > >> > > > > Adding an overload and only passing in a name, would
> > address
> > > > the
> > > > > >> main
> > > > > >> > > > > purpose of the KIP. However, it falls short on all the
> > other
> > > > > >> > "goodies".
> > > > > >> > > > > As you mentioned, passing in `Materialized` might not be
> > > > perfect
> > > > > >> and
> > > > > >> > > > > maybe we need to deprecate it at some point; but this is
> > > also
> > > > > true
> > > > > >> > for
> > > > > >> > > > > passing in just a name.
> > > > > >> > > > >
> > > > > >> > > > > I am also not convinced, that a `StreamJoinStore` would
> > > > resolve
> > > > > >> all
> > > > > >> > the
> > > > > >> > > > > issues. In the end, as long as we are using a
> > > `WindowedStore`
> > > > > >> > > > > internally, we need to expose this "implementation
> detail"
> > to
> > > > > >> users to
> > > > > >> > > > > allow them to plug in a custom store. Adding
> > `Materialized`
> > > > > seems
> > > > > >> to
> > > > > >> > be
> > > > > >> > > > > the best short-term fix from my point of view.
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > > -Matthias
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > > On 6/27/19 9:56 AM, Guozhang Wang wrote:
> > > > > >> > > > > > Hi John,
> > > > > >> > > > > >
> > > > > >> > > > > > I actually feel better about a new interface but I'm
> > not
> > > > sure
> > > > > >> if
> > > > > >> > we
> > > > > >> > > > > would
> > > > > >> > > > > > need the full configuration of store / log / cache,
> now
> > or
> > > > in
> > > > > >> the
> > > > > >> > > future
> > > > > >> > > > > > ever for stream-stream join.
> > > > > >> > > > > >
> > > > > >> > > > > > Right now I feel that 1) we want to improve our
> > > > implementation
> > > > > >> of
> > > > > >> > > > > > stream-stream join, and potentially also allow users
> to
> > > > > >> customize
> > > > > >> > > this
> > > > > >> > > > > > implementation but with a more suitable interface than
> > the
> > > > > >> current
> > > > > >> > > > > > WindowStore interface, how to do that is less clear
> and
> > > > > >> > > execution-wise
> > > > > >> > > > > it's
> > > > > >> > > > > > (arguably..) not urgent; 2) we want to close the last
> > gap
> > > > > >> > > (Stream-stream
> > > > > >> > > > > > join) of allowing users to specify all internal names
> to
> > > > help
> > > > > on
> > > > > >> > > backward
> > > > > >> > > > > > compatibility, which is urgent.
> > > > > >> > > > > >
> > > > > >> > > > > > Therefore if we want to unblock 2) from 1) in the near
> > > > term, I
> > > > > >> feel
> > > > > >> > > > > > slightly inclined to just add overload functions that
> > > takes
> > > > > in a
> > > > > >> > > store
> > > > > >> > > > > name
> > > > > >> > > > > > for stream-stream joins only -- and admittedly, in the
> > > > future
> > > > > >> this
> > > > > >> > > > > function
> > > > > >> > > > > > may be deprecated -- i.e. if we have to do something
> that
> > > we
> > > > > "may
> > > > > >> > > regret"
> > > > > >> > > > > in
> > > > > >> > > > > > the future, I'd like to pick the least intrusive
> option.
> > > > > >> > > > > >
> > > > > >> > > > > > About `Joined#withStoreName`: since the Joined class
> > > itself
> > > > is
> > > > > >> also
> > > > > >> > > used
> > > > > >> > > > > in
> > > > > >> > > > > > other join types, I feel less comfortable to have a
> > > > > >> > > > > `Joined#withStoreName`
> > > > > >> > > > > > which is only going to be used by stream-stream join.
> Or
> > > > > maybe I
> > > > > >> > miss
> > > > > >> > > > > > something here about the "latter" case that you are
> > > > referring
> > > > > >> to?
> > > > > >> > > > > >
> > > > > >> > > > > >
> > > > > >> > > > > >
> > > > > >> > > > > > Guozhang
> > > > > >> > > > > >
> > > > > >> > > > > > On Mon, Jun 24, 2019 at 12:16 PM John Roesler <
> > > > > >> john@confluent.io>
> > > > > >> > > wrote:
> > > > > >> > > > > >
> > > > > >> > > > > >> Thanks Guozhang,
> > > > > >> > > > > >>
> > > > > >> > > > > >> Yep. Maybe we can consider just exactly what the join
> > > > needs:
> > > > > >> > > > > >>
> > > > > >> > > > > >>> the WindowStore<Bytes, byte[]> itself is fine, if
> > overly
> > > > > >> broad,
> > > > > >> > > > > >>> since the only two methods we need are
> > `window.put(key,
> > > > > value,
> > > > > >> > > > > >>> context().timestamp())` and `WindowStoreIterator<V2>
> > > iter
> > > > =
> > > > > >> > > > > >>> window.fetch(key, timeFrom, timeTo)`.
> > > > > >> > > > > >>
> > > > > >> > > > > >> One "middle ground" would be to extract _this_ into a
> > new
> > > > > store
> > > > > >> > > > > >> interface, which only supports these API calls, like
> > > > > >> > > > > >> StreamJoinStore<K, V>. This would give us the
> latitude
> > we
> > > > > need
> > > > > >> to
> > > > > >> > > > > >> efficiently support the exact operation without
> > > concerning
> > > > > >> > ourselves
> > > > > >> > > > > >> with all the other things a WindowStore can do (which
> > are
> > > > > >> > > unreachable
> > > > > >> > > > > >> for the join use case). It would also let us drop
> > "store
> > > > > >> > duplicates"
> > > > > >> > > > > >> from the main WindowStore interface, since it only
> > exists
> > > > to
> > > > > >> > support
> > > > > >> > > > > >> the join use case.
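The narrower interface suggested above could look roughly like the following plain-Java sketch. `StreamJoinStore` is the name floated in this thread. The method signatures, the inclusive time bounds, and the `InMemoryStreamJoinStore` class are assumptions made for illustration; none of this is an existing Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical narrow interface: only the two operations the join needs.
interface StreamJoinStore<K, V> {
    // Stores a record under its key and timestamp; duplicates are retained.
    void put(K key, V value, long timestamp);

    // Returns all values for the key with timestamps in [timeFrom, timeTo] (inclusive, by assumption).
    List<V> fetch(K key, long timeFrom, long timeTo);
}

// Illustrative in-memory implementation: per key, values sorted by timestamp.
class InMemoryStreamJoinStore<K, V> implements StreamJoinStore<K, V> {
    private final Map<K, TreeMap<Long, List<V>>> data = new HashMap<>();

    @Override
    public void put(K key, V value, long timestamp) {
        data.computeIfAbsent(key, k -> new TreeMap<>())
            .computeIfAbsent(timestamp, t -> new ArrayList<>())
            .add(value);
    }

    @Override
    public List<V> fetch(K key, long timeFrom, long timeTo) {
        List<V> result = new ArrayList<>();
        TreeMap<Long, List<V>> byTime = data.get(key);
        if (byTime == null || timeFrom > timeTo) {
            return result; // unknown key or empty range
        }
        // subMap gives a timestamp-ordered view of the requested range.
        for (List<V> values : byTime.subMap(timeFrom, true, timeTo, true).values()) {
            result.addAll(values);
        }
        return result;
    }
}
```

The sketch leaves out purging of old data, which the join also needs; it only shows how small the required surface is compared with a full WindowStore.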
> > > > > >> > > > > >>
> > > > > >> > > > > >> If we were to add a new StreamJoinStore interface,
> then
> > > > it'd
> > > > > be
> > > > > >> > > > > >> straightforward how we could add also
> > > > > >> > > > > >> `Materialized.as(StreamJoinBytesStoreSupplier)` and
> use
> > > > > >> > > Materialized,
> > > > > >> > > > > >> or alternatively add the ability to set the bytes
> store
> > > on
> > > > > >> Joined.
> > > > > >> > > > > >>
> > > > > >> > > > > >> Personally, I'm kind of leaning toward the latter
> (and
> > > also
> > > > > >> doing
> > > > > >> > > > > >> `Joined#withStoreName`), since adding the new
> interface
> > > to
> > > > > >> > > > > >> Materialized then also pollutes the interface for its
> > > > > _actual_
> > > > > >> use
> > > > > >> > > > > >> case of materializing a table view. Of course, to
> solve
> > > the
> > > > > >> > > immediate
> > > > > >> > > > > >> problem, all we need is the store name, but we might
> > feel
> > > > > >> better
> > > > > >> > > about
> > > > > >> > > > > >> adding the store name to Joined if we _also_ feel
> like
> > in
> > > > the
> > > > > >> > > future,
> > > > > >> > > > > >> we would add store/log/cache configuration to Joined
> as
> > > > well.
> > > > > >> > > > > >>
> > > > > >> > > > > >> -John
> > > > > >> > > > > >>
> > > > > >> > > > > >> On Mon, Jun 24, 2019 at 12:56 PM Guozhang Wang <
> > > > > >> > wangguoz@gmail.com>
> > > > > >> > > > > wrote:
> > > > > >> > > > > >>>
> > > > > >> > > > > >>> Hello John,
> > > > > >> > > > > >>>
> > > > > >> > > > > >>> My main concern is exactly the first point at the
> > bottom
> > > > of
> > > > > >> your
> > > > > >> > > > > analysis
> > > > > >> > > > > >>> here: "* configure the bytes store". I'm not sure if
> > > > using a
> > > > > >> > window
> > > > > >> > > > > bytes
> > > > > >> > > > > >>> store would be ideal for stream-stream windowed
> join;
> > > e.g.
> > > > > we
> > > > > >> > could
> > > > > >> > > > > >>> consider two dimensional list sorted by timestamps
> and
> > > > then
> > > > > by
> > > > > >> > > keys to
> > > > > >> > > > > do
> > > > > >> > > > > >>> the join, whereas a windowed bytes store is
> basically
> > > > sorted
> > > > > >> by
> > > > > >> > key
> > > > > >> > > > > >> first,
> > > > > >> > > > > >>> then by timestamp. If we expose the Materialized to
> > let
> > > > user
> > > > > >> pass
> > > > > >> > > in a
> > > > > >> > > > > >>> windowed bytes store, then we would need to change
> > that
> > > if
> > > > > we
> > > > > >> > want
> > > > > >> > > to
> > > > > >> > > > > >>> replace it with a different implementation
> interface.
> > > > > >> > > > > >>>
> > > > > >> > > > > >>>
> > > > > >> > > > > >>> Guozhang
> > > > > >> > > > > >>>
> > > > > >> > > > > >>> On Mon, Jun 24, 2019 at 8:59 AM John Roesler <
> > > > > >> john@confluent.io>
> > > > > >> > > > > wrote:
> > > > > >> > > > > >>>
> > > > > >> > > > > >>>> Hey Guozhang and Bill,
> > > > > >> > > > > >>>>
> > > > > >> > > > > >>>> For what it's worth, I agree with you both!
> > > > > >> > > > > >>>>
> > > > > >> > > > > >>>> I think it might help the discussion to look
> > concretely
> > > > at
> > > > > >> what
> > > > > >> > > > > >>>> Materialized does:
> > > > > >> > > > > >>>> * set a WindowBytesStoreSupplier
> > > > > >> > > > > >>>> * set a name
> > > > > >> > > > > >>>> * set the key/value serdes
> > > > > >> > > > > >>>> * disable/enable/configure change-logging
> > > > > >> > > > > >>>> * disable/enable caching
> > > > > >> > > > > >>>> * configure retention
> > > > > >> > > > > >>>>
> > > > > >> > > > > >>>> Further, looking into the WindowBytesStoreSupplier,
> > the
> > > > > >> > interface
> > > > > >> > > lets
> > > > > >> > > > > >> you:
> > > > > >> > > > > >>>> * get the segment interval
> > > > > >> > > > > >>>> * get the window size
> > > > > >> > > > > >>>> * get whether "duplicates" are enabled
> > > > > >> > > > > >>>> * get the retention period
> > > > > >> > > > > >>>> * (obviously) get a WindowStore<Bytes, byte[]>
> > > > > >> > > > > >>>>
> > > > > >> > > > > >>>> We know that Materialized isn't exactly what we
> need
> > > for
> > > > > >> stream
> > > > > >> > > joins,
> > > > > >> > > > > >>>> but we can see how close Materialized is to what we
> > > need.
> > > > > If
> > > > > >> it
> > > > > >> > is
> > > > > >> > > > > >>>> close, maybe we can use it and document the gaps,
> and
> > > if
> > > > it
> > > > > >> is
> > > > > >> > not
> > > > > >> > > > > >>>> close, then maybe we should just add what we need
> to
> > > > > Joined.
> > > > > >> > > > > >>>> Stream Join's requirements for its stores:
> > > > > >> > > > > >>>> * a multimap store (i.e., it keeps duplicates) for
> > > > storing
> > > > > >> > general
> > > > > >> > > > > >>>> (not windowed) keyed records associated with their
> > > > > insertion
> > > > > >> > > time, and
> > > > > >> > > > > >>>> allows efficient time-bounded lookups and also
> > > efficient
> > > > > >> purges
> > > > > >> > > of old
> > > > > >> > > > > >>>> data.
> > > > > >> > > > > >>>> ** Note, a properly configured
> > WindowBytesStoreSupplier
> > > > > >> > satisfies
> > > > > >> > > this
> > > > > >> > > > > >>>> requirement, and the interface supports the queries
> > we
> > > > need
> > > > > >> to
> > > > > >> > > verify
> > > > > >> > > > > >>>> the configuration at run-time
> > > > > >> > > > > >>>> * set a name for the store
> > > > > >> > > > > >>>> * do _not_ set the serdes (they are already set in
> > > > Joined)
> > > > > >> > > > > >>>> * logging could be configurable (set to enabled
> now)
> > > > > >> > > > > >>>> * caching could be configurable (set to enabled
> now)
> > > > > >> > > > > >>>> * do _not_ configure retention (determined by
> > > > JoinWindows)
> > > > > >> > > > > >>>>
> > > > > >> > > > > >>>> So, out of six capabilities for Materialized, there
> > are
> > > > two
> > > > > >> we
> > > > > >> > > don't
> > > > > >> > > > > >>>> want (serdes and retention). These would become
> > > run-time
> > > > > >> checks
> > > > > >> > > if we
> > > > > >> > > > > >>>> use it.
> > > > > >> > > > > >>>>
> > > > > >> > > > > >>>> A third questionable capability is to provide a
> > > > > >> > > > > >>>> WindowBytesStoreSupplier. Looking at whether the
> > > > > >> > > > > >>>> WindowBytesStoreSupplier is the right interface for
> > > > Stream
> > > > > >> Join:
> > > > > >> > > > > >>>> * configuring segment interval is fine
> > > > > >> > > > > >>>> * should _not_ configure window size (it's
> determined
> > > by
> > > > > >> > > JoinWindows)
> > > > > >> > > > > >>>> * duplicates _must_ be enabled
> > > > > >> > > > > >>>> * retention should be _at least_ windowSize +
> > > > gracePeriod,
> > > > > >> but
> > > > > >> > > note
> > > > > >> > > > > >>>> that (unlike for Table window stores) there is no
> > > utility
> > > > > in
> > > > > >> > > having a
> > > > > >> > > > > >>>> longer retention time.
> > > > > >> > > > > >>>> * the WindowStore<Bytes, byte[]> itself is fine, if
> > > > overly
> > > > > >> > broad,
> > > > > >> > > > > >>>> since the only two methods we need are
> > `window.put(key,
> > > > > >> value,
> > > > > >> > > > > >>>> context().timestamp())` and
> `WindowStoreIterator<V2>
> > > > iter =
> > > > > >> > > > > >>>> window.fetch(key, timeFrom, timeTo)`.
> > > > > >> > > > > >>>>
> > > > > >> > > > > >>>> Thus, flattening out the overlap for
> > > > > WindowBytesStoreSupplier
> > > > > >> > > onto the
> > > > > >> > > > > >>>> overlap for Materialized, we have 9 capabilities
> > total
> > > > > (note
> > > > > >> > > retention
> > > > > >> > > > > >>>> is duplicated), we have 4 that we don't want:
> > > > > >> > > > > >>>> * do _not_ set the serdes (they are already set in
> > > > Joined)
> > > > > >> > > > > >>>> * do _not_ configure retention (determined by
> > > > JoinWindows)
> > > > > >> > > > > >>>> * should _not_ configure window size (it's
> determined
> > > by
> > > > > >> > > JoinWindows)
> > > > > >> > > > > >>>> * duplicates _must_ be enabled
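The store-related run-time checks in the list above could be sketched as follows. This is plain Java under stated assumptions: `JoinStoreSettings` and `JoinStoreValidator` are hypothetical names that mirror the supplier properties discussed here, not Kafka Streams classes, and the serdes check is left out.

```java
// Hypothetical value object mirroring the supplier properties discussed in this thread.
final class JoinStoreSettings {
    final long windowSizeMs;
    final long retentionMs;
    final boolean retainDuplicates;

    JoinStoreSettings(long windowSizeMs, long retentionMs, boolean retainDuplicates) {
        this.windowSizeMs = windowSizeMs;
        this.retentionMs = retentionMs;
        this.retainDuplicates = retainDuplicates;
    }
}

// Hypothetical validator: rejects settings that conflict with the join definition.
final class JoinStoreValidator {
    static void validate(JoinStoreSettings settings, long joinWindowSizeMs, long graceMs) {
        // The join stores every record, so duplicates must be retained.
        if (!settings.retainDuplicates) {
            throw new IllegalArgumentException("join stores must retain duplicates");
        }
        // Window size is determined by the join windows, not by the store.
        if (settings.windowSizeMs != joinWindowSizeMs) {
            throw new IllegalArgumentException("store window size must match the join window size");
        }
        // Retention must cover at least the window plus the grace period.
        if (settings.retentionMs < joinWindowSizeMs + graceMs) {
            throw new IllegalArgumentException("retention must be at least window size + grace period");
        }
    }
}
```

Failing fast like this would surface a misconfigured supplier when the topology is built instead of producing wrong join results later.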
> > > > > >> > > > > >>>>
> > > > > >> > > > > >>>> These gaps would have to be covered with run-time
> > > checks
> > > > if
> > > > > >> we
> > > > > >> > > re-use
> > > > > >> > > > > >>>> Materialized and WindowBytesStoreSupplier
> both.
> > > > Maybe
> > > > > >> this
> > > > > >> > > sounds
> > > > > >> > > > > >>>> bad, but consider the other side, that we get 5 new
> > > > > >> capabilities
> > > > > >> > > we
> > > > > >> > > > > >>>> don't require, but are still pretty nice:
> > > > > >> > > > > >>>> * configure the bytes store
> > > > > >> > > > > >>>> * set a name for the store
> > > > > >> > > > > >>>> * configure caching
> > > > > >> > > > > >>>> * configure logging
> > > > > >> > > > > >>>> * configure segment interval
> > > > > >> > > > > >>>>
> > > > > >> > > > > >>>> Not sure where this nets us out, but it's food for
> > > > thought.
> > > > > >> > > > > >>>> -John
> > > > > >> > > > > >>>>
> > > > > >> > > > > >>>> On Sun, Jun 23, 2019 at 7:52 PM Guozhang Wang <
> > > > > >> > wangguoz@gmail.com
> > > > > >> > > >
> > > > > >> > > > > >> wrote:
> > > > > >> > > > > >>>>>
> > > > > >> > > > > >>>>> Hi Bill,
> > > > > >> > > > > >>>>>
> > > > > >> > > > > >>>>> I think by giving a Materialized param into
> > > > stream-stream
> > > > > >> join,
> > > > > >> > > it's
> > > > > >> > > > > >> okay
> > > > > >> > > > > >>>>> (though still not ideal) to say "we still would not
> > expose
> > > > the
> > > > > >> > store
> > > > > >> > > for
> > > > > >> > > > > >>>>> queries", but it would sound a bit awkward to say
> > "we
> > > > > would
> > > > > >> > also
> > > > > >> > > > > >> ignore
> > > > > >> > > > > >>>>> whatever the passed in store supplier but just use
> > our
> > > > > >> default
> > > > > >> > > ones"
> > > > > >> > > > > >> --
> > > > > >> > > > > >>>>> again the concern is that, if in the future we'd
> > want
> > > to
> > > > > >> change
> > > > > >> > > the
> > > > > >> > > > > >>>> default
> > > > > >> > > > > >>>>> implementation of join algorithm which no longer
> > relies
> > > > on a
> > > > > >> > window
> > > > > >> > > > > >> store
> > > > > >> > > > > >>>>> with deduping enabled, then we need to change this
> > API
> > > > > >> again by
> > > > > >> > > > > >> changing
> > > > > >> > > > > >>>>> the store supplier type.
> > > > > >> > > > > >>>>>
> > > > > >> > > > > >>>>> If we do want to fill this hole for stream-stream
> > > join,
> > > > I
> > > > > >> feel
> > > > > >> > > just
> > > > > >> > > > > >>>> adding
> > > > > >> > > > > >>>>> a String typed store-name would even be less
> > > > > >> future-intrusive
> > > > > >> > if
> > > > > >> > > we
> > > > > >> > > > > >>>> expect
> > > > > >> > > > > >>>>> this parameter to be modified later.
> > > > > >> > > > > >>>>>
> > > > > >> > > > > >>>>> Does that make sense?
> > > > > >> > > > > >>>>>
> > > > > >> > > > > >>>>>
> > > > > >> > > > > >>>>> Guozhang
> > > > > >> > > > > >>>>>
> > > > > >> > > > > >>>>> On Sat, Jun 22, 2019 at 12:51 PM Bill Bejeck <
> > > > > >> > bbejeck@gmail.com>
> > > > > >> > > > > >> wrote:
> > > > > >> > > > > >>>>>
> > > > > >> > > > > >>>>>> Thanks for the comments John and Guozhang, I'll
> > > address
> > > > > >> each
> > > > > >> > > one of
> > > > > >> > > > > >>>> your
> > > > > >> > > > > >>>>>> comments in turn.
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>> John,
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>>> I'm wondering about a missing quadrant from the
> > > truth
> > > > > >> table
> > > > > >> > > > > >> involving
> > > > > >> > > > > >>>>>>> whether a Materialized is stored or not and
> > querying
> > > > is
> > > > > >> > > > > >>>>>>> enabled/disabled... What should be the behavior
> if
> > > > there
> > > > > >> is
> > > > > >> > no
> > > > > >> > > > > >> store
> > > > > >> > > > > >>>>>>> configured (e.g., if Materialized with only
> > serdes)
> > > > and
> > > > > >> > > querying
> > > > > >> > > > > >> is
> > > > > >> > > > > >>>>>> enabled?
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>>> It seems we have two choices:
> > > > > >> > > > > >>>>>>> 1. we can force creation of a state store in
> this
> > > > case,
> > > > > so
> > > > > >> > the
> > > > > >> > > > > >> store
> > > > > >> > > > > >>>>>>> can be used to serve the queries
> > > > > >> > > > > >>>>>>> 2. we can provide just a queriable view,
> basically
> > > > > >> letting IQ
> > > > > >> > > > > >> query
> > > > > >> > > > > >>>>>>> into the "KTableValueGetter", which would
> > > > transparently
> > > > > >> > > > > >> construct the
> > > > > >> > > > > >>>>>>> query response by applying the operator logic to
> > the
> > > > > >> upstream
> > > > > >> > > > > >> state
> > > > > >> > > > > >>>> if
> > > > > >> > > > > >>>>>>> the operator state isn't already stored.
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>> I agree with your assertion about a missing
> > quadrant
> > > > from
> > > > > >> the
> > > > > >> > > truth
> > > > > >> > > > > >>>> table.
> > > > > >> > > > > >>>>>> Additionally, I too like the concept of a
> queriable
> > > > view.
> > > > > >> > But I
> > > > > >> > > > > >> think
> > > > > >> > > > > >>>> that
> > > > > >> > > > > >>>>>> goes a bit beyond the scope of this KIP and would
> > > like
> > > > to
> > > > > >> > pursue
> > > > > >> > > > > >> that
> > > > > >> > > > > >>>>>> feature as follow-on work.  Also thinking about
> > this
> > > > KIP
> > > > > >> some
> > > > > >> > > > > >> more, I'm
> > > > > >> > > > > >>>>>> thinking the changes to Materialized might be
> a
> > > > reach
> > > > > as
> > > > > >> > > well.
> > > > > >> > > > > >>>>>> Separating the naming from a store and its
> > queryable
> > > > > state
> > > > > >> > seems
> > > > > >> > > > > >> like a
> > > > > >> > > > > >>>>>> complex issue in and of itself and should be
> > treated
> > > > > >> > > accordingly.
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>> So here's what I'm thinking now.  We add
> > Materialized
> > > > to
> > > > > >> Join,
> > > > > >> > > but
> > > > > >> > > > > >> for
> > > > > >> > > > > >>>> now,
> > > > > >> > > > > >>>>>> we internally disable querying.  I know this
> breaks
> > > our
> > > > > >> > current
> > > > > >> > > > > >>>> semantic
> > > > > >> > > > > >>>>>> approach, but I think it's crucial that we do two
> > > > things
> > > > > in
> > > > > >> > this
> > > > > >> > > > > >> KIP
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>>    1. Break the naming of the state stores from
> > > Joined
> > > > to
> > > > > >> > > > > >>>> Materialized, so
> > > > > >> > > > > >>>>>>    the naming of state stores follows our current
> > > > pattern
> > > > > >> and
> > > > > >> > > > > >> enables
> > > > > >> > > > > >>>>>> upgrades
> > > > > >> > > > > >>>>>>    from 2.3 to 2.4
> > > > > >> > > > > >>>>>>    2. Offer the ability to configure the state
> > stores
> > > > of
> > > > > >> the
> > > > > >> > > join,
> > > > > >> > > > > >> even
> > > > > >> > > > > >>>>>>    providing a different implementation (i.e.
> > > > in-memory)
> > > > > if
> > > > > >> > > > > >> desired.
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>> With that in mind I'm considering changing the
> KIP
> > to
> > > > > >> remove
> > > > > >> > the
> > > > > >> > > > > >>>> changes to
> > > > > >> > > > > >>>>>> Materialized, and we document very clearly that
> by
> > > > > >> providing a
> > > > > >> > > > > >>>> Materialized
> > > > > >> > > > > >>>>>> object with a name is only for naming the state
> > > store,
> > > > > >> hence
> > > > > >> > the
> > > > > >> > > > > >>>> changelog
> > > > > >> > > > > >>>>>> topics and any possible configurations of the
> > store,
> > > > but
> > > > > >> this
> > > > > >> > > store
> > > > > >> > > > > >>>> *will
> > > > > >> > > > > >>>>>> not be available for IQ.*
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>> WDYT?
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>> Guozhang,
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>>> 1. About not breaking compatibility of
> > stream-stream
> > > > > join
> > > > > >> > > > > >>>> materialized
> > > > > >> > > > > >>>>>>> stores: I think this is a valid issue to tackle,
> > but
> > > > > after
> > > > > >> > > > > >> thinking
> > > > > >> > > > > >>>> about
> > > > > >> > > > > >>>>>>> it once more I'm not sure if exposing
> Materialized
> > > > would
> > > > > >> be a
> > > > > >> > > > > >> good
> > > > > >> > > > > >>>>>> solution
> > > > > >> > > > > >>>>>>> here. My rationales:
> > > > > >> > > > > >>>>>>>
> > > > > >> > > > > >>>>>>> 1.a For stream-stream join, our current usage of
> > > > > >> window-store
> > > > > >> > > is
> > > > > >> > > > > >> not
> > > > > >> > > > > >>>>>> ideal,
> > > > > >> > > > > >>>>>>> and we want to modify it in the near future to
> be
> > > more
> > > > > >> > > > > >> efficient. Not
> > > > > >> > > > > >>>>>>> allowing users to override such state store
> > backend
> > > > > gives
> > > > > >> us
> > > > > >> > > such
> > > > > >> > > > > >>>> freedom
> > > > > >> > > > > >>>>>>> (which was also considered in the original DSL
> > > > design),
> > > > > >> > whereas
> > > > > >> > > > > >>>> getting a
> > > > > >> > > > > >>>>>>> Materialized<WindowStore> basically kicks out
> that
> > > > > freedom
> > > > > >> > out
> > > > > >> > > > > >> of the
> > > > > >> > > > > >>>>>>> window.
> > > > > >> > > > > >>>>>>> 1.b For stream-stream join, in our original
> design
> > > we
> > > > > >> intend
> > > > > >> > to
> > > > > >> > > > > >>>> "never"
> > > > > >> > > > > >>>>>>> want users to query the state, since it is just
> > for
> > > > > >> buffering
> > > > > >> > > the
> > > > > >> > > > > >>>>>> upcoming
> > > > > >> > > > > >>>>>>> records from the stream. Now I know that some
> > users
> > > > may
> > > > > >> > indeed
> > > > > >> > > > > >> want
> > > > > >> > > > > >>>> to
> > > > > >> > > > > >>>>>>> query it from the debugging perspective, but
> > still I am
> > > > > >> > concerned
> > > > > >> > > > > >> about
> > > > > >> > > > > >>>>>>> whether leveraging IQ for debugging purposes
> would
> > > be
> > > > > the
> > > > > >> > right
> > > > > >> > > > > >>>> solution
> > > > > >> > > > > >>>>>>> here. And adding Materialized object opens the
> > door
> > > to
> > > > > let
> > > > > >> > > users
> > > > > >> > > > > >>>> query
> > > > > >> > > > > >>>>>>> about it (unless we did something intentionally
> to
> > > > still
> > > > > >> > > forbid
> > > > > >> > > > > >> it),
> > > > > >> > > > > >>>>>> which
> > > > > >> > > > > >>>>>>> also restricts us in the future.
> > > > > >> > > > > >>>>>>>
> > > > > >> > > > > >>>>>>> 2. About the coupling between
> Materialized.name()
> > > and
> > > > > >> > > queryable:
> > > > > >> > > > > >>>> again I
> > > > > >> > > > > >>>>>>> think this is a valid issue. But I'm not sure if
> > the
> > > > > >> current
> > > > > >> > > > > >>>>>>> "withQuerryingDisabled / Enabled" at
> Materialized
> > is
> > > > the
> > > > > >> best
> > > > > >> > > > > >>>> approach.
> > > > > >> > > > > >>>>>>> Here I think I agree with John, that generally
> > > > speaking
> > > > > >> it's
> > > > > >> > > > > >> better
> > > > > >> > > > > >>>> be a
> > > > > >> > > > > >>>>>>> control function on the `KTable` itself, rather
> > than
> > > > on
> > > > > >> > > > > >>>> `Materialized`,
> > > > > >> > > > > >>>>>> so
> > > > > >> > > > > >>>>>>> fixing it via adding functions through
> > > `Materialized`
> > > > > >> seems
> > > > > >> > > not a
> > > > > >> > > > > >>>> natural
> > > > > >> > > > > >>>>>> approach either.
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>> I understand your thoughts here, and up to a
> > point, I
> > > > > agree
> > > > > >> > with
> > > > > >> > > > > >> you.
> > > > > >> > > > > >>>>>> But concerning not providing Materialized as it
> may
> > > > > >> restrict
> > > > > >> > us
> > > > > >> > > in
> > > > > >> > > > > >> the
> > > > > >> > > > > >>>>>> future for delivering different implementations,
> > I'm
> > > > > >> wondering
> > > > > >> > > if
> > > > > >> > > > > >> we
> > > > > >> > > > > >>>> are
> > > > > >> > > > > >>>>>> doing some premature optimization here.
> > > > > >> > > > > >>>>>> My rationale for saying so
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>>    1. I think the cost of not allowing the naming
> > of
> > > > > state
> > > > > >> > > stores
> > > > > >> > > > > >> for
> > > > > >> > > > > >>>> joins
> > > > > >> > > > > >>>>>>    is too big of a gap to leave.   IMHO for joins
> > to
> > > > > follow
> > > > > >> > the
> > > > > >> > > > > >> current
> > > > > >> > > > > >>>>>>    pattern of using Materialized for naming state
> > > > stores
> > > > > >> would
> > > > > >> > > be
> > > > > >> > > > > >> what
> > > > > >> > > > > >>>> most
> > > > > >> > > > > >>>>>>    users would expect to use.  As I said in my
> > > comments
> > > > > >> > above, I
> > > > > >> > > > > >> think
> > > > > >> > > > > >>>> we
> > > > > >> > > > > >>>>>>    should *not include* the changes to
> Materialized
> > > and
> > > > > >> > enforce
> > > > > >> > > > > >> named
> > > > > >> > > > > >>>>>>    stores for joins as unavailable for IQ.
> > > > > >> > > > > >>>>>>    2. We'll still have the join methods available
> > > > > without a
> > > > > >> > > > > >>>> Materialized
> > > > > >> > > > > >>>>>>    allowing us to do something different
> internally
> > > if
> > > > a
> > > > > >> > > > > >> Materialized
> > > > > >> > > > > >>>> is
> > > > > >> > > > > >>>>>> not
> > > > > >> > > > > >>>>>>    provided.
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>>> Overall, I'm thinking maybe we should still use
> > two
> > > > > stones
> > > > > >> > > rather
> > > > > >> > > > > >>>> than
> > > > > >> > > > > >>>>>> one
> > > > > >> > > > > >>>>>>> to kill these two birds, and probably for this
> KIP
> > > we
> > > > > just
> > > > > >> > > focus
> > > > > >> > > > > >> on
> > > > > >> > > > > >>>> 1)
> > > > > >> > > > > >>>>>>> above. And for that I'd like to not expose the
> > > > > >> Materialized
> > > > > >> > > > > >> either
> > > > > >> > > > > >>>> for
> > > > > >> > > > > >>>>>>> rationales that I've listed above. Instead, we
> > just
> > > > > >> restrict
> > > > > >> > > > > >> KIP-307
> > > > > >> > > > > >>>> to
> > > > > >> > > > > >>>>>> NOT
> > > > > >> > > > > >>>>>>> use the Joined.name for state store names and
> > always
> > > > use
> > > > > >> > > internal
> > > > > >> > > > > >>>> names
> > > > > >> > > > > >>>>>> as
> > > > > >> > > > > >>>>>>> well, which admittedly indeed leaves a hole of
> not
> > > > being
> > > > > >> able
> > > > > >> > > to
> > > > > >> > > > > >>>> cover
> > > > > >> > > > > >>>>>> all
> > > > > >> > > > > >>>>>>> internal names here, but now I feel this `hole`
> > may
> > > > > >> better be
> > > > > >> > > > > >> filled
> > > > > >> > > > > >>>> by,
> > > > > >> > > > > >>>>>>> e.g. not creating changelog topics but just use
> > the
> > > > > >> upstream
> > > > > >> > to
> > > > > >> > > > > >>>>>>> re-bootstrap the materialized store, more
> > > concretely:
> > > > > when
> > > > > >> > > > > >>>> materializing
> > > > > >> > > > > >>>>>>> the store, try to piggy-back the changelog topic
> > on
> > > an
> > > > > >> > existing
> > > > > >> > > > > >>>> topic,
> > > > > >> > > > > >>>>>> e.g.
> > > > > >> > > > > >>>>>>> a) if the stream is coming directly from some
> > source
> > > > > topic
> > > > > >> > > > > >> (including
> > > > > >> > > > > >>>>>>> repartition topic), make that as changelog topic
> > and
> > > > if
> > > > > >> it is
> > > > > >> > > > > >>>> repartition
> > > > > >> > > > > >>>>>>> topic change the retention / data purging policy
> > > > > >> necessarily
> > > > > >> > as
> > > > > >> > > > > >>>> well; b)
> > > > > >> > > > > >>>>>> if
> > > > > >> > > > > >>>>>>> the stream is coming from some stateless
> > operators,
> > > > > >> delegate
> > > > > >> > > that
> > > > > >> > > > > >>>>>> stateless
> > > > > >> > > > > >>>>>>> operator to the parent stream similar as a); if
> > the
> > > > > >> stream is
> > > > > >> > > > > >> coming
> > > > > >> > > > > >>>> from
> > > > > >> > > > > >>>>>> a
> > > > > >> > > > > >>>>>>> stream-stream join which is the only stateful
> > > operator
> > > > > >> that
> > > > > >> > can
> > > > > >> > > > > >>>> result in
> > > > > >> > > > > >>>>>> a
> > > > > >> > > > > >>>>>>> stream, consider merging the join into multi-way
> > > joins
> > > > > >> (yes,
> > > > > >> > > > > >> this is
> > > > > >> > > > > >>>> a
> > > > > >> > > > > >>>>>> very
> > > > > >> > > > > >>>>>>> hand-wavy thought, but the point here is that we
> > do
> > > > not
> > > > > >> try
> > > > > >> > to
> > > > > >> > > > > >>>> tackle it
> > > > > >> > > > > >>>>>>> now but leave it for a better solution :).
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>> I really like this idea!  I agree with you in
> that
> > > this
> > > > > >> > approach
> > > > > >> > > > > >> is too
> > > > > >> > > > > >>>>>> much for adding in this KIP, but we could pick it
> > up
> > > > > later
> > > > > >> and
> > > > > >> > > > > >>>> leverage the
> > > > > >> > > > > >>>>>> Optimization framework to accomplish this re-use.
> > > > > >> > > > > >>>>>> Again, while I agree we should break the naming
> of
> > > join
> > > > > >> state
> > > > > >> > > > > >> stores
> > > > > >> > > > > >>>> from
> > > > > >> > > > > >>>>>> KIP-307, IMHO it's something we should fix now as
> > it
> > > > will
> > > > > >> be
> > > > > >> > the
> > > > > >> > > > > >> last
> > > > > >> > > > > >>>> piece
> > > > > >> > > > > >>>>>> we can provide to give users the ability to
> > > completely
> > > > > make
> > > > > >> > > their
> > > > > >> > > > > >>>>>> topologies "upgrade proof" when adding additional
> > > > > >> operations.
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>> Thanks again to both of you for comments and I
> look
> > > > > >> forward to
> > > > > >> > > > > >> hearing
> > > > > >> > > > > >>>> back
> > > > > >> > > > > >>>>>> from you.
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>> Regards,
> > > > > >> > > > > >>>>>> Bill
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>> On Thu, Jun 20, 2019 at 2:33 PM Guozhang Wang <
> > > > > >> > > wangguoz@gmail.com>
> > > > > >> > > > > >>>> wrote:
> > > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>>> Hello Bill,
> > > > > >> > > > > >>>>>>>
> > > > > >> > > > > >>>>>>> Thanks for the KIP. Glad to see that we can
> likely
> > > > > >> shoot
> > > > > >> > two
> > > > > >> > > > > >> birds
> > > > > >> > > > > >>>>>> with
> > > > > >> > > > > >>>>>>> one stone. I have some concerns though about
> those
> > > > "two
> > > > > >> > birds"
> > > > > >> > > > > >>>>>> themselves:
> > > > > >> > > > > >>>>>>>
> > > > > >> > > > > >>>>>>> 1. About not breaking compatibility of
> > stream-stream
> > > > > join
> > > > > >> > > > > >>>> materialized
> > > > > >> > > > > >>>>>>> stores: I think this is a valid issue to tackle,
> > but
> > > > > after
> > > > > >> > > > > >> thinking
> > > > > >> > > > > >>>> about
> > > > > >> > > > > >>>>>>> it once more I'm not sure if exposing
> Materialized
> > > > would
> > > > > >> be a
> > > > > >> > > > > >> good
> > > > > >> > > > > >>>>>> solution
> > > > > >> > > > > >>>>>>> here. My rationales:
> > > > > >> > > > > >>>>>>>
> > > > > >> > > > > >>>>>>> 1.a For stream-stream join, our current usage of
> > > > > >> window-store
> > > > > >> > > is
> > > > > >> > > > > >> not
> > > > > >> > > > > >>>>>> ideal,
> > > > > >> > > > > >>>>>>> and we want to modify it in the near future to
> be
> > > more
> > > > > >> > > > > >> efficient. Not
> > > > > >> > > > > >>>>>>> allowing users to override such state store
> > backend
> > > > > gives
> > > > > >> us
> > > > > >> > > such
> > > > > >> > > > > >>>> freedom
> > > > > >> > > > > >>>>>>> (which was also considered in the original DSL
> > > > design),
> > > > > >> > whereas
> > > > > >> > > > > >>>> getting a
> > > > > >> > > > > >>>>>>> Materialized<WindowStore> basically kicks out
> that
> > > > > freedom
> > > > > >> > out
> > > > > >> > > > > >> of the
> > > > > >> > > > > >>>>>>> window.
> > > > > >> > > > > >>>>>>> 1.b For stream-stream join, in our original
> design
> > > we
> > > > > >> intend
> > > > > >> > to
> > > > > >> > > > > >>>> "never"
> > > > > >> > > > > >>>>>>> want users to query the state, since it is just
> > for
> > > > > >> buffering
> > > > > >> > > the
> > > > > >> > > > > >>>>>> upcoming
> > > > > >> > > > > >>>>>>> records from the stream. Now I know that some
> > users
> > > > may
> > > > > >> > indeed
> > > > > >> > > > > >> want
> > > > > >> > > > > >>>> to
> > > > > >> > > > > >>>>>>> query it from the debugging perspective, but
> > still I am
> > > > > >> > concerned
> > > > > >> > > > > >> about
> > > > > >> > > > > >>>>>>> whether leveraging IQ for debugging purposes
> would
> > > be
> > > > > the
> > > > > >> > right
> > > > > >> > > > > >>>> solution
> > > > > >> > > > > >>>>>>> here. And adding Materialized object opens the
> > door
> > > to
> > > > > let
> > > > > >> > > users
> > > > > >> > > > > >>>> query
> > > > > >> > > > > >>>>>>> about it (unless we did something intentionally
> to
> > > > still
> > > > > >> > > forbid
> > > > > >> > > > > >> it),
> > > > > >> > > > > >>>>>> which
> > > > > >> > > > > >>>>>>> also restricts us in the future.
> > > > > >> > > > > >>>>>>>
> > > > > >> > > > > >>>>>>> 2. About the coupling between
> Materialized.name()
> > > and
> > > > > >> > > queryable:
> > > > > >> > > > > >>>> again I
> > > > > >> > > > > >>>>>>> think this is a valid issue. But I'm not sure if
> > the
> > > > > >> current
> > > > > >> > > > > >>>>>>> "withQuerryingDisabled / Enabled" at
> Materialized
> > is
> > > > the
> > > > > >> best
> > > > > >> > > > > >>>> approach.
> > > > > >> > > > > >>>>>>> Here I think I agree with John, that generally
> > > > speaking
> > > > > >> it's
> > > > > >> > > > > >> better
> > > > > >> > > > > >>>> be a
> > > > > >> > > > > >>>>>>> control function on the `KTable` itself, rather
> > than
> > > > on
> > > > > >> > > > > >>>> `Materialized`,
> > > > > >> > > > > >>>>>> so
> > > > > >> > > > > >>>>>>> fixing it via adding functions through
> > > `Materialized`
> > > > > >> seems
> > > > > >> > > not a
> > > > > >> > > > > >>>> natural
> > > > > >> > > > > >>>>>>> approach either.
> > > > > >> > > > > >>>>>>>
> > > > > >> > > > > >>>>>>>
> > > > > >> > > > > >>>>>>> Overall, I'm thinking maybe we should still use
> > two
> > > > > stones
> > > > > >> > > rather
> > > > > >> > > > > >>>> than
> > > > > >> > > > > >>>>>> one
> > > > > >> > > > > >>>>>>> to kill these two birds, and probably for this
> KIP
> > > we
> > > > > just
> > > > > >> > > focus
> > > > > >> > > > > >> on
> > > > > >> > > > > >>>> 1)
> > > > > >> > > > > >>>>>>> above. And for that I'd like to not expose the
> > > > > >> Materialized
> > > > > >> > > > > >> either
> > > > > >> > > > > >>>> for
> > > > > >> > > > > >>>>>>> rationales that I've listed above. Instead, we
> > just
> > > > > >> restrict
> > > > > >> > > > > >> KIP-307
> > > > > >> > > > > >>>> to
> > > > > >> > > > > >>>>>> NOT
> > > > > >> > > > > >>>>>>> use the Joined.name for state store names and
> > always
> > > > use
> > > > > >> > > internal
> > > > > >> > > > > >>>> names
> > > > > >> > > > > >>>>>> as
> > > > > >> > > > > >>>>>>> well, which admittedly indeed leaves a hole of
> not
> > > > being
> > > > > >> able
> > > > > >> > > to
> > > > > >> > > > > >>>> cover
> > > > > >> > > > > >>>>>> all
> > > > > >> > > > > >>>>>>> internal names here, but now I feel this `hole`
> > may
> > > > > >> better be
> > > > > >> > > > > >> filled
> > > > > >> > > > > >>>> by,
> > > > > >> > > > > >>>>>>> e.g. not creating changelog topics but just use
> > the
> > > > > >> upstream
> > > > > >> > to
> > > > > >> > > > > >>>>>>> re-bootstrap the materialized store, more
> > > concretely:
> > > > > when
> > > > > >> > > > > >>>> materializing
> > > > > >> > > > > >>>>>>> the store, try to piggy-back the changelog topic
> > on
> > > an
> > > > > >> > existing
> > > > > >> > > > > >>>> topic,
> > > > > >> > > > > >>>>>> e.g.
> > > > > >> > > > > >>>>>>> a) if the stream is coming directly from some
> > source
> > > > > topic
> > > > > >> > > > > >> (including
> > > > > >> > > > > >>>>>>> repartition topic), make that as changelog topic
> > and
> > > > if
> > > > > >> it is
> > > > > >> > > > > >>>> repartition
> > > > > >> > > > > >>>>>>> topic change the retention / data purging policy
> > > > > >> necessarily
> > > > > >> > as
> > > > > >> > > > > >>>> well; b)
> > > > > >> > > > > >>>>>> if
> > > > > >> > > > > >>>>>>> the stream is coming from some stateless
> > operators,
> > > > > >> delegate
> > > > > >> > > that
> > > > > >> > > > > >>>>>> stateless
> > > > > >> > > > > >>>>>>> operator to the parent stream similar as a); if
> > the
> > > > > >> stream is
> > > > > >> > > > > >> coming
> > > > > >> > > > > >>>>>> from a
> > > > > >> > > > > >>>>>>> stream-stream join which is the only stateful
> > > operator
> > > > > >> that
> > > > > >> > can
> > > > > >> > > > > >>>> result
> > > > > >> > > > > >>>>>> in a
> > > > > >> > > > > >>>>>>> stream, consider merging the join into multi-way
> > > joins
> > > > > >> (yes,
> > > > > >> > > > > >> this is
> > > > > >> > > > > >>>> a
> > > > > >> > > > > >>>>>> very
> > > > > >> > > > > >>>>>>> hand-wavy thought, but the point here is that we
> > do
> > > > not
> > > > > >> try
> > > > > >> > to
> > > > > >> > > > > >>>> tackle it
> > > > > >> > > > > >>>>>>> now but leave it for a better solution :).
> > > > > >> > > > > >>>>>>>
> > > > > >> > > > > >>>>>>>
> > > > > >> > > > > >>>>>>> Guozhang
> > > > > >> > > > > >>>>>>>
> > > > > >> > > > > >>>>>>>
> > > > > >> > > > > >>>>>>>
> > > > > >> > > > > >>>>>>> On Wed, Jun 19, 2019 at 11:41 AM John Roesler <
> > > > > >> > > john@confluent.io
> > > > > >> > > > > >>>
> > > > > >> > > > > >>>> wrote:
> > > > > >> > > > > >>>>>>>
> > > > > >> > > > > >>>>>>>> Hi Bill,
> > > > > >> > > > > >>>>>>>>
> > > > > >> > > > > >>>>>>>> Thanks for the KIP! Awesome job catching this
> > > > > unexpected
> > > > > >> > > > > >>>> consequence
> > > > > >> > > > > >>>>>>>> of the prior KIPs before it was released.
> > > > > >> > > > > >>>>>>>>
> > > > > >> > > > > >>>>>>>> The proposal looks good to me. On top of just
> > > fixing
> > > > > the
> > > > > >> > > > > >> problem,
> > > > > >> > > > > >>>> it
> > > > > >> > > > > >>>>>>>> seems to address two other pain points:
> > > > > >> > > > > >>>>>>>> * that naming a s
