kafka-dev mailing list archives

From John Roesler <j...@confluent.io>
Subject Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.
Date Wed, 02 Jan 2019 22:44:21 GMT
Hi Jan and Adam,

Good to hear from you both! Happy new year!

Hey Jan, I'm sorry you feel that we aren't listening to you. I was hoping
to convey that we did understand your proposal, but simply differed in
opinion at the end of the day.

I agree with your assessment that your proposal is "eventually correct",
which is the same guarantee that all the other proposals have offered.
It also seems like your proposal does create an opportunity for developers
to implement optimizations, particularly when chaining together multiple
joins.

API:

However, this feature does come at the expense of adding a whole new
KScatteredTable interface and the corresponding mental overhead of picturing
the data flow to make sense of it and understand how to use it in both
single and multiple join scenarios. I suppose there's an argument that
developers should *always* bear in mind the underlying implementation
details of the systems they use. Practically speaking, though, it seems
like the best systems are ones that don't require detailed internal
knowledge to be used productively.

I want to make clear that I'm strongly sympathetic to the view that we
shouldn't design in a way that prohibits optimization, and that the promise
of unspecified future internal optimizations may never actually bear fruit.

Efficiency:

In this case, I haven't seen, or been able to build for myself, a strong
case that the scattered table API would actually be more efficient, even in
the presence of multiple chained joins. It's true that you get to amortize
the cost of the post-join group/gather operation over the number of chained
joins. But on the other hand, it requires sending the higher-cardinality
data over the wire, instead of the lower-cardinality data. And it requires
maintaining a linear amount of candidate join result data (in the "map" in
your example) to perform the final resolution.

I decided not to bring this up in the KIP discussion before, since it
sounded like everyone liked the latest proposal, but there is an additional
optimization available to the current proposal:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-Workflow

In step 2, we are only sending the primary and foreign key data over the
wire, so if we really want to pare down the amount of traffic and
computation, we can first check whether the FK has actually changed. If we
also materialize the right KTable values from step 6, then whenever a
left-side change doesn't affect the PK->FK mapping, we can skip the network
hop and the right-side range scan entirely and simply recompute the join
result directly in the left-side Joiner (8).
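A minimal sketch of that FK-change check, assuming plain in-memory maps
stand in for the left-side state stores and that right-side values
broadcast back to the left task are cached per FK (the "materialize step 6"
idea). `LeftSideDispatcher`, `Subscription`, and the method names are
hypothetical illustrations, not Kafka Streams API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * Hypothetical left-side dispatcher: sends a PK/FK subscription only when the
 * foreign key changes, and otherwise recomputes the join locally from a cached
 * copy of the right-side value. Plain maps stand in for state stores.
 */
final class LeftSideDispatcher<K, V, FK, VR, R> {

    /** What would be written to the pre-join repartition topic: PK and FK only. */
    static final class Subscription<P, F> {
        final P primaryKey;
        final F foreignKey;

        Subscription(P primaryKey, F foreignKey) {
            this.primaryKey = primaryKey;
            this.foreignKey = foreignKey;
        }
    }

    private final Function<V, FK> foreignKeyExtractor;
    private final BiFunction<V, VR, R> joiner;
    private final Map<K, V> leftTable = new HashMap<>();
    // Assumed local cache of right-side values that were broadcast back, keyed by FK.
    private final Map<FK, VR> cachedRightValues = new HashMap<>();

    final List<Subscription<K, FK>> sentSubscriptions = new ArrayList<>();
    final Map<K, R> emittedResults = new HashMap<>();

    LeftSideDispatcher(Function<V, FK> foreignKeyExtractor, BiFunction<V, VR, R> joiner) {
        this.foreignKeyExtractor = foreignKeyExtractor;
        this.joiner = joiner;
    }

    /** Caches a right-side value broadcast back to this task (the join itself is omitted here). */
    void onRightValueBroadcast(FK foreignKey, VR rightValue) {
        cachedRightValues.put(foreignKey, rightValue);
    }

    void onLeftUpdate(K key, V newValue) {
        V oldValue = leftTable.put(key, newValue);
        FK newFk = foreignKeyExtractor.apply(newValue);
        boolean fkUnchanged = oldValue != null
                && Objects.equals(foreignKeyExtractor.apply(oldValue), newFk);

        if (fkUnchanged && cachedRightValues.containsKey(newFk)) {
            // Same FK and a known right value: no network hop, no right-side range scan.
            emittedResults.put(key, joiner.apply(newValue, cachedRightValues.get(newFk)));
        } else {
            // New or changed FK: (re)subscribe; the result arrives later via the broadcast.
            sentSubscriptions.add(new Subscription<>(key, newFk));
        }
    }
}
```

For example, an update that keeps the same FK is joined locally against the
cached right value, while an update that moves to a new FK only produces a
subscription, and its result comes back later through the normal broadcast
path.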

As you pointed out, we have to make some kind of decision in the absence of
any good data regarding the actual data set or workload.

Here are the assumptions I'm currently operating with:
* the left side has higher cardinality than the right side
* the higher-cardinality data changes more frequently
* (step 2) sending only PK+FK is smaller than sending the whole left-side
   record
* broadcasting the right-side data only on right-side changes and FK
   changes costs less than broadcasting the join result on all changes
* comparing the records in step 6 for FK consistency with the left-side
   table is cheaper than maintaining a map of all recent join results to
   resolve disordering
These are all assumptions for sure, but I think they are reasonable ones.
Together, they mean that when you're joining just two tables, the
single-operator join is more efficient. I've been trying to do some math to
determine whether this holds up for three tables as well, but it's murky
with all the unbound terms. Tentatively, I do think that the single-operator
join is also likely to be more efficient for three tables. Happy to provide
more detail if you don't buy this.

However, you seem to have a strong intuition that the scatter/gather
approach is better. Is this informed by your actual applications at work?
Perhaps you can provide an example data set and sequence of operations so
we can all do the math and agree with you. It seems like we should have a
convincing efficiency argument before choosing a more complicated API over
a simpler one.

Last thought:
> Regarding what will be observed: I consider it a plus that every event
> in the inputs has a corresponding output, whereas your solution might
> "swallow" events.

I didn't follow this. Following Adam's example, we have two join results:
a "dead" one and a "live" one. If we get the dead one first, both solutions
emit it, followed by the live result. If we get the dead result second, both
solutions should suppress it. In your proposal, both the dead and live
results would be stored in that map, but the groupBy operator must not emit
the dead result after the live one in any case. I guess you were referring
only to the fact that the scattered table does not swallow any events? That
seems only partially relevant, since the join is still incomplete at that
point.
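To make the dead/live case concrete, here is a small simulation using the
foreign-key sequence from Adam's rebuild example further down the thread
(red, blue, green, purple, blue, white for key `foo`). It assumes a rebuild
in which all left-side updates are applied before any join results return,
and that the results then arrive in an arbitrary order. `FkValidatingResolver`
is a hypothetical name; the sketch only contrasts naive last-writer-wins with
checking each result's FK against the left record's current FK:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical resolver comparing naive last-writer-wins with FK validation. */
final class FkValidatingResolver {

    /** A join result as it returns to the left side: PK, the FK it was computed for, and the value. */
    static final class JoinResult {
        final String primaryKey;
        final String foreignKey;
        final String value;

        JoinResult(String primaryKey, String foreignKey, String value) {
            this.primaryKey = primaryKey;
            this.foreignKey = foreignKey;
            this.value = value;
        }
    }

    // Current FK of each left-side record, i.e. what the left KTable says right now.
    private final Map<String, String> currentForeignKey = new HashMap<>();
    final Map<String, String> naiveOutput = new HashMap<>();
    final Map<String, String> validatedOutput = new HashMap<>();

    void onLeftUpdate(String primaryKey, String foreignKey) {
        currentForeignKey.put(primaryKey, foreignKey);
    }

    void onJoinResult(JoinResult result) {
        // Naive: whatever arrives last wins, even if it is a "dead" result.
        naiveOutput.put(result.primaryKey, result.value);
        // Validated: emit only if the result was computed for the record's current FK.
        if (result.foreignKey.equals(currentForeignKey.get(result.primaryKey))) {
            validatedOutput.put(result.primaryKey, result.value);
        }
    }
}
```

With an arrival order of red, blue, green, purple, white, blue, the naive
output ends on `foo+blue` (a dead result), while the validated output ends on
`foo+white` regardless of the order in which the results arrive.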

Thanks for your time to help us get this right!
-John

On Wed, Jan 2, 2019 at 2:36 PM Adam Bellemare <adam.bellemare@gmail.com> wrote:

> Hi Jan
>
> Ahh, I got it! It is deterministic once you apply the groupBy function you
> mentioned a few months ago to the output, but not before you apply it...
> correct? I was not thinking about the groupBy function.
>
> Here's how I understand how it could work from an API perspective: I am
> going to use the terminology "KScatteredTable" to represent the
> intermediate table that is not yet resolved - basically the join was
> performed but no race condition handling is done.
>
> If I wanted to join three KTables together on foreign keys, one of the ways
> I could do it is:
>
> KScatteredTable scatteredOne = kTableOne.oneToManyJoin(kTableTwo,
>     joinerFuncTwo, foreignKeyExtractorTwo);
> KScatteredTable scatteredTwo = scatteredOne.oneToManyJoin(kTableThree,
>     joinerFuncThree, foreignKeyExtractorThree);
>
> // Now I groupBy the key that I want to obtain, and I can resolve the
> // out-of-order dependencies here.
> scatteredTwo.groupBy(keyValueMapper);
>
> This is shown here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-SolutionB-User-ManagedGroupBy(Jan's)
>
> Is this in line with what you're doing? Can this be done without exposing
> the CombinedKey? As you mentioned before, "A Table
> KTable<CombinedKey<A,B>,JoinedResult> is not a good return type. It breaks
> the KTable invariant that a table is currently partitioned by its key".
> With that being said, would the only two operations that a KScatteredTable
> needs to support be oneToManyJoin and groupBy?
>
> Thanks for your thoughts
>
> Adam
>
>
> On Wed, Jan 2, 2019 at 3:07 PM Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>
> > Hi Adam,
> >
> > I am kinda surprised! Yes, my solution is of course correct. I don't
> > really know what to show in an example, as I am convinced you have
> > grasped the concept of how mine works.
> >
> > If there is a race condition, there is a race condition. It doesn't
> > matter whether there are 10 minutes or milliseconds between events.
> > Either they are properly guarded or not. My solution has no such race
> > condition. It is 'eventually consistent'. You are going to see all sorts
> > of stuff coming up during a reprocess.
> >
> > The user can still mess it up later, though. But that is business as usual.
> >
> > In reality, I try to suppress updates from the left side as long as
> > possible, because right-side updates are more expensive if the left is
> > already fairly full. That limits the space a little, but there are no
> > guarantees. The result, however, once lag is zero, is the same every
> > time.
> >
> > The trade-offs can be shifted as you like. My solution gives full power
> > to the user and only does a minimum in the framework. You push
> > everything into Streams.
> >
> > If you ask me, not a good choice. Will anyone listen? No.
> > I do actually think it's too late to do it my way. It's not as if you
> > haven't already gone through the effort of building it.
> >
> > Just wanted to give you guys another chance, to think it through  ;)
> >
> > Regarding what will be observed: I consider it a plus that every event
> > in the inputs has a corresponding output, whereas your solution might
> > "swallow" events.
> >
> > Best Jan
> >
> >
> > On 02.01.2019 15:30, Adam Bellemare wrote:
> > > Jan
> > >
> > > I have been thinking a lot about the history of the discussion and your
> > > original proposal, and why you believe it is a better solution. The
> > > biggest problem with your original proposed design is that it seems to
> > > me to be non-deterministic. It is subject to race conditions that are
> > > dependent entirely on the data, and without resolution of these races
> > > you can end up with different results each time. If I am mistaken and
> > > this is indeed deterministic, then please let me know and provide an
> > > explanation, ideally with an example.
> > >
> > > The way I see it, you will get very different answers from your
> > > non-race-condition-resolved join topology, especially if you are
> > > nesting it with additional joins as you have indicated you are doing.
> > > Consider rebuilding an application state from the beginning of two
> > > topics. If the left/this side has multiple foreign-key changes in a row,
> > > spaced out every ten minutes, you may see something like this:
> > >
> > > (foo, foreignKey=red) t=0
> > > (foo, foreignKey=blue) t=0+10m
> > > (foo, foreignKey=green) t=0+20m
> > > (foo, foreignKey=purple) t=0+30m
> > > (foo, foreignKey=blue) t=0+40m
> > > (foo, foreignKey=white) t=0+50m
> > >
> > > During realtime processing, all of the updates may have correctly
> > > propagated because it took less than 10 minutes to resolve each join.
> > > Upon rebuilding from the start, however, all of these events would be
> > > processed in quick succession. The presence or absence of data will
> > > affect the results of your join, and the results can vary with each run
> > > depending on the data. Because of this, I cannot support any kind of
> > > solution that would allow the exposure of an unresolved intermediate
> > > state. I can understand if you don't support this, but this is why, as
> > > you said, you have the freedom to use the Processor API.
> > >
> > >
> > > With that being said, either the solution that I originally proposed
> > > (joins occurring on the foreign node) or John + Guozhang's solution
> > > (registering with the foreign node for notifications) is fine with me;
> > > both have the same API and we can evaluate it further during
> > > implementation.
> > >
> > >
> > > Thanks
> > >
> > > Adam
> > >
> > > On Thu, Dec 27, 2018 at 2:38 PM Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
> > >
> > >> Hi,
> > >>
> > >> just want to let you guys know that this thing is spiralling out of
> > >> control if you ask me.
> > >>
> > >> First you take away the possibility for the user to optimize. Now you
> > >> pile up complexity to perform some after-the-fact optimisation that,
> > >> from my POV, completely misses the point. As if the actual call to the
> > >> joiner were really going to be an expensive part. It won't. Truth is,
> > >> you don't have a clue which side is going to be smaller. It might be
> > >> that the key you shuffle around is already >>> the value on the other
> > >> side.
> > >>
> > >> You know my opinion on this. For me it's dead; I just leave you the
> > >> message here as an opportunity to reconsider the choices that were made.
> > >>
> > >> Wishing y'all a happy new year :)
> > >>
> > >> On 27.12.2018 17:22, Adam Bellemare wrote:
> > >>> Hi All
> > >>>
> > >>> Sorry for the delay - holidays and all. I have since updated the KIP
> > >>> with John's original suggestion and have pruned a number of the
> > >>> no-longer-relevant diagrams. Any more comments would be welcomed;
> > >>> otherwise I will look to kick off the vote again shortly.
> > >>>
> > >>> Thanks
> > >>> Adam
> > >>>
> > >>> On Mon, Dec 17, 2018 at 7:06 PM Adam Bellemare <adam.bellemare@gmail.com> wrote:
> > >>>
> > >>>> Hi John and Guozhang
> > >>>>
> > >>>> Ah yes, I lost that in the mix! Thanks for the convergent solutions - I
> > >>>> do think that the attachment that John included makes for a better
> > >>>> design. It should also help with overall performance, as very
> > >>>> high-cardinality foreign-keyed data (say, millions of events with the
> > >>>> same entity) will be able to leverage multiple nodes for join
> > >>>> functionality instead of having it all performed in one node. There is
> > >>>> still a bottleneck in the right table having to propagate all those
> > >>>> events, but with slimmer structures, less IO, and no need to perform
> > >>>> the join, I think the throughput will be much higher in those
> > >>>> scenarios.
> > >>>>
> > >>>> Okay, I am convinced. I will update the KIP accordingly to a Gliffy
> > >>>> version of John's diagram and ensure that the example flow matches
> > >>>> correctly. Then I can go back to working on the PR to match the
> > diagram.
> > >>>>
> > >>>> Thanks both of you for all the help - very much appreciated.
> > >>>>
> > >>>> Adam
> > >>>>
> > >>>> On Mon, Dec 17, 2018 at 6:39 PM Guozhang Wang <wangguoz@gmail.com> wrote:
> > >>>>
> > >>>>> Hi John,
> > >>>>>
> > >>>>> Just made a pass on your diagram (nice hand-drawing btw!), and
> > >>>>> obviously we are thinking about the same thing :) A neat difference
> > >>>>> that I like is that in the pre-join repartition topic we can still
> > >>>>> send messages in the format `K=k, V=(i=2)` while using "i" as the
> > >>>>> partition key in the StreamPartitioner. This way we do not even need
> > >>>>> to augment the key for the repartition topic; we just project the
> > >>>>> foreign-key part of the value and trim all other fields. As long as we
> > >>>>> still materialize the store as `A-2` co-located with the right
> > >>>>> KTable, that is fine.
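A rough sketch of that partitioning idea, assuming a simple
`String.hashCode()`-based partition function. Kafka's default partitioner
instead hashes the serialized key bytes with murmur2, so a real
implementation would have to match whatever partitioning the right table
actually uses. `ForeignKeyPartitioning` and `SubscriptionValue` are
hypothetical names; the point is only that the record key stays the left PK
while the partition is derived from the FK carried in the value:

```java
import java.util.Objects;

/** Hypothetical stand-in for a partitioner on the pre-join repartition topic. */
final class ForeignKeyPartitioning {

    private ForeignKeyPartitioning() {}

    /** Value of a pre-join record: only the projected FK, all other fields trimmed. */
    static final class SubscriptionValue {
        final String foreignKey;

        SubscriptionValue(String foreignKey) {
            this.foreignKey = Objects.requireNonNull(foreignKey, "foreignKey");
        }
    }

    /** Partition of a right-table record, assumed to be keyed by its own key (the FK). */
    static int partitionForRightKey(String rightKey, int numPartitions) {
        return Math.floorMod(rightKey.hashCode(), numPartitions);
    }

    /** Partition of a subscription record: the left PK stays the key but does not pick the partition. */
    static int partitionForSubscription(String leftPrimaryKey, SubscriptionValue value, int numPartitions) {
        // leftPrimaryKey is deliberately ignored; the FK in the value decides co-location.
        return partitionForRightKey(value.foreignKey, numPartitions);
    }
}
```

Because both functions hash the same FK, every subscription for `i=2` lands
on the partition that holds right-table key `2`, whatever its left PK is.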
> > >>>>>
> > >>>>> As I mentioned in my previous email, I also think this has a few
> > >>>>> advantages in saving over-the-wire bytes as well as disk bytes.
> > >>>>>
> > >>>>> Guozhang
> > >>>>>
> > >>>>>
> > >>>>> On Mon, Dec 17, 2018 at 3:17 PM John Roesler <john@confluent.io> wrote:
> > >>>>>
> > >>>>>> Hi Guozhang,
> > >>>>>>
> > >>>>>> Thanks for taking a look! I think Adam's already addressed your
> > >>>>>> questions as well as I could have.
> > >>>>>>
> > >>>>>> Hi Adam,
> > >>>>>>
> > >>>>>> Thanks for updating the KIP. It looks great, especially how all the
> > >>>>>> need-to-know information is right at the top, followed by the
> > >>>>>> details.
> > >>>>>>
> > >>>>>> Also, thanks for that high-level diagram. Actually, now that I'm
> > >>>>>> looking at it, I think part of my proposal got lost in translation,
> > >>>>>> although I do think that what you have there is also correct.
> > >>>>>>
> > >>>>>> I sketched up a crude diagram based on yours and attached it to the
> > >>>>>> KIP (I'm not sure if attached or inline images work on the mailing
> > >>>>>> list):
> > >>>>>> https://cwiki.apache.org/confluence/download/attachments/74684836/suggestion.png
> > >>>>>> It's also attached to this email for convenience.
> > >>>>>>
> > >>>>>> Hopefully, you can see how it's intended to line up, and which parts
> > >>>>>> are modified.
> > >>>>>> At a high level, instead of performing the join on the right-hand
> > >>>>>> side, we're essentially just registering interest, like "LHS key A
> > >>>>>> wishes to receive updates for RHS key 2". Then, when there is a new
> > >>>>>> "interest" or any updates to the RHS records, it "broadcasts" its
> > >>>>>> state back to the LHS records that are interested in it.
> > >>>>>>
> > >>>>>> Thus, instead of sending the LHS values to the RHS joiner workers and
> > >>>>>> then sending the join results back to the LHS worker to be
> > >>>>>> co-partitioned and validated, we instead only send the LHS *keys* to
> > >>>>>> the RHS workers and then only the RHS k/v back to be joined by the LHS
> > >>>>>> worker.
> > >>>>>>
> > >>>>>> I've been considering both your diagram and mine, and I *think* what
> > >>>>>> I'm proposing has a few advantages.
> > >>>>>>
> > >>>>>> Here are some points of interest as you look at the diagram:
> > >>>>>> * When we extract the foreign key and send it to the Pre-Join
> > >>>>>> Repartition Topic, we can send only the FK/PK pair. There's no need to
> > >>>>>> worry about custom partitioner logic, since we can just use the
> > >>>>>> foreign key plainly as the repartition record key. Also, we save on
> > >>>>>> transmitting the LHS value, since we only send its key in this step.
> > >>>>>> * We also only need to store the RHSKey:LHSKey mapping in the
> > >>>>>> MaterializedSubscriptionStore, saving on disk. We can use the same
> > >>>>>> RocksDB key format you proposed and the same algorithm involving
> > >>>>>> range scans when the RHS records get updated.
> > >>>>>> * Instead of joining on the right side, all we do is compose a
> > >>>>>> re-repartition record so we can broadcast the RHS k/v pair back to the
> > >>>>>> original LHS partition. (This is what the "rekey" node is doing.)
> > >>>>>> * Then, there is a special kind of Joiner that's co-resident in the
> > >>>>>> same StreamTask as the LHS table, subscribed to the Post-Join
> > >>>>>> Repartition Topic.
> > >>>>>> ** This Joiner is *not* triggered directly by any changes in the LHS
> > >>>>>> KTable. Instead, LHS events indirectly trigger the join via the whole
> > >>>>>> lifecycle.
> > >>>>>> ** For each event arriving from the Post-Join Repartition Topic, the
> > >>>>>> Joiner looks up the corresponding record in the LHS KTable. It
> > >>>>>> validates the FK as you noted, discarding any inconsistent events.
> > >>>>>> Otherwise, it unpacks the RHS K/V pair and invokes the ValueJoiner to
> > >>>>>> obtain the join result.
> > >>>>>> ** Note that the Joiner itself is stateless, so materializing the
> > >>>>>> join result is optional, just as with the 1:1 joins.
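The flow in the list above can be sketched in a single process, assuming
in-memory maps in place of state stores and direct method calls in place of
the pre-join and post-join repartition topics. The `fk + NUL + pk` key with a
`TreeMap` prefix scan only approximates the RocksDB range scan over the
MaterializedSubscriptionStore, and assumes foreign keys never contain a NUL
character. All names are hypothetical, and removing the old subscription when
the FK changes is an added assumption:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Hypothetical single-process model of the subscribe / broadcast / validate flow. */
final class ForeignKeyJoinSketch<V, VR, R> {
    private static final char SEPARATOR = (char) 0;        // assumed absent from FKs
    private static final char AFTER_SEPARATOR = (char) 1;  // exclusive upper bound of a prefix scan

    private final Function<V, String> foreignKeyExtractor;
    private final BiFunction<V, VR, R> valueJoiner;
    private final Map<String, V> leftTable = new HashMap<>();    // LHS KTable
    private final Map<String, VR> rightTable = new HashMap<>();  // RHS KTable
    // Stand-in for the MaterializedSubscriptionStore: key = fk + SEPARATOR + pk, value = pk.
    // Sorting keeps all subscribers of one FK contiguous, so a prefix range scan finds them.
    private final TreeMap<String, String> subscriptions = new TreeMap<>();
    final Map<String, R> joinResults = new HashMap<>();

    ForeignKeyJoinSketch(Function<V, String> foreignKeyExtractor, BiFunction<V, VR, R> valueJoiner) {
        this.foreignKeyExtractor = foreignKeyExtractor;
        this.valueJoiner = valueJoiner;
    }

    /** LHS update: only the PK/FK pair travels to the right side (pre-join repartition topic). */
    void onLeftUpdate(String pk, V value) {
        V previous = leftTable.put(pk, value);
        if (previous != null) {
            // Assumed cleanup: drop the subscription registered for the old FK.
            subscriptions.remove(foreignKeyExtractor.apply(previous) + SEPARATOR + pk);
        }
        onSubscription(pk, foreignKeyExtractor.apply(value));
    }

    /** RHS task: register interest, and reply at once if the RHS value already exists. */
    private void onSubscription(String pk, String fk) {
        subscriptions.put(fk + SEPARATOR + pk, pk);
        VR rightValue = rightTable.get(fk);
        if (rightValue != null) {
            onBroadcast(pk, fk, rightValue);
        }
    }

    /** RHS update: range-scan every subscriber of this FK and broadcast the RHS k/v back. */
    void onRightUpdate(String fk, VR value) {
        rightTable.put(fk, value);
        List<String> subscribers = new ArrayList<>(
                subscriptions.subMap(fk + SEPARATOR, true, fk + AFTER_SEPARATOR, false).values());
        for (String pk : subscribers) {
            onBroadcast(pk, fk, value);
        }
    }

    /** LHS-side Joiner (post-join repartition topic): validate the FK, then join. */
    private void onBroadcast(String pk, String fk, VR rightValue) {
        V leftValue = leftTable.get(pk);
        if (leftValue == null || !fk.equals(foreignKeyExtractor.apply(leftValue))) {
            return; // stale broadcast: the left record now points elsewhere, or is gone
        }
        joinResults.put(pk, valueJoiner.apply(leftValue, rightValue));
    }
}
```

When a left record moves to an FK that has no right-side value yet, this
sketch simply leaves the previous result in place; an inner join would
presumably need to emit a deletion at that point, which is not shown.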
> > >>>>>>
> > >>>>>> So in summary:
> > >>>>>> * instead of transmitting the LHS keys and values to the right and the
> > >>>>>> JoinResult back to the left, we only transmit the LHS keys to the
> > >>>>>> right and the RHS values to the left. Assuming the average RHS value
> > >>>>>> is smaller than or equal to the average join result size, it's a
> > >>>>>> clear win on broker traffic. I think this is actually a reasonable
> > >>>>>> assumption, which we can discuss more if you're suspicious.
> > >>>>>> * we only need one copy of the data (the left and right tables need to
> > >>>>>> be materialized) and one extra copy of the PK:FK pairs in the
> > >>>>>> Materialized Subscription Store. Materializing the join result is
> > >>>>>> optional, just as with the existing 1:1 joins.
> > >>>>>> * we still need the fancy range-scan algorithm on the right to locate
> > >>>>>> all interested LHS keys when a RHS value is updated, but we don't need
> > >>>>>> a custom partitioner for either repartition topic (this is of course a
> > >>>>>> modification we could make to your version as well)
> > >>>>>>
> > >>>>>> How does this sound to you? (And did I miss anything?)
> > >>>>>> -John
> > >>>>>>
> > >>>>>> On Mon, Dec 17, 2018 at 9:00 AM Adam Bellemare <adam.bellemare@gmail.com> wrote:
> > >>>>>>
> > >>>>>>> Hi John & Guozhang
> > >>>>>>>
> > >>>>>>> @John & @Guozhang Wang <wangguoz@gmail.com> - I have cleaned up the
> > >>>>>>> KIP, pruned much of what I wrote and put a simplified diagram near
> > >>>>>>> the top to illustrate the workflow. I encapsulated Jan's content at
> > >>>>>>> the bottom of the document. I believe it is simpler to read by far
> > >>>>>>> now.
> > >>>>>>>
> > >>>>>>> @Guozhang Wang <wangguoz@gmail.com>:
> > >>>>>>>> #1: rekey left table
> > >>>>>>>>     -> source from the left upstream, send to rekey-processor to
> > >>>>>>>> generate combined key, and then sink to copartition topic.
> > >>>>>>> Correct.
> > >>>>>>>
> > >>>>>>>> #2: first-join with right table
> > >>>>>>>>     -> source from the right table upstream, materialize the right
> > >>>>>>>> table.
> > >>>>>>>>     -> source from the co-partition topic, materialize the rekeyed
> > >>>>>>>> left table, join with the right table, rekey back, and then sink to
> > >>>>>>>> the rekeyed-back topic.
> > >>>>>>> Almost - I cleared up the KIP. We do not rekey back yet, as I need
> > >>>>>>> the Foreign-Key value generated in #1 above to compare in the
> > >>>>>>> resolution stage.
> > >>>>>>>
> > >>>>>>>> #3: second join
> > >>>>>>>>     -> source from the rekeyed-back topic, materialize the rekeyed
> > >>>>>>>> back table.
> > >>>>>>>>     -> source from the left upstream, materialize the left table,
> > >>>>>>>> join with the rekeyed back table.
> > >>>>>>> Almost - as each event comes in, we just run it through a stateful
> > >>>>>>> processor that checks the original ("This") KTable for the key. The
> > >>>>>>> value payload then has the foreignKeyExtractor applied again as in
> > >>>>>>> Part #1 above, and gets the current foreign key. Then we compare it
> > >>>>>>> to the joined event that we are currently resolving. If they have the
> > >>>>>>> same foreign-key, propagate the result out. If they don't, throw the
> > >>>>>>> event away.
> > >>>>>>>
> > >>>>>>> The end result is that we do need to materialize 2 additional tables
> > >>>>>>> (left/this-combinedkey table, and the final Joined table) as I've
> > >>>>>>> illustrated in the updated KIP. I hope the diagram clears it up a lot
> > >>>>>>> better. Please let me know.
> > >>>>>>>
> > >>>>>>> Thanks again
> > >>>>>>> Adam
> > >>>>>>>
> > >>>>>>> On Sun, Dec 16, 2018 at 3:18 PM Guozhang Wang <wangguoz@gmail.com> wrote:
> > >>>>>>>
> > >>>>>>>> John,
> > >>>>>>>>
> > >>>>>>>> Thanks a lot for the suggestions on refactoring the wiki. I agree
> > >>>>>>>> with you that the KIP proposal should be easily understood by anyone
> > >>>>>>>> who reads it in the future, and hence should provide a good summary
> > >>>>>>>> of the user-facing interfaces, as well as rejected alternatives to
> > >>>>>>>> represent briefly "how we came to this conclusion, and what we have
> > >>>>>>>> argued, disagreed, and agreed about, etc." so that readers do not
> > >>>>>>>> need to dig into the DISCUSS thread to get all the details. We can,
> > >>>>>>>> of course, keep the implementation details like "workflows" on the
> > >>>>>>>> wiki page as an addendum section, since they are also relevant.
> > >>>>>>>>
> > >>>>>>>> Regarding your proposal on comment 6): that's a very interesting
> > >>>>>>>> idea! Just to make sure I understand it correctly: the proposal's
> > >>>>>>>> resulting topology is still the same as the current proposal, where
> > >>>>>>>> we will have 3 sub-topologies for this operator:
> > >>>>>>>>
> > >>>>>>>> #1: rekey left table
> > >>>>>>>>      -> source from the left upstream, send to rekey-processor to
> > >>>>>>>> generate combined key, and then sink to copartition topic.
> > >>>>>>>>
> > >>>>>>>> #2: first-join with right table
> > >>>>>>>>      -> source from the right table upstream, materialize the right
> > >>>>>>>> table.
> > >>>>>>>>      -> source from the co-partition topic, materialize the rekeyed
> > >>>>>>>> left table, join with the right table, rekey back, and then sink to
> > >>>>>>>> the rekeyed-back topic.
> > >>>>>>>>
> > >>>>>>>> #3: second join
> > >>>>>>>>      -> source from the rekeyed-back topic, materialize the rekeyed
> > >>>>>>>> back table.
> > >>>>>>>>      -> source from the left upstream, materialize the left table,
> > >>>>>>>> join with the rekeyed back table.
> > >>>>>>>>
> > >>>>>>>> Sub-topologies #1 and #3 may be merged into a single sub-topology,
> > >>>>>>>> since both of them read from the left table source stream. In this
> > >>>>>>>> workflow, we need to materialize 4 tables (left table in #3, right
> > >>>>>>>> table in #2, rekeyed left table in #2, rekeyed-back table in #3) and
> > >>>>>>>> use 2 repartition topics (copartition topic, rekeyed-back topic).
> > >>>>>>>>
> > >>>>>>>> Compared with Adam's current proposal in the workflow overview, it
> > >>>>>>>> has the same number of materialized tables (left table, rekeyed left
> > >>>>>>>> table, right table, out-of-ordering resolver table) and the same
> > >>>>>>>> number of internal topics (two). The advantage is that on the
> > >>>>>>>> copartition topic, we can save bandwidth by not sending the value,
> > >>>>>>>> and in #2 the rekeyed left table is smaller since we do not have any
> > >>>>>>>> values to materialize. Is that right?
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Guozhang
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Wed, Dec 12, 2018 at 1:22 PM John Roesler <john@confluent.io> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Adam,
> > >>>>>>>>>
> > >>>>>>>>> Given that the committers are all pretty busy right now, I think
> > >>>>>>>>> that it would help if you were to refactor the KIP a little to
> > >>>>>>>>> reduce the workload for reviewers.
> > >>>>>>>>>
> > >>>>>>>>> I'd recommend the following changes:
> > >>>>>>>>> * relocate all internal details to a section at the end called
> > >>>>>>>>> something like "Implementation Notes".
> > >>>>>>>>> * rewrite the rest of the KIP to be as succinct as possible and
> > >>>>>>>>> mention only publicly-facing API changes.
> > >>>>>>>>> ** for example, the interface that you've already listed there, as
> > >>>>>>>>> well as a textual description of the guarantees we'll be providing
> > >>>>>>>>> (the join result is copartitioned with the LHS, and the join result
> > >>>>>>>>> is guaranteed correct)
> > >>>>>>>>>
> > >>>>>>>>> A good target would be that the whole main body of the KIP,
> > >>>>>>>>> including Status, Motivation, Proposal, Justification, and Rejected
> > >>>>>>>>> Alternatives, all fit "above the fold" (i.e., all fit on the screen
> > >>>>>>>>> at a comfortable zoom level).
> > >>>>>>>>> I think the only real Rejected Alternative that bears mention at
> > >>>>>>>>> this point is KScatteredTable, for which you could include just the
> > >>>>>>>>> executive summary (no implementation details) and link to extra
> > >>>>>>>>> details in the Implementation Notes section.
> > >>>>>>>>>
> > >>>>>>>>> Taking a look at the wiki page, ~90% of the text there is internal
> > >>>>>>>>> detail, which is useful for skeptical readers, but doesn't need to
> > >>>>>>>>> be ratified in a vote (and would be subject to change without
> > >>>>>>>>> notice in the future anyway). There's also a lot of conflicting
> > >>>>>>>>> discussion, as you've very respectfully tried to preserve the
> > >>>>>>>>> original proposal from Jan while adding your own. Isolating all this
> > >>>>>>>>> information in a dedicated section at the bottom frees the voters up
> > >>>>>>>>> to focus on the public API part of the proposal, which is really all
> > >>>>>>>>> they need to consider.
> > >>>>>>>>>
> > >>>>>>>>> Plus, it'll be clear to future readers which parts of the document
> > >>>>>>>>> are enduring, and which parts are a snapshot of our implementation
> > >>>>>>>>> thinking at the time.
> > >>>>>>>>>
> > >>>>>>>>> I'm suggesting this because I suspect that the others haven't made
> > >>>>>>>>> time to review it partly because it seems daunting. If it seems like
> > >>>>>>>>> it would be a huge time investment to review, people will just keep
> > >>>>>>>>> putting it off. But if the KIP is a single page, then they'll be
> > >>>>>>>>> more inclined to give it a read.
> > >>>>>>>>>
> > >>>>>>>>> Honestly, I don't think the KIP itself is that controversial (apart
> > >>>>>>>>> from the scattered table thing (sorry, Jan)). Most of the discussion
> > >>>>>>>>> has been around the implementation, which we can continue more
> > >>>>>>>>> effectively in a PR once the KIP has passed.
> > >>>>>>>>>
> > >>>>>>>>> How does that sound?
> > >>>>>>>>> -John
> > >>>>>>>>>
> > >>>>>>>>> On Mon, Dec 10, 2018 at 3:54 PM Adam Bellemare <adam.bellemare@gmail.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> 1) I believe that the resolution mechanism John has proposed is
> > >>>>>>>>>> sufficient - it is clean and easy and doesn't require additional
> > >>>>>>>>>> RocksDB stores, which reduces the footprint greatly. I don't think
> > >>>>>>>>>> we need to resolve based on timestamp or offset anymore, but if we
> > >>>>>>>>>> decide to do that, it would be within the bounds of the existing
> > >>>>>>>>>> API.
> > >>>>>>>>>>
> > >>>>>>>>>> 2) Is the current API sufficient, or does it need to be altered to
> > >>>>>>>>>> go back to vote?
> > >>>>>>>>>>
> > >>>>>>>>>> 3) KScatteredTable implementation can always be added in a future
> > >>>>>>>>>> revision. This API does not rule it out. The implementation of this
> > >>>>>>>>>> function would simply be replaced with `KScatteredTable.resolve()`
> > >>>>>>>>>> while still maintaining the existing API, thereby giving both
> > >>>>>>>>>> features as Jan outlined earlier. Would this work?
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks Guozhang, John and Jan
> > >>>>>>>>>>
> > >>>>>>>>>> On Mon, Dec 10, 2018 at 10:39 AM John Roesler <john@confluent.io> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi, all,
> > >>>>>>>>>>>
> > >>>>>>>>>>>>> In fact, we can just keep a single final-result store with
> > >>>>>>>>>>>>> timestamps and reject values that have a smaller timestamp, is
> > >>>>>>>>>>>>> that right?
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Which output is correct should at least be decided based on the
> > >>>>>>>>>>>> offset of the original message.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks for this point, Jan.
> > >>>>>>>>>>>
> > >>>>>>>>>>> KIP-258 is merely to allow embedding the record timestamp in the k/v
> > >>>>>>>>>>> store, as well as providing a storage-format upgrade path.
> > >>>>>>>>>>>
> > >>>>>>>>>>> I might have missed it, but I think we have yet to discuss whether it's
> > >>>>>>>>>>> safe or desirable just to swap out topic-ordering for timestamp-ordering.
> > >>>>>>>>>>> This is a very deep topic, and I think it would only pollute the current
> > >>>>>>>>>>> discussion.
> > >>>>>>>>>>>
> > >>>>>>>>>>> What Adam has proposed is safe, given the *current* ordering semantics
> > >>>>>>>>>>> of the system. If we can agree on his proposal, I think we can merge the
> > >>>>>>>>>>> feature well before the conversation about timestamp ordering even takes
> > >>>>>>>>>>> place, much less reaches a conclusion. In the meantime, it would be
> > >>>>>>>>>>> unfortunate to have one join operator with different ordering semantics
> > >>>>>>>>>>> from every other KTable operator.
> > >>>>>>>>>>>
> > >>>>>>>>>>> If and when that timestamp discussion takes place, many (all?) KTable
> > >>>>>>>>>>> operations will need to be updated, rendering the many:one join a small
> > >>>>>>>>>>> marginal cost.
> > >>>>>>>>>>>
> > >>>>>>>>>>> And, just to plug it again, I proposed an algorithm above that I believe
> > >>>>>>>>>>> provides correct ordering without any additional metadata, and regardless
> > >>>>>>>>>>> of the ordering semantics. I didn't bring it up further, because I felt
> > >>>>>>>>>>> the KIP only needs to agree on the public API, and we can discuss the
> > >>>>>>>>>>> implementation at leisure in a PR...
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks,
> > >>>>>>>>>>> -John
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Mon, Dec 10, 2018 at 2:28 AM Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On 10.12.2018 07:42, Guozhang Wang wrote:
> > >>>>>>>>>>>>> Hello Adam / Jan / John,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Sorry for being late on this thread! I've finally got some time this
> > >>>>>>>>>>>>> weekend to clean up a load of tasks on my queue (actually I've also
> > >>>>>>>>>>>>> realized there are a bunch of other things I need to enqueue while
> > >>>>>>>>>>>>> cleaning them up, something I need to improve on my side). So here are
> > >>>>>>>>>>>>> my thoughts:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Regarding the APIs: I like the API as currently written in the KIP.
> > >>>>>>>>>>>>> More generally, I'd prefer to keep 1) one-to-many join functionality
> > >>>>>>>>>>>>> and 2) join types other than inner as separate KIPs, since 1) may be
> > >>>>>>>>>>>>> worth a general API refactoring that can benefit not only foreign-key
> > >>>>>>>>>>>>> joins but co-located joins as well (e.g. an extended proposal of
> > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup),
> > >>>>>>>>>>>>> and I'm not sure if other join types would actually be needed (maybe
> > >>>>>>>>>>>>> left join still makes sense), so it's better to
> > >>>>>>>>>>>>> wait-for-people-to-ask-and-add than add-something-that-no-one-uses.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Regarding whether we enforce steps 3) / 4) vs. introducing a
> > >>>>>>>>>>>>> KScatteredTable for users to inject their own optimization: I'd prefer
> > >>>>>>>>>>>>> to keep the current option as-is, and my main rationale is leaving room
> > >>>>>>>>>>>>> for optimization inside the Streams internals, plus API succinctness.
> > >>>>>>>>>>>>> For advanced users who may indeed prefer KScatteredTable and want to do
> > >>>>>>>>>>>>> their own optimization, but for whom using the Processor API directly
> > >>>>>>>>>>>>> is too much work, I think we can still extend the current API to
> > >>>>>>>>>>>>> support it in the future if it becomes necessary.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> No internal optimization potential. It's a myth.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> ¯\_(ツ)_/¯
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> :-)
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Another note about step 4), resolving out-of-order data: as I
> > >>>>>>>>>>>>> mentioned before, I think with KIP-258 (embedded timestamp with
> > >>>>>>>>>>>>> key-value store) we can actually make this step simpler than the
> > >>>>>>>>>>>>> current proposal. In fact, we can just keep a single final-result store
> > >>>>>>>>>>>>> with timestamps and reject values that have a smaller timestamp, is
> > >>>>>>>>>>>>> that right?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Which output is correct should at least be decided based on the offset
> > >>>>>>>>>>>> of the original message.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> That's all I have in mind now. Again, great appreciation to Adam for
> > >>>>>>>>>>>>> making such HUGE progress on this KIP!
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Guozhang
> > >>>>>>>>>>>>>
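Guozhang's timestamp-based rejection and Jan's point that the offset of the original message should decide can give different answers, because record timestamps are not guaranteed to increase with offset within a partition (producers may assign them). The sketch below is a minimal illustration under stated assumptions: each join result is assumed to carry both the source offset and the timestamp of the left-table update that produced it, and `JoinResult` and `OrderingResolvers` are hypothetical names, not Kafka Streams classes.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: these are not Kafka Streams classes.
// A join result for one left-table key, tagged with the source offset and the
// record timestamp of the left-table update that produced it.
final class JoinResult {
    final String key;
    final long offset;    // position of the originating update in its source partition
    final long timestamp; // producer-assigned; may go backwards within a partition
    final String value;

    JoinResult(String key, long offset, long timestamp, String value) {
        this.key = key;
        this.offset = offset;
        this.timestamp = timestamp;
        this.value = value;
    }
}

final class OrderingResolvers {
    // Keep the result with the largest timestamp per key; reject smaller timestamps.
    static Map<String, String> byTimestamp(List<JoinResult> arrivals) {
        Map<String, JoinResult> kept = new HashMap<>();
        for (JoinResult r : arrivals) {
            JoinResult current = kept.get(r.key);
            if (current == null || r.timestamp >= current.timestamp) {
                kept.put(r.key, r);
            }
        }
        return valuesOf(kept);
    }

    // Keep the result with the largest source offset per key; reject smaller offsets.
    static Map<String, String> byOffset(List<JoinResult> arrivals) {
        Map<String, JoinResult> kept = new HashMap<>();
        for (JoinResult r : arrivals) {
            JoinResult current = kept.get(r.key);
            if (current == null || r.offset > current.offset) {
                kept.put(r.key, r);
            }
        }
        return valuesOf(kept);
    }

    private static Map<String, String> valuesOf(Map<String, JoinResult> kept) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, JoinResult> e : kept.entrySet()) {
            out.put(e.getKey(), e.getValue().value);
        }
        return out;
    }

    public static void main(String[] args) {
        // Offset 0 carries a later timestamp than offset 1 (e.g. clock skew between
        // producers), and the two join results come back in reverse order.
        List<JoinResult> arrivals = Arrays.asList(
                new JoinResult("k1", 1L, 100L, "joined-with-B"),
                new JoinResult("k1", 0L, 200L, "joined-with-A"));
        System.out.println("by timestamp: " + byTimestamp(arrivals).get("k1"));
        System.out.println("by offset:    " + byOffset(arrivals).get("k1"));
    }
}
```

With the later offset carrying the smaller timestamp, `byTimestamp` keeps `joined-with-A`, while `byOffset` keeps `joined-with-B`, the result of the update that comes last in topic order.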
> > >>>>>>>>>>>>> On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> If they don't find the time:
> > >>>>>>>>>>>>>> They usually take the opposite path from me :D
> > >>>>>>>>>>>>>> so the answer would be clear.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hence my suggestion to vote.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On 04.12.2018 21:06, Adam Bellemare wrote:
> > >>>>>>>>>>>>>>> Hi Guozhang and Matthias
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I know both of you are quite busy, but we've gotten this KIP to a point
> > >>>>>>>>>>>>>>> where we need more guidance on the API (perhaps a bit of a tie-breaker,
> > >>>>>>>>>>>>>>> if you will). If there is anyone else you think should look at this,
> > >>>>>>>>>>>>>>> please tag them accordingly.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> The scenario is as follows:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Current Option:
> > >>>>>>>>>>>>>>> API:
> > >>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
> > >>>>>>>>>>>>>>> 1) Rekey the data to CombinedKey, and shuffle it to the partition with
> > >>>>>>>>>>>>>>> the foreignKey (repartition 1)
> > >>>>>>>>>>>>>>> 2) Join the data
> > >>>>>>>>>>>>>>> 3) Shuffle the data back to the original node (repartition 2)
> > >>>>>>>>>>>>>>> 4) Resolve out-of-order arrival / race condition due to foreign-key
> > >>>>>>>>>>>>>>> changes.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Alternate Option:
> > >>>>>>>>>>>>>>> Perform #1 and #2 above, and return a KScatteredTable.
> > >>>>>>>>>>>>>>> - It would be keyed on a wrapped key function: <CombinedKey<KO, K>, VR>
> > >>>>>>>>>>>>>>> (KO = Other Table Key, K = This Table Key, VR = Joined Result)
> > >>>>>>>>>>>>>>> - KScatteredTable.resolve() would perform #3 and #4, but otherwise a
> > >>>>>>>>>>>>>>> user would be able to perform additional functions directly from the
> > >>>>>>>>>>>>>>> KScatteredTable (TBD - currently out of scope).
> > >>>>>>>>>>>>>>> - John's analysis two emails up is accurate as to the tradeoffs.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Current Option is coded as-is. Alternate Option is possible, but will
> > >>>>>>>>>>>>>>> require implementation details to be exposed in the API, along with
> > >>>>>>>>>>>>>>> some new data structures (i.e., CombinedKey).
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I appreciate any insight into this.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks.
> > >>>>>>>>>>>>>>>
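The two shapes in the Current and Alternate Options can be illustrated with plain maps. This is a minimal in-memory sketch assuming inner-join semantics; `CombinedKey`, `scatterJoin`, and `resolve` are hypothetical stand-ins for the types named above, not the KIP's implementation. `scatterJoin` produces roughly what a KScatteredTable keyed by `CombinedKey<KO, K>` would hold (steps 1 and 2), and `resolve` rekeys back to `K` (steps 3 and 4).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch: neither CombinedKey nor these methods are Kafka Streams APIs.
final class CombinedKey<KO, K> {
    final KO foreignKey; // KO = other table key
    final K primaryKey;  // K = this table key

    CombinedKey(KO foreignKey, K primaryKey) {
        this.foreignKey = foreignKey;
        this.primaryKey = primaryKey;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof CombinedKey)) {
            return false;
        }
        CombinedKey<?, ?> other = (CombinedKey<?, ?>) o;
        return Objects.equals(foreignKey, other.foreignKey)
                && Objects.equals(primaryKey, other.primaryKey);
    }

    @Override
    public int hashCode() {
        return Objects.hash(foreignKey, primaryKey);
    }
}

final class ScatterResolveSketch {
    // Steps 1-2: rekey every left row to CombinedKey(fk, pk) and inner-join it with the
    // right row for fk; rows whose foreign key has no match are dropped.
    static <K, VL, KO, VO, VR> Map<CombinedKey<KO, K>, VR> scatterJoin(
            Map<K, VL> left, Function<VL, KO> foreignKeyExtractor,
            Map<KO, VO> right, BiFunction<VL, VO, VR> joiner) {
        Map<CombinedKey<KO, K>, VR> scattered = new HashMap<>();
        for (Map.Entry<K, VL> row : left.entrySet()) {
            KO fk = foreignKeyExtractor.apply(row.getValue());
            VO other = right.get(fk);
            if (other != null) {
                scattered.put(new CombinedKey<>(fk, row.getKey()), joiner.apply(row.getValue(), other));
            }
        }
        return scattered;
    }

    // Steps 3-4: rekey back to the left table's key. Each left key yields at most one
    // CombinedKey here, so there is no out-of-order arrival to resolve in memory.
    static <K, KO, VR> Map<K, VR> resolve(Map<CombinedKey<KO, K>, VR> scattered) {
        Map<K, VR> resolved = new HashMap<>();
        for (Map.Entry<CombinedKey<KO, K>, VR> e : scattered.entrySet()) {
            resolved.put(e.getKey().primaryKey, e.getValue());
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> orders = new HashMap<>();    // order id -> "customerId:item"
        orders.put("o1", "c1:widget");
        orders.put("o2", "c1:gadget");
        orders.put("o3", "c2:bolt");
        Map<String, String> customers = new HashMap<>(); // customer id -> name
        customers.put("c1", "Alice");
        customers.put("c2", "Bob");
        Map<CombinedKey<String, String>, String> scattered = scatterJoin(
                orders, v -> v.split(":")[0], customers, (o, c) -> o.split(":")[1] + " for " + c);
        System.out.println(resolve(scattered));
    }
}
```

In a distributed deployment, step 3 sends each result over a repartition topic, which is where the out-of-order arrivals discussed in this thread come from; a single in-memory map has nothing to resolve.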
> > >>>>>>>>>>>>>>> On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare <adam.bellemare@gmail.com> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hi John
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks for your feedback and assistance. I think your summary is
> > >>>>>>>>>>>>>>>> accurate from my perspective. Additionally, I would like to add that
> > >>>>>>>>>>>>>>>> there is a risk of inconsistent final states without performing the
> > >>>>>>>>>>>>>>>> resolution. This is a major concern for me, as most of the data I have
> > >>>>>>>>>>>>>>>> dealt with is produced by relational databases. We have seen a number
> > >>>>>>>>>>>>>>>> of cases where a user in the Rails UI has modified the field (foreign
> > >>>>>>>>>>>>>>>> key), realized they made a mistake, and then updated the field again
> > >>>>>>>>>>>>>>>> with a new key. The events are propagated out as they are produced,
> > >>>>>>>>>>>>>>>> and as such we have had real-world cases where these inconsistencies
> > >>>>>>>>>>>>>>>> were propagated downstream as the final values due to the race
> > >>>>>>>>>>>>>>>> conditions in the fanout of the data.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> The solution I propose values correctness of the final result over
> > >>>>>>>>>>>>>>>> other factors.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> We could always move this function over to using a KScatteredTable
> > >>>>>>>>>>>>>>>> implementation in the future, and simply deprecate this join API in
> > >>>>>>>>>>>>>>>> time. I would like to hear more from some of the other major
> > >>>>>>>>>>>>>>>> committers on which course of action they think is best before any
> > >>>>>>>>>>>>>>>> more coding is done.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks again
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Adam
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
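The race Adam describes can be simulated in a few lines. This is a minimal sketch, assuming the foreign key on one left-table row changes from A to B (the mistake) and then to C, and that the three join results return to the left side in a different order than they were sent. `FkRaceSketch` and its methods are hypothetical, and comparing a result's foreign key with the row's current foreign key is only one illustrative resolution rule, not the KIP's mechanism.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation: one left-table row changes its foreign key A -> B -> C,
// each version is joined on a different foreign-key partition, and the join results
// come back to the left side out of order.
final class FkRaceSketch {
    static final class Result {
        final String key;    // left-table primary key
        final String fk;     // foreign key this result was computed with
        final String joined; // joined value

        Result(String key, String fk, String joined) {
            this.key = key;
            this.fk = fk;
            this.joined = joined;
        }
    }

    // Naive: the last result to arrive wins, even if it was computed from a stale FK.
    static Map<String, String> lastArrivalWins(List<Result> arrivals) {
        Map<String, String> out = new HashMap<>();
        for (Result r : arrivals) {
            out.put(r.key, r.joined);
        }
        return out;
    }

    // One possible check, assumed for illustration: before emitting, compare the result's
    // FK with the FK the left table currently holds for that key, and drop stale results.
    static Map<String, String> dropStale(Map<String, String> currentFk, List<Result> arrivals) {
        Map<String, String> out = new HashMap<>();
        for (Result r : arrivals) {
            if (r.fk.equals(currentFk.get(r.key))) {
                out.put(r.key, r.joined);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The left table has already applied all three updates: order-1 now points at C.
        Map<String, String> currentFk = new HashMap<>();
        currentFk.put("order-1", "C");
        // The result computed with the mistaken key B happens to arrive last.
        List<Result> arrivals = Arrays.asList(
                new Result("order-1", "A", "order-1 joined with A"),
                new Result("order-1", "C", "order-1 joined with C"),
                new Result("order-1", "B", "order-1 joined with B"));
        System.out.println("naive:    " + lastArrivalWins(arrivals).get("order-1"));
        System.out.println("resolved: " + dropStale(currentFk, arrivals).get("order-1"));
    }
}
```

The naive rule leaves the B result as the final value, which is the inconsistency described above; the checked rule keeps the C result. Comparing foreign keys alone cannot tell apart a row that moves from A to B and back to A, so a real implementation would need a stronger version check, such as the source offset discussed elsewhere in this thread.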
> > >>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 8:24 PM John Roesler <john@confluent.io> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Hi Jan and Adam,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Wow, thanks for doing that test, Adam. Those results are encouraging.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Thanks for sharing your performance experience as well, Jan. I agree
> > >>>>>>>>>>>>>>>>> that avoiding unnecessary join outputs is especially important when the
> > >>>>>>>>>>>>>>>>> fan-out is so high. I suppose this could also be built into the
> > >>>>>>>>>>>>>>>>> implementation we're discussing, but it wouldn't have to be specified in
> > >>>>>>>>>>>>>>>>> the KIP (since it's an API-transparent optimization).
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> As far as whether or not to re-repartition the data, I didn't bring it
> > >>>>>>>>>>>>>>>>> up because it sounded like the two of you agreed to leave the KIP
> > >>>>>>>>>>>>>>>>> as-is, despite the disagreement.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> If you want my opinion, I feel like both approaches are reasonable.
> > >>>>>>>>>>>>>>>>> It sounds like Jan places more value on the potential for developers to
> > >>>>>>>>>>>>>>>>> optimize their topologies to re-use the intermediate nodes, whereas Adam
> > >>>>>>>>>>>>>>>>> places more value on having a single operator that people can use
> > >>>>>>>>>>>>>>>>> without extra steps at the end.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Personally, although I do find it exceptionally annoying when a
> > >>>>>>>>>>>>>>>>> framework gets in my way when I'm trying to optimize something, it
> > >>>>>>>>>>>>>>>>> seems better to go for a single operation.
> > >>>>>>>>>>>>>>>>> * Encapsulating the internal transitions gives us significant latitude
> > >>>>>>>>>>>>>>>>> in the implementation (for example, joining only at the end, not in the
> > >>>>>>>>>>>>>>>>> middle, to avoid extra data copying and out-of-order resolution; how we
> > >>>>>>>>>>>>>>>>> represent the first repartition keys (combined keys vs. value vectors),
> > >>>>>>>>>>>>>>>>> etc.). If we publish something like a KScatteredTable with the
> > >>>>>>>>>>>>>>>>> right-partitioned joined data, then the API pretty much locks in the
> > >>>>>>>>>>>>>>>>> implementation as well.
> > >>>>>>>>>>>>>>>>> * The API seems simpler to understand and use. I do mean "seems"; if
> > >>>>>>>>>>>>>>>>> anyone wants to make the case that KScatteredTable is actually simpler,
> > >>>>>>>>>>>>>>>>> I think hypothetical usage code would help. From a relational algebra
> > >>>>>>>>>>>>>>>>> perspective, it seems like KTable.join(KTable) should produce a new
> > >>>>>>>>>>>>>>>>> KTable in all cases.
> > >>>>>>>>>>>>>>>>> * That said, there might still be room in the API for a different
> > >>>>>>>>>>>>>>>>> operation like what Jan has proposed to scatter a KTable, and then do
> > >>>>>>>>>>>>>>>>> things like join, re-group, etc. from there... I'm not sure; I haven't
> > >>>>>>>>>>>>>>>>> thought through all the consequences yet.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> This is all just my opinion after thinking over the discussion so far...
> > >>>>>>>>>>>>>>>>> -John
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <adam.bellemare@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Updated the PR to take into account John's feedback.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I did some preliminary testing for the performance of the prefixScan.
> > >>>>>>>>>>>>>>>>>> I have attached the file, but I will also include the text in the body
> > >>>>>>>>>>>>>>>>>> here for archival purposes (I am not sure what happens to attached
> > >>>>>>>>>>>>>>>>>> files). I also updated the PR and the KIP accordingly.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Summary: It scales exceptionally well for scanning large numbers of
> > >>>>>>>>>>>>>>>>>> records. As Jan mentioned previously, the real issue would be more
> > >>>>>>>>>>>>>>>>>> around processing the resulting records after obtaining them. For
> > >>>>>>>>>>>>>>>>>> instance, it takes approximately 80-120 ms to flush the buffer and a
> > >>>>>>>>>>>>>>>>>> further 35-85 ms to scan 27.5M records, obtaining matches for 2.5M of
> > >>>>>>>>>>>>>>>>>> them. Iterating through the records just to generate a simple count
> > >>>>>>>>>>>>>>>>>> takes ~40 times longer than the flush + scan combined.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> ============================================================================================
> > >>>>>>>>>>>>>>>>>> Setup:
> > >>>>>>>>>>>>>>>>>> ============================================================================================
> > >>>>>>>>>>>>>>>>>> Java 9 with default settings aside from a 512 MB heap (Xmx512m, Xms512m)
> > >>>>>>>>>>>>>>>>>> CPU: i7 2.2 GHz.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Note: I am using a slightly modified, directly accessible Kafka Streams
> > >>>>>>>>>>>>>>>>>> RocksDB implementation (RocksDB.java, basically just avoiding the
> > >>>>>>>>>>>>>>>>>> ProcessorContext). There are no modifications to the default RocksDB
> > >>>>>>>>>>>>>>>>>> values provided in the 2.1/trunk release.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> keysize = 128 bytes
> > >>>>>>>>>>>>>>>>>> valsize = 512 bytes
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Step 1:
> > >>>>>>>>>>>>>>>>>> Write X positive matching events (key = prefix + left-padded
> > >>>>>>>>>>>>>>>>>> auto-incrementing integer)
> > >>>>>>>>>>>>>>>>>> Step 2:
> > >>>>>>>>>>>>>>>>>> Write 10X negative matching events (key = left-padded
> > >>>>>>>>>>>>>>>>>> auto-incrementing integer)
> > >>>>>>>>>>>>>>>>>> Step 3:
> > >>>>>>>>>>>>>>>>>> Perform flush
> > >>>>>>>>>>>>>>>>>> Step 4:
> > >>>>>>>>>>>>>>>>>> Perform prefixScan
> > >>>>>>>>>>>>>>>>>> Step 5:
> > >>>>>>>>>>>>>>>>>> Iterate through the returned Iterator and validate the count of
> > >>>>>>>>>>>>>>>>>> expected events.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> ============================================================================================
> > >>>>>>>>>>>>>>>>>> Results:
> > >>>>>>>>>>>>>>>>>> ============================================================================================
> > >>>>>>>>>>>>>>>>>> X = 1k (11k events total)
> > >>>>>>>>>>>>>>>>>> Flush Time = 39 ms
> > >>>>>>>>>>>>>>>>>> Scan Time = 7 ms
> > >>>>>>>>>>>>>>>>>> 6.9 MB disk
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> --------------------------------------------------------------------------------------------
> > >>>>>>>>>>>>>>>>>> X = 10k (110k events total)
> > >>>>>>>>>>>>>>>>>> Flush Time = 45 ms
> > >>>>>>>>>>>>>>>>>> Scan Time = 8 ms
> > >>>>>>>>>>>>>>>>>> 127 MB disk
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> --------------------------------------------------------------------------------------------
> > >>>>>>>>>>>>>>>>>> X = 100k (1.1M events total)
> > >>>>>>>>>>>>>>>>>> Test1:
> > >>>>>>>>>>>>>>>>>> Flush Time = 60 ms
> > >>>>>>>>>>>>>>>>>> Scan Time = 12 ms
> > >>>>>>>>>>>>>>>>>> 678 MB disk
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Test2:
> > >>>>>>>>>>>>>>>>>> Flush Time = 45 ms
> > >>>>>>>>>>>>>>>>>> Scan Time = 7 ms
> > >>>>>>>>>>>>>>>>>> 576 MB disk
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> --------------------------------------------------------------------------------------------
> > >>>>>>>>>>>>>>>>>> X = 1M (11M events total)
> > >>>>>>>>>>>>>>>>>> Test1:
> > >>>>>>>>>>>>>>>>>> Flush Time = 52 ms
> > >>>>>>>>>>>>>>>>>> Scan Time = 19 ms
> > >>>>>>>>>>>>>>>>>> 7.2 GB disk
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Test2:
> > >>>>>>>>>>>>>>>>>> Flush Time = 84 ms
> > >>>>>>>>>>>>>>>>>> Scan Time = 34 ms
> > >>>>>>>>>>>>>>>>>> 9.1 GB disk
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> --------------------------------------------------------------------------------------------
> > >>>>>>>>>>>>>>>>>> X = 2.5M (27.5M events total)
> > >>>>>>>>>>>>>>>>>> Test1:
> > >>>>>>>>>>>>>>>>>> Flush Time = 82 ms
> > >>>>>>>>>>>>>>>>>> Scan Time = 63 ms
> > >>>>>>>>>>>>>>>>>> 17 GB disk, 276 SST files
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Test2:
> > >>>>>>>>>>>>>>>>>> Flush Time = 116 ms
> > >>>>>>>>>>>>>>>>>> Scan Time = 35 ms
> > >>>>>>>>>>>>>>>>>> 23 GB disk, 361 SST files
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Test3:
> > >>>>>>>>>>>>>>>>>> Flush Time = 103 ms
> > >>>>>>>>>>>>>>>>>> Scan Time = 82 ms
> > >>>>>>>>>>>>>>>>>> 19 GB disk, 300 SST files
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> --------------------------------------------------------------------------------------------
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I had to limit my testing on my laptop to X = 2.5M events. I tried to go
> > >>>>>>>>>>>>>>>>>> to X = 10M (110M events), but RocksDB was going into the 100GB+ range
> > >>>>>>>>>>>>>>>>>> and my laptop ran out of disk. More extensive testing could be done, but
> > >>>>>>>>>>>>>>>>>> I suspect that it would be in line with what we're seeing in the results
> > >>>>>>>>>>>>>>>>>> above.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> At this point in time, I think the only major discussion point is really
> > >>>>>>>>>>>>>>>>>> what Jan and I have disagreed on: repartitioning back + resolving
> > >>>>>>>>>>>>>>>>>> potential out-of-order issues, or leaving that up to the client to
> > >>>>>>>>>>>>>>>>>> handle.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Thanks folks,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Adam
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
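The benchmark's key layout and scan steps can be mimicked in memory to show why a prefix scan does not touch the non-matching keys. This is a rough sketch assuming a `TreeMap` as a stand-in for RocksDB's sorted key space (so flush time and disk usage are not modelled) and shorter keys than the 128-byte keys used above; `PrefixScanSketch` and the `fk-42|` prefix are hypothetical.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of the prefixScan benchmark layout. A TreeMap stands in for
// RocksDB's sorted key space; flushing and on-disk effects are not modelled.
final class PrefixScanSketch {
    // Zero-pad so that lexicographic key order matches numeric order.
    static String pad(int i) {
        return String.format("%010d", i);
    }

    // Setup steps 1 and 2: X keys that share the prefix, 10X keys that do not.
    static NavigableMap<String, byte[]> buildStore(String prefix, int x) {
        NavigableMap<String, byte[]> store = new TreeMap<>();
        byte[] value = new byte[512]; // 512-byte values as in the benchmark; one shared array suffices here
        for (int i = 0; i < x; i++) {
            store.put(prefix + pad(i), value); // positive matches
        }
        for (int i = 0; i < 10 * x; i++) {
            store.put(pad(i), value);          // negative matches
        }
        return store;
    }

    // Setup steps 4 and 5: seek to the first key >= prefix, then iterate until a key no
    // longer starts with the prefix. In sorted order all matching keys are contiguous.
    static int prefixScanCount(NavigableMap<String, byte[]> store, String prefix) {
        int count = 0;
        for (String key : store.tailMap(prefix, true).keySet()) {
            if (!key.startsWith(prefix)) {
                break;
            }
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        NavigableMap<String, byte[]> store = buildStore("fk-42|", 1_000);
        System.out.println(store.size() + " keys, " + prefixScanCount(store, "fk-42|") + " matches");
    }
}
```

The scan seeks once and then reads only the matching keys plus the first non-matching one, never the 10X negative keys, which is the property the benchmark above is exercising.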
> > >>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> On 29.11.2018 15:14, John Roesler wrote:
> > >>>>>>>>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Sorry that this discussion petered out... I think the 2.1 release caused
> > >>>>>>>>>>>>>>>>>>>> an extended distraction that pushed it off everyone's radar (which was
> > >>>>>>>>>>>>>>>>>>>> precisely Adam's concern). Personally, I've also had some extended
> > >>>>>>>>>>>>>>>>>>>> distractions of my own that kept (and continue to keep) me preoccupied.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> However, calling for a vote did wake me up, so I guess Jan was on the
> > >>>>>>>>>>>>>>>>>>>> right track!
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> I've gone back and reviewed the whole KIP document and the prior
> > >>>>>>>>>>>>>>>>>>>> discussion, and I'd like to offer a few thoughts:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> API Thoughts:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 1. If I read the KIP right, you are proposing a many-to-one join. Could
> > >>>>>>>>>>>>>>>>>>>> we consider naming it manyToOneJoin? Or, if you prefer, flip the design
> > >>>>>>>>>>>>>>>>>>>> around and make it a oneToManyJoin?
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> The proposed name "joinOnForeignKey" disguises the join type, and it
> > >>>>>>>>>>>>>>>>>>>> seems like it might trick some people into using it for a one-to-one
> > >>>>>>>>>>>>>>>>>>>> join. This would work, of course, but it would be super inefficient
> > >>>>>>>>>>>>>>>>>>>> compared to a simple rekey-and-join.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 2. I might have missed it, but I don't think it's specified whether it's
> > >>>>>>>>>>>>>>>>>>>> an inner, outer, or left join. I'm guessing an outer join, as
> > >>>>>>>>>>>>>>>>>>>> (neglecting IQ) the rest can be achieved by filtering or by handling it
> > >>>>>>>>>>>>>>>>>>>> in the ValueJoiner.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 3. The arg list to joinOnForeignKey doesn't look quite right.
> > >>>>>>>>>>>>>>>>>>>> 3a. Regarding Serialized: There are a few different paradigms in play in
> > >>>>>>>>>>>>>>>>>>>> the Streams API, so it's confusing, but instead of three Serialized
> > >>>>>>>>>>>>>>>>>>>> args, I think it would be better to have one that allows (optionally)
> > >>>>>>>>>>>>>>>>>>>> setting the 4 incoming serdes. The result serde is defined by the
> > >>>>>>>>>>>>>>>>>>>> Materialized. The incoming serdes can be optional because they might
> > >>>>>>>>>>>>>>>>>>>> already be available on the source KTables, or the default serdes from
> > >>>>>>>>>>>>>>>>>>>> the config might be applicable.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 3b. Is the StreamPartitioner necessary? The other joins don't allow
> > >>>>>>>>>>>>>>>>>>>> setting one, and it seems like it might actually be harmful, since the
> > >>>>>>>>>>>>>>>>>>>> rekey operation needs to produce results that are co-partitioned with
> > >>>>>>>>>>>>>>>>>>>> the "other" KTable.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 4. I'm fine with the "reserved word" header, but I didn't actually
> > >>>>>>>>>>>>>>>>>>>> follow what Matthias meant about namespacing requiring "deserializing"
> > >>>>>>>>>>>>>>>>>>>> the record header. The header keys are already Strings, so I don't think
> > >>>>>>>>>>>>>>>>>>>> that deserialization is required. If we applied the namespace at source
> > >>>>>>>>>>>>>>>>>>>> nodes and stripped it at sink nodes, this would add practically no
> > >>>>>>>>>>>>>>>>>>>> overhead. The advantage of the namespace idea is that no public API
> > >>>>>>>>>>>>>>>>>>>> change wrt headers needs to happen, and no restrictions need to be
> > >>>>>>>>>>>>>>>>>>>> placed on users' headers.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> (Although I'm wondering if we can get away without the header at all...
> > >>>>>>>>>>>>>>>>>>>> stay tuned)
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 5. I also didn't follow the discussion about the HWM table growing
> > >>>>>>>>>>>>>>>>>>>> without bound. As I read it, the HWM table is effectively implementing
> > >>>>>>>>>>>>>>>>>>>> OCC (optimistic concurrency control) to resolve the problem you noted
> > >>>>>>>>>>>>>>>>>>>> with disordering when the rekey is reversed... particularly notable when
> > >>>>>>>>>>>>>>>>>>>> the FK changes. As such, it only needs to track the most recent
> > >>>>>>>>>>>>>>>>>>>> "version" (the offset in the source partition) of each key. Therefore,
> > >>>>>>>>>>>>>>>>>>>> it should have the same number of keys as the source table at all times.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> I see that you are aware of KIP-258, which I think
> > >>>>>>> might
> > >>>>>>>> be
> > >>>>>>>>>>>> relevant
> > >>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>> couple of ways. One: it's just about storing the
> > >>>>>>> timestamp
> > >>>>>>>>> in
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>> state
> > >>>>>>>>>>>>>>>>>>>> store, but the ultimate idea is to effectively use
> > >>>>> the
> > >>>>>>>>>> timestamp
> > >>>>>>>>>>>> as
> > >>>>>>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>> OCC
> > >>>>>>>>>>>>>>>>>>>> "version" to drop disordered updates. You wouldn't
> > >>>>>>> want to
> > >>>>>>>>> use
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> timestamp for this operation, but if you were to
> > >>>>> use a
> > >>>>>>>>> similar
> > >>>>>>>>>>>>>>>>>>> mechanism to
> > >>>>>>>>>>>>>>>>>>>> store the source offset in the store alongside the
> > >>>>>>>> re-keyed
> > >>>>>>>>>>>> values,
> > >>>>>>>>>>>>>>>>> then
> > >>>>>>>>>>>>>>>>>>>> you could avoid a separate table.
> > >>>>>>>>>>>>>>>>>>>>
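Here is a minimal sketch of the high-water-mark table as point 5 describes it: an optimistic-concurrency check that keeps only the highest source offset per key. `HighWaterMarkFilter` is a hypothetical name, and a real implementation would live in a state store rather than a `HashMap`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the HWM idea as optimistic concurrency control:
// remember only the highest source offset seen per primary key and drop
// any update that carries an older (or equal) offset.
final class HighWaterMarkFilter<K> {
    private final Map<K, Long> highestOffset = new HashMap<>();

    // Returns true if the update should be applied, false if it is stale.
    boolean accept(K key, long sourceOffset) {
        Long seen = highestOffset.get(key);
        if (seen != null && sourceOffset <= seen) {
            return false; // out-of-order (or duplicate) update: drop it
        }
        highestOffset.put(key, sourceOffset);
        return true;
    }

    // One entry per distinct key seen: grows with the key space, not the update count.
    int size() {
        return highestOffset.size();
    }
}
```

The size is bounded by the number of distinct keys, which is the argument made above against unbounded growth.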
> > >>>>>>>>>>>>>>>>>>>> 6. You and Jan have been thinking about this for a long time, so I've probably missed something here, but I'm wondering if we can avoid the HWM tracking at all and resolve out-of-order during a final join instead...
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Let's say we're joining a left table (Integer K: Letter FK, (other data)) to a right table (Letter K: (some data)).
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Left table:
> > >>>>>>>>>>>>>>>>>>>> 1: (A, xyz)
> > >>>>>>>>>>>>>>>>>>>> 2: (B, asd)
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Right table:
> > >>>>>>>>>>>>>>>>>>>> A: EntityA
> > >>>>>>>>>>>>>>>>>>>> B: EntityB
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> We could do a rekey as you proposed with a combined key, but not propagating the value at all...
> > >>>>>>>>>>>>>>>>>>>> Rekey table:
> > >>>>>>>>>>>>>>>>>>>> A-1: (dummy value)
> > >>>>>>>>>>>>>>>>>>>> B-2: (dummy value)
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Which we then join with the right table to produce:
> > >>>>>>>>>>>>>>>>>>>> A-1: EntityA
> > >>>>>>>>>>>>>>>>>>>> B-2: EntityB
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Which gets rekeyed back:
> > >>>>>>>>>>>>>>>>>>>> 1: A, EntityA
> > >>>>>>>>>>>>>>>>>>>> 2: B, EntityB
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> And finally we do the actual join:
> > >>>>>>>>>>>>>>>>>>>> Result table:
> > >>>>>>>>>>>>>>>>>>>> 1: ((A, xyz), EntityA)
> > >>>>>>>>>>>>>>>>>>>> 2: ((B, asd), EntityB)
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> The thing is that in that last join, we have the opportunity to compare the current FK in the left table with the incoming PK of the right table. If they don't match, we just drop the event, since it must be outdated.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> In your KIP, you gave an example in which (1: A, xyz) gets updated to (1: B, xyz), ultimately yielding a conundrum about whether the final state should be (1: null) or (1: joined-on-B). With the algorithm above, you would be considering (1: (B, xyz), (A, null)) vs (1: (B, xyz), (B, EntityB)). It seems like this does give you enough information to make the right choice, regardless of disordering.
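The final-join check from point 6 can be sketched as follows, assuming the rekeyed-back result carries the foreign key it was computed for. All names (`FinalFkJoin`, `LeftValue`, `RightResult`, `Joined`) are hypothetical, and the records require Java 16 or later.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: the last step joins the rekeyed-back right-side result
// with the current left-table row and drops the result if the FK it was
// computed for no longer matches the row's current FK.
final class FinalFkJoin {
    record LeftValue(String fk, String data) {}
    // Result keyed back to the left PK: the FK used for the lookup and the
    // right-hand value (null if the right side had no match).
    record RightResult(String fk, String rightValue) {}
    record Joined(LeftValue left, String rightValue) {}

    private final Map<Integer, LeftValue> leftTable = new HashMap<>();

    void upsertLeft(int pk, LeftValue value) { leftTable.put(pk, value); }

    // Empty means the incoming event is outdated (or the left row is gone) and is dropped.
    Optional<Joined> onRightResult(int pk, RightResult result) {
        LeftValue current = leftTable.get(pk);
        if (current == null || !current.fk().equals(result.fk())) {
            return Optional.empty();
        }
        return Optional.of(new Joined(current, result.rightValue()));
    }
}
```

With the KIP's example, once (1: A, xyz) has become (1: B, xyz), a late result computed for A no longer matches the current FK B and is dropped, while the result for B is joined.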
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Will check Adam's patch, but this should work. As mentioned often, I am not convinced about partitioning back for the user automatically. I think this is the real performance eater ;)
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 7. Last thought... I'm a little concerned about the performance of the range scans when records change in the right table. You've said that you've been using the algorithm you presented in production for a while. Can you give us a sense of the performance characteristics you've observed?
> > >>>>>>>>>>>>>>>>>>>>
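For point 7, the range scan on a right-hand change can be pictured as a prefix scan over a store ordered by (FK, PK), as in the A-1/B-2 rekey table above. This is a hypothetical in-memory stand-in (a `TreeMap` with a composite-key comparator), not the RocksDB-backed store from the PR; the cost of one scan grows with the number of left-hand keys referencing the FK.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: the subscription store is ordered by (FK, PK), so a change
// to right-hand key FK becomes one range scan over all entries with that FK,
// yielding one update per referencing left-hand key.
final class PrefixScanStore {
    record CombinedKey(String fk, int pk) {}

    private static final Comparator<CombinedKey> ORDER =
        Comparator.comparing(CombinedKey::fk).thenComparingInt(CombinedKey::pk);

    private final NavigableMap<CombinedKey, Boolean> subscriptions = new TreeMap<>(ORDER);

    void subscribe(String fk, int pk) { subscriptions.put(new CombinedKey(fk, pk), Boolean.TRUE); }

    void unsubscribe(String fk, int pk) { subscriptions.remove(new CombinedKey(fk, pk)); }

    // All left-hand primary keys currently pointing at this foreign key, in PK order.
    List<Integer> leftKeysFor(String fk) {
        List<Integer> pks = new ArrayList<>();
        for (CombinedKey k : subscriptions.subMap(
                new CombinedKey(fk, Integer.MIN_VALUE), true,
                new CombinedKey(fk, Integer.MAX_VALUE), true).keySet()) {
            pks.add(k.pk());
        }
        return pks;
    }
}
```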
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Make it work, make it fast, make it beautiful. The topmost thing here is/was correctness. In practice I do not measure the performance of the range scan. The usual cases I run this with emit 500k to 1M rows on a left-hand-side change. The range scan is just the work you have to do; even when you pack your data into different formats, RocksDB performance is usually tightly bound to the size of the data, and we can't really change that. It is more important for users to prevent useless updates to begin with. My left-hand side is guarded to drop changes that are not going to change my join output.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> usually it's:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> drop unused fields and then don't forward if old.equals(new)
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Regarding the performance of creating an iterator for smaller fan-outs, users can still just do a groupBy first anyway.
> > >>>>>>>>>>>>>>>>>>>
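The guard described above ("drop unused fields and then don't forward if old.equals(new)") might look like this as a minimal sketch. `ChangeGuard` and its projection function are hypothetical, and the projection is assumed to return a value with a meaningful `equals`.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical sketch of a left-hand-side guard: project each value down to
// the fields the join actually uses, then suppress the update if the
// projection did not change.
final class ChangeGuard<V, P> {
    private final Function<V, P> project;

    ChangeGuard(Function<V, P> project) { this.project = project; }

    // True if the change can affect the join output and should be forwarded.
    boolean shouldForward(V oldValue, V newValue) {
        P oldProjected = oldValue == null ? null : project.apply(oldValue);
        P newProjected = newValue == null ? null : project.apply(newValue);
        return !Objects.equals(oldProjected, newProjected);
    }
}
```

Filtering before the rekey means suppressed updates never trigger a repartition or a range scan.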
> > >>>>>>>>>>>>>>>>>>>> I could only think of one alternative, but I'm not sure if it's better or worse... If the first re-key only needs to preserve the original key, as I proposed in #6, then we could store a vector of keys in the value:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Left table:
> > >>>>>>>>>>>>>>>>>>>> 1: A,...
> > >>>>>>>>>>>>>>>>>>>> 2: B,...
> > >>>>>>>>>>>>>>>>>>>> 3: A,...
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Gets re-keyed:
> > >>>>>>>>>>>>>>>>>>>> A: [1, 3]
> > >>>>>>>>>>>>>>>>>>>> B: [2]
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Then, the rhs part of the join would only need a regular single-key lookup. Of course we have to deal with the problem of large values, as there's no bound on the number of lhs records that can reference rhs records. Offhand, I'd say we could page the values, so when one row is past the threshold, we append the key for the next page. Then in most cases, it would be a single key lookup, but for large fan-out updates, it would be one lookup per page, i.e. one per (max value size)/(avg lhs key size) referencing keys.
> > >>>>>>>>>>>>>>>>>>>>
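The paging idea above implies simple arithmetic: each page holds about (max value size)/(avg lhs key size) keys, so a fan-out of n keys costs ceil(n / keysPerPage) single-key lookups. The helper below is hypothetical, and the byte sizes are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the paged "vector of keys" value: the FK row holds a list
// of left-hand PKs, split into pages of roughly (max value size) / (avg lhs key size) keys.
final class PagedKeyVector {
    static int keysPerPage(int maxValueBytes, int avgKeyBytes) {
        return Math.max(1, maxValueBytes / avgKeyBytes);
    }

    // Single-key lookups needed to read all n referencing keys (at least one).
    static int lookups(int n, int maxValueBytes, int avgKeyBytes) {
        int perPage = keysPerPage(maxValueBytes, avgKeyBytes);
        return Math.max(1, (n + perPage - 1) / perPage);
    }

    // Split the referencing keys into pages of at most perPage entries.
    static <K> List<List<K>> paginate(List<K> keys, int perPage) {
        List<List<K>> pages = new ArrayList<>();
        for (int i = 0; i < keys.size(); i += perPage) {
            pages.add(new ArrayList<>(keys.subList(i, Math.min(i + perPage, keys.size()))));
        }
        return pages;
    }
}
```

For instance, with an assumed 1,000,000-byte value limit and 8-byte keys, a page holds 125,000 keys, so 500,000 referencing keys need 4 lookups, while a typical small fan-out needs 1.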
> > >>>>>>>>>>>>>>>>>>>> This seems more complex, though... Plus, I think there's some extra tracking we'd need to do to know when to emit a retraction. For example, when record 1 is deleted, the re-key table would just have (A: [3]). Some kind of tombstone is needed so that the join result for 1 can also be retracted.
> > >>>>>>>>>>>>>>>>>>>>
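The retraction problem can be illustrated by diffing the old and new key vectors stored under one foreign key, assuming the old vector is available (for example as the old value of a change). `KeyVectorRetractions` is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: when the key vector under a foreign key shrinks, every
// left-hand key that disappeared needs a tombstone (retraction) for its join result.
final class KeyVectorRetractions {
    static <K> List<K> retractedKeys(List<K> oldVector, List<K> newVector) {
        Set<K> remaining = new HashSet<>(newVector);
        List<K> retracted = new ArrayList<>();
        for (K k : oldVector) {
            if (!remaining.contains(k)) {
                retracted.add(k); // downstream, emit (k: null)
            }
        }
        return retracted;
    }
}
```

For the example above, going from A: [1, 3] to A: [3] yields [1], i.e. a tombstone for key 1. Without the old vector, this information is lost.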
> > >>>>>>>>>>>>>>>>>>>> That's all!
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Thanks so much to both Adam and Jan for the thoughtful KIP. Sorry the discussion has been slow.
> > >>>>>>>>>>>>>>>>>>>> -John
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I'd say you can just call the vote.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> That happens all the time, and if something comes up, it just goes back to discussion.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I would not expect too much attention from another email in this thread.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> best Jan
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> On 09.10.2018 13:56, Adam Bellemare wrote:
> > >>>>>>>>>>>>>>>>>>>>>> Hello Contributors
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> I know that 2.1 is about to be released, but I do need to bump this to keep visibility up. I am still intending to push this through once contributor feedback is given.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Main points that need addressing:
> > >>>>>>>>>>>>>>>>>>>>>> 1) Is there any way (or benefit) in structuring the current singular graph node into multiple nodes? It has a whopping 25 parameters right now. I am a bit fuzzy on how the optimizations are supposed to work, so I would appreciate any help on this aspect.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> 2) Overall strategy for joining + resolving. This thread has much discussion between Jan and me about the current high-water mark proposal versus a groupBy + reduce proposal. I am of the opinion that we need to strictly handle any chance of out-of-order data and leave none of it up to the consumer. Any comments or suggestions here would also help.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> 3) Anything else that you see that would prevent this from moving to a vote?
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Thanks
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Adam
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <adam.bellemare@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Hi Jan
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> With the Stores.windowStoreBuilder and Stores.persistentWindowStore, you actually only need to specify the number of segments you want and how large they are. To the best of my understanding, what happens is that the segments are automatically rolled over as new data with new timestamps are created. We use this exact functionality in some of the work done internally at my company. For reference, this is the hopping windowed store.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> In the code that I have provided, there are going to be two 24h segments. When a record is put into the windowStore, it will be inserted at time T in both segments. The two segments will always overlap by 12h. As time goes on and new records are added (say at time T+12h+), the oldest segment will be automatically deleted and a new segment created. The records are by default inserted with the context.timestamp(), such that it is the record time, not the clock time, which is used.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> To the best of my understanding, the timestamps are retained when restoring from the changelog.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Basically, this is a heavy-handed way to deal with TTL at a segment level, instead of at an individual record level.
> > >>>>>>>>>>>>>>>>>>>>>>>
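To make the layout described above concrete, this sketch models it as hopping windows of size 24h advancing by 12h, so each record time falls into two overlapping windows, and expiry happens per window rather than per record. This is an assumed model of the intended behaviour, not a description of how Kafka's segmented window store works internally. `HoppingWindows` and `isExpired` are hypothetical, and timestamps are assumed to be non-negative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: window starts are aligned to multiples of the advance, and
// a record at time t belongs to every window whose [start, start + size) contains t.
final class HoppingWindows {
    static final long HOUR_MS = 60L * 60L * 1000L;

    // Assumes non-negative timestamps; windows with negative starts are skipped.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long latestStart = timestampMs - (timestampMs % advanceMs);
        for (long start = latestStart; start > timestampMs - sizeMs && start >= 0; start -= advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    // Window-level TTL: a whole window is dropped once its end falls behind the
    // observed stream time by at least the retention; there is no per-record expiry.
    static boolean isExpired(long windowStartMs, long sizeMs, long streamTimeMs, long retentionMs) {
        return windowStartMs + sizeMs <= streamTimeMs - retentionMs;
    }
}
```

With size 24h and advance 12h, a record at 30h lands in the windows starting at 24h and 12h, which matches the "inserted in both segments" description.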
> > >>>>>>>>>>>>>>>>>>>>>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Will that work? I expected it to blow up with a ClassCastException or similar.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> You would either have to specify the window you fetch/put, or iterate across all windows the key was found in, right?
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> I just hope the window store doesn't check stream-time under the hood; that would be a questionable interface.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> If it does: did you see my earlier comment on checking all the windows? That would be needed to actually give reasonable time guarantees.
> > >>>>>>>>>>>>>>>>>>>>>>>>
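The point about iterating across all windows can be sketched as a windowed high-water-mark store that writes into the window for the record's timestamp but reads the maximum offset over every retained window. `WindowedHighWaterMark` is a hypothetical in-memory stand-in, not the Kafka `WindowStore` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: writes go into the window for the record's timestamp, but
// reads check all retained windows and take the maximum offset, so a key whose
// previous update landed in an older window (e.g. near a window border) is still
// compared against its true high-water mark.
final class WindowedHighWaterMark<K> {
    private final long windowSizeMs;
    private final TreeMap<Long, Map<K, Long>> windows = new TreeMap<>();

    WindowedHighWaterMark(long windowSizeMs) { this.windowSizeMs = windowSizeMs; }

    private long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    // Highest offset seen for the key in any retained window, or -1 if none.
    long highestOffset(K key) {
        long max = -1L;
        for (Map<K, Long> w : windows.values()) {
            Long o = w.get(key);
            if (o != null && o > max) max = o;
        }
        return max;
    }

    // Applies the update only if its offset beats the HWM across all windows.
    boolean accept(K key, long offset, long timestampMs) {
        if (offset <= highestOffset(key)) return false;
        windows.computeIfAbsent(windowStart(timestampMs), s -> new HashMap<>()).put(key, offset);
        return true;
    }

    // Drops whole windows that start before the window containing cutoffMs.
    void expireBefore(long cutoffMs) {
        windows.headMap(windowStart(cutoffMs), false).clear();
    }
}
```

The trade-off is a read that touches every retained window; once a window expires, its offsets can no longer reject late updates.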
> > >>>>>>>>>>>>>>>>>>>>>>>> Best
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> On 25.09.2018 13:18, Adam Bellemare wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>> Hi Jan
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Check for " highwaterMat " in the PR. I only changed the state store, not the ProcessorSupplier.
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>>>>> Adam
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> On 24.09.2018 16:26, Adam Bellemare wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> @Guozhang
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the information. This is indeed something that will be extremely useful for this KIP.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> @Jan
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your explanations. That being said, I will not be moving ahead with an implementation using the reshuffle/groupBy solution as you propose. However, if you wish to implement it yourself off of my current PR and submit it as a competitive alternative, I would be more than happy to help vet that as an alternate solution. As it stands right now, I do not really have more time to invest into alternatives without there being a strong indication from the binding voters which they would prefer.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Hey, no worries at all. I think I personally gave up on the Streams DSL some time ago, otherwise I would have pulled this KIP through already. I am currently reimplementing my own DSL based on PAPI.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> I will look at finishing up my PR with the windowed state store in the next week or so, exercising it via tests, and then I will come back for final discussions. In the meantime, I hope that any of the binding voters could take a look at the KIP in the wiki. I have updated it according to the latest plan:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> I have also updated the KIP PR to use a windowed store. This could be replaced by the results of KIP-258 whenever they are completed.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5527
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Adam
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Is the HighWatermarkResolverProccessorsupplier already updated in the PR? I expected it to change to Windowed<K>,Long. Am I missing something?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Correction on my previous email: KAFKA-5533 is the wrong link, as it is for corresponding changelog mechanisms. But as part of KIP-258 we do want to have "handling out-of-order data for source KTable" such that instead of blindly applying the updates to the materialized store, i.e. following offset ordering, we will reject updates that are older than the current key's timestamp, i.e. following timestamp ordering.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
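The KIP-258 behaviour described above, rejecting updates older than the key's latest timestamp instead of applying them in offset order, can be sketched like this. The class is hypothetical, and ties are assumed to be accepted so that equal timestamps fall back to offset (arrival) order.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of timestamp-ordered updates for a source KTable: the store
// keeps each key's latest timestamp and rejects strictly older updates.
final class TimestampOrderedTable<K, V> {
    private record Stamped<T>(T value, long timestamp) {}

    private final Map<K, Stamped<V>> store = new HashMap<>();

    // Returns true if the update was applied. Equal timestamps are accepted
    // (an assumption), so ties are resolved by arrival order.
    boolean put(K key, V value, long timestamp) {
        Stamped<V> current = store.get(key);
        if (current != null && timestamp < current.timestamp()) {
            return false; // older than the key's latest update: reject
        }
        store.put(key, new Stamped<>(value, timestamp));
        return true;
    }

    V get(K key) {
        Stamped<V> s = store.get(key);
        return s == null ? null : s.value();
    }
}
```

Compared with the offset-based HWM sketch, this needs no separate table, but it relies on record timestamps being correctly inherited through the topology.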
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello Adam,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the explanation. Regarding the final step (i.e. the high watermark store, now altered to be replaced with a window store), I think another current on-going KIP may actually help:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> This is for adding the timestamp into a key-value store (i.e. only for non-windowed KTables), and then one of its usages, as described in https://issues.apache.org/jira/browse/KAFKA-5533, is that we can then "reject" updates from the source topics if their timestamp is smaller than the current key's latest update timestamp. I think it is very similar to what you have in mind for high watermark based filtering, while you only need to make sure that the timestamps of the joining records are correctly inherited through the whole topology to the final stage.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Note that this KIP is for key-value stores and hence non-windowed KTables only, but for windowed KTables we do not really have good support for their joins anyway (https://issues.apache.org/jira/browse/KAFKA-7107). I think we can just consider non-windowed KTable-KTable non-key joins for now, in which case KIP-258 should help.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Guozhang
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The current high-water mark implementation would grow endlessly based on the primary key of the original event. It is a pair of (<this table primary key>, <highest offset seen for that key>). This is used to differentiate between late arrivals and new updates. My newest proposal would be to replace it with a Windowed state store of Duration N. This would allow the same behaviour, but cap the size based on time. This should allow for all late-arriving events to be processed, and should be customizable by the user to tailor to their own needs (ie: perhaps just 10 minutes of window, or perhaps 7 days...).
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > Hi Adam, using time-based retention can do the trick here, even if I
> > > would still like to see the automatic repartitioning made optional,
> > > since I would just reshuffle again. With a windowed store I am a
> > > little bit sceptical about how to determine the window. So
> > > essentially one could run into problems when the rapid change happens
> > > near a window border. I will check your implementation in detail; if
> > > it's problematic, we could still check _all_ windows on read with a
> > > not-too-bad performance impact, I guess. Will let you know if the
> > > implementation would be correct as is. I would not like to assume
> > > that offset(A) < offset(B) => timestamp(A) < timestamp(B). I think we
> > > can't expect that.
> > >
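To make the trade-off in this exchange concrete, here is a minimal Java sketch of a time-bounded high-water mark check. It is a hypothetical illustration, not the KIP-213 implementation: the class `BoundedHighwaterMark`, its methods, and the in-memory `HashMap` standing in for a windowed state store are all assumptions. Per primary key it keeps the highest offset seen plus that record's timestamp, rejects lower offsets as late arrivals, and evicts marks older than the retention.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch: a time-bounded "highwater mark" per primary key.
// A HashMap stands in for a windowed state store (assumption).
final class BoundedHighwaterMark<K> {

    // Highest offset seen for a key, plus the timestamp of that record.
    private static final class Mark {
        final long offset;
        final long timestampMs;

        Mark(long offset, long timestampMs) {
            this.offset = offset;
            this.timestampMs = timestampMs;
        }
    }

    private final Map<K, Mark> marks = new HashMap<>();
    private final long retentionMs;

    BoundedHighwaterMark(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Returns true for a new update, false for a late arrival
    // (an offset at or below the highest one already seen for this key).
    boolean accept(K key, long offset, long timestampMs) {
        Mark current = marks.get(key);
        if (current != null && offset <= current.offset) {
            return false;
        }
        marks.put(key, new Mark(offset, timestampMs));
        return true;
    }

    // Drops marks whose record timestamp is older than nowMs - retentionMs.
    // This caps the store size but forgets the history of those keys.
    void evictExpired(long nowMs) {
        Iterator<Map.Entry<K, Mark>> it = marks.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().timestampMs < nowMs - retentionMs) {
                it.remove();
            }
        }
    }

    int size() {
        return marks.size();
    }
}
```

Because eviction is driven by timestamps while staleness is judged by offsets, a late record that arrives after its key's mark has been evicted is accepted again. That is the window-border risk raised above, and it is why the assumption offset(A) < offset(B) => timestamp(A) < timestamp(B) matters.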
> > >> @Jan
> > >> I believe I understand what you mean now - thanks for the diagram, it
> > >> did really help. You are correct that I do not have the original
> > >> primary key available, and I can see that if it was available then
> > >> you would be able to add and remove events from the Map. That being
> > >> said, I encourage you to finish your diagrams / charts just for
> > >> clarity for everyone else.
> > >>
> > > Yeah 100%, this giphy thing is just really hard work. But I understand
> > > the benefits for the rest. Sorry about the original primary key. We
> > > have implemented our own join and groupBy in the PAPI and are
> > > basically not using any of the DSL (just the abstraction). I
> > > completely missed that it's not there in the original DSL and just
> > > assumed it. Total brain mess-up on my end. Will finish the chart as
> > > soon as I get a quiet evening this week.
> > >
> > >> My follow up question for you is, won't the Map stay inside the State
> > >> Store indefinitely after all of the changes have propagated? Isn't
> > >> this effectively the same as a highwater mark state store?
> > >>
> > > Thing is that if the map is empty, the subtractor is going to return
> > > `null` and the key is removed from the keyspace. But there is going to
> > > be a store, 100%. The good thing is that I can use this store directly
> > > for materialize() / enableSendingOldValues(): it is a regular store,
> > > satisfying all guarantees needed for a further groupBy / join. The
> > > windowed store is not keeping the values, so for the next stateful
> > > operation we would need to instantiate an extra store, or have the
> > > window store also keep the values.
> > >
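Jan's point about the subtractor can be illustrated with a small Java sketch. It is hypothetical and independent of the Kafka Streams API: `MapAggregate`, `add`, `subtract`, and the `primaryKeyOf` extractor are names chosen here, and it assumes, as the thread discusses, that each joined value carries its original primary key. The aggregate for one foreign key is a map from original primary key to value; the adder inserts, the subtractor removes, and an empty map is turned into `null` so the key can be deleted from the store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of an adder/subtractor pair whose aggregate is a map
// from the original primary key to the joined value (not a Kafka Streams API).
final class MapAggregate<PK, V> {
    // Assumption: each value carries its original primary key.
    private final Function<V, PK> primaryKeyOf;

    MapAggregate(Function<V, PK> primaryKeyOf) {
        this.primaryKeyOf = primaryKeyOf;
    }

    // Adder: insert or overwrite the entry for this value's primary key.
    Map<PK, V> add(V value, Map<PK, V> aggregate) {
        Map<PK, V> next = new HashMap<>();
        if (aggregate != null) {
            next.putAll(aggregate);
        }
        next.put(primaryKeyOf.apply(value), value);
        return next;
    }

    // Subtractor: remove the entry; an empty map becomes null so the
    // caller can delete the key from its store.
    Map<PK, V> subtract(V value, Map<PK, V> aggregate) {
        if (aggregate == null) {
            return null;
        }
        Map<PK, V> next = new HashMap<>(aggregate);
        next.remove(primaryKeyOf.apply(value));
        return next.isEmpty() ? null : next;
    }
}
```

Unlike a pure high-water mark, this aggregate holds the joined values themselves, which is the property Jan relies on when he says the same store could serve materialization and a further groupBy / join.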
> > > Long story short: if we can flip in a custom groupBy before
> > > repartitioning to the original primary key, I think it would help the
> > > users big time in building efficient apps. Given the original primary
> > > key issue, I understand that we do not have a solid foundation to
> > > build on. Leaving the primary key carry-along to the user is very
> > > unfortunate. I could understand if the decision goes like that, but I
> > > do not think it's a good decision.
> > >
> > >> Thanks
> > >> Adam
> > >>
> > >> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre
> > >> <dumbreprajakta311@gmail.com <mailto:dumbreprajakta311@gmail.com>>
> > >> wrote:
> > >>> please remove me from this group
> > >>>
> > >>> On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak <Jan.Filipiak@trivago.com
> > >>> <mailto:Jan.Filipiak@trivago.com>> wrote:
> > >>>> Hi Adam,
> > >>>>
> > >>>> give me some time, I will make such a chart. Last time I didn't get
> > >>>> along well with giphy and ruined all your charts. Hopefully I can get
> > >>>> it done today.
> > >>>>
> > >>>> On 08.09.2018 16:00, Adam Bellemare wrote:
> > >>>>> Hi Jan
> > >>>>>
> > >>>>> I have included a diagram of what I attempted on the KIP.
> > >>>>>
> > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
> > >>>>>
> > >>>>> I attempted this back at the start of my own implementation of this
> > >>>>> solution, and since I could not get it to work I have since
> > >>>>> discarded the code. At this point in time, if you wish to continue
> > >>>>> pursuing your groupBy solution, I ask that you please create a
> > >>>>> diagram on the KIP carefully explaining your solution. Please feel
> > >>>>> free to use the image I just posted as a starting point. I am having
> > >>>>> trouble understanding your explanations, but I think that a
> > >>>>> carefully constructed diagram will clear up any misunderstandings.
> > >>>>> Alternatively, please post a comprehensive PR with your solution. I
> > >>>>> can only guess at what you mean, and since I value my own time as
> > >>>>> much as you value yours, I believe it is your responsibility to
> > >>>>> provide an implementation instead of me trying to guess.
> > >>>>>
> > >>>>> Adam
> > >>>>>
> > >>>>> On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak <Jan.Filipiak@trivago.com
> > >>>>> <mailto:Jan.Filipiak@trivago.com>> wrote:
> > >>>>>> Hi James,
> > >>>>>>
> > >>>>>> nice to see you being interested. Kafka Streams at this point
> > >>>>>> supports all sorts of joins as long as both streams have the same
> > >>>>>> key. Adam is currently implementing a join where a KTable and a
> > >>>>>> KTable can have a one-to-many relationship (1:n). We exploit that
> > >>>>>> RocksDB is a datastore that keeps data sorted (or at least exposes
> > >>>>>> an API to access the stored data in a sorted fashion).
> > >>>>>>
> > >>>>>> I think the technical caveats are well understood now and we are
> > >>>>>> basically down to philosophy and API design (when Adam sees my
> > >>>>>> newest message). I have a lengthy track record of losing those
> > >>>>>> kinds of arguments within the streams community and I have no clue
> > >>>>>> why. So I literally can't wait for you to churn through this thread
> > >>>>>> and give your opinion on how we should design the return type of
> > >>>>>> the oneToManyJoin and how much power we want to give to the user vs
> > >>>>>> "simplicity" (where simplicity isn't really that, as users still
> > >>>>>> need to understand it, I argue).
> > >>>>>>
> > >>>>>> Waiting for you to join in on the discussion.
> > >>>>>>
> > >>>>>> Best Jan
> > >>>>>>
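The sorted-store idea Jan mentions can be sketched in Java with a `TreeMap` standing in for RocksDB. This is an illustrative assumption, not the KIP's actual key layout: the class `SortedOneToManyIndex`, the composite key format (foreign key, a NUL separator, then primary key), and the requirement that keys never contain that separator are all choices made here. Because all rows sharing a foreign key are adjacent in key order, one range scan returns every "many"-side row for a given "one"-side key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: the "many" side keyed by (foreignKey, primaryKey) in a
// sorted map, so one range scan finds every row for a foreign key.
// TreeMap stands in for a sorted store such as RocksDB (assumption).
final class SortedOneToManyIndex {
    private static final char SEPARATOR = '\0';       // assumes keys never contain NUL
    private static final char AFTER_SEPARATOR = '\1'; // next char after the separator

    private final NavigableMap<String, String> store = new TreeMap<>();

    void put(String foreignKey, String primaryKey, String value) {
        store.put(foreignKey + SEPARATOR + primaryKey, value);
    }

    // All values whose foreign key equals foreignKey, ordered by primary key.
    List<String> rowsFor(String foreignKey) {
        String from = foreignKey + SEPARATOR;       // inclusive lower bound
        String to = foreignKey + AFTER_SEPARATOR;   // exclusive upper bound
        return new ArrayList<>(store.subMap(from, true, to, false).values());
    }
}
```

A real store would serialize keys to bytes and use a prefix or range iterator, but the ordering argument is the same: a foreign key that merely starts with another one (here `ab` vs `a`) falls outside the scanned range.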
> > >>>>>> On 07.09.2018 15:49, James Kwan wrote:
> > >>>>>>
> > >>>>>>> I am new to this group and I found this subject interesting. Sounds
> > >>>>>>> like you guys want to implement a join table of two streams? Is
> > >>>>>>> there somewhere I can see the original requirement or proposal?
> > >>>>>>>
> > >>>>>>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak <Jan.Filipiak@trivago.com
> > >>>>>>> <mailto:Jan.Filipiak@trivago.com>> wrote:
> > >>>>>>>> On 05.09.2018 22:17, Adam Bellemare wrote:
> > >>>>>>>>
> > >>>>>>>>> I'm currently testing using a Windowed Store to store the highwater
> > >>>>>>>>> mark. By all indications this should work fine, with the caveat
> > >>>>>>>>> being that it can only resolve out-of-order arrival for up to the
> > >>>>>>>>> size of the window (ie: 24h, 72h, etc). This would remove the
> > >>>>>>>>> possibility of it being unbounded in size.
> > >>>>>>>>>
> > >>>>>>>>> With regards to Jan's suggestion, I believe this is where we will
> > >>>>>>>>> have to remain in disagreement. While I do not disagree with your
> > >>>>>>>>> statement about there likely being additional joins done in a
> > >>>>>>>>> real-world workflow, I do not see how you can conclusively deal
> > >>>>>>>>> with out-of-order arrival of foreign-key changes and subsequent
> > >>>>>>>>> joins. I have attempted what I think you have proposed (without a
> > >>>>>>>>> high-water, using groupBy and reduce) and found that if the
> > >>>>>>>>> foreign key changes too quickly, or the load on a stream thread is
> > >>>>>>>>> too high, the joined messages will arrive out-of-order and be
> > >>>>>>>>> incorrectly propagated, such that an intermediate event is
> > >>>>>>>>> represented as the final event.
> > >>>>>>>>>
> > >>>>>>>> Can you shed some light on your groupBy implementation? There must
> > >>>>>>>> be some sort of flaw in it. I have a suspicion where it is; I would
> > >>>>>>>> just like to confirm. The idea is bulletproof and it must be an
> > >>>>>>>> implementation mess-up. I would like to clarify before we draw a
> > >>>>>>>> conclusion.
> > >>>>>>>>
> > >>>>>>>>> Repartitioning the scattered events back to their original
> > >>>>>>>>> partitions is the only way I know how to conclusively deal with
> > >>>>>>>>> out-of-order events in a given time frame, and to ensure that the
> > >>>>>>>>> data is eventually consistent with the input events.
> > >>>>>>>>>
> > >>>>>>>>> If you have some code to share that illustrates your approach, I
> > >>>>>>>>> would be very grateful as it would remove any misunderstandings
> > >>>>>>>>> that I may have.
> > >>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>> ah okay you were
> looking
> > >>>>>>> for
> > >>>>>>>> my
> > >>>>>>>>>>> code.
> > >>>>>>>>>>>> I
> > >>>>>>>>>>>>>>>>> don't
> > >>>>>>>>>>>>>>>>>>> have
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             something easily
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>> readable here as its
> > >>>>>>> bloated
> > >>>>>>>>> with
> > >>>>>>>>>>>>>>>>> OO-patterns.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>> its anyhow trivial:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>
> >>>> @Override
> >>>> public T apply(K aggKey, V value, T aggregate)
> >>>> {
> >>>>     Map<U, V> currentStateAsMap = asMap(aggregate); // imaginary
> >>>>     U toModifyKey = mapper.apply(value);
> >>>>     // This is the place where people are actually going to have
> >>>>     // issues, and probably why you couldn't do it. We would need to
> >>>>     // find a solution here. I hadn't realized that yet.
> >>>>     // We propagate the field in the joiner, so that we can pick it up
> >>>>     // in an aggregate. You probably have not thought of this in your
> >>>>     // approach, right?
> >>>>     // I am very open to finding a generic solution here. In my honest
> >>>>     // opinion, KTableImpl.GroupBy is broken in that it loses the keys
> >>>>     // and only maintains the aggregate key.
> >>>>     // I abstracted it away back then, well before I was thinking of a
> >>>>     // oneToMany join. That is why I didn't realize its significance
> >>>>     // here. Opinions?
> >>>>
> >>>>     for (V m : current)
> >>>>     {
> >>>>         currentStateAsMap.put(mapper.apply(m), m);
> >>>>     }
> >>>>     if (isAdder)
> >>>>     {
> >>>>         currentStateAsMap.put(toModifyKey, value);
> >>>>     }
> >>>>     else
> >>>>     {
> >>>>         currentStateAsMap.remove(toModifyKey);
> >>>>         if (currentStateAsMap.isEmpty()) {
> >>>>             return null;
> >>>>         }
> >>>>     }
> >>>>     return asAggregateType(currentStateAsMap);
> >>>> }
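A dependency-free sketch of the adder/subtractor logic above, assuming the aggregate can be held directly as a `Map<U, V>` keyed by the many-side key, so the imaginary `asMap`/`asAggregateType` conversions disappear. `MapAggregator` is a hypothetical name, not Kafka Streams API; in Kafka Streams the adder and subtractor would be two separate `Aggregator` instances passed to `KGroupedTable.aggregate(...)`. The `for (V m : current)` loop is left out because `current` is not defined in the snippet.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the aggregator quoted above: the aggregate is a
// Map<U, V> keyed by the many-side key that `mapper` extracts from each value.
final class MapAggregator<K, V, U> {
    private final Function<V, U> mapper; // extracts the many-side key from a value
    private final boolean isAdder;       // true = adder, false = subtractor

    MapAggregator(Function<V, U> mapper, boolean isAdder) {
        this.mapper = mapper;
        this.isAdder = isAdder;
    }

    // Returns a new aggregate; like the original snippet, returns null once
    // the subtractor has removed the last entry.
    Map<U, V> apply(K aggKey, V value, Map<U, V> aggregate) {
        // Copy rather than mutate the incoming aggregate.
        Map<U, V> currentState = (aggregate == null) ? new HashMap<>() : new HashMap<>(aggregate);
        U toModifyKey = mapper.apply(value);
        if (isAdder) {
            currentState.put(toModifyKey, value);
        } else {
            currentState.remove(toModifyKey);
            if (currentState.isEmpty()) {
                return null;
            }
        }
        return currentState;
    }
}
```

Keeping every many-side row in the map is what lets the subtractor remove exactly one joined row without disturbing the others; how a surrounding topology treats the null aggregate is not modeled here.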
> >>>>
> >>>>> Thanks,
> >>>>> Adam
> >>>>>
> >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <Jan.Filipiak@trivago.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Thanks, Adam, for bringing Matthias up to speed on the differences!
> >>>>>> I think re-keying back should be optional at best.
> >>>>>> I would say we return a KScatteredTable, with reshuffle() returning
> >>>>>> KTable<originalKey,Joined>, to make the backwards repartitioning
> >>>>>> optional.
> >>>>>> I am also very much in favour of doing the out-of-order processing
> >>>>>> using group-by instead of high-watermark tracking, simply because
> >>>>>> unbounded growth is scary, and it saves us the header stuff.
> >>>>>>
> >>>>>> I think the abstraction of always repartitioning back is just not
> >>>>>> that strong. The work has already been done before we partition
> >>>>>> back, and grouping by something else afterwards is really common.
> >>>>>>
> >>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
> >>>>>>
> >>>>>>> Hi Matthias,
> >>>>>>>
> >>>>>>> Thank you for your feedback, I do appreciate it!
> >>>>>>>
> >>>>>>>> While name spacing would be possible, it would require
> >>>>>>>> deserializing user headers, which implies a runtime overhead. I
> >>>>>>>> would suggest no namespace for now, to avoid the overhead. If this
> >>>>>>>> becomes a problem in the future, we can still add name spacing
> >>>>>>>> later on.
> >>>>>>>
> >>>>>>> Agreed. I will go with using a reserved string and document it.
> >>>>>>>
> >>>>>>>> My main concern about the design is the type of the result KTable:
> >>>>>>>> If I understood the proposal correctly,
> >>>>>>>
> >>>>>>> In your example, you have table1 and table2 swapped. Here is how it
> >>>>>>> works currently:
> >>>>>>>
> >>>>>>> 1) table1 has the records that contain the foreign key within their
> >>>>>>> value.
> >>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>,
> >>>>>>> <c,(fk=B,bar=3)>
> >>>>>>> table2 input stream: <A,X>, <B,Y>
> >>>>>>>
> >>>>>>> 2) A value mapper is required to extract the foreign key.
> >>>>>>> table1 foreign key mapper: ( value => value.fk )
> >>>>>>>
> >>>>>>> The mapper is applied to each element in table1, and a new combined
> >>>>>>> key is made:
> >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>,
> >>>>>>> <B-c, (fk=B,bar=3)>
> >>>>>>>
> >>>>>>> 3) The rekeyed events are copartitioned with table2:
> >>>>>>>
> >>>>>>> a) Stream Thread with Partition 0:
> >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
> >>>>>>> Table2: <A,X>
> >>>>>>>
> >>>>>>> b) Stream Thread with Partition 1:
> >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
> >>>>>>> Table2: <B,Y>
> >>>>>>>
> >>>>>>> 4) From here, they can be joined together locally by applying the
> >>>>>>> joiner function.
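Steps 1 to 4 can be modeled in memory as a rough sketch. Everything here is hypothetical: the `FkJoinSketch` class, the string-encoded combined key `"A-a"`, and the use of `String.hashCode()` in place of Kafka's real partitioner (which hashes the serialized key bytes). Only the foreign-key part of the combined key drives the partition choice, which is what co-partitions the rekeyed table1 rows with table2; the joiner simply formats `join(<table2 value>,<bar>)`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of steps 1-4 above; no Kafka classes are used.
final class FkJoinSketch {
    // Step 1: a table1 value carries the foreign key plus a payload ("bar").
    static final class Row {
        final String fk;
        final int bar;
        Row(String fk, int bar) { this.fk = fk; this.bar = bar; }
    }

    // Step 2: combined key "<fk>-<pk>", e.g. "A-a".
    static String combinedKey(String fk, String pk) {
        return fk + "-" + pk;
    }

    // Step 3: choose the partition from the foreign key only, so each combined
    // key lands on the same partition as the table2 row it joins with.
    // String.hashCode() is a stand-in for Kafka's real partitioner.
    static int partitionFor(String fk, int numPartitions) {
        return Math.floorMod(fk.hashCode(), numPartitions);
    }

    // Step 4: join locally within one partition (inner-join semantics).
    static Map<String, String> joinPartition(Map<String, Row> table1Part,
                                             Map<String, String> table2Part) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, Row> e : table1Part.entrySet()) {
            String right = table2Part.get(e.getValue().fk);
            if (right != null) {
                out.put(e.getKey(), "join(" + right + "," + e.getValue().bar + ")");
            }
        }
        return out;
    }

    // Runs steps 2-4 and returns the joined output of every partition.
    static List<Map<String, String>> run(Map<String, Row> table1,
                                         Map<String, String> table2,
                                         int numPartitions) {
        List<Map<String, Row>> t1Parts = new ArrayList<>();
        List<Map<String, String>> t2Parts = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            t1Parts.add(new LinkedHashMap<>());
            t2Parts.add(new LinkedHashMap<>());
        }
        for (Map.Entry<String, Row> e : table1.entrySet()) {
            String fk = e.getValue().fk;
            t1Parts.get(partitionFor(fk, numPartitions)).put(combinedKey(fk, e.getKey()), e.getValue());
        }
        for (Map.Entry<String, String> e : table2.entrySet()) {
            t2Parts.get(partitionFor(e.getKey(), numPartitions)).put(e.getKey(), e.getValue());
        }
        List<Map<String, String>> results = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            results.add(joinPartition(t1Parts.get(p), t2Parts.get(p)));
        }
        return results;
    }
}
```

With two partitions the concrete partition numbers may differ from the example above; what matters is that `A-a`, `A-b`, and `<A,X>` always end up on the same partition.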
> >>>>>>>
> >>>>>>> At this point, Jan's design and my design deviate. My design goes on
> >>>>>>> to repartition the data post-join and resolve out-of-order arrival
> >>>>>>> of records, finally returning the data keyed by just the original
> >>>>>>> key. I do not expose the CombinedKey or any of the internals outside
> >>>>>>> of the joinOnForeignKey function. This does make for a larger
> >>>>>>> footprint, but it takes all responsibility for resolving
> >>>>>>> out-of-order arrivals and handling CombinedKeys away from the user.
> >>>>>>> I believe that this makes the function much easier to use.
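A sketch of the post-join re-keying described above, assuming the combined key is encoded as the string `"<fk>-<pk>"` and that the foreign key itself contains no `-`. `RekeyBack` is a hypothetical name. In a real topology this step is another repartition over the network, and the out-of-order resolution mentioned above is not modeled here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: strip the foreign-key prefix from each combined key so
// that the joined result is keyed by table1's primary key again.
final class RekeyBack {
    // "A-a" -> "a". Assumes the fk contains no '-' (an artifact of this
    // string encoding, not of any real CombinedKey class).
    static String originalKey(String combinedKey) {
        return combinedKey.substring(combinedKey.indexOf('-') + 1);
    }

    static Map<String, String> rekey(Map<String, String> joinedByCombinedKey) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : joinedByCombinedKey.entrySet()) {
            // Each table1 row carries exactly one fk, so there is at most one
            // joined value per original key and nothing is overwritten.
            out.put(originalKey(e.getKey()), e.getValue());
        }
        return out;
    }
}
```

Because the foreign key lives in table1's value, keying the joined result by table1's primary key still yields exactly one row each for `a`, `b`, and `c`.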
> >>>>>>>
> >>>>>>> Let me know if this helps resolve your questions, and please feel
> >>>>>>> free to add anything else on your mind.
> >>>>>>>
> >>>>>>> Thanks again,
> >>>>>>> Adam
> >>>>>>>
> >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matthias@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> I am just catching up on this thread. I did not read everything so
> >>>>>>>> far, but want to share a couple of initial thoughts:
> >>>>>>>>
> >>>>>>>> Headers: I think there is a fundamental difference between header
> >>>>>>>> usage in this KIP and KIP-258. For 258, we add headers to changelog
> >>>>>>>> topics that are owned by Kafka Streams, and nobody else is supposed
> >>>>>>>> to write into them. In fact, no user headers are written into the
> >>>>>>>> changelog topic, and thus there are no conflicts.
> >>>>>>>>
> >>>>>>>> Nevertheless, I don't see a big issue with using headers within
> >>>>>>>> Streams. As long as we document it, we can have some "reserved"
> >>>>>>>> header keys that users are not allowed to use when processing data
> >>>>>>>> with Kafka Streams. IMHO, this should be ok.
> >>>>>>>>
> >>>>>>>>> I think there is a safe way to avoid conflicts, since these headers
> >>>>>>>>> are only needed in internal topics (I think):
> >>>>>>>>> For internal and changelog topics, we can namespace all headers:
> >>>>>>>>> * user-defined headers are namespaced as "external." + headerKey
> >>>>>>>>> * internal headers are namespaced as "internal." + headerKey
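A sketch of the quoted namespacing scheme, with a plain `Map<String, byte[]>` standing in for Kafka record headers (real headers are an ordered collection of key/`byte[]` pairs that may repeat keys, which this map does not model). `HeaderNamespaces` and its methods are hypothetical. It makes the cost discussed in this thread visible: every user header key has to be rewritten when writing to an internal topic and again when reading back.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for the "external." / "internal." namespacing idea.
final class HeaderNamespaces {
    static final String EXTERNAL = "external.";
    static final String INTERNAL = "internal.";

    // Prefix every user header before writing to an internal/changelog topic.
    static Map<String, byte[]> wrapUserHeaders(Map<String, byte[]> userHeaders) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        userHeaders.forEach((k, v) -> out.put(EXTERNAL + k, v));
        return out;
    }

    // Add a header that only the library itself reads.
    static void putInternal(Map<String, byte[]> headers, String key, byte[] value) {
        headers.put(INTERNAL + key, value);
    }

    // Restore the user's original keys when reading back; internal headers
    // are dropped so user code never sees them.
    static Map<String, byte[]> unwrapUserHeaders(Map<String, byte[]> headers) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        headers.forEach((k, v) -> {
            if (k.startsWith(EXTERNAL)) {
                out.put(k.substring(EXTERNAL.length()), v);
            }
        });
        return out;
    }
}
```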
> >>>>>>>>>
> >>>>>>>> While name spacing would be possible, it would require
> >>>>>>>> deserializing user headers, which implies a runtime overhead. I
> >>>>>>>> would suggest no namespace for now, to avoid the overhead. If this
> >>>>>>>> becomes a problem in the future, we can still add name spacing
> >>>>>>>> later on.
> >>>>>>>>
> >>>>>>>> My main concern about the design is the type of the result KTable:
> >>>>>>>> If I understood the proposal correctly,
> >>>>>>>>
> >>>>>>>> KTable<K1,V1> table1 = ...
> >>>>>>>> KTable<K2,V2> table2 = ...
> >>>>>>>>
> >>>>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
> >>>>>>>>
> >>>>>>>> implies that the `joinedTable` has the same key as the left input
> >>>>>>>> table. IMHO, this does not work, because if table2 contains multiple
> >>>>>>>> rows that join with a record in table1 (which is the main purpose of
> >>>>>>>> a foreign key join), the result table would only contain a single
> >>>>>>>> join result, not multiple.
> >>>>>>>>
> >>>>>>>> Example:
> >>>>>>>>
> >>>>>>>> table1 input stream: <A,X>
> >>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
> >>>>>>>>
> >>>>>>>> We use table2's value as a foreign key to table1's key (i.e., "A"
> >>>>>>>> joins). If the result key is the same as the key of table1, this
> >>>>>>>> implies that the result can be either <A, join(X,1)> or
> >>>>>>>> <A, join(X,2)>, but not both. Because they share the same key,
> >>>>>>>> whichever result record we emit later overwrites the previous
> >>>>>>>> result.
> >>>>>>>>
> >>>>>>>> This is the reason why Jan originally proposed to use a combination
> >>>>>>>> of both primary keys of the input tables as the key of the output
> >>>>>>>> table. This makes the keys of the output table unique, and we can
> >>>>>>>> store both in the output table:
> >>>>>>>>
> >>>>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> Thoughts?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             > >>>>>>>>
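The overwrite problem described above can be sketched with plain Java maps standing in for KTables, since both have upsert semantics: a later record with the same key replaces the earlier one. This is a hypothetical illustration, not the KIP-213 implementation. The `join` helper and the `keyedByLeftKey`/`keyedByCompoundKey` methods are made up for the example, the `"A-a"` string concatenation stands in for whatever compound-key type a real implementation would use, and reading X as the table1 value and 1/2 as the values of matching records a and b is an assumption based on the thread's notation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration: a KTable is modeled as a plain Map (upsert semantics).
final class CompoundKeySketch {

    // Stand-in for the user's joiner; "join(X,1)" strings mirror the thread's notation.
    static String join(String leftValue, String rightValue) {
        return "join(" + leftValue + "," + rightValue + ")";
    }

    // Result table keyed only by the left key: each new match overwrites the previous one.
    static Map<String, String> keyedByLeftKey(String leftKey, String leftValue,
                                              Map<String, String> rightMatches) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> right : rightMatches.entrySet()) {
            result.put(leftKey, join(leftValue, right.getValue()));
        }
        return result;
    }

    // Result table keyed by the combination of both primary keys: every match survives.
    static Map<String, String> keyedByCompoundKey(String leftKey, String leftValue,
                                                  Map<String, String> rightMatches) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> right : rightMatches.entrySet()) {
            result.put(leftKey + "-" + right.getKey(), join(leftValue, right.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> rightMatches = new LinkedHashMap<>();
        rightMatches.put("a", "1");
        rightMatches.put("b", "2");
        System.out.println(keyedByLeftKey("A", "X", rightMatches));     // {A=join(X,2)}
        System.out.println(keyedByCompoundKey("A", "X", rightMatches)); // {A-a=join(X,1), A-b=join(X,2)}
    }
}
```

With the left key alone, only the last match survives; with the compound key, the output table holds one row per matching pair, which is the property Matthias attributes to Jan's proposal.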
> > On 9/4/18 1:36 PM, Jan Filipiak wrote:
> >
> > > Just one remark here.
> > > The high-watermark could be disregarded. The decision about the forward
> > > depends on the size of the aggregated map. Only 1-element maps would be
> > > unpacked and forwarded. 0-element maps would be published as deletes.
> > > Any other count of map entries is in the "waiting for correct deletes to
> > > arrive" state.
> > >
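Jan's rule can be written as a small decision function over the aggregated map. This is a sketch under assumptions: the map's key and value types, the `Action` enum, and the `decide`/`unpack` helpers are hypothetical names, and it is not Kafka Streams API. Only the size-based rule itself (1 entry: unpack and forward; 0 entries: publish a delete; anything else: wait) comes from the message above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the size-based forwarding rule; not Kafka Streams API.
final class MapSizeForwardingSketch {

    // Possible outcomes for one aggregated map.
    enum Action { FORWARD, DELETE, WAIT }

    // 0 entries -> publish a delete (tombstone); 1 entry -> unpack and forward it;
    // more entries -> stale results have not been deleted yet, so emit nothing.
    static Action decide(Map<String, String> aggregated) {
        switch (aggregated.size()) {
            case 0:
                return Action.DELETE;
            case 1:
                return Action.FORWARD;
            default:
                return Action.WAIT;
        }
    }

    // The single value to forward, or empty when the rule says DELETE or WAIT.
    static Optional<String> unpack(Map<String, String> aggregated) {
        if (decide(aggregated) != Action.FORWARD) {
            return Optional.empty();
        }
        return Optional.of(aggregated.values().iterator().next());
    }

    public static void main(String[] args) {
        Map<String, String> aggregated = new LinkedHashMap<>();
        System.out.println(decide(aggregated));   // DELETE
        aggregated.put("a", "join(X,1)");
        aggregated.put("b", "join(X,2)");
        System.out.println(decide(aggregated));   // WAIT
        aggregated.remove("a");                   // the stale entry's delete arrives
        System.out.println(decide(aggregated) + " " + unpack(aggregated).orElse(null)); // FORWARD join(X,2)
    }
}
```

The point of the design, as Jan frames it, is that no explicit ordering metadata is needed: the map only becomes forwardable once the deletes for stale entries have shrunk it back to a single element.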
> > > On 04.09.2018 21:29, Adam Bellemare wrote:
> > >
> > > > It does look like I could replace the second repartition store and
> > > > highwater store with a groupBy and reduce. However, it looks like I
> > > > would still need to store the highwater value within the materialized
> > > > store, to compare the arrival of out-of-order records (assuming my
> > > > understanding of THIS is correct...). This in effect is the same as the
> > > > design I have now, just with the two tables merged together.
> > > >
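Adam's point, that a groupBy/reduce would still need the highwater value inside the materialized value, can be sketched as a reducer that keeps whichever record carries the higher mark. This is a minimal sketch, assuming the highwater mark is a monotonically increasing number such as a source offset (the thread does not say what it is). `Versioned`, `KEEP_NEWEST`, and `reduceAll` are hypothetical names. The reducer has the same two-argument shape as Kafka Streams' `Reducer<V>`, but the sketch folds over a plain list instead of running inside a topology.

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical sketch: the highwater mark travels inside the reduced value.
final class HighwaterReduceSketch {

    // Value kept in the (hypothetical) materialized store: join result plus highwater mark.
    static final class Versioned {
        final long highwater;
        final String value;

        Versioned(long highwater, String value) {
            this.highwater = highwater;
            this.value = value;
        }
    }

    // Keep the record with the higher highwater mark (ties go to the incoming record),
    // so an older update that arrives late cannot overwrite a newer result.
    static final BinaryOperator<Versioned> KEEP_NEWEST =
        (current, incoming) -> incoming.highwater >= current.highwater ? incoming : current;

    // Folds arrivals in the order they were received; assumes at least one arrival.
    static Versioned reduceAll(List<Versioned> arrivals) {
        Versioned current = arrivals.get(0);
        for (int i = 1; i < arrivals.size(); i++) {
            current = KEEP_NEWEST.apply(current, arrivals.get(i));
        }
        return current;
    }

    public static void main(String[] args) {
        // Mark 7 arrives before mark 5 (out of order); the older update is discarded.
        List<Versioned> arrivals = List.of(
            new Versioned(3, "join(X,1)"),
            new Versioned(7, "join(X,2)"),
            new Versioned(5, "join(X,1)"));
        System.out.println(reduceAll(arrivals).value); // join(X,2)
    }
}
```

Because the comparison needs the stored mark, the separate highwater store disappears but its data does not, which is why Adam describes the result as the existing design with the two tables merged.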
>
> --
> -- Guozhang
