kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Adam Bellemare <adam.bellem...@gmail.com>
Subject Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.
Date Mon, 03 Sep 2018 20:55:27 GMT
Hi Jan

Thank you for taking the time to look into my PR. I have updated it
accordingly along with the suggestions from John. Please note that I am by
no means an expert on Java, so I really do appreciate any Java-specific
feedback you may have. Do not worry about being overly verbose on it.

You are correct with regards to the highwater mark growing unbounded. One
option would be to implement the rocksDB TTL to expire records. I am open
to other options as well.

I have tried to detail the reasoning behind it in the KIP - I have added
additional comments and I hope that it is clearer now.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-MultipleRapidForeign-KeyValueChanges-Whyahighwatertableisrequired
.

Please keep in mind that there may be something about ordering guarantees
that I am not aware of. As far as I know, once you begin to operate on
events in parallel across different nodes within the processor API, there
are no ordering guarantees and everything is simple first-come,
first-served(processed). If this is not the case then I am unaware of that
fact.



Thanks

Adam





On Mon, Sep 3, 2018 at 8:38 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
wrote:

> Finished my deeper scan on your approach.
> Most of the comments I put at the PR are minor code style things.
> One forward call seems to be a bug though, would be great if you could
> double check.
>
> the one problem I see is that the high watermark store grows unbounded.
> A key being deleted from the source table does not lead to deletion in the
> watermark store.
>
> I also don't quite grasp the concept why it's needed.  I think the whole
> offset part can go away?
> It seems to deal with node failures of some kind but everything should
> turn out okay without it?
>
> Best Jan
>
>
> On 01.09.2018 20:44, Guozhang Wang wrote:
>
>> Yes Adam, that makes sense.
>>
>> I think it may be better to have a working PR to review before we complete
>> the VOTE thread. In my previous experience a large feature like this are
>> mostly definitely going to miss some devils in the details in the design
>> and wiki discussion phases.
>>
>> That would unfortunately mean that your implementations may need to be
>> modified / updated along with the review and further KIP discussion. I can
>> understand this can be painful, but that may be the best option we can do
>> to avoid as much work to be wasted as possible.
>>
>>
>> Guozhang
>>
>>
>> On Wed, Aug 29, 2018 at 10:06 AM, Adam Bellemare <
>> adam.bellemare@gmail.com>
>> wrote:
>>
>> Hi Guozhang
>>>
>>> By workflow I mean just the overall process of how the KIP is
>>> implemented.
>>> Any ideas on the ways to reduce the topic count, materializations, if
>>> there
>>> is a better way to resolve out-of-order than a highwater mark table, if
>>> the
>>> design philosophy of “keep everything encapsulated within the join
>>> function” is appropriate, etc. I can implement the changes that John
>>> suggested, but if my overall workflow is not acceptable I would rather
>>> address that before making minor changes.
>>>
>>> If this requires a full candidate PR ready to go to prod then I can make
>>> those changes. Hope that clears things up.
>>>
>>> Thanks
>>>
>>> Adam
>>>
>>> On Aug 29, 2018, at 12:42 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>>>>
>>>> Hi Adam,
>>>>
>>>> What do you mean by "additional comments on the workflow.", do you mean
>>>>
>>> to
>>>
>>>> let other review your PR https://github.com/apache/kafka/pull/5527 ? Is
>>>>
>>> is
>>>
>>>> ready for reviews?
>>>>
>>>>
>>>> Guozhang
>>>>
>>>> On Tue, Aug 28, 2018 at 5:00 AM, Adam Bellemare <
>>>>
>>> adam.bellemare@gmail.com>
>>>
>>>> wrote:
>>>>
>>>> Okay, I will implement John's suggestion of namespacing the external
>>>>> headers prior to processing, and then removing the namespacing prior to
>>>>> emitting. A potential future KIP could be to provide this namespacing
>>>>> automatically.
>>>>>
>>>>> I would also appreciate any other additional comments on the workflow.
>>>>>
>>>> My
>>>
>>>> goal is suss out agreement prior to moving to a vote.
>>>>>
>>>>> On Mon, Aug 27, 2018 at 3:19 PM, Guozhang Wang <wangguoz@gmail.com>
>>>>>>
>>>>> wrote:
>>>
>>>> I like John's idea as well: for this KIP specifically as we do not
>>>>>>
>>>>> expect
>>>
>>>> any other consumers to read the repartition topics externally, we can
>>>>>> slightly prefix the header to be safe, while keeping the additional
>>>>>>
>>>>> cost
>>>
>>>> (note the header field is per-record, so any additional byte is
>>>>>>
>>>>> per-record
>>>>>
>>>>>> as well) low.
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Tue, Aug 21, 2018 at 11:58 AM, Adam Bellemare <
>>>>>>
>>>>> adam.bellemare@gmail.com
>>>>>
>>>>>> wrote:
>>>>>>
>>>>>> Hi John
>>>>>>>
>>>>>>> That is an excellent idea. The header usage I propose would be
>>>>>>> limited
>>>>>>> entirely to internal topics, and this could very well be the solution
>>>>>>>
>>>>>> to
>>>>>
>>>>>> potential conflicts. If we do not officially reserve a prefix "__"
>>>>>>>
>>>>>> then I
>>>>>
>>>>>> think this would be the safest idea, as it would entirely avoid any
>>>>>>> accidents (perhaps if a company is using its own "__" prefix for
>>>>>>> other
>>>>>>> reasons).
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> Adam
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Aug 21, 2018 at 2:24 PM, John Roesler <john@confluent.io>
>>>>>>>
>>>>>> wrote:
>>>>>
>>>>>> Just a quick thought regarding headers:
>>>>>>>>
>>>>>>>>> I think there is no absolute-safe ways to avoid conflicts, but we
>>>>>>>>>
>>>>>>>> can
>>>>>
>>>>>> still
>>>>>>>>
>>>>>>>>> consider using some name patterns to reduce the likelihood as much
>>>>>>>>>
>>>>>>>> as
>>>>>
>>>>>> possible.. e.g. consider sth. like the internal topics naming: e.g.
>>>>>>>>> "__internal_[name]"?
>>>>>>>>>
>>>>>>>> I think there is a safe way to avoid conflicts, since these headers
>>>>>>>>
>>>>>>> are
>>>>>
>>>>>> only needed in internal topics (I think):
>>>>>>>> For internal and changelog topics, we can namespace all headers:
>>>>>>>> * user-defined headers are namespaced as "external." + headerKey
>>>>>>>> * internal headers are namespaced as "internal." + headerKey
>>>>>>>>
>>>>>>>> This is a lot of characters, so we could use a sigil instead (e.g.,
>>>>>>>>
>>>>>>> "_"
>>>>>
>>>>>> for
>>>>>>>
>>>>>>>> internal, "~" for external)
>>>>>>>>
>>>>>>>> We simply apply the namespacing when we read user headers from
>>>>>>>>
>>>>>>> external
>>>>>
>>>>>> topics into the topology and then de-namespace them before we emit
>>>>>>>>
>>>>>>> them
>>>>>
>>>>>> to
>>>>>>>
>>>>>>>> an external topic (via "to" or "through").
>>>>>>>> Now, it is not possible to collide with user-defined headers.
>>>>>>>>
>>>>>>>> That said, I'd also be fine with just reserving "__" as a header
>>>>>>>>
>>>>>>> prefix
>>>>>
>>>>>> and
>>>>>>>
>>>>>>>> not worrying about collisions.
>>>>>>>>
>>>>>>>> Thanks for the KIP,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Tue, Aug 21, 2018 at 9:48 AM Jan Filipiak <
>>>>>>>>
>>>>>>> Jan.Filipiak@trivago.com
>>>>>
>>>>>> wrote:
>>>>>>>>
>>>>>>>> Still havent completly grabbed it.
>>>>>>>>> sorry will read more
>>>>>>>>>
>>>>>>>>> On 17.08.2018 21:23, Jan Filipiak wrote:
>>>>>>>>>> Cool stuff.
>>>>>>>>>>
>>>>>>>>>> I made some random remarks. Did not touch the core of the
>>>>>>>>>>
>>>>>>>>> algorithm
>>>>>
>>>>>> yet.
>>>>>>>>
>>>>>>>>> Will do Monday 100%
>>>>>>>>>>
>>>>>>>>>> I don't see Interactive Queries :) like that!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 17.08.2018 20:28, Adam Bellemare wrote:
>>>>>>>>>>> I have submitted a PR with my code against trunk:
>>>>>>>>>>> https://github.com/apache/kafka/pull/5527
>>>>>>>>>>>
>>>>>>>>>>> Do I continue on this thread or do we begin a new one for
>>>>>>>>>>>
>>>>>>>>>> discussion?
>>>>>>>
>>>>>>>> On Thu, Aug 16, 2018 at 1:40 AM, Jan Filipiak <
>>>>>>>>>>>
>>>>>>>>>> Jan.Filipiak@trivago.com
>>>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> even before message headers, the option for me always existed
>>>>>>>>>>>>
>>>>>>>>>>> to
>>>>>
>>>>>> just wrap
>>>>>>>>>>>> the messages into my own custom envelop.
>>>>>>>>>>>> So I of course thought this through. One sentence in your last
>>>>>>>>>>>>
>>>>>>>>>>> email
>>>>>>>
>>>>>>>> triggered all the thought process I put in the back then
>>>>>>>>>>>> again to design it in the, what i think is the "kafka-way". It
>>>>>>>>>>>>
>>>>>>>>>>> ended
>>>>>>>
>>>>>>>> up
>>>>>>>>
>>>>>>>>> ranting a little about what happened in the past.
>>>>>>>>>>>>
>>>>>>>>>>>> I see plenty of colleagues of mine falling into traps in the
>>>>>>>>>>>>
>>>>>>>>>>> API,
>>>>>
>>>>>> that I
>>>>>>>>>>>> did warn about in the 1.0 DSL rewrite. I have the same
>>>>>>>>>>>> feeling again. So I hope it gives you some insights into my
>>>>>>>>>>>>
>>>>>>>>>>> though
>>>>>>
>>>>>>> process. I am aware that since i never ported 213 to higher
>>>>>>>>>>>> streams version, I don't really have a steak here and
>>>>>>>>>>>>
>>>>>>>>>>> initially I
>>>>>
>>>>>> didn't
>>>>>>>>>>>> feel like actually sending it. But maybe you can pull
>>>>>>>>>>>> something good from it.
>>>>>>>>>>>>
>>>>>>>>>>>>   Best jan
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 15.08.2018 04:44, Adam Bellemare wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> @Jan
>>>>>>>>>>>>> Thanks Jan. I take it you mean "key-widening" somehow includes
>>>>>>>>>>>>> information
>>>>>>>>>>>>> about which record is processed first? I understand about a
>>>>>>>>>>>>> CombinedKey
>>>>>>>>>>>>> with both the Foreign and Primary key, but I don't see how you
>>>>>>>>>>>>>
>>>>>>>>>>>> track
>>>>>>>
>>>>>>>> ordering metadata in there unless you actually included a
>>>>>>>>>>>>>
>>>>>>>>>>>> metadata
>>>>>>
>>>>>>> field
>>>>>>>>>>>>> in
>>>>>>>>>>>>> the key type as well.
>>>>>>>>>>>>>
>>>>>>>>>>>>> @Guozhang
>>>>>>>>>>>>> As Jan mentioned earlier, is Record Headers mean to strictly
>>>>>>>>>>>>>
>>>>>>>>>>>> be
>>>>>
>>>>>> used in
>>>>>>>>>>>>> just the user-space? It seems that it is possible that a
>>>>>>>>>>>>>
>>>>>>>>>>>> collision
>>>>>>
>>>>>>> on the
>>>>>>>>>>>>> (key,value) tuple I wish to add to it could occur. For
>>>>>>>>>>>>>
>>>>>>>>>>>> instance,
>>>>>
>>>>>> if
>>>>>>>
>>>>>>>> I
>>>>>>>>
>>>>>>>>> wanted to add a ("foreignKeyOffset",10) to the Headers but the
>>>>>>>>>>>>>
>>>>>>>>>>>> user
>>>>>>>
>>>>>>>> already
>>>>>>>>>>>>> specified their own header with the same key name, then it
>>>>>>>>>>>>>
>>>>>>>>>>>> appears
>>>>>>
>>>>>>> there
>>>>>>>>>>>>> would be a collision. (This is one of the issues I brought up
>>>>>>>>>>>>>
>>>>>>>>>>>> in
>>>>>
>>>>>> the KIP).
>>>>>>>>>>>>>
>>>>>>>>>>>>> --------------------------------
>>>>>>>>>>>>>
>>>>>>>>>>>>> I will be posting a prototype PR against trunk within the next
>>>>>>>>>>>>>
>>>>>>>>>>>> day
>>>>>>
>>>>>>> or two.
>>>>>>>>>>>>> One thing I need to point out is that my design very strictly
>>>>>>>>>>>>>
>>>>>>>>>>>> wraps
>>>>>>>
>>>>>>>> the
>>>>>>>>>>>>> entire foreignKeyJoin process entirely within the DSL
>>>>>>>>>>>>>
>>>>>>>>>>>> function.
>>>>>
>>>>>> There is
>>>>>>>>>>>>> no
>>>>>>>>>>>>> exposure of CombinedKeys or widened keys, nothing to resolve
>>>>>>>>>>>>>
>>>>>>>>>>>> with
>>>>>>
>>>>>>> regards
>>>>>>>>>>>>> to out-of-order processing and no need for the DSL user to
>>>>>>>>>>>>>
>>>>>>>>>>>> even
>>>>>
>>>>>> know
>>>>>>>
>>>>>>>> what's
>>>>>>>>>>>>> going on inside of the function. The code simply returns the
>>>>>>>>>>>>> results of
>>>>>>>>>>>>> the
>>>>>>>>>>>>> join, keyed by the original key. Currently my API mirrors
>>>>>>>>>>>>> identically the
>>>>>>>>>>>>> format of the data returned by the regular join function, and
>>>>>>>>>>>>>
>>>>>>>>>>>> I
>>>>>
>>>>>> believe
>>>>>>>>>>>>> that this is very useful to many users of the DSL. It is my
>>>>>>>>>>>>> understanding
>>>>>>>>>>>>> that one of the main design goals of the DSL is to provide
>>>>>>>>>>>>>
>>>>>>>>>>>> higher
>>>>>>
>>>>>>> level
>>>>>>>>>>>>> functionality without requiring the users to know exactly
>>>>>>>>>>>>>
>>>>>>>>>>>> what's
>>>>>
>>>>>> going on
>>>>>>>>>>>>> under the hood. With this in mind, I thought it best to solve
>>>>>>>>>>>>> ordering and
>>>>>>>>>>>>> partitioning problems within the function and eliminate the
>>>>>>>>>>>>> requirement
>>>>>>>>>>>>> for
>>>>>>>>>>>>> users to do additional work after the fact to resolve the
>>>>>>>>>>>>>
>>>>>>>>>>>> results
>>>>>>
>>>>>>> of their
>>>>>>>>>>>>> join. Basically, I am assuming that most users of the DSL just
>>>>>>>>>>>>> "want it to
>>>>>>>>>>>>> work" and want it to be easy. I did this operating under the
>>>>>>>>>>>>> assumption
>>>>>>>>>>>>> that if a user truly wants to optimize their own workflow down
>>>>>>>>>>>>>
>>>>>>>>>>>> to
>>>>>>
>>>>>>> the
>>>>>>>>
>>>>>>>>> finest details then they will break from strictly using the
>>>>>>>>>>>>>
>>>>>>>>>>>> DSL
>>>>>
>>>>>> and
>>>>>>>
>>>>>>>> move
>>>>>>>>>>>>> down to the processors API.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think. The abstraction is not powerful enough
>>>>>>>>>>>> to not have kafka specifics leak up The leak I currently think
>>>>>>>>>>>>
>>>>>>>>>>> this
>>>>>>
>>>>>>> has is
>>>>>>>>>>>> that you can not reliable prevent the delete coming out first,
>>>>>>>>>>>> before you emit the correct new record. As it is an abstraction
>>>>>>>>>>>> entirely
>>>>>>>>>>>> around kafka.
>>>>>>>>>>>> I can only recommend to not to. Honesty and simplicity should
>>>>>>>>>>>>
>>>>>>>>>>> always
>>>>>>>
>>>>>>>> be
>>>>>>>>
>>>>>>>>> first prio
>>>>>>>>>>>> trying to hide this just makes it more complex, less
>>>>>>>>>>>>
>>>>>>>>>>> understandable
>>>>>>
>>>>>>> and
>>>>>>>>
>>>>>>>>> will lead to mistakes
>>>>>>>>>>>> in usage.
>>>>>>>>>>>>
>>>>>>>>>>>> Exactly why I am also in big disfavour of GraphNodes and later
>>>>>>>>>>>> optimization stages.
>>>>>>>>>>>> Can someone give me an example of an optimisation that really
>>>>>>>>>>>>
>>>>>>>>>>> can't
>>>>>>
>>>>>>> be
>>>>>>>>
>>>>>>>>> handled by the user
>>>>>>>>>>>> constructing his topology differently?
>>>>>>>>>>>> Having reusable Processor API components accessible by the DSL
>>>>>>>>>>>>
>>>>>>>>>>> and
>>>>>>
>>>>>>> composable as
>>>>>>>>>>>> one likes is exactly where DSL should max out and KSQL should
>>>>>>>>>>>>
>>>>>>>>>>> do
>>>>>
>>>>>> the
>>>>>>>
>>>>>>>> next
>>>>>>>>>>>> step.
>>>>>>>>>>>> I find it very unprofessional from a software engineering
>>>>>>>>>>>>
>>>>>>>>>>> approach
>>>>>>
>>>>>>> to run
>>>>>>>>>>>> software where
>>>>>>>>>>>> you can not at least senseful reason about the inner workings
>>>>>>>>>>>>
>>>>>>>>>>> of
>>>>>
>>>>>> the
>>>>>>>
>>>>>>>> libraries used.
>>>>>>>>>>>> Gives this people have to read and understand in anyway, why
>>>>>>>>>>>>
>>>>>>>>>>> try
>>>>>
>>>>>> to
>>>>>>
>>>>>>> hide
>>>>>>>>>>>> it?
>>>>>>>>>>>>
>>>>>>>>>>>> It really miss the beauty of 0.10 version DSL.
>>>>>>>>>>>> Apparently not a thing I can influence but just warn about.
>>>>>>>>>>>>
>>>>>>>>>>>> @gouzhang
>>>>>>>>>>>> you can't imagine how many extra IQ-Statestores I constantly
>>>>>>>>>>>>
>>>>>>>>>>> prune
>>>>>>
>>>>>>> from
>>>>>>>>
>>>>>>>>> stream app's
>>>>>>>>>>>> because people just keep passing Materialized's into all the
>>>>>>>>>>>> operations.
>>>>>>>>>>>> :D :'-(
>>>>>>>>>>>> I regret that I couldn't convince you guys back then. Plus this
>>>>>>>>>>>>
>>>>>>>>>>> whole
>>>>>>>
>>>>>>>> entire topology as a floating
>>>>>>>>>>>> interface chain, never seen it anywhere :-/ :'(
>>>>>>>>>>>>
>>>>>>>>>>>> I don't know. I guess this is just me regretting to only have
>>>>>>>>>>>>
>>>>>>>>>>> 24h/day.
>>>>>>>>
>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I updated the KIP today with some points worth talking about,
>>>>>>>>>>>>
>>>>>>>>>>> should
>>>>>>>
>>>>>>>> anyone
>>>>>>>>>>>>
>>>>>>>>>>>>> be so inclined to check it out. Currently we are running this
>>>>>>>>>>>>>
>>>>>>>>>>>> code
>>>>>>
>>>>>>> in
>>>>>>>>
>>>>>>>>> production to handle relational joins from our Kafka Connect
>>>>>>>>>>>>> topics, as
>>>>>>>>>>>>> per
>>>>>>>>>>>>> the original motivation of the KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> I believe the foreignKeyJoin should be responsible for. In my
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Aug 14, 2018 at 5:22 PM, Guozhang Wang<
>>>>>>>>>>>>>
>>>>>>>>>>>> wangguoz@gmail.com
>>>>>>
>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hello Adam,
>>>>>>>>>>>>>
>>>>>>>>>>>>>> As for your question regarding GraphNodes, it is for
>>>>>>>>>>>>>>
>>>>>>>>>>>>> extending
>>>>>
>>>>>> Streams
>>>>>>>>>>>>>> optimization framework. You can find more details on
>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6761.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The main idea is that instead of directly building up the
>>>>>>>>>>>>>>
>>>>>>>>>>>>> "physical
>>>>>>>
>>>>>>>> topology" (represented as Topology in the public package, and
>>>>>>>>>>>>>> internally
>>>>>>>>>>>>>> built as the ProcessorTopology class) while users are
>>>>>>>>>>>>>>
>>>>>>>>>>>>> specifying
>>>>>>
>>>>>>> the
>>>>>>>>
>>>>>>>>> transformation operators, we first keep it as a "logical
>>>>>>>>>>>>>>
>>>>>>>>>>>>> topology"
>>>>>>>
>>>>>>>> (represented as GraphNode inside InternalStreamsBuilder). And
>>>>>>>>>>>>>>
>>>>>>>>>>>>> then
>>>>>>>
>>>>>>>> only
>>>>>>>>>>>>>> execute the optimization and the construction of the
>>>>>>>>>>>>>>
>>>>>>>>>>>>> "physical"
>>>>>
>>>>>> Topology
>>>>>>>>>>>>>> when StreamsBuilder.build() is called.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Back to your question, I think it makes more sense to add a
>>>>>>>>>>>>>>
>>>>>>>>>>>>> new
>>>>>
>>>>>> type of
>>>>>>>>>>>>>> StreamsGraphNode (maybe you can consider inheriting from the
>>>>>>>>>>>>>> BaseJoinProcessorNode). Note that although in the Topology we
>>>>>>>>>>>>>>
>>>>>>>>>>>>> will
>>>>>>>
>>>>>>>> have
>>>>>>>>>>>>>> multiple connected ProcessorNodes to represent a
>>>>>>>>>>>>>>
>>>>>>>>>>>>> (foreign-key)
>>>>>
>>>>>> join, we
>>>>>>>>>>>>>> still want to keep it as a single StreamsGraphNode, or just a
>>>>>>>>>>>>>> couple of
>>>>>>>>>>>>>> them in the logical representation so that in the future we
>>>>>>>>>>>>>>
>>>>>>>>>>>>> can
>>>>>
>>>>>> construct
>>>>>>>>>>>>>> the physical topology differently (e.g. having another way
>>>>>>>>>>>>>>
>>>>>>>>>>>>> than
>>>>>
>>>>>> the
>>>>>>>
>>>>>>>> current
>>>>>>>>>>>>>> distributed hash-join).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -------------------------------------------------------
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Back to your questions to KIP-213, I think Jan has summarized
>>>>>>>>>>>>>>
>>>>>>>>>>>>> it
>>>>>>
>>>>>>> pretty-well. Note that back then we do not have headers
>>>>>>>>>>>>>>
>>>>>>>>>>>>> support
>>>>>
>>>>>> so
>>>>>>>
>>>>>>>> we
>>>>>>>>
>>>>>>>>> have
>>>>>>>>>>>>>> to do such "key-widening" approach to ensure ordering.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Aug 13, 2018 at 11:39 PM, Jan
>>>>>>>>>>>>>> Filipiak<Jan.Filipiak@trivago.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Adam,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I love how you are on to this already! I resolve this by
>>>>>>>>>>>>>>> "key-widening"
>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>> treat the result of FKA,and FKB differently.
>>>>>>>>>>>>>>> As you can see the output of my join has a Combined Key and
>>>>>>>>>>>>>>> therefore I
>>>>>>>>>>>>>>> can resolve the "race condition" in a group by
>>>>>>>>>>>>>>> if I so desire.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think this reflects more what happens under the hood and
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> makes
>>>>>>
>>>>>>> it more
>>>>>>>>>>>>>>> clear to the user what is going on. The Idea
>>>>>>>>>>>>>>> of hiding this behind metadata and handle it in the DSL is
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> from
>>>>>>
>>>>>>> my POV
>>>>>>>>>>>>>>> unideal.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> To write into your example:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> key + A, null)
>>>>>>>>>>>>>>> (key +B, <joined On FK =B>)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> is what my output would look like.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hope that makes sense :D
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 13.08.2018 18:16, Adam Bellemare wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Jan
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If you do not use headers or other metadata, how do you
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ensure
>>>>>>
>>>>>>> that
>>>>>>>>
>>>>>>>>> changes
>>>>>>>>>>>>>>>> to the foreign-key value are not resolved out-of-order?
>>>>>>>>>>>>>>>> ie: If an event has FK = A, but you change it to FK = B,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> you
>>>>>
>>>>>> need to
>>>>>>>>>>>>>>>> propagate both a delete (FK=A -> null) and an addition
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> (FK=B).
>>>>>>
>>>>>>> In my
>>>>>>>>>>>>>>>> solution, without maintaining any metadata, it is possible
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> for
>>>>>>
>>>>>>> the
>>>>>>>>
>>>>>>>>> final
>>>>>>>>>>>>>>>> output to be in either order - the correctly updated joined
>>>>>>>>>>>>>>>> value, or
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> null for the delete.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> (key, null)
>>>>>>>>>>>>>>>> (key, <joined On FK =B>)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> (key, <joined On FK =B>)
>>>>>>>>>>>>>>>> (key, null)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I looked back through your code and through the discussion
>>>>>>>>>>>>>>>> threads, and
>>>>>>>>>>>>>>>> didn't see any information on how you resolved this. I
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> have a
>>>>>
>>>>>> version
>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>> my
>>>>>>>>>>>>>>>> code working for 2.0, I am just adding more integration
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> tests
>>>>>
>>>>>> and will
>>>>>>>>>>>>>>>> update the KIP accordingly. Any insight you could provide
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> on
>>>>>
>>>>>> resolving
>>>>>>>>>>>>>>>> out-of-order semantics without metadata would be helpful.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Aug 13, 2018 at 3:34 AM, Jan Filipiak <
>>>>>>>>>>>>>>>> Jan.Filipiak@trivago.com
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Happy to see that you want to make an effort here.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regarding the ProcessSuppliers I couldn't find a way to
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> not
>>>>>
>>>>>> rewrite
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> joiners + the merger.
>>>>>>>>>>>>>>>>> The re-partitioners can be reused in theory. I don't know
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> if
>>>>>
>>>>>> repartition
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> is optimized in 2.0 now.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I made this
>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 241+
>>>>>
>>>>>> KTable+repartition+with+compacted+Topics
>>>>>>>>>>>>>>>>> back then and we are running KIP-213 with KIP-241 in
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> combination.
>>>>>>>>
>>>>>>>>> For us it is vital as it minimized the size we had in our
>>>>>>>>>>>>>>>>> repartition
>>>>>>>>>>>>>>>>> topics plus it removed the factor of 2 in events on every
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> message.
>>>>>>>>
>>>>>>>>> I know about this new  "delete once consumer has read it".
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I
>>>>>>
>>>>>>> don't
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 241 is vital for all usecases, for ours it is. I wanted
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> to use 213 to sneak in the foundations for 241 aswell.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I don't quite understand what a PropagationWrapper is,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> but I
>>>>>
>>>>>> am
>>>>>>>
>>>>>>>> certain
>>>>>>>>>>>>>>>>> that you do not need RecordHeaders
>>>>>>>>>>>>>>>>> for 213 and I would try to leave them out. They either
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> belong
>>>>>>
>>>>>>> to the
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> DSL
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> or to the user, having a mixed use is
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> to be avoided. We run the join with 0.8 logformat and I
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> don't
>>>>>>
>>>>>>> think
>>>>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>>> needs more.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This KIP will be very valuable for the streams project! I
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> couldn't
>>>>>>>>
>>>>>>>>> never
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> convince myself to invest into the 1.0+ DSL
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> as I used almost all my energy to fight against it. Maybe
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> this
>>>>>>
>>>>>>> can
>>>>>>>>
>>>>>>>>> also
>>>>>>>>>>>>>>>>> help me see the good sides a little bit more.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If there is anything unclear with all the text that has
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> been
>>>>>
>>>>>> written,
>>>>>>>>>>>>>>>>> feel
>>>>>>>>>>>>>>>>> free to just directly cc me so I don't miss it on
>>>>>>>>>>>>>>>>> the mailing list.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 08.08.2018 15:26, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> More followup, and +dev as Guozhang replied to me directly
>>>>>>>>>>>>>>>>> previously.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I am currently porting the code over to trunk. One of the
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> major
>>>>>>>
>>>>>>>> changes
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> since 1.0 is the usage of GraphNodes. I have a question
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> about
>>>>>
>>>>>> this:
>>>>>>>>
>>>>>>>>> For a foreignKey joiner, should it have its own dedicated
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> node
>>>>>>>
>>>>>>>> type?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Or
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> would it be advisable to construct it from existing
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> GraphNode
>>>>>
>>>>>> components?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message