kafka-dev mailing list archives

From Adam Bellemare <adam.bellem...@gmail.com>
Subject Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.
Date Wed, 02 Jan 2019 20:35:30 GMT
Hi Jan

Ahh, I got it! It is deterministic once you apply the groupBy function you
mentioned a few months ago to the output, but not before you apply it...
correct? I was not thinking about the groupBy function.

Here's how I understand how it could work from an API perspective: I am
going to use the terminology "KScatteredTable" to represent the
intermediate table that is not yet resolved - basically the join was
performed but no race condition handling is done.
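
The race-condition handling that such an intermediate table leaves undone is described elsewhere in this thread as a check against the left table: re-extract the current foreign key for the record and discard any join result that was computed against a different one. A minimal sketch of that check follows. It is a toy in-memory model, not Kafka Streams API, and the names `ForeignKeyResolver`, `updateLeft`, and `resolve` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

// Toy model, not Kafka Streams API: every name here is hypothetical.
final class ForeignKeyResolver<K, V, FK> {
    // Current contents of the left ("this") table.
    private final Map<K, V> leftTable = new HashMap<>();
    private final Function<V, FK> foreignKeyExtractor;

    ForeignKeyResolver(Function<V, FK> foreignKeyExtractor) {
        this.foreignKeyExtractor = foreignKeyExtractor;
    }

    // Apply an update to the left table.
    void updateLeft(K key, V value) {
        leftTable.put(key, value);
    }

    // Emit the join result only if it was computed against the foreign key
    // that the left record currently holds; otherwise drop it as stale.
    <R> Optional<R> resolve(K key, FK joinedOnForeignKey, R joinResult) {
        V current = leftTable.get(key);
        if (current == null) {
            return Optional.empty(); // the left record no longer exists
        }
        FK currentForeignKey = foreignKeyExtractor.apply(current);
        return Objects.equals(currentForeignKey, joinedOnForeignKey)
                ? Optional.of(joinResult)
                : Optional.empty();
    }
}
```

With a left record `foo` whose foreign key changed from `red` to `blue`, a late-arriving result joined on `red` would be dropped and the one joined on `blue` would be propagated.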

If I wanted to join three KTables together on foreign keys, one of the ways
I could do it is:

KScatteredTable scatteredOne = kTableOne.oneToManyJoin(kTableTwo,
    joinerFuncTwo, foreignKeyExtractorTwo);
KScatteredTable scatteredTwo = scatteredOne.oneToManyJoin(kTableThree,
    joinerFuncThree, foreignKeyExtractorThree);

// Now I groupBy the key that I want to obtain, and I can resolve the
// out-of-order dependencies here.
scatteredTwo.groupBy(keyValueMapper);

(shown here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-SolutionB-User-ManagedGroupBy(Jan's)
)

Is this in line with what you're doing? Can this be done without exposing
the CombinedKey? As you mentioned before "A Table
KTable<CombinedKey<A,B>,JoinedResult> is not a good return type. It breaks
the KTable invariant that a table is currently partitioned by its key".
With that being said, would oneToManyJoin and groupBy be the only two
operations that a KScatteredTable needs to support?
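
The invariant in that quote can be illustrated with a small sketch. It assumes, as the current option in this thread describes, that the rekeyed data is shuffled to the partition of the foreign key, so a table keyed by the combined key is placed by only one part of that key. This is a toy model: `CombinedKey` here is a stand-in class, `Partitioning` and its methods are hypothetical, and `hashCode()` replaces the hashing of serialized key bytes that Kafka's default partitioner performs.

```java
import java.util.Objects;

// Toy model, not Kafka Streams API: every name here is hypothetical.
final class CombinedKey<KO, K> {
    final KO foreignKey; // key of the other (right) table
    final K primaryKey;  // key of this (left) table

    CombinedKey(KO foreignKey, K primaryKey) {
        this.foreignKey = foreignKey;
        this.primaryKey = primaryKey;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof CombinedKey)) {
            return false;
        }
        CombinedKey<?, ?> other = (CombinedKey<?, ?>) o;
        return Objects.equals(foreignKey, other.foreignKey)
                && Objects.equals(primaryKey, other.primaryKey);
    }

    @Override
    public int hashCode() {
        return Objects.hash(foreignKey, primaryKey);
    }
}

final class Partitioning {
    // Conventional rule: the partition is derived from the whole key.
    static int byWholeKey(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Rule for the scattered intermediate result: only the foreign-key
    // part of the combined key decides the partition.
    static int byForeignKey(CombinedKey<?, ?> key, int numPartitions) {
        return Math.floorMod(key.foreignKey.hashCode(), numPartitions);
    }
}
```

Two combined keys that share a foreign key always land together under `byForeignKey`, whatever their primary keys are, which is what co-locates them with the right-hand record. Under `byWholeKey` there is no such guarantee, so a consumer that assumes whole-key partitioning would look in the wrong place.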

Thanks for your thoughts

Adam


On Wed, Jan 2, 2019 at 3:07 PM Jan Filipiak <Jan.Filipiak@trivago.com>
wrote:

> Hi Adam,
>
> I am kinda surprised! Yes, my solution of course is correct. I don't really
> know what to show in an example, as I am convinced you grasped the
> concept of how mine works.
>
> If there is a race condition there is a race condition. It doesn't
> matter if there is 10 minutes or milliseconds between events. Either
> they are properly guarded or not. My solution has no such race
> condition. It is 'eventually consistent'. You're gonna see all sorts of stuff
> coming up during a reprocess.
>
> The user can still fk it up later though. But that is usual business.
>
> In reality I try to suppress updates from left sides as long as possible
> because right side updates are more expensive if left is already
> fullish. So that limits the space a little but there are no guarantees.
> The result, however, once lag is zero, is the same every time.
>
> The trade-offs can be shifted as you like. My solution gives full power
> to the user and only does a minimum in the framework. You push
> everything into streams.
>
> If you ask me, not a good choice. Will anyone listen? No.
> I do actually think it's too late to do it my way. It's not as if you
> haven't gone through the effort of building it.
>
> Just wanted to give you guys another chance, to think it through  ;)
>
> Regarding what will be observed: I consider it a plus that all events
> that are in the inputs have a respective output. Whereas your solution
> might "swallow" events.
>
> Best Jan
>
>
> On 02.01.2019 15:30, Adam Bellemare wrote:
> > Jan
> >
> > I have been thinking a lot about the history of the discussion and your
> > original proposal, and why you believe it is a better solution. The
> biggest
> > problem with your original proposed design is that it seems to me to be
> > non-deterministic. It is subject to race conditions that are dependent
> > entirely on the data, and without resolution of these races you can end
> up
> > with different results each time. If I am mistaken and this is indeed
> > deterministic, then please let me know and provide an explanation,
> ideally
> > with an example.
> >
> > The way I see it is that you will get very different answers to your
> > non-race-condition-resolved join topology, especially if you are nesting
> it
> > with additional joins as you have indicated you are doing. Consider
> > rebuilding an application state from the beginning of two topics. If the
> > left/this side has multiple foreign-key changes in a row, spaced out
> every
> > ten minutes, you may see something like this:
> >
> > (foo, foreignKey=red) t=0
> > (foo, foreignKey=blue) t=0+10m
> > (foo, foreignKey=green) t=0+20m
> > (foo, foreignKey=purple) t=0+30m
> > (foo, foreignKey=blue) t=0+40m
> > (foo, foreignKey=white) t=0+50m
> >
> > During realtime processing, all of the updates may have correctly
> > propagated because it took less than 10 minutes to resolve each join.
> Upon
> > rebuilding from the start, however, all of these events would be
> processed
> > in quick succession. The presence or absence of data will affect the
> > results of your join, and the results can vary with each run depending on
> > the data. Because of this, I cannot support any kind of solution that
> would
> > allow the exposure of an unresolved intermediate state. I can understand
> if
> > you don't support this, but this is why, as you said, you have the
> freedom
> > to use the Processor API.
> >
> >
> > With that being said, either the solution that I originally proposed
> > (joins occurring on the foreign node) or John + Guozhang's solution
> > (registering with the foreign node for notifications) is fine with me -
> > both have the same API and we can evaluate it further during
> implementation.
> >
> >
> > Thanks
> >
> > Adam
> >
> > On Thu, Dec 27, 2018 at 2:38 PM Jan Filipiak <Jan.Filipiak@trivago.com>
> > wrote:
> >
> >> Hi,
> >>
> >> just want to let you guys know that this thing is spiralling out of
> >> control if you ask me.
> >>
> >> First you take away the possibility for the user to optimize. Now you
> >> pile up complexity to perform some afterwards optimisation, that from my
> >> POV completely misses the point. As if the actual call to the joiner
> >> is really gonna be an expensive part. It won't. Truth is, you don't have a
> >> clue which side is gonna be smaller. The key you shuffle around might
> >> already be >>> than the value on the other side.
> >>
> >> You know my opinion on this. For me it's dead, I just leave you the
> >> message here as an opportunity to reconsider the choices that were made.
> >>
> >> Wish y'all a happy new year :)
> >>
> >>
> >>
> >>
> >>
> >>
> >> On 27.12.2018 17:22, Adam Bellemare wrote:
> >>> Hi All
> >>>
> >>> Sorry for the delay - holidays and all. I have since updated the KIP
> with
> >>> John's original suggestion and have pruned a number of the no longer
> >>> relevant diagrams. Any more comments would be welcomed, otherwise I
> will
> >>> look to kick off the vote again shortly.
> >>>
> >>> Thanks
> >>> Adam
> >>>
> >>> On Mon, Dec 17, 2018 at 7:06 PM Adam Bellemare <
> adam.bellemare@gmail.com
> >>>
> >>> wrote:
> >>>
> >>>> Hi John and Guozhang
> >>>>
> >>>> Ah yes, I lost that in the mix! Thanks for the convergent solutions -
> I
> >> do
> >>>> think that the attachment that John included makes for a better
> design.
> >> It
> >>>> should also help with overall performance as very high-cardinality
> >> foreign
> >>>> keyed data (say millions of events with the same entity) will be able
> to
> >>>> leverage the multiple nodes for join functionality instead of having
> it
> >> all
> >>>> performed in one node. There is still a bottleneck in the right table
> >>>> having to propagate all those events, but with slimmer structures,
> less
> >> IO
> >>>> and no need to perform the join I think the throughput will be much
> >> higher
> >>>> in those scenarios.
> >>>>
> >>>> Okay, I am convinced. I will update the KIP accordingly to a Gliffy
> >>>> version of John's diagram and ensure that the example flow matches
> >>>> correctly. Then I can go back to working on the PR to match the
> diagram.
> >>>>
> >>>> Thanks both of you for all the help - very much appreciated.
> >>>>
> >>>> Adam
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Mon, Dec 17, 2018 at 6:39 PM Guozhang Wang <wangguoz@gmail.com>
> >> wrote:
> >>>>
> >>>>> Hi John,
> >>>>>
> >>>>> Just made a pass on your diagram (nice hand-drawing btw!), and
> >> obviously
> >>>>> we
> >>>>> are thinking about the same thing :) A neat difference that I like,
> is
> >>>>> that
> >>>>> in the pre-join repartition topic we can still send message in the
> >> format
> >>>>> of `K=k, V=(i=2)` while using "i" as the partition key in
> >>>>> StreamsPartition,
> >>>>> this way we do not need to even augment the key for the repartition
> >> topic,
> >>>>> but just do a projection on the foreign key part but trim all other
> >>>>> fields:
> >>>>> as long as we still materialize the store as `A-2` co-located with
> the
> >>>>> right KTable, that is fine.
> >>>>>
> >>>>> As I mentioned in my previous email, I also think this has a few
> >>>>> advantages
> >>>>> on saving over-the-wire bytes as well as disk bytes.
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>> On Mon, Dec 17, 2018 at 3:17 PM John Roesler <john@confluent.io>
> >> wrote:
> >>>>>
> >>>>>> Hi Guozhang,
> >>>>>>
> >>>>>> Thanks for taking a look! I think Adam's already addressed your
> >>>>> questions
> >>>>>> as well as I could have.
> >>>>>>
> >>>>>> Hi Adam,
> >>>>>>
> >>>>>> Thanks for updating the KIP. It looks great, especially how all the
> >>>>>> need-to-know information is right at the top, followed by the
> details.
> >>>>>>
> >>>>>> Also, thanks for that high-level diagram. Actually, now that I'm
> >> looking
> >>>>>> at it, I think part of my proposal got lost in translation,
> although I
> >>>>> do
> >>>>>> think that what you have there is also correct.
> >>>>>>
> >>>>>> I sketched up a crude diagram based on yours and attached it to the
> >> KIP
> >>>>>> (I'm not sure if attached or inline images work on the mailing
> list):
> >>>>>>
> >>>>>
> >>
> https://cwiki.apache.org/confluence/download/attachments/74684836/suggestion.png
> >>>>>> . It's also attached to this email for convenience.
> >>>>>>
> >>>>>> Hopefully, you can see how it's intended to line up, and which parts
> >> are
> >>>>>> modified.
> >>>>>> At a high level, instead of performing the join on the right-hand
> >> side,
> >>>>>> we're essentially just registering interest, like "LHS key A wishes
> to
> >>>>>> receive updates for RHS key 2". Then, when there is a new "interest"
> >> or
> >>>>> any
> >>>>>> updates to the RHS records, it "broadcasts" its state back to the
> LHS
> >>>>>> records who are interested in it.
> >>>>>>
> >>>>>> Thus, instead of sending the LHS values to the RHS joiner workers
> and
> >>>>> then
> >>>>>> sending the join results back to the LHS worker to be co-partitioned and
> >>>>>> validated, we instead only send the LHS *keys* to the RHS workers
> and
> >>>>> then
> >>>>>> only the RHS k/v back to be joined by the LHS worker.
> >>>>>>
> >>>>>> I've been considering both your diagram and mine, and I *think* what
> >> I'm
> >>>>>> proposing has a few advantages.
> >>>>>>
> >>>>>> Here are some points of interest as you look at the diagram:
> >>>>>> * When we extract the foreign key and send it to the Pre-Join
> >>>>> Repartition
> >>>>>> Topic, we can send only the FK/PK pair. There's no need to worry
> about
> >>>>>> custom partitioner logic, since we can just use the foreign key
> >> plainly
> >>>>> as
> >>>>>> the repartition record key. Also, we save on transmitting the LHS
> >> value,
> >>>>>> since we only send its key in this step.
> >>>>>> * We also only need to store the RHSKey:LHSKey mapping in the
> >>>>>> MaterializedSubscriptionStore, saving on disk. We can use the same
> >> rocks
> >>>>>> key format you proposed and the same algorithm involving range scans
> >>>>> when
> >>>>>> the RHS records get updated.
> >>>>>> * Instead of joining on the right side, all we do is compose a
> >>>>>> re-repartition record so we can broadcast the RHS k/v pair back to
> the
> >>>>>> original LHS partition. (this is what the "rekey" node is doing)
> >>>>>> * Then, there is a special kind of Joiner that's co-resident in the
> >> same
> >>>>>> StreamTask as the LHS table, subscribed to the Post-Join Repartition
> >>>>> Topic.
> >>>>>> ** This Joiner is *not* triggered directly by any changes in the LHS
> >>>>>> KTable. Instead, LHS events indirectly trigger the join via the
> whole
> >>>>>> lifecycle.
> >>>>>> ** For each event arriving from the Post-Join Repartition Topic, the
> >>>>>> Joiner looks up the corresponding record in the LHS KTable. It
> >> validates
> >>>>>> the FK as you noted, discarding any inconsistent events. Otherwise,
> it
> >>>>>> unpacks the RHS K/V pair and invokes the ValueJoiner to obtain the
> >> join
> >>>>>> result
> >>>>>> ** Note that the Joiner itself is stateless, so materializing the
> join
> >>>>>> result is optional, just as with the 1:1 joins.
> >>>>>>
> >>>>>> So in summary:
> >>>>>> * instead of transmitting the LHS keys and values to the right and
> the
> >>>>>> JoinResult back to the left, we only transmit the LHS keys to the
> >> right
> >>>>> and
> >>>>>> the RHS values to the left. Assuming the average RHS value is
> >> smaller
> >>>>>> than or equal to the average join result size, it's a clear win on
> >>>>> broker
> >>>>>> traffic. I think this is actually a reasonable assumption, which we
> >> can
> >>>>>> discuss more if you're suspicious.
> >>>>>> * we only need one copy of the data (the left and right tables need
> to
> >>>>> be
> >>>>>> materialized) and one extra copy of the PK:FK pairs in the
> >> Materialized
> >>>>>> Subscription Store. Materializing the join result is optional, just
> as
> >>>>> with
> >>>>>> the existing 1:1 joins.
> >>>>>> * we still need the fancy range-scan algorithm on the right to
> locate
> >>>>> all
> >>>>>> interested LHS keys when a RHS value is updated, but we don't need a
> >>>>> custom
> >>>>>> partitioner for either repartition topic (this is of course a
> >>>>> modification
> >>>>>> we could make to your version as well)
> >>>>>>
> >>>>>> How does this sound to you? (And did I miss anything?)
> >>>>>> -John
> >>>>>>
> >>>>>> On Mon, Dec 17, 2018 at 9:00 AM Adam Bellemare <
> >>>>> adam.bellemare@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi John & Guozhang
> >>>>>>>
> >>>>>>> @John & @Guozhang Wang <wangguoz@gmail.com> - I have cleaned up
> the
> >>>>> KIP,
> >>>>>>> pruned much of what I wrote and put a simplified diagram near the
> top
> >>>>> to
> >>>>>>> illustrate the workflow. I encapsulated Jan's content at the bottom
> >> of
> >>>>> the
> >>>>>>> document. I believe it is simpler to read by far now.
> >>>>>>>
> >>>>>>> @Guozhang Wang <wangguoz@gmail.com>:
> >>>>>>>> #1: rekey left table
> >>>>>>>>     -> source from the left upstream, send to rekey-processor to
> >>>>> generate
> >>>>>>> combined key, and then sink to copartition topic.
> >>>>>>> Correct.
> >>>>>>>
> >>>>>>>> #2: first-join with right table
> >>>>>>>>     -> source from the right table upstream, materialize the right
> >>>>> table.
> >>>>>>>>     -> source from the co-partition topic, materialize the rekeyed
> >> left
> >>>>>>> table, join with the right table, rekey back, and then sink to the
> >>>>>>> rekeyed-back topic.
> >>>>>>> Almost - I cleared up the KIP. We do not rekey back yet, as I need
> >> the
> >>>>>>> Foreign-Key value generated in #1 above to compare in the
> resolution
> >>>>>>> stage.
> >>>>>>>
> >>>>>>>> #3: second join
> >>>>>>>>      -> source from the rekeyed-back topic, materialize the
> rekeyed
> >>>>> back
> >>>>>>> table.
> >>>>>>>>     -> source from the left upstream, materialize the left table,
> >> join
> >>>>>>> with
> >>>>>>> the rekeyed back table.
> >>>>>>> Almost - As each event comes in, we just run it through a stateful
> >>>>>>> processor that checks the original ("This") KTable for the key. The
> >>>>> value
> >>>>>>> payload then has the foreignKeyExtractor applied again as in Part
> #1
> >>>>>>> above,
> >>>>>>> and gets the current foreign key. Then we compare it to the joined
> >>>>> event
> >>>>>>> that we are currently resolving. If they have the same foreign-key,
> >>>>>>> propagate the result out. If they don't, throw the event away.
> >>>>>>>
> >>>>>>> The end result is that we do need to materialize 2 additional
> tables
> >>>>>>> (left/this-combinedkey table, and the final Joined table) as I've
> >>>>>>> illustrated in the updated KIP. I hope the diagram clears it up a
> lot
> >>>>>>> better. Please let me know.
> >>>>>>>
> >>>>>>> Thanks again
> >>>>>>> Adam
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Sun, Dec 16, 2018 at 3:18 PM Guozhang Wang <wangguoz@gmail.com>
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> John,
> >>>>>>>>
> >>>>>>>> Thanks a lot for the suggestions on refactoring the wiki, I agree
> >>>>> with
> >>>>>>> you
> >>>>>>>> that we should consider the KIP proposal to be easily understood
> by
> >>>>>>> anyone
> >>>>>>>> in the future to read, and hence should provide a good summary on
> >> the
> >>>>>>>> user-facing interfaces, as well as rejected alternatives to
> >> represent
> >>>>>>>> briefly "how we came a long way to this conclusion, and what we
> have
> >>>>>>>> argued, disagreed, and agreed about, etc" so that readers do not
> >>>>> need to
> >>>>>>>> dig into the DISCUSS thread to get all the details. We can, of
> >>>>> course,
> >>>>>>> keep
> >>>>>>>> the implementation details like "workflows" on the wiki page as an
> >>>>>>> addendum
> >>>>>>>> section since it also has correlations.
> >>>>>>>>
> >>>>>>>> Regarding your proposal on comment 6): that's a very interesting
> >>>>> idea!
> >>>>>>> Just
> >>>>>>>> to clarify that I understand it fully correctly: the proposal's
> >>>>>>> resulted
> >>>>>>>> topology is still the same as the current proposal, where we will
> >>>>> have 3
> >>>>>>>> sub-topologies for this operator:
> >>>>>>>>
> >>>>>>>> #1: rekey left table
> >>>>>>>>      -> source from the left upstream, send to rekey-processor to
> >>>>> generate
> >>>>>>>> combined key, and then sink to copartition topic.
> >>>>>>>>
> >>>>>>>> #2: first-join with right table
> >>>>>>>>      -> source from the right table upstream, materialize the
> right
> >>>>> table.
> >>>>>>>>      -> source from the co-partition topic, materialize the
> rekeyed
> >>>>> left
> >>>>>>>> table, join with the right table, rekey back, and then sink to the
> >>>>>>>> rekeyed-back topic.
> >>>>>>>>
> >>>>>>>> #3: second join
> >>>>>>>>      -> source from the rekeyed-back topic, materialize the
> rekeyed
> >>>>> back
> >>>>>>>> table.
> >>>>>>>>      -> source from the left upstream, materialize the left table,
> >> join
> >>>>>>> with
> >>>>>>>> the rekeyed back table.
> >>>>>>>>
> >>>>>>>> Sub-topology #1 and #3 may be merged to a single sub-topology
> since
> >>>>>>> both of
> >>>>>>>> them read from the left table source stream. In this workflow, we
> >>>>> need
> >>>>>>> to
> >>>>>>>> materialize 4 tables (left table in #3, right table in #2, rekeyed
> >>>>> left
> >>>>>>>> table in #2, rekeyed-back table in #3), and 2 repartition topics
> >>>>>>>> (copartition topic, rekeyed-back topic).
> >>>>>>>>
> >>>>>>>> Compared with Adam's current proposal in the workflow overview, it
> >>>>> has
> >>>>>>> the
> >>>>>>>> same num.materialize tables (left table, rekeyed left table, right
> >>>>>>> table,
> >>>>>>>> out-of-ordering resolver table), and same num.internal topics
> (two).
> >>>>> The
> >>>>>>>> advantage is that on the copartition topic, we can save bandwidth
> by
> >>>>> not
> >>>>>>>> sending value, and in #2 the rekeyed left table is smaller since
> we
> >>>>> do
> >>>>>>> not
> >>>>>>>> have any values to materialize. Is that right?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Wed, Dec 12, 2018 at 1:22 PM John Roesler <john@confluent.io>
> >>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Adam,
> >>>>>>>>>
> >>>>>>>>> Given that the committers are all pretty busy right now, I think
> >>>>> that
> >>>>>>> it
> >>>>>>>>> would help if you were to refactor the KIP a little to reduce the
> >>>>>>>> workload
> >>>>>>>>> for reviewers.
> >>>>>>>>>
> >>>>>>>>> I'd recommend the following changes:
> >>>>>>>>> * relocate all internal details to a section at the end called
> >>>>>>> something
> >>>>>>>>> like "Implementation Notes".
> >>>>>>>>> * rewrite the rest of the KIP to be as succinct as possible and
> >>>>> mention
> >>>>>>>> only
> >>>>>>>>> publicly-facing API changes.
> >>>>>>>>> ** for example, the interface that you've already listed there,
> as
> >>>>>>> well
> >>>>>>>> as
> >>>>>>>>> a textual description of the guarantees we'll be providing (join
> >>>>>>> result
> >>>>>>>> is
> >>>>>>>>> copartitioned with the LHS, and the join result is guaranteed
> >>>>> correct)
> >>>>>>>>>
> >>>>>>>>> A good target would be that the whole main body of the KIP,
> >>>>> including
> >>>>>>>>> Status, Motivation, Proposal, Justification, and Rejected
> >>>>> Alternatives
> >>>>>>>> all
> >>>>>>>>> fit "above the fold" (i.e., all fit on the screen at a
> comfortable
> >>>>>>> zoom
> >>>>>>>>> level).
> >>>>>>>>> I think the only real Rejected Alternative that bears mention at
> >>>>> this
> >>>>>>>> point
> >>>>>>>>> is KScatteredTable, which you could just include the executive
> >>>>>>> summary on
> >>>>>>>>> (no implementation details), and link to extra details in the
> >>>>>>>>> Implementation Notes section.
> >>>>>>>>>
> >>>>>>>>> Taking a look at the wiki page, ~90% of the text there is
> internal
> >>>>>>>> detail,
> >>>>>>>>> which is useful for the dubious, but doesn't need to be ratified
> >>>>> in a
> >>>>>>>> vote
> >>>>>>>>> (and would be subject to change without notice in the future
> >>>>> anyway).
> >>>>>>>>> There's also a lot of conflicting discussion, as you've very
> >>>>>>> respectfully
> >>>>>>>>> tried to preserve the original proposal from Jan while adding
> your
> >>>>>>> own.
> >>>>>>>>> Isolating all this information in a dedicated section at the
> bottom
> >>>>>>> frees
> >>>>>>>>> the voters up to focus on the public API part of the proposal,
> >>>>> which
> >>>>>>> is
> >>>>>>>>> really all they need to consider.
> >>>>>>>>>
> >>>>>>>>> Plus, it'll be clear to future readers which parts of the
> document
> >>>>> are
> >>>>>>>>> enduring, and which parts are a snapshot of our implementation
> >>>>>>> thinking
> >>>>>>>> at
> >>>>>>>>> the time.
> >>>>>>>>>
> >>>>>>>>> I'm suggesting this because I suspect that the others haven't
> made
> >>>>>>> time
> >>>>>>>> to
> >>>>>>>>> review it partly because it seems daunting. If it seems like it
> >>>>> would
> >>>>>>> be
> >>>>>>>> a
> >>>>>>>>> huge time investment to review, people will just keep putting it
> >>>>> off.
> >>>>>>> But
> >>>>>>>>> if the KIP is a single page, then they'll be more inclined to
> give
> >>>>> it
> >>>>>>> a
> >>>>>>>>> read.
> >>>>>>>>>
> >>>>>>>>> Honestly, I don't think the KIP itself is that controversial
> (apart
> >>>>>>> from
> >>>>>>>>> the scattered table thing (sorry, Jan) ). Most of the discussion
> >>>>> has
> >>>>>>> been
> >>>>>>>>> around the implementation, which we can continue more effectively
> >>>>> in
> >>>>>>> a PR
> >>>>>>>>> once the KIP has passed.
> >>>>>>>>>
> >>>>>>>>> How does that sound?
> >>>>>>>>> -John
> >>>>>>>>>
> >>>>>>>>> On Mon, Dec 10, 2018 at 3:54 PM Adam Bellemare <
> >>>>>>> adam.bellemare@gmail.com
> >>>>>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> 1) I believe that the resolution mechanism John has proposed is
> >>>>>>>>> sufficient
> >>>>>>>>>> - it is clean and easy and doesn't require additional RocksDB
> >>>>>>> stores,
> >>>>>>>>> which
> >>>>>>>>>> reduces the footprint greatly. I don't think we need to resolve
> >>>>>>> based
> >>>>>>>> on
> >>>>>>>>>> timestamp or offset anymore, but if we decide to do so, that
> >>>>> would be
> >>>>>>>>> within
> >>>>>>>>>> the bounds of the existing API.
> >>>>>>>>>>
> >>>>>>>>>> 2) Is the current API sufficient, or does it need to be altered
> >>>>> to
> >>>>>>> go
> >>>>>>>>> back
> >>>>>>>>>> to vote?
> >>>>>>>>>>
> >>>>>>>>>> 3) KScatteredTable implementation can always be added in a
> future
> >>>>>>>>> revision.
> >>>>>>>>>> This API does not rule it out. This implementation of this
> >>>>> function
> >>>>>>>> would
> >>>>>>>>>> simply be replaced with `KScatteredTable.resolve()` while still
> >>>>>>>>> maintaining
> >>>>>>>>>> the existing API, thereby giving both features as Jan outlined
> >>>>>>> earlier.
> >>>>>>>>>> Would this work?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Thanks Guozhang, John and Jan
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Dec 10, 2018 at 10:39 AM John Roesler <
> john@confluent.io
> >>>>>>
> >>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi, all,
> >>>>>>>>>>>
> >>>>>>>>>>>>> In fact, we
> >>>>>>>>>>>>> can just keep a single final-result store with timestamps
> >>>>> and
> >>>>>>>> reject
> >>>>>>>>>>> values
> >>>>>>>>>>>>> that have a smaller timestamp, is that right?
> >>>>>>>>>>>
> >>>>>>>>>>>> Which is the correct output should at least be decided on the
> >>>>>>>> offset
> >>>>>>>>> of
> >>>>>>>>>>>> the original message.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for this point, Jan.
> >>>>>>>>>>>
> >>>>>>>>>>> KIP-258 is merely to allow embedding the record timestamp in
> >>>>> the
> >>>>>>> k/v
> >>>>>>>>>>> store,
> >>>>>>>>>>> as well as providing a storage-format upgrade path.
> >>>>>>>>>>>
> >>>>>>>>>>> I might have missed it, but I think we have yet to discuss
> >>>>> whether
> >>>>>>>> it's
> >>>>>>>>>>> safe
> >>>>>>>>>>> or desirable just to swap topic-ordering out for
> >>>>>>> timestamp-ordering.
> >>>>>>>>> This
> >>>>>>>>>>> is
> >>>>>>>>>>> a very deep topic, and I think it would only pollute the
> >>>>> current
> >>>>>>>>>>> discussion.
> >>>>>>>>>>>
> >>>>>>>>>>> What Adam has proposed is safe, given the *current* ordering
> >>>>>>>> semantics
> >>>>>>>>>>> of the system. If we can agree on his proposal, I think we can
> >>>>>>> merge
> >>>>>>>>> the
> >>>>>>>>>>> feature well before the conversation about timestamp ordering
> >>>>> even
> >>>>>>>>> takes
> >>>>>>>>>>> place, much less reaches a conclusion. In the meantime, it
> >>>>> would
> >>>>>>>> seem
> >>>>>>>>> to
> >>>>>>>>>>> be unfortunate to have one join operator with different
> >>>>> ordering
> >>>>>>>>>> semantics
> >>>>>>>>>>> from every other KTable operator.
> >>>>>>>>>>>
> >>>>>>>>>>> If and when that timestamp discussion takes place, many (all?)
> >>>>>>> KTable
> >>>>>>>>>>> operations
> >>>>>>>>>>> will need to be updated, rendering the many:one join a small
> >>>>>>> marginal
> >>>>>>>>>> cost.
> >>>>>>>>>>>
> >>>>>>>>>>> And, just to plug it again, I proposed an algorithm above that
> >>>>> I
> >>>>>>>>> believe
> >>>>>>>>>>> provides
> >>>>>>>>>>> correct ordering without any additional metadata, and
> >>>>> regardless
> >>>>>>> of
> >>>>>>>> the
> >>>>>>>>>>> ordering semantics. I didn't bring it up further, because I
> >>>>> felt
> >>>>>>> the
> >>>>>>>>> KIP
> >>>>>>>>>>> only needs
> >>>>>>>>>>> to agree on the public API, and we can discuss the
> >>>>> implementation
> >>>>>>> at
> >>>>>>>>>>> leisure in
> >>>>>>>>>>> a PR...
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> -John
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Dec 10, 2018 at 2:28 AM Jan Filipiak <
> >>>>>>>> Jan.Filipiak@trivago.com
> >>>>>>>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 10.12.2018 07:42, Guozhang Wang wrote:
> >>>>>>>>>>>>> Hello Adam / Jan / John,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Sorry for being late on this thread! I've finally got some
> >>>>>>> time
> >>>>>>>>> this
> >>>>>>>>>>>>> weekend to cleanup a load of tasks on my queue (actually
> >>>>> I've
> >>>>>>>> also
> >>>>>>>>>>>> realized
> >>>>>>>>>>>>> there are a bunch of other things I need to enqueue while
> >>>>>>>> cleaning
> >>>>>>>>>> them
> >>>>>>>>>>>> up
> >>>>>>>>>>>>> --- sth I need to improve on my side). So here are my
> >>>>>>> thoughts:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regarding the APIs: I like the current written API in the
> >>>>> KIP.
> >>>>>>>> More
> >>>>>>>>>>>>> generally I'd prefer to keep the 1) one-to-many join
> >>>>>>>>> functionalities
> >>>>>>>>>> as
> >>>>>>>>>>>>> well as 2) other join types than inner as separate KIPs
> >>>>> since
> >>>>>>> 1)
> >>>>>>>>> may be
> >>>>>>>>>>>> worth
> >>>>>>>>>>>>> a general API refactoring that can benefit not only
> >>>>> foreignkey
> >>>>>>>>> joins
> >>>>>>>>>>> but
> >>>>>>>>>>>>> collocate joins as well (e.g. an extended proposal of
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> >>>>>>>>>>>> ),
> >>>>>>>>>>>>> and I'm not sure if other join types would actually be
> >>>>> needed
> >>>>>>>>> (maybe
> >>>>>>>>>>> left
> >>>>>>>>>>>>> join still makes sense), so it's better to
> >>>>>>>>>>> wait-for-people-to-ask-and-add
> >>>>>>>>>>>>> than add-sth-that-no-one-uses.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regarding whether we enforce step 3) / 4) v.s. introducing
> >>>>> a
> >>>>>>>>>>>>> KScatteredTable for users to inject their own optimization:
> >>>>>>> I'd
> >>>>>>>>>> prefer
> >>>>>>>>>>> to
> >>>>>>>>>>>>> do the current option as-is, and my main rationale is for
> >>>>>>>>>> optimization
> >>>>>>>>>>>>> rooms inside the Streams internals and the API
> >>>>> succinctness.
> >>>>>>> For
> >>>>>>>>>>> advanced
> >>>>>>>>>>>>> users who may indeed prefer KScatteredTable and do their
> >>>>> own
> >>>>>>>>>>>> optimization,
> >>>>>>>>>>>>> while it is too much of the work to use Processor API
> >>>>>>> directly, I
> >>>>>>>>>> think
> >>>>>>>>>>>> we
> >>>>>>>>>>>>> can still extend the current API to support it in the
> >>>>> future
> >>>>>>> if
> >>>>>>>> it
> >>>>>>>>>>>> becomes
> >>>>>>>>>>>>> necessary.
> >>>>>>>>>>>>
> >>>>>>>>>>>> no internal optimization potential. it's a myth
> >>>>>>>>>>>>
> >>>>>>>>>>>> ¯\_(ツ)_/¯
> >>>>>>>>>>>>
> >>>>>>>>>>>> :-)
> >>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Another note about step 4) resolving out-of-ordering data,
> >>>>> as
> >>>>>>> I
> >>>>>>>>>>> mentioned
> >>>>>>>>>>>>> before I think with KIP-258 (embedded timestamp with
> >>>>> key-value
> >>>>>>>>> store)
> >>>>>>>>>>> we
> >>>>>>>>>>>>> can actually make this step simpler than the current
> >>>>>>> proposal. In
> >>>>>>>>>> fact,
> >>>>>>>>>>>> we
> >>>>>>>>>>>>> can just keep a single final-result store with timestamps
> >>>>> and
> >>>>>>>>> reject
> >>>>>>>>>>>> values
> >>>>>>>>>>>>> that have a smaller timestamp, is that right?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Which is the correct output should at least be decided on the
> >>>>>>>> offset
> >>>>>>>>> of
> >>>>>>>>>>>> the original message.
> >>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> That's all I have in mind now. Again, great appreciation to
> >>>>>>> Adam
> >>>>>>>> to
> >>>>>>>>>>> make
> >>>>>>>>>>>>> such HUGE progress on this KIP!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak <
> >>>>>>>>>> Jan.Filipiak@trivago.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> If they don't find the time:
> >>>>>>>>>>>>>> They usually take the opposite path from me :D
> >>>>>>>>>>>>>> so the answer would be clear.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> hence my suggestion to vote.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 04.12.2018 21:06, Adam Bellemare wrote:
> >>>>>>>>>>>>>>> Hi Guozhang and Matthias
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I know both of you are quite busy, but we've gotten this
> >>>>> KIP
> >>>>>>>> to a
> >>>>>>>>>>> point
> >>>>>>>>>>>>>>> where we need more guidance on the API (perhaps a bit of
> >>>>> a
> >>>>>>>>>>> tie-breaker,
> >>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>> you will). If you have anyone else you may think should
> >>>>>>> look at
> >>>>>>>>>> this,
> >>>>>>>>>>>>>>> please tag them accordingly.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> The scenario is as such:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Current Option:
> >>>>>>>>>>>>>>> API:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
> >>>>>>>>>>>>>>> 1) Rekey the data to CombinedKey, and shuffles it to the
> >>>>>>>>> partition
> >>>>>>>>>>> with
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> foreignKey (repartition 1)
> >>>>>>>>>>>>>>> 2) Join the data
> >>>>>>>>>>>>>>> 3) Shuffle the data back to the original node
> >>>>> (repartition
> >>>>>>> 2)
> >>>>>>>>>>>>>>> 4) Resolve out-of-order arrival / race condition due to
> >>>>>>>>> foreign-key
> >>>>>>>>>>>>>> changes.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Alternate Option:
> >>>>>>>>>>>>>>> Perform #1 and #2 above, and return a KScatteredTable.
> >>>>>>>>>>>>>>> - It would be keyed on a wrapped key function:
> >>>>>>> <CombinedKey<KO,
> >>>>>>>>> K>,
> >>>>>>>>>>> VR>
> >>>>>>>>>>>>>> (KO
> >>>>>>>>>>>>>>> = Other Table Key, K = This Table Key, VR = Joined
> >>>>> Result)
> >>>>>>>>>>>>>>> - KScatteredTable.resolve() would perform #3 and #4 but
> >>>>>>>>> otherwise a
> >>>>>>>>>>>> user
> >>>>>>>>>>>>>>> would be able to perform additional functions directly
> >>>>> from
> >>>>>>> the
> >>>>>>>>>>>>>>> KScatteredTable (TBD - currently out of scope).
> >>>>>>>>>>>>>>> - John's analysis 2-emails up is accurate as to the
> >>>>>>> tradeoffs.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Current Option is coded as-is. Alternate Option is possible, but
> >>>>>>>>>>>>>>> will require exposing implementation details and some new data
> >>>>>>>>>>>>>>> structures (i.e. CombinedKey) in the API.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I appreciate any insight into this.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare <
> >>>>>>>>>>>> adam.bellemare@gmail.com>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi John
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for your feedback and assistance. I think your
> >>>>>>> summary
> >>>>>>>> is
> >>>>>>>>>>>>>> accurate
> >>>>>>>>>>>>>>>> from my perspective. Additionally, I would like to add
> >>>>> that
> >>>>>>>>> there
> >>>>>>>>>>> is a
> >>>>>>>>>>>>>> risk
> >>>>>>>>>>>>>>>> of inconsistent final states without performing the
> >>>>>>>> resolution.
> >>>>>>>>>> This
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>> major concern for me as most of the data I have dealt
> >>>>> with
> >>>>>>> is
> >>>>>>>>>>> produced
> >>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>> relational databases. We have seen a number of cases
> >>>>> where
> >>>>>>> a
> >>>>>>>>> user
> >>>>>>>>>> in
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> Rails UI has modified the field (foreign key), realized
> >>>>>>> they
> >>>>>>>>> made
> >>>>>>>>>> a
> >>>>>>>>>>>>>>>> mistake, and then updated the field again with a new
> >>>>> key.
> >>>>>>> The
> >>>>>>>>>> events
> >>>>>>>>>>>> are
> >>>>>>>>>>>>>>>> propagated out as they are produced, and as such we have
> >>>>>>> had
> >>>>>>>>>>>> real-world
> >>>>>>>>>>>>>>>> cases where these inconsistencies were propagated
> >>>>>>> downstream
> >>>>>>>> as
> >>>>>>>>>> the
> >>>>>>>>>>>>>> final
> >>>>>>>>>>>>>>>> values due to the race conditions in the fanout of the
> >>>>>>> data.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> This solution that I propose values correctness of the
> >>>>>>> final
> >>>>>>>>>> result
> >>>>>>>>>>>> over
> >>>>>>>>>>>>>>>> other factors.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> We could always move this function over to using a
> >>>>>>>>> KScatteredTable
> >>>>>>>>>>>>>>>> implementation in the future, and simply deprecate this join API in
> >>>>>>>>>>>>>>>> time. I think I would like to hear more from some of the
> >>>>>>> other
> >>>>>>>>>> major
> >>>>>>>>>>>>>>>> committers on which course of action they would think is
> >>>>>>> best
> >>>>>>>>>> before
> >>>>>>>>>>>> any
> >>>>>>>>>>>>>>>> more coding is done.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks again
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Adam
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 8:24 PM John Roesler <
> >>>>>>>> john@confluent.io>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi Jan and Adam,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Wow, thanks for doing that test, Adam. Those results
> >>>>> are
> >>>>>>>>>>> encouraging.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for your performance experience as well, Jan. I
> >>>>>>> agree
> >>>>>>>>> that
> >>>>>>>>>>>>>> avoiding
> >>>>>>>>>>>>>>>>> unnecessary join outputs is especially important when
> >>>>> the
> >>>>>>>>> fan-out
> >>>>>>>>>>> is
> >>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>> high. I suppose this could also be built into the
> >>>>>>>>> implementation
> >>>>>>>>>>>> we're
> >>>>>>>>>>>>>>>>> discussing, but it wouldn't have to be specified in the
> >>>>>>> KIP
> >>>>>>>>>> (since
> >>>>>>>>>>>>>> it's an
> >>>>>>>>>>>>>>>>> API-transparent optimization).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> As far as whether or not to re-repartition the data, I
> >>>>>>> didn't
> >>>>>>>>>> bring
> >>>>>>>>>>>> it
> >>>>>>>>>>>>>> up
> >>>>>>>>>>>>>>>>> because it sounded like the two of you agreed to leave
> >>>>> the
> >>>>>>>> KIP
> >>>>>>>>>>> as-is,
> >>>>>>>>>>>>>>>>> despite the disagreement.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> If you want my opinion, I feel like both approaches are
> >>>>>>>>>> reasonable.
> >>>>>>>>>>>>>>>>> It sounds like Jan values more the potential for
> >>>>>>> developers
> >>>>>>>> to
> >>>>>>>>>>>> optimize
> >>>>>>>>>>>>>>>>> their topologies to re-use the intermediate nodes,
> >>>>> whereas
> >>>>>>>> Adam
> >>>>>>>>>>>> places
> >>>>>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>> value on having a single operator that people can use
> >>>>>>> without
> >>>>>>>>>> extra
> >>>>>>>>>>>>>> steps
> >>>>>>>>>>>>>>>>> at the end.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Personally, although I do find it exceptionally
> >>>>> annoying
> >>>>>>>> when a
> >>>>>>>>>>>>>> framework
> >>>>>>>>>>>>>>>>> gets in my way when I'm trying to optimize something,
> >>>>> it
> >>>>>>>> seems
> >>>>>>>>>>> better
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> go
> >>>>>>>>>>>>>>>>> for a single operation.
> >>>>>>>>>>>>>>>>> * Encapsulating the internal transitions gives us
> >>>>>>> significant
> >>>>>>>>>>>> latitude
> >>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>> the implementation (for example, joining only at the
> >>>>> end,
> >>>>>>> not
> >>>>>>>>> in
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> middle
> >>>>>>>>>>>>>>>>> to avoid extra data copying and out-of-order
> >>>>> resolution;
> >>>>>>> how
> >>>>>>>> we
> >>>>>>>>>>>>>> represent
> >>>>>>>>>>>>>>>>> the first repartition keys (combined keys vs. value
> >>>>>>> vectors),
> >>>>>>>>>>> etc.).
> >>>>>>>>>>>>>> If we
> >>>>>>>>>>>>>>>>> publish something like a KScatteredTable with the
> >>>>>>>>>> right-partitioned
> >>>>>>>>>>>>>> joined
> >>>>>>>>>>>>>>>>> data, then the API pretty much locks in the
> >>>>>>> implementation as
> >>>>>>>>>> well.
> >>>>>>>>>>>>>>>>> * The API seems simpler to understand and use. I do
> >>>>> mean
> >>>>>>>>> "seems";
> >>>>>>>>>>> if
> >>>>>>>>>>>>>>>>> anyone
> >>>>>>>>>>>>>>>>> wants to make the case that KScatteredTable is actually
> >>>>>>>>> simpler,
> >>>>>>>>>> I
> >>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>> hypothetical usage code would help. From a relational
> >>>>>>> algebra
> >>>>>>>>>>>>>> perspective,
> >>>>>>>>>>>>>>>>> it seems like KTable.join(KTable) should produce a new
> >>>>>>> KTable
> >>>>>>>>> in
> >>>>>>>>>>> all
> >>>>>>>>>>>>>>>>> cases.
> >>>>>>>>>>>>>>>>> * That said, there might still be room in the API for a
> >>>>>>>>> different
> >>>>>>>>>>>>>>>>> operation
> >>>>>>>>>>>>>>>>> like what Jan has proposed to scatter a KTable, and
> >>>>> then
> >>>>>>> do
> >>>>>>>>>> things
> >>>>>>>>>>>> like
> >>>>>>>>>>>>>>>>> join, re-group, etc from there... I'm not sure; I
> >>>>> haven't
> >>>>>>>>> thought
> >>>>>>>>>>>>>> through
> >>>>>>>>>>>>>>>>> all the consequences yet.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> This is all just my opinion after thinking over the
> >>>>>>>> discussion
> >>>>>>>>> so
> >>>>>>>>>>>>>> far...
> >>>>>>>>>>>>>>>>> -John
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <
> >>>>>>>>>>>>>> adam.bellemare@gmail.com>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Updated the PR to take into account John's feedback.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I did some preliminary testing for the performance of
> >>>>> the
> >>>>>>>>>>>> prefixScan.
> >>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>> have attached the file, but I will also include the
> >>>>> text
> >>>>>>> in
> >>>>>>>>> the
> >>>>>>>>>>> body
> >>>>>>>>>>>>>>>>> here
> >>>>>>>>>>>>>>>>>> for archival purposes (I am not sure what happens to
> >>>>>>>> attached
> >>>>>>>>>>>> files).
> >>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>> also updated the PR and the KIP accordingly.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Summary: It scales exceptionally well for scanning large numbers of
> >>>>>>>>>>>>>>>>>> records. As Jan mentioned previously, the real issue would be more
> >>>>>>>>>>>>>>>>>> around processing the resulting records after obtaining them. For
> >>>>>>>>>>>>>>>>>> instance, it takes approximately 80-120 mS to flush the buffer and a
> >>>>>>>>>>>>>>>>>> further 35-85 mS to scan 27.5M records, obtaining matches for 2.5M of
> >>>>>>>>>>>>>>>>>> them. Iterating through the records just to generate a simple count
> >>>>>>>>>>>>>>>>>> takes ~40 times longer than the flush + scan combined.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> ============================================================================================
> >>>>>>>>>>>>>>>>>> Setup:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> ============================================================================================
> >>>>>>>>>>>>>>>>>> Java 9 with default settings aside from a 512 MB heap
> >>>>>>>>> (Xmx512m,
> >>>>>>>>>>>>>> Xms512m)
> >>>>>>>>>>>>>>>>>> CPU: i7 2.2 GHz.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Note: I am using a slightly-modified,
> >>>>> directly-accessible
> >>>>>>>>> Kafka
> >>>>>>>>>>>>>> Streams
> >>>>>>>>>>>>>>>>>> RocksDB
> >>>>>>>>>>>>>>>>>> implementation (RocksDB.java, basically just avoiding
> >>>>> the
> >>>>>>>>>>>>>>>>>> ProcessorContext).
> >>>>>>>>>>>>>>>>>> There are no modifications to the default RocksDB
> >>>>> values
> >>>>>>>>>> provided
> >>>>>>>>>>> in
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> 2.1/trunk release.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> keysize = 128 bytes
> >>>>>>>>>>>>>>>>>> valsize = 512 bytes
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Step 1:
> >>>>>>>>>>>>>>>>>> Write X positive matching events: (key = prefix +
> >>>>>>>> left-padded
> >>>>>>>>>>>>>>>>>> auto-incrementing integer)
> >>>>>>>>>>>>>>>>>> Step 2:
> >>>>>>>>>>>>>>>>>> Write 10X negative matching events (key = left-padded
> >>>>>>>>>>>>>> auto-incrementing
> >>>>>>>>>>>>>>>>>> integer)
> >>>>>>>>>>>>>>>>>> Step 3:
> >>>>>>>>>>>>>>>>>> Perform flush
> >>>>>>>>>>>>>>>>>> Step 4:
> >>>>>>>>>>>>>>>>>> Perform prefixScan
> >>>>>>>>>>>>>>>>>> Step 5:
> >>>>>>>>>>>>>>>>>> Iterate through return Iterator and validate the
> >>>>> count of
> >>>>>>>>>> expected
> >>>>>>>>>>>>>>>>> events.
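Archive note: the five benchmark steps above can be approximated with a small sketch. This is not the test harness used in the thread. It assumes a `TreeMap` as a stand-in for the sorted RocksDB store, and the names `PrefixScanSketch`, `paddedKey` and `prefixScan`, as well as the `fk-A|` prefix, are hypothetical. It only illustrates why left-padded counters keep all matching keys in one contiguous range that a prefix scan can return without visiting the non-matching keys.

```java
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;

public class PrefixScanSketch {
    // Left-pad the counter so lexicographic key order matches numeric order.
    public static String paddedKey(String prefix, int counter) {
        return prefix + String.format("%010d", counter);
    }

    // Return the contiguous key range that starts with the given prefix.
    // Assumption: keys never contain Character.MAX_VALUE after the prefix.
    public static SortedMap<String, String> prefixScan(
            NavigableMap<String, String> store, String prefix) {
        return store.subMap(prefix, true, prefix + Character.MAX_VALUE, false);
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>();
        int x = 1000;
        // Step 1: X matching events (key = prefix + left-padded integer).
        for (int i = 0; i < x; i++) {
            store.put(paddedKey("fk-A|", i), "match");
        }
        // Step 2: 10X non-matching events (key = left-padded integer only).
        for (int i = 0; i < 10 * x; i++) {
            store.put(paddedKey("", i), "no-match");
        }
        // Steps 4 and 5: scan by prefix and count the returned entries.
        System.out.println("matches: " + prefixScan(store, "fk-A|").size());
    }
}
```

An in-memory map has no flush step, so step 3 and the timings reported in the results are not reproduced here.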
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> ============================================================================================
> >>>>>>>>>>>>>>>>>> Results:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> ============================================================================================
> >>>>>>>>>>>>>>>>>> X = 1k (11k events total)
> >>>>>>>>>>>>>>>>>> Flush Time = 39 mS
> >>>>>>>>>>>>>>>>>> Scan Time = 7 mS
> >>>>>>>>>>>>>>>>>> 6.9 MB disk
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> --------------------------------------------------------------------------------------------
> >>>>>>>>>>>>>>>>>> X = 10k (110k events total)
> >>>>>>>>>>>>>>>>>> Flush Time = 45 mS
> >>>>>>>>>>>>>>>>>> Scan Time = 8 mS
> >>>>>>>>>>>>>>>>>> 127 MB
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> --------------------------------------------------------------------------------------------
> >>>>>>>>>>>>>>>>>> X = 100k (1.1M events total)
> >>>>>>>>>>>>>>>>>> Test1:
> >>>>>>>>>>>>>>>>>> Flush Time = 60 mS
> >>>>>>>>>>>>>>>>>> Scan Time = 12 mS
> >>>>>>>>>>>>>>>>>> 678 MB
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Test2:
> >>>>>>>>>>>>>>>>>> Flush Time = 45 mS
> >>>>>>>>>>>>>>>>>> Scan Time = 7 mS
> >>>>>>>>>>>>>>>>>> 576 MB
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> --------------------------------------------------------------------------------------------
> >>>>>>>>>>>>>>>>>> X = 1M (11M events total)
> >>>>>>>>>>>>>>>>>> Test1:
> >>>>>>>>>>>>>>>>>> Flush Time = 52 mS
> >>>>>>>>>>>>>>>>>> Scan Time = 19 mS
> >>>>>>>>>>>>>>>>>> 7.2 GB
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Test2:
> >>>>>>>>>>>>>>>>>> Flush Time = 84 mS
> >>>>>>>>>>>>>>>>>> Scan Time = 34 mS
> >>>>>>>>>>>>>>>>>> 9.1 GB
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> --------------------------------------------------------------------------------------------
> >>>>>>>>>>>>>>>>>> X = 2.5M (27.5M events total)
> >>>>>>>>>>>>>>>>>> Test1:
> >>>>>>>>>>>>>>>>>> Flush Time = 82 mS
> >>>>>>>>>>>>>>>>>> Scan Time = 63 mS
> >>>>>>>>>>>>>>>>>> 17GB - 276 sst files
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Test2:
> >>>>>>>>>>>>>>>>>> Flush Time = 116 mS
> >>>>>>>>>>>>>>>>>> Scan Time = 35 mS
> >>>>>>>>>>>>>>>>>> 23GB - 361 sst files
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Test3:
> >>>>>>>>>>>>>>>>>> Flush Time = 103 mS
> >>>>>>>>>>>>>>>>>> Scan Time = 82 mS
> >>>>>>>>>>>>>>>>>> 19 GB - 300 sst files
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> --------------------------------------------------------------------------------------------
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I had to limit my testing on my laptop to X = 2.5M
> >>>>>>> events. I
> >>>>>>>>>> tried
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>> go
> >>>>>>>>>>>>>>>>>> to X = 10M (110M events) but RocksDB was going into
> >>>>> the
> >>>>>>>> 100GB+
> >>>>>>>>>>> range
> >>>>>>>>>>>>>>>>> and my
> >>>>>>>>>>>>>>>>>> laptop ran out of disk. More extensive testing could
> >>>>> be
> >>>>>>> done
> >>>>>>>>>> but I
> >>>>>>>>>>>>>>>>> suspect
> >>>>>>>>>>>>>>>>>> that it would be in line with what we're seeing in the
> >>>>>>>> results
> >>>>>>>>>>>> above.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> At this point in time, I think the only major
> >>>>> discussion
> >>>>>>>> point
> >>>>>>>>>> is
> >>>>>>>>>>>>>> really
> >>>>>>>>>>>>>>>>>> around what Jan and I have disagreed on:
> >>>>> repartitioning
> >>>>>>>> back +
> >>>>>>>>>>>>>> resolving
> >>>>>>>>>>>>>>>>>> potential out of order issues or leaving that up to
> >>>>> the
> >>>>>>>> client
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>> handle.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks folks,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Adam
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <
> >>>>>>>>>>>> Jan.Filipiak@trivago.com
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 29.11.2018 15:14, John Roesler wrote:
> >>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Sorry that this discussion petered out... I think
> >>>>> the
> >>>>>>> 2.1
> >>>>>>>>>>> release
> >>>>>>>>>>>>>>>>>>> caused an
> >>>>>>>>>>>>>>>>>>>> extended distraction that pushed it off everyone's
> >>>>>>> radar
> >>>>>>>>>> (which
> >>>>>>>>>>>> was
> >>>>>>>>>>>>>>>>>>>> precisely Adam's concern). Personally, I've also had some extended
> >>>>>>>>>>>>>>>>>>>> distractions of my own that kept (and continue to
> >>>>>>> keep) me
> >>>>>>>>>>>>>>>>> preoccupied.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> However, calling for a vote did wake me up, so I
> >>>>> guess
> >>>>>>> Jan
> >>>>>>>>> was
> >>>>>>>>>>> on
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> right
> >>>>>>>>>>>>>>>>>>>> track!
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I've gone back and reviewed the whole KIP document
> >>>>> and
> >>>>>>> the
> >>>>>>>>>> prior
> >>>>>>>>>>>>>>>>>>>> discussion, and I'd like to offer a few thoughts:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> API Thoughts:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 1. If I read the KIP right, you are proposing a
> >>>>>>>> many-to-one
> >>>>>>>>>>> join.
> >>>>>>>>>>>>>>>>> Could
> >>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>> consider naming it manyToOneJoin? Or, if you prefer,
> >>>>>>> flip
> >>>>>>>>> the
> >>>>>>>>>>>> design
> >>>>>>>>>>>>>>>>>>> around
> >>>>>>>>>>>>>>>>>>>> and make it a oneToManyJoin?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The proposed name "joinOnForeignKey" disguises the
> >>>>> join
> >>>>>>>>> type,
> >>>>>>>>>>> and
> >>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>> seems
> >>>>>>>>>>>>>>>>>>>> like it might trick some people into using it for a
> >>>>>>>>> one-to-one
> >>>>>>>>>>>> join.
> >>>>>>>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>>>>> would work, of course, but it would be super
> >>>>>>> inefficient
> >>>>>>>>>>> compared
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>> simple rekey-and-join.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 2. I might have missed it, but I don't think it's
> >>>>>>>> specified
> >>>>>>>>>>>> whether
> >>>>>>>>>>>>>>>>>>> it's an
> >>>>>>>>>>>>>>>>>>>> inner, outer, or left join. I'm guessing an outer
> >>>>>>> join, as
> >>>>>>>>>>>>>>>>> (neglecting
> >>>>>>>>>>>>>>>>>>> IQ),
> >>>>>>>>>>>>>>>>>>>> the rest can be achieved by filtering or by handling
> >>>>>>> it in
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> ValueJoiner.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 3. The arg list to joinOnForeignKey doesn't look
> >>>>> quite
> >>>>>>>>> right.
> >>>>>>>>>>>>>>>>>>>> 3a. Regarding Serialized: There are a few different
> >>>>>>>>> paradigms
> >>>>>>>>>> in
> >>>>>>>>>>>>>>>>> play in
> >>>>>>>>>>>>>>>>>>>> the Streams API, so it's confusing, but instead of
> >>>>>>> three
> >>>>>>>>>>>> Serialized
> >>>>>>>>>>>>>>>>>>> args, I
> >>>>>>>>>>>>>>>>>>>> think it would be better to have one that allows
> >>>>>>>>> (optionally)
> >>>>>>>>>>>>>> setting
> >>>>>>>>>>>>>>>>>>> the 4
> >>>>>>>>>>>>>>>>>>>> incoming serdes. The result serde is defined by the
> >>>>>>>>>>> Materialized.
> >>>>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>>>>> incoming serdes can be optional because they might
> >>>>>>> already
> >>>>>>>>> be
> >>>>>>>>>>>>>>>>> available
> >>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>> the source KTables, or the default serdes from the
> >>>>>>> config
> >>>>>>>>>> might
> >>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>> applicable.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 3b. Is the StreamPartitioner necessary? The other
> >>>>> joins
> >>>>>>>>> don't
> >>>>>>>>>>>> allow
> >>>>>>>>>>>>>>>>>>> setting
> >>>>>>>>>>>>>>>>>>>> one, and it seems like it might actually be harmful,
> >>>>>>> since
> >>>>>>>>> the
> >>>>>>>>>>>> rekey
> >>>>>>>>>>>>>>>>>>>> operation needs to produce results that are
> >>>>>>> co-partitioned
> >>>>>>>>>> with
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> "other"
> >>>>>>>>>>>>>>>>>>>> KTable.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 4. I'm fine with the "reserved word" header, but I
> >>>>>>> didn't
> >>>>>>>>>>> actually
> >>>>>>>>>>>>>>>>>>> follow
> >>>>>>>>>>>>>>>>>>>> what Matthias meant about namespacing requiring
> >>>>>>>>>> "deserializing"
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> record
> >>>>>>>>>>>>>>>>>>>> header. The headers are already Strings, so I don't
> >>>>>>> think
> >>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>> deserialization is required. If we applied the
> >>>>>>> namespace
> >>>>>>>> at
> >>>>>>>>>>> source
> >>>>>>>>>>>>>>>>> nodes
> >>>>>>>>>>>>>>>>>>>> and stripped it at sink nodes, this would be
> >>>>>>> practically
> >>>>>>>> no
> >>>>>>>>>>>>>> overhead.
> >>>>>>>>>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>>>>> advantage of the namespace idea is that no public
> >>>>> API
> >>>>>>>> change
> >>>>>>>>>> wrt
> >>>>>>>>>>>>>>>>> headers
> >>>>>>>>>>>>>>>>>>>> needs to happen, and no restrictions need to be
> >>>>> placed
> >>>>>>> on
> >>>>>>>>>> users'
> >>>>>>>>>>>>>>>>>>> headers.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> (Although I'm wondering if we can get away without
> >>>>> the
> >>>>>>>>> header
> >>>>>>>>>> at
> >>>>>>>>>>>>>>>>> all...
> >>>>>>>>>>>>>>>>>>>> stay tuned)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 5. I also didn't follow the discussion about the HWM
> >>>>>>> table
> >>>>>>>>>>> growing
> >>>>>>>>>>>>>>>>>>> without
> >>>>>>>>>>>>>>>>>>>> bound. As I read it, the HWM table is effectively
> >>>>>>>>> implementing
> >>>>>>>>>>> OCC
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> resolve the problem you noted with disordering when
> >>>>> the
> >>>>>>>>> rekey
> >>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>> reversed... particularly notable when the FK
> >>>>> changes.
> >>>>>>> As
> >>>>>>>>> such,
> >>>>>>>>>>> it
> >>>>>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>>>>>> needs to track the most recent "version" (the
> >>>>> offset in
> >>>>>>>> the
> >>>>>>>>>>> source
> >>>>>>>>>>>>>>>>>>>> partition) of each key. Therefore, it should have
> >>>>> the
> >>>>>>> same
> >>>>>>>>>>> number
> >>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>> keys
> >>>>>>>>>>>>>>>>>>>> as the source table at all times.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I see that you are aware of KIP-258, which I think
> >>>>>>> might
> >>>>>>>> be
> >>>>>>>>>>>> relevant
> >>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>> couple of ways. One: it's just about storing the
> >>>>>>> timestamp
> >>>>>>>>> in
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>>>> store, but the ultimate idea is to effectively use
> >>>>> the
> >>>>>>>>>> timestamp
> >>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>> OCC
> >>>>>>>>>>>>>>>>>>>> "version" to drop disordered updates. You wouldn't
> >>>>>>> want to
> >>>>>>>>> use
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> timestamp for this operation, but if you were to
> >>>>> use a
> >>>>>>>>> similar
> >>>>>>>>>>>>>>>>>>> mechanism to
> >>>>>>>>>>>>>>>>>>>> store the source offset in the store alongside the
> >>>>>>>> re-keyed
> >>>>>>>>>>>> values,
> >>>>>>>>>>>>>>>>> then
> >>>>>>>>>>>>>>>>>>>> you could avoid a separate table.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 6. You and Jan have been thinking about this for a
> >>>>> long
> >>>>>>>>> time,
> >>>>>>>>>> so
> >>>>>>>>>>>>>> I've
> >>>>>>>>>>>>>>>>>>>> probably missed something here, but I'm wondering
> >>>>> if we
> >>>>>>>> can
> >>>>>>>>>>> avoid
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> HWM
> >>>>>>>>>>>>>>>>>>>> tracking at all and resolve out-of-order during a
> >>>>> final
> >>>>>>>> join
> >>>>>>>>>>>>>>>>> instead...
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Let's say we're joining a left table (Integer K:
> >>>>> Letter
> >>>>>>>> FK,
> >>>>>>>>>>> (other
> >>>>>>>>>>>>>>>>>>> data))
> >>>>>>>>>>>>>>>>>>>> to a right table (Letter K: (some data)).
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Left table:
> >>>>>>>>>>>>>>>>>>>> 1: (A, xyz)
> >>>>>>>>>>>>>>>>>>>> 2: (B, asd)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Right table:
> >>>>>>>>>>>>>>>>>>>> A: EntityA
> >>>>>>>>>>>>>>>>>>>> B: EntityB
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> We could do a rekey as you proposed with a combined
> >>>>>>> key,
> >>>>>>>> but
> >>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>> propagating the value at all..
> >>>>>>>>>>>>>>>>>>>> Rekey table:
> >>>>>>>>>>>>>>>>>>>> A-1: (dummy value)
> >>>>>>>>>>>>>>>>>>>> B-2: (dummy value)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Which we then join with the right table to produce:
> >>>>>>>>>>>>>>>>>>>> A-1: EntityA
> >>>>>>>>>>>>>>>>>>>> B-2: EntityB
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Which gets rekeyed back:
> >>>>>>>>>>>>>>>>>>>> 1: A, EntityA
> >>>>>>>>>>>>>>>>>>>> 2: B, EntityB
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> And finally we do the actual join:
> >>>>>>>>>>>>>>>>>>>> Result table:
> >>>>>>>>>>>>>>>>>>>> 1: ((A, xyz), EntityA)
> >>>>>>>>>>>>>>>>>>>> 2: ((B, asd), EntityB)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The thing is that in that last join, we have the
> >>>>>>>> opportunity
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>> compare
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> current FK in the left table with the incoming PK of
> >>>>>>> the
> >>>>>>>>> right
> >>>>>>>>>>>>>>>>> table. If
> >>>>>>>>>>>>>>>>>>>> they don't match, we just drop the event, since it
> >>>>>>> must be
> >>>>>>>>>>>> outdated.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> In your KIP, you gave an example in which (1: A,
> >>>>> xyz)
> >>>>>>> gets
> >>>>>>>>>>> updated
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> (1:
> >>>>>>>>>>>>>>>>>>>> B, xyz), ultimately yielding a conundrum about
> >>>>> whether
> >>>>>>> the
> >>>>>>>>>> final
> >>>>>>>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>>>> should be (1: null) or (1: joined-on-B). With the
> >>>>>>>> algorithm
> >>>>>>>>>>> above,
> >>>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>> would be considering (1: (B, xyz), (A, null)) vs (1:
> >>>>>>> (B,
> >>>>>>>>> xyz),
> >>>>>>>>>>> (B,
> >>>>>>>>>>>>>>>>>>>> EntityB)). It seems like this does give you enough
> >>>>>>>>> information
> >>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> make
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> right choice, regardless of disordering.
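Archive note: the check John describes in point 6 (compare the left row's current FK with the key the incoming result was joined on, and drop mismatches) can be sketched as below. This is a minimal in-memory illustration, not code from the KIP or the PR. The class name `FkResolveSketch` and its methods are hypothetical, and a `HashMap` stands in for the left table's state store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class FkResolveSketch {
    // Current state of the left table: primary key -> current foreign key.
    private final Map<Integer, String> currentFk = new HashMap<>();

    // Record the latest foreign key seen for a left-table row.
    public void updateLeft(int key, String foreignKey) {
        currentFk.put(key, foreignKey);
    }

    // Handle a result that was rekeyed back from the right side.
    // joinedOnFk is the right-table key the result was produced from.
    // The result is accepted only if the left row still points at that key;
    // otherwise it is considered outdated and dropped.
    public Optional<String> resolve(int key, String joinedOnFk, String rightValue) {
        String fk = currentFk.get(key);
        if (fk == null || !fk.equals(joinedOnFk)) {
            return Optional.empty();
        }
        return Optional.of(key + ": ((" + fk + "), " + rightValue + ")");
    }

    public static void main(String[] args) {
        FkResolveSketch sketch = new FkResolveSketch();
        sketch.updateLeft(1, "A");
        sketch.updateLeft(1, "B"); // the FK of row 1 changes from A to B
        // Results may arrive in either order; only the one joined on B survives.
        System.out.println(sketch.resolve(1, "A", null).isPresent());
        System.out.println(sketch.resolve(1, "B", "EntityB").isPresent());
    }
}
```

In the (1: A, xyz) to (1: B, xyz) example, the late (A, null) result is rejected regardless of arrival order, because row 1 currently references B.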
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Will check Adam's patch, but this should work. As
> >>>>>>> mentioned
> >>>>>>>>>> often
> >>>>>>>>>>> I
> >>>>>>>>>>>> am
> >>>>>>>>>>>>>>>>>>> not convinced on partitioning back for the user
> >>>>>>>>> automatically.
> >>>>>>>>>> I
> >>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>> this is the real performance eater ;)
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 7. Last thought... I'm a little concerned about the
> >>>>>>>>>> performance
> >>>>>>>>>>> of
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> range scans when records change in the right table.
> >>>>>>> You've
> >>>>>>>>>> said
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> you've
> >>>>>>>>>>>>>>>>>>>> been using the algorithm you presented in production
> >>>>>>> for a
> >>>>>>>>>>> while.
> >>>>>>>>>>>>>> Can
> >>>>>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>> give us a sense of the performance characteristics
> >>>>>>> you've
> >>>>>>>>>>>> observed?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Make it work, make it fast, make it beautiful. The
> >>>>>>> topmost
> >>>>>>>>>> thing
> >>>>>>>>>>>> here
> >>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>> / was correctness. In practice I do not measure the
> >>>>>>>>> performance
> >>>>>>>>>>> of
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> range scan. The usual case I run this with emits 500k - 1M rows
> >>>>>>>>>>>>>>>>>>> on a left hand side change. The range scan is just the work you gotta
> >>>>>>>>>>>>>>>>>>> do. Also, when you pack your data into different formats, the RocksDB
> >>>>>>>>>>>>>>>>>>> performance is usually tied very tightly to the size of the data, and
> >>>>>>>>>>>>>>>>>>> we can't really change that. It is more important for users to
> >>>>>>>> prevent
> >>>>>>>>>>>> useless
> >>>>>>>>>>>>>>>>>>> updates to begin with. My left hand side is guarded
> >>>>> to
> >>>>>>> drop
> >>>>>>>>>>> changes
> >>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> are not going to change my join output.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> usually it's:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> drop unused fields and then don't forward if
> >>>>>>>> old.equals(new)
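Archive note: the guard Jan describes (drop unused fields, then don't forward if old.equals(new)) could look roughly like the sketch below. It is an assumption-laden illustration, not Jan's production code. `ChangeGuardSketch` and `shouldForward` are hypothetical names, the projection function stands for "drop unused fields", and a `HashMap` stands in for a state store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

public class ChangeGuardSketch<K, V, P> {
    // Projection that keeps only the fields the join output depends on.
    private final Function<V, P> projection;
    // Last projected value that was forwarded for each key.
    private final Map<K, P> lastForwarded = new HashMap<>();

    public ChangeGuardSketch(Function<V, P> projection) {
        this.projection = projection;
    }

    // Returns true if the update changes the projected value and should be
    // forwarded to the join; returns false if it would be a useless update.
    public boolean shouldForward(K key, V value) {
        P projected = (value == null) ? null : projection.apply(value);
        if (lastForwarded.containsKey(key)
                && Objects.equals(lastForwarded.get(key), projected)) {
            return false;
        }
        lastForwarded.put(key, projected);
        return true;
    }

    public static void main(String[] args) {
        // The value is {foreignKey, unusedField}; only the foreign key matters.
        ChangeGuardSketch<Integer, String[], String> guard =
                new ChangeGuardSketch<>(v -> v[0]);
        System.out.println(guard.shouldForward(1, new String[] {"A", "x"}));
        System.out.println(guard.shouldForward(1, new String[] {"A", "y"}));
        System.out.println(guard.shouldForward(1, new String[] {"B", "y"}));
    }
}
```

The projected type needs a meaningful `equals`, which is why the example projects the array down to a single `String`.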
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Regarding the performance of creating an iterator
> >>>>> for
> >>>>>>>>>> smaller
> >>>>>>>>>>>>>>>>>>> fanouts, users can still just do a group by first
> >>>>> then
> >>>>>>>>> anyways.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I could only think of one alternative, but I'm not
> >>>>>>> sure if
> >>>>>>>>>> it's
> >>>>>>>>>>>>>>>>> better
> >>>>>>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>> worse... If the first re-key only needs to preserve
> >>>>> the
> >>>>>>>>>> original
> >>>>>>>>>>>>>> key,
> >>>>>>>>>>>>>>>>>>> as I
> >>>>>>>>>>>>>>>>>>>> proposed in #6, then we could store a vector of
> >>>>> keys in
> >>>>>>>> the
> >>>>>>>>>>> value:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Left table:
> >>>>>>>>>>>>>>>>>>>> 1: A,...
> >>>>>>>>>>>>>>>>>>>> 2: B,...
> >>>>>>>>>>>>>>>>>>>> 3: A,...
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Gets re-keyed:
> >>>>>>>>>>>>>>>>>>>> A: [1, 3]
> >>>>>>>>>>>>>>>>>>>> B: [2]
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Then, the rhs part of the join would only need a regular
> >>>>>>>>>>>>>>>>>>>> single-key lookup. Of course we have to deal with the problem of
> >>>>>>>>>>>>>>>>>>>> large values, as there's no bound on the number of lhs records
> >>>>>>>>>>>>>>>>>>>> that can reference rhs records. Offhand, I'd say we could page
> >>>>>>>>>>>>>>>>>>>> the values, so when one row is past the threshold, we append the
> >>>>>>>>>>>>>>>>>>>> key for the next page. Then in most cases, it would be a single
> >>>>>>>>>>>>>>>>>>>> key lookup, but for large fan-out updates, it would be one per
> >>>>>>>>>>>>>>>>>>>> (max value size)/(avg lhs key size).
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> This seems more complex, though... Plus, I think there's some
> >>>>>>>>>>>>>>>>>>>> extra tracking we'd need to do to know when to emit a
> >>>>>>>>>>>>>>>>>>>> retraction. For example, when record 1 is deleted, the re-key
> >>>>>>>>>>>>>>>>>>>> table would just have (A: [3]). Some kind of tombstone is needed
> >>>>>>>>>>>>>>>>>>>> so that the join result for 1 can also be retracted.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> That's all!
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks so much to both Adam and Jan for the thoughtful KIP.
> >>>>>>>>>>>>>>>>>>>> Sorry the discussion has been slow.
> >>>>>>>>>>>>>>>>>>>> -John
> >>>>>>>>>>>>>>>>>>>>
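The re-keying alternative John describes above (storing a vector of left-hand primary keys under each foreign key) can be sketched in plain Java. This is only an illustration under stated assumptions: the class `RekeySketch` and its methods `rekey` and `delete` are hypothetical names, it uses in-memory maps rather than Kafka Streams state stores, and it leaves out the paging of large values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Kafka Streams code: plain maps stand in for tables.
public class RekeySketch {

    // Inverts (primary key -> foreign key) into (foreign key -> primary keys).
    static Map<String, List<Integer>> rekey(Map<Integer, String> left) {
        Map<String, List<Integer>> byForeignKey = new TreeMap<>();
        for (Map.Entry<Integer, String> e : left.entrySet()) {
            byForeignKey.computeIfAbsent(e.getValue(), k -> new ArrayList<>())
                        .add(e.getKey());
        }
        return byForeignKey;
    }

    // Removes one primary key from the vector stored under its foreign key.
    // Returns the primary key whose join result must be retracted, or null
    // if nothing was removed. The returned key plays the role of the
    // tombstone mentioned in the message.
    static Integer delete(Map<String, List<Integer>> byForeignKey,
                          String foreignKey, Integer primaryKey) {
        List<Integer> keys = byForeignKey.get(foreignKey);
        if (keys == null || !keys.remove(primaryKey)) {
            return null;
        }
        if (keys.isEmpty()) {
            byForeignKey.remove(foreignKey);
        }
        return primaryKey;
    }

    public static void main(String[] args) {
        Map<Integer, String> left = new TreeMap<>();
        left.put(1, "A");
        left.put(2, "B");
        left.put(3, "A");

        Map<String, List<Integer>> rekeyed = rekey(left);
        System.out.println(rekeyed);                 // {A=[1, 3], B=[2]}

        Integer retract = delete(rekeyed, "A", 1);
        System.out.println(rekeyed);                 // {A=[3], B=[2]}
        System.out.println("retract join result for key " + retract);
    }
}
```

The sketch makes the retraction problem visible: after the delete, the re-keyed table alone no longer mentions key 1, so the caller has to carry the removed key forward separately.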
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <
> >>>>>>>>>>>>>>>>> Jan.Filipiak@trivago.com>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I'd say you can just call the vote.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> That happens all the time, and if something comes up, it just
> >>>>>>>>>>>>>>>>>>>>> goes back to discussion.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I would not expect too much attention from yet another email
> >>>>>>>>>>>>>>>>>>>>> in this thread.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> best Jan
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On 09.10.2018 13:56, Adam Bellemare wrote:
> >>>>>>>>>>>>>>>>>>>>>> Hello Contributors
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I know that 2.1 is about to be released, but I do need to
> >>>>>>>>>>>>>>>>>>>>>> bump this to keep visibility up. I am still intending to push
> >>>>>>>>>>>>>>>>>>>>>> this through once contributor feedback is given.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Main points that need addressing:
> >>>>>>>>>>>>>>>>>>>>>> 1) Any way (or benefit) in structuring the current singular
> >>>>>>>>>>>>>>>>>>>>>> graph node into multiple nodes? It has a whopping 25
> >>>>>>>>>>>>>>>>>>>>>> parameters right now. I am a bit fuzzy on how the
> >>>>>>>>>>>>>>>>>>>>>> optimizations are supposed to work, so I would appreciate any
> >>>>>>>>>>>>>>>>>>>>>> help on this aspect.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 2) Overall strategy for joining + resolving. This thread has
> >>>>>>>>>>>>>>>>>>>>>> much discourse between Jan and me about the current highwater
> >>>>>>>>>>>>>>>>>>>>>> mark proposal and a groupBy + reduce proposal. I am of the
> >>>>>>>>>>>>>>>>>>>>>> opinion that we need to strictly handle any chance of
> >>>>>>>>>>>>>>>>>>>>>> out-of-order data and leave none of it up to the consumer.
> >>>>>>>>>>>>>>>>>>>>>> Any comments or suggestions here would also help.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 3) Anything else that you see that would prevent this from
> >>>>>>>>>>>>>>>>>>>>>> moving to a vote?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Adam
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <
> >>>>>>>>>>>>>>>>>>>>> adam.bellemare@gmail.com>
> >>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi Jan
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> With the Stores.windowStoreBuilder and
> >>>>>>>>>>>>>>>>>>>>>>> Stores.persistentWindowStore, you actually only need to
> >>>>>>>>>>>>>>>>>>>>>>> specify the number of segments you want and how large they
> >>>>>>>>>>>>>>>>>>>>>>> are. To the best of my understanding, what happens is that
> >>>>>>>>>>>>>>>>>>>>>>> the segments are automatically rolled over as new data with
> >>>>>>>>>>>>>>>>>>>>>>> new timestamps are created. We use this exact functionality
> >>>>>>>>>>>>>>>>>>>>>>> in some of the work done internally at my company. For
> >>>>>>>>>>>>>>>>>>>>>>> reference, this is the hopping windowed store.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> In the code that I have provided, there are going to be two
> >>>>>>>>>>>>>>>>>>>>>>> 24h segments. When a record is put into the windowStore, it
> >>>>>>>>>>>>>>>>>>>>>>> will be inserted at time T in both segments. The two
> >>>>>>>>>>>>>>>>>>>>>>> segments will always overlap by 12h. As time goes on and new
> >>>>>>>>>>>>>>>>>>>>>>> records are added (say at time T+12h+), the oldest segment
> >>>>>>>>>>>>>>>>>>>>>>> will be automatically deleted and a new segment created. The
> >>>>>>>>>>>>>>>>>>>>>>> records are by default inserted with the
> >>>>>>>>>>>>>>>>>>>>>>> context.timestamp(), such that it is the record time, not
> >>>>>>>>>>>>>>>>>>>>>>> the clock time, which is used.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> To the best of my understanding, the timestamps are retained
> >>>>>>>>>>>>>>>>>>>>>>> when restoring from the changelog.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Basically, this is a heavy-handed way to deal with TTL at a
> >>>>>>>>>>>>>>>>>>>>>>> segment-level, instead of at an individual record level.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <
> >>>>>>>>>>>>>>>>>>> Jan.Filipiak@trivago.com>
> >>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Will that work? I expected it to blow up with a
> >>>>>>>>>>>>>>>>>>>>>>>> ClassCastException or similar.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> You would either have to specify the window you fetch/put
> >>>>>>>>>>>>>>>>>>>>>>>> or iterate across all windows the key was found in, right?
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I just hope the window-store doesn't check stream-time
> >>>>>>>>>>>>>>>>>>>>>>>> under the hood; that would be a questionable interface.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> If it does: did you see my comment on checking all the
> >>>>>>>>>>>>>>>>>>>>>>>> windows earlier? That would be needed to actually give
> >>>>>>>>>>>>>>>>>>>>>>>> reasonable time guarantees.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Best
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On 25.09.2018 13:18, Adam Bellemare wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>> Hi Jan
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Check for " highwaterMat " in the PR. I only changed the
> >>>>>>>>>>>>>>>>>>>>>>>>> state store, not the ProcessorSupplier.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>>> Adam
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <
> >>>>>>>>>>>>>>>>>>>>> Jan.Filipiak@trivago.com
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> On 24.09.2018 16:26, Adam Bellemare wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> @Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the information. This is indeed something
> >>>>>>>>>>>>>>>>>>>>>>>>>>> that will be extremely useful for this KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> @Jan
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your explanations. That being said, I will
> >>>>>>>>>>>>>>>>>>>>>>>>>>> not be moving ahead with an implementation using
> >>>>>>>>>>>>>>>>>>>>>>>>>>> reshuffle/groupBy solution as you propose. That being
> >>>>>>>>>>>>>>>>>>>>>>>>>>> said, if you wish to implement it yourself off of my
> >>>>>>>>>>>>>>>>>>>>>>>>>>> current PR and submit it as a competitive alternative, I
> >>>>>>>>>>>>>>>>>>>>>>>>>>> would be more than happy to help vet that as an
> >>>>>>>>>>>>>>>>>>>>>>>>>>> alternate solution. As it stands right now, I do not
> >>>>>>>>>>>>>>>>>>>>>>>>>>> really have more time to invest into alternatives
> >>>>>>>>>>>>>>>>>>>>>>>>>>> without there being a strong indication from the binding
> >>>>>>>>>>>>>>>>>>>>>>>>>>> voters which they would prefer.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Hey, total no worries. I think I personally gave up on
> >>>>>>>>>>>>>>>>>>>>>>>>>> the streams DSL for some time already, otherwise I would
> >>>>>>>>>>>>>>>>>>>>>>>>>> have pulled this KIP through already. I am currently
> >>>>>>>>>>>>>>>>>>>>>>>>>> reimplementing my own DSL based on PAPI.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> I will look at finishing up my PR with the windowed
> >>>>>>>>>>>>>>>>>>>>>>>>>>> state store in the next week or so, exercising it via
> >>>>>>>>>>>>>>>>>>>>>>>>>>> tests, and then I will come back for final discussions.
> >>>>>>>>>>>>>>>>>>>>>>>>>>> In the meantime, I hope that any of the binding voters
> >>>>>>>>>>>>>>>>>>>>>>>>>>> could take a look at the KIP in the wiki. I have updated
> >>>>>>>>>>>>>>>>>>>>>>>>>>> it according to the latest plan:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> I have also updated the KIP PR to use a windowed store.
> >>>>>>>>>>>>>>>>>>>>>>>>>>> This could be replaced by the results of KIP-258
> >>>>>>>>>>>>>>>>>>>>>>>>>>> whenever they are completed.
> >>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5527
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Adam
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Is the HighWatermarkResolverProccessorsupplier already
> >>>>>>>>>>>>>>>>>>>>>>>>>> updated in the PR? I expected it to change to
> >>>>>>>>>>>>>>>>>>>>>>>>>> Windowed<K>,Long. Missing something?
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang
> >>>>> Wang <
> >>>>>>>>>>>>>>>>>>> wangguoz@gmail.com>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Correction on my previous email: KAFKA-5533 is the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> wrong link, as it is for corresponding changelog
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> mechanisms. But as part of KIP-258 we do want to have
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> "handling out-of-order data for source KTable" such
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> that instead of blindly applying the updates to the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> materialized store, i.e. following offset ordering, we
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> will reject updates that are older than the current
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> key's timestamps, i.e. following timestamp ordering.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
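The difference between offset ordering and timestamp ordering that Guozhang describes can be illustrated with a small in-memory store. This is a minimal sketch under assumptions, not the KIP-258 implementation: `TimestampOrderedStore` is a hypothetical class, and the choice to accept an update whose timestamp equals the stored one is an assumption made here for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a key-value store that keeps a timestamp per key and
// rejects updates older than what it already holds (timestamp ordering),
// instead of blindly applying updates in arrival order (offset ordering).
public class TimestampOrderedStore {

    private static final class Stamped {
        final String value;
        final long timestamp;

        Stamped(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Stamped> store = new HashMap<>();

    // Applies the update unless it is strictly older than the stored one.
    // Returns true if the update was applied, false if it was rejected.
    boolean put(String key, String value, long timestamp) {
        Stamped current = store.get(key);
        if (current != null && timestamp < current.timestamp) {
            return false;
        }
        store.put(key, new Stamped(value, timestamp));
        return true;
    }

    String get(String key) {
        Stamped current = store.get(key);
        return current == null ? null : current.value;
    }

    public static void main(String[] args) {
        TimestampOrderedStore s = new TimestampOrderedStore();
        System.out.println(s.put("k", "v1", 100L)); // true
        // Arrives later by offset but carries an older timestamp: rejected.
        System.out.println(s.put("k", "v0", 50L));  // false
        System.out.println(s.get("k"));             // v1
    }
}
```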
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang
> >>>>>>> Wang <
> >>>>>>>>>>>>>>>>>>>>> wangguoz@gmail.com>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello Adam,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the explanation. Regarding the final step
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> (i.e. the high watermark store, now altered to be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> replaced with a window store), I think another current
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> on-going KIP may actually help:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> This is for adding the timestamp into a key-value
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> store (i.e. only for non-windowed KTable), and then
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> one of its uses, as described in
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5533 , is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> that we can then "reject" updates from the source
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> topics if their timestamp is smaller than the current
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> key's latest update timestamp. I think it is very
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> similar to what you have in mind for high watermark
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> based filtering, while you only need to make sure that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the timestamps of the joining records are correctly
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> inherited through the whole topology to the final
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> stage.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Note that this KIP is for key-value store and hence
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> non-windowed KTables only, but for windowed KTables we
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> do not really have a good support for their joins
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> anyways
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> (https://issues.apache.org/jira/browse/KAFKA-7107). I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> think we can just consider non-windowed KTable-KTable
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> non-key joins for now. In which case, KIP-258 should
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> help.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan
> >>>>> Filipiak
> >>>>>>> <
> >>>>>>>>>>>>>>>>>>>>>>>> Jan.Filipiak@trivago.com
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Current highwater mark implementation would grow
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> endlessly based on primary key of original event. It
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is a pair of (<this table primary key>, <highest
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> offset seen for that key>). This is used to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> differentiate between late arrivals and new updates.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My newest proposal would be to replace it with a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Windowed state store of Duration N. This would allow
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the same behaviour, but cap the size based on time.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This should allow for all late-arriving events to be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> processed, and should be customizable by the user to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> tailor to their own needs (ie: perhaps just 10
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> minutes of window, or perhaps 7 days...).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Adam, using time based retention can do the trick
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> here. Even if I would still like to see the automatic
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartitioning optional since I would just reshuffle
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> again. With windowed store I am a little bit
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sceptical about how to determine the window. So
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> essentially one could run into problems when the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> rapid change happens near a window border. I will
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> check your implementation in detail; if it's
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> problematic, we could still check _all_ windows on
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> read with not too bad performance impact I guess.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Will let you know if the implementation would be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> correct as is. I would not like to assume that:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> offset(A) < offset(B) => timestamp(A) < timestamp(B).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think we can't expect that.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
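The highwater mark store discussed above, a pair of (primary key, highest offset seen for that key), can be sketched as follows. This is an in-memory illustration under assumptions rather than the code in the PR: `HighWaterMarkFilter` and its methods are hypothetical names, and a `HashMap` stands in for the state store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a highwater mark filter: for each primary key it
// remembers the highest offset seen and rejects anything at or below it.
public class HighWaterMarkFilter {

    private final Map<String, Long> highestOffset = new HashMap<>();

    // Returns true for a new update, false for a late arrival.
    boolean accept(String primaryKey, long offset) {
        Long seen = highestOffset.get(primaryKey);
        if (seen != null && offset <= seen) {
            return false;
        }
        highestOffset.put(primaryKey, offset);
        return true;
    }

    // One entry per distinct key is kept and never evicted, which is the
    // unbounded growth the message points out.
    int size() {
        return highestOffset.size();
    }

    public static void main(String[] args) {
        HighWaterMarkFilter filter = new HighWaterMarkFilter();
        System.out.println(filter.accept("k1", 5L)); // true
        System.out.println(filter.accept("k1", 3L)); // false, late arrival
        System.out.println(filter.accept("k1", 6L)); // true
        System.out.println(filter.accept("k2", 1L)); // true
        System.out.println(filter.size());           // 2
    }
}
```

Nothing in this sketch removes entries, so the map grows with the number of distinct keys. A time-capped windowed store, as proposed in the message, would bound that growth at the cost of having to decide which window to read.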
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Jan
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I believe I understand what you mean now - thanks
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for the diagram, it did really help. You are correct
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that I do not have the original primary key
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> available, and I can see that if it was available
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> then you would be able to add and remove events from
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the Map. That being said, I encourage you to finish
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> your diagrams / charts just for clarity for everyone
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> else.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yeah 100%, this giphy thing is just really hard work.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> But I understand the benefits for the rest. Sorry
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> about the original primary key. We have join and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> group by implemented on our own in PAPI and are
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> basically not using any DSL (just the abstraction). I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> completely missed that it's not there in the original
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> DSL and just assumed it. Total brain mess-up on my
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> end. Will finish the chart as soon as I get a quiet
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> evening this week.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My follow up question for you is, won't the Map stay
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> inside the State Store indefinitely after all of the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> changes have propagated? Isn't this effectively the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> same as a highwater mark state store?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thing is that if the map is empty, the subtractor is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> gonna return `null` and the key is removed from the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> keyspace. But there is going to be a store 100%; the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> good thing is that I can use this store directly for
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> materialize() / enableSendingOldValues(). It is a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> regular store, satisfying all guarantees needed for
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> further groupby / join. The Windowed store is not
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> keeping the values, so for the next stateful
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> operation we would need to instantiate an extra
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> store, or we have the window store also hold the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> values.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Long story short: if we can flip in a custom group by
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> before repartitioning to the original primary key, I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> think it would help the users big time in building
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> efficient apps. Given the original primary key issue
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I understand that we do not have a solid foundation
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to build on. Leaving the primary key carry-along to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the user is very unfortunate. I could understand if
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the decision goes like that. I do not think it's a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> good decision.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Adam
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <dumbreprajakta311@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             please remove me from this
> >>>>> group
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             <Jan.Filipiak@trivago.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > Hi Adam,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > Give me some time, will make such a chart. Last
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > time I didn't get along well with giphy and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > ruined all your charts.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > Hopefully I can get it done today.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > On 08.09.2018 16:00, Adam
> >>>>>>> Bellemare
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > > Hi Jan
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > > I have included a diagram
> >>>>> of
> >>>>>>>> what I
> >>>>>>>>>>>>>> attempted
> >>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > > I attempted this back at
> >>>>> the
> >>>>>>>> start
> >>>>>>>>> of
> >>>>>>>>>>> my
> >>>>>>>>>>>> own
> >>>>>>>>>>>>>>>>>>>>>>>> implementation
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             this
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > > solution, and since I could
> >>>>>>> not
> >>>>>>>> get
> >>>>>>>>>> it
> >>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> work I
> >>>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>>>> since
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             discarded the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > > code. At this point in
> >>>>> time,
> >>>>>>> if
> >>>>>>>> you
> >>>>>>>>>>> wish
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> continue
> >>>>>>>>>>>>>>>>>>>>>>>> pursuing
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             for your
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > > groupBy solution, I ask
> >>>>> that
> >>>>>>> you
> >>>>>>>>>> please
> >>>>>>>>>>>>>>>>> create a
> >>>>>>>>>>>>>>>>>>>>>>>> diagram on
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             the KIP
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > > carefully explaining your
> >>>>>>>> solution.
> >>>>>>>>>>>> Please
> >>>>>>>>>>>>>>>>> feel
> >>>>>>>>>>>>>>>>>>> free
> >>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             the image I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > > just posted as a starting
> >>>>>>> point.
> >>>>>>>> I
> >>>>>>>>> am
> >>>>>>>>>>>> having
> >>>>>>>>>>>>>>>>>>> trouble
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             understanding your
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > > explanations but I think
> >>>>> that
> >>>>>>> a
> >>>>>>>>>>> carefully
> >>>>>>>>>>>>>>>>>>> constructed
> >>>>>>>>>>>>>>>>>>>>>>>> diagram
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             will clear
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > up
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > > any misunderstandings.
> >>>>>>>> Alternately,
> >>>>>>>>>>>> please
> >>>>>>>>>>>>>>>>> post a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             comprehensive PR with
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > > your solution. I can only
> >>>>>>> guess
> >>>>>>>> at
> >>>>>>>>>> what
> >>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>> mean, and
> >>>>>>>>>>>>>>>>>>>>>>>> since I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             value my
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > own
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > > time as much as you value
> >>>>>>> yours,
> >>>>>>>> I
> >>>>>>>>>>>> believe
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>> your
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             responsibility to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > > provide an implementation
> >>>>>>> instead
> >>>>>>>>> of
> >>>>>>>>>> me
> >>>>>>>>>>>>>>>>> trying to
> >>>>>>>>>>>>>>>>>>>>> guess.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > > Adam
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> Hi James,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> nice to see you being
> >>>>>>>> interested.
> >>>>>>>>>>> kafka
> >>>>>>>>>>>>>>>>>>> streams at
> >>>>>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             point supports
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> all sorts of joins as
> >>>>> long as
> >>>>>>>> both
> >>>>>>>>>>>> streams
> >>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> same
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> key.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> Adam is currently
> >>>>>>> implementing a
> >>>>>>>>>> join
> >>>>>>>>>>>>>> where a
> >>>>>>>>>>>>>>>>>>> KTable
> >>>>>>>>>>>>>>>>>>>>>>>> and a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             KTable can
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > have
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> a one-to-many relationship
> >>>>>>>> (1:n).
> >>>>>>>>>> We
> >>>>>>>>>>>>>> exploit
> >>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>> rocksdb
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> datastore that keeps data
> >>>>>>> sorted
> >>>>>>>> (At
> >>>>>>>>>>> least
> >>>>>>>>>>>>>>>>>>> exposes an
> >>>>>>>>>>>>>>>>>>>>>>>> API to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             access the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> stored data in a sorted
> >>>>>>>> fashion).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> I think the technical
> >>>>> caveats
> >>>>>>>> are
> >>>>>>>>>> well
> >>>>>>>>>>>>>>>>>>> understood
> >>>>>>>>>>>>>>>>>>>>> now
> >>>>>>>>>>>>>>>>>>>>>>>> and we
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > basically
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> down to philosophy and API
> >>>>>>>> Design
> >>>>>>>>> (
> >>>>>>>>>>> when
> >>>>>>>>>>>>>> Adam
> >>>>>>>>>>>>>>>>>>> sees
> >>>>>>>>>>>>>>>>>>>>> my
> >>>>>>>>>>>>>>>>>>>>>>>> newest
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             message).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> I have a lengthy track
> >>>>>>> record of
> >>>>>>>>>>> losing
> >>>>>>>>>>>>>>>>> those
> >>>>>>>>>>>>>>>>>>> kinda
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             arguments within
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> streams community and I
> >>>>> have
> >>>>>>> no
> >>>>>>>>> clue
> >>>>>>>>>>>> why.
> >>>>>>>>>>>>>> So
> >>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>>>> literally
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             can't wait for
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > you
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> to churn through this
> >>>>> thread
> >>>>>>> and
> >>>>>>>>>> give
> >>>>>>>>>>>> your
> >>>>>>>>>>>>>>>>>>> opinion on
> >>>>>>>>>>>>>>>>>>>>>>>> how we
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             should
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > design
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> the return type of the
> >>>>>>>>> oneToManyJoin
> >>>>>>>>>>> and
> >>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>>>>>> much
> >>>>>>>>>>>>>>>>>>>>>>>> power we
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             want to give
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> the user vs "simplicity"
> >>>>>>> (where
> >>>>>>>>>>>> simplicity
> >>>>>>>>>>>>>>>>> isn't
> >>>>>>>>>>>>>>>>>>>>>>>> really that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             as users
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > still
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> need to understand it I
> >>>>>>> argue)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> waiting for you to join
> >>>>> in on
> >>>>>>>> the
> >>>>>>>>>>>>>> discussion
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> Best Jan
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >> On 07.09.2018 15:49, James
> >>>>>>> Kwan
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>> I am new to this group
> >>>>> and I
> >>>>>>>>> found
> >>>>>>>>>>> this
> >>>>>>>>>>>>>>>>> subject
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             interesting.  Sounds
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > like
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>> you guys want to
> >>>>> implement a
> >>>>>>>> join
> >>>>>>>>>>>> table of
> >>>>>>>>>>>>>>>>> two
> >>>>>>>>>>>>>>>>>>>>>>>> streams? Is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > somewhere
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>> I can see the original
> >>>>>>>>> requirement
> >>>>>>>>>> or
> >>>>>>>>>>>>>>>>> proposal?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>> On 05.09.2018 22:17,
> >>>>> Adam
> >>>>>>>>>> Bellemare
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> I'm currently testing using a Windowed Store to store the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> highwater mark. By all indications this should work fine, with the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> caveat being that it can only resolve out-of-order arrival for up to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> the size of the window (i.e. 24h, 72h, etc.). This would remove the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> possibility of it being unbounded in size.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>
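For readers following the high-water-mark idea in the quoted paragraph above, here is a minimal sketch of the filtering step. It assumes each joined result carries the offset of the source record that produced it, and it uses a plain in-memory map as a stand-in for the windowed store. The class and method names are hypothetical, and window expiry is not modelled.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: drop joined results that are older than the newest one already seen per key. */
public final class HighWaterMarkSketch<K> {
    // Stand-in for the windowed store: highest source offset seen so far per key.
    private final Map<K, Long> highWater = new HashMap<>();

    /**
     * Returns true if the update should be forwarded downstream,
     * false if it arrived out of order and is stale.
     */
    public boolean accept(K key, long sourceOffset) {
        Long seen = highWater.get(key);
        if (seen != null && sourceOffset < seen) {
            return false; // an update from a later source record was already forwarded
        }
        highWater.put(key, sourceOffset);
        return true;
    }
}
```

With a real windowed store, an entry older than the retention period would be gone, so a very late update could no longer be recognised as stale. That is the caveat described in the quoted paragraph.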
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> With regards to Jan's
> >>>>>>>>>> suggestion, I
> >>>>>>>>>>>>>>>>> believe
> >>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>> where
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             we will
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > have
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> remain in disagreement.
> >>>>>>>> While I
> >>>>>>>>>> do
> >>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>> disagree
> >>>>>>>>>>>>>>>>>>>>>>>> with your
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             statement
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> about
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> there likely to be
> >>>>>>> additional
> >>>>>>>>>> joins
> >>>>>>>>>>>> done
> >>>>>>>>>>>>>>>>> in a
> >>>>>>>>>>>>>>>>>>>>>>>> real-world
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             workflow, I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > do
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> not
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> see how you can
> >>>>>>> conclusively
> >>>>>>>>> deal
> >>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>> out-of-order
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> arrival
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> foreign-key
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> changes and subsequent
> >>>>>>>> joins. I
> >>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>> attempted
> >>>>>>>>>>>>>>>>>>>>> what
> >>>>>>>>>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             think you have
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> proposed (without a
> >>>>>>>> high-water,
> >>>>>>>>>>> using
> >>>>>>>>>>>>>>>>>>> groupBy and
> >>>>>>>>>>>>>>>>>>>>>>>> reduce)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             and found
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> that if
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> the foreign key changes
> >>>>>>> too
> >>>>>>>>>>> quickly,
> >>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> load
> >>>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             stream thread
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> too
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> high, the joined
> >>>>> messages
> >>>>>>>> will
> >>>>>>>>>>> arrive
> >>>>>>>>>>>>>>>>>>>>> out-of-order
> >>>>>>>>>>>>>>>>>>>>>>>> and be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             incorrectly
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> propagated, such that
> >>>>> an
> >>>>>>>>>>> intermediate
> >>>>>>>>>>>>>>>>> event
> >>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> represented
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             as the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > final
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> event.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>> Can you shed some light
> >>>>> on
> >>>>>>>> your
> >>>>>>>>>>>> groupBy
> >>>>>>>>>>>>>>>>>>>>>>>> implementation?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             There must be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>> some sort of flaw in it.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>> I have a suspicion
> >>>>> where it
> >>>>>>>> is,
> >>>>>>>>> I
> >>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>> just
> >>>>>>>>>>>>>>>>>>> like
> >>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             confirm. The idea
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>> is bullet proof and it
> >>>>>>> must be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>> an implementation mess
> >>>>> up.
> >>>>>>> I
> >>>>>>>>> would
> >>>>>>>>>>>> like
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> clarify
> >>>>>>>>>>>>>>>>>>>>>>>> before
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             we draw a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>> conclusion.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>    Repartitioning the
> >>>>>>>> scattered
> >>>>>>>>>>> events
> >>>>>>>>>>>>>>>>> back to
> >>>>>>>>>>>>>>>>>>>>> their
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> original
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> partitions is the only
> >>>>> way I
> >>>>>>>> know
> >>>>>>>>>> how
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> conclusively
> >>>>>>>>>>>>>>>>>>>>>>>> deal
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             with
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> out-of-order events in
> >>>>> a
> >>>>>>>> given
> >>>>>>>>>> time
> >>>>>>>>>>>>>> frame,
> >>>>>>>>>>>>>>>>>>> and to
> >>>>>>>>>>>>>>>>>>>>>>>> ensure
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             that the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > data
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> eventually consistent
> >>>>> with
> >>>>>>>> the
> >>>>>>>>>>> input
> >>>>>>>>>>>>>>>>> events.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> If you have some code
> >>>>> to
> >>>>>>>> share
> >>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> illustrates
> >>>>>>>>>>>>>>>>>>>>> your
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             approach, I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > would
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> very grateful as it
> >>>>> would
> >>>>>>>>> remove
> >>>>>>>>>>> any
> >>>>>>>>>>>>>>>>>>>>>>>> misunderstandings
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             that I may
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > have.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>> ah okay you were looking
> >>>>>>> for
> >>>>>>>> my
> >>>>>>>>>>> code.
> >>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>> don't
> >>>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             something easily
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>> readable here as its
> >>>>>>> bloated
> >>>>>>>>> with
> >>>>>>>>>>>>>>>>> OO-patterns.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>> its anyhow trivial:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>> @Override
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>      public T apply(K aggKey, V value, T aggregate)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>      {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>          Map<U, V> currentStateAsMap = asMap(aggregate); // imaginary
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>          U toModifyKey = mapper.apply(value);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              // this is the place where people are actually going to have
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              // issues and why you probably couldn't do it. We would need
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              // to find a solution here. I didn't realize that yet.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              // We propagate the field in the joiner, so that we can pick
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              // it up in an aggregate. Probably you have not thought of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              // this in your approach, right?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              // I am very open to finding a generic solution here. In my
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              // honest opinion this is broken in KTableImpl.GroupBy, in
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              // that it loses the keys and only maintains the aggregate key.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              // I abstracted it away back then, way before I was thinking
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              // of oneToMany join. That is why I didn't realize its
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              // significance here.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              // Opinions?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>          for (V m : current)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>          {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              currentStateAsMap.put(mapper.apply(m), m);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>          }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>          if (isAdder)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>          {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              currentStateAsMap.put(toModifyKey, value);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>          }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>          else
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>          {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              currentStateAsMap.remove(toModifyKey);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              if(currentStateAsMap.isEmpty()){
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>                  return null;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>              }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>          }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>          return asAggregateType(currentStateAsMap);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>      }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>
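The quoted aggregator relies on helpers that the message itself calls imaginary (asMap, asAggregateType) and on an isAdder flag. As a self-contained illustration of the same idea, the sketch below keeps the aggregate directly as a map keyed by the original row key, so a later update for the same row overwrites the earlier one instead of accumulating. All names are hypothetical and the key-extracting mapper is passed in explicitly; this is not the code from the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch: aggregate join results as a map keyed by the original row key. */
public final class MapAggregatorSketch {
    private MapAggregatorSketch() {}

    /** Adder: insert or overwrite the entry for this value's original key. */
    public static <U, V> Map<U, V> add(Map<U, V> aggregate, V value, Function<V, U> mapper) {
        Map<U, V> next = (aggregate == null) ? new HashMap<>() : new HashMap<>(aggregate);
        next.put(mapper.apply(value), value);
        return next;
    }

    /** Subtractor: drop the entry for this value's key; null signals an empty aggregate. */
    public static <U, V> Map<U, V> remove(Map<U, V> aggregate, V value, Function<V, U> mapper) {
        if (aggregate == null) {
            return null;
        }
        Map<U, V> next = new HashMap<>(aggregate);
        next.remove(mapper.apply(value));
        return next.isEmpty() ? null : next;
    }
}
```

The mapper is the step the quoted comments flag as the hard part: the value must still carry the original key (propagated through the joiner) for the aggregate to find the entry to overwrite or remove.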
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> Adam
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>> Thanks Adam for
> >>>>> bringing
> >>>>>>>>> Matthias
> >>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> speed!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>> about the
> >>>>> differences. I
> >>>>>>>> think
> >>>>>>>>>>>>>> re-keying
> >>>>>>>>>>>>>>>>>>> back
> >>>>>>>>>>>>>>>>>>>>>>>> should be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             optional at
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>> best.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>> I would say we return
> >>>>> a
> >>>>>>>>>>>> KScatteredTable
> >>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>> reshuffle()
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             returning
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>
> >>>>>>> KTable<originalKey,Joined>
> >>>>>>>> to
> >>>>>>>>>> make
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> backwards
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             repartitioning
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>> optional.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>> I am also in a big
> >>>>>>> favour of
> >>>>>>>>>> doing
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> out
> >>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>> order
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             processing using
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>> group
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>> by instead of high water
> >>>>>>> mark
> >>>>>>>>>>> tracking.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>> Just because unbounded
> >>>>>>>> growth
> >>>>>>>>> is
> >>>>>>>>>>>> just
> >>>>>>>>>>>>>>>>> scary
> >>>>>>>>>>>>>>>>>>> + It
> >>>>>>>>>>>>>>>>>>>>>>>> saves
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> us
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             the header
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>> stuff.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>> I think the
> >>>>> abstraction
> >>>>>>> of
> >>>>>>>>>> always
> >>>>>>>>>>>>>>>>>>> repartitioning
> >>>>>>>>>>>>>>>>>>>>>>>> back is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             just not so
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>> strong. Like the work
> >>>>> has
> >>>>>>>> been
> >>>>>>>>>>> done
> >>>>>>>>>>>>>>>>> before
> >>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>>> partition
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             back and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>> grouping
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>> by something else
> >>>>>>> afterwards
> >>>>>>>>> is
> >>>>>>>>>>>> really
> >>>>>>>>>>>>>>>>>>> common.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>> On 05.09.2018 13:49,
> >>>>> Adam
> >>>>>>>>>>> Bellemare
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>> Hi Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> Thank you for your
> >>>>>>>> feedback,
> >>>>>>>>> I
> >>>>>>>>>> do
> >>>>>>>>>>>>>>>>>>> appreciate
> >>>>>>>>>>>>>>>>>>>>> it!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> While name spacing
> >>>>>>> would be
> >>>>>>>>>>>> possible,
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>>> require
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > deserialize
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> user headers which
> >>>>>>> implies
> >>>>>>>> a
> >>>>>>>>>>>> runtime
> >>>>>>>>>>>>>>>>>>> overhead.
> >>>>>>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             suggest to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > not
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> namespace for now to
> >>>>>>> avoid
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>> overhead.
> >>>>>>>>>>>>>>>>>>> If
> >>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> becomes a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > problem in
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> the future, we can
> >>>>>>> still
> >>>>>>>> add
> >>>>>>>>>>> name
> >>>>>>>>>>>>>>>>> spacing
> >>>>>>>>>>>>>>>>>>>>> later
> >>>>>>>>>>>>>>>>>>>>>>>> on.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> Agreed. I will go
> >>>>> with
> >>>>>>>>> using a
> >>>>>>>>>>>>>> reserved
> >>>>>>>>>>>>>>>>>>> string
> >>>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             document it.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> My main concern about
> >>>>>>> the
> >>>>>>>>>> design
> >>>>>>>>>>> is
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> type of
> >>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             result KTable:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > If
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> understood the
> >>>>> proposal
> >>>>>>>>>>> correctly,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> In your example, you
> >>>>>>> have
> >>>>>>>>>> table1
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> table2
> >>>>>>>>>>>>>>>>>>>>>>>> swapped.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             Here is how it
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> works
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> currently:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> 1) table1 has the records that contain the foreign key within their value.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> table2 input stream: <A,X>, <B,Y>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> 2) A value mapper is required to extract the foreign key.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> table1 foreign key mapper: ( value => value.fk )
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> The mapper is applied to each element in table1, and a new combined key is made:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> 3) The rekeyed events are copartitioned with table2:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> a) Stream Thread with Partition 0:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> Table2: <A,X>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> b) Stream Thread with Partition 1:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> Table2: <B,Y>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> 4) From here, they can be joined together locally by applying the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> joiner function.
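
Steps 1) to 4) above can be sketched in plain Java, without any Kafka Streams dependency. This is only an illustration under stated assumptions: the class `ForeignKeyJoinSketch`, the `Left` value type, the `"fk-pk"` string form of the combined key, and the hash-based `partitionFor` are all hypothetical and are not the KIP's actual API or partitioner.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ForeignKeyJoinSketch {
    // Hypothetical table1 value: a foreign key plus a payload.
    static final class Left {
        final String fk;
        final int bar;
        Left(String fk, int bar) { this.fk = fk; this.bar = bar; }
    }

    // Step 2: rekey every table1 record to the combined key "<fk>-<primaryKey>".
    static Map<String, Left> rekey(Map<String, Left> table1) {
        Map<String, Left> rekeyed = new LinkedHashMap<>();
        table1.forEach((pk, value) -> rekeyed.put(value.fk + "-" + pk, value));
        return rekeyed;
    }

    // Step 3: partition on the foreign key only, so a rekeyed record lands on
    // the same partition as the table2 record it joins with (assumed scheme).
    static int partitionFor(String fk, int numPartitions) {
        return Math.floorMod(fk.hashCode(), numPartitions);
    }

    // Step 4: local join; the "joiner" here is plain string formatting.
    static Map<String, String> join(Map<String, Left> rekeyed, Map<String, String> table2) {
        Map<String, String> joined = new LinkedHashMap<>();
        rekeyed.forEach((combinedKey, left) -> {
            String right = table2.get(left.fk);
            if (right != null) {
                joined.put(combinedKey, "join(" + right + "," + left.bar + ")");
            }
        });
        return joined;
    }

    public static void main(String[] args) {
        Map<String, Left> table1 = new LinkedHashMap<>();
        table1.put("a", new Left("A", 1));
        table1.put("b", new Left("A", 2));
        table1.put("c", new Left("B", 3));
        Map<String, String> table2 = new LinkedHashMap<>();
        table2.put("A", "X");
        table2.put("B", "Y");
        // prints {A-a=join(X,1), A-b=join(X,2), B-c=join(Y,3)}
        System.out.println(join(rekey(table1), table2));
    }
}
```

The result is still keyed by the combined key at this point; what happens to that key afterwards is where the two designs differ.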
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> At this point, Jan's design and my design deviate. My design goes on to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> repartition the data post-join and resolve out-of-order arrival of records,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> finally returning the data keyed by just the original key. I do not expose
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> the CombinedKey or any of the internals outside of the joinOnForeignKey
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> function. This does make for a larger footprint, but it relieves the user
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> of any responsibility for resolving out-of-order arrivals and handling
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> CombinedKeys. I believe that this makes the function much easier to use.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> Let me know if this helps resolve your questions, and please feel
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> free to add anything else on your mind.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> Thanks again,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> Adam
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> I am just catching up on this thread. I did not read everything so far,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> but want to share a couple of initial thoughts:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> Headers: I think there is a fundamental difference between header usage
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> in this KIP and KIP-258. For 258, we add headers to changelog topics that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> are owned by Kafka Streams, and nobody else is supposed to write into
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> them. In fact, no user headers are written into the changelog topic and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> thus, there are no conflicts.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> Nevertheless, I don't see a big issue with using headers within Streams.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> As long as we document it, we can have some "reserved" header keys that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> users are not allowed to use when processing data with Kafka Streams.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> IMHO, this should be ok.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>> I think there is a safe way to avoid conflicts, since these headers are
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>> only needed in internal topics (I think):
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>> For internal and changelog topics, we can namespace all headers:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>> * user-defined headers are namespaced as "external." + headerKey
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>> * internal headers are namespaced as "internal." + headerKey
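
The namespacing proposal quoted above can be sketched as follows. This is a minimal illustration, assuming headers are modelled as a plain string map; `HeaderNamespaceSketch` and its methods are hypothetical names and do not use the Kafka `Headers` API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class HeaderNamespaceSketch {
    static final String EXTERNAL = "external.";
    static final String INTERNAL = "internal.";

    // Prefix every header key before the record is written to an internal topic.
    static Map<String, String> namespaced(String prefix, Map<String, String> headers) {
        Map<String, String> out = new LinkedHashMap<>();
        headers.forEach((key, value) -> out.put(prefix + key, value));
        return out;
    }

    // Recover the user's original headers when the record is read back,
    // dropping anything that lives in the internal namespace.
    static Map<String, String> userHeaders(Map<String, String> stored) {
        Map<String, String> out = new LinkedHashMap<>();
        stored.forEach((key, value) -> {
            if (key.startsWith(EXTERNAL)) {
                out.put(key.substring(EXTERNAL.length()), value);
            }
        });
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> user = new LinkedHashMap<>();
        user.put("traceId", "42");
        Map<String, String> stored = namespaced(EXTERNAL, user);
        stored.put(INTERNAL + "offset", "7");
        System.out.println(stored);              // {external.traceId=42, internal.offset=7}
        System.out.println(userHeaders(stored)); // {traceId=42}
    }
}
```

Note that every user header key is touched on both the write and the read path in this scheme.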
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> While namespacing would be possible, it would require deserializing
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> user headers, which implies a runtime overhead. I would suggest not
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> namespacing for now to avoid the overhead. If this becomes a problem in
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> the future, we can still add namespacing later on.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> My main concern about the design is the type of the result KTable:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> If I understood the proposal correctly,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> KTable<K1,V1> table1 = ...
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> KTable<K2,V2> table2 = ...
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> implies that the `joinedTable` has the same key as the left input table.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> IMHO, this does not work because if table2 contains multiple rows that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> join with a record in table1 (which is the main purpose of a foreign key
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> join), the result table would only contain a single join result, but
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> not multiple.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> Example:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> table1 input stream: <A,X>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> We use the table2 value as a foreign key to the table1 key (i.e., "A"
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> joins). If the result key is the same as the key of table1, this implies
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> that the result can be either <A, join(X,1)> or <A, join(X,2)>, but not
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> both. Because they share the same key, whatever result record we emit
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> later overwrites the previous result.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> This is the reason why Jan originally proposed to use a combination of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> both primary keys of the input tables as the key of the output table.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> This makes the keys of the output table unique and we can store both in
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> the output table:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
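
The overwrite problem and the combined-key fix can be reproduced with ordinary maps. The sketch below uses the example data from the message above (table1 `<A,X>`, table2 `<a,(A,1)>`, `<b,(A,2)>`); the class `ResultKeySketch`, the row layout, and the `"fk-pk"` string key are illustrative assumptions, not part of any Kafka Streams API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ResultKeySketch {
    // table2 rows as {primaryKey, foreignKey, value}, in arrival order.
    static final String[][] TABLE2 = { {"a", "A", "1"}, {"b", "A", "2"} };

    // Join table2 against table1 and key the result either by the table1 key
    // alone or by the combined "<fk>-<pk>" key.
    static Map<String, String> join(Map<String, String> table1, boolean combinedKey) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String[] row : TABLE2) {
            String pk = row[0], fk = row[1], value = row[2];
            String left = table1.get(fk);
            if (left == null) {
                continue; // no matching table1 record
            }
            String key = combinedKey ? fk + "-" + pk : fk;
            result.put(key, "join(" + left + "," + value + ")"); // put() overwrites on equal keys
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> table1 = new LinkedHashMap<>();
        table1.put("A", "X");
        System.out.println(join(table1, false)); // {A=join(X,2)}
        System.out.println(join(table1, true));  // {A-a=join(X,1), A-b=join(X,2)}
    }
}
```

With the table1 key alone, the second join result replaces the first; with the combined key, both survive.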
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> Thoughts?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>> Just one remark here.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>> The high-watermark could be disregarded. The decision about forwarding
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>> depends on the size of the aggregated map.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>> Only 1-element maps would be unpacked and forwarded. 0-element maps
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>> would be published as a delete. Any other count of map entries is in
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>> the "waiting for correct deletes to arrive" state.
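
The forwarding rule in the remark above can be written as a small decision function. This is a sketch under an assumption: the aggregated map is taken to hold the candidate join results currently known for one original key (the message does not spell out the map's key and value types). `ForwardDecisionSketch` and `Action` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

class ForwardDecisionSketch {
    enum Action { FORWARD, DELETE, WAIT }

    // Decide what to emit downstream from the size of the aggregated map alone.
    static Action decide(Map<String, String> aggregated) {
        switch (aggregated.size()) {
            case 0:
                return Action.DELETE;  // nothing left: publish a delete
            case 1:
                return Action.FORWARD; // exactly one result: unpack and forward it
            default:
                return Action.WAIT;    // stale entries remain: wait for their deletes
        }
    }

    public static void main(String[] args) {
        Map<String, String> aggregated = new HashMap<>();
        System.out.println(decide(aggregated)); // DELETE
        aggregated.put("A-a", "join(X,1)");
        System.out.println(decide(aggregated)); // FORWARD
        aggregated.put("B-a", "join(Y,1)");
        System.out.println(decide(aggregated)); // WAIT
    }
}
```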
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>>> It does look like I could replace the second repartition store and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>>> highwater store with a groupBy and reduce. However, it looks like I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>>> would still need to store the highwater value within the materialized
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>>> store, to compare the arrival of out-of-order records (assuming my
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>>> understanding of THIS is correct...). This in effect is the same as the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>>> design I have now, just with the two tables merged together.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>>
> >>>>
> >>>
> >>
> >
>
