kafka-dev mailing list archives

From John Roesler <j...@confluent.io>
Subject Re: [DISCUSS] KIP-479: Add Materialized to Join
Date Mon, 24 Jun 2019 19:15:50 GMT
Thanks Guozhang,

Yep. Maybe we should consider exactly what the join needs:

> the WindowStore<Bytes, byte[]> itself is fine, if overly broad,
> since the only two methods we need are `window.put(key, value,
> context().timestamp())` and `WindowStoreIterator<V2> iter =
> window.fetch(key, timeFrom, timeTo)`.

One "middle ground" would be to extract _this_ into a new store
interface, which only supports these API calls, like
StreamJoinStore<K, V>. This would give us the latitude we need to
efficiently support the exact operation without concerning ourselves
with all the other things a WindowStore can do (which are unreachable
for the join use case). It would also let us drop "store duplicates"
from the main WindowStore interface, since it only exists to support
the join use case.
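A minimal sketch of that "middle ground", for illustration only. `StreamJoinStore`, `purgeOlderThan`, and `TimeOrderedJoinStore` are hypothetical names, not Kafka Streams API, and everything is held in memory. The interface exposes only the put and fetch calls the join needs, plus the purge of old data listed among the join's requirements further down the thread. Because callers see nothing else, this implementation is free to order records by timestamp first, which is the kind of layout Guozhang mentions in the quoted reply.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical narrow store interface covering only what the stream-stream
// join needs. It is not part of the Kafka Streams API.
interface StreamJoinStore<K, V> {
    // Record a value for a key at the given timestamp; duplicates are kept.
    void put(K key, V value, long timestamp);

    // All values for the key with timeFrom <= timestamp <= timeTo,
    // in timestamp order (insertion order within one timestamp).
    List<V> fetch(K key, long timeFrom, long timeTo);

    // Drop every record whose timestamp is strictly below the cutoff.
    void purgeOlderThan(long cutoff);
}

// In-memory illustration that orders by timestamp first, then by insertion.
final class TimeOrderedJoinStore<K, V> implements StreamJoinStore<K, V> {
    private final NavigableMap<Long, List<Map.Entry<K, V>>> byTime = new TreeMap<>();

    @Override
    public void put(K key, V value, long timestamp) {
        byTime.computeIfAbsent(timestamp, t -> new ArrayList<>())
              .add(new AbstractMap.SimpleImmutableEntry<>(key, value));
    }

    @Override
    public List<V> fetch(K key, long timeFrom, long timeTo) {
        List<V> result = new ArrayList<>();
        if (timeFrom > timeTo) {
            return result; // empty range; TreeMap.subMap would throw here
        }
        // Scan only the requested time range, keeping records for this key.
        for (List<Map.Entry<K, V>> bucket : byTime.subMap(timeFrom, true, timeTo, true).values()) {
            for (Map.Entry<K, V> record : bucket) {
                if (Objects.equals(record.getKey(), key)) {
                    result.add(record.getValue());
                }
            }
        }
        return result;
    }

    @Override
    public void purgeOlderThan(long cutoff) {
        // With time-first ordering, expiring old data is one prefix removal.
        byTime.headMap(cutoff, false).clear();
    }
}
```

The trade-off is visible in the code: time-first ordering makes expiring old records a single prefix removal, but each fetch scans every key in the time range, whereas a key-first window store makes per-key fetches cheap instead.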

If we were to add a new StreamJoinStore interface, then it would be
straightforward either to add
`Materialized.as(StreamJoinBytesStoreSupplier)` and use Materialized,
or to add the ability to set the bytes store on Joined.

Personally, I'm kind of leaning toward the latter (and also doing
`Joined#withStoreName`), since adding the new interface to
Materialized then also pollutes the interface for its _actual_ use
case of materializing a table view. Of course, to solve the immediate
problem, all we need is the store name, but we might feel better about
adding the store name to Joined if we _also_ feel like in the future,
we would add store/log/cache configuration to Joined as well.
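To make the Joined option concrete, here is a hedged sketch of an immutable options object carrying a store name plus logging/caching flags. `JoinOptions` and all of its methods are hypothetical and are not the real `Joined` class; the changelog helper assumes the usual Kafka Streams convention of naming changelog topics `<application.id>-<store name>-changelog`.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical options object illustrating "Joined#withStoreName" plus
// possible future log/cache settings. This is NOT Kafka's Joined class.
final class JoinOptions {
    private final String storeName;        // null means "use a generated name"
    private final boolean loggingEnabled;  // defaults to enabled in this sketch
    private final boolean cachingEnabled;  // defaults to enabled in this sketch

    private JoinOptions(String storeName, boolean loggingEnabled, boolean cachingEnabled) {
        this.storeName = storeName;
        this.loggingEnabled = loggingEnabled;
        this.cachingEnabled = cachingEnabled;
    }

    static JoinOptions defaults() {
        return new JoinOptions(null, true, true);
    }

    // Each "with" method returns a new instance, so options can be shared safely.
    JoinOptions withStoreName(String name) {
        return new JoinOptions(Objects.requireNonNull(name, "name"), loggingEnabled, cachingEnabled);
    }

    JoinOptions withLoggingDisabled() {
        return new JoinOptions(storeName, false, cachingEnabled);
    }

    JoinOptions withCachingDisabled() {
        return new JoinOptions(storeName, loggingEnabled, false);
    }

    Optional<String> storeName() { return Optional.ofNullable(storeName); }
    boolean loggingEnabled() { return loggingEnabled; }
    boolean cachingEnabled() { return cachingEnabled; }

    // A user-supplied name wins; otherwise fall back to the generated one,
    // which may change when operators are added to the topology.
    String resolveStoreName(String generatedName) {
        return storeName != null ? storeName : generatedName;
    }

    // Assumes the "<application.id>-<store name>-changelog" naming convention.
    String changelogTopic(String applicationId, String generatedName) {
        return applicationId + "-" + resolveStoreName(generatedName) + "-changelog";
    }
}
```

The point of an explicit name is stability: a generated name can shift when operations are added, while a user-supplied name keeps the store and its changelog topic the same across upgrades.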

-John

On Mon, Jun 24, 2019 at 12:56 PM Guozhang Wang <wangguoz@gmail.com> wrote:
>
> Hello John,
>
> My main concern is exactly the first point at the bottom of your analysis
> here: "* configure the bytes store". I'm not sure if using a window bytes
> store would be ideal for stream-stream windowed join; e.g. we could
> consider a two-dimensional list sorted by timestamps and then by keys to do
> the join, whereas a windowed bytes store is basically sorted by key first,
> then by timestamp. If we expose Materialized to let users pass in a
> windowed bytes store, then we would need to change that if we later want to
> replace it with a different implementation interface.
>
>
> Guozhang
>
> On Mon, Jun 24, 2019 at 8:59 AM John Roesler <john@confluent.io> wrote:
>
> > Hey Guozhang and Bill,
> >
> > For what it's worth, I agree with you both!
> >
> > I think it might help the discussion to look concretely at what
> > Materialized does:
> > * set a WindowBytesStoreSupplier
> > * set a name
> > * set the key/value serdes
> > * disable/enable/configure change-logging
> > * disable/enable caching
> > * configure retention
> >
> > Further, looking into the WindowBytesStoreSupplier, the interface lets you:
> > * get the segment interval
> > * get the window size
> > * get whether "duplicates" are enabled
> > * get the retention period
> > * (obviously) get a WindowStore<Bytes, byte[]>
> >
> > We know that Materialized isn't exactly what we need for stream joins,
> > but we can see how close Materialized is to what we need. If it is
> > close, maybe we can use it and document the gaps, and if it is not
> > close, then maybe we should just add what we need to Joined.
> >
> > Stream Join's requirements for its stores:
> > * a multimap store (i.e., it keeps duplicates) for storing general
> > (not windowed) keyed records associated with their insertion time, and
> > allows efficient time-bounded lookups and also efficient purges of old
> > data.
> > ** Note, a properly configured WindowBytesStoreSupplier satisfies this
> > requirement, and the interface supports the queries we need to verify
> > the configuration at run-time
> > * set a name for the store
> > * do _not_ set the serdes (they are already set in Joined)
> > * logging could be configurable (set to enabled now)
> > * caching could be configurable (set to enabled now)
> > * do _not_ configure retention (determined by JoinWindows)
> >
> > So, out of six capabilities for Materialized, there are two we don't
> > want (serdes and retention). These would become run-time checks if we
> > use it.
> >
> > A third questionable capability is to provide a
> > WindowBytesStoreSupplier. Looking at whether the
> > WindowBytesStoreSupplier is the right interface for Stream Join:
> > * configuring segment interval is fine
> > * should _not_ configure window size (it's determined by JoinWindows)
> > * duplicates _must_ be enabled
> > * retention should be _at least_ windowSize + gracePeriod, but note
> > that (unlike for Table window stores) there is no utility in having a
> > longer retention time.
> > * the WindowStore<Bytes, byte[]> itself is fine, if overly broad,
> > since the only two methods we need are `window.put(key, value,
> > context().timestamp())` and `WindowStoreIterator<V2> iter =
> > window.fetch(key, timeFrom, timeTo)`.
> >
> > Thus, merging the WindowBytesStoreSupplier capabilities into those of
> > Materialized, we have 9 capabilities in total (note that retention is
> > duplicated), of which there are 4 that we don't want:
> > * do _not_ set the serdes (they are already set in Joined)
> > * do _not_ configure retention (determined by JoinWindows)
> > * should _not_ configure window size (it's determined by JoinWindows)
> > * duplicates _must_ be enabled
> >
> > These gaps would have to be covered with run-time checks if we re-use
> > both Materialized and WindowBytesStoreSupplier. Maybe this sounds
> > bad, but consider the other side: we get 5 new capabilities that we
> > don't require but that are still pretty nice:
> > * configure the bytes store
> > * set a name for the store
> > * configure caching
> > * configure logging
> > * configure segment interval
> >
> > Not sure where this nets us out, but it's food for thought.
> > -John
> >
> > On Sun, Jun 23, 2019 at 7:52 PM Guozhang Wang <wangguoz@gmail.com> wrote:
> > >
> > > Hi Bill,
> > >
> > > I think that if we give stream-stream join a Materialized param, it's okay
> > > (though still not ideal) to say "we still would not expose the store for
> > > queries", but it would sound a bit awkward to say "we would also ignore
> > > whatever store supplier is passed in and just use our default ones" --
> > > again the concern is that, if in the future we'd want to change the default
> > > implementation of the join algorithm so that it no longer relies on a window
> > > store with deduping enabled, then we would need to change this API again by
> > > changing the store supplier type.
> > >
> > > If we do want to fill this hole for stream-stream join, I feel that just
> > > adding a String-typed store name would be even less future-intrusive if we
> > > expect this parameter to be modified later.
> > >
> > > Does that make sense?
> > >
> > >
> > > Guozhang
> > >
> > > On Sat, Jun 22, 2019 at 12:51 PM Bill Bejeck <bbejeck@gmail.com> wrote:
> > >
> > > > Thanks for the comments, John and Guozhang. I'll address each of your
> > > > comments in turn.
> > > >
> > > > John,
> > > >
> > > > > I'm wondering about a missing quadrant from the truth table involving
> > > > > whether a Materialized is stored or not and querying is
> > > > > enabled/disabled... What should be the behavior if there is no store
> > > > > configured (e.g., if Materialized with only serdes) and querying is
> > > > > enabled?
> > > > >
> > > > > It seems we have two choices:
> > > > > 1. we can force creation of a state store in this case, so the store
> > > > > can be used to serve the queries
> > > > > 2. we can provide just a queriable view, basically letting IQ query
> > > > > into the "KTableValueGetter", which would transparently construct the
> > > > > query response by applying the operator logic to the upstream state if
> > > > > the operator state isn't already stored.
> > > >
> > > >
> > > > I agree with your assertion about a missing quadrant from the truth table.
> > > > Additionally, I too like the concept of a queryable view, but I think that
> > > > goes a bit beyond the scope of this KIP, and I would like to pursue that
> > > > feature as follow-on work. Also, thinking about this KIP some more, I think
> > > > the changes to Materialized might be a reach as well. Separating the naming
> > > > of a store from its queryable state seems like a complex issue in and of
> > > > itself and should be treated accordingly.
> > > >
> > > > So here's what I'm thinking now. We add Materialized to Join, but for now,
> > > > we internally disable querying. I know this breaks our current semantic
> > > > approach, but I think it's crucial that we do two things in this KIP:
> > > >
> > > >    1. Move the naming of the state stores from Joined to Materialized, so
> > > >    the naming of state stores follows our current pattern and enables
> > > >    upgrades from 2.3 to 2.4.
> > > >    2. Offer the ability to configure the state stores of the join, even
> > > >    providing a different implementation (e.g. in-memory) if desired.
> > > >
> > > > With that in mind, I'm considering changing the KIP to remove the changes
> > > > to Materialized, and we would document very clearly that providing a
> > > > Materialized object with a name is only for naming the state store (and
> > > > hence the changelog topics) and for any possible configuration of the
> > > > store, but this store *will not be available for IQ.*
> > > >
> > > > WDYT?
> > > >
> > > > Guozhang,
> > > >
> > > > > 1. About not breaking compatibility of stream-stream join materialized
> > > > > stores: I think this is a valid issue to tackle, but after thinking about
> > > > > it once more I'm not sure if exposing Materialized would be a good solution
> > > > > here. My rationale:
> > > > >
> > > > > 1.a For stream-stream join, our current usage of window-store is not ideal,
> > > > > and we want to modify it in the near future to be more efficient. Not
> > > > > allowing users to override such state store backend gives us such freedom
> > > > > (which was also considered in the original DSL design), whereas accepting a
> > > > > Materialized<WindowStore> basically throws that freedom out of the
> > > > > window.
> > > > > 1.b For stream-stream join, in our original design we intended that users
> > > > > "never" query the state, since it is just for buffering the upcoming
> > > > > records from the stream. Now I know that some users may indeed want to
> > > > > query it from the debugging perspective, but I am still concerned about
> > > > > whether leveraging IQ for debugging purposes would be the right solution
> > > > > here. And adding a Materialized object opens the door to letting users
> > > > > query it (unless we did something intentionally to still forbid it), which
> > > > > also restricts us in the future.
> > > > >
> > > > > 2. About the coupling between Materialized.name() and queryable: again I
> > > > > think this is a valid issue. But I'm not sure if the current
> > > > > "withQueryingDisabled / Enabled" at Materialized is the best approach.
> > > > > Here I think I agree with John that, generally speaking, it's better to be
> > > > > a control function on the `KTable` itself rather than on `Materialized`, so
> > > > > fixing it via adding functions through `Materialized` seems not a natural
> > > > > approach either.
> > > >
> > > > I understand your thoughts here, and up to a point, I agree with you.
> > > > But concerning not providing Materialized because it may restrict us from
> > > > delivering different implementations in the future, I'm wondering if we
> > > > are doing some premature optimization here. My rationale for saying so:
> > > >
> > > >    1. I think the cost of not allowing the naming of state stores for
> > > >    joins is too big of a gap to leave. IMHO, having joins follow the
> > > >    current pattern of using Materialized for naming state stores is what
> > > >    most users would expect. As I said in my comments above, I think we
> > > >    should *not include* the changes to Materialized, and should enforce
> > > >    that named stores for joins are unavailable for IQ.
> > > >    2. We'll still have the join methods available without a Materialized,
> > > >    allowing us to do something different internally if a Materialized is
> > > >    not provided.
> > > >
> > > >
> > > > > Overall, I'm thinking maybe we should still use two stones rather than one
> > > > > to kill these two birds, and probably for this KIP we just focus on 1)
> > > > > above. And for that I'd like to not expose the Materialized either, for the
> > > > > rationale that I've listed above. Instead, we just restrict KIP-307 to NOT
> > > > > use the Joined.name for state store names and always use internal names as
> > > > > well, which admittedly leaves a hole of not being able to cover all
> > > > > internal names here. But now I feel this `hole` may be better filled by,
> > > > > e.g., not creating changelog topics but just using the upstream to
> > > > > re-bootstrap the materialized store. More concretely: when materializing
> > > > > the store, try to piggy-back the changelog topic on an existing topic, e.g.
> > > > > a) if the stream is coming directly from some source topic (including a
> > > > > repartition topic), make that the changelog topic, and if it is a
> > > > > repartition topic, change the retention / data purging policy as
> > > > > necessary as well; b) if the stream is coming from some stateless
> > > > > operators, delegate that stateless operator to the parent stream as in
> > > > > a); c) if the stream is coming from a stream-stream join, which is the only
> > > > > stateful operator that can result in a stream, consider merging the join
> > > > > into multi-way joins (yes, this is a very hand-wavy thought, but the point
> > > > > here is that we do not try to tackle it now but leave it for a better
> > > > > solution :).
> > > >
> > > > I really like this idea! I agree with you that this approach is too much
> > > > to add in this KIP, but we could pick it up later and leverage the
> > > > optimization framework to accomplish this re-use.
> > > > Again, while I agree we should decouple the naming of join state stores
> > > > from KIP-307, IMHO it's something we should fix now, as it will be the
> > > > last piece we can provide to give users the ability to make their
> > > > topologies completely "upgrade proof" when adding additional operations.
> > > >
> > > > Thanks again to both of you for your comments, and I look forward to
> > > > hearing back from you.
> > > >
> > > > Regards,
> > > > Bill
> > > >
> > > > On Thu, Jun 20, 2019 at 2:33 PM Guozhang Wang <wangguoz@gmail.com> wrote:
> > > >
> > > > > Hello Bill,
> > > > >
> > > > > Thanks for the KIP. Glad to see that we can likely shoot two birds with
> > > > > one stone. I have some concerns, though, about those "two birds"
> > > > > themselves:
> > > > >
> > > > > 1. About not breaking compatibility of stream-stream join materialized
> > > > > stores: I think this is a valid issue to tackle, but after thinking about
> > > > > it once more I'm not sure if exposing Materialized would be a good solution
> > > > > here. My rationale:
> > > > >
> > > > > 1.a For stream-stream join, our current usage of window-store is not ideal,
> > > > > and we want to modify it in the near future to be more efficient. Not
> > > > > allowing users to override such state store backend gives us such freedom
> > > > > (which was also considered in the original DSL design), whereas accepting a
> > > > > Materialized<WindowStore> basically throws that freedom out of the
> > > > > window.
> > > > > 1.b For stream-stream join, in our original design we intended that users
> > > > > "never" query the state, since it is just for buffering the upcoming
> > > > > records from the stream. Now I know that some users may indeed want to
> > > > > query it from the debugging perspective, but I am still concerned about
> > > > > whether leveraging IQ for debugging purposes would be the right solution
> > > > > here. And adding a Materialized object opens the door to letting users
> > > > > query it (unless we did something intentionally to still forbid it), which
> > > > > also restricts us in the future.
> > > > >
> > > > > 2. About the coupling between Materialized.name() and queryable: again I
> > > > > think this is a valid issue. But I'm not sure if the current
> > > > > "withQueryingDisabled / Enabled" at Materialized is the best approach.
> > > > > Here I think I agree with John that, generally speaking, it's better to be
> > > > > a control function on the `KTable` itself rather than on `Materialized`, so
> > > > > fixing it via adding functions through `Materialized` seems not a natural
> > > > > approach either.
> > > > >
> > > > >
> > > > > Overall, I'm thinking maybe we should still use two stones rather than one
> > > > > to kill these two birds, and probably for this KIP we just focus on 1)
> > > > > above. And for that I'd like to not expose the Materialized either, for the
> > > > > rationale that I've listed above. Instead, we just restrict KIP-307 to NOT
> > > > > use the Joined.name for state store names and always use internal names as
> > > > > well, which admittedly leaves a hole of not being able to cover all
> > > > > internal names here. But now I feel this `hole` may be better filled by,
> > > > > e.g., not creating changelog topics but just using the upstream to
> > > > > re-bootstrap the materialized store. More concretely: when materializing
> > > > > the store, try to piggy-back the changelog topic on an existing topic, e.g.
> > > > > a) if the stream is coming directly from some source topic (including a
> > > > > repartition topic), make that the changelog topic, and if it is a
> > > > > repartition topic, change the retention / data purging policy as
> > > > > necessary as well; b) if the stream is coming from some stateless
> > > > > operators, delegate that stateless operator to the parent stream as in
> > > > > a); c) if the stream is coming from a stream-stream join, which is the only
> > > > > stateful operator that can result in a stream, consider merging the join
> > > > > into multi-way joins (yes, this is a very hand-wavy thought, but the point
> > > > > here is that we do not try to tackle it now but leave it for a better
> > > > > solution :).
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Jun 19, 2019 at 11:41 AM John Roesler <john@confluent.io> wrote:
> > > > >
> > > > > > Hi Bill,
> > > > > >
> > > > > > Thanks for the KIP! Awesome job catching this unexpected consequence
> > > > > > of the prior KIPs before it was released.
> > > > > >
> > > > > > The proposal looks good to me. On top of just fixing the problem, it
> > > > > > seems to address two other pain points:
> > > > > > * that naming a state store automatically causes it to become
> > > > > > queryable.
> > > > > > * that there's currently no way to configure the bytes store for join
> > > > > > windows.
> > > > > >
> > > > > > It's awesome that we can fix this issue and two others with one
> > > > > > feature.
> > > > > >
> > > > > > I'm wondering about a missing quadrant from the truth table involving
> > > > > > whether a Materialized is stored or not and querying is
> > > > > > enabled/disabled... What should be the behavior if there is no store
> > > > > > configured (e.g., a Materialized with only serdes) and querying is
> > > > > > enabled?
> > > > > >
> > > > > > It seems we have two choices:
> > > > > > 1. we can force creation of a state store in this case, so the store
> > > > > > can be used to serve the queries
> > > > > > 2. we can provide just a queryable view, basically letting IQ query
> > > > > > into the "KTableValueGetter", which would transparently construct the
> > > > > > query response by applying the operator logic to the upstream state if
> > > > > > the operator state isn't already stored.
> > > > > >
> > > > > > Offhand, it seems like the second is actually a pretty awesome
> > > > > > capability. But it might have an awkward interaction with the current
> > > > > > semantics. Presently, if I provide a Materialized.withName, it implies
> > > > > > that querying should be enabled AND that the view should actually be
> > > > > > stored in a state store. Under option 2 above, this behavior would
> > > > > > change to NOT provision a state store and instead just consult the
> > > > > > ValueGetter. To get back to the current behavior, users would have to
> > > > > > add a "bytes store supplier" to the Materialized to indicate that,
> > > > > > yes, they really want a state store there.
> > > > > >
> > > > > > Behavior changes are always kind of scary, but I think in this case,
> > > > > > it might actually be preferable. In the event where only the name is
> > > > > > provided, it means that people just wanted to make the operation
> > > > > > result queryable. If we automatically convert this to a non-stored
> > > > > > view, then simply upgrading results in the same observable behavior
> > > > > > and semantics, but a linear reduction in local storage requirements
> > > > > > and disk i/o, as well as a corresponding linear reduction in memory
> > > > > > usage both on and off heap.
> > > > > >
> > > > > > What do you think?
> > > > > > -John
> > > > > >
> > > > > > On Tue, Jun 18, 2019 at 9:21 PM Bill Bejeck <bbejeck@gmail.com> wrote:
> > > > > > >
> > > > > > > All,
> > > > > > >
> > > > > > > I'd like to start a discussion about adding a Materialized configuration
> > > > > > > object to KStream.join for naming the state stores involved in joins.
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+Materialized+to+Join
> > > > > > >
> > > > > > > Your comments and suggestions are welcome.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Bill
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> >
>
>
> --
> -- Guozhang
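
For the option of reusing both Materialized and WindowBytesStoreSupplier, the gaps listed in the thread would have to become run-time checks. The sketch below is an assumption-laden illustration: `WindowSupplierSettings` is a local stand-in whose getters mirror the properties listed for WindowBytesStoreSupplier (segment interval, window size, duplicates, retention); `FixedSettings` and `StreamJoinSupplierCheck` are hypothetical; and the rules (window size fixed by JoinWindows, duplicates required, retention of at least windowSize + gracePeriod) are taken from the discussion rather than from Kafka's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in mirroring the supplier properties discussed in the thread;
// it is not the Kafka interface itself.
interface WindowSupplierSettings {
    String name();
    long segmentIntervalMs();
    long windowSize();
    boolean retainDuplicates();
    long retentionPeriod();
}

// Simple immutable implementation for examples and tests.
final class FixedSettings implements WindowSupplierSettings {
    private final String name;
    private final long segmentIntervalMs;
    private final long windowSize;
    private final boolean retainDuplicates;
    private final long retentionPeriod;

    FixedSettings(String name, long segmentIntervalMs, long windowSize,
                  boolean retainDuplicates, long retentionPeriod) {
        this.name = name;
        this.segmentIntervalMs = segmentIntervalMs;
        this.windowSize = windowSize;
        this.retainDuplicates = retainDuplicates;
        this.retentionPeriod = retentionPeriod;
    }

    @Override public String name() { return name; }
    @Override public long segmentIntervalMs() { return segmentIntervalMs; }
    @Override public long windowSize() { return windowSize; }
    @Override public boolean retainDuplicates() { return retainDuplicates; }
    @Override public long retentionPeriod() { return retentionPeriod; }
}

// Hypothetical run-time checks for reusing a window store supplier in a
// stream-stream join.
final class StreamJoinSupplierCheck {
    private StreamJoinSupplierCheck() {}

    // Returns human-readable problems; an empty list means the settings are usable.
    // segmentIntervalMs() is deliberately not checked: the thread treats it as
    // freely configurable.
    static List<String> problems(WindowSupplierSettings s, long joinWindowSizeMs, long gracePeriodMs) {
        List<String> found = new ArrayList<>();
        if (s.windowSize() != joinWindowSizeMs) {
            found.add("window size " + s.windowSize() + " must equal the join window size " + joinWindowSizeMs);
        }
        if (!s.retainDuplicates()) {
            found.add("duplicates must be retained");
        }
        long minRetention = joinWindowSizeMs + gracePeriodMs;
        if (s.retentionPeriod() < minRetention) {
            found.add("retention " + s.retentionPeriod() + " must be at least " + minRetention);
        }
        return found;
    }

    // Fails fast, e.g. while the topology is being built.
    static void validate(WindowSupplierSettings s, long joinWindowSizeMs, long gracePeriodMs) {
        List<String> found = problems(s, joinWindowSizeMs, gracePeriodMs);
        if (!found.isEmpty()) {
            throw new IllegalArgumentException(
                "Store '" + s.name() + "' cannot back a stream-stream join: " + String.join("; ", found));
        }
    }
}
```

Checks like these catch misconfiguration only when the topology is built, which is the cost weighed in the thread against the extra capabilities (custom bytes store, caching, logging, segment interval) that reuse would bring.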
