kafka-dev mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: [DISCUSS] KIP-479: Add Materialized to Join
Date Mon, 24 Jun 2019 17:56:23 GMT
Hello John,

My main concern is exactly the first point at the bottom of your analysis
here: "* configure the bytes store". I'm not sure that a window bytes
store is ideal for a stream-stream windowed join; e.g., we could consider
a two-dimensional list sorted by timestamp and then by key to do the
join, whereas a windowed bytes store is basically sorted by key first,
then by timestamp. If we expose Materialized and let users pass in a
windowed bytes store, then we would need to change that API again if we
later want to replace the store with a different implementation interface.
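
To make the ordering difference concrete, here is a minimal sketch in plain
Java. It is not Kafka Streams code: JoinBufferOrdering, Entry, and
expiredRuns are hypothetical names used only for illustration. It sorts the
same buffered records key-first and timestamp-first, then counts how many
separate contiguous runs hold the records older than a cutoff.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration only; not Kafka Streams code.
final class JoinBufferOrdering {

    // A buffered join record: a key plus the timestamp it was inserted with.
    static final class Entry {
        private final String key;
        private final long timestamp;

        Entry(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        String key() { return key; }
        long timestamp() { return timestamp; }
    }

    // Layout described for a windowed bytes store: key first, then timestamp.
    static final Comparator<Entry> KEY_THEN_TIME =
        Comparator.comparing(Entry::key).thenComparingLong(Entry::timestamp);

    // Alternative layout: timestamp first, then key.
    static final Comparator<Entry> TIME_THEN_KEY =
        Comparator.comparingLong(Entry::timestamp).thenComparing(Entry::key);

    // Returns a copy of the entries sorted according to the given layout.
    static List<Entry> sorted(List<Entry> entries, Comparator<Entry> layout) {
        List<Entry> copy = new ArrayList<>(entries);
        copy.sort(layout);
        return copy;
    }

    // Counts the contiguous runs of entries older than the cutoff.
    // A single run means all expired data sits in one range.
    static int expiredRuns(List<Entry> ordered, long cutoff) {
        int runs = 0;
        boolean inRun = false;
        for (Entry e : ordered) {
            boolean expired = e.timestamp() < cutoff;
            if (expired && !inRun) {
                runs++;
            }
            inRun = expired;
        }
        return runs;
    }
}
```

In this sketch, the timestamp-first layout keeps all expired records in one
leading range, while the key-first layout scatters them across keys.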


Guozhang

On Mon, Jun 24, 2019 at 8:59 AM John Roesler <john@confluent.io> wrote:

> Hey Guozhang and Bill,
>
> For what it's worth, I agree with you both!
>
> I think it might help the discussion to look concretely at what
> Materialized does:
> * set a WindowBytesStoreSupplier
> * set a name
> * set the key/value serdes
> * disable/enable/configure change-logging
> * disable/enable caching
> * configure retention
>
> Further, looking into the WindowBytesStoreSupplier, the interface lets you:
> * get the segment interval
> * get the window size
> * get whether "duplicates" are enabled
> * get the retention period
> * (obviously) get a WindowStore<Bytes, byte[]>
>
> We know that Materialized isn't exactly what we need for stream joins,
> but we can see how close Materialized is to what we need. If it is
> close, maybe we can use it and document the gaps, and if it is not
> close, then maybe we should just add what we need to Joined.
> Stream Join's requirements for its stores:
> * a multimap store (i.e., it keeps duplicates) for storing general
> (not windowed) keyed records associated with their insertion time, and
> allows efficient time-bounded lookups and also efficient purges of old
> data.
> ** Note, a properly configured WindowBytesStoreSupplier satisfies this
> requirement, and the interface supports the queries we need to verify
> the configuration at run-time
> * set a name for the store
> * do _not_ set the serdes (they are already set in Joined)
> * logging could be configurable (set to enabled now)
> * caching could be configurable (set to enabled now)
> * do _not_ configure retention (determined by JoinWindows)
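
The first requirement in the list above (a multimap keyed store with
time-bounded lookups and purges of old data) can be sketched in plain Java
as follows. This is only an in-memory illustration under assumed names
(JoinBufferSketch, purgeBefore); it is not a Kafka Streams store and says
nothing about how the real window store is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the join buffer; not a Kafka Streams store.
final class JoinBufferSketch<K, V> {

    // key -> (insertion time -> every value inserted at that time, duplicates kept)
    private final Map<K, TreeMap<Long, List<V>>> data = new HashMap<>();

    // Stores the value under the key and its insertion time; never overwrites.
    void put(K key, V value, long timestamp) {
        data.computeIfAbsent(key, k -> new TreeMap<>())
            .computeIfAbsent(timestamp, t -> new ArrayList<>())
            .add(value);
    }

    // Time-bounded lookup: all values for the key with timeFrom <= time <= timeTo.
    // Assumes timeFrom <= timeTo.
    List<V> fetch(K key, long timeFrom, long timeTo) {
        List<V> result = new ArrayList<>();
        TreeMap<Long, List<V>> byTime = data.get(key);
        if (byTime != null) {
            for (List<V> values : byTime.subMap(timeFrom, true, timeTo, true).values()) {
                result.addAll(values);
            }
        }
        return result;
    }

    // Purge: drops every record inserted strictly before the given time.
    void purgeBefore(long timestamp) {
        for (TreeMap<Long, List<V>> byTime : data.values()) {
            byTime.headMap(timestamp, false).clear();
        }
        data.values().removeIf(m -> m.isEmpty());
    }
}
```

Note that the purge in this sketch has to visit every key, because the data
is organized by key first and by time second.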
>
> So, out of six capabilities for Materialized, there are two we don't
> want (serdes and retention). These would become run-time checks if we
> use it.
>
> A third questionable capability is to provide a
> WindowBytesStoreSupplier. Looking at whether the
> WindowBytesStoreSupplier is the right interface for Stream Join:
> * configuring segment interval is fine
> * should _not_ configure window size (it's determined by JoinWindows)
> * duplicates _must_ be enabled
> * retention should be _at least_ windowSize + gracePeriod, but note
> that (unlike for Table window stores) there is no utility in having a
> longer retention time.
> * the WindowStore<Bytes, byte[]> itself is fine, if overly broad,
> since the only two methods we need are `window.put(key, value,
> context().timestamp())` and `WindowStoreIterator<V2> iter =
> window.fetch(key, timeFrom, timeTo)`.
>
> Thus, flattening out the overlap for WindowBytesStoreSupplier onto the
> overlap for Materialized, we have 9 capabilities total (note retention
> is duplicated), of which there are 4 that we don't want:
> * do _not_ set the serdes (they are already set in Joined)
> * do _not_ configure retention (determined by JoinWindows)
> * should _not_ configure window size (it's determined by JoinWindows)
> * duplicates _must_ be enabled
>
> These gaps would have to be covered with run-time checks if we re-use
> both Materialized and WindowBytesStoreSupplier. Maybe this sounds
> bad, but consider the other side: we get 5 new capabilities that we
> don't require, but that are still pretty nice:
> * configure the bytes store
> * set a name for the store
> * configure caching
> * configure logging
> * configure segment interval
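
As a rough illustration of the run-time checks mentioned above, the four
unwanted capabilities could be validated along the following lines. This is
a plain-Java sketch under assumptions: JoinStoreChecks and all of its
parameters are hypothetical and do not correspond to existing Kafka Streams
methods, and the retention rule follows the "at least windowSize +
gracePeriod" statement from the analysis.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical validation helper; names and parameters are illustrative only.
final class JoinStoreChecks {

    // Returns the list of problems found; an empty list means the
    // user-supplied configuration is acceptable for a stream-stream join.
    static List<String> validate(boolean serdesSet,
                                 boolean retainDuplicates,
                                 long supplierWindowSizeMs,
                                 long supplierRetentionMs,
                                 long joinWindowSizeMs,
                                 long gracePeriodMs) {
        List<String> problems = new ArrayList<>();
        if (serdesSet) {
            problems.add("serdes must not be set; they are already set in Joined");
        }
        if (!retainDuplicates) {
            problems.add("duplicates must be enabled");
        }
        if (supplierWindowSizeMs != joinWindowSizeMs) {
            problems.add("window size is determined by JoinWindows");
        }
        if (supplierRetentionMs < joinWindowSizeMs + gracePeriodMs) {
            problems.add("retention must be at least windowSize + gracePeriod");
        }
        return problems;
    }
}
```

Collecting all problems instead of failing on the first one is just a design
choice of this sketch; it lets a user fix every misconfiguration in one pass.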
>
> Not sure where this nets us out, but it's food for thought.
> -John
>
> On Sun, Jun 23, 2019 at 7:52 PM Guozhang Wang <wangguoz@gmail.com> wrote:
> >
> > Hi Bill,
> >
> > I think by giving a Materialized param into stream-stream join, it's okay
> > (though still not ideal) to say "we still would not expose the store for
> > queries", but it would sound a bit awkward to say "we would also ignore
> > whatever store supplier is passed in and just use our default ones" --
> > again the concern is that, if in the future we'd want to change the
> > default implementation of the join algorithm so that it no longer relies
> > on a window store with deduping enabled, then we need to change this API
> > again by changing the store supplier type.
> >
> > If we do want to fill this hole for stream-stream join, I feel just
> > adding a String typed store-name would be even less future-intrusive if
> > we expect this parameter to be modified later.
> >
> > Does that make sense?
> >
> >
> > Guozhang
> >
> > On Sat, Jun 22, 2019 at 12:51 PM Bill Bejeck <bbejeck@gmail.com> wrote:
> >
> > > Thanks for the comments, John and Guozhang. I'll address each of your
> > > comments in turn.
> > >
> > > John,
> > >
> > > > I'm wondering about a missing quadrant from the truth table involving
> > > > whether a Materialized is stored or not and querying is
> > > > enabled/disabled... What should be the behavior if there is no store
> > > > configured (e.g., if Materialized with only serdes) and querying is
> > > > enabled?
> > > >
> > > > It seems we have two choices:
> > > > 1. we can force creation of a state store in this case, so the store
> > > > can be used to serve the queries
> > > > 2. we can provide just a queriable view, basically letting IQ query
> > > > into the "KTableValueGetter", which would transparently construct the
> > > > query response by applying the operator logic to the upstream state
> > > > if the operator state isn't already stored.
> > >
> > >
> > > I agree with your assertion about a missing quadrant from the truth
> > > table. Additionally, I too like the concept of a queriable view. But I
> > > think that goes a bit beyond the scope of this KIP, and I would like to
> > > pursue that feature as follow-on work. Also, thinking about this KIP
> > > some more, I think the changes to Materialized might be a reach as well.
> > > Separating the naming of a store from its queryable state seems like a
> > > complex issue in and of itself and should be treated accordingly.
> > >
> > > So here's what I'm thinking now. We add Materialized to Join, but for
> > > now, we internally disable querying. I know this breaks our current
> > > semantic approach, but I think it's crucial that we do two things in
> > > this KIP:
> > >
> > >    1. Move the naming of the state stores from Joined to Materialized,
> > >    so the naming of state stores follows our current pattern and enables
> > >    upgrades from 2.3 to 2.4
> > >    2. Offer the ability to configure the state stores of the join, even
> > >    providing a different implementation (e.g., in-memory) if desired.
> > >
> > > With that in mind, I'm considering changing the KIP to remove the
> > > changes to Materialized, and documenting very clearly that providing a
> > > Materialized object with a name is only for naming the state store
> > > (and hence the changelog topics) and for any possible configuration of
> > > the store, but that this store *will not be available for IQ.*
> > >
> > > WDYT?
> > >
> > > Guozhang,
> > >
> > > > 1. About not breaking compatibility of stream-stream join materialized
> > > > stores: I think this is a valid issue to tackle, but after thinking
> > > > about it once more I'm not sure if exposing Materialized would be a
> > > > good solution here. My rationales:
> > > >
> > > > 1.a For stream-stream join, our current usage of window-store is not
> > > > ideal, and we want to modify it in the near future to be more
> > > > efficient. Not allowing users to override such state store backend
> > > > gives us such freedom (which was also considered in the original DSL
> > > > design), whereas getting a Materialized<WindowStore> basically kicks
> > > > that freedom out of the window.
> > > > 1.b For stream-stream join, in our original design we intended that
> > > > users would "never" query the state, since it is just for buffering
> > > > the upcoming records from the stream. Now I know that some users may
> > > > indeed want to query it from the debugging perspective, but still I
> > > > am concerned about whether leveraging IQ for debugging purposes would
> > > > be the right solution here. And adding a Materialized object opens
> > > > the door to let users query it (unless we did something
> > > > intentionally to still forbid it), which also restricts us in the
> > > > future.
> > > >
> > > > 2. About the coupling between Materialized.name() and queryable:
> > > > again I think this is a valid issue. But I'm not sure if the current
> > > > "withQuerryingDisabled / Enabled" at Materialized is the best
> > > > approach. Here I think I agree with John, that generally speaking
> > > > it's better to be a control function on the `KTable` itself, rather
> > > > than on `Materialized`, so fixing it via adding functions through
> > > > `Materialized` seems not a natural approach either.
> > >
> > > I understand your thoughts here, and up to a point, I agree with you.
> > > But concerning not providing Materialized because it may restrict us in
> > > the future from delivering different implementations, I'm wondering if
> > > we are doing some premature optimization here.
> > > My rationale for saying so:
> > >
> > >    1. I think the cost of not allowing the naming of state stores for
> > >    joins is too big of a gap to leave. IMHO, having joins follow the
> > >    current pattern of using Materialized for naming state stores is what
> > >    most users would expect. As I said in my comments above, I think we
> > >    should *not include* the changes to Materialized and enforce that
> > >    named stores for joins are unavailable for IQ.
> > >    2. We'll still have the join methods available without a
> > >    Materialized, allowing us to do something different internally if a
> > >    Materialized is not provided.
> > >
> > >
> > > > Overall, I'm thinking maybe we should still use two stones rather
> > > > than one to kill these two birds, and probably for this KIP we just
> > > > focus on 1) above. And for that I'd like to not expose the
> > > > Materialized either, for the rationales that I've listed above.
> > > > Instead, we just restrict KIP-307 to NOT use the Joined.name for
> > > > state store names and always use internal names as well, which
> > > > admittedly leaves a hole of not being able to cover all internal
> > > > names here, but now I feel this `hole` may better be filled by, e.g.,
> > > > not creating changelog topics but just using the upstream to
> > > > re-bootstrap the materialized store. More concretely: when
> > > > materializing the store, try to piggy-back the changelog topic on an
> > > > existing topic, e.g.
> > > > a) if the stream is coming directly from some source topic
> > > > (including a repartition topic), make that the changelog topic, and
> > > > if it is a repartition topic, change the retention / data purging
> > > > policy as necessary as well;
> > > > b) if the stream is coming from some stateless operators, delegate
> > > > that stateless operator to the parent stream similarly to a);
> > > > c) if the stream is coming from a stream-stream join, which is the
> > > > only stateful operator that can result in a stream, consider merging
> > > > the join into multi-way joins (yes, this is a very hand-wavy
> > > > thought, but the point here is that we do not try to tackle it now
> > > > but leave it for a better solution :).
> > >
> > > I really like this idea! I agree with you that this approach is too
> > > much to add in this KIP, but we could pick it up later and leverage the
> > > Optimization framework to accomplish this re-use.
> > > Again, while I agree we should break the naming of join state stores
> > > from KIP-307, IMHO it's something we should fix now, as it will be the
> > > last piece we can provide to give users the ability to make their
> > > topologies completely "upgrade proof" when adding additional
> > > operations.
> > >
> > > Thanks again to both of you for your comments, and I look forward to
> > > hearing back from you.
> > >
> > > Regards,
> > > Bill
> > >
> > > On Thu, Jun 20, 2019 at 2:33 PM Guozhang Wang <wangguoz@gmail.com> wrote:
> > >
> > > > Hello Bill,
> > > >
> > > > Thanks for the KIP. Glad to see that we can likely shoot two birds
> > > > with one stone. I have some concerns though about those "two birds"
> > > > themselves:
> > > >
> > > > 1. About not breaking compatibility of stream-stream join materialized
> > > > stores: I think this is a valid issue to tackle, but after thinking
> > > > about it once more I'm not sure if exposing Materialized would be a
> > > > good solution here. My rationales:
> > > >
> > > > 1.a For stream-stream join, our current usage of window-store is not
> > > > ideal, and we want to modify it in the near future to be more
> > > > efficient. Not allowing users to override such state store backend
> > > > gives us such freedom (which was also considered in the original DSL
> > > > design), whereas getting a Materialized<WindowStore> basically kicks
> > > > that freedom out of the window.
> > > > 1.b For stream-stream join, in our original design we intended that
> > > > users would "never" query the state, since it is just for buffering
> > > > the upcoming records from the stream. Now I know that some users may
> > > > indeed want to query it from the debugging perspective, but still I
> > > > am concerned about whether leveraging IQ for debugging purposes would
> > > > be the right solution here. And adding a Materialized object opens
> > > > the door to let users query it (unless we did something
> > > > intentionally to still forbid it), which also restricts us in the
> > > > future.
> > > >
> > > > 2. About the coupling between Materialized.name() and queryable:
> > > > again I think this is a valid issue. But I'm not sure if the current
> > > > "withQuerryingDisabled / Enabled" at Materialized is the best
> > > > approach. Here I think I agree with John, that generally speaking
> > > > it's better to be a control function on the `KTable` itself, rather
> > > > than on `Materialized`, so fixing it via adding functions through
> > > > `Materialized` seems not a natural approach either.
> > > >
> > > >
> > > > Overall, I'm thinking maybe we should still use two stones rather
> > > > than one to kill these two birds, and probably for this KIP we just
> > > > focus on 1) above. And for that I'd like to not expose the
> > > > Materialized either, for the rationales that I've listed above.
> > > > Instead, we just restrict KIP-307 to NOT use the Joined.name for
> > > > state store names and always use internal names as well, which
> > > > admittedly leaves a hole of not being able to cover all internal
> > > > names here, but now I feel this `hole` may better be filled by, e.g.,
> > > > not creating changelog topics but just using the upstream to
> > > > re-bootstrap the materialized store. More concretely: when
> > > > materializing the store, try to piggy-back the changelog topic on an
> > > > existing topic, e.g.
> > > > a) if the stream is coming directly from some source topic
> > > > (including a repartition topic), make that the changelog topic, and
> > > > if it is a repartition topic, change the retention / data purging
> > > > policy as necessary as well;
> > > > b) if the stream is coming from some stateless operators, delegate
> > > > that stateless operator to the parent stream similarly to a);
> > > > c) if the stream is coming from a stream-stream join, which is the
> > > > only stateful operator that can result in a stream, consider merging
> > > > the join into multi-way joins (yes, this is a very hand-wavy
> > > > thought, but the point here is that we do not try to tackle it now
> > > > but leave it for a better solution :).
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > > On Wed, Jun 19, 2019 at 11:41 AM John Roesler <john@confluent.io> wrote:
> > > >
> > > > > Hi Bill,
> > > > >
> > > > > Thanks for the KIP! Awesome job catching this unexpected
> > > > > consequence of the prior KIPs before it was released.
> > > > >
> > > > > The proposal looks good to me. On top of just fixing the problem,
> > > > > it seems to address two other pain points:
> > > > > * that naming a state store automatically causes it to become
> > > > > queriable.
> > > > > * that there's currently no way to configure the bytes store for
> > > > > join windows.
> > > > >
> > > > > It's awesome that we can fix this issue and two others with one
> > > > > feature.
> > > > >
> > > > > I'm wondering about a missing quadrant from the truth table
> > > > > involving whether a Materialized is stored or not and querying is
> > > > > enabled/disabled... What should be the behavior if there is no
> > > > > store configured (e.g., if Materialized with only serdes) and
> > > > > querying is enabled?
> > > > >
> > > > > It seems we have two choices:
> > > > > 1. we can force creation of a state store in this case, so the
> > > > > store can be used to serve the queries
> > > > > 2. we can provide just a queriable view, basically letting IQ query
> > > > > into the "KTableValueGetter", which would transparently construct
> > > > > the query response by applying the operator logic to the upstream
> > > > > state if the operator state isn't already stored.
> > > > >
> > > > > Offhand, it seems like the second is actually a pretty awesome
> > > > > capability. But it might have an awkward interaction with the
> > > > > current semantics. Presently, if I provide a Materialized.withName,
> > > > > it implies that querying should be enabled AND that the view should
> > > > > actually be stored in a state store. Under option 2 above, this
> > > > > behavior would change to NOT provision a state store and instead
> > > > > just consult the ValueGetter. To get back to the current behavior,
> > > > > users would have to add a "bytes store supplier" to the
> > > > > Materialized to indicate that, yes, they really want a state store
> > > > > there.
> > > > >
> > > > > Behavior changes are always kind of scary, but I think in this
> > > > > case, it might actually be preferable. In the event where only the
> > > > > name is provided, it means that people just wanted to make the
> > > > > operation result queriable. If we automatically convert this to a
> > > > > non-stored view, then simply upgrading results in the same
> > > > > observable behavior and semantics, but a linear reduction in local
> > > > > storage requirements and disk i/o, as well as a corresponding
> > > > > linear reduction in memory usage both on and off heap.
> > > > >
> > > > > What do you think?
> > > > > -John
> > > > >
> > > > > On Tue, Jun 18, 2019 at 9:21 PM Bill Bejeck <bbejeck@gmail.com> wrote:
> > > > > >
> > > > > > All,
> > > > > >
> > > > > > I'd like to start a discussion for adding a Materialized
> > > > > > configuration object to KStream.join for naming state stores
> > > > > > involved in joins.
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+Materialized+to+Join
> > > > > >
> > > > > > Your comments and suggestions are welcome.
> > > > > >
> > > > > > Thanks,
> > > > > > Bill
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
>


-- 
-- Guozhang
