kafka-dev mailing list archives

From Jan Filipiak <Jan.Filip...@trivago.com>
Subject Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.
Date Wed, 02 Jan 2019 20:52:17 GMT
Just pull your version through now; I think I should shut up.
Anyhow, I answered inline.

Hope you had good holidays!

Best Jan

On 02.01.2019 21:35, Adam Bellemare wrote:
> Hi Jan
>
> Ahh, I got it! It is deterministic once you apply the groupBy function you
> mentioned a few months ago to the output, but not before you apply it...
> correct? I was not thinking about the groupBy function.

It's also deterministic before the groupBy, but the key usually is not
very useful.

>
> Here's how I understand how it could work from an API perspective: I am
> going to use the terminology "KScatteredTable" to represent the
> intermediate table that is not yet resolved - basically the join was
> performed but no race condition handling is done.
>
> If I wanted to join three KTables together on foreign keys, one of the ways
> I could do it is:
>
> KScatteredTable scatteredOne = ktableOne.oneToManyJoin(kTableTwo,
>     joinerFuncTwo, foreignKeyExtractorTwo);
> KScatteredTable scatteredTwo = scatteredOne.oneToManyJoin(kTableThree,
>     joinerFuncThree, foreignKeyExtractorThree);
>
> // Now I groupBy the key that I want to obtain, and I can resolve the
> // out-of-order dependencies here.
> scatteredTwo.groupBy(keyValueMapper);
> // Shown here:
> // https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-SolutionB-User-ManagedGroupBy(Jan's)
>
> Is this in line with what you're doing? Can this be done without exposing
> the CombinedKey? As you mentioned before "A Table
> KTable<CombinedKey<A,B>,JoinedResult> is not a good return type. It breaks
> the KTable invariant that a table is currently partitioned by its key".

The combined key has all sorts of issues. My implementation back then
got stuck when trying to provide a built-in Avro CombinedKey serde,
because none of the Schema Registry guys could tell me how to have 2 keys
per topic, or whether it was okay to come up with new topic names. So just
ignore this and have users implement a serde on their own; otherwise the
whole crazy Avro crowd wants a say again. So not showing the CombinedKey
in the DSL is a fair point, but I couldn't find a way to hide it.

> With that being said, would the only two operations that a KScatteredTable
> needs to support be oneToManyJoin and groupBy?

I think it is a full KTable; operations that require co-partitioning
could arguably be overridden to throw. But I do nothing else but groupBy
and then the next one-to-many join.

>
> Thanks for your thoughts
>
> Adam
>
>
> On Wed, Jan 2, 2019 at 3:07 PM Jan Filipiak <Jan.Filipiak@trivago.com>
> wrote:
>
>> Hi Adam,
>>
>> I am kind of surprised! Yes, my solution is of course correct. I don't
>> really know what to show in an example, as I am convinced you have
>> grasped the concept of how mine works.
>>
>> If there is a race condition, there is a race condition. It doesn't
>> matter whether there are 10 minutes or milliseconds between events:
>> either they are properly guarded or they are not. My solution has no such
>> race condition. It is 'eventually consistent'. You will see all sorts of
>> stuff coming up during a reprocess.
>>
>> The user can still fk it up later, though. But that is business as usual.
>>
>> In practice I try to suppress updates from the left side for as long as
>> possible, because right-side updates are more expensive if the left is
>> already fairly full. That limits the space a little, but there are no
>> guarantees. The result, however, is the same every time once the lag is
>> zero.
>>
>> The trade-offs can be shifted as you like. My solution gives full power
>> to the user and only does a minimum in the framework. You push
>> everything into Streams.
>>
>> If you ask me, that's not a good choice. Will anyone listen? No.
>> I actually think it's too late to do it my way. It's not as if you
>> haven't already gone through the effort of building it.
>>
>> Just wanted to give you guys another chance to think it through ;)
>>
>> Regarding what will be observed: I consider it a plus that every event
>> in the inputs has a respective output, whereas your solution might
>> "swallow" events.
>>
>> Best Jan
>>
>>
>> On 02.01.2019 15:30, Adam Bellemare wrote:
>>> Jan
>>>
>>> I have been thinking a lot about the history of the discussion and your
>>> original proposal, and why you believe it is a better solution. The
>>> biggest problem with your original proposed design is that it seems to me
>>> to be non-deterministic. It is subject to race conditions that are
>>> dependent entirely on the data, and without resolution of these races you
>>> can end up with different results each time. If I am mistaken and this is
>>> indeed deterministic, then please let me know and provide an explanation,
>>> ideally with an example.
>>>
>>> The way I see it, you will get very different answers from your
>>> non-race-condition-resolved join topology, especially if you are nesting
>>> it with additional joins as you have indicated you are doing. Consider
>>> rebuilding an application state from the beginning of two topics. If the
>>> left/this side has multiple foreign-key changes in a row, spaced out every
>>> ten minutes, you may see something like this:
>>>
>>> (foo, foreignKey=red) t=0
>>> (foo, foreignKey=blue) t=0+10m
>>> (foo, foreignKey=green) t=0+20m
>>> (foo, foreignKey=purple) t=0+30m
>>> (foo, foreignKey=blue) t=0+40m
>>> (foo, foreignKey=white) t=0+50m
>>>
>>> During realtime processing, all of the updates may have correctly
>>> propagated because it took less than 10 minutes to resolve each join.
>>> Upon rebuilding from the start, however, all of these events would be
>>> processed in quick succession. The presence or absence of data at the
>>> moment each join is evaluated will affect the results of your join, and
>>> the results can vary with each run depending on the data. Because of
>>> this, I cannot support any kind of solution that would allow the exposure
>>> of an unresolved intermediate state. I can understand if you don't
>>> support this, but this is why, as you said, you have the freedom to use
>>> the Processor API.
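A minimal plain-Java simulation of the replay scenario above may help make the race concrete. It has no Kafka dependency; `ReplayRace`, `unresolved` and `resolved` are hypothetical names, the arrival order is an assumed example, and `resolved` approximates a foreign-key check performed once the LHS table has caught up to `foreignKey=white`.

```java
import java.util.List;

/**
 * Hypothetical simulation of the replay scenario: each foreign-key change of
 * LHS key "foo" yields one join response, computed on the RHS partition that
 * owns that foreign key. During a fast rebuild the responses can come back to
 * the LHS in any order.
 */
final class ReplayRace {

    /** One join response: the foreign key it was computed for, and the joined value. */
    static final class Response {
        final String foreignKey;
        final String joined;

        Response(String foreignKey, String joined) {
            this.foreignKey = foreignKey;
            this.joined = joined;
        }
    }

    /** No resolution: whichever response arrives last wins. */
    static String unresolved(List<Response> arrivals) {
        String result = null;
        for (Response r : arrivals) {
            result = r.joined;
        }
        return result;
    }

    /** Resolution: keep a response only if it matches the LHS record's current foreign key. */
    static String resolved(List<Response> arrivals, String currentForeignKey) {
        String result = null;
        for (Response r : arrivals) {
            if (r.foreignKey.equals(currentForeignKey)) {
                result = r.joined;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // "foo" has already been updated to foreignKey=white, but the slower
        // response for purple happens to arrive last.
        List<Response> arrivals = List.of(
                new Response("red", "foo+red"),
                new Response("blue", "foo+blue"),
                new Response("green", "foo+green"),
                new Response("white", "foo+white"),
                new Response("purple", "foo+purple"));
        System.out.println(unresolved(arrivals));        // foo+purple
        System.out.println(resolved(arrivals, "white")); // foo+white
    }
}
```

With resolution, the output depends only on the LHS record's final foreign key; without it, the output depends on which response happens to arrive last.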
>>>
>>>
>>> With that being said, either the solution that I originally proposed
>>> (joins occurring on the foreign node) or John + Guozhang's solution
>>> (registering with the foreign node for notifications) is fine with me;
>>> both have the same API and we can evaluate it further during
>>> implementation.
>>>
>>>
>>> Thanks
>>>
>>> Adam
>>>
>>> On Thu, Dec 27, 2018 at 2:38 PM Jan Filipiak <Jan.Filipiak@trivago.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Just want to let you guys know that, if you ask me, this thing is
>>>> spiralling out of control.
>>>>
>>>> First you take away the possibility for the user to optimize. Now you
>>>> pile up complexity to perform some after-the-fact optimisation that, from
>>>> my POV, completely misses the point. As if the actual call to the joiner
>>>> is really going to be the expensive part. It won't. The truth is, you
>>>> don't have a clue which side is going to be smaller. It might be that the
>>>> key you shuffle around is already much larger than the value on the other
>>>> side.
>>>>
>>>> You know my opinion on this. For me it's dead; I just leave you this
>>>> message as an opportunity to reconsider the choices that were made.
>>>>
>>>> Wish y'all a happy new year :)
>>>>
>>>> On 27.12.2018 17:22, Adam Bellemare wrote:
>>>>> Hi All
>>>>>
>>>>> Sorry for the delay; holidays and all. I have since updated the KIP with
>>>>> John's original suggestion and have pruned a number of the
>>>>> no-longer-relevant diagrams. Any more comments would be welcome;
>>>>> otherwise I will look to kick off the vote again shortly.
>>>>>
>>>>> Thanks
>>>>> Adam
>>>>>
>>>>> On Mon, Dec 17, 2018 at 7:06 PM Adam Bellemare <adam.bellemare@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi John and Guozhang
>>>>>>
>>>>>> Ah yes, I lost that in the mix! Thanks for the convergent solutions; I
>>>>>> do think that the attachment that John included makes for a better
>>>>>> design. It should also help with overall performance, as very
>>>>>> high-cardinality foreign-keyed data (say, millions of events with the
>>>>>> same entity) will be able to leverage multiple nodes for the join
>>>>>> instead of having it all performed on one node. There is still a
>>>>>> bottleneck in the right table having to propagate all those events, but
>>>>>> with slimmer structures, less IO and no need to perform the join, I
>>>>>> think the throughput will be much higher in those scenarios.
>>>>>>
>>>>>> Okay, I am convinced. I will update the KIP accordingly with a Gliffy
>>>>>> version of John's diagram and ensure that the example flow matches
>>>>>> correctly. Then I can go back to working on the PR to match the diagram.
>>>>>>
>>>>>> Thanks both of you for all the help - very much appreciated.
>>>>>>
>>>>>> Adam
>>>>>>
>>>>>> On Mon, Dec 17, 2018 at 6:39 PM Guozhang Wang <wangguoz@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi John,
>>>>>>>
>>>>>>> Just made a pass on your diagram (nice hand-drawing btw!), and
>>>>>>> obviously we are thinking about the same thing :) A neat difference
>>>>>>> that I like is that in the pre-join repartition topic we can still send
>>>>>>> messages in the format `K=k, V=(i=2)` while using "i" as the partition
>>>>>>> key in the StreamPartitioner. This way we do not even need to augment
>>>>>>> the key for the repartition topic; we just project the value onto the
>>>>>>> foreign-key part and trim all other fields. As long as we still
>>>>>>> materialize the store as `A-2`, co-located with the right KTable, that
>>>>>>> is fine.
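A rough plain-Java sketch of the routing described above: the record key stays the LHS key, the value is trimmed to the foreign key, and the partition is chosen from the foreign key alone. In Kafka Streams this decision would belong in a `StreamPartitioner`; here it is a standalone function, and `String.hashCode` stands in for Kafka's murmur2-based default partitioning, an assumption made only to keep the example self-contained. Co-location with the right KTable additionally assumes both topics have the same number of partitions and use the same partitioning function. All class and method names are hypothetical.

```java
import java.util.Objects;

/**
 * Hypothetical sketch of the pre-join repartition record: the key stays the
 * LHS primary key (K=k), the value is projected down to the foreign key only
 * (V=(i=2)), and routing is decided by that foreign key.
 */
final class PreJoinRepartition {

    /** Projected value: every LHS field except the foreign key is trimmed. */
    static final class Subscription {
        final String foreignKey;

        Subscription(String foreignKey) {
            this.foreignKey = Objects.requireNonNull(foreignKey);
        }
    }

    /**
     * Stand-in partition function. Kafka's default partitioner applies murmur2
     * to the serialized key; String.hashCode is used here only to stay
     * dependency-free.
     */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /**
     * Partitioner-style callback (key, value, numPartitions): the LHS key is
     * carried along in the record but deliberately ignored for routing.
     */
    static int route(String lhsKey, Subscription value, int numPartitions) {
        return partitionFor(value.foreignKey, numPartitions);
    }

    public static void main(String[] args) {
        int n = 8;
        // Two LHS records pointing at RHS key "2" land on the partition that
        // owns RHS key "2", so the subscription store is co-located with it.
        System.out.println(route("A", new Subscription("2"), n) == partitionFor("2", n)); // true
        System.out.println(route("B", new Subscription("2"), n) == route("A", new Subscription("2"), n)); // true
    }
}
```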
>>>>>>>
>>>>>>> As I mentioned in my previous email, I also think this has a few
>>>>>>> advantages in saving over-the-wire bytes as well as disk bytes.
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Dec 17, 2018 at 3:17 PM John Roesler <john@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Guozhang,
>>>>>>>>
>>>>>>>> Thanks for taking a look! I think Adam's already addressed your
>>>>>>>> questions as well as I could have.
>>>>>>>>
>>>>>>>> Hi Adam,
>>>>>>>>
>>>>>>>> Thanks for updating the KIP. It looks great, especially how all the
>>>>>>>> need-to-know information is right at the top, followed by the details.
>>>>>>>>
>>>>>>>> Also, thanks for that high-level diagram. Actually, now that I'm looking
>>>>>>>> at it, I think part of my proposal got lost in translation, although I
>>>>>>>> do think that what you have there is also correct.
>>>>>>>>
>>>>>>>> I sketched up a crude diagram based on yours and attached it to the KIP
>>>>>>>> (I'm not sure if attached or inline images work on the mailing list):
>>>>>>>> https://cwiki.apache.org/confluence/download/attachments/74684836/suggestion.png
>>>>>>>> It's also attached to this email for convenience.
>>>>>>>>
>>>>>>>> Hopefully, you can see how it's intended to line up, and which parts are
>>>>>>>> modified.
>>>>>>>> At a high level, instead of performing the join on the right-hand side,
>>>>>>>> we're essentially just registering interest, like "LHS key A wishes to
>>>>>>>> receive updates for RHS key 2". Then, when there is a new "interest" or
>>>>>>>> any update to the RHS records, it "broadcasts" its state back to the LHS
>>>>>>>> records that are interested in it.
>>>>>>>>
>>>>>>>> Thus, instead of sending the LHS values to the RHS joiner workers and
>>>>>>>> then sending the join results back to the LHS worker to be
>>>>>>>> co-partitioned and validated, we instead only send the LHS *keys* to the
>>>>>>>> RHS workers and then only the RHS k/v back to be joined by the LHS
>>>>>>>> worker.
>>>>>>>>
>>>>>>>> I've been considering both your diagram and mine, and I *think* what I'm
>>>>>>>> proposing has a few advantages.
>>>>>>>>
>>>>>>>> Here are some points of interest as you look at the diagram:
>>>>>>>> * When we extract the foreign key and send it to the Pre-Join Repartition
>>>>>>>> Topic, we can send only the FK/PK pair. There's no need to worry about
>>>>>>>> custom partitioner logic, since we can just use the foreign key plainly
>>>>>>>> as the repartition record key. Also, we save on transmitting the LHS
>>>>>>>> value, since we only send its key in this step.
>>>>>>>> * We also only need to store the RHSKey:LHSKey mapping in the
>>>>>>>> MaterializedSubscriptionStore, saving on disk. We can use the same
>>>>>>>> RocksDB key format you proposed and the same algorithm involving range
>>>>>>>> scans when the RHS records get updated.
>>>>>>>> * Instead of joining on the right side, all we do is compose a
>>>>>>>> re-repartition record so we can broadcast the RHS k/v pair back to the
>>>>>>>> original LHS partition. (This is what the "rekey" node is doing.)
>>>>>>>> * Then, there is a special kind of Joiner that's co-resident in the same
>>>>>>>> StreamTask as the LHS table, subscribed to the Post-Join Repartition
>>>>>>>> Topic.
>>>>>>>> ** This Joiner is *not* triggered directly by any changes in the LHS
>>>>>>>> KTable. Instead, LHS events indirectly trigger the join via the whole
>>>>>>>> lifecycle.
>>>>>>>> ** For each event arriving from the Post-Join Repartition Topic, the
>>>>>>>> Joiner looks up the corresponding record in the LHS KTable. It validates
>>>>>>>> the FK as you noted, discarding any inconsistent events. Otherwise, it
>>>>>>>> unpacks the RHS K/V pair and invokes the ValueJoiner to obtain the join
>>>>>>>> result.
>>>>>>>> ** Note that the Joiner itself is stateless, so materializing the join
>>>>>>>> result is optional, just as with the 1:1 joins.
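The subscription store and the broadcast step can be sketched with a sorted set standing in for RocksDB. This is a hypothetical in-memory illustration, not the KIP's store: the composite-key layout (foreign key, then a NUL separator, then the LHS key) is an assumption that requires foreign keys never to contain the NUL character, chosen so that every subscriber of one RHS key sits in a contiguous range that a single scan can return.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/**
 * Hypothetical in-memory stand-in for the MaterializedSubscriptionStore. A
 * TreeSet plays the role of a sorted key-value store such as RocksDB; each
 * entry is the composite key foreignKey + SEP + lhsKey, so all subscribers of
 * one RHS key are adjacent and can be found with one range scan.
 */
final class SubscriptionStore {
    // Assumption: foreign keys never contain the NUL character used as separator.
    private static final char SEP = '\u0000';
    private final NavigableSet<String> store = new TreeSet<>();

    /** Post-join repartition record: keyed by LHS key, carrying the RHS key and value. */
    static final class PostJoinRecord {
        final String lhsKey;
        final String rhsKey;
        final String rhsValue;

        PostJoinRecord(String lhsKey, String rhsKey, String rhsValue) {
            this.lhsKey = lhsKey;
            this.rhsKey = rhsKey;
            this.rhsValue = rhsValue;
        }
    }

    /** "LHS key A wishes to receive updates for RHS key 2". */
    void subscribe(String foreignKey, String lhsKey) {
        store.add(foreignKey + SEP + lhsKey);
    }

    /** Called when an LHS record moves away from (or deletes) its old foreign key. */
    void unsubscribe(String foreignKey, String lhsKey) {
        store.remove(foreignKey + SEP + lhsKey);
    }

    /** Range scan over [foreignKey + SEP, foreignKey + (SEP + 1)). */
    List<String> subscribers(String foreignKey) {
        String from = foreignKey + SEP;
        String to = foreignKey + (char) (SEP + 1);
        List<String> lhsKeys = new ArrayList<>();
        for (String composite : store.subSet(from, true, to, false)) {
            lhsKeys.add(composite.substring(from.length()));
        }
        return lhsKeys;
    }

    /** On an RHS update, "broadcast" the RHS key/value to every interested LHS key. */
    List<PostJoinRecord> broadcast(String rhsKey, String rhsValue) {
        List<PostJoinRecord> out = new ArrayList<>();
        for (String lhsKey : subscribers(rhsKey)) {
            out.add(new PostJoinRecord(lhsKey, rhsKey, rhsValue));
        }
        return out;
    }

    public static void main(String[] args) {
        SubscriptionStore s = new SubscriptionStore();
        s.subscribe("2", "A");
        s.subscribe("2", "B");
        s.subscribe("20", "C");
        System.out.println(s.subscribers("2"));            // [A, B]
        System.out.println(s.broadcast("20", "v").size()); // 1
    }
}
```

A new subscription would additionally look up the current RHS value and emit it the same way; that lookup is omitted here.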
>>>>>>>>
>>>>>>>> So in summary:
>>>>>>>> * Instead of transmitting the LHS keys and values to the right and the
>>>>>>>> JoinResult back to the left, we only transmit the LHS keys to the right
>>>>>>>> and the RHS values to the left. Assuming the average RHS value is smaller
>>>>>>>> than or equal to the average join result size, it's a clear win on
>>>>>>>> broker traffic. I think this is actually a reasonable assumption, which
>>>>>>>> we can discuss more if you're suspicious.
>>>>>>>> * We only need one copy of the data (the left and right tables need to
>>>>>>>> be materialized) and one extra copy of the PK:FK pairs in the
>>>>>>>> Materialized Subscription Store. Materializing the join result is
>>>>>>>> optional, just as with the existing 1:1 joins.
>>>>>>>> * We still need the fancy range-scan algorithm on the right to locate all
>>>>>>>> interested LHS keys when an RHS value is updated, but we don't need a
>>>>>>>> custom partitioner for either repartition topic (this is of course a
>>>>>>>> modification we could make to your version as well).
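The broker-traffic claim can be checked with simple arithmetic. The model below counts repartition bytes per LHS update for both designs; the record layouts (CombinedKey taken as FK + PK, and so on) and the example sizes are assumptions for the estimate, not measurements, and the class name is hypothetical.

```java
/**
 * Back-of-the-envelope model of repartition bytes per LHS update, comparing
 * the two designs. The record layouts are assumptions made for this estimate,
 * not the KIP's actual wire format.
 */
final class TrafficModel {

    /** Join on the right: CombinedKey(FK, PK) + LHS value out, CombinedKey + join result back. */
    static long joinOnRight(long fk, long lhsKey, long lhsValue, long joinResult) {
        long outbound = fk + lhsKey + lhsValue;
        long inbound = fk + lhsKey + joinResult;
        return outbound + inbound;
    }

    /** Subscribe and broadcast: FK -> PK out, PK -> (RHS key, RHS value) back. */
    static long subscribeAndBroadcast(long fk, long lhsKey, long rhsValue) {
        long outbound = fk + lhsKey;
        long inbound = lhsKey + fk + rhsValue;
        return outbound + inbound;
    }

    public static void main(String[] args) {
        // Hypothetical average sizes in bytes: FK=8, PK=16, LHS value=200,
        // RHS value=300, join result=500.
        System.out.println(joinOnRight(8, 16, 200, 500));      // 748
        System.out.println(subscribeAndBroadcast(8, 16, 300)); // 348
    }
}
```

Under this model the difference is `lhsValue + joinResult - rhsValue`, so the subscription design never sends more bytes as long as the RHS value is no larger than the LHS value plus the join result, which always holds under the assumption above that the RHS value is no larger than the join result.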
>>>>>>>>
>>>>>>>> How does this sound to you? (And did I miss anything?)
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Mon, Dec 17, 2018 at 9:00 AM Adam Bellemare <adam.bellemare@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi John & Guozhang
>>>>>>>>>
>>>>>>>>> @John & @Guozhang Wang <wangguoz@gmail.com>: I have cleaned up the KIP,
>>>>>>>>> pruned much of what I wrote and put a simplified diagram near the top
>>>>>>>>> to illustrate the workflow. I encapsulated Jan's content at the bottom
>>>>>>>>> of the document. I believe it is by far simpler to read now.
>>>>>>>>>
>>>>>>>>> @Guozhang Wang <wangguoz@gmail.com>:
>>>>>>>>>> #1: rekey left table
>>>>>>>>>>      -> source from the left upstream, send to rekey-processor to
>>>>>>>>>>      generate combined key, and then sink to copartition topic.
>>>>>>>>> Correct.
>>>>>>>>>
>>>>>>>>>> #2: first-join with right table
>>>>>>>>>>      -> source from the right table upstream, materialize the right
>>>>>>>>>>      table.
>>>>>>>>>>      -> source from the co-partition topic, materialize the rekeyed
>>>>>>>>>>      left table, join with the right table, rekey back, and then sink
>>>>>>>>>>      to the rekeyed-back topic.
>>>>>>>>> Almost: I cleaned up the KIP. We do not rekey back yet, as I need the
>>>>>>>>> foreign-key value generated in #1 above to compare in the resolution
>>>>>>>>> stage.
>>>>>>>>>
>>>>>>>>>> #3: second join
>>>>>>>>>>      -> source from the rekeyed-back topic, materialize the rekeyed
>>>>>>>>>>      back table.
>>>>>>>>>>      -> source from the left upstream, materialize the left table,
>>>>>>>>>>      join with the rekeyed back table.
>>>>>>>>> Almost: as each event comes in, we just run it through a stateful
>>>>>>>>> processor that checks the original ("This") KTable for the key. The
>>>>>>>>> value payload then has the foreignKeyExtractor applied again, as in
>>>>>>>>> Part #1 above, to get the current foreign key. Then we compare it to
>>>>>>>>> the joined event that we are currently resolving. If they have the same
>>>>>>>>> foreign key, we propagate the result out. If they don't, we throw the
>>>>>>>>> event away.
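The resolution step described above can be sketched as a check against a lookup of the LHS table. In this hypothetical plain-Java version a `HashMap` stands in for the materialized "This" KTable, `foreignKeyExtractor` and `joiner` mirror the roles named in the thread, and dropping the event when the LHS record is gone is an assumption matching inner-join semantics.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * Hypothetical sketch of the resolution step: a HashMap stands in for the
 * materialized "This" (LHS) KTable, and a joined event is propagated only if
 * the LHS record still points at the foreign key the event was computed for.
 */
final class ForeignKeyResolver<K, VL, KO, VR, VJ> {
    private final Map<K, VL> lhsTable;
    private final Function<VL, KO> foreignKeyExtractor;
    private final BiFunction<VL, VR, VJ> joiner;

    ForeignKeyResolver(Map<K, VL> lhsTable,
                       Function<VL, KO> foreignKeyExtractor,
                       BiFunction<VL, VR, VJ> joiner) {
        this.lhsTable = lhsTable;
        this.foreignKeyExtractor = foreignKeyExtractor;
        this.joiner = joiner;
    }

    /** Returns the join result to emit, or empty if the event is stale and must be dropped. */
    Optional<VJ> resolve(K lhsKey, KO eventForeignKey, VR rhsValue) {
        VL current = lhsTable.get(lhsKey);
        if (current == null) {
            return Optional.empty(); // LHS record deleted in the meantime
        }
        KO currentForeignKey = foreignKeyExtractor.apply(current);
        if (!Objects.equals(currentForeignKey, eventForeignKey)) {
            return Optional.empty(); // FK changed since: throw the event away
        }
        return Optional.ofNullable(joiner.apply(current, rhsValue));
    }

    public static void main(String[] args) {
        Map<String, String> lhs = new HashMap<>();
        lhs.put("foo", "white"); // for brevity the value is its own foreign key
        ForeignKeyResolver<String, String, String, String, String> resolver =
                new ForeignKeyResolver<>(lhs, v -> v, (l, r) -> l + "+" + r);
        System.out.println(resolver.resolve("foo", "blue", "B"));  // Optional.empty
        System.out.println(resolver.resolve("foo", "white", "W")); // Optional[white+W]
    }
}
```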
>>>>>>>>>
>>>>>>>>> The end result is that we do need to materialize 2 additional tables
>>>>>>>>> (left/this-combinedkey table, and the final Joined table) as I've
>>>>>>>>> illustrated in the updated KIP. I hope the diagram clears it up a lot
>>>>>>>>> better. Please let me know.
>>>>>>>>>
>>>>>>>>> Thanks again
>>>>>>>>> Adam
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sun, Dec 16, 2018 at 3:18 PM Guozhang Wang <wangguoz@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> John,
>>>>>>>>>>
>>>>>>>>>> Thanks a lot for the suggestions on refactoring the wiki. I agree with
>>>>>>>>>> you that the KIP proposal should be easily understood by anyone
>>>>>>>>>> reading it in the future, and hence should provide a good summary of
>>>>>>>>>> the user-facing interfaces, as well as rejected alternatives that
>>>>>>>>>> briefly represent "how we came a long way to this conclusion, and what
>>>>>>>>>> we have argued, disagreed, and agreed about, etc." so that readers do
>>>>>>>>>> not need to dig into the DISCUSS thread to get all the details. We
>>>>>>>>>> can, of course, keep implementation details like "workflows" on the
>>>>>>>>>> wiki page as an addendum section, since they are also relevant.
>>>>>>>>>>
>>>>>>>>>> Regarding your proposal on comment 6): that's a very interesting idea!
>>>>>>>>>> Just to make sure I understand it correctly: the proposal's resulting
>>>>>>>>>> topology is still the same as the current proposal, where we will have
>>>>>>>>>> 3 sub-topologies for this operator:
>>>>>>>>>>
>>>>>>>>>> #1: rekey left table
>>>>>>>>>>       -> source from the left upstream, send to rekey-processor to
>>>>>>>>>>       generate combined key, and then sink to copartition topic.
>>>>>>>>>>
>>>>>>>>>> #2: first-join with right table
>>>>>>>>>>       -> source from the right table upstream, materialize the right
>>>>>>>>>>       table.
>>>>>>>>>>       -> source from the co-partition topic, materialize the rekeyed
>>>>>>>>>>       left table, join with the right table, rekey back, and then sink
>>>>>>>>>>       to the rekeyed-back topic.
>>>>>>>>>>
>>>>>>>>>> #3: second join
>>>>>>>>>>       -> source from the rekeyed-back topic, materialize the rekeyed
>>>>>>>>>>       back table.
>>>>>>>>>>       -> source from the left upstream, materialize the left table,
>>>>>>>>>>       join with the rekeyed back table.
>>>>>>>>>>
>>>>>>>>>> Sub-topologies #1 and #3 may be merged into a single sub-topology,
>>>>>>>>>> since both of them read from the left table source stream. In this
>>>>>>>>>> workflow, we need to materialize 4 tables (left table in #3, right
>>>>>>>>>> table in #2, rekeyed left table in #2, rekeyed-back table in #3) and
>>>>>>>>>> use 2 repartition topics (copartition topic, rekeyed-back topic).
>>>>>>>>>>
>>>>>>>>>> Compared with Adam's current proposal in the workflow overview, it has
>>>>>>>>>> the same number of materialized tables (left table, rekeyed left
>>>>>>>>>> table, right table, out-of-order resolver table), and the same number
>>>>>>>>>> of internal topics (two). The advantage is that on the copartition
>>>>>>>>>> topic we can save bandwidth by not sending the value, and in #2 the
>>>>>>>>>> rekeyed left table is smaller, since we do not have any values to
>>>>>>>>>> materialize. Is that right?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Dec 12, 2018 at 1:22 PM John Roesler <john@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Adam,
>>>>>>>>>>>
>>>>>>>>>>> Given that the committers are all pretty busy right now, I think that
>>>>>>>>>>> it would help if you were to refactor the KIP a little to reduce the
>>>>>>>>>>> workload for reviewers.
>>>>>>>>>>>
>>>>>>>>>>> I'd recommend the following changes:
>>>>>>>>>>> * relocate all internal details to a section at the end called
>>>>>>>>>>> something like "Implementation Notes".
>>>>>>>>>>> * rewrite the rest of the KIP to be as succinct as possible and
>>>>>>>>>>> mention only publicly-facing API changes.
>>>>>>>>>>> ** for example, the interface that you've already listed there, as
>>>>>>>>>>> well as a textual description of the guarantees we'll be providing
>>>>>>>>>>> (the join result is copartitioned with the LHS, and the join result is
>>>>>>>>>>> guaranteed correct)
>>>>>>>>>>>
>>>>>>>>>>> A good target would be that the whole main body of the KIP, including
>>>>>>>>>>> Status, Motivation, Proposal, Justification, and Rejected
>>>>>>>>>>> Alternatives, fits "above the fold" (i.e., all fits on the screen at a
>>>>>>>>>>> comfortable zoom level).
>>>>>>>>>>> I think the only real Rejected Alternative that bears mention at this
>>>>>>>>>>> point is KScatteredTable, for which you could just include the
>>>>>>>>>>> executive summary (no implementation details), and link to extra
>>>>>>>>>>> details in the Implementation Notes section.
>>>>>>>>>>>
>>>>>>>>>>> Taking a look at the wiki page, ~90% of the text there is internal
>>>>>>>>>>> detail, which is useful for the skeptical, but doesn't need to be
>>>>>>>>>>> ratified in a vote (and would be subject to change without notice in
>>>>>>>>>>> the future anyway).
>>>>>>>>>>> There's also a lot of conflicting discussion, as you've very
>>>>>>>>>>> respectfully tried to preserve the original proposal from Jan while
>>>>>>>>>>> adding your own. Isolating all this information in a dedicated
>>>>>>>>>>> section at the bottom frees the voters up to focus on the public API
>>>>>>>>>>> part of the proposal, which is really all they need to consider.
>>>>>>>>>>>
>>>>>>>>>>> Plus, it'll be clear to future readers which parts of the document
>>>>>>>>>>> are enduring, and which parts are a snapshot of our implementation
>>>>>>>>>>> thinking at the time.
>>>>>>>>>>>
>>>>>>>>>>> I'm suggesting this because I suspect that the others haven't made
>>>>>>>>>>> time to review it partly because it seems daunting. If it seems like
>>>>>>>>>>> it would be a huge time investment to review, people will just keep
>>>>>>>>>>> putting it off. But if the KIP is a single page, then they'll be more
>>>>>>>>>>> inclined to give it a read.
>>>>>>>>>>>
>>>>>>>>>>> Honestly, I don't think the KIP itself is that controversial (apart
>>>>>>>>>>> from the scattered table thing; sorry, Jan). Most of the discussion
>>>>>>>>>>> has been around the implementation, which we can continue more
>>>>>>>>>>> effectively in a PR once the KIP has passed.
>>>>>>>>>>>
>>>>>>>>>>> How does that sound?
>>>>>>>>>>> -John
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 10, 2018 at 3:54 PM Adam Bellemare <adam.bellemare@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> 1) I believe that the resolution mechanism John has proposed is
>>>>>>>>>>>> sufficient: it is clean and easy and doesn't require additional
>>>>>>>>>>>> RocksDB stores, which reduces the footprint greatly. I don't think we
>>>>>>>>>>>> need to resolve based on timestamp or offset anymore, but if we
>>>>>>>>>>>> decide to do that, it would be within the bounds of the existing API.
>>>>>>>>>>>>
>>>>>>>>>>>> 2) Is the current API sufficient, or does it need to be altered to go
>>>>>>>>>>>> back to vote?
>>>>>>>>>>>>
>>>>>>>>>>>> 3) A KScatteredTable implementation can always be added in a future
>>>>>>>>>>>> revision. This API does not rule it out. The implementation of this
>>>>>>>>>>>> function would simply be replaced with `KScatteredTable.resolve()`
>>>>>>>>>>>> while still maintaining the existing API, thereby giving both
>>>>>>>>>>>> features as Jan outlined earlier. Would this work?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks Guozhang, John and Jan
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 10, 2018 at 10:39 AM John Roesler <john@confluent.io>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi, all,
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In fact, we can just keep a single final-result store with
>>>>>>>>>>>>>>> timestamps and reject values that have a smaller timestamp, is
>>>>>>>>>>>>>>> that right?
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Which output is correct should at least be decided by the offset
>>>>>>>>>>>>>> of the original message.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for this point, Jan.
>>>>>>>>>>>>>
>>>>>>>>>>>>> KIP-258 is merely to allow embedding the record timestamp in the k/v
>>>>>>>>>>>>> store, as well as providing a storage-format upgrade path.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I might have missed it, but I think we have yet to discuss whether
>>>>>>>>>>>>> it's safe or desirable just to swap topic-ordering out for
>>>>>>>>>>>>> timestamp-ordering. This is a very deep topic, and I think it would
>>>>>>>>>>>>> only pollute the current discussion.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What Adam has proposed is safe, given the *current* ordering
>>>>>>>>>>>>> semantics of the system. If we can agree on his proposal, I think we
>>>>>>>>>>>>> can merge the feature well before the conversation about timestamp
>>>>>>>>>>>>> ordering even takes place, much less reaches a conclusion. In the
>>>>>>>>>>>>> meantime, it would seem unfortunate to have one join operator with
>>>>>>>>>>>>> different ordering semantics from every other KTable operator.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If and when that timestamp discussion takes place, many (all?) KTable
>>>>>>>>>>>>> operations will need to be updated, rendering the many:one join a
>>>>>>>>>>>>> small marginal cost.
>>>>>>>>>>>>>
>>>>>>>>>>>>> And, just to plug it again, I proposed an algorithm above that I
>>>>>>>>>>>>> believe provides correct ordering without any additional metadata,
>>>>>>>>>>>>> and regardless of the ordering semantics. I didn't bring it up
>>>>>>>>>>>>> further, because I felt the KIP only needs to agree on the public
>>>>>>>>>>>>> API, and we can discuss the implementation at leisure in a PR...
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> -John
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 10, 2018 at 2:28 AM Jan Filipiak <Jan.Filipiak@trivago.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 10.12.2018 07:42, Guozhang Wang wrote:
>>>>>>>>>>>>>>> Hello Adam / Jan / John,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Sorry for being late on this thread! I've finally got some time this
>>>>>>>>>>>>>>> weekend to clean up a load of tasks on my queue (actually I've also
>>>>>>>>>>>>>>> realized there are a bunch of other things I need to enqueue while
>>>>>>>>>>>>>>> cleaning them up, something I need to improve on my side). So here
>>>>>>>>>>>>>>> are my thoughts:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regarding the APIs: I like the API currently written in the KIP.
>>>>>>>>>>>>>>> More generally, I'd prefer to keep 1) one-to-many join functionality
>>>>>>>>>>>>>>> and 2) join types other than inner as separate KIPs, since 1) may be
>>>>>>>>>>>>>>> worth a general API refactoring that can benefit not only
>>>>>>>>>>>>>>> foreign-key joins but co-located joins as well (e.g. an extended
>>>>>>>>>>>>>>> proposal of
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup ),
>>>>>>>>>>>>>>> and I'm not sure if other join types would actually be needed
>>>>>>>>>>>>>>> (maybe left join still makes sense), so it's better to
>>>>>>>>>>>>>>> wait-for-people-to-ask-and-add than add-something-that-no-one-uses.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regarding whether we enforce steps 3) / 4) vs. introducing a
>>>>>>>>>>>>>>> KScatteredTable for users to inject their own optimization: I'd
>>>>>>>>>>>>>>> prefer to keep the current option as-is, and my main rationale is
>>>>>>>>>>>>>>> room for optimization inside the Streams internals, plus API
>>>>>>>>>>>>>>> succinctness. For advanced users who may indeed prefer
>>>>>>>>>>>>>>> KScatteredTable and doing their own optimization, but for whom using
>>>>>>>>>>>>>>> the Processor API directly is too much work, I think we can still
>>>>>>>>>>>>>>> extend the current API to support it in the future if it becomes
>>>>>>>>>>>>>>> necessary.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> No internal optimization potential. It's a myth.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ¯\_(ツ)_/¯
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> :-)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Another note about step 4), resolving out-of-order data: as I
>>>>>>>>>>>>>>> mentioned before, I think with KIP-258 (embedded timestamps in the
>>>>>>>>>>>>>>> key-value store) we can actually make this step simpler than in the
>>>>>>>>>>>>>>> current proposal. In fact, we can just keep a single final-result
>>>>>>>>>>>>>>> store with timestamps and reject values that have a smaller
>>>>>>>>>>>>>>> timestamp, is that right?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Which output is correct should at least be decided based on the offset of the original message.
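To make the offset-versus-timestamp point concrete, here is a minimal sketch, assuming an in-memory `HashMap` stands in for the final-result state store. Each key keeps the source-partition offset of the record that produced its current value (Jan's suggestion, used here in place of the KIP-258 timestamp Guozhang mentions), and any update carrying an older offset is rejected. `VersionedResultStore` is a hypothetical name, not a Kafka Streams or KIP-258 class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not a Kafka Streams API: a final-result store that
// rejects updates whose source offset is older than the one already stored.
final class VersionedResultStore<K, V> {

    // A value paired with the source-partition offset that produced it.
    private static final class Versioned<T> {
        final long offset;
        final T value;

        Versioned(long offset, T value) {
            this.offset = offset;
            this.value = value;
        }
    }

    private final Map<K, Versioned<V>> store = new HashMap<>();

    // Stores the value unless a newer offset is already recorded; returns whether it was applied.
    boolean put(K key, V value, long sourceOffset) {
        Versioned<V> current = store.get(key);
        if (current != null && sourceOffset < current.offset) {
            return false; // out-of-order arrival: a later source record already won
        }
        store.put(key, new Versioned<>(sourceOffset, value));
        return true;
    }

    V get(K key) {
        Versioned<V> current = store.get(key);
        return current == null ? null : current.value;
    }
}
```

Because only the latest offset per key is kept, a store like this never holds more entries than there are keys in the source table.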
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> That's all I have in mind now. Again, great appreciation to Adam for making such HUGE progress on this KIP!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If they don't find the time: they usually take the opposite path from me :D so the answer would be clear.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hence my suggestion to vote.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 04.12.2018 21:06, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>> Hi Guozhang and Matthias
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I know both of you are quite busy, but we've gotten this KIP to a point where we need more guidance on the API (perhaps a bit of a tie-breaker, if you will). If you think anyone else should look at this, please tag them accordingly.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The scenario is as follows:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Current Option:
>>>>>>>>>>>>>>>>> API:
>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
>>>>>>>>>>>>>>>>> 1) Rekey the data to CombinedKey and shuffle it to the partition with the foreignKey (repartition 1)
>>>>>>>>>>>>>>>>> 2) Join the data
>>>>>>>>>>>>>>>>> 3) Shuffle the data back to the original node (repartition 2)
>>>>>>>>>>>>>>>>> 4) Resolve out-of-order arrival / race conditions due to foreign-key changes.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Alternate Option:
>>>>>>>>>>>>>>>>> Perform #1 and #2 above, and return a KScatteredTable.
>>>>>>>>>>>>>>>>> - It would be keyed on a wrapped key function: <CombinedKey<KO, K>, VR> (KO = Other Table Key, K = This Table Key, VR = Joined Result)
>>>>>>>>>>>>>>>>> - KScatteredTable.resolve() would perform #3 and #4, but otherwise a user would be able to perform additional functions directly from the KScatteredTable (TBD - currently out of scope).
>>>>>>>>>>>>>>>>> - John's analysis two emails up is accurate as to the tradeoffs.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The Current Option is coded as-is. The Alternate Option is possible, but it will require exposing implementation details in the API, including new data structures (i.e., CombinedKey).
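For readers wondering why CombinedKey is an implementation detail at all: its byte layout is what lets every entry for one foreign key be found with a single prefix scan. The sketch below assumes one plausible layout (a 4-byte big-endian foreign-key length, the foreign-key bytes, then the primary-key bytes) with String keys encoded as UTF-8; `CombinedKeyCodec` and its methods are hypothetical and are not the serializer from the KIP or its PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical layout, not the KIP's serializer:
// [4-byte big-endian FK length][FK bytes][PK bytes]
final class CombinedKeyCodec {
    private CombinedKeyCodec() {}

    // Bytes shared by every combined key with this foreign key.
    static byte[] prefix(String foreignKey) {
        byte[] fk = foreignKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Integer.BYTES + fk.length)
                .putInt(fk.length)
                .put(fk)
                .array();
    }

    // Encodes (foreignKey, primaryKey) as prefix(foreignKey) followed by the primary-key bytes.
    static byte[] encode(String foreignKey, String primaryKey) {
        byte[] head = prefix(foreignKey);
        byte[] pk = primaryKey.getBytes(StandardCharsets.UTF_8);
        byte[] out = Arrays.copyOf(head, head.length + pk.length);
        System.arraycopy(pk, 0, out, head.length, pk.length);
        return out;
    }

    // True if key starts with prefix, i.e. a prefix scan for that foreign key would return it.
    static boolean hasPrefix(byte[] key, byte[] prefix) {
        return key.length >= prefix.length
                && Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length);
    }
}
```

The length prefix is the important design choice here: without it, the key for ("AB", "1") would begin with the bytes of "A" and would wrongly be returned by a scan for foreign key A.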
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I appreciate any insight into this.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare <adam.bellemare@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi John
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for your feedback and assistance. I think your summary is accurate from my perspective. Additionally, I would like to add that there is a risk of inconsistent final states without performing the resolution. This is a major concern for me, as most of the data I have dealt with is produced by relational databases. We have seen a number of cases where a user in the Rails UI has modified the field (foreign key), realized they made a mistake, and then updated the field again with a new key. The events are propagated as they are produced, and as such we have had real-world cases where these inconsistencies were propagated downstream as the final values due to race conditions in the fanout of the data.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The solution I propose values correctness of the final result over other factors.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> We could always move this function over to a KScatteredTable implementation in the future, and simply deprecate this join API in time. I would like to hear more from some of the other major committers on which course of action they think is best before any more coding is done.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks again
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 8:24 PM John Roesler <john@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Jan and Adam,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Wow, thanks for doing that test, Adam. Those results are encouraging.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for your performance experience as well, Jan. I agree that avoiding unnecessary join outputs is especially important when the fan-out is so high. I suppose this could also be built into the implementation we're discussing, but it wouldn't have to be specified in the KIP (since it's an API-transparent optimization).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> As far as whether or not to re-repartition the data, I didn't bring it up because it sounded like the two of you agreed to leave the KIP as-is, despite the disagreement.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If you want my opinion, I feel like both approaches are reasonable. It sounds like Jan places more value on the potential for developers to optimize their topologies to re-use the intermediate nodes, whereas Adam places more value on having a single operator that people can use without extra steps at the end.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Personally, although I do find it exceptionally annoying when a framework gets in my way when I'm trying to optimize something, it seems better to go for a single operation.
>>>>>>>>>>>>>>>>>>> * Encapsulating the internal transitions gives us significant latitude in the implementation (for example, joining only at the end, not in the middle, to avoid extra data copying and out-of-order resolution; how we represent the first repartition keys (combined keys vs. value vectors), etc.). If we publish something like a KScatteredTable with the right-partitioned joined data, then the API pretty much locks in the implementation as well.
>>>>>>>>>>>>>>>>>>> * The API seems simpler to understand and use. I do mean "seems"; if anyone wants to make the case that KScatteredTable is actually simpler, I think hypothetical usage code would help. From a relational algebra perspective, it seems like KTable.join(KTable) should produce a new KTable in all cases.
>>>>>>>>>>>>>>>>>>> * That said, there might still be room in the API for a different operation like what Jan has proposed to scatter a KTable, and then do things like join, re-group, etc. from there... I'm not sure; I haven't thought through all the consequences yet.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This is all just my opinion after thinking over the discussion so far...
>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <adam.bellemare@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Updated the PR to take into account John's feedback.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I did some preliminary testing of the performance of the prefixScan. I have attached the file, but I will also include the text in the body here for archival purposes (I am not sure what happens to attached files). I also updated the PR and the KIP accordingly.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Summary: It scales exceptionally well for scanning large numbers of records. As Jan mentioned previously, the real issue would be more around processing the resulting records after obtaining them. For instance, it takes approximately 80-120 ms to flush the buffer and a further 35-85 ms to scan 27.5M records, obtaining matches for 2.5M of them. Iterating through the records just to generate a simple count takes ~40 times longer than the flush + scan combined.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> ============================================================================================
>>>>>>>>>>>>>>>>>>>> Setup:
>>>>>>>>>>>>>>>>>>>> ============================================================================================
>>>>>>>>>>>>>>>>>>>> Java 9 with default settings aside from a 512 MB heap (Xmx512m, Xms512m)
>>>>>>>>>>>>>>>>>>>> CPU: i7 2.2 GHz.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Note: I am using a slightly modified, directly accessible Kafka Streams RocksDB implementation (RocksDB.java, basically just avoiding the ProcessorContext). There are no modifications to the default RocksDB values provided in the 2.1/trunk release.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> keysize = 128 bytes
>>>>>>>>>>>>>>>>>>>> valsize = 512 bytes
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Step 1:
>>>>>>>>>>>>>>>>>>>> Write X positive matching events (key = prefix + left-padded auto-incrementing integer)
>>>>>>>>>>>>>>>>>>>> Step 2:
>>>>>>>>>>>>>>>>>>>> Write 10X negative matching events (key = left-padded auto-incrementing integer)
>>>>>>>>>>>>>>>>>>>> Step 3:
>>>>>>>>>>>>>>>>>>>> Perform flush
>>>>>>>>>>>>>>>>>>>> Step 4:
>>>>>>>>>>>>>>>>>>>> Perform prefixScan
>>>>>>>>>>>>>>>>>>>> Step 5:
>>>>>>>>>>>>>>>>>>>> Iterate through the returned Iterator and validate the count of expected events.
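The benchmark steps can be approximated in memory to show what the prefixScan is doing. This sketch assumes a `TreeMap` as a stand-in for RocksDB's sorted key space and a literal prefix `"prefix-"` with 10-digit padding (the actual prefix and 128-byte key layout are not given above); it does not reproduce RocksDB's flush or any of the timings that follow.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// In-memory approximation of the benchmark; TreeMap stands in for RocksDB's sorted keys.
final class PrefixScanSketch {
    private PrefixScanSketch() {}

    // Left-pads an integer to 10 digits so lexicographic order matches numeric order.
    static String pad(int i) {
        return String.format("%010d", i);
    }

    // Counts keys starting with prefix: seek to the first key >= prefix, stop at the first miss.
    static int prefixScanCount(NavigableMap<String, byte[]> store, String prefix) {
        int count = 0;
        for (Map.Entry<String, byte[]> entry : store.tailMap(prefix, true).entrySet()) {
            if (!entry.getKey().startsWith(prefix)) {
                break; // keys are sorted, so no later key can match
            }
            count++;
        }
        return count;
    }

    static int run(int x, String prefix) {
        NavigableMap<String, byte[]> store = new TreeMap<>();
        byte[] value = new byte[512]; // valsize = 512 bytes, one shared array to save memory
        for (int i = 0; i < x; i++) {
            store.put(prefix + pad(i), value); // Step 1: X positive matches
        }
        for (int i = 0; i < 10 * x; i++) {
            store.put(pad(i), value); // Step 2: 10X negative matches
        }
        // Step 3 (flush) has no in-memory equivalent and is skipped.
        return prefixScanCount(store, prefix); // Steps 4 and 5: scan and count
    }
}
```

The scan cost is a seek plus one step per matching key, which is consistent with the observation that scanning is cheap compared to processing the matches.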
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> ============================================================================================
>>>>>>>>>>>>>>>>>>>> Results:
>>>>>>>>>>>>>>>>>>>> ============================================================================================
>>>>>>>>>>>>>>>>>>>> X = 1k (11k events total)
>>>>>>>>>>>>>>>>>>>> Flush Time = 39 ms
>>>>>>>>>>>>>>>>>>>> Scan Time = 7 ms
>>>>>>>>>>>>>>>>>>>> 6.9 MB disk
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> --------------------------------------------------------------------------------------------
>>>>>>>>>>>>>>>>>>>> X = 10k (110k events total)
>>>>>>>>>>>>>>>>>>>> Flush Time = 45 ms
>>>>>>>>>>>>>>>>>>>> Scan Time = 8 ms
>>>>>>>>>>>>>>>>>>>> 127 MB disk
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> --------------------------------------------------------------------------------------------
>>>>>>>>>>>>>>>>>>>> X = 100k (1.1M events total)
>>>>>>>>>>>>>>>>>>>> Test1:
>>>>>>>>>>>>>>>>>>>> Flush Time = 60 ms
>>>>>>>>>>>>>>>>>>>> Scan Time = 12 ms
>>>>>>>>>>>>>>>>>>>> 678 MB disk
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Test2:
>>>>>>>>>>>>>>>>>>>> Flush Time = 45 ms
>>>>>>>>>>>>>>>>>>>> Scan Time = 7 ms
>>>>>>>>>>>>>>>>>>>> 576 MB disk
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> --------------------------------------------------------------------------------------------
>>>>>>>>>>>>>>>>>>>> X = 1M (11M events total)
>>>>>>>>>>>>>>>>>>>> Test1:
>>>>>>>>>>>>>>>>>>>> Flush Time = 52 ms
>>>>>>>>>>>>>>>>>>>> Scan Time = 19 ms
>>>>>>>>>>>>>>>>>>>> 7.2 GB disk
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Test2:
>>>>>>>>>>>>>>>>>>>> Flush Time = 84 ms
>>>>>>>>>>>>>>>>>>>> Scan Time = 34 ms
>>>>>>>>>>>>>>>>>>>> 9.1 GB disk
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> --------------------------------------------------------------------------------------------
>>>>>>>>>>>>>>>>>>>> X = 2.5M (27.5M events total)
>>>>>>>>>>>>>>>>>>>> Test1:
>>>>>>>>>>>>>>>>>>>> Flush Time = 82 ms
>>>>>>>>>>>>>>>>>>>> Scan Time = 63 ms
>>>>>>>>>>>>>>>>>>>> 17 GB disk, 276 SST files
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Test2:
>>>>>>>>>>>>>>>>>>>> Flush Time = 116 ms
>>>>>>>>>>>>>>>>>>>> Scan Time = 35 ms
>>>>>>>>>>>>>>>>>>>> 23 GB disk, 361 SST files
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Test3:
>>>>>>>>>>>>>>>>>>>> Flush Time = 103 ms
>>>>>>>>>>>>>>>>>>>> Scan Time = 82 ms
>>>>>>>>>>>>>>>>>>>> 19 GB disk, 300 SST files
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> --------------------------------------------------------------------------------------------
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I had to limit my testing on my laptop to X = 2.5M events. I tried to go to X = 10M (110M events), but RocksDB was going into the 100GB+ range and my laptop ran out of disk. More extensive testing could be done, but I suspect that it would be in line with what we're seeing in the results above.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> At this point in time, I think the only major discussion point is really around what Jan and I have disagreed on: repartitioning back + resolving potential out-of-order issues, or leaving that up to the client to handle.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks folks,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On 29.11.2018 15:14, John Roesler wrote:
>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Sorry that this discussion petered out... I think the 2.1 release caused an extended distraction that pushed it off everyone's radar (which was precisely Adam's concern). Personally, I've also had some extended distractions of my own that kept (and continue to keep) me preoccupied.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> However, calling for a vote did wake me up, so I guess Jan was on the right track!
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I've gone back and reviewed the whole KIP document and the prior discussion, and I'd like to offer a few thoughts:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> API Thoughts:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 1. If I read the KIP right, you are proposing a many-to-one join. Could we consider naming it manyToOneJoin? Or, if you prefer, flip the design around and make it a oneToManyJoin?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> The proposed name "joinOnForeignKey" disguises the join type, and it seems like it might trick some people into using it for a one-to-one join. This would work, of course, but it would be super inefficient compared to a simple rekey-and-join.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 2. I might have missed it, but I don't think it's specified whether it's an inner, outer, or left join. I'm guessing an outer join, as (neglecting IQ) the rest can be achieved by filtering or by handling it in the ValueJoiner.
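A small illustration of the claim that an outer join is enough, assuming the joiner sees `null` for a missing side and that a filtered-out result is modelled as `Optional.empty()`. `JoinSemanticsSketch` is hypothetical (it uses `java.util.function.BiFunction`, not the Kafka Streams `ValueJoiner`) and, like the point itself, ignores interactive queries.

```java
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical helpers: derive inner/left semantics from an outer join's
// (left, right) pairs, where a missing side is passed as null.
final class JoinSemanticsSketch {
    private JoinSemanticsSketch() {}

    // Inner join: drop the result unless both sides are present.
    static <L, R, V> BiFunction<L, R, Optional<V>> inner(BiFunction<L, R, V> joiner) {
        return (l, r) -> {
            if (l == null || r == null) {
                return Optional.empty();
            }
            return Optional.ofNullable(joiner.apply(l, r));
        };
    }

    // Left join: keep the result whenever the left side is present.
    static <L, R, V> BiFunction<L, R, Optional<V>> left(BiFunction<L, R, V> joiner) {
        return (l, r) -> {
            if (l == null) {
                return Optional.empty();
            }
            return Optional.ofNullable(joiner.apply(l, r));
        };
    }
}
```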
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 3. The arg list to joinOnForeignKey doesn't look quite right.
>>>>>>>>>>>>>>>>>>>>>> 3a. Regarding Serialized: There are a few different paradigms in play in the Streams API, so it's confusing, but instead of three Serialized args, I think it would be better to have one that allows (optionally) setting the 4 incoming serdes. The result serde is defined by the Materialized. The incoming serdes can be optional because they might already be available on the source KTables, or the default serdes from the config might be applicable.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 3b. Is the StreamPartitioner necessary? The other joins don't allow setting one, and it seems like it might actually be harmful, since the rekey operation needs to produce results that are co-partitioned with the "other" KTable.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 4. I'm fine with the "reserved word" header, but I didn't actually follow what Matthias meant about namespacing requiring "deserializing" the record header. The header keys are already Strings, so I don't think that deserialization is required. If we applied the namespace at source nodes and stripped it at sink nodes, this would add practically no overhead. The advantage of the namespace idea is that no public API change with respect to headers needs to happen, and no restrictions need to be placed on users' headers.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> (Although I'm wondering if we can get away without the header at all... stay tuned)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 5. I also didn't follow the discussion about the HWM table growing without bound. As I read it, the HWM table is effectively implementing OCC (optimistic concurrency control) to resolve the problem you noted with disordering when the rekey is reversed... particularly notable when the FK changes. As such, it only needs to track the most recent "version" (the offset in the source partition) of each key. Therefore, it should have the same number of keys as the source table at all times.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I see that you are aware of KIP-258, which I think might be relevant in a couple of ways. One: it's just about storing the timestamp in the state store, but the ultimate idea is to effectively use the timestamp as an OCC "version" to drop disordered updates. You wouldn't want to use the timestamp for this operation, but if you were to use a similar mechanism to store the source offset in the store alongside the re-keyed values, then you could avoid a separate table.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 6. You and Jan have been thinking about this for a long time, so I've probably missed something here, but I'm wondering if we can avoid the HWM tracking altogether and resolve out-of-order arrivals during a final join instead...
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Let's say we're joining a left table (Integer K: Letter FK, (other data)) to a right table (Letter K: (some data)).
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Left table:
>>>>>>>>>>>>>>>>>>>>>> 1: (A, xyz)
>>>>>>>>>>>>>>>>>>>>>> 2: (B, asd)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Right table:
>>>>>>>>>>>>>>>>>>>>>> A: EntityA
>>>>>>>>>>>>>>>>>>>>>> B: EntityB
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> We could do a rekey as you proposed with a combined key, but without propagating the value at all...
>>>>>>>>>>>>>>>>>>>>>> Rekey table:
>>>>>>>>>>>>>>>>>>>>>> A-1: (dummy value)
>>>>>>>>>>>>>>>>>>>>>> B-2: (dummy value)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Which we then join with the right table to produce:
>>>>>>>>>>>>>>>>>>>>>> A-1: EntityA
>>>>>>>>>>>>>>>>>>>>>> B-2: EntityB
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Which gets rekeyed back:
>>>>>>>>>>>>>>>>>>>>>> 1: A, EntityA
>>>>>>>>>>>>>>>>>>>>>> 2: B, EntityB
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> And finally we do the actual join:
>>>>>>>>>>>>>>>>>>>>>> Result table:
>>>>>>>>>>>>>>>>>>>>>> 1: ((A, xyz), EntityA)
>>>>>>>>>>>>>>>>>>>>>> 2: ((B, asd), EntityB)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> The thing is that in that last join, we have the opportunity to compare the current FK in the left table with the incoming PK of the right table. If they don't match, we just drop the event, since it must be outdated.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> In your KIP, you gave an example in which (1: A, xyz) gets updated to (1: B, xyz), ultimately yielding a conundrum about whether the final state should be (1: null) or (1: joined-on-B). With the algorithm above, you would be considering (1: (B, xyz), (A, null)) vs (1: (B, xyz), (B, EntityB)). It seems like this does give you enough information to make the right choice, regardless of disordering.
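The drop-if-outdated check in that final join can be sketched as follows, assuming in-memory maps in place of state stores and assuming each rekeyed-back response carries the foreign key its right-table lookup used. All names are hypothetical; the usage scenario is the (1: A, xyz) to (1: B, xyz) update from the KIP example, with the stale response for A arriving last.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of the final join from point 6; maps stand in for state stores.
final class FkResponseJoinSketch {
    record Left(String fk, String data) {}
    record Joined(Left left, String right) {}

    final Map<Integer, Left> leftTable = new HashMap<>(); // current left rows by primary key
    final Map<Integer, Joined> result = new HashMap<>();  // final join result by primary key

    // Applies a response that was rekeyed back to the left primary key.
    // fkUsed is the foreign key the right-table lookup was made with;
    // rightValue may be null if the right table had no row for it.
    boolean onResponse(int pk, String fkUsed, String rightValue) {
        Left current = leftTable.get(pk);
        if (current == null || !Objects.equals(current.fk(), fkUsed)) {
            return false; // left row deleted or its FK changed: the response is outdated
        }
        result.put(pk, new Joined(current, rightValue));
        return true;
    }
}
```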
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Will check Adam's patch, but this should work. As mentioned often, I am not convinced about partitioning back for the user automatically. I think this is the real performance eater ;)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 7. Last thought... I'm a little concerned about the performance of the range scans when records change in the right table. You've said that you've been using the algorithm you presented in production for a while. Can you give us a sense of the performance characteristics you've observed?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Make it work, make it fast, make it beautiful. The topmost concern here is / was correctness. In practice I do not measure the performance of the range scan. In the usual cases I run this with, a left-hand side change emits 500k - 1M rows. The range scan is just the work you have to do; even when you pack your data into different formats, RocksDB performance is usually tightly bound to the size of the data, and we can't really change that. It is more important for users to prevent useless updates to begin with. My left-hand side is guarded to drop changes that are not going to change my join output.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Usually it's:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> drop unused fields and then don't forward if old.equals(new)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Regarding the performance of creating an iterator for smaller fanouts, users can still just do a group by first anyway.
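Jan's guard (drop unused fields, then don't forward if old.equals(new)) might look like this in plain Java, assuming the caller supplies a projection that keeps only the fields the join reads. `ChangeGuard` is a hypothetical helper, not a Kafka Streams operator; in the usage lines, values are assumed to be `"fk|payload"` strings and only the FK part is used by the join.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical helper: forward an update only if the fields the join reads changed.
final class ChangeGuard<K, V, P> {
    private final Function<V, P> projection; // "drop unused fields"
    private final Map<K, P> lastForwarded = new HashMap<>();

    ChangeGuard(Function<V, P> projection) {
        this.projection = projection;
    }

    // Returns false when the projected old and new values are equal, true otherwise.
    boolean shouldForward(K key, V newValue) {
        P projected = newValue == null ? null : projection.apply(newValue);
        if (lastForwarded.containsKey(key) && Objects.equals(lastForwarded.get(key), projected)) {
            return false;
        }
        lastForwarded.put(key, projected);
        return true;
    }
}
```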
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I could only think of one alternative, but I'm not sure if it's better or worse... If the first re-key only needs to preserve the original key, as I proposed in #6, then we could store a vector of keys in the value:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Left table:
>>>>>>>>>>>>>>>>>>>>>> 1: A,...
>>>>>>>>>>>>>>>>>>>>>> 2: B,...
>>>>>>>>>>>>>>>>>>>>>> 3: A,...
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Gets re-keyed:
>>>>>>>>>>>>>>>>>>>>>> A: [1, 3]
>>>>>>>>>>>>>>>>>>>>>> B: [2]
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Then, the rhs part of the join would only need a
>>>>>>>>> regular
>>>>>>>>>>>>>> single-key
>>>>>>>>>>>>>>>>>>>>> lookup.
>>>>>>>>>>>>>>>>>>>>>> Of course we have to deal with the problem of large
>>>>>>>>>> values,
>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>> there's
>>>>>>>>>>>>>>>>>>>>> no
>>>>>>>>>>>>>>>>>>>>>> bound on the number of lhs records that can
>>>>>>> reference
>>>>>>>>> rhs
>>>>>>>>>>>>> records.
>>>>>>>>>>>>>>>>>>>>> Offhand,
>>>>>>>>>>>>>>>>>>>>>> I'd say we could page the values, so when one row is
>>>>>>>>> past
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> threshold, we
>>>>>>>>>>>>>>>>>>>>>> append the key for the next page. Then in most
>>>>>>> cases,
>>>>>>>>> it
>>>>>>>>>>> would
>>>>>>>>>>>>> be
>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>> single
>>>>>>>>>>>>>>>>>>>>>> key lookup, but for large fan-out updates, it would
>>>>>>> be
>>>>>>>>> one
>>>>>>>>>>> per
>>>>>>>>>>>>>> (max
>>>>>>>>>>>>>>>>>>>>> value
>>>>>>>>>>>>>>>>>>>>>> size)/(avg lhs key size).
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> This seems more complex, though... Plus, I think
>>>>>>>>> there's
>>>>>>>>>>> some
>>>>>>>>>>>>>> extra
>>>>>>>>>>>>>>>>>>>>>> tracking we'd need to do to know when to emit a
>>>>>>>>>> retraction.
>>>>>>>>>>>> For
>>>>>>>>>>>>>>>>>>> example,
>>>>>>>>>>>>>>>>>>>>>> when record 1 is deleted, the re-key table would
>>>>>>> just
>>>>>>>>> have
>>>>>>>>>>> (A:
>>>>>>>>>>>>>> [3]).
>>>>>>>>>>>>>>>>>>>>> Some
>>>>>>>>>>>>>>>>>>>>>> kind of tombstone is needed so that the join result
>>>>>>>>> for 1
>>>>>>>>>>> can
>>>>>>>>>>>>> also
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>> retracted.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> That's all!
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks so much to both Adam and Jan for the
>>>>>>> thoughtful
>>>>>>>>>> KIP.
>>>>>>>>>>>>> Sorry
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> discussion has been slow.
>>>>>>>>>>>>>>>>>>>>>> -John
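The paged vector-of-keys idea can be sketched in memory as follows. This is a hypothetical model, not a Kafka Streams store: `maxKeysPerPage` stands in for (max value size)/(avg lhs key size), each page is stored under a made-up `foreignKey/pageNo` key, and `lookups` counts single-key reads so the cost model is visible. Removing a key reports whether a tombstone is still needed to retract the old join result.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch: per rhs (foreign) key, store the lhs primary keys that reference it,
// paged so that no single value grows without bound. Layout and names are assumptions.
final class PagedReverseIndex {

    // One stored value: up to maxKeysPerPage lhs keys plus the next page number (-1 = none).
    static final class Page {
        final List<Integer> primaryKeys = new ArrayList<>();
        int next = -1;
    }

    private final int maxKeysPerPage; // stands in for (max value size) / (avg lhs key size)
    private final Map<String, Page> store = new HashMap<>();
    int lookups; // single-key store reads, to illustrate the cost model

    PagedReverseIndex(int maxKeysPerPage) {
        if (maxKeysPerPage < 1) throw new IllegalArgumentException("maxKeysPerPage must be >= 1");
        this.maxKeysPerPage = maxKeysPerPage;
    }

    private static String storeKey(String foreignKey, int pageNo) {
        return foreignKey + "/" + pageNo;
    }

    private Page read(String foreignKey, int pageNo) {
        lookups++;
        return store.get(storeKey(foreignKey, pageNo));
    }

    // lhs row primaryKey now references rhs key foreignKey.
    void add(String foreignKey, int primaryKey) {
        int pageNo = 0;
        Page page = read(foreignKey, pageNo);
        if (page == null) {
            page = new Page();
            store.put(storeKey(foreignKey, pageNo), page);
        }
        while (page.primaryKeys.size() >= maxKeysPerPage) {
            if (page.next < 0) { // last page is full: append a pointer to a fresh page
                page.next = pageNo + 1;
                store.put(storeKey(foreignKey, page.next), new Page());
            }
            pageNo = page.next;
            page = read(foreignKey, pageNo);
        }
        page.primaryKeys.add(primaryKey);
    }

    // All lhs keys referencing foreignKey: one store read per page.
    List<Integer> lookup(String foreignKey) {
        List<Integer> result = new ArrayList<>();
        int pageNo = 0;
        while (pageNo >= 0) {
            Page page = read(foreignKey, pageNo);
            if (page == null) break;
            result.addAll(page.primaryKeys);
            pageNo = page.next;
        }
        return result;
    }

    // Returns true if primaryKey was present; the caller must then still emit a
    // tombstone (primaryKey -> null) so the old join result is retracted.
    boolean remove(String foreignKey, int primaryKey) {
        int pageNo = 0;
        while (pageNo >= 0) {
            Page page = read(foreignKey, pageNo);
            if (page == null) return false;
            if (page.primaryKeys.remove(Integer.valueOf(primaryKey))) return true;
            pageNo = page.next;
        }
        return false;
    }

    public static void main(String[] args) {
        PagedReverseIndex index = new PagedReverseIndex(2);
        index.add("A", 1); // left table row 1: A
        index.add("B", 2); // left table row 2: B
        index.add("A", 3); // left table row 3: A
        System.out.println(index.lookup("A")); // [1, 3]
        if (index.remove("A", 1)) {
            System.out.println("emit tombstone for join result 1");
        }
        System.out.println(index.lookup("A")); // [3]
    }
}
```

With a page size of 2, five lhs rows referencing the same rhs key occupy three pages, so a full fan-out read costs three lookups instead of one.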
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I'd say you can just call the vote.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> That happens all the time, and if something comes up, it just goes back to discussion.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I would not expect too much attention with yet another email in this thread.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> best Jan
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On 09.10.2018 13:56, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>>>>>>> Hello Contributors
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I know that 2.1 is about to be released, but I do need to bump this to keep visibility up. I am still intending to push this through once contributor feedback is given.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Main points that need addressing:
>>>>>>>>>>>>>>>>>>>>>>>> 1) Is there any way (or benefit) to structuring the current singular graph node into multiple nodes? It has a whopping 25 parameters right now. I am a bit fuzzy on how the optimizations are supposed to work, so I would appreciate any help on this aspect.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> 2) Overall strategy for joining + resolving. This thread has much discussion between Jan and me about the current highwater mark proposal versus a groupBy + reduce proposal. I am of the opinion that we need to strictly handle any chance of out-of-order data and leave none of it up to the consumer. Any comments or suggestions here would also help.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> 3) Anything else that you see that would prevent this from moving to a vote?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <adam.bellemare@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jan
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> With the Stores.windowStoreBuilder and Stores.persistentWindowStore, you actually only need to specify the number of segments you want and how large they are. To the best of my understanding, what happens is that the segments are automatically rolled over as new data with new timestamps is created. We use this exact functionality in some of the work done internally at my company. For reference, this is the hopping windowed store:
>>>>>>>>>>>>>>>>>>>>>>>>> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> In the code that I have provided, there are going to be two 24h segments. When a record is put into the windowStore, it will be inserted at time T in both segments. The two segments will always overlap by 12h. As time goes on and new records are added (say at time T+12h+), the oldest segment will be automatically deleted and a new segment created. The records are by default inserted with the context.timestamp(), so that it is the record time, not the clock time, which is used.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> To the best of my understanding, the timestamps are retained when restoring from the changelog.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Basically, this is a heavy-handed way to deal with TTL at a segment level, instead of at an individual record level.
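What "TTL at a segment level" means can be illustrated with a simplified, in-memory model. It is an assumption-laden sketch, not Kafka Streams' RocksDB segmented store: it uses non-overlapping segments chosen from the record timestamp (it does not reproduce the overlapping hopping layout described above), and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Simplified model of segment-level TTL: records go into a segment chosen from their
// record timestamp, and whole segments are dropped once stream time has moved past
// the retention period. Not Kafka's implementation; names are illustrative.
final class SegmentedTtlStore<K, V> {
    private final long segmentIntervalMs;
    private final long retentionMs;
    private final TreeMap<Long, Map<K, V>> segments = new TreeMap<>(); // segment id -> entries
    private long streamTimeMs = Long.MIN_VALUE; // highest record timestamp seen so far

    SegmentedTtlStore(long segmentIntervalMs, long retentionMs) {
        this.segmentIntervalMs = segmentIntervalMs;
        this.retentionMs = retentionMs;
    }

    // Uses the record timestamp (like context.timestamp()), never the wall clock.
    void put(K key, V value, long recordTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        long segmentId = Math.floorDiv(recordTimestampMs, segmentIntervalMs);
        segments.computeIfAbsent(segmentId, id -> new HashMap<>()).put(key, value);
        dropExpiredSegments();
    }

    // Expiry is per segment: a record lives until its whole segment falls out of retention.
    private void dropExpiredSegments() {
        long oldestLiveSegment = Math.floorDiv(streamTimeMs - retentionMs, segmentIntervalMs);
        segments.headMap(oldestLiveSegment).clear();
    }

    // Searches every live segment, newest first.
    Optional<V> fetchLatest(K key) {
        for (Map<K, V> segment : segments.descendingMap().values()) {
            V value = segment.get(key);
            if (value != null) return Optional.of(value);
        }
        return Optional.empty();
    }

    int segmentCount() {
        return segments.size();
    }
}
```

With a 12h segment interval and 24h retention, a record written at t=0 stays readable until stream time reaches 36h, which shows how coarse segment-level expiry is compared with per-record TTL.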
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Will that work? I expected it to blow up with a ClassCastException or similar.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> You would either have to specify the window you fetch/put or iterate across all windows the key was found in, right?
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> I just hope the window store doesn't check stream-time under the hood; that would be a questionable interface.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> If it does: did you see my earlier comment on checking all the windows? That would be needed to actually give reasonable time guarantees.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Best
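Jan's point about checking all windows can be illustrated with a small, hypothetical highwater-mark check that buckets (primary key, highest offset) entries into tumbling windows by record timestamp. Checking only the window of the incoming record can miss a higher offset stored in a previous window; checking every retained window does not. The class, method names, and window layout are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical windowed highwater mark: per primary key, the highest offset seen,
// bucketed into tumbling windows by record timestamp.
final class WindowedHighwater {
    private final long windowSizeMs;
    private final NavigableMap<Long, Map<String, Long>> windows = new TreeMap<>(); // window start -> (key -> max offset)

    WindowedHighwater(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    private long windowStart(long timestampMs) {
        return Math.floorDiv(timestampMs, windowSizeMs) * windowSizeMs;
    }

    void record(String key, long offset, long timestampMs) {
        windows.computeIfAbsent(windowStart(timestampMs), w -> new HashMap<>())
               .merge(key, offset, Long::max);
    }

    // Looks only at the window the incoming record falls into.
    OptionalLong highwaterInWindowOf(String key, long timestampMs) {
        Long offset = windows.getOrDefault(windowStart(timestampMs), Map.of()).get(key);
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }

    // Looks at every retained window.
    OptionalLong highwaterAcrossAllWindows(String key) {
        return windows.values().stream()
                .filter(w -> w.containsKey(key))
                .mapToLong(w -> w.get(key))
                .max();
    }

    static boolean isStale(OptionalLong highwater, long offset) {
        return highwater.isPresent() && offset < highwater.getAsLong();
    }
}
```

For example, with 60 s windows, offset 10 recorded at t=59 s and a later-arriving offset 7 stamped t=61 s: the single-window check finds nothing and would accept offset 7, while the all-windows check reports 10 and flags it as stale. This is also a case where offset order and timestamp order disagree.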
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On 25.09.2018 13:18, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jan
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Check for "highwaterMat" in the PR. I only changed the state store, not the ProcessorSupplier.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 24.09.2018 16:26, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the information. This is indeed something that will be extremely useful for this KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Jan
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your explanations. That being said, I will not be moving ahead with an implementation using the reshuffle/groupBy solution as you propose. However, if you wish to implement it yourself off of my current PR and submit it as a competitive alternative, I would be more than happy to help vet that as an alternate solution. As it stands right now, I do not really have more time to invest in alternatives without there being a strong indication from the binding voters as to which they would prefer.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey, no worries at all. I think I personally gave up on the streams DSL some time ago already, otherwise I would have pulled this KIP through already. I am currently reimplementing my own DSL based on PAPI.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I will look at finishing up my PR with the windowed state store in the next week or so, exercising it via tests, and then I will come back for final discussions. In the meantime, I hope that any of the binding voters could take a look at the KIP in the wiki. I have updated it according to the latest plan:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I have also updated the KIP PR to use a windowed store. This could be replaced by the results of KIP-258 whenever they are completed.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5527
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Is the HighWatermarkResolverProccessorsupplier already updated in the PR? I expected it to change to Windowed<K>,Long. Am I missing something?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Correction on my previous email: KAFKA-5533 is the wrong link, as it is for corresponding changelog mechanisms. But as part of KIP-258 we do want to have "handling out-of-order data for source KTable" such that instead of blindly applying the updates to the materialized store, i.e. following offset ordering, we will reject updates that are older than the current key's timestamps, i.e. following timestamp ordering.
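The timestamp-ordering rule described above can be sketched as a key-value map that keeps each value's timestamp next to it and refuses older updates. This is a hypothetical in-memory illustration of the behaviour (Java 16+ records), not the KIP-258 store API; applying an update with an equal timestamp is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of timestamp ordering: reject updates older than the key's current timestamp,
// instead of blindly applying every update in offset order. Not the KIP-258 implementation.
final class TimestampOrderedStore<K, V> {

    // Value stored together with the timestamp of the update that produced it.
    record Stamped<T>(T value, long timestamp) {}

    private final Map<K, Stamped<V>> store = new HashMap<>();

    // Returns true if applied, false if rejected as out of order.
    // Assumption: an equal timestamp is applied, so offset order breaks ties.
    boolean put(K key, V value, long timestamp) {
        Stamped<V> current = store.get(key);
        if (current != null && timestamp < current.timestamp()) {
            return false;
        }
        store.put(key, new Stamped<>(value, timestamp));
        return true;
    }

    V get(K key) {
        Stamped<V> current = store.get(key);
        return current == null ? null : current.value();
    }
}
```

An update stamped 90 that arrives after one stamped 100 for the same key is rejected, whereas offset ordering would have applied it and overwritten the newer value.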
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello Adam,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the explanation. Regarding the final step (i.e. the high watermark store, now altered to be replaced with a window store), I think another currently ongoing KIP may actually help:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This is for adding the timestamp into a key-value store (i.e. only for non-windowed KTables), and then one of its usages, as described in https://issues.apache.org/jira/browse/KAFKA-5533, is that we can then "reject" updates from the source topics if their timestamp is smaller than the current key's latest update timestamp. I think it is very similar to what you have in mind for high watermark based filtering, while you only need to make sure that the timestamps of the joining records are correctly inherited through the whole topology to the final stage.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Note that this KIP is for key-value stores and hence non-windowed KTables only, but for windowed KTables we do not really have good support for their joins anyway (https://issues.apache.org/jira/browse/KAFKA-7107). I think we can just consider non-windowed KTable-KTable non-key joins for now, in which case KIP-258 should help.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The current highwater mark implementation would grow endlessly based on the primary key of the original event. It is a pair of (<this table primary key>, <highest offset seen for that key>). This is used to differentiate between late arrivals and new updates. My newest proposal would be to replace it with a Windowed state store of Duration N. This would allow the same behaviour, but cap the size based on time. This should allow for all late-arriving events to be processed, and should be customizable by the user to tailor to their own needs (ie: perhaps just 10 minutes of window, or perhaps 7 days...).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Adam, using time-based retention can do the trick here. Even so, I would still like to see the automatic repartitioning made optional, since I would just reshuffle again. With the windowed store I am a little bit sceptical about how to determine the window. So essentially one could run into problems when the rapid change happens near a window border. I will check your implementation in detail; if it's problematic, we could still check _all_ windows on read with not too bad a performance impact, I guess. I will let you know if the implementation would be correct as is. I would not like to assume that: offset(A) < offset(B) => timestamp(A) < timestamp(B). I think we can't expect that.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Jan
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I believe I understand what you mean now - thanks for the diagram, it really did help. You are correct that I do not have the original primary key available, and I can see that if it was available then you would be able to add and remove events from the Map. That being said, I encourage you to finish your diagrams / charts just for clarity for everyone else.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yeah 100%, this giphy thing is just really hard work. But I understand the benefits for the rest. Sorry about the original primary key. We have implemented join and groupBy ourselves in PAPI and are basically not using any DSL (just the abstraction). I completely missed that it's not there in the original DSL and just assumed it. Total brain mess-up on my end. I will finish the chart as soon as I get a quiet evening this week.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My follow-up question for you is, won't the Map stay inside the State Store indefinitely after all of the changes have propagated? Isn't this effectively the same as a highwater mark state store?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The thing is that if the map is empty, the subtractor is going to return `null` and the key is removed from the keyspace. But there is going to be a store, 100%. The good thing is that I can use this store directly for materialize() / enableSendingOldValues(): it is a regular store, satisfying all guarantees needed for a further groupBy / join. The windowed store is not keeping the values, so for the next stateful operation we would need to instantiate an extra store, or we would have the window store also hold the values.
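The behaviour Jan describes can be sketched in memory: the aggregate for a group key is a map from the original primary key to its latest joined value, the adder puts, the subtractor removes, and once the map is empty the subtractor returns `null` so the group key leaves the store. This mirrors the adder/subtractor pair of a table groupBy aggregation, but the class, method names, and store are hypothetical illustrations, not Jan's PAPI code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of a map-valued groupBy aggregate whose
// subtractor returns null once the map is empty, deleting the group key.
final class MapAggregateStore {
    private final Map<String, Map<String, String>> store = new HashMap<>();

    static Map<String, String> adder(String primaryKey, String value, Map<String, String> aggregate) {
        Map<String, String> next = aggregate == null ? new HashMap<>() : new HashMap<>(aggregate);
        next.put(primaryKey, value);
        return next;
    }

    static Map<String, String> subtractor(String primaryKey, Map<String, String> aggregate) {
        if (aggregate == null) return null;
        Map<String, String> next = new HashMap<>(aggregate);
        next.remove(primaryKey);
        return next.isEmpty() ? null : next; // empty -> null -> key is deleted
    }

    void add(String groupKey, String primaryKey, String value) {
        store.put(groupKey, adder(primaryKey, value, store.get(groupKey)));
    }

    void subtract(String groupKey, String primaryKey) {
        Map<String, String> next = subtractor(primaryKey, store.get(groupKey));
        if (next == null) {
            store.remove(groupKey); // a null aggregate is treated as a delete
        } else {
            store.put(groupKey, next);
        }
    }

    Map<String, String> get(String groupKey) {
        return store.get(groupKey);
    }

    int size() {
        return store.size();
    }
}
```

Because empty aggregates are deleted rather than kept, the store only holds keys with live contributions, which is Jan's answer to the question of whether the Map stays in the store indefinitely.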
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Long story short: if we can flip in a custom groupBy before repartitioning to the original primary key, I think it would help the users big time in building efficient apps. Given the original primary key issue, I understand that we do not have a solid foundation to build on. Leaving the primary key to be carried along by the user is very unfortunate. I could understand if the decision goes that way, but I do not think it's a good decision.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre <dumbreprajakta311@gmail.com <mailto:dumbreprajakta311@gmail.com>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              please remove me from this group
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > Hi Adam,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              >
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > give me some time, I will make such a chart. Last time I didn't get
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > along well with giphy and ruined all your charts. Hopefully I can get
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > it done today.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              >
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > On 08.09.2018 16:00, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > > Hi Jan
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > > I have included a diagram of what I attempted on the KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > > I attempted this back at the start of my own implementation of this
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > > solution, and since I could not get it to work I have since
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > > discarded the code. At this point in time, if you wish to continue
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > > pursuing your groupBy solution, I ask that you please create a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > > diagram on the KIP carefully explaining your solution. Please feel
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > > free to use the image I just posted as a starting point. I am having
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > > trouble understanding your explanations, but I think that a carefully
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > > constructed diagram will clear up any misunderstandings. Alternately,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > > please post a comprehensive PR with your solution. I can only guess
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > > at what you mean, and since I value my own time as much as you value
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > > yours, I believe it is your responsibility to provide an
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > > implementation instead of me trying to guess.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > > Adam
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> Hi James,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> nice to see you being interested. Kafka Streams at this point supports
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> all sorts of joins as long as both streams have the same key.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> Adam is currently implementing a join where a KTable and a KTable can
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> have a one-to-many relationship (1:n). We exploit that RocksDB is a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> datastore that keeps data sorted (at least it exposes an API to access
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> the stored data in a sorted fashion).
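
Jan's point about RocksDB keeping data sorted is what makes a one-to-many lookup cheap. Suppose every left-hand record is also stored under a composite key of foreign key plus primary key. Then all records that reference one foreign key sit next to each other, and a single range scan finds them. The sketch below illustrates that idea in memory and is not Kafka Streams code. `java.util.TreeMap` stands in for RocksDB's sorted iteration. The class `PrefixIndex`, its methods and the `|` separator are hypothetical choices, and the separator is assumed never to appear inside a foreign key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: a sorted map keyed by "foreignKey|primaryKey", so all
// records that reference the same foreign key are stored next to each other.
// TreeMap stands in for RocksDB's sorted iteration; this is not Kafka Streams API.
class PrefixIndex<V> {
    // Assumed never to occur inside a foreign key.
    private static final char SEP = '|';
    private final NavigableMap<String, V> store = new TreeMap<>();

    void put(String foreignKey, String primaryKey, V value) {
        store.put(foreignKey + SEP + primaryKey, value);
    }

    void remove(String foreignKey, String primaryKey) {
        store.remove(foreignKey + SEP + primaryKey);
    }

    // Range scan over [fk + SEP, fk + (SEP + 1)), which is exactly the set of
    // keys starting with "fk|"; returns their primary keys in sorted order.
    List<String> byForeignKey(String foreignKey) {
        String from = foreignKey + SEP;
        String to = foreignKey + (char) (SEP + 1);
        List<String> primaryKeys = new ArrayList<>();
        for (Map.Entry<String, V> entry : store.subMap(from, true, to, false).entrySet()) {
            primaryKeys.add(entry.getKey().substring(from.length()));
        }
        return primaryKeys;
    }
}
```

A lookup such as `byForeignKey("A")` touches only the contiguous slice of keys that start with `A|`. A key such as `AB|x` falls outside that range, so the scan never reads the whole store.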
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> I think the technical caveats are well understood now and we are
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> basically down to philosophy and API design (when Adam sees my newest
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> message). I have a lengthy track record of losing those kinds of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> arguments within the streams community and I have no clue why. So I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> literally can't wait for you to churn through this thread and give
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> your opinion on how we should design the return type of the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> oneToManyJoin and how much power we want to give to the user vs
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> "simplicity" (where simplicity isn't really that, as users still need
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> to understand it, I argue).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> Waiting for you to join in on the discussion.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> Best Jan
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >> On 07.09.2018 15:49, James Kwan wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>> I am new to this group and I found this subject interesting. Sounds
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>> like you guys want to implement a join table of two streams? Is there
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>> somewhere I can see the original requirement or proposal?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>> On 05.09.2018 22:17, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> I'm currently testing using a Windowed Store to store the high-water
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> mark. By all indications this should work fine, with the caveat being
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> that it can only resolve out-of-order arrival for up to the size of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> the window (e.g. 24h, 72h, etc). This would remove the possibility of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> it being unbounded in size.
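
Adam's idea is a high-water mark with time-bounded retention. The sketch below assumes that each joined result carries a per-key sequence number, for example the offset of the originating left-hand update, and that marks are forgotten after a fixed retention period. A plain `HashMap` with timestamps stands in for the windowed store, and `HighWaterMarkFilter` and `accept` are hypothetical names. Once a key's mark has expired, a late and stale result is forwarded again, which is the bounded-window caveat Adam describes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a per-key high-water mark with bounded retention.
// A HashMap with timestamps stands in for a windowed state store.
class HighWaterMarkFilter {
    private static final class Mark {
        final long sequence;   // highest sequence number forwarded for the key
        final long seenAtMs;   // when that mark was recorded

        Mark(long sequence, long seenAtMs) {
            this.sequence = sequence;
            this.seenAtMs = seenAtMs;
        }
    }

    private final Map<String, Mark> marks = new HashMap<>();
    private final long retentionMs;

    HighWaterMarkFilter(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Returns true if the joined result should be forwarded downstream.
    boolean accept(String key, long sequence, long nowMs) {
        Mark mark = marks.get(key);
        // After retention has passed, treat the mark as gone, as if its window expired.
        boolean expired = mark != null && nowMs - mark.seenAtMs > retentionMs;
        if (mark == null || expired || sequence > mark.sequence) {
            marks.put(key, new Mark(sequence, nowMs));
            return true;
        }
        return false; // stale or duplicate: a result at least this new was already forwarded
    }
}
```

Take a retention of 1000 ms. A result with sequence 1 arriving 10 ms after sequence 2 for the same key is dropped. The same stale result arriving 2000 ms later is forwarded, because the mark for sequence 2 has expired by then.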
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> With regards to Jan's suggestion, I believe this is where we will have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> to remain in disagreement. While I do not disagree with your statement
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> about there likely being additional joins done in a real-world
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> workflow, I do not see how you can conclusively deal with out-of-order
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> arrival of foreign-key changes and subsequent joins. I have attempted
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> what I think you have proposed (without a high-water mark, using
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> groupBy and reduce) and found that if the foreign key changes too
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> quickly, or the load on a stream thread is too high, the joined
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> messages will arrive out-of-order and be incorrectly propagated, such
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> that an intermediate event is represented as the final event.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>> Can you shed some light on your groupBy implementation? There must be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>> some sort of flaw in it. I have a suspicion where it is; I would just
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>> like to confirm. The idea is bulletproof and it must be an
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>> implementation mess-up. I would like to clarify before we draw a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>> conclusion.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> Repartitioning the scattered events back to their original
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> partitions is the only way I know how to conclusively deal with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> out-of-order events in a given time frame, and to ensure that the data
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> is eventually consistent with the input events.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> If you have some code to share that illustrates your approach, I would
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> be very grateful as it would remove any misunderstandings that I may
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> have.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>> Ah okay, you were looking for my code. I don't have something easily
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>> readable here as it's bloated with OO-patterns.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>> It's trivial anyhow:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>> @Override
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>> public T apply(K aggKey, V value, T aggregate)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>> {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>     Map<U, V> currentStateAsMap = asMap(aggregate); << imaginary
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>     U toModifyKey = mapper.apply(value);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>         << this is the place where people are actually going to have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>            issues and why you probably couldn't do it. We would need
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>            to find a solution here. I didn't realize that yet.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>         << we propagate the field in the joiner, so that we can pick
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>            it up in an aggregate. Probably you have not thought of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>            this in your approach, right?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>         << I am very open to finding a generic solution here. In my
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>            honest opinion this is broken in KTableImpl.GroupBy, in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>            that it loses the keys and only maintains the aggregate
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>            key.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>         << I abstracted it away back then, way before I was thinking
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>            of the oneToMany join. That is why I didn't realize its
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>            significance here.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>         << Opinions?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>     for (V m : current)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>     {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>         currentStateAsMap.put(mapper.apply(m), m);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>     if (isAdder)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>     {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>         currentStateAsMap.put(toModifyKey, value);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>     else
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>     {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>         currentStateAsMap.remove(toModifyKey);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>         if (currentStateAsMap.isEmpty()) {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>             return null;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>         }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>     return asAggregateType(currentStateAsMap);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>> }
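
To make the adder/subtractor logic quoted above runnable, here is a simplified sketch with no dependencies. It is not the code Jan refers to. The aggregate is kept directly as a `Map<U, V>` from the original key to the latest joined value, so the imaginary `asMap`/`asAggregateType` conversions and the loop over `current` are dropped. The sketch also assumes the joiner has copied the original key into the joined value, so that `mapper` can recover it. `OneToManyAggregator` is a hypothetical name. Its `apply(aggKey, value, aggregate)` shape mirrors the Kafka Streams `Aggregator` interface. In a real topology, one instance with `isAdder = true` and one with `isAdder = false` would serve as the adder and subtractor passed to `KGroupedTable#aggregate`, together with a serde for the map; that wiring is not shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, dependency-free version of the aggregator quoted above.
// The aggregate is kept directly as Map<U, V> (original key -> joined value).
// Assumes the joiner copied the original key into V so `mapper` can recover it.
class OneToManyAggregator<K, U, V> {
    private final Function<V, U> mapper;  // recovers the original key from a joined value
    private final boolean isAdder;        // true: acts as adder, false: as subtractor

    OneToManyAggregator(Function<V, U> mapper, boolean isAdder) {
        this.mapper = mapper;
        this.isAdder = isAdder;
    }

    // Same parameter order as Kafka Streams' Aggregator#apply(key, value, aggregate).
    Map<U, V> apply(K aggKey, V value, Map<U, V> aggregate) {
        // Copy so the previous aggregate is never mutated in place.
        Map<U, V> currentState = new LinkedHashMap<>();
        if (aggregate != null) {
            currentState.putAll(aggregate);
        }
        U toModifyKey = mapper.apply(value);
        if (isAdder) {
            currentState.put(toModifyKey, value);   // insert or overwrite this original key
        } else {
            currentState.remove(toModifyKey);
            if (currentState.isEmpty()) {
                return null;                        // an empty group becomes a tombstone
            }
        }
        return currentState;
    }
}
```

Suppose the adder processes `a:1`, `b:2` and then `a:3` under one group key. The result holds two entries, with `a` mapped to `a:3`. If the subtractor then processes both remaining values, it returns `null`. The sketch only shows the bookkeeping. It says nothing about the ordering questions debated in this thread.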
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> Adam
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>> Thanks Adam for bringing Matthias up to speed about the differences!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>> I think re-keying back should be optional at best.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>> I would say we return a KScatteredTable with reshuffle() returning
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>> KTable<originalKey,Joined> to make the backwards repartitioning
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>> optional.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>> I am also very much in favour of doing the out-of-order processing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>> using groupBy instead of high-water-mark tracking, simply because
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>> unbounded growth is scary, and it saves us the header stuff.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>> I think the abstraction of always repartitioning back is just not so
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>> strong. The work has been done before we partition back, and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>> grouping by something else afterwards is really common.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> Hi Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> Thank you for your feedback, I do appreciate it!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> While name spacing would be possible, it would require
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> deserializing user headers, which implies a runtime overhead. I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> would suggest using no namespace for now to avoid the overhead. If
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> this becomes a problem in the future, we can still add name spacing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> later on.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> Agreed. I will go with using a reserved string and document it.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> My main concern about the design is the type of the result
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> KTable: If I understood the proposal correctly,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> In your example, you have table1 and table2 swapped. Here is how
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> it works currently:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> 1) table1 has the records that contain the foreign key within
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> their value.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> table2 input stream: <A,X>, <B,Y>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> 2) A value mapper is required to extract the foreign key.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> table1 foreign key mapper: ( value => value.fk )
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> The mapper is applied to each element in table1, and a new
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> combined key is made:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> 3) The rekeyed events are copartitioned with table2:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> a) Stream Thread with Partition 0:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> Table2: <A,X>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> b) Stream Thread with Partition 1:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> Table2: <B,Y>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> 4) From here, they can be joined together locally by applying the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> joiner function.
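The four steps above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the KIP-213 implementation: ordinary `HashMap`s stand in for the KTables, the combined key is a simple `fk + "-" + pk` string, and `partitionFor` is a hypothetical hash partitioner. What it shows is that partitioning the rekeyed table1 records on the foreign-key part only puts them in the same partition as the matching table2 row, so the join in step 4 is a local lookup.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Sketch only: plain Java maps stand in for KTables; this is not the Kafka Streams API.
class ForeignKeyJoinSteps {

    // A table1 value that carries the foreign key (fk) into table2, e.g. (fk=A,bar=1).
    static final class Row {
        final String fk;
        final int bar;
        Row(String fk, int bar) { this.fk = fk; this.bar = bar; }
    }

    // Hypothetical partitioner; Kafka's default partitioner hashes the serialized key differently.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    static <V2, R> List<Map<String, R>> join(Map<String, Row> table1,
                                             Map<String, V2> table2,
                                             Function<Row, String> fkExtractor,
                                             BiFunction<Row, V2, R> joiner,
                                             int numPartitions) {
        List<Map<String, V2>> table2Parts = new ArrayList<>();
        List<Map<String, R>> results = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            table2Parts.add(new HashMap<>());
            results.add(new HashMap<>());
        }
        // table2 is partitioned by its own primary key, which is the foreign key.
        for (Map.Entry<String, V2> e : table2.entrySet()) {
            table2Parts.get(partitionFor(e.getKey(), numPartitions)).put(e.getKey(), e.getValue());
        }
        for (Map.Entry<String, Row> e : table1.entrySet()) {
            String fk = fkExtractor.apply(e.getValue());  // step 2: extract the foreign key
            String combinedKey = fk + "-" + e.getKey();    // step 2: combined key, e.g. "A-a"
            int p = partitionFor(fk, numPartitions);       // step 3: partition on the fk part only
            V2 match = table2Parts.get(p).get(fk);         // step 4: lookup in the same partition
            if (match != null) {
                results.get(p).put(combinedKey, joiner.apply(e.getValue(), match));
            }
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Row> table1 = new HashMap<>();
        table1.put("a", new Row("A", 1));
        table1.put("b", new Row("A", 2));
        table1.put("c", new Row("B", 3));
        Map<String, String> table2 = new HashMap<>();
        table2.put("A", "X");
        table2.put("B", "Y");
        List<Map<String, String>> parts =
                join(table1, table2, row -> row.fk, (row, x) -> x + "/" + row.bar, 2);
        for (int p = 0; p < parts.size(); p++) {
            System.out.println("partition " + p + ": " + parts.get(p));
        }
    }
}
```

With `String.hashCode`, `A` and `B` land in partitions 1 and 0 here, the reverse of the numbering in the walkthrough; only the co-location matters, not which partition number a key gets.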
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> At this point, Jan's design and my design deviate. My design goes
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> on to repartition the data post-join and resolve out-of-order
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> arrival of records, finally returning the data keyed by just the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> original key. I do not expose the CombinedKey or any of the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> internals outside of the joinOnForeignKey function. This does make
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> for a larger footprint, but it takes all responsibility for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> resolving out-of-order arrivals and handling CombinedKeys away from
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> the user. I believe that this makes the function much easier to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> use.
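One way to picture the post-join resolution described here is a small stateful step keyed by the original table1 key. This sketch assumes, for illustration only, that every join result carries the offset of the table1 update that produced it and that a null value means delete; `HighwaterResolver` and its methods are hypothetical names, and its two maps play the role of a highwater store and a result store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch only: resolves out-of-order join results after they have been repartitioned
// back to the original (table1) key. Assumes each result carries the offset of the
// table1 update that produced it; a null value means delete.
class HighwaterResolver<K, V> {
    private final Map<K, Long> highwater = new HashMap<>(); // highest table1 offset seen per key
    private final Map<K, V> results = new HashMap<>();      // current join result per key

    // Returns true if the result was applied (and would be forwarded), false if dropped as stale.
    boolean process(K key, long offset, V value) {
        Long seen = highwater.get(key);
        if (seen != null && offset < seen) {
            return false; // produced by an older table1 version that arrived late
        }
        highwater.put(key, offset);
        if (value == null) {
            results.remove(key);
        } else {
            results.put(key, value);
        }
        return true;
    }

    Optional<V> current(K key) {
        return Optional.ofNullable(results.get(key));
    }

    public static void main(String[] args) {
        HighwaterResolver<String, String> r = new HighwaterResolver<>();
        // Row "a" first joined via fk=A (offset 10), then changed to fk=B (offset 11);
        // the result for B arrives before the late result for A.
        r.process("a", 11, "join(Y,1)");
        r.process("a", 10, "join(X,1)"); // stale, dropped
        System.out.println(r.current("a").orElse("<deleted>")); // join(Y,1)
    }
}
```

In this sketch, results whose offset is below the stored highwater are dropped as stale, while equal offsets are accepted so that a re-emission for the same table1 version (for example after a table2 update) still goes through.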
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> Let me know if this helps resolve your questions, and please feel
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> free to add anything else on your mind.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> Thanks again,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> Adam
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matthias@confluent.io>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> I am just catching up on this thread. I did not read everything so
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> far, but want to share a couple of initial thoughts:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> Headers: I think there is a fundamental difference between header
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> usage in this KIP and KIP-258. For KIP-258, we add headers to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> changelog topics that are owned by Kafka Streams, and nobody else is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> supposed to write into them. In fact, no user headers are written
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> into the changelog topics and thus, there are no conflicts.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> Nevertheless, I don't see a big issue with using headers within
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> Streams. As long as we document it, we can have some "reserved"
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> header keys that users are not allowed to use when processing data
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> with Kafka Streams. IMHO, this should be ok.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>> I think there is a safe way to avoid conflicts, since these headers
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>> are only needed in internal topics (I think):
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>> For internal and changelog topics, we can namespace all headers:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>> * user-defined headers are namespaced as "external." + headerKey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>> * internal headers are namespaced as "internal." + headerKey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> While namespacing would be possible, it would require
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> deserializing user headers, which implies a runtime overhead. I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> would suggest using no namespace for now to avoid the overhead.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> If this becomes a problem in the future, we can still add
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> namespacing later on.
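To make the trade-off between the two header strategies concrete, here is a toy comparison. Assumptions: headers are modelled as a `String -> String` map rather than Kafka's `Headers` type (real headers are ordered key/`byte[]` pairs that may repeat), and both `RESERVED_KEY` and the `external.`/`internal.` prefixes are illustrative names. The reserved-key variant writes one header and leaves user headers alone; the namespacing variant has to read and rewrite every user header on every record, which is the runtime overhead mentioned.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: a String -> String map stands in for record headers; names are hypothetical.
class HeaderStrategies {
    static final String RESERVED_KEY = "__fk-join-internal"; // hypothetical reserved key

    // Reserved key: only the one internal header is written. User headers are left untouched,
    // and the documentation (not a runtime check) forbids users from using RESERVED_KEY.
    static int addReserved(Map<String, String> headers, String internalValue) {
        headers.put(RESERVED_KEY, internalValue);
        return 1; // number of headers touched
    }

    // Namespacing: every user header has to be read and rewritten with a prefix,
    // so the work grows with the number of user headers on each record.
    static int addNamespaced(Map<String, String> headers, String internalKey, String internalValue) {
        Map<String, String> rewritten = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : headers.entrySet()) {
            rewritten.put("external." + e.getKey(), e.getValue());
        }
        rewritten.put("internal." + internalKey, internalValue);
        int touched = rewritten.size();
        headers.clear();
        headers.putAll(rewritten);
        return touched;
    }

    public static void main(String[] args) {
        Map<String, String> reserved = new LinkedHashMap<>(Map.of("trace-id", "t1"));
        Map<String, String> namespaced = new LinkedHashMap<>(Map.of("trace-id", "t1"));
        System.out.println(addReserved(reserved, "fk=A") + " " + reserved);
        System.out.println(addNamespaced(namespaced, "fk", "A") + " " + namespaced);
    }
}
```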
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> My main concern about the design is the type of the result
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> KTable: If I understood the proposal correctly,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> KTable<K1,V1> table1 = ...
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> KTable<K2,V2> table2 = ...
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> implies that the `joinedTable` has the same key as the left input
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> table. IMHO, this does not work because if table2 contains multiple
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> rows that join with a record in table1 (which is the main purpose of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> a foreign-key join), the result table would only contain a single
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> join result, but not multiple.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> Example:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> table1 input stream: <A,X>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> We use the table2 value as a foreign key to the table1 key (i.e.,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> "A" joins). If the result key is the same as the key of table1,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> this implies that the result can be either <A, join(X,1)> or
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> <A, join(X,2)>, but not both. Because they share the same key,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> whatever result record we emit later overwrites the previous result.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> This is the reason why Jan originally proposed to use a combination
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> of both primary keys of the input tables as the key of the output
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> table. This makes the keys of the output table unique, and we can
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> store both in the output table:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
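The overwrite problem in this example, and why a combined key avoids it, can be shown with plain maps. This is a sketch with hypothetical names (`ResultKeying`, `joinAll`): a `LinkedHashMap` stands in for the result KTable, and table2 maps its primary key to a (foreign key, value) pair, matching the input streams in the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: LinkedHashMaps stand in for KTables; names are hypothetical.
class ResultKeying {
    static String join(String left, int right) {
        return "join(" + left + "," + right + ")";
    }

    // table2 maps its primary key to (foreign key into table1, value).
    // combinedKey=false keys each result by the table1 key; true keys it by "fk-pk".
    static Map<String, String> joinAll(Map<String, String> table1,
                                       Map<String, Map.Entry<String, Integer>> table2,
                                       boolean combinedKey) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, Map.Entry<String, Integer>> row : table2.entrySet()) {
            String fk = row.getValue().getKey();
            String left = table1.get(fk);
            if (left == null) {
                continue; // inner join: no matching table1 row
            }
            String key = combinedKey ? fk + "-" + row.getKey() : fk;
            result.put(key, join(left, row.getValue().getValue())); // same key => overwrite
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> table1 = Map.of("A", "X");
        Map<String, Map.Entry<String, Integer>> table2 = new LinkedHashMap<>();
        table2.put("a", Map.entry("A", 1));
        table2.put("b", Map.entry("A", 2));
        System.out.println(joinAll(table1, table2, false)); // {A=join(X,2)}
        System.out.println(joinAll(table1, table2, true));  // {A-a=join(X,1), A-b=join(X,2)}
    }
}
```

Keyed by the table1 key, only the last join result survives; keyed by the combined key, both are kept.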
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> Thoughts?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>> Just one remark here.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>> The high-watermark could be disregarded. The decision whether to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>> forward depends on the size of the aggregated map. Only maps with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>> exactly one element would be unpacked and forwarded. Maps with zero
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>> elements would be published as a delete. Any other count of map
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>> entries is in a "waiting for correct deletes to arrive" state.
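Jan's forwarding rule can be sketched as follows. The modelling is an assumption for illustration, not something stated in the thread: the aggregate for one table1 key is a map from foreign key to joined value, and a row whose foreign key changes from A to B produces an add for B and a delete for A that may arrive in either order. `MapSizeDecision` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: the aggregate kept for one original (table1) key is modelled as a map
// from foreign key to joined value; adds and deletes for it may arrive out of order.
class MapSizeDecision {
    private final Map<String, String> aggregate = new HashMap<>();

    // Applies one update (a null joinedValue deletes that foreign key's entry) and
    // returns what would be sent downstream, based only on the map size.
    String apply(String foreignKey, String joinedValue) {
        if (joinedValue == null) {
            aggregate.remove(foreignKey);
        } else {
            aggregate.put(foreignKey, joinedValue);
        }
        switch (aggregate.size()) {
            case 0:
                return "DELETE";                                          // publish a delete
            case 1:
                return "FORWARD " + aggregate.values().iterator().next(); // unpack the single entry
            default:
                return "WAIT";                                            // wait for the correct deletes
        }
    }

    public static void main(String[] args) {
        MapSizeDecision a = new MapSizeDecision();
        System.out.println(a.apply("A", "join(X,1)")); // FORWARD join(X,1)
        System.out.println(a.apply("B", "join(Y,1)")); // WAIT: the delete for A has not arrived yet
        System.out.println(a.apply("A", null));        // FORWARD join(Y,1)
    }
}
```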
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>>> It does look like I could replace the second repartition store and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>>> highwater store with a groupBy and reduce. However, it looks like I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>>> would still need to store the highwater value within the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>>> materialized store, to compare the arrival of out-of-order records
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>>> (assuming my understanding of THIS is correct...). This in effect is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>>> the same as the design I have now, just with the two tables merged
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>>> together.
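The idea that a groupBy plus reduce merges the highwater store and the result store can be illustrated with a batch reduce: if each reduced value carries its own offset, the reducer can keep the newer one and no separate highwater map is needed. This is a sketch only: `Collectors.toMap` with a merge function stands in for an incremental KTable reduce, and the per-result `offset` field (the offset of the table1 update that produced the result) is an assumption, as is the class name `ReduceWithHighwater`.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch only: a batch "groupBy + reduce" over join results, where the reduced value
// itself carries the highwater offset, so no separate highwater store is needed.
class ReduceWithHighwater {
    static final class Result {
        final String key;    // original (table1) key
        final long offset;   // assumed: offset of the table1 update that produced this result
        final String value;
        Result(String key, long offset, String value) {
            this.key = key;
            this.offset = offset;
            this.value = value;
        }
    }

    // groupBy(key) + reduce(keep the result with the higher offset); ties keep the later arrival.
    static Map<String, Result> reduce(List<Result> arrivals) {
        return arrivals.stream().collect(Collectors.toMap(
                r -> r.key,
                r -> r,
                (seen, next) -> next.offset >= seen.offset ? next : seen));
    }

    public static void main(String[] args) {
        List<Result> arrivals = List.of(
                new Result("a", 11, "join(Y,1)"),
                new Result("a", 10, "join(X,1)"), // arrives late, older table1 version
                new Result("c", 3, "join(Y,3)"));
        reduce(arrivals).forEach((k, r) -> System.out.println(k + " -> " + r.value + " @" + r.offset));
    }
}
```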
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              > >>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              >
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>              >
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>