kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guozhang Wang <wangg...@gmail.com>
Subject Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.
Date Wed, 15 Aug 2018 21:51:55 GMT
I think Headers are not meant for user-space only: in fact, in KIP-258
there is also a proposal for compatibility of changelog topics relying on
Headers. But you have a good point how to avoid conflicting with user
header key space.

I think there is no absolute-safe ways to avoid conflicts, but we can still
consider using some name patterns to reduce the likelihood as much as
possible.. e.g. consider sth. like the internal topics naming: e.g.
"__internal_[name]"?

I'm cc'ing contributor of KIP-258 to share thoughts as well.


Guozhang

On Tue, Aug 14, 2018 at 7:44 PM, Adam Bellemare <adam.bellemare@gmail.com>
wrote:

> @Jan
> Thanks Jan. I take it you mean "key-widening" somehow includes information
> about which record is processed first? I understand about a CombinedKey
> with both the Foreign and Primary key, but I don't see how you track
> ordering metadata in there unless you actually included a metadata field in
> the key type as well.
>
> @Guozhang
> As Jan mentioned earlier, is Record Headers mean to strictly be used in
> just the user-space? It seems that it is possible that a collision on the
> (key,value) tuple I wish to add to it could occur. For instance, if I
> wanted to add a ("foreignKeyOffset",10) to the Headers but the user already
> specified their own header with the same key name, then it appears there
> would be a collision. (This is one of the issues I brought up in the KIP).
>
> --------------------------------
>
> I will be posting a prototype PR against trunk within the next day or two.
> One thing I need to point out is that my design very strictly wraps the
> entire foreignKeyJoin process entirely within the DSL function. There is no
> exposure of CombinedKeys or widened keys, nothing to resolve with regards
> to out-of-order processing and no need for the DSL user to even know what's
> going on inside of the function. The code simply returns the results of the
> join, keyed by the original key. Currently my API mirrors identically the
> format of the data returned by the regular join function, and I believe
> that this is very useful to many users of the DSL. It is my understanding
> that one of the main design goals of the DSL is to provide higher level
> functionality without requiring the users to know exactly what's going on
> under the hood. With this in mind, I thought it best to solve ordering and
> partitioning problems within the function and eliminate the requirement for
> users to do additional work after the fact to resolve the results of their
> join. Basically, I am assuming that most users of the DSL just "want it to
> work" and want it to be easy. I did this operating under the assumption
> that if a user truly wants to optimize their own workflow down to the
> finest details then they will break from strictly using the DSL and move
> down to the processors API.
>
> I updated the KIP today with some points worth talking about, should anyone
> be so inclined to check it out. Currently we are running this code in
> production to handle relational joins from our Kafka Connect topics, as per
> the original motivation of the KIP.
>
>
>
>
>
>
>
>
>
>
> I believe the foreignKeyJoin should be responsible for. In my
>
>
>
> On Tue, Aug 14, 2018 at 5:22 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > Hello Adam,
> >
> > As for your question regarding GraphNodes, it is for extending Streams
> > optimization framework. You can find more details on
> > https://issues.apache.org/jira/browse/KAFKA-6761.
> >
> > The main idea is that instead of directly building up the "physical
> > topology" (represented as Topology in the public package, and internally
> > built as the ProcessorTopology class) while users are specifying the
> > transformation operators, we first keep it as a "logical topology"
> > (represented as GraphNode inside InternalStreamsBuilder). And then only
> > execute the optimization and the construction of the "physical" Topology
> > when StreamsBuilder.build() is called.
> >
> > Back to your question, I think it makes more sense to add a new type of
> > StreamsGraphNode (maybe you can consider inheriting from the
> > BaseJoinProcessorNode). Note that although in the Topology we will have
> > multiple connected ProcessorNodes to represent a (foreign-key) join, we
> > still want to keep it as a single StreamsGraphNode, or just a couple of
> > them in the logical representation so that in the future we can construct
> > the physical topology differently (e.g. having another way than the
> current
> > distributed hash-join).
> >
> > -------------------------------------------------------
> >
> > Back to your questions to KIP-213, I think Jan has summarized it
> > pretty-well. Note that back then we do not have headers support so we
> have
> > to do such "key-widening" approach to ensure ordering.
> >
> >
> > Guozhang
> >
> >
> >
> > On Mon, Aug 13, 2018 at 11:39 PM, Jan Filipiak <Jan.Filipiak@trivago.com
> >
> > wrote:
> >
> > > Hi Adam,
> > >
> > > I love how you are on to this already! I resolve this by
> "key-widening" I
> > > treat the result of FKA,and FKB differently.
> > > As you can see the output of my join has a Combined Key and therefore I
> > > can resolve the "race condition" in a group by
> > > if I so desire.
> > >
> > > I think this reflects more what happens under the hood and makes it
> more
> > > clear to the user what is going on. The Idea
> > > of hiding this behind metadata and handle it in the DSL is from my POV
> > > unideal.
> > >
> > > To write into your example:
> > >
> > > key + A, null)
> > > (key +B, <joined On FK =B>)
> > >
> > > is what my output would look like.
> > >
> > >
> > > Hope that makes sense :D
> > >
> > > Best Jan
> > >
> > >
> > >
> > >
> > >
> > > On 13.08.2018 18:16, Adam Bellemare wrote:
> > >
> > >> Hi Jan
> > >>
> > >> If you do not use headers or other metadata, how do you ensure that
> > >> changes
> > >> to the foreign-key value are not resolved out-of-order?
> > >> ie: If an event has FK = A, but you change it to FK = B, you need to
> > >> propagate both a delete (FK=A -> null) and an addition (FK=B). In my
> > >> solution, without maintaining any metadata, it is possible for the
> final
> > >> output to be in either order - the correctly updated joined value, or
> > the
> > >> null for the delete.
> > >>
> > >> (key, null)
> > >> (key, <joined On FK =B>)
> > >>
> > >> or
> > >>
> > >> (key, <joined On FK =B>)
> > >> (key, null)
> > >>
> > >> I looked back through your code and through the discussion threads,
> and
> > >> didn't see any information on how you resolved this. I have a version
> of
> > >> my
> > >> code working for 2.0, I am just adding more integration tests and will
> > >> update the KIP accordingly. Any insight you could provide on resolving
> > >> out-of-order semantics without metadata would be helpful.
> > >>
> > >> Thanks
> > >> Adam
> > >>
> > >>
> > >> On Mon, Aug 13, 2018 at 3:34 AM, Jan Filipiak <
> Jan.Filipiak@trivago.com
> > >
> > >> wrote:
> > >>
> > >> Hi,
> > >>>
> > >>> Happy to see that you want to make an effort here.
> > >>>
> > >>> Regarding the ProcessSuppliers I couldn't find a way to not rewrite
> the
> > >>> joiners + the merger.
> > >>> The re-partitioners can be reused in theory. I don't know if
> > repartition
> > >>> is optimized in 2.0 now.
> > >>>
> > >>> I made this
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-241+
> > >>> KTable+repartition+with+compacted+Topics
> > >>> back then and we are running KIP-213 with KIP-241 in combination.
> > >>>
> > >>> For us it is vital as it minimized the size we had in our repartition
> > >>> topics plus it removed the factor of 2 in events on every message.
> > >>> I know about this new  "delete once consumer has read it".  I don't
> > think
> > >>> 241 is vital for all usecases, for ours it is. I wanted
> > >>> to use 213 to sneak in the foundations for 241 aswell.
> > >>>
> > >>> I don't quite understand what a PropagationWrapper is, but I am
> certain
> > >>> that you do not need RecordHeaders
> > >>> for 213 and I would try to leave them out. They either belong to the
> > DSL
> > >>> or to the user, having a mixed use is
> > >>> to be avoided. We run the join with 0.8 logformat and I don't think
> one
> > >>> needs more.
> > >>>
> > >>> This KIP will be very valuable for the streams project! I couldn't
> > never
> > >>> convince myself to invest into the 1.0+ DSL
> > >>> as I used almost all my energy to fight against it. Maybe this can
> also
> > >>> help me see the good sides a little bit more.
> > >>>
> > >>> If there is anything unclear with all the text that has been written,
> > >>> feel
> > >>> free to just directly cc me so I don't miss it on
> > >>> the mailing list.
> > >>>
> > >>> Best Jan
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> On 08.08.2018 15:26, Adam Bellemare wrote:
> > >>>
> > >>> More followup, and +dev as Guozhang replied to me directly
> previously.
> > >>>>
> > >>>> I am currently porting the code over to trunk. One of the major
> > changes
> > >>>> since 1.0 is the usage of GraphNodes. I have a question about this:
> > >>>>
> > >>>> For a foreignKey joiner, should it have its own dedicated node
type?
> > Or
> > >>>> would it be advisable to construct it from existing GraphNode
> > >>>> components?
> > >>>> For instance, I believe I could construct it from several
> > >>>> OptimizableRepartitionNode, some SinkNode, some SourceNode, and
> > several
> > >>>> StatefulProcessorNode. That being said, there is some underlying
> > >>>> complexity
> > >>>> to each approach.
> > >>>>
> > >>>> I will be switching the KIP-213 to use the RecordHeaders in Kafka
> > >>>> Streams
> > >>>> instead of the PropagationWrapper, but conceptually it should be
the
> > >>>> same.
> > >>>>
> > >>>> Again, any feedback is welcomed...
> > >>>>
> > >>>>
> > >>>> On Mon, Jul 30, 2018 at 9:38 AM, Adam Bellemare <
> > >>>> adam.bellemare@gmail.com
> > >>>> wrote:
> > >>>>
> > >>>> Hi Guozhang et al
> > >>>>
> > >>>>> I was just reading the 2.0 release notes and noticed a section
on
> > >>>>> Record
> > >>>>> Headers.
> > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >>>>> 244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API
> > >>>>>
> > >>>>> I am not yet sure if the contents of a RecordHeader is propagated
> all
> > >>>>> the
> > >>>>> way through the Sinks and Sources, but if it is, and if it
remains
> > >>>>> attached
> > >>>>> to the record (including null records) I may be able to ditch
the
> > >>>>> propagationWrapper for an implementation using RecordHeader.
I am
> not
> > >>>>> yet
> > >>>>> sure if this is doable, so if anyone understands RecordHeader
impl
> > >>>>> better
> > >>>>> than I, I would be happy to hear from you.
> > >>>>>
> > >>>>> In the meantime, let me know of any questions. I believe this
PR
> has
> > a
> > >>>>> lot
> > >>>>> of potential to solve problems for other people, as I have
> > encountered
> > >>>>> a
> > >>>>> number of other companies in the wild all home-brewing their
own
> > >>>>> solutions
> > >>>>> to come up with a method of handling relational data in streams.
> > >>>>>
> > >>>>> Adam
> > >>>>>
> > >>>>>
> > >>>>> On Fri, Jul 27, 2018 at 1:45 AM, Guozhang Wang <wangguoz@gmail.com
> >
> > >>>>> wrote:
> > >>>>>
> > >>>>> Hello Adam,
> > >>>>>
> > >>>>>> Thanks for rebooting the discussion of this KIP ! Let me
finish my
> > >>>>>> pass
> > >>>>>> on the wiki and get back to you soon. Sorry for the delays..
> > >>>>>>
> > >>>>>> Guozhang
> > >>>>>>
> > >>>>>> On Tue, Jul 24, 2018 at 6:08 AM, Adam Bellemare <
> > >>>>>> adam.bellemare@gmail.com
> > >>>>>>
> > >>>>>> wrote:
> > >>>>>>> Let me kick this off with a few starting points that
I would like
> > to
> > >>>>>>> generate some discussion on.
> > >>>>>>>
> > >>>>>>> 1) It seems to me that I will need to repartition the
data twice
> -
> > >>>>>>> once
> > >>>>>>> on
> > >>>>>>> the foreign key, and once back to the primary key.
Is there
> > anything
> > >>>>>>> I
> > >>>>>>> am
> > >>>>>>> missing here?
> > >>>>>>>
> > >>>>>>> 2) I believe I will also need to materialize 3 state
stores: the
> > >>>>>>> prefixScan
> > >>>>>>> SS, the highwater mark SS (for out-of-order resolution)
and the
> > final
> > >>>>>>> state
> > >>>>>>> store, due to the workflow I have laid out. I have
not thought
> of a
> > >>>>>>> better
> > >>>>>>> way yet, but would appreciate any input on this matter.
I have
> gone
> > >>>>>>> back
> > >>>>>>> through the mailing list for the previous discussions
on this
> KIP,
> > >>>>>>> and
> > >>>>>>> I
> > >>>>>>> did not see anything relating to resolving out-of-order
compute.
> I
> > >>>>>>> cannot
> > >>>>>>> see a way around the current three-SS structure that
I have.
> > >>>>>>>
> > >>>>>>> 3) Caching is disabled on the prefixScan SS, as I do
not know how
> > to
> > >>>>>>> resolve the iterator obtained from rocksDB with that
of the
> cache.
> > In
> > >>>>>>> addition, I must ensure everything is flushed before
scanning.
> > Since
> > >>>>>>> the
> > >>>>>>> materialized prefixScan SS is under "control" of the
function, I
> do
> > >>>>>>> not
> > >>>>>>> anticipate this to be a problem. Performance throughput
will need
> > to
> > >>>>>>> be
> > >>>>>>> tested, but as Jan observed in his initial overview
of this
> issue,
> > it
> > >>>>>>> is
> > >>>>>>> generally a surge of output events which affect performance
> moreso
> > >>>>>>> than
> > >>>>>>> the
> > >>>>>>> flush or prefixScan itself.
> > >>>>>>>
> > >>>>>>> Thoughts on any of these are greatly appreciated, since
these
> > >>>>>>> elements
> > >>>>>>> are
> > >>>>>>> really the cornerstone of the whole design. I can put
up the
> code I
> > >>>>>>> have
> > >>>>>>> written against 1.0.2 if we so desire, but first I
was hoping to
> > just
> > >>>>>>> tackle some of the fundamental design proposals.
> > >>>>>>>
> > >>>>>>> Thanks,
> > >>>>>>> Adam
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Mon, Jul 23, 2018 at 10:05 AM, Adam Bellemare <
> > >>>>>>> adam.bellemare@gmail.com>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>> Here is the new discussion thread for KIP-213. I picked
back up
> on
> > >>>>>>> the
> > >>>>>>> KIP
> > >>>>>>>
> > >>>>>>> as this is something that we too at Flipp are now running
in
> > >>>>>>>>
> > >>>>>>>> production.
> > >>>>>>>
> > >>>>>>> Jan started this last year, and I know that Trivago
is also using
> > >>>>>>>>
> > >>>>>>>> something
> > >>>>>>>
> > >>>>>>> similar in production, at least in terms of APIs and
> functionality.
> > >>>>>>>>
> > >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >>>>>>>> 213+Support+non-key+joining+in+KTable
> > >>>>>>>>
> > >>>>>>>> I do have an implementation of the code for Kafka
1.0.2 (our
> local
> > >>>>>>>> production version) but I won't post it yet as
I would like to
> > focus
> > >>>>>>>>
> > >>>>>>>> on the
> > >>>>>>>
> > >>>>>>> workflow and design first. That being said, I also
need to add
> some
> > >>>>>>>>
> > >>>>>>>> clearer
> > >>>>>>>
> > >>>>>>> integration tests (I did a lot of testing using a non-Kafka
> Streams
> > >>>>>>>> framework) and clean up the code a bit more before
putting it
> in a
> > >>>>>>>> PR
> > >>>>>>>> against trunk (I can do so later this week likely).
> > >>>>>>>>
> > >>>>>>>> Please take a look,
> > >>>>>>>>
> > >>>>>>>> Thanks
> > >>>>>>>>
> > >>>>>>>> Adam Bellemare
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> --
> > >>>>>> -- Guozhang
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message