kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guozhang Wang <wangg...@gmail.com>
Subject Re: [DISCUSS] KIP-479: Add Materialized to Join
Date Mon, 24 Jun 2019 00:52:05 GMT
Hi Bill,

I think by giving a Materialized param into stream-stream join, it's okay
(though still ideal) to say "we still would not expose the store for
queries", but it would sound a bit awkward to say "we would also ignore
whatever the passed in store supplier but just use our default ones" --
again the concern is that, if in the future we'd want to change the default
implementation of join algorithm which no longer rely on a window store
with deduping enabled, then we need to change this API again by changing
the store supplier type.

If we do want to fill this hole for stream-stream join, I feel just adding
a String typed store-name would even be less future-intrusive if we expect
this parameter to be modified later.

Does that makes sense?


Guozhang

On Sat, Jun 22, 2019 at 12:51 PM Bill Bejeck <bbejeck@gmail.com> wrote:

> Thanks for the comments John and Guozhang, I'll address each one of your
> comments in turn.
>
> John,
>
> > I'm wondering about a missing quadrant from the truth table involving
> > whether a Materialized is stored or not and querying is
> > enabled/disabled... What should be the behavior if there is no store
> > configured (e.g., if Materialized with only serdes) and querying is
> enabled?
>
> > It seems we have two choices:
> > 1. we can force creation of a state store in this case, so the store
> > can be used to serve the queries
> > 2. we can provide just a queriable view, basically letting IQ query
> > into the "KTableValueGetter", which would transparently construct the
> > query response by applying the operator logic to the upstream state if
> > the operator state isn't already stored.
>
>
> I agree with your assertion about a missing quadrant from the truth table.
> Additionally, I too like the concept of a queriable view.  But I think that
> goes a bit beyond the scope of this KIP and would like to pursue that
> feature as follow-on work.  Also thinking about this KIP some more, I'm
> thinking of the changes to Materialized might be a reach as well.
> Separating the naming from a store and its queryable state seems like a
> complex issue in and of itself and should be treated accordingly.
>
> So here's what I'm thinking now.  We add Materialzied to Join, but for now,
> we internally disable querying.  I know this breaks our current semantic
> approach, but I think it's crucial that we do two things in this KIP
>
>    1. Break the naming of the state stores from Joined to Materialized, so
>    the naming of state stores follows our current pattern and enables
> upgrades
>    from 2.3 to 2.4
>    2. Offer the ability to configure the state stores of the join, even
>    providing a different implementation (i.e. in-memory) if desired.
>
> With that in mind I'm considering changing the KIP to remove the changes to
> Materialized, and we document very clearly that by providing a Materialized
> object with a name is only for naming the state store, hence the changelog
> topics and any possible configurations of the store, but this store *will
> not be available for IQ.*
>
> WDYT?
>
> Guozhang,
>
> > 1. About not breaking compatibility of stream-stream join materialized
> > stores: I think this is a valid issue to tackle, but after thinking about
> > it once more I'm not sure if exposing Materialized would be a good
> solution
> > here. My rationles:
> >
> > 1.a For stream-stream join, our current usage of window-store is not
> ideal,
> > and we want to modify it in the near future to be more efficient. Not
> > allowing users to override such state store backend gives us such freedom
> > (which was also considered in the original DSL design), whereas getting a
> > Materialized<WindowStore> basically kicks out that freedom out of the
> > window.
> > 1.b For strema-stream join, in our original design we intend to "never"
> > want users to query the state, since it is just for buffering the
> upcoming
> > records from the stream. Now I know that some users may indeed want to
> > query it from the debugging perspective, but still I concerned about
> > whether leveraging IQ for debugging purposes would be the right solution
> > here. And adding Materialized object opens the door to let users query
> > about it (unless we did something intentionally to still forbids it),
> which
> > also restricts us in the future.
> >
> > 2. About the coupling between Materialized.name() and queryable: again I
> > think this is a valid issue. But I'm not sure if the current
> > "withQuerryingDisabled / Enabled" at Materialized is the best approach.
> > Here I think I agree with John, that generally speaking it's better be a
> > control function on the `KTable` itself, rather than on `Materialized`,
> so
> > fixing it via adding functions through `Materialized` seems not a natural
> approach either.
>
> I understand your thoughts here, and up to a point, I agree with you.
> But concerning not providing Materialized as it may restrict us in the
> future for delivering different implementations, I'm wondering if we are
> doing some premature optimization here.
> My rationale for saying so
>
>    1. I think the cost of not allowing the naming of state stores for joins
>    is too big of a gap to leave.   IMHO for joins to follow the current
>    pattern of using Materialized for naming state stores would be what most
>    users would expect to use.  As I said in my comments above, I think we
>    should *not include* the changes to Materialized and enforce named
>    stores for joins as unavailable for IQ.
>    2. We'll still have the join methods available without a Materialized
>    allowing us to do something different internally if a Materialized is
> not
>    provided.
>
>
> > Overall, I'm thinking maybe we should still use two stones rather than
> one
> > to kill these two birds, and probably for this KIP we just focus on 1)
> > above. And for that I'd like to not expose the Materialized either for
> > rationales that I've listed above. Instead, we just restrict KIP-307 to
> NOT
> > use the Joined.name for state store names and always use internal names
> as
> > well, which admittedly indeed leaves a hole of not being able to cover
> all
> > internal names here, but now I feel this `hole` may better be filled by,
> > e.g. not creating changelog topics but just use the upstream to
> > re-bootstrap the materialized store, more concretely: when materializing
> > the store, try to piggy-back the changelog topic on an existing topic,
> e.g.
> > a) if the stream is coming directly from some source topic (including
> > repartition topic), make that as changelog topic and if it is repartition
> > topic change the retention / data purging policy necessarily as well; b)
> if
> > the stream is coming from some stateless operators, delegate that
> stateless
> > operator to the parent stream similar as a); if the stream is coming from
> a
> > stream-stream join which is the only stateful operator that can result in
> a
> > stream, consider merging the join into multi-way joins (yes, this is a
> very
> > hand-wavy thought, but the point here is that we do not try to tackle it
> > now but leave it for a better solution :).
>
> I really like this idea!  I agree with you in that this approach to too
> much for adding in this KIP, but we could pick it up later and leverage the
> Optimization framework to accomplish this re-use.
> Again, while I agree we should break the naming of join state stores from
> KIP-307, IMHO it's something we should fix now as it will be the last piece
> we can provide to give users the ability to completely make their
> topologies "upgrade proof" when adding additional operations.
>
> Thanks again to both of you for comments and I look forward to hearing back
> from you.
>
> Regards,
> Bill
>
>
>
>
>
>
>
> On Thu, Jun 20, 2019 at 2:33 PM Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > Hello Bill,
> >
> > Thanks for the KIP. Glad to see that we can likely shooting two birds
> with
> > one stone. I have some concerns though about those "two birds"
> themselves:
> >
> > 1. About not breaking compatibility of stream-stream join materialized
> > stores: I think this is a valid issue to tackle, but after thinking about
> > it once more I'm not sure if exposing Materialized would be a good
> solution
> > here. My rationles:
> >
> > 1.a For stream-stream join, our current usage of window-store is not
> ideal,
> > and we want to modify it in the near future to be more efficient. Not
> > allowing users to override such state store backend gives us such freedom
> > (which was also considered in the original DSL design), whereas getting a
> > Materialized<WindowStore> basically kicks out that freedom out of the
> > window.
> > 1.b For strema-stream join, in our original design we intend to "never"
> > want users to query the state, since it is just for buffering the
> upcoming
> > records from the stream. Now I know that some users may indeed want to
> > query it from the debugging perspective, but still I concerned about
> > whether leveraging IQ for debugging purposes would be the right solution
> > here. And adding Materialized object opens the door to let users query
> > about it (unless we did something intentionally to still forbids it),
> which
> > also restricts us in the future.
> >
> > 2. About the coupling between Materialized.name() and queryable: again I
> > think this is a valid issue. But I'm not sure if the current
> > "withQuerryingDisabled / Enabled" at Materialized is the best approach.
> > Here I think I agree with John, that generally speaking it's better be a
> > control function on the `KTable` itself, rather than on `Materialized`,
> so
> > fixing it via adding functions through `Materialized` seems not a natural
> > approach either.
> >
> >
> > Overall, I'm thinking maybe we should still use two stones rather than
> one
> > to kill these two birds, and probably for this KIP we just focus on 1)
> > above. And for that I'd like to not expose the Materialized either for
> > rationales that I've listed above. Instead, we just restrict KIP-307 to
> NOT
> > use the Joined.name for state store names and always use internal names
> as
> > well, which admittedly indeed leaves a hole of not being able to cover
> all
> > internal names here, but now I feel this `hole` may better be filled by,
> > e.g. not creating changelog topics but just use the upstream to
> > re-bootstrap the materialized store, more concretely: when materializing
> > the store, try to piggy-back the changelog topic on an existing topic,
> e.g.
> > a) if the stream is coming directly from some source topic (including
> > repartition topic), make that as changelog topic and if it is repartition
> > topic change the retention / data purging policy necessarily as well; b)
> if
> > the stream is coming from some stateless operators, delegate that
> stateless
> > operator to the parent stream similar as a); if the stream is coming
> from a
> > stream-stream join which is the only stateful operator that can result
> in a
> > stream, consider merging the join into multi-way joins (yes, this is a
> very
> > hand-wavy thought, but the point here is that we do not try to tackle it
> > now but leave it for a better solution :).
> >
> >
> > Guozhang
> >
> >
> >
> > On Wed, Jun 19, 2019 at 11:41 AM John Roesler <john@confluent.io> wrote:
> >
> > > Hi Bill,
> > >
> > > Thanks for the KIP! Awesome job catching this unexpected consequence
> > > of the prior KIPs before it was released.
> > >
> > > The proposal looks good to me. On top of just fixing the problem, it
> > > seems to address two other pain points:
> > > * that naming a state store automatically causes it to become
> queriable.
> > > * that there's currently no way to configure the bytes store for join
> > > windows.
> > >
> > > It's awesome that we can fix this issue and two others with one
> feature.
> > >
> > > I'm wondering about a missing quadrant from the truth table involving
> > > whether a Materialized is stored or not and querying is
> > > enabled/disabled... What should be the behavior if there is no store
> > > configured (e.g., if Materialized with only serdes) and querying is
> > > enabled?
> > >
> > > It seems we have two choices:
> > > 1. we can force creation of a state store in this case, so the store
> > > can be used to serve the queries
> > > 2. we can provide just a queriable view, basically letting IQ query
> > > into the "KTableValueGetter", which would transparently construct the
> > > query response by applying the operator logic to the upstream state if
> > > the operator state isn't already stored.
> > >
> > > Offhand, it seems like the second is actually a pretty awesome
> > > capability. But it might have an awkward interaction with the current
> > > semantics. Presently, if I provide a Materialized.withName, it implies
> > > that querying should be enabled AND that the view should actually be
> > > stored in a state store. Under option 2 above, this behavior would
> > > change to NOT provision a state store and instead just consult the
> > > ValueGetter. To get back to the current behavior, users would have to
> > > add a "bytes store supplier" to the Materialized to indicate that,
> > > yes, they really want a state store there.
> > >
> > > Behavior changes are always kind of scary, but I think in this case,
> > > it might actually be preferable. In the event where only the name is
> > > provided, it means that people just wanted to make the operation
> > > result queriable. If we automatically convert this to a non-stored
> > > view, then simply upgrading results in the same observable behavior
> > > and semantics, but a linear reduction in local storage requirements
> > > and disk i/o, as well as a corresponding linear reduction in memory
> > > usage both on and off heap.
> > >
> > > What do you think?
> > > -John
> > >
> > > On Tue, Jun 18, 2019 at 9:21 PM Bill Bejeck <bbejeck@gmail.com> wrote:
> > > >
> > > > All,
> > > >
> > > > I'd like to start a discussion for adding a Materialized
> configuration
> > > > object to KStream.join for naming state stores involved in joins.
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+Materialized+to+Join
> > > >
> > > > Your comments and suggestions are welcome.
> > > >
> > > > Thanks,
> > > > Bill
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message