From dev-return-100726-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Wed Jan 2 23:44:59 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 78B53180662 for ; Wed, 2 Jan 2019 23:44:56 +0100 (CET) Received: (qmail 82698 invoked by uid 500); 2 Jan 2019 22:44:50 -0000 Mailing-List: contact dev-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list dev@kafka.apache.org Received: (qmail 82681 invoked by uid 99); 2 Jan 2019 22:44:49 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Jan 2019 22:44:49 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id E1454C0313 for ; Wed, 2 Jan 2019 22:44:47 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.799 X-Spam-Level: * X-Spam-Status: No, score=1.799 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, DKIM_VALID_EF=-0.1, HTML_MESSAGE=2, RCVD_IN_DNSWL_NONE=-0.0001, SPF_PASS=-0.001] autolearn=disabled Authentication-Results: spamd4-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=confluent.io Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id ZLT6p7g637zm for ; Wed, 2 Jan 2019 22:44:37 +0000 (UTC) Received: from mail-wm1-f49.google.com (mail-wm1-f49.google.com [209.85.128.49]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTPS id 7B2DE5F514 for ; Wed, 2 Jan 2019 22:44:37 +0000 (UTC) Received: by mail-wm1-f49.google.com with SMTP id n190so27894084wmd.0 for ; Wed, 02 Jan 2019 14:44:37 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=confluent.io; s=google; h=mime-version:references:in-reply-to:from:date:message-id:subject:to; bh=u/VMOBav628Sm5w1gE4ZKN82sAjXefAa+4VF6uBopkQ=; b=CxbYlVjTO/LmmdmEZMnxY269IddOIuWCzi03iTgfxlYLx/9d4UeMGS4bEskUfmPbHi ++gTO90lHcHOG3ZllL9r9I6OezhguSo2mgssyIE4e3dIEPrNUI5duMBSd6BHWC3fbCKA wV+t4s7vECOHlpAx4yTGr0r1sIyxL5P+5+LUq2qOhmcw5oO5A7BzFCEwlTvWXt8s+a9r TLRwJgguaziJGXEiiX2KAooYt22pGzta2my393OpXh3XjxcN6RkK3XiE3OcDBylODxD5 KTmAgGoDIGQT0/p7Ok2AH2yR5phyBqMhcK4HwkqrnBvO3cdik1kEy6ijN0KBn/xHH3YR 0NAw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:references:in-reply-to:from:date :message-id:subject:to; bh=u/VMOBav628Sm5w1gE4ZKN82sAjXefAa+4VF6uBopkQ=; b=nHUmp1Nr5xzZiMgqLvsh0l770+yYyiQ+2wvBbB4h2PQARkm9dummvkDpWFmgLUadq+ yxjNI4WEDFopMbeNuUdZ4LHwhHZo18Kji68G84dD2GI7rUh2YasFzmSqapCGBzxgbnuf +JxiPiNMxIxch83+CNjnHsunI1mvjM1N4wwxUOAFeX263b7rZWj4BKO4kGfQsfXzycXR g9whR5FKrgZInne3KWXAavsFgv4JMy0Zkv0gKx2btqFKTtz0caGRf0aMqloKnOyYKK0W MltrG9z3fLm/LX639XZpE+4XS3SNFhcBpM5rHO4Wk3xeUx5QGGTzEx3ZprAAfMhHvCEE Pihw== X-Gm-Message-State: AJcUukdUczUZjUsskWf7zYIHdcgjCjMTtWt17rjqnCcdydbxBiruu1x2 gTfUxkUZcgdtvg8WnPF4tmWNRMkBG7R5UJgasqvQFAoA5Zw= X-Google-Smtp-Source: ALg8bN4C6khiI2wjmxGZPmfq56vWvqksTwsADY6NNnfi4TuAq3kEyY+ONEOaCaSdGbhDARh2ti7Sbajw3gURMc5tReQ= X-Received: by 2002:a1c:2007:: with SMTP id g7mr35327563wmg.79.1546469075738; Wed, 02 Jan 2019 14:44:35 -0800 (PST) MIME-Version: 1.0 References: <5C0853D2.6050903@trivago.com> <5C0E238A.5030907@trivago.com> <5C2528BA.5080407@trivago.com> <5C2D19D0.8080809@trivago.com> In-Reply-To: From: John Roesler Date: Wed, 2 Jan 2019 16:44:21 -0600 Message-ID: Subject: Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted. To: dev@kafka.apache.org Content-Type: multipart/alternative; boundary="000000000000ea7f24057e81637c" --000000000000ea7f24057e81637c Content-Type: text/plain; charset="UTF-8" Content-Transfer-Encoding: quoted-printable Hi Jan and Adam, Good to hear from you both! Happy new year! Hey Jan, I'm sorry you feel that we aren't listening to you. I was hoping to convey that we did understand your proposal, but simply differed in opinion at the end of the day, I agree with your assessment that your proposal is "eventually correct", which is the same guarantee that all the other proposals have offered. It also seems like your proposal does create an opportunity for developers to implement optimizations, particularly when chaining together multiple joins. API: However, this feature does come at the expense of adding a whole new KScatteredTable interface and the corresponding mental overhead of picturin= g the data flow to make sense of it and understand how to use it in both single and multiple join scenarios. I suppose there's an argument that developers should *always* bear in mind the underlying implementation details of the systems they use. Practically speaking, though, it seems like the best systems are ones that don't require detailed internal knowledge to be used productively. I'm not sure how to express that I'm strongly sympathetic to the point of view that we shouldn't design to prohibit optimization. And that the promise of unspecified future possible internal optimizations may never actually bear fruit. Efficiency: In this case, I haven't seen or been able to build for myself a strong case that the scattered table API would actually be more efficient, even in the presence of multiple chained joins. It's true that you get to amortize the cost of the post-join group/gather operation over the number of chained joins. But on the other hand, it requires sending the higher-cardinality data over the wire, instead of the lower-cardinality data. And it requires maintaining a linear amount of candidate join result data (in the "map" in your example) to perform the final resolution. I decided not to bring this up in the KIP discussion before, since it sounded like everyone liked the latest proposal, but there is an additional optimization available to the current proposal: https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+j= oining+in+KTable#KIP-213Supportnon-keyjoininginKTable-Workflow In step 2, we are only sending the primary and foreign key data over the wire, so if we really want to pare down the amount of traffic and computation, we can check first whether the FK has actually changed. If we also materialize step 6, the right kTable values, then we can avoid any network or right-side range scans and simply recompute the join result directly in the left-side Joiner (8) when there's any change on the left side data that doesn't affect the PK->FK mapping. As you pointed out, we have to make some kind of decision in the absence of any good data regarding the actual data set or workload. Here are the assumptions I'm currently operating with: * the left side is higher cardinality than the right side * the higher cardinality data would change more frequently * (step 2) sending only PK+FK is smaller than sending the whole left-side record * broadcasting the right-side data only on right-side changes and FK changes is less than broadcasting the join result on all changes * comparing the records in step 6 for FK consistency with the left-side table is cheaper than maintaining a map of all recent join results to resolve disordering These are all assumptions for sure, but I think they are reasonable ones. Together, they mean that when you're joining just two tables, the single-operator join is also more efficient. I've been trying to do some math to determine if this holds up for three tables as well, but it's murky with all the unbound terms. Tentatively, I actually do think that the single-operator join is also likely to be more efficient for three tables. Happy to provide more detail if you don't buy this. However, you seem to have a strong intuition that the scatter/gather approach is better. Is this informed by your actual applications at work? Perhaps you can provide an example data set and sequence of operations so we can all do the math and agree with you. It seems like we should have a convincing efficiency argument before choosing a more complicated API over a simpler one. Last thought: > Regarding what will be observed. I consider it a plus that all events > that are in the inputs have an respective output. Whereas your solution > might "swallow" events. I didn't follow this. Following Adam's example, we have two join results: a "dead" one and a "live" one. If we get the dead one first, both solutions emit it, followed by the live result. If we get the dead result second, both solutions should suppress it. In you= r proposal, both the dead and live result would be stored in that map, but the groupBy operator must not emit the dead result after the live one in any case. I guess you were referring only to the fact that the scattered table emits does not swallow any events? This seems partially unrelated, since the join is still incomplete at that point. Thanks for your time to help us get this right! -John On Wed, Jan 2, 2019 at 2:36 PM Adam Bellemare wrote: > Hi Jan > > Ahh, I got it! It is deterministic once you apply the groupBy function yo= u > mentioned a few months ago to the output, but not before you apply it... > correct? I was not thinking about the groupBy function. > > Here's how I understand how it could work from an API perspective: I am > going to use the terminology "KScatteredTable" to represent the > intermediate table that is not yet resolved - basically the join was > performed but no race condition handling is done. > > If I wanted to join three KTables together on foreign keys, one of the wa= ys > I could do it is: > > KScatteredTable scatteredOne =3D ktableOne.oneToManyJoin(kTableTwo, > joinerFuncTwo, foreignKeyExtractorTwo); > KScatteredTable scatteredTwo =3D scatteredOne.oneToManyJoin(kTableThree, > joinerFuncThree, foreignKeyExtractorThree) > > //Now I groupBy the key that I want to obtain, and I can resolve the out = of > order dependencies here. > scatteredTwo.groupBy( keyValueMapper ) ( shown here: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key= +joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-SolutionB-User-Mana= gedGroupBy(Jan's) > ) > > Is this in line with what you're doing? Can this be done without exposing > the CombinedKey? As you mentioned before "A Table > KTable,JoinedResult> is not a good return type. It break= s > the KTable invariant that a table is currently partitioned by its key". > With that being said, are the only two operations that a KScatteredTable > would need to support be oneToManyJoin and groupBy? > > Thanks for your thoughts > > Adam > > > On Wed, Jan 2, 2019 at 3:07 PM Jan Filipiak > wrote: > > > Hi Adam, > > > > I am kinda surprised! Yes my solution of course is correct. Don't reall= y > > know what to show in an example as I am convinced you grabbed the > > concept of how mine works, > > > > If there is a race condition there is a race condition. It doesn't > > matter if there is 10 minutes or milliseconds between events. Either > > they are properly guarded or not. My solution has no such race > > condition. It is 'eventual consistent'. You gonna see all sort of stuff > > coming up during a reprocess. > > > > The user can still fk it up later though. But that is usual business. > > > > In reality I try to supress updates from left sides as long as possible > > because right side updates are more expensive if left is already > > fullish. So that limits the space a little but there are no grantees. > > The result however, after lag is zero is the same every time. > > > > The trade-offs can be shifted as you like. My solution gives full power > > to the user and only does a minimum in the framework. You push > > everything into streams. > > > > If you ask me, not a good choice. Will anyone listen. No. > > I do actually think its to late to do my way. It's not like if you > > haven't been gone through the effort and building it. > > > > Just wanted to give you guys another chance, to think it through ;) > > > > Regarding what will be observed. I consider it a plus that all events > > that are in the inputs have an respective output. Whereas your solution > > might "swallow" events. > > > > Best Jan > > > > > > On 02.01.2019 15:30, Adam Bellemare wrote: > > > Jan > > > > > > I have been thinking a lot about the history of the discussion and yo= ur > > > original proposal, and why you believe it is a better solution. The > > biggest > > > problem with your original proposed design is that it seems to me to = be > > > non-deterministic. It is subject to race conditions that are dependen= t > > > entirely on the data, and without resolution of these races you can e= nd > > up > > > with different results each time. If I am mistaken and this is indeed > > > deterministic, then please let me know and provide an explanation, > > ideally > > > with an example. > > > > > > The way I see it is that you will get very different answers to your > > > non-race-condition-resolved join topology, especially if you are > nesting > > it > > > with additional joins as you have indicated you are doing. Consider > > > rebuilding an application state from the beginning of two topics. If > the > > > left/this side has multiple foreign-key changes in a row, spaced out > > every > > > ten minutes, you may see something like this: > > > > > > (foo, foreignKey=3Dred) t=3D0 > > > (foo, foreignKey=3Dblue) t=3D0+10m > > > (foo, foreignKey=3Dgreen) t=3D0+20m > > > (foo, foreignKey=3Dpurple) t=3D0+30m > > > (foo, foreignKey=3Dblue) t=3D0+40m > > > (foo, foreignKey=3Dwhite) t=3D0+50m > > > > > > During realtime processing, all of the updates may have correctly > > > propagated because it took less than 10 minutes to resolve each join. > > Upon > > > rebuilding from the start, however, all of these events would be > > processed > > > in quick succession. The presence or absence of data will affect the > > > results of your join, and the results can vary with each run dependin= g > on > > > the data. Because of this, I cannot support any kind of solution that > > would > > > allow the exposure of an unresolved intermediate state. I can > understand > > if > > > you don't support this, but this is why, as you said, you have the > > freedom > > > to use the Processor API. > > > > > > > > > With that being said, either the solution that I originally proposed > > > (join's ocurring on the foreign node) or John + Guozhang's solution > > > (registering with the foreign node for notifications) is fine with me= - > > > both have the same API and we can evaluate it further during > > implementation. > > > > > > > > > Thanks > > > > > > Adam > > > > > > On Thu, Dec 27, 2018 at 2:38 PM Jan Filipiak > > > > wrote: > > > > > >> Hi, > > >> > > >> just want to let you guys know that this thing is spiralling out of > > >> control if you ask me. > > >> > > >> First you take away the possibility for the user to optimize. Now yo= u > > >> pile up complexity to perform some afterwards optimisation, that fro= m > my > > >> POV completely misses the point. As if the actual call to the joiner > > >> really gonna be an expensive part. It wont. Truth is, you don't have= a > > >> clue which side is gonna be smaller. might be the key you shuffle > around > > >> is >>> than the value on the other side already. > > >> > > >> You know my opinion on this. For me its dead, I just leave you the > > >> message here as an opportunity to reconsider the choices that were > made. > > >> > > >> Whish y'll a happy new year :) > > >> > > >> > > >> > > >> > > >> > > >> > > >> On 27.12.2018 17:22, Adam Bellemare wrote: > > >>> Hi All > > >>> > > >>> Sorry for the delay - holidays and all. I have since updated the KI= P > > with > > >>> John's original suggestion and have pruned a number of the no longe= r > > >>> relevant diagrams. Any more comments would be welcomed, otherwise I > > will > > >>> look to kick off the vote again shortly. > > >>> > > >>> Thanks > > >>> Adam > > >>> > > >>> On Mon, Dec 17, 2018 at 7:06 PM Adam Bellemare < > > adam.bellemare@gmail.com > > >>> > > >>> wrote: > > >>> > > >>>> Hi John and Guozhang > > >>>> > > >>>> Ah yes, I lost that in the mix! Thanks for the convergent solution= s > - > > I > > >> do > > >>>> think that the attachment that John included makes for a better > > design. > > >> It > > >>>> should also help with overall performance as very high-cardinality > > >> foreign > > >>>> keyed data (say millions of events with the same entity) will be > able > > to > > >>>> leverage the multiple nodes for join functionality instead of havi= ng > > it > > >> all > > >>>> performed in one node. There is still a bottleneck in the right > table > > >>>> having to propagate all those events, but with slimmer structures, > > less > > >> IO > > >>>> and no need to perform the join I think the throughput will be muc= h > > >> higher > > >>>> in those scenarios. > > >>>> > > >>>> Okay, I am convinced. I will update the KIP accordingly to a Gliff= y > > >>>> version of John's diagram and ensure that the example flow matches > > >>>> correctly. Then I can go back to working on the PR to match the > > diagram. > > >>>> > > >>>> Thanks both of you for all the help - very much appreciated. > > >>>> > > >>>> Adam > > >>>> > > >>>> > > >>>> > > >>>> > > >>>> > > >>>> > > >>>> > > >>>> On Mon, Dec 17, 2018 at 6:39 PM Guozhang Wang > > >> wrote: > > >>>> > > >>>>> Hi John, > > >>>>> > > >>>>> Just made a pass on your diagram (nice hand-drawing btw!), and > > >> obviously > > >>>>> we > > >>>>> are thinking about the same thing :) A neat difference that I lik= e, > > is > > >>>>> that > > >>>>> in the pre-join repartition topic we can still send message in th= e > > >> format > > >>>>> of `K=3Dk, V=3D(i=3D2)` while using "i" as the partition key in > > >>>>> StreamsPartition, > > >>>>> this way we do not need to even augment the key for the repartiti= on > > >> topic, > > >>>>> but just do a projection on the foreign key part but trim all oth= er > > >>>>> fields: > > >>>>> as long as we still materialize the store as `A-2` co-located wit= h > > the > > >>>>> right KTable, that is fine. > > >>>>> > > >>>>> As I mentioned in my previous email, I also think this has a few > > >>>>> advantages > > >>>>> on saving over-the-wire bytes as well as disk bytes. > > >>>>> > > >>>>> Guozhang > > >>>>> > > >>>>> > > >>>>> On Mon, Dec 17, 2018 at 3:17 PM John Roesler > > >> wrote: > > >>>>> > > >>>>>> Hi Guozhang, > > >>>>>> > > >>>>>> Thanks for taking a look! I think Adam's already addressed your > > >>>>> questions > > >>>>>> as well as I could have. > > >>>>>> > > >>>>>> Hi Adam, > > >>>>>> > > >>>>>> Thanks for updating the KIP. It looks great, especially how all > the > > >>>>>> need-to-know information is right at the top, followed by the > > details. > > >>>>>> > > >>>>>> Also, thanks for that high-level diagram. Actually, now that I'm > > >> looking > > >>>>>> at it, I think part of my proposal got lost in translation, > > although I > > >>>>> do > > >>>>>> think that what you have there is also correct. > > >>>>>> > > >>>>>> I sketched up a crude diagram based on yours and attached it to > the > > >> KIP > > >>>>>> (I'm not sure if attached or inline images work on the mailing > > list): > > >>>>>> > > >>>>> > > >> > > > https://cwiki.apache.org/confluence/download/attachments/74684836/suggest= ion.png > > >>>>>> . It's also attached to this email for convenience. > > >>>>>> > > >>>>>> Hopefully, you can see how it's intended to line up, and which > parts > > >> are > > >>>>>> modified. > > >>>>>> At a high level, instead of performing the join on the right-han= d > > >> side, > > >>>>>> we're essentially just registering interest, like "LHS key A > wishes > > to > > >>>>>> receive updates for RHS key 2". Then, when there is a new > "interest" > > >> or > > >>>>> any > > >>>>>> updates to the RHS records, it "broadcasts" its state back to th= e > > LHS > > >>>>>> records who are interested in it. > > >>>>>> > > >>>>>> Thus, instead of sending the LHS values to the RHS joiner worker= s > > and > > >>>>> then > > >>>>>> sending the join results back to the LHS worke be co-partitioned > and > > >>>>>> validated, we instead only send the LHS *keys* to the RHS worker= s > > and > > >>>>> then > > >>>>>> only the RHS k/v back to be joined by the LHS worker. > > >>>>>> > > >>>>>> I've been considering both your diagram and mine, and I *think* > what > > >> I'm > > >>>>>> proposing has a few advantages. > > >>>>>> > > >>>>>> Here are some points of interest as you look at the diagram: > > >>>>>> * When we extract the foreign key and send it to the Pre-Join > > >>>>> Repartition > > >>>>>> Topic, we can send only the FK/PK pair. There's no need to worry > > about > > >>>>>> custom partitioner logic, since we can just use the foreign key > > >> plainly > > >>>>> as > > >>>>>> the repartition record key. Also, we save on transmitting the LH= S > > >> value, > > >>>>>> since we only send its key in this step. > > >>>>>> * We also only need to store the RHSKey:LHSKey mapping in the > > >>>>>> MaterializedSubscriptionStore, saving on disk. We can use the sa= me > > >> rocks > > >>>>>> key format you proposed and the same algorithm involving range > scans > > >>>>> when > > >>>>>> the RHS records get updated. > > >>>>>> * Instead of joining on the right side, all we do is compose a > > >>>>>> re-repartition record so we can broadcast the RHS k/v pair back = to > > the > > >>>>>> original LHS partition. (this is what the "rekey" node is doing) > > >>>>>> * Then, there is a special kind of Joiner that's co-resident in > the > > >> same > > >>>>>> StreamTask as the LHS table, subscribed to the Post-Join > Repartition > > >>>>> Topic. > > >>>>>> ** This Joiner is *not* triggered directly by any changes in the > LHS > > >>>>>> KTable. Instead, LHS events indirectly trigger the join via the > > whole > > >>>>>> lifecycle. > > >>>>>> ** For each event arriving from the Post-Join Repartition Topic, > the > > >>>>>> Joiner looks up the corresponding record in the LHS KTable. It > > >> validates > > >>>>>> the FK as you noted, discarding any inconsistent events. > Otherwise, > > it > > >>>>>> unpacks the RHS K/V pair and invokes the ValueJoiner to obtain t= he > > >> join > > >>>>>> result > > >>>>>> ** Note that the Joiner itself is stateless, so materializing th= e > > join > > >>>>>> result is optional, just as with the 1:1 joins. > > >>>>>> > > >>>>>> So in summary: > > >>>>>> * instead of transmitting the LHS keys and values to the right a= nd > > the > > >>>>>> JoinResult back to the left, we only transmit the LHS keys to th= e > > >> right > > >>>>> and > > >>>>>> the RHS values to the left. Assuming the average RHS value is on > > >> smaller > > >>>>>> than or equal to the average join result size, it's a clear win = on > > >>>>> broker > > >>>>>> traffic. I think this is actually a reasonable assumption, which > we > > >> can > > >>>>>> discuss more if you're suspicious. > > >>>>>> * we only need one copy of the data (the left and right tables > need > > to > > >>>>> be > > >>>>>> materialized) and one extra copy of the PK:FK pairs in the > > >> Materialized > > >>>>>> Subscription Store. Materializing the join result is optional, > just > > as > > >>>>> with > > >>>>>> the existing 1:1 joins. > > >>>>>> * we still need the fancy range-scan algorithm on the right to > > locate > > >>>>> all > > >>>>>> interested LHS keys when a RHS value is updated, but we don't > need a > > >>>>> custom > > >>>>>> partitioner for either repartition topic (this is of course a > > >>>>> modification > > >>>>>> we could make to your version as well) > > >>>>>> > > >>>>>> How does this sound to you? (And did I miss anything?) > > >>>>>> -John > > >>>>>> > > >>>>>> On Mon, Dec 17, 2018 at 9:00 AM Adam Bellemare < > > >>>>> adam.bellemare@gmail.com> > > >>>>>> wrote: > > >>>>>> > > >>>>>>> Hi John & Guozhang > > >>>>>>> > > >>>>>>> @John & @Guozhang Wang - I have cleaned up > > the > > >>>>> KIP, > > >>>>>>> pruned much of what I wrote and put a simplified diagram near t= he > > top > > >>>>> to > > >>>>>>> illustrate the workflow. I encapsulated Jan's content at the > bottom > > >> of > > >>>>> the > > >>>>>>> document. I believe it is simpler to read by far now. > > >>>>>>> > > >>>>>>> @Guozhang Wang : > > >>>>>>>> #1: rekey left table > > >>>>>>>> -> source from the left upstream, send to rekey-processor = to > > >>>>> generate > > >>>>>>> combined key, and then sink to copartition topic. > > >>>>>>> Correct. > > >>>>>>> > > >>>>>>>> #2: first-join with right table > > >>>>>>>> -> source from the right table upstream, materialize the > right > > >>>>> table. > > >>>>>>>> -> source from the co-partition topic, materialize the > rekeyed > > >> left > > >>>>>>> table, join with the right table, rekey back, and then sink to > the > > >>>>>>> rekeyed-back topic. > > >>>>>>> Almost - I cleared up the KIP. We do not rekey back yet, as I > need > > >> the > > >>>>>>> Foreign-Key value generated in #1 above to compare in the > > resolution > > >>>>>>> stage. > > >>>>>>> > > >>>>>>>> #3: second join > > >>>>>>>> -> source from the rekeyed-back topic, materialize the > > rekeyed > > >>>>> back > > >>>>>>> table. > > >>>>>>>> -> source from the left upstream, materialize the left > table, > > >> join > > >>>>>>> with > > >>>>>>> the rekeyed back table. > > >>>>>>> Almost - As each event comes in, we just run it through a > stateful > > >>>>>>> processor that checks the original ("This") KTable for the key. > The > > >>>>> value > > >>>>>>> payload then has the foreignKeyExtractor applied again as in Pa= rt > > #1 > > >>>>>>> above, > > >>>>>>> and gets the current foreign key. Then we compare it to the > joined > > >>>>> event > > >>>>>>> that we are currently resolving. If they have the same > foreign-key, > > >>>>>>> propagate the result out. If they don't, throw the event away. > > >>>>>>> > > >>>>>>> The end result is that we do need to materialize 2 additional > > tables > > >>>>>>> (left/this-combinedkey table, and the final Joined table) as I'= ve > > >>>>>>> illustrated in the updated KIP. I hope the diagram clears it up= a > > lot > > >>>>>>> better. Please let me know. > > >>>>>>> > > >>>>>>> Thanks again > > >>>>>>> Adam > > >>>>>>> > > >>>>>>> > > >>>>>>> > > >>>>>>> > > >>>>>>> On Sun, Dec 16, 2018 at 3:18 PM Guozhang Wang < > wangguoz@gmail.com> > > >>>>> wrote: > > >>>>>>> > > >>>>>>>> John, > > >>>>>>>> > > >>>>>>>> Thanks a lot for the suggestions on refactoring the wiki, I > agree > > >>>>> with > > >>>>>>> you > > >>>>>>>> that we should consider the KIP proposal to be easily understo= od > > by > > >>>>>>> anyone > > >>>>>>>> in the future to read, and hence should provide a good summary > on > > >> the > > >>>>>>>> user-facing interfaces, as well as rejected alternatives to > > >> represent > > >>>>>>>> briefly "how we came a long way to this conclusion, and what w= e > > have > > >>>>>>>> argued, disagreed, and agreed about, etc" so that readers do n= ot > > >>>>> need to > > >>>>>>>> dig into the DISCUSS thread to get all the details. We can, of > > >>>>> course, > > >>>>>>> keep > > >>>>>>>> the implementation details like "workflows" on the wiki page a= s > a > > >>>>>>> addendum > > >>>>>>>> section since it also has correlations. > > >>>>>>>> > > >>>>>>>> Regarding your proposal on comment 6): that's a very interesti= ng > > >>>>> idea! > > >>>>>>> Just > > >>>>>>>> to clarify that I understands it fully correctly: the proposal= 's > > >>>>>>> resulted > > >>>>>>>> topology is still the same as the current proposal, where we > will > > >>>>> have 3 > > >>>>>>>> sub-topologies for this operator: > > >>>>>>>> > > >>>>>>>> #1: rekey left table > > >>>>>>>> -> source from the left upstream, send to rekey-processor > to > > >>>>> generate > > >>>>>>>> combined key, and then sink to copartition topic. > > >>>>>>>> > > >>>>>>>> #2: first-join with right table > > >>>>>>>> -> source from the right table upstream, materialize the > > right > > >>>>> table. > > >>>>>>>> -> source from the co-partition topic, materialize the > > rekeyed > > >>>>> left > > >>>>>>>> table, join with the right table, rekey back, and then sink to > the > > >>>>>>>> rekeyed-back topic. > > >>>>>>>> > > >>>>>>>> #3: second join > > >>>>>>>> -> source from the rekeyed-back topic, materialize the > > rekeyed > > >>>>> back > > >>>>>>>> table. > > >>>>>>>> -> source from the left upstream, materialize the left > table, > > >> join > > >>>>>>> with > > >>>>>>>> the rekeyed back table. > > >>>>>>>> > > >>>>>>>> Sub-topology #1 and #3 may be merged to a single sub-topology > > since > > >>>>>>> both of > > >>>>>>>> them read from the left table source stream. In this workflow, > we > > >>>>> need > > >>>>>>> to > > >>>>>>>> materialize 4 tables (left table in #3, right table in #2, > rekeyed > > >>>>> left > > >>>>>>>> table in #2, rekeyed-back table in #3), and 2 repartition topi= cs > > >>>>>>>> (copartition topic, rekeyed-back topic). > > >>>>>>>> > > >>>>>>>> Compared with Adam's current proposal in the workflow overview= , > it > > >>>>> has > > >>>>>>> the > > >>>>>>>> same num.materialize tables (left table, rekeyed left table, > right > > >>>>>>> table, > > >>>>>>>> out-of-ordering resolver table), and same num.internal topics > > (two). > > >>>>> The > > >>>>>>>> advantage is that on the copartition topic, we can save > bandwidth > > by > > >>>>> not > > >>>>>>>> sending value, and in #2 the rekeyed left table is smaller sin= ce > > we > > >>>>> do > > >>>>>>> not > > >>>>>>>> have any values to materialize. Is that right? > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> Guozhang > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> On Wed, Dec 12, 2018 at 1:22 PM John Roesler > > > >>>>> wrote: > > >>>>>>>> > > >>>>>>>>> Hi Adam, > > >>>>>>>>> > > >>>>>>>>> Given that the committers are all pretty busy right now, I > think > > >>>>> that > > >>>>>>> it > > >>>>>>>>> would help if you were to refactor the KIP a little to reduce > the > > >>>>>>>> workload > > >>>>>>>>> for reviewers. > > >>>>>>>>> > > >>>>>>>>> I'd recommend the following changes: > > >>>>>>>>> * relocate all internal details to a section at the end calle= d > > >>>>>>> something > > >>>>>>>>> like "Implementation Notes" or something like that. > > >>>>>>>>> * rewrite the rest of the KIP to be a succinct as possible an= d > > >>>>> mention > > >>>>>>>> only > > >>>>>>>>> publicly-facing API changes. > > >>>>>>>>> ** for example, the interface that you've already listed ther= e, > > as > > >>>>>>> well > > >>>>>>>> as > > >>>>>>>>> a textual description of the guarantees we'll be providing > (join > > >>>>>>> result > > >>>>>>>> is > > >>>>>>>>> copartitioned with the LHS, and the join result is guaranteed > > >>>>> correct) > > >>>>>>>>> > > >>>>>>>>> A good target would be that the whole main body of the KIP, > > >>>>> including > > >>>>>>>>> Status, Motivation, Proposal, Justification, and Rejected > > >>>>> Alternatives > > >>>>>>>> all > > >>>>>>>>> fit "above the fold" (i.e., all fit on the screen at a > > comfortable > > >>>>>>> zoom > > >>>>>>>>> level). > > >>>>>>>>> I think the only real Rejected Alternative that bears mention > at > > >>>>> this > > >>>>>>>> point > > >>>>>>>>> is KScatteredTable, which you could just include the executiv= e > > >>>>>>> summary on > > >>>>>>>>> (no implementation details), and link to extra details in the > > >>>>>>>>> Implementation Notes section. > > >>>>>>>>> > > >>>>>>>>> Taking a look at the wiki page, ~90% of the text there is > > internal > > >>>>>>>> detail, > > >>>>>>>>> which is useful for the dubious, but doesn't need to be > ratified > > >>>>> in a > > >>>>>>>> vote > > >>>>>>>>> (and would be subject to change without notice in the future > > >>>>> anyway). > > >>>>>>>>> There's also a lot of conflicting discussion, as you've very > > >>>>>>> respectfully > > >>>>>>>>> tried to preserve the original proposal from Jan while adding > > your > > >>>>>>> own. > > >>>>>>>>> Isolating all this information in a dedicated section at the > > bottom > > >>>>>>> frees > > >>>>>>>>> the voters up to focus on the public API part of the proposal= , > > >>>>> which > > >>>>>>> is > > >>>>>>>>> really all they need to consider. > > >>>>>>>>> > > >>>>>>>>> Plus, it'll be clear to future readers which parts of the > > document > > >>>>> are > > >>>>>>>>> enduring, and which parts are a snapshot of our implementatio= n > > >>>>>>> thinking > > >>>>>>>> at > > >>>>>>>>> the time. > > >>>>>>>>> > > >>>>>>>>> I'm suggesting this because I suspect that the others haven't > > made > > >>>>>>> time > > >>>>>>>> to > > >>>>>>>>> review it partly because it seems daunting. If it seems like = it > > >>>>> would > > >>>>>>> be > > >>>>>>>> a > > >>>>>>>>> huge time investment to review, people will just keep putting > it > > >>>>> off. > > >>>>>>> But > > >>>>>>>>> if the KIP is a single page, then they'll be more inclined to > > give > > >>>>> it > > >>>>>>> a > > >>>>>>>>> read. > > >>>>>>>>> > > >>>>>>>>> Honestly, I don't think the KIP itself is that controversial > > (apart > > >>>>>>> from > > >>>>>>>>> the scattered table thing (sorry, Jan) ). Most of the > discussion > > >>>>> has > > >>>>>>> been > > >>>>>>>>> around the implementation, which we can continue more > effectively > > >>>>> in > > >>>>>>> a PR > > >>>>>>>>> once the KIP has passed. > > >>>>>>>>> > > >>>>>>>>> How does that sound? > > >>>>>>>>> -John > > >>>>>>>>> > > >>>>>>>>> On Mon, Dec 10, 2018 at 3:54 PM Adam Bellemare < > > >>>>>>> adam.bellemare@gmail.com > > >>>>>>>>> > > >>>>>>>>> wrote: > > >>>>>>>>> > > >>>>>>>>>> 1) I believe that the resolution mechanism John has proposed > is > > >>>>>>>>> sufficient > > >>>>>>>>>> - it is clean and easy and doesn't require additional RocksD= B > > >>>>>>> stores, > > >>>>>>>>> which > > >>>>>>>>>> reduces the footprint greatly. I don't think we need to > resolve > > >>>>>>> based > > >>>>>>>> on > > >>>>>>>>>> timestamp or offset anymore, but if we decide to do to that > > >>>>> would be > > >>>>>>>>> within > > >>>>>>>>>> the bounds of the existing API. > > >>>>>>>>>> > > >>>>>>>>>> 2) Is the current API sufficient, or does it need to be > altered > > >>>>> to > > >>>>>>> go > > >>>>>>>>> back > > >>>>>>>>>> to vote? > > >>>>>>>>>> > > >>>>>>>>>> 3) KScatteredTable implementation can always be added in a > > future > > >>>>>>>>> revision. > > >>>>>>>>>> This API does not rule it out. This implementation of this > > >>>>> function > > >>>>>>>> would > > >>>>>>>>>> simply be replaced with `KScatteredTable.resolve()` while > still > > >>>>>>>>> maintaining > > >>>>>>>>>> the existing API, thereby giving both features as Jan outlin= ed > > >>>>>>> earlier. > > >>>>>>>>>> Would this work? > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> Thanks Guozhang, John and Jan > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> On Mon, Dec 10, 2018 at 10:39 AM John Roesler < > > john@confluent.io > > >>>>>> > > >>>>>>>> wrote: > > >>>>>>>>>> > > >>>>>>>>>>> Hi, all, > > >>>>>>>>>>> > > >>>>>>>>>>>>> In fact, we > > >>>>>>>>>>>>> can just keep a single final-result store with timestamps > > >>>>> and > > >>>>>>>> reject > > >>>>>>>>>>> values > > >>>>>>>>>>>>> that have a smaller timestamp, is that right? > > >>>>>>>>>>> > > >>>>>>>>>>>> Which is the correct output should at least be decided on > the > > >>>>>>>> offset > > >>>>>>>>> of > > >>>>>>>>>>>> the original message. > > >>>>>>>>>>> > > >>>>>>>>>>> Thanks for this point, Jan. > > >>>>>>>>>>> > > >>>>>>>>>>> KIP-258 is merely to allow embedding the record timestamp = in > > >>>>> the > > >>>>>>> k/v > > >>>>>>>>>>> store, > > >>>>>>>>>>> as well as providing a storage-format upgrade path. > > >>>>>>>>>>> > > >>>>>>>>>>> I might have missed it, but I think we have yet to discuss > > >>>>> whether > > >>>>>>>> it's > > >>>>>>>>>>> safe > > >>>>>>>>>>> or desirable just to swap topic-ordering our for > > >>>>>>> timestamp-ordering. > > >>>>>>>>> This > > >>>>>>>>>>> is > > >>>>>>>>>>> a very deep topic, and I think it would only pollute the > > >>>>> current > > >>>>>>>>>>> discussion. > > >>>>>>>>>>> > > >>>>>>>>>>> What Adam has proposed is safe, given the *current* orderin= g > > >>>>>>>> semantics > > >>>>>>>>>>> of the system. If we can agree on his proposal, I think we > can > > >>>>>>> merge > > >>>>>>>>> the > > >>>>>>>>>>> feature well before the conversation about timestamp orderi= ng > > >>>>> even > > >>>>>>>>> takes > > >>>>>>>>>>> place, much less reaches a conclusion. In the mean time, it > > >>>>> would > > >>>>>>>> seem > > >>>>>>>>> to > > >>>>>>>>>>> be unfortunate to have one join operator with different > > >>>>> ordering > > >>>>>>>>>> semantics > > >>>>>>>>>>> from every other KTable operator. > > >>>>>>>>>>> > > >>>>>>>>>>> If and when that timestamp discussion takes place, many > (all?) > > >>>>>>> KTable > > >>>>>>>>>>> operations > > >>>>>>>>>>> will need to be updated, rendering the many:one join a smal= l > > >>>>>>> marginal > > >>>>>>>>>> cost. > > >>>>>>>>>>> > > >>>>>>>>>>> And, just to plug it again, I proposed an algorithm above > that > > >>>>> I > > >>>>>>>>> believe > > >>>>>>>>>>> provides > > >>>>>>>>>>> correct ordering without any additional metadata, and > > >>>>> regardless > > >>>>>>> of > > >>>>>>>> the > > >>>>>>>>>>> ordering semantics. I didn't bring it up further, because I > > >>>>> felt > > >>>>>>> the > > >>>>>>>>> KIP > > >>>>>>>>>>> only needs > > >>>>>>>>>>> to agree on the public API, and we can discuss the > > >>>>> implementation > > >>>>>>> at > > >>>>>>>>>>> leisure in > > >>>>>>>>>>> a PR... > > >>>>>>>>>>> > > >>>>>>>>>>> Thanks, > > >>>>>>>>>>> -John > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> On Mon, Dec 10, 2018 at 2:28 AM Jan Filipiak < > > >>>>>>>> Jan.Filipiak@trivago.com > > >>>>>>>>>> > > >>>>>>>>>>> wrote: > > >>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> On 10.12.2018 07:42, Guozhang Wang wrote: > > >>>>>>>>>>>>> Hello Adam / Jan / John, > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Sorry for being late on this thread! I've finally got som= e > > >>>>>>> time > > >>>>>>>>> this > > >>>>>>>>>>>>> weekend to cleanup a load of tasks on my queue (actually > > >>>>> I've > > >>>>>>>> also > > >>>>>>>>>>>> realized > > >>>>>>>>>>>>> there are a bunch of other things I need to enqueue while > > >>>>>>>> cleaning > > >>>>>>>>>> them > > >>>>>>>>>>>> up > > >>>>>>>>>>>>> --- sth I need to improve on my side). So here are my > > >>>>>>> thoughts: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Regarding the APIs: I like the current written API in the > > >>>>> KIP. > > >>>>>>>> More > > >>>>>>>>>>>>> generally I'd prefer to keep the 1) one-to-many join > > >>>>>>>>> functionalities > > >>>>>>>>>> as > > >>>>>>>>>>>>> well as 2) other join types than inner as separate KIPs > > >>>>> since > > >>>>>>> 1) > > >>>>>>>>> may > > >>>>>>>>>>>> worth > > >>>>>>>>>>>>> a general API refactoring that can benefit not only > > >>>>> foreignkey > > >>>>>>>>> joins > > >>>>>>>>>>> but > > >>>>>>>>>>>>> collocate joins as well (e.g. an extended proposal of > > >>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>>> > > >>>>> > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams= +Cogroup > > >>>>>>>>>>>> ), > > >>>>>>>>>>>>> and I'm not sure if other join types would actually be > > >>>>> needed > > >>>>>>>>> (maybe > > >>>>>>>>>>> left > > >>>>>>>>>>>>> join still makes sense), so it's better to > > >>>>>>>>>>> wait-for-people-to-ask-and-add > > >>>>>>>>>>>>> than add-sth-that-no-one-uses. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Regarding whether we enforce step 3) / 4) v.s. introducin= g > > >>>>> a > > >>>>>>>>>>>>> KScatteredTable for users to inject their own optimizatio= n: > > >>>>>>> I'd > > >>>>>>>>>> prefer > > >>>>>>>>>>> to > > >>>>>>>>>>>>> do the current option as-is, and my main rationale is for > > >>>>>>>>>> optimization > > >>>>>>>>>>>>> rooms inside the Streams internals and the API > > >>>>> succinctness. > > >>>>>>> For > > >>>>>>>>>>> advanced > > >>>>>>>>>>>>> users who may indeed prefer KScatteredTable and do their > > >>>>> own > > >>>>>>>>>>>> optimization, > > >>>>>>>>>>>>> while it is too much of the work to use Processor API > > >>>>>>> directly, I > > >>>>>>>>>> think > > >>>>>>>>>>>> we > > >>>>>>>>>>>>> can still extend the current API to support it in the > > >>>>> future > > >>>>>>> if > > >>>>>>>> it > > >>>>>>>>>>>> becomes > > >>>>>>>>>>>>> necessary. > > >>>>>>>>>>>> > > >>>>>>>>>>>> no internal optimization potential. it's a myth > > >>>>>>>>>>>> > > >>>>>>>>>>>> =C2=AF\_(=E3=83=84)_/=C2=AF > > >>>>>>>>>>>> > > >>>>>>>>>>>> :-) > > >>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Another note about step 4) resolving out-of-ordering data= , > > >>>>> as > > >>>>>>> I > > >>>>>>>>>>> mentioned > > >>>>>>>>>>>>> before I think with KIP-258 (embedded timestamp with > > >>>>> key-value > > >>>>>>>>> store) > > >>>>>>>>>>> we > > >>>>>>>>>>>>> can actually make this step simpler than the current > > >>>>>>> proposal. In > > >>>>>>>>>> fact, > > >>>>>>>>>>>> we > > >>>>>>>>>>>>> can just keep a single final-result store with timestamps > > >>>>> and > > >>>>>>>>> reject > > >>>>>>>>>>>> values > > >>>>>>>>>>>>> that have a smaller timestamp, is that right? > > >>>>>>>>>>>> > > >>>>>>>>>>>> Which is the correct output should at least be decided on > the > > >>>>>>>> offset > > >>>>>>>>> of > > >>>>>>>>>>>> the original message. > > >>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> That's all I have in mind now. Again, great appreciation = to > > >>>>>>> Adam > > >>>>>>>> to > > >>>>>>>>>>> make > > >>>>>>>>>>>>> such HUGE progress on this KIP! > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Guozhang > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak < > > >>>>>>>>>> Jan.Filipiak@trivago.com> > > >>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>>> If they don't find the time: > > >>>>>>>>>>>>>> They usually take the opposite path from me :D > > >>>>>>>>>>>>>> so the answer would be clear. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> hence my suggestion to vote. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> On 04.12.2018 21:06, Adam Bellemare wrote: > > >>>>>>>>>>>>>>> Hi Guozhang and Matthias > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> I know both of you are quite busy, but we've gotten thi= s > > >>>>> KIP > > >>>>>>>> to a > > >>>>>>>>>>> point > > >>>>>>>>>>>>>>> where we need more guidance on the API (perhaps a bit o= f > > >>>>> a > > >>>>>>>>>>> tie-breaker, > > >>>>>>>>>>>>>> if > > >>>>>>>>>>>>>>> you will). If you have anyone else you may think should > > >>>>>>> look at > > >>>>>>>>>> this, > > >>>>>>>>>>>>>>> please tag them accordingly. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> The scenario is as such: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Current Option: > > >>>>>>>>>>>>>>> API: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>>> > > >>>>> > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key= +joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces > > >>>>>>>>>>>>>>> 1) Rekey the data to CombinedKey, and shuffles it to th= e > > >>>>>>>>> partition > > >>>>>>>>>>> with > > >>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>> foreignKey (repartition 1) > > >>>>>>>>>>>>>>> 2) Join the data > > >>>>>>>>>>>>>>> 3) Shuffle the data back to the original node > > >>>>> (repartition > > >>>>>>> 2) > > >>>>>>>>>>>>>>> 4) Resolve out-of-order arrival / race condition due to > > >>>>>>>>> foreign-key > > >>>>>>>>>>>>>> changes. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Alternate Option: > > >>>>>>>>>>>>>>> Perform #1 and #2 above, and return a KScatteredTable. > > >>>>>>>>>>>>>>> - It would be keyed on a wrapped key function: > > >>>>>>> > >>>>>>>>> K>, > > >>>>>>>>>>> VR> > > >>>>>>>>>>>>>> (KO > > >>>>>>>>>>>>>>> =3D Other Table Key, K =3D This Table Key, VR =3D Joine= d > > >>>>> Result) > > >>>>>>>>>>>>>>> - KScatteredTable.resolve() would perform #3 and #4 but > > >>>>>>>>> otherwise a > > >>>>>>>>>>>> user > > >>>>>>>>>>>>>>> would be able to perform additional functions directly > > >>>>> from > > >>>>>>> the > > >>>>>>>>>>>>>>> KScatteredTable (TBD - currently out of scope). > > >>>>>>>>>>>>>>> - John's analysis 2-emails up is accurate as to the > > >>>>>>> tradeoffs. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Current Option is coded as-is. Alternate option is > > >>>>> possible, > > >>>>>>>> but > > >>>>>>>>>> will > > >>>>>>>>>>>>>>> require for implementation details to be made in the AP= I > > >>>>> and > > >>>>>>>> some > > >>>>>>>>>>>>>> exposure > > >>>>>>>>>>>>>>> of new data structures into the API (ie: CombinedKey). > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> I appreciate any insight into this. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Thanks. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare < > > >>>>>>>>>>>> adam.bellemare@gmail.com> > > >>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Hi John > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Thanks for your feedback and assistance. I think your > > >>>>>>> summary > > >>>>>>>> is > > >>>>>>>>>>>>>> accurate > > >>>>>>>>>>>>>>>> from my perspective. Additionally, I would like to add > > >>>>> that > > >>>>>>>>> there > > >>>>>>>>>>> is a > > >>>>>>>>>>>>>> risk > > >>>>>>>>>>>>>>>> of inconsistent final states without performing the > > >>>>>>>> resolution. > > >>>>>>>>>> This > > >>>>>>>>>>>> is > > >>>>>>>>>>>>>> a > > >>>>>>>>>>>>>>>> major concern for me as most of the data I have dealt > > >>>>> with > > >>>>>>> is > > >>>>>>>>>>> produced > > >>>>>>>>>>>>>> by > > >>>>>>>>>>>>>>>> relational databases. We have seen a number of cases > > >>>>> where > > >>>>>>> a > > >>>>>>>>> user > > >>>>>>>>>> in > > >>>>>>>>>>>> the > > >>>>>>>>>>>>>>>> Rails UI has modified the field (foreign key), realize= d > > >>>>>>> they > > >>>>>>>>> made > > >>>>>>>>>> a > > >>>>>>>>>>>>>>>> mistake, and then updated the field again with a new > > >>>>> key. > > >>>>>>> The > > >>>>>>>>>> events > > >>>>>>>>>>>> are > > >>>>>>>>>>>>>>>> propagated out as they are produced, and as such we ha= ve > > >>>>>>> had > > >>>>>>>>>>>> real-world > > >>>>>>>>>>>>>>>> cases where these inconsistencies were propagated > > >>>>>>> downstream > > >>>>>>>> as > > >>>>>>>>>> the > > >>>>>>>>>>>>>> final > > >>>>>>>>>>>>>>>> values due to the race conditions in the fanout of the > > >>>>>>> data. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> This solution that I propose values correctness of the > > >>>>>>> final > > >>>>>>>>>> result > > >>>>>>>>>>>> over > > >>>>>>>>>>>>>>>> other factors. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> We could always move this function over to using a > > >>>>>>>>> KScatteredTable > > >>>>>>>>>>>>>>>> implementation in the future, and simply deprecate it > > >>>>> this > > >>>>>>>> join > > >>>>>>>>>> API > > >>>>>>>>>>> in > > >>>>>>>>>>>>>>>> time. I think I would like to hear more from some of t= he > > >>>>>>> other > > >>>>>>>>>> major > > >>>>>>>>>>>>>>>> committers on which course of action they would think = is > > >>>>>>> best > > >>>>>>>>>> before > > >>>>>>>>>>>> any > > >>>>>>>>>>>>>>>> more coding is done. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Thanks again > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Adam > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 8:24 PM John Roesler < > > >>>>>>>> john@confluent.io> > > >>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Hi Jan and Adam, > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Wow, thanks for doing that test, Adam. Those results > > >>>>> are > > >>>>>>>>>>> encouraging. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Thanks for your performance experience as well, Jan. = I > > >>>>>>> agree > > >>>>>>>>> that > > >>>>>>>>>>>>>> avoiding > > >>>>>>>>>>>>>>>>> unnecessary join outputs is especially important when > > >>>>> the > > >>>>>>>>> fan-out > > >>>>>>>>>>> is > > >>>>>>>>>>>> so > > >>>>>>>>>>>>>>>>> high. I suppose this could also be built into the > > >>>>>>>>> implementation > > >>>>>>>>>>>> we're > > >>>>>>>>>>>>>>>>> discussing, but it wouldn't have to be specified in t= he > > >>>>>>> KIP > > >>>>>>>>>> (since > > >>>>>>>>>>>>>> it's an > > >>>>>>>>>>>>>>>>> API-transparent optimization). > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> As far as whether or not to re-repartition the data, = I > > >>>>>>> didn't > > >>>>>>>>>> bring > > >>>>>>>>>>>> it > > >>>>>>>>>>>>>> up > > >>>>>>>>>>>>>>>>> because it sounded like the two of you agreed to leav= e > > >>>>> the > > >>>>>>>> KIP > > >>>>>>>>>>> as-is, > > >>>>>>>>>>>>>>>>> despite the disagreement. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> If you want my opinion, I feel like both approaches a= re > > >>>>>>>>>> reasonable. > > >>>>>>>>>>>>>>>>> It sounds like Jan values more the potential for > > >>>>>>> developers > > >>>>>>>> to > > >>>>>>>>>>>> optimize > > >>>>>>>>>>>>>>>>> their topologies to re-use the intermediate nodes, > > >>>>> whereas > > >>>>>>>> Adam > > >>>>>>>>>>>> places > > >>>>>>>>>>>>>>>>> more > > >>>>>>>>>>>>>>>>> value on having a single operator that people can use > > >>>>>>> without > > >>>>>>>>>> extra > > >>>>>>>>>>>>>> steps > > >>>>>>>>>>>>>>>>> at the end. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Personally, although I do find it exceptionally > > >>>>> annoying > > >>>>>>>> when a > > >>>>>>>>>>>>>> framework > > >>>>>>>>>>>>>>>>> gets in my way when I'm trying to optimize something, > > >>>>> it > > >>>>>>>> seems > > >>>>>>>>>>> better > > >>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>> go > > >>>>>>>>>>>>>>>>> for a single operation. > > >>>>>>>>>>>>>>>>> * Encapsulating the internal transitions gives us > > >>>>>>> significant > > >>>>>>>>>>>> latitude > > >>>>>>>>>>>>>> in > > >>>>>>>>>>>>>>>>> the implementation (for example, joining only at the > > >>>>> end, > > >>>>>>> not > > >>>>>>>>> in > > >>>>>>>>>>> the > > >>>>>>>>>>>>>>>>> middle > > >>>>>>>>>>>>>>>>> to avoid extra data copying and out-of-order > > >>>>> resolution; > > >>>>>>> how > > >>>>>>>> we > > >>>>>>>>>>>>>> represent > > >>>>>>>>>>>>>>>>> the first repartition keys (combined keys vs. value > > >>>>>>> vectors), > > >>>>>>>>>>> etc.). > > >>>>>>>>>>>>>> If we > > >>>>>>>>>>>>>>>>> publish something like a KScatteredTable with the > > >>>>>>>>>> right-partitioned > > >>>>>>>>>>>>>> joined > > >>>>>>>>>>>>>>>>> data, then the API pretty much locks in the > > >>>>>>> implementation as > > >>>>>>>>>> well. > > >>>>>>>>>>>>>>>>> * The API seems simpler to understand and use. I do > > >>>>> mean > > >>>>>>>>> "seems"; > > >>>>>>>>>>> if > > >>>>>>>>>>>>>>>>> anyone > > >>>>>>>>>>>>>>>>> wants to make the case that KScatteredTable is actual= ly > > >>>>>>>>> simpler, > > >>>>>>>>>> I > > >>>>>>>>>>>>>> think > > >>>>>>>>>>>>>>>>> hypothetical usage code would help. From a relational > > >>>>>>> algebra > > >>>>>>>>>>>>>> perspective, > > >>>>>>>>>>>>>>>>> it seems like KTable.join(KTable) should produce a ne= w > > >>>>>>> KTable > > >>>>>>>>> in > > >>>>>>>>>>> all > > >>>>>>>>>>>>>>>>> cases. > > >>>>>>>>>>>>>>>>> * That said, there might still be room in the API for= a > > >>>>>>>>> different > > >>>>>>>>>>>>>>>>> operation > > >>>>>>>>>>>>>>>>> like what Jan has proposed to scatter a KTable, and > > >>>>> then > > >>>>>>> do > > >>>>>>>>>> things > > >>>>>>>>>>>> like > > >>>>>>>>>>>>>>>>> join, re-group, etc from there... I'm not sure; I > > >>>>> haven't > > >>>>>>>>> thought > > >>>>>>>>>>>>>> through > > >>>>>>>>>>>>>>>>> all the consequences yet. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> This is all just my opinion after thinking over the > > >>>>>>>> discussion > > >>>>>>>>> so > > >>>>>>>>>>>>>> far... > > >>>>>>>>>>>>>>>>> -John > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare < > > >>>>>>>>>>>>>> adam.bellemare@gmail.com> > > >>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Updated the PR to take into account John's feedback. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> I did some preliminary testing for the performance o= f > > >>>>> the > > >>>>>>>>>>>> prefixScan. > > >>>>>>>>>>>>>> I > > >>>>>>>>>>>>>>>>>> have attached the file, but I will also include the > > >>>>> text > > >>>>>>> in > > >>>>>>>>> the > > >>>>>>>>>>> body > > >>>>>>>>>>>>>>>>> here > > >>>>>>>>>>>>>>>>>> for archival purposes (I am not sure what happens to > > >>>>>>>> attached > > >>>>>>>>>>>> files). > > >>>>>>>>>>>>>> I > > >>>>>>>>>>>>>>>>>> also updated the PR and the KIP accordingly. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Summary: It scales exceptionally well for scanning > > >>>>> large > > >>>>>>>>> values > > >>>>>>>>>> of > > >>>>>>>>>>>>>>>>>> records. As Jan mentioned previously, the real issue > > >>>>>>> would > > >>>>>>>> be > > >>>>>>>>>> more > > >>>>>>>>>>>>>>>>> around > > >>>>>>>>>>>>>>>>>> processing the resulting records after obtaining the= m. > > >>>>>>> For > > >>>>>>>>>>> instance, > > >>>>>>>>>>>>>> it > > >>>>>>>>>>>>>>>>>> takes approximately ~80-120 mS to flush the buffer > > >>>>> and a > > >>>>>>>>> further > > >>>>>>>>>>>>>>>>> ~35-85mS > > >>>>>>>>>>>>>>>>>> to scan 27.5M records, obtaining matches for 2.5M of > > >>>>>>> them. > > >>>>>>>>>>> Iterating > > >>>>>>>>>>>>>>>>>> through the records just to generate a simple count > > >>>>>>> takes ~ > > >>>>>>>> 40 > > >>>>>>>>>>> times > > >>>>>>>>>>>>>>>>> longer > > >>>>>>>>>>>>>>>>>> than the flush + scan combined. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>>> > > >>>>> > > >> > > > =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D > > >>>>>>>>>>>>>>>>>> Setup: > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>>> > > >>>>> > > >> > > > =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D > > >>>>>>>>>>>>>>>>>> Java 9 with default settings aside from a 512 MB hea= p > > >>>>>>>>> (Xmx512m, > > >>>>>>>>>>>>>> Xms512m) > > >>>>>>>>>>>>>>>>>> CPU: i7 2.2 Ghz. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Note: I am using a slightly-modified, > > >>>>> directly-accessible > > >>>>>>>>> Kafka > > >>>>>>>>>>>>>> Streams > > >>>>>>>>>>>>>>>>>> RocksDB > > >>>>>>>>>>>>>>>>>> implementation (RocksDB.java, basically just avoidin= g > > >>>>> the > > >>>>>>>>>>>>>>>>>> ProcessorContext). > > >>>>>>>>>>>>>>>>>> There are no modifications to the default RocksDB > > >>>>> values > > >>>>>>>>>> provided > > >>>>>>>>>>> in > > >>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>> 2.1/trunk release. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> keysize =3D 128 bytes > > >>>>>>>>>>>>>>>>>> valsize =3D 512 bytes > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Step 1: > > >>>>>>>>>>>>>>>>>> Write X positive matching events: (key =3D prefix + > > >>>>>>>> left-padded > > >>>>>>>>>>>>>>>>>> auto-incrementing integer) > > >>>>>>>>>>>>>>>>>> Step 2: > > >>>>>>>>>>>>>>>>>> Write 10X negative matching events (key =3D left-pad= ded > > >>>>>>>>>>>>>> auto-incrementing > > >>>>>>>>>>>>>>>>>> integer) > > >>>>>>>>>>>>>>>>>> Step 3: > > >>>>>>>>>>>>>>>>>> Perform flush > > >>>>>>>>>>>>>>>>>> Step 4: > > >>>>>>>>>>>>>>>>>> Perform prefixScan > > >>>>>>>>>>>>>>>>>> Step 5: > > >>>>>>>>>>>>>>>>>> Iterate through return Iterator and validate the > > >>>>> count of > > >>>>>>>>>> expected > > >>>>>>>>>>>>>>>>> events. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>>> > > >>>>> > > >> > > > =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D > > >>>>>>>>>>>>>>>>>> Results: > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>>> > > >>>>> > > >> > > > =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D > > >>>>>>>>>>>>>>>>>> X =3D 1k (11k events total) > > >>>>>>>>>>>>>>>>>> Flush Time =3D 39 mS > > >>>>>>>>>>>>>>>>>> Scan Time =3D 7 mS > > >>>>>>>>>>>>>>>>>> 6.9 MB disk > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>>> > > >>>>> > > >> > > > -------------------------------------------------------------------------= ------------------- > > >>>>>>>>>>>>>>>>>> X =3D 10k (110k events total) > > >>>>>>>>>>>>>>>>>> Flush Time =3D 45 mS > > >>>>>>>>>>>>>>>>>> Scan Time =3D 8 mS > > >>>>>>>>>>>>>>>>>> 127 MB > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>>> > > >>>>> > > >> > > > -------------------------------------------------------------------------= ------------------- > > >>>>>>>>>>>>>>>>>> X =3D 100k (1.1M events total) > > >>>>>>>>>>>>>>>>>> Test1: > > >>>>>>>>>>>>>>>>>> Flush Time =3D 60 mS > > >>>>>>>>>>>>>>>>>> Scan Time =3D 12 mS > > >>>>>>>>>>>>>>>>>> 678 MB > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Test2: > > >>>>>>>>>>>>>>>>>> Flush Time =3D 45 mS > > >>>>>>>>>>>>>>>>>> Scan Time =3D 7 mS > > >>>>>>>>>>>>>>>>>> 576 MB > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>>> > > >>>>> > > >> > > > -------------------------------------------------------------------------= ------------------- > > >>>>>>>>>>>>>>>>>> X =3D 1MB (11M events total) > > >>>>>>>>>>>>>>>>>> Test1: > > >>>>>>>>>>>>>>>>>> Flush Time =3D 52 mS > > >>>>>>>>>>>>>>>>>> Scan Time =3D 19 mS > > >>>>>>>>>>>>>>>>>> 7.2 GB > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Test2: > > >>>>>>>>>>>>>>>>>> Flush Time =3D 84 mS > > >>>>>>>>>>>>>>>>>> Scan Time =3D 34 mS > > >>>>>>>>>>>>>>>>>> 9.1 GB > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>>> > > >>>>> > > >> > > > -------------------------------------------------------------------------= ------------------- > > >>>>>>>>>>>>>>>>>> X =3D 2.5M (27.5M events total) > > >>>>>>>>>>>>>>>>>> Test1: > > >>>>>>>>>>>>>>>>>> Flush Time =3D 82 mS > > >>>>>>>>>>>>>>>>>> Scan Time =3D 63 mS > > >>>>>>>>>>>>>>>>>> 17GB - 276 sst files > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Test2: > > >>>>>>>>>>>>>>>>>> Flush Time =3D 116 mS > > >>>>>>>>>>>>>>>>>> Scan Time =3D 35 mS > > >>>>>>>>>>>>>>>>>> 23GB - 361 sst files > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Test3: > > >>>>>>>>>>>>>>>>>> Flush Time =3D 103 mS > > >>>>>>>>>>>>>>>>>> Scan Time =3D 82 mS > > >>>>>>>>>>>>>>>>>> 19 GB - 300 sst files > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>>> > > >>>>> > > >> > > > -------------------------------------------------------------------------= ------------------- > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> I had to limit my testing on my laptop to X =3D 2.5M > > >>>>>>> events. I > > >>>>>>>>>> tried > > >>>>>>>>>>>> to > > >>>>>>>>>>>>>> go > > >>>>>>>>>>>>>>>>>> to X =3D 10M (110M events) but RocksDB was going int= o > > >>>>> the > > >>>>>>>> 100GB+ > > >>>>>>>>>>> range > > >>>>>>>>>>>>>>>>> and my > > >>>>>>>>>>>>>>>>>> laptop ran out of disk. More extensive testing could > > >>>>> be > > >>>>>>> done > > >>>>>>>>>> but I > > >>>>>>>>>>>>>>>>> suspect > > >>>>>>>>>>>>>>>>>> that it would be in line with what we're seeing in t= he > > >>>>>>>> results > > >>>>>>>>>>>> above. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> At this point in time, I think the only major > > >>>>> discussion > > >>>>>>>> point > > >>>>>>>>>> is > > >>>>>>>>>>>>>> really > > >>>>>>>>>>>>>>>>>> around what Jan and I have disagreed on: > > >>>>> repartitioning > > >>>>>>>> back + > > >>>>>>>>>>>>>> resolving > > >>>>>>>>>>>>>>>>>> potential out of order issues or leaving that up to > > >>>>> the > > >>>>>>>> client > > >>>>>>>>>> to > > >>>>>>>>>>>>>>>>> handle. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Thanks folks, > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Adam > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak < > > >>>>>>>>>>>> Jan.Filipiak@trivago.com > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> On 29.11.2018 15:14, John Roesler wrote: > > >>>>>>>>>>>>>>>>>>>> Hi all, > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Sorry that this discussion petered out... I think > > >>>>> the > > >>>>>>> 2.1 > > >>>>>>>>>>> release > > >>>>>>>>>>>>>>>>>>> caused an > > >>>>>>>>>>>>>>>>>>>> extended distraction that pushed it off everyone's > > >>>>>>> radar > > >>>>>>>>>> (which > > >>>>>>>>>>>> was > > >>>>>>>>>>>>>>>>>>>> precisely Adam's concern). Personally, I've also h= ad > > >>>>>>> some > > >>>>>>>>>> extend > > >>>>>>>>>>>>>>>>>>>> distractions of my own that kept (and continue to > > >>>>>>> keep) me > > >>>>>>>>>>>>>>>>> preoccupied. > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> However, calling for a vote did wake me up, so I > > >>>>> guess > > >>>>>>> Jan > > >>>>>>>>> was > > >>>>>>>>>>> on > > >>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> right > > >>>>>>>>>>>>>>>>>>>> track! > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> I've gone back and reviewed the whole KIP document > > >>>>> and > > >>>>>>> the > > >>>>>>>>>> prior > > >>>>>>>>>>>>>>>>>>>> discussion, and I'd like to offer a few thoughts: > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> API Thoughts: > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> 1. If I read the KIP right, you are proposing a > > >>>>>>>> many-to-one > > >>>>>>>>>>> join. > > >>>>>>>>>>>>>>>>> Could > > >>>>>>>>>>>>>>>>>>> we > > >>>>>>>>>>>>>>>>>>>> consider naming it manyToOneJoin? Or, if you prefe= r, > > >>>>>>> flip > > >>>>>>>>> the > > >>>>>>>>>>>> design > > >>>>>>>>>>>>>>>>>>> around > > >>>>>>>>>>>>>>>>>>>> and make it a oneToManyJoin? > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> The proposed name "joinOnForeignKey" disguises the > > >>>>> join > > >>>>>>>>> type, > > >>>>>>>>>>> and > > >>>>>>>>>>>> it > > >>>>>>>>>>>>>>>>>>> seems > > >>>>>>>>>>>>>>>>>>>> like it might trick some people into using it for = a > > >>>>>>>>> one-to-one > > >>>>>>>>>>>> join. > > >>>>>>>>>>>>>>>>>>> This > > >>>>>>>>>>>>>>>>>>>> would work, of course, but it would be super > > >>>>>>> inefficient > > >>>>>>>>>>> compared > > >>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>> a > > >>>>>>>>>>>>>>>>>>>> simple rekey-and-join. > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> 2. I might have missed it, but I don't think it's > > >>>>>>>> specified > > >>>>>>>>>>>> whether > > >>>>>>>>>>>>>>>>>>> it's an > > >>>>>>>>>>>>>>>>>>>> inner, outer, or left join. I'm guessing an outer > > >>>>>>> join, as > > >>>>>>>>>>>>>>>>> (neglecting > > >>>>>>>>>>>>>>>>>>> IQ), > > >>>>>>>>>>>>>>>>>>>> the rest can be achieved by filtering or by handli= ng > > >>>>>>> it in > > >>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> ValueJoiner. > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> 3. The arg list to joinOnForeignKey doesn't look > > >>>>> quite > > >>>>>>>>> right. > > >>>>>>>>>>>>>>>>>>>> 3a. Regarding Serialized: There are a few differen= t > > >>>>>>>>> paradigms > > >>>>>>>>>> in > > >>>>>>>>>>>>>>>>> play in > > >>>>>>>>>>>>>>>>>>>> the Streams API, so it's confusing, but instead of > > >>>>>>> three > > >>>>>>>>>>>> Serialized > > >>>>>>>>>>>>>>>>>>> args, I > > >>>>>>>>>>>>>>>>>>>> think it would be better to have one that allows > > >>>>>>>>> (optionally) > > >>>>>>>>>>>>>> setting > > >>>>>>>>>>>>>>>>>>> the 4 > > >>>>>>>>>>>>>>>>>>>> incoming serdes. The result serde is defined by th= e > > >>>>>>>>>>> Materialized. > > >>>>>>>>>>>>>> The > > >>>>>>>>>>>>>>>>>>>> incoming serdes can be optional because they might > > >>>>>>> already > > >>>>>>>>> be > > >>>>>>>>>>>>>>>>> available > > >>>>>>>>>>>>>>>>>>> on > > >>>>>>>>>>>>>>>>>>>> the source KTables, or the default serdes from the > > >>>>>>> config > > >>>>>>>>>> might > > >>>>>>>>>>> be > > >>>>>>>>>>>>>>>>>>>> applicable. > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> 3b. Is the StreamPartitioner necessary? The other > > >>>>> joins > > >>>>>>>>> don't > > >>>>>>>>>>>> allow > > >>>>>>>>>>>>>>>>>>> setting > > >>>>>>>>>>>>>>>>>>>> one, and it seems like it might actually be harmfu= l, > > >>>>>>> since > > >>>>>>>>> the > > >>>>>>>>>>>> rekey > > >>>>>>>>>>>>>>>>>>>> operation needs to produce results that are > > >>>>>>> co-partitioned > > >>>>>>>>>> with > > >>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> "other" > > >>>>>>>>>>>>>>>>>>>> KTable. > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> 4. I'm fine with the "reserved word" header, but I > > >>>>>>> didn't > > >>>>>>>>>>> actually > > >>>>>>>>>>>>>>>>>>> follow > > >>>>>>>>>>>>>>>>>>>> what Matthias meant about namespacing requiring > > >>>>>>>>>> "deserializing" > > >>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> record > > >>>>>>>>>>>>>>>>>>>> header. The headers are already Strings, so I don'= t > > >>>>>>> think > > >>>>>>>>> that > > >>>>>>>>>>>>>>>>>>>> deserialization is required. If we applied the > > >>>>>>> namespace > > >>>>>>>> at > > >>>>>>>>>>> source > > >>>>>>>>>>>>>>>>> nodes > > >>>>>>>>>>>>>>>>>>>> and stripped it at sink nodes, this would be > > >>>>>>> practically > > >>>>>>>> no > > >>>>>>>>>>>>>> overhead. > > >>>>>>>>>>>>>>>>>>> The > > >>>>>>>>>>>>>>>>>>>> advantage of the namespace idea is that no public > > >>>>> API > > >>>>>>>> change > > >>>>>>>>>> wrt > > >>>>>>>>>>>>>>>>> headers > > >>>>>>>>>>>>>>>>>>>> needs to happen, and no restrictions need to be > > >>>>> placed > > >>>>>>> on > > >>>>>>>>>> users' > > >>>>>>>>>>>>>>>>>>> headers. > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> (Although I'm wondering if we can get away without > > >>>>> the > > >>>>>>>>> header > > >>>>>>>>>> at > > >>>>>>>>>>>>>>>>> all... > > >>>>>>>>>>>>>>>>>>>> stay tuned) > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> 5. I also didn't follow the discussion about the H= WM > > >>>>>>> table > > >>>>>>>>>>> growing > > >>>>>>>>>>>>>>>>>>> without > > >>>>>>>>>>>>>>>>>>>> bound. As I read it, the HWM table is effectively > > >>>>>>>>> implementing > > >>>>>>>>>>> OCC > > >>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>>> resolve the problem you noted with disordering whe= n > > >>>>> the > > >>>>>>>>> rekey > > >>>>>>>>>> is > > >>>>>>>>>>>>>>>>>>>> reversed... particularly notable when the FK > > >>>>> changes. > > >>>>>>> As > > >>>>>>>>> such, > > >>>>>>>>>>> it > > >>>>>>>>>>>>>>>>> only > > >>>>>>>>>>>>>>>>>>>> needs to track the most recent "version" (the > > >>>>> offset in > > >>>>>>>> the > > >>>>>>>>>>> source > > >>>>>>>>>>>>>>>>>>>> partition) of each key. Therefore, it should have > > >>>>> the > > >>>>>>> same > > >>>>>>>>>>> number > > >>>>>>>>>>>> of > > >>>>>>>>>>>>>>>>>>> keys > > >>>>>>>>>>>>>>>>>>>> as the source table at all times. > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> I see that you are aware of KIP-258, which I think > > >>>>>>> might > > >>>>>>>> be > > >>>>>>>>>>>> relevant > > >>>>>>>>>>>>>>>>> in > > >>>>>>>>>>>>>>>>>>> a > > >>>>>>>>>>>>>>>>>>>> couple of ways. One: it's just about storing the > > >>>>>>> timestamp > > >>>>>>>>> in > > >>>>>>>>>>> the > > >>>>>>>>>>>>>>>>> state > > >>>>>>>>>>>>>>>>>>>> store, but the ultimate idea is to effectively use > > >>>>> the > > >>>>>>>>>> timestamp > > >>>>>>>>>>>> as > > >>>>>>>>>>>>>>>>> an > > >>>>>>>>>>>>>>>>>>> OCC > > >>>>>>>>>>>>>>>>>>>> "version" to drop disordered updates. You wouldn't > > >>>>>>> want to > > >>>>>>>>> use > > >>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>> timestamp for this operation, but if you were to > > >>>>> use a > > >>>>>>>>> similar > > >>>>>>>>>>>>>>>>>>> mechanism to > > >>>>>>>>>>>>>>>>>>>> store the source offset in the store alongside the > > >>>>>>>> re-keyed > > >>>>>>>>>>>> values, > > >>>>>>>>>>>>>>>>> then > > >>>>>>>>>>>>>>>>>>>> you could avoid a separate table. > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> 6. You and Jan have been thinking about this for a > > >>>>> long > > >>>>>>>>> time, > > >>>>>>>>>> so > > >>>>>>>>>>>>>> I've > > >>>>>>>>>>>>>>>>>>>> probably missed something here, but I'm wondering > > >>>>> if we > > >>>>>>>> can > > >>>>>>>>>>> avoid > > >>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> HWM > > >>>>>>>>>>>>>>>>>>>> tracking at all and resolve out-of-order during a > > >>>>> final > > >>>>>>>> join > > >>>>>>>>>>>>>>>>> instead... > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Let's say we're joining a left table (Integer K: > > >>>>> Letter > > >>>>>>>> FK, > > >>>>>>>>>>> (other > > >>>>>>>>>>>>>>>>>>> data)) > > >>>>>>>>>>>>>>>>>>>> to a right table (Letter K: (some data)). > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Left table: > > >>>>>>>>>>>>>>>>>>>> 1: (A, xyz) > > >>>>>>>>>>>>>>>>>>>> 2: (B, asd) > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Right table: > > >>>>>>>>>>>>>>>>>>>> A: EntityA > > >>>>>>>>>>>>>>>>>>>> B: EntityB > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> We could do a rekey as you proposed with a combine= d > > >>>>>>> key, > > >>>>>>>> but > > >>>>>>>>>> not > > >>>>>>>>>>>>>>>>>>>> propagating the value at all.. > > >>>>>>>>>>>>>>>>>>>> Rekey table: > > >>>>>>>>>>>>>>>>>>>> A-1: (dummy value) > > >>>>>>>>>>>>>>>>>>>> B-2: (dummy value) > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Which we then join with the right table to produce= : > > >>>>>>>>>>>>>>>>>>>> A-1: EntityA > > >>>>>>>>>>>>>>>>>>>> B-2: EntityB > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Which gets rekeyed back: > > >>>>>>>>>>>>>>>>>>>> 1: A, EntityA > > >>>>>>>>>>>>>>>>>>>> 2: B, EntityB > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> And finally we do the actual join: > > >>>>>>>>>>>>>>>>>>>> Result table: > > >>>>>>>>>>>>>>>>>>>> 1: ((A, xyz), EntityA) > > >>>>>>>>>>>>>>>>>>>> 2: ((B, asd), EntityB) > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> The thing is that in that last join, we have the > > >>>>>>>> opportunity > > >>>>>>>>>> to > > >>>>>>>>>>>>>>>>> compare > > >>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>> current FK in the left table with the incoming PK = of > > >>>>>>> the > > >>>>>>>>> right > > >>>>>>>>>>>>>>>>> table. If > > >>>>>>>>>>>>>>>>>>>> they don't match, we just drop the event, since it > > >>>>>>> must be > > >>>>>>>>>>>> outdated. > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> In your KIP, you gave an example in which (1: A, > > >>>>> xyz) > > >>>>>>> gets > > >>>>>>>>>>> updated > > >>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>> (1: > > >>>>>>>>>>>>>>>>>>>> B, xyz), ultimately yielding a conundrum about > > >>>>> whether > > >>>>>>> the > > >>>>>>>>>> final > > >>>>>>>>>>>>>>>>> state > > >>>>>>>>>>>>>>>>>>>> should be (1: null) or (1: joined-on-B). With the > > >>>>>>>> algorithm > > >>>>>>>>>>> above, > > >>>>>>>>>>>>>>>>> you > > >>>>>>>>>>>>>>>>>>>> would be considering (1: (B, xyz), (A, null)) vs (= 1: > > >>>>>>> (B, > > >>>>>>>>> xyz), > > >>>>>>>>>>> (B, > > >>>>>>>>>>>>>>>>>>>> EntityB)). It seems like this does give you enough > > >>>>>>>>> information > > >>>>>>>>>>> to > > >>>>>>>>>>>>>>>>> make > > >>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>> right choice, regardless of disordering. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Will check Adams patch, but this should work. As > > >>>>>>> mentioned > > >>>>>>>>>> often > > >>>>>>>>>>> I > > >>>>>>>>>>>> am > > >>>>>>>>>>>>>>>>>>> not convinced on partitioning back for the user > > >>>>>>>>> automatically. > > >>>>>>>>>> I > > >>>>>>>>>>>>>> think > > >>>>>>>>>>>>>>>>>>> this is the real performance eater ;) > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> 7. Last thought... I'm a little concerned about th= e > > >>>>>>>>>> performance > > >>>>>>>>>>> of > > >>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>> range scans when records change in the right table= . > > >>>>>>> You've > > >>>>>>>>>> said > > >>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>>>> you've > > >>>>>>>>>>>>>>>>>>>> been using the algorithm you presented in producti= on > > >>>>>>> for a > > >>>>>>>>>>> while. > > >>>>>>>>>>>>>> Can > > >>>>>>>>>>>>>>>>>>> you > > >>>>>>>>>>>>>>>>>>>> give us a sense of the performance characteristics > > >>>>>>> you've > > >>>>>>>>>>>> observed? > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Make it work, make it fast, make it beautiful. The > > >>>>>>> topmost > > >>>>>>>>>> thing > > >>>>>>>>>>>> here > > >>>>>>>>>>>>>>>>> is > > >>>>>>>>>>>>>>>>>>> / was correctness. In practice I do not measure the > > >>>>>>>>> performance > > >>>>>>>>>>> of > > >>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> range scan. Usual cases I run this with is emitting > > >>>>>>> 500k - > > >>>>>>>>> 1kk > > >>>>>>>>>>> rows > > >>>>>>>>>>>>>>>>>>> on a left hand side change. The range scan is just > > >>>>> the > > >>>>>>> work > > >>>>>>>>> you > > >>>>>>>>>>>> gotta > > >>>>>>>>>>>>>>>>>>> do, also when you pack your data into different > > >>>>> formats, > > >>>>>>>>>> usually > > >>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> rocks performance is very tight to the size of the > > >>>>> data > > >>>>>>> and > > >>>>>>>>> we > > >>>>>>>>>>>> can't > > >>>>>>>>>>>>>>>>>>> really change that. It is more important for users = to > > >>>>>>>> prevent > > >>>>>>>>>>>> useless > > >>>>>>>>>>>>>>>>>>> updates to begin with. My left hand side is guarded > > >>>>> to > > >>>>>>> drop > > >>>>>>>>>>> changes > > >>>>>>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>>>> are not going to change my join output. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> usually it's: > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> drop unused fields and then don't forward if > > >>>>>>>> old.equals(new) > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> regarding to the performance of creating an iterato= r > > >>>>> for > > >>>>>>>>>> smaller > > >>>>>>>>>>>>>>>>>>> fanouts, users can still just do a group by first > > >>>>> then > > >>>>>>>>> anyways. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> I could only think of one alternative, but I'm not > > >>>>>>> sure if > > >>>>>>>>>> it's > > >>>>>>>>>>>>>>>>> better > > >>>>>>>>>>>>>>>>>>> or > > >>>>>>>>>>>>>>>>>>>> worse... If the first re-key only needs to preserv= e > > >>>>> the > > >>>>>>>>>> original > > >>>>>>>>>>>>>> key, > > >>>>>>>>>>>>>>>>>>> as I > > >>>>>>>>>>>>>>>>>>>> proposed in #6, then we could store a vector of > > >>>>> keys in > > >>>>>>>> the > > >>>>>>>>>>> value: > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Left table: > > >>>>>>>>>>>>>>>>>>>> 1: A,... > > >>>>>>>>>>>>>>>>>>>> 2: B,... > > >>>>>>>>>>>>>>>>>>>> 3: A,... > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Gets re-keyed: > > >>>>>>>>>>>>>>>>>>>> A: [1, 3] > > >>>>>>>>>>>>>>>>>>>> B: [2] > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Then, the rhs part of the join would only need a > > >>>>>>> regular > > >>>>>>>>>>>> single-key > > >>>>>>>>>>>>>>>>>>> lookup. > > >>>>>>>>>>>>>>>>>>>> Of course we have to deal with the problem of larg= e > > >>>>>>>> values, > > >>>>>>>>> as > > >>>>>>>>>>>>>>>>> there's > > >>>>>>>>>>>>>>>>>>> no > > >>>>>>>>>>>>>>>>>>>> bound on the number of lhs records that can > > >>>>> reference > > >>>>>>> rhs > > >>>>>>>>>>> records. > > >>>>>>>>>>>>>>>>>>> Offhand, > > >>>>>>>>>>>>>>>>>>>> I'd say we could page the values, so when one row = is > > >>>>>>> past > > >>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> threshold, we > > >>>>>>>>>>>>>>>>>>>> append the key for the next page. Then in most > > >>>>> cases, > > >>>>>>> it > > >>>>>>>>> would > > >>>>>>>>>>> be > > >>>>>>>>>>>> a > > >>>>>>>>>>>>>>>>>>> single > > >>>>>>>>>>>>>>>>>>>> key lookup, but for large fan-out updates, it woul= d > > >>>>> be > > >>>>>>> one > > >>>>>>>>> per > > >>>>>>>>>>>> (max > > >>>>>>>>>>>>>>>>>>> value > > >>>>>>>>>>>>>>>>>>>> size)/(avg lhs key size). > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> This seems more complex, though... Plus, I think > > >>>>>>> there's > > >>>>>>>>> some > > >>>>>>>>>>>> extra > > >>>>>>>>>>>>>>>>>>>> tracking we'd need to do to know when to emit a > > >>>>>>>> retraction. > > >>>>>>>>>> For > > >>>>>>>>>>>>>>>>> example, > > >>>>>>>>>>>>>>>>>>>> when record 1 is deleted, the re-key table would > > >>>>> just > > >>>>>>> have > > >>>>>>>>> (A: > > >>>>>>>>>>>> [3]). > > >>>>>>>>>>>>>>>>>>> Some > > >>>>>>>>>>>>>>>>>>>> kind of tombstone is needed so that the join resul= t > > >>>>>>> for 1 > > >>>>>>>>> can > > >>>>>>>>>>> also > > >>>>>>>>>>>>>> be > > >>>>>>>>>>>>>>>>>>>> retracted. > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> That's all! > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Thanks so much to both Adam and Jan for the > > >>>>> thoughtful > > >>>>>>>> KIP. > > >>>>>>>>>>> Sorry > > >>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>> discussion has been slow. > > >>>>>>>>>>>>>>>>>>>> -John > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak < > > >>>>>>>>>>>>>>>>> Jan.Filipiak@trivago.com> > > >>>>>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> Id say you can just call the vote. > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> that happens all the time, and if something comes > > >>>>> up, > > >>>>>>> it > > >>>>>>>>> just > > >>>>>>>>>>>> goes > > >>>>>>>>>>>>>>>>> back > > >>>>>>>>>>>>>>>>>>>>> to discuss. > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> would not expect to much attention with another > > >>>>>>> another > > >>>>>>>>> email > > >>>>>>>>>>> in > > >>>>>>>>>>>>>>>>> this > > >>>>>>>>>>>>>>>>>>>>> thread. > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> best Jan > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> On 09.10.2018 13:56, Adam Bellemare wrote: > > >>>>>>>>>>>>>>>>>>>>>> Hello Contributors > > >>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> I know that 2.1 is about to be released, but I d= o > > >>>>>>> need > > >>>>>>>> to > > >>>>>>>>>> bump > > >>>>>>>>>>>>>>>>> this to > > >>>>>>>>>>>>>>>>>>>>> keep > > >>>>>>>>>>>>>>>>>>>>>> visibility up. I am still intending to push this > > >>>>>>> through > > >>>>>>>>>> once > > >>>>>>>>>>>>>>>>>>> contributor > > >>>>>>>>>>>>>>>>>>>>>> feedback is given. > > >>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> Main points that need addressing: > > >>>>>>>>>>>>>>>>>>>>>> 1) Any way (or benefit) in structuring the curre= nt > > >>>>>>>>> singular > > >>>>>>>>>>>> graph > > >>>>>>>>>>>>>>>>> node > > >>>>>>>>>>>>>>>>>>>>> into > > >>>>>>>>>>>>>>>>>>>>>> multiple nodes? It has a whopping 25 parameters > > >>>>> right > > >>>>>>>>> now. I > > >>>>>>>>>>> am > > >>>>>>>>>>>> a > > >>>>>>>>>>>>>>>>> bit > > >>>>>>>>>>>>>>>>>>>>> fuzzy > > >>>>>>>>>>>>>>>>>>>>>> on how the optimizations are supposed to work, s= o > > >>>>> I > > >>>>>>>> would > > >>>>>>>>>>>>>>>>> appreciate > > >>>>>>>>>>>>>>>>>>> any > > >>>>>>>>>>>>>>>>>>>>>> help on this aspect. > > >>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> 2) Overall strategy for joining + resolving. Thi= s > > >>>>>>> thread > > >>>>>>>>> has > > >>>>>>>>>>>> much > > >>>>>>>>>>>>>>>>>>>>> discourse > > >>>>>>>>>>>>>>>>>>>>>> between Jan and I between the current highwater > > >>>>> mark > > >>>>>>>>>> proposal > > >>>>>>>>>>>> and > > >>>>>>>>>>>>>> a > > >>>>>>>>>>>>>>>>>>>>> groupBy > > >>>>>>>>>>>>>>>>>>>>>> + reduce proposal. I am of the opinion that we > > >>>>> need > > >>>>>>> to > > >>>>>>>>>>> strictly > > >>>>>>>>>>>>>>>>> handle > > >>>>>>>>>>>>>>>>>>>>> any > > >>>>>>>>>>>>>>>>>>>>>> chance of out-of-order data and leave none of it > > >>>>> up > > >>>>>>> to > > >>>>>>>> the > > >>>>>>>>>>>>>>>>> consumer. > > >>>>>>>>>>>>>>>>>>> Any > > >>>>>>>>>>>>>>>>>>>>>> comments or suggestions here would also help. > > >>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> 3) Anything else that you see that would prevent > > >>>>> this > > >>>>>>>> from > > >>>>>>>>>>>> moving > > >>>>>>>>>>>>>>>>> to a > > >>>>>>>>>>>>>>>>>>>>> vote? > > >>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> Thanks > > >>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> Adam > > >>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare = < > > >>>>>>>>>>>>>>>>>>>>> adam.bellemare@gmail.com> > > >>>>>>>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> Hi Jan > > >>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> With the Stores.windowStoreBuilder and > > >>>>>>>>>>>>>>>>> Stores.persistentWindowStore, > > >>>>>>>>>>>>>>>>>>> you > > >>>>>>>>>>>>>>>>>>>>>>> actually only need to specify the amount of > > >>>>> segments > > >>>>>>>> you > > >>>>>>>>>> want > > >>>>>>>>>>>> and > > >>>>>>>>>>>>>>>>> how > > >>>>>>>>>>>>>>>>>>>>> large > > >>>>>>>>>>>>>>>>>>>>>>> they are. To the best of my understanding, what > > >>>>>>> happens > > >>>>>>>>> is > > >>>>>>>>>>> that > > >>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>> segments are automatically rolled over as new > > >>>>> data > > >>>>>>> with > > >>>>>>>>> new > > >>>>>>>>>>>>>>>>>>> timestamps > > >>>>>>>>>>>>>>>>>>>>> are > > >>>>>>>>>>>>>>>>>>>>>>> created. We use this exact functionality in som= e > > >>>>> of > > >>>>>>> the > > >>>>>>>>>> work > > >>>>>>>>>>>> done > > >>>>>>>>>>>>>>>>>>>>>>> internally at my company. For reference, this i= s > > >>>>> the > > >>>>>>>>>> hopping > > >>>>>>>>>>>>>>>>> windowed > > >>>>>>>>>>>>>>>>>>>>> store. > > >>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>>> > > >>>>> > > >> > > > https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api= .html#id21 > > >>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> In the code that I have provided, there are goi= ng > > >>>>>>> to be > > >>>>>>>>> two > > >>>>>>>>>>> 24h > > >>>>>>>>>>>>>>>>>>>>> segments. > > >>>>>>>>>>>>>>>>>>>>>>> When a record is put into the windowStore, it > > >>>>> will > > >>>>>>> be > > >>>>>>>>>>> inserted > > >>>>>>>>>>>> at > > >>>>>>>>>>>>>>>>>>> time > > >>>>>>>>>>>>>>>>>>>>> T in > > >>>>>>>>>>>>>>>>>>>>>>> both segments. The two segments will always > > >>>>> overlap > > >>>>>>> by > > >>>>>>>>> 12h. > > >>>>>>>>>>> As > > >>>>>>>>>>>>>>>>> time > > >>>>>>>>>>>>>>>>>>>>> goes on > > >>>>>>>>>>>>>>>>>>>>>>> and new records are added (say at time T+12h+), > > >>>>> the > > >>>>>>>>> oldest > > >>>>>>>>>>>>>> segment > > >>>>>>>>>>>>>>>>>>> will > > >>>>>>>>>>>>>>>>>>>>> be > > >>>>>>>>>>>>>>>>>>>>>>> automatically deleted and a new segment created= . > > >>>>> The > > >>>>>>>>>> records > > >>>>>>>>>>>> are > > >>>>>>>>>>>>>>>>> by > > >>>>>>>>>>>>>>>>>>>>> default > > >>>>>>>>>>>>>>>>>>>>>>> inserted with the context.timestamp(), such tha= t > > >>>>> it > > >>>>>>> is > > >>>>>>>>> the > > >>>>>>>>>>>> record > > >>>>>>>>>>>>>>>>>>> time, > > >>>>>>>>>>>>>>>>>>>>> not > > >>>>>>>>>>>>>>>>>>>>>>> the clock time, which is used. > > >>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> To the best of my understanding, the timestamps > > >>>>> are > > >>>>>>>>>> retained > > >>>>>>>>>>>> when > > >>>>>>>>>>>>>>>>>>>>>>> restoring from the changelog. > > >>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> Basically, this is heavy-handed way to deal wit= h > > >>>>> TTL > > >>>>>>>> at a > > >>>>>>>>>>>>>>>>>>> segment-level, > > >>>>>>>>>>>>>>>>>>>>>>> instead of at an individual record level. > > >>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak < > > >>>>>>>>>>>>>>>>>>> Jan.Filipiak@trivago.com> > > >>>>>>>>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Will that work? I expected it to blow up with > > >>>>>>>>>>>> ClassCastException > > >>>>>>>>>>>>>>>>> or > > >>>>>>>>>>>>>>>>>>>>>>>> similar. > > >>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> You either would have to specify the window yo= u > > >>>>>>>>> fetch/put > > >>>>>>>>>> or > > >>>>>>>>>>>>>>>>> iterate > > >>>>>>>>>>>>>>>>>>>>>>>> across all windows the key was found in right? > > >>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> I just hope the window-store doesn't check > > >>>>>>> stream-time > > >>>>>>>>>> under > > >>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> hoods > > >>>>>>>>>>>>>>>>>>>>>>>> that would be a questionable interface. > > >>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> If it does: did you see my comment on checking > > >>>>> all > > >>>>>>> the > > >>>>>>>>>>> windows > > >>>>>>>>>>>>>>>>>>> earlier? > > >>>>>>>>>>>>>>>>>>>>>>>> that would be needed to actually give reasonab= le > > >>>>>>> time > > >>>>>>>>>>>> gurantees. > > >>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Best > > >>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> On 25.09.2018 13:18, Adam Bellemare wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>> Hi Jan > > >>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>> Check for " highwaterMat " in the PR. I only > > >>>>>>> changed > > >>>>>>>>> the > > >>>>>>>>>>>> state > > >>>>>>>>>>>>>>>>>>> store, > > >>>>>>>>>>>>>>>>>>>>>>>> not > > >>>>>>>>>>>>>>>>>>>>>>>>> the ProcessorSupplier. > > >>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>> Thanks, > > >>>>>>>>>>>>>>>>>>>>>>>>> Adam > > >>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak= < > > >>>>>>>>>>>>>>>>>>>>> Jan.Filipiak@trivago.com > > >>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>> On 24.09.2018 16:26, Adam Bellemare wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> @Guozhang > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the information. This is indeed > > >>>>>>>> something > > >>>>>>>>>> that > > >>>>>>>>>>>>>>>>> will be > > >>>>>>>>>>>>>>>>>>>>>>>>>>> extremely > > >>>>>>>>>>>>>>>>>>>>>>>>>>> useful for this KIP. > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> @Jan > > >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your explanations. That being > > >>>>> said, I > > >>>>>>>> will > > >>>>>>>>>> not > > >>>>>>>>>>>> be > > >>>>>>>>>>>>>>>>>>> moving > > >>>>>>>>>>>>>>>>>>>>>>>> ahead > > >>>>>>>>>>>>>>>>>>>>>>>>>>> with an implementation using > > >>>>> reshuffle/groupBy > > >>>>>>>>> solution > > >>>>>>>>>>> as > > >>>>>>>>>>>>>> you > > >>>>>>>>>>>>>>>>>>>>>>>> propose. > > >>>>>>>>>>>>>>>>>>>>>>>>>>> That being said, if you wish to implement i= t > > >>>>>>>> yourself > > >>>>>>>>>> off > > >>>>>>>>>>>> of > > >>>>>>>>>>>>>>>>> my > > >>>>>>>>>>>>>>>>>>>>>>>> current PR > > >>>>>>>>>>>>>>>>>>>>>>>>>>> and submit it as a competitive alternative,= I > > >>>>>>> would > > >>>>>>>>> be > > >>>>>>>>>>> more > > >>>>>>>>>>>>>>>>> than > > >>>>>>>>>>>>>>>>>>>>>>>> happy to > > >>>>>>>>>>>>>>>>>>>>>>>>>>> help vet that as an alternate solution. As = it > > >>>>>>>> stands > > >>>>>>>>>>> right > > >>>>>>>>>>>>>>>>> now, > > >>>>>>>>>>>>>>>>>>> I do > > >>>>>>>>>>>>>>>>>>>>>>>> not > > >>>>>>>>>>>>>>>>>>>>>>>>>>> really have more time to invest into > > >>>>>>> alternatives > > >>>>>>>>>> without > > >>>>>>>>>>>>>>>>> there > > >>>>>>>>>>>>>>>>>>>>> being > > >>>>>>>>>>>>>>>>>>>>>>>> a > > >>>>>>>>>>>>>>>>>>>>>>>>>>> strong indication from the binding voters > > >>>>> which > > >>>>>>>> they > > >>>>>>>>>>> would > > >>>>>>>>>>>>>>>>>>> prefer. > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>> Hey, total no worries. I think I personally > > >>>>> gave > > >>>>>>> up > > >>>>>>>> on > > >>>>>>>>>> the > > >>>>>>>>>>>>>>>>> streams > > >>>>>>>>>>>>>>>>>>>>> DSL > > >>>>>>>>>>>>>>>>>>>>>>>> for > > >>>>>>>>>>>>>>>>>>>>>>>>>> some time already, otherwise I would have > > >>>>> pulled > > >>>>>>>> this > > >>>>>>>>>> KIP > > >>>>>>>>>>>>>>>>> through > > >>>>>>>>>>>>>>>>>>>>>>>> already. > > >>>>>>>>>>>>>>>>>>>>>>>>>> I am currently reimplementing my own DSL > > >>>>> based on > > >>>>>>>>> PAPI. > > >>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> I will look at finishing up my PR with the > > >>>>>>> windowed > > >>>>>>>>>> state > > >>>>>>>>>>>>>>>>> store > > >>>>>>>>>>>>>>>>>>> in > > >>>>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>>>>> next > > >>>>>>>>>>>>>>>>>>>>>>>>>>> week or so, exercising it via tests, and > > >>>>> then I > > >>>>>>>> will > > >>>>>>>>>> come > > >>>>>>>>>>>>>> back > > >>>>>>>>>>>>>>>>>>> for > > >>>>>>>>>>>>>>>>>>>>>>>> final > > >>>>>>>>>>>>>>>>>>>>>>>>>>> discussions. In the meantime, I hope that > > >>>>> any of > > >>>>>>>> the > > >>>>>>>>>>>> binding > > >>>>>>>>>>>>>>>>>>> voters > > >>>>>>>>>>>>>>>>>>>>>>>> could > > >>>>>>>>>>>>>>>>>>>>>>>>>>> take a look at the KIP in the wiki. I have > > >>>>>>> updated > > >>>>>>>> it > > >>>>>>>>>>>>>>>>> according > > >>>>>>>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>>>>> latest plan: > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+ > > >>>>>>>>>>>>>>>>>>>>>>>>>>> Support+non-key+joining+in+KTable > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> I have also updated the KIP PR to use a > > >>>>> windowed > > >>>>>>>>> store. > > >>>>>>>>>>>> This > > >>>>>>>>>>>>>>>>>>> could > > >>>>>>>>>>>>>>>>>>>>> be > > >>>>>>>>>>>>>>>>>>>>>>>>>>> replaced by the results of KIP-258 whenever > > >>>>> they > > >>>>>>>> are > > >>>>>>>>>>>>>>>>> completed. > > >>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5527 > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks, > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> Adam > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>> Is the HighWatermarkResolverProccessorsuppli= er > > >>>>>>>> already > > >>>>>>>>>>>> updated > > >>>>>>>>>>>>>>>>> in > > >>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>> PR? > > >>>>>>>>>>>>>>>>>>>>>>>>>> expected it to change to Windowed,Long > > >>>>> Missing > > >>>>>>>>>>> something? > > >>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang > > >>>>> Wang < > > >>>>>>>>>>>>>>>>>>> wangguoz@gmail.com> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> Correction on my previous email: KAFKA-5533 > > >>>>> is > > >>>>>>> the > > >>>>>>>>>> wrong > > >>>>>>>>>>>>>> link, > > >>>>>>>>>>>>>>>>>>> as it > > >>>>>>>>>>>>>>>>>>>>>>>> is > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> for > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> corresponding changelog mechanisms. But as > > >>>>>>> part of > > >>>>>>>>>>> KIP-258 > > >>>>>>>>>>>>>>>>> we do > > >>>>>>>>>>>>>>>>>>>>>>>> want to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> have "handling out-of-order data for sourc= e > > >>>>>>>> KTable" > > >>>>>>>>>> such > > >>>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>>>>>>>>> instead of > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> blindly apply the updates to the > > >>>>> materialized > > >>>>>>>> store, > > >>>>>>>>>>> i.e. > > >>>>>>>>>>>>>>>>>>> following > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> offset > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> ordering, we will reject updates that are > > >>>>> older > > >>>>>>>> than > > >>>>>>>>>> the > > >>>>>>>>>>>>>>>>> current > > >>>>>>>>>>>>>>>>>>>>>>>> key's > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> timestamps, i.e. following timestamp > > >>>>> ordering. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang > > >>>>>>> Wang < > > >>>>>>>>>>>>>>>>>>>>> wangguoz@gmail.com> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello Adam, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the explanation. Regarding the > > >>>>>>> final > > >>>>>>>>> step > > >>>>>>>>>>>> (i.e. > > >>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>> high > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> watermark store, now altered to be replac= ed > > >>>>>>> with > > >>>>>>>> a > > >>>>>>>>>>> window > > >>>>>>>>>>>>>>>>>>> store), > > >>>>>>>>>>>>>>>>>>>>> I > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> think > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> another current on-going KIP may actually > > >>>>>>> help: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> This is for adding the timestamp into a > > >>>>>>> key-value > > >>>>>>>>>> store > > >>>>>>>>>>>>>>>>> (i.e. > > >>>>>>>>>>>>>>>>>>> only > > >>>>>>>>>>>>>>>>>>>>>>>> for > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> non-windowed KTable), and then one of its > > >>>>>>> usage, > > >>>>>>>> as > > >>>>>>>>>>>>>>>>> described > > >>>>>>>>>>>>>>>>>>> in > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>> https://issues.apache.org/jira/browse/KAFKA-5533 > > >>>>>>>> , > > >>>>>>>>> is > > >>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>> we > > >>>>>>>>>>>>>>>>>>> can > > >>>>>>>>>>>>>>>>>>>>>>>> then > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> "reject" updates from the source topics i= f > > >>>>> its > > >>>>>>>>>>> timestamp > > >>>>>>>>>>>> is > > >>>>>>>>>>>>>>>>>>>>> smaller > > >>>>>>>>>>>>>>>>>>>>>>>> than > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the current key's latest update timestamp= . > > >>>>> I > > >>>>>>>> think > > >>>>>>>>> it > > >>>>>>>>>>> is > > >>>>>>>>>>>>>>>>> very > > >>>>>>>>>>>>>>>>>>>>>>>> similar to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> what you have in mind for high watermark > > >>>>> based > > >>>>>>>>>>> filtering, > > >>>>>>>>>>>>>>>>> while > > >>>>>>>>>>>>>>>>>>>>> you > > >>>>>>>>>>>>>>>>>>>>>>>> only > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> need to make sure that the timestamps of > > >>>>> the > > >>>>>>>>> joining > > >>>>>>>>>>>>>> records > > >>>>>>>>>>>>>>>>>>> are > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> correctly > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> inherited though the whole topology to th= e > > >>>>>>> final > > >>>>>>>>>> stage. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Note that this KIP is for key-value store > > >>>>> and > > >>>>>>>> hence > > >>>>>>>>>>>>>>>>>>> non-windowed > > >>>>>>>>>>>>>>>>>>>>>>>> KTables > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> only, but for windowed KTables we do not > > >>>>>>> really > > >>>>>>>>> have > > >>>>>>>>>> a > > >>>>>>>>>>>> good > > >>>>>>>>>>>>>>>>>>>>> support > > >>>>>>>>>>>>>>>>>>>>>>>> for > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> their joins anyways ( > > >>>>>>>>>>>>>>>>>>>>>>>> > > >>>>> https://issues.apache.org/jira/browse/KAFKA-7107) > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> think we can just consider non-windowed > > >>>>>>>>> KTable-KTable > > >>>>>>>>>>>>>>>>> non-key > > >>>>>>>>>>>>>>>>>>>>> joins > > >>>>>>>>>>>>>>>>>>>>>>>> for > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> now. In which case, KIP-258 should help. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan > > >>>>> Filipiak > > >>>>>>> < > > >>>>>>>>>>>>>>>>>>>>>>>> Jan.Filipiak@trivago.com > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrot= e: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Guozhang > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Current highwater mark implementation > > >>>>> would > > >>>>>>>> grow > > >>>>>>>>>>>>>> endlessly > > >>>>>>>>>>>>>>>>>>> based > > >>>>>>>>>>>>>>>>>>>>>>>> on > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> primary key of original event. It is a > > >>>>> pair > > >>>>>>> of > > >>>>>>>>>> ( > >>>>>>>>>>>>>>>>> table > > >>>>>>>>>>>>>>>>>>>>>>>> primary > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> key>, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> ). This > > >>>>> is > > >>>>>>> used > > >>>>>>>>> to > > >>>>>>>>>>>>>>>>>>> differentiate > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> between > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> late arrivals and new updates. My newest > > >>>>>>> proposal > > >>>>>>>>>> would > > >>>>>>>>>>>> be > > >>>>>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>>>>>>> replace > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> with a Windowed state store of Duration N= . > > >>>>>>> This > > >>>>>>>>> would > > >>>>>>>>>>>> allow > > >>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>> same > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> behaviour, but cap the size based on > > >>>>> time. > > >>>>>>> This > > >>>>>>>>>>> should > > >>>>>>>>>>>>>>>>> allow > > >>>>>>>>>>>>>>>>>>> for > > >>>>>>>>>>>>>>>>>>>>>>>> all > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> late-arriving events to be processed, a= nd > > >>>>>>>> should > > >>>>>>>>> be > > >>>>>>>>>>>>>>>>>>> customizable > > >>>>>>>>>>>>>>>>>>>>>>>> by > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> user to tailor to their own needs (ie: > > >>>>>>> perhaps > > >>>>>>>>> just > > >>>>>>>>>>> 10 > > >>>>>>>>>>>>>>>>>>> minutes > > >>>>>>>>>>>>>>>>>>>>> of > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> window, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> or perhaps 7 days...). > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Adam, using time based retention can > > >>>>> do > > >>>>>>> the > > >>>>>>>>>> trick > > >>>>>>>>>>>>>> here. > > >>>>>>>>>>>>>>>>>>> Even > > >>>>>>>>>>>>>>>>>>>>>>>> if I > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would still like to see the automatic > > >>>>>>>>> repartitioning > > >>>>>>>>>>>>>>>>> optional > > >>>>>>>>>>>>>>>>>>>>>>>> since I > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> would > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> just reshuffle again. With windowed store= I > > >>>>>>> am a > > >>>>>>>>>> little > > >>>>>>>>>>>> bit > > >>>>>>>>>>>>>>>>>>>>>>>> sceptical > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> about > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> how to determine the window. So esentiall= y > > >>>>> one > > >>>>>>>>> could > > >>>>>>>>>>> run > > >>>>>>>>>>>>>>>>> into > > >>>>>>>>>>>>>>>>>>>>>>>> problems > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> when > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the rapid change happens near a window > > >>>>>>> border. I > > >>>>>>>>> will > > >>>>>>>>>>>> check > > >>>>>>>>>>>>>>>>> you > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation in detail, if its > > >>>>>>> problematic, we > > >>>>>>>>>> could > > >>>>>>>>>>>>>>>>> still > > >>>>>>>>>>>>>>>>>>>>> check > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> _all_ > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> windows on read with not to bad > > >>>>> performance > > >>>>>>>>> impact I > > >>>>>>>>>>>>>> guess. > > >>>>>>>>>>>>>>>>>>> Will > > >>>>>>>>>>>>>>>>>>>>>>>> let > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> you > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> know if the implementation would be > > >>>>> correct > > >>>>>>> as > > >>>>>>>>> is. I > > >>>>>>>>>>>>>>>>> wouldn't > > >>>>>>>>>>>>>>>>>>> not > > >>>>>>>>>>>>>>>>>>>>>>>> like > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> assume that: offset(A) < offset(B) =3D> > > >>>>>>>>> timestamp(A) < > > >>>>>>>>>>>>>>>>>>>>> timestamp(B). > > >>>>>>>>>>>>>>>>>>>>>>>> I > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> think > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> we can't expect that. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Jan > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I believe I understand what you mean no= w > > >>>>> - > > >>>>>>>> thanks > > >>>>>>>>>> for > > >>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>> diagram, it > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> did really help. You are correct that I > > >>>>> do > > >>>>>>> not > > >>>>>>>>> have > > >>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> original > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> primary > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> key available, and I can see that if it w= as > > >>>>>>>>> available > > >>>>>>>>>>>> then > > >>>>>>>>>>>>>>>>> you > > >>>>>>>>>>>>>>>>>>>>>>>> would be > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> able to add and remove events from the > > >>>>> Map. > > >>>>>>>> That > > >>>>>>>>>>> being > > >>>>>>>>>>>>>>>>> said, > > >>>>>>>>>>>>>>>>>>> I > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> encourage > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> you to finish your diagrams / charts just > > >>>>> for > > >>>>>>>>> clarity > > >>>>>>>>>>> for > > >>>>>>>>>>>>>>>>>>> everyone > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> else. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yeah 100%, this giphy thing is just > > >>>>> really > > >>>>>>> hard > > >>>>>>>>>> work. > > >>>>>>>>>>>> But > > >>>>>>>>>>>>>>>>> I > > >>>>>>>>>>>>>>>>>>>>>>>> understand > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the benefits for the rest. Sorry about t= he > > >>>>>>>>> original > > >>>>>>>>>>>>>> primary > > >>>>>>>>>>>>>>>>>>> key, > > >>>>>>>>>>>>>>>>>>>>> We > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> have > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> join and Group by implemented our own in > > >>>>> PAPI > > >>>>>>>> and > > >>>>>>>>>>>>>> basically > > >>>>>>>>>>>>>>>>>>> not > > >>>>>>>>>>>>>>>>>>>>>>>> using > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> any > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> DSL (Just the abstraction). Completely > > >>>>> missed > > >>>>>>>> that > > >>>>>>>>> in > > >>>>>>>>>>>>>>>>> original > > >>>>>>>>>>>>>>>>>>> DSL > > >>>>>>>>>>>>>>>>>>>>>>>> its > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> not > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> there and just assumed it. total brain me= ss > > >>>>>>> up on > > >>>>>>>>> my > > >>>>>>>>>>> end. > > >>>>>>>>>>>>>>>>> Will > > >>>>>>>>>>>>>>>>>>>>>>>> finish > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> chart as soon as i get a quite evening th= is > > >>>>>>> week. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My follow up question for you is, won't > > >>>>> the > > >>>>>>> Map > > >>>>>>>>> stay > > >>>>>>>>>>>>>> inside > > >>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>> State > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Store indefinitely after all of the > > >>>>> changes > > >>>>>>>> have > > >>>>>>>>>>>>>>>>> propagated? > > >>>>>>>>>>>>>>>>>>>>> Isn't > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> effectively the same as a highwater mar= k > > >>>>>>> state > > >>>>>>>>>> store? > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thing is that if the map is empty, > > >>>>>>> substractor > > >>>>>>>> is > > >>>>>>>>>>> gonna > > >>>>>>>>>>>>>>>>>>> return > > >>>>>>>>>>>>>>>>>>>>>>>> `null` > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> and > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the key is removed from the keyspace. But > > >>>>>>> there > > >>>>>>>> is > > >>>>>>>>>>> going > > >>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>> be > > >>>>>>>>>>>>>>>>>>> a > > >>>>>>>>>>>>>>>>>>>>>>>> store > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 100%, the good thing is that I can use > > >>>>> this > > >>>>>>>> store > > >>>>>>>>>>>> directly > > >>>>>>>>>>>>>>>>> for > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> materialize() / enableSendingOldValues() > > >>>>> is a > > >>>>>>>>>> regular > > >>>>>>>>>>>>>>>>> store, > > >>>>>>>>>>>>>>>>>>>>>>>> satisfying > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> all gurantees needed for further groupby= / > > >>>>>>> join. > > >>>>>>>>> The > > >>>>>>>>>>>>>>>>> Windowed > > >>>>>>>>>>>>>>>>>>>>>>>> store is > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> not > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> keeping the values, so for the next > > >>>>> statefull > > >>>>>>>>>> operation > > >>>>>>>>>>>> we > > >>>>>>>>>>>>>>>>>>> would > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> need to instantiate an extra store. or w= e > > >>>>>>> have > > >>>>>>>> the > > >>>>>>>>>>>> window > > >>>>>>>>>>>>>>>>>>> store > > >>>>>>>>>>>>>>>>>>>>>>>> also > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> have > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the values then. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Long story short. if we can flip in a > > >>>>> custom > > >>>>>>>> group > > >>>>>>>>>> by > > >>>>>>>>>>>>>>>>> before > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartitioning to the original primary > > >>>>> key i > > >>>>>>>> think > > >>>>>>>>>> it > > >>>>>>>>>>>>>> would > > >>>>>>>>>>>>>>>>>>> help > > >>>>>>>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> users > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> big time in building efficient apps. Give= n > > >>>>> the > > >>>>>>>>>> original > > >>>>>>>>>>>>>>>>> primary > > >>>>>>>>>>>>>>>>>>>>> key > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> issue I > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> understand that we do not have a solid > > >>>>>>> foundation > > >>>>>>>>> to > > >>>>>>>>>>>> build > > >>>>>>>>>>>>>>>>> on. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Leaving primary key carry along to the > > >>>>> user. > > >>>>>>>> very > > >>>>>>>>>>>>>>>>>>> unfortunate. I > > >>>>>>>>>>>>>>>>>>>>>>>> could > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> understand the decision goes like that. = I > > >>>>> do > > >>>>>>> not > > >>>>>>>>>> think > > >>>>>>>>>>>> its > > >>>>>>>>>>>>>>>>> a > > >>>>>>>>>>>>>>>>>>> good > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> decision. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Adam > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 10:07 AM, > > >>>>> Prajakta > > >>>>>>>>> Dumbre < > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> dumbreprajakta311@gmail.com > >>>>>>>>>>>>>>>>>>> dumbreprajakta311@gmail.com > > >>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> please remove me from this > > >>>>> group > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 1:2= 9 > PM > > >>>>>>> Jan > > >>>>>>>>>>> Filipiak > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > >>>>>>>>>>>>>>>>>>>>> Jan.Filipiak@trivago.com > > >>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > Hi Adam, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > give me some time, will > make > > >>>>>>> such a > > >>>>>>>>>>> chart. > > >>>>>>>>>>>>>> last > > >>>>>>>>>>>>>>>>>>> time i > > >>>>>>>>>>>>>>>>>>>>>>>> didn't > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> get along > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > well with giphy and ruine= d > > >>>>> all > > >>>>>>> your > > >>>>>>>>>>> charts. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > Hopefully i can get it do= ne > > >>>>>>> today > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > On 08.09.2018 16:00, Adam > > >>>>>>> Bellemare > > >>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > Hi Jan > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > I have included a diagr= am > > >>>>> of > > >>>>>>>> what I > > >>>>>>>>>>>>>> attempted > > >>>>>>>>>>>>>>>>> on > > >>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>> KIP. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+S= u > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>> pport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoining > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> inKTable-GroupBy+Reduce/Aggregate > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> < > > >>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+S > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>> upport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoinin > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ginKTable-GroupBy+Reduce/Aggregate> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > I attempted this back a= t > > >>>>> the > > >>>>>>>> start > > >>>>>>>>> of > > >>>>>>>>>>> my > > >>>>>>>>>>>> own > > >>>>>>>>>>>>>>>>>>>>>>>> implementation > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > solution, and since I > could > > >>>>>>> not > > >>>>>>>> get > > >>>>>>>>>> it > > >>>>>>>>>>> to > > >>>>>>>>>>>>>>>>> work I > > >>>>>>>>>>>>>>>>>>> have > > >>>>>>>>>>>>>>>>>>>>>>>> since > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> discarded the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > code. At this point in > > >>>>> time, > > >>>>>>> if > > >>>>>>>> you > > >>>>>>>>>>> wish > > >>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>> continue > > >>>>>>>>>>>>>>>>>>>>>>>> pursuing > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for your > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > groupBy solution, I ask > > >>>>> that > > >>>>>>> you > > >>>>>>>>>> please > > >>>>>>>>>>>>>>>>> create a > > >>>>>>>>>>>>>>>>>>>>>>>> diagram on > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the KIP > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > carefully explaining yo= ur > > >>>>>>>> solution. > > >>>>>>>>>>>> Please > > >>>>>>>>>>>>>>>>> feel > > >>>>>>>>>>>>>>>>>>> free > > >>>>>>>>>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>>>>>>> use > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the image I > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > just posted as a starti= ng > > >>>>>>> point. > > >>>>>>>> I > > >>>>>>>>> am > > >>>>>>>>>>>> having > > >>>>>>>>>>>>>>>>>>> trouble > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> understanding your > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > explanations but I thin= k > > >>>>> that > > >>>>>>> a > > >>>>>>>>>>> carefully > > >>>>>>>>>>>>>>>>>>> constructed > > >>>>>>>>>>>>>>>>>>>>>>>> diagram > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will clear > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > up > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > any misunderstandings. > > >>>>>>>> Alternately, > > >>>>>>>>>>>> please > > >>>>>>>>>>>>>>>>> post a > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> comprehensive PR with > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > your solution. I can on= ly > > >>>>>>> guess > > >>>>>>>> at > > >>>>>>>>>> what > > >>>>>>>>>>>> you > > >>>>>>>>>>>>>>>>>>> mean, and > > >>>>>>>>>>>>>>>>>>>>>>>> since I > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> value my > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > own > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > time as much as you val= ue > > >>>>>>> yours, > > >>>>>>>> I > > >>>>>>>>>>>> believe > > >>>>>>>>>>>>>> it > > >>>>>>>>>>>>>>>>> is > > >>>>>>>>>>>>>>>>>>> your > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> responsibility to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > provide an implementati= on > > >>>>>>> instead > > >>>>>>>>> of > > >>>>>>>>>> me > > >>>>>>>>>>>>>>>>> trying to > > >>>>>>>>>>>>>>>>>>>>> guess. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > Adam > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > On Sat, Sep 8, 2018 at > 8:00 > > >>>>>>> AM, > > >>>>>>>> Jan > > >>>>>>>>>>>> Filipiak > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > >>>>>>>>>>>>>>>>>>>>> Jan.Filipiak@trivago.com > > >>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> Hi James, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> nice to see you beeing > > >>>>>>>> interested. > > >>>>>>>>>>> kafka > > >>>>>>>>>>>>>>>>>>> streams at > > >>>>>>>>>>>>>>>>>>>>>>>> this > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> point supports > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> all sorts of joins as > > >>>>> long as > > >>>>>>>> both > > >>>>>>>>>>>> streams > > >>>>>>>>>>>>>>>>> have > > >>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>> same > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> key. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> Adam is currently > > >>>>>>> implementing a > > >>>>>>>>>> join > > >>>>>>>>>>>>>> where a > > >>>>>>>>>>>>>>>>>>> KTable > > >>>>>>>>>>>>>>>>>>>>>>>> and a > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KTable can > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > have > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> a one to many relation > > >>>>> ship > > >>>>>>>> (1:n). > > >>>>>>>>>> We > > >>>>>>>>>>>>>> exploit > > >>>>>>>>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>>>>>>>>> rocksdb > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> datastore that keeps dat= a > > >>>>>>> sorted > > >>>>>>>> (At > > >>>>>>>>>>> least > > >>>>>>>>>>>>>>>>>>> exposes an > > >>>>>>>>>>>>>>>>>>>>>>>> API to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> access the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> stored data in a sorte= d > > >>>>>>>> fashion). > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> I think the technical > > >>>>> caveats > > >>>>>>>> are > > >>>>>>>>>> well > > >>>>>>>>>>>>>>>>>>> understood > > >>>>>>>>>>>>>>>>>>>>> now > > >>>>>>>>>>>>>>>>>>>>>>>> and we > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> are > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > basically > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> down to philosophy and > API > > >>>>>>>> Design > > >>>>>>>>> ( > > >>>>>>>>>>> when > > >>>>>>>>>>>>>> Adam > > >>>>>>>>>>>>>>>>>>> sees > > >>>>>>>>>>>>>>>>>>>>> my > > >>>>>>>>>>>>>>>>>>>>>>>> newest > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> message). > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> I have a lengthy track > > >>>>>>> record of > > >>>>>>>>>>> loosing > > >>>>>>>>>>>>>>>>> those > > >>>>>>>>>>>>>>>>>>> kinda > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> arguments within > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> streams community and = I > > >>>>> have > > >>>>>>> no > > >>>>>>>>> clue > > >>>>>>>>>>>> why. > > >>>>>>>>>>>>>> So > > >>>>>>>>>>>>>>>>> I > > >>>>>>>>>>>>>>>>>>>>>>>> literally > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> can't wait for > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > you > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> to churn through this > > >>>>> thread > > >>>>>>> and > > >>>>>>>>>> give > > >>>>>>>>>>>> you > > >>>>>>>>>>>>>>>>>>> opinion on > > >>>>>>>>>>>>>>>>>>>>>>>> how we > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > design > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> the return type of the > > >>>>>>>>> oneToManyJoin > > >>>>>>>>>>> and > > >>>>>>>>>>>>>> how > > >>>>>>>>>>>>>>>>>>> many > > >>>>>>>>>>>>>>>>>>>>>>>> power we > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> want to give > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> the user vs "simplicit= y" > > >>>>>>> (where > > >>>>>>>>>>>> simplicity > > >>>>>>>>>>>>>>>>> isn't > > >>>>>>>>>>>>>>>>>>>>>>>> really that > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> as users > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > still > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> need to understand it = I > > >>>>>>> argue) > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> waiting for you to joi= n > > >>>>> in on > > >>>>>>>> the > > >>>>>>>>>>>>>> discussion > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> Best Jan > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> On 07.09.2018 15:49, > James > > >>>>>>> Kwan > > >>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>> I am new to this grou= p > > >>>>> and I > > >>>>>>>>> found > > >>>>>>>>>>> this > > >>>>>>>>>>>>>>>>> subject > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> interesting. Sounds > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > like > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>> you guys want to > > >>>>> implement a > > >>>>>>>> join > > >>>>>>>>>>>> table of > > >>>>>>>>>>>>>>>>> two > > >>>>>>>>>>>>>>>>>>>>>>>> streams? Is > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> there > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > somewhere > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>> I can see the origina= l > > >>>>>>>>> requirement > > >>>>>>>>>> or > > >>>>>>>>>>>>>>>>> proposal? > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>> On Sep 7, 2018, at 8:= 13 > > >>>>> AM, > > >>>>>>> Jan > > >>>>>>>>>>>> Filipiak > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > >>>>>>>>>>>>>>>>>>>>> Jan.Filipiak@trivago.com > > >>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> On 05.09.2018 22:17, > > >>>>> Adam > > >>>>>>>>>> Bellemare > > >>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> I'm currently testi= ng > > >>>>>>> using a > > >>>>>>>>>>>> Windowed > > >>>>>>>>>>>>>>>>> Store > > >>>>>>>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>>>>>>> store the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> highwater > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> mark. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> By all indications > this > > >>>>>>>> should > > >>>>>>>>>> work > > >>>>>>>>>>>>>> fine, > > >>>>>>>>>>>>>>>>>>> with > > >>>>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>> caveat > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> being that > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > it > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> can > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> only resolve > > >>>>> out-of-order > > >>>>>>>>> arrival > > >>>>>>>>>>>> for up > > >>>>>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>> size of > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the window > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > (ie: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> 24h, 72h, etc). Thi= s > > >>>>> would > > >>>>>>>>> remove > > >>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> possibility > > >>>>>>>>>>>>>>>>>>>>>>>> of it > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> being > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > unbounded > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> in > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> size. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> With regards to Jan= 's > > >>>>>>>>>> suggestion, I > > >>>>>>>>>>>>>>>>> believe > > >>>>>>>>>>>>>>>>>>> this > > >>>>>>>>>>>>>>>>>>>>> is > > >>>>>>>>>>>>>>>>>>>>>>>> where > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> we will > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > have > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> remain in > disagreement. > > >>>>>>>> While I > > >>>>>>>>>> do > > >>>>>>>>>>>> not > > >>>>>>>>>>>>>>>>>>> disagree > > >>>>>>>>>>>>>>>>>>>>>>>> with your > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> statement > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> about > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> there likely to be > > >>>>>>> additional > > >>>>>>>>>> joins > > >>>>>>>>>>>> done > > >>>>>>>>>>>>>>>>> in a > > >>>>>>>>>>>>>>>>>>>>>>>> real-world > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> workflow, I > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > do > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> not > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> see how you can > > >>>>>>> conclusively > > >>>>>>>>> deal > > >>>>>>>>>>>> with > > >>>>>>>>>>>>>>>>>>>>> out-of-order > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> arrival > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> foreign-key > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> changes and > subsequent > > >>>>>>>> joins. I > > >>>>>>>>>>> have > > >>>>>>>>>>>>>>>>>>> attempted > > >>>>>>>>>>>>>>>>>>>>> what > > >>>>>>>>>>>>>>>>>>>>>>>> I > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> think you have > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> proposed (without a > > >>>>>>>> high-water, > > >>>>>>>>>>> using > > >>>>>>>>>>>>>>>>>>> groupBy and > > >>>>>>>>>>>>>>>>>>>>>>>> reduce) > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and found > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> that if > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> the foreign key > changes > > >>>>>>> too > > >>>>>>>>>>> quickly, > > >>>>>>>>>>>> or > > >>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> load > > >>>>>>>>>>>>>>>>>>>>> on > > >>>>>>>>>>>>>>>>>>>>>>>> a > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> stream thread > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > is > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> too > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> high, the joined > > >>>>> messages > > >>>>>>>> will > > >>>>>>>>>>> arrive > > >>>>>>>>>>>>>>>>>>>>> out-of-order > > >>>>>>>>>>>>>>>>>>>>>>>> and be > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> incorrectly > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> propagated, such th= at > > >>>>> an > > >>>>>>>>>>> intermediate > > >>>>>>>>>>>>>>>>> event > > >>>>>>>>>>>>>>>>>>> is > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> represented > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> as the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > final > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> event. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> Can you shed some > light > > >>>>> on > > >>>>>>>> your > > >>>>>>>>>>>> groupBy > > >>>>>>>>>>>>>>>>>>>>>>>> implementation. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> There must be > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> some sort of flaw in > it. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> I have a suspicion > > >>>>> where it > > >>>>>>>> is, > > >>>>>>>>> I > > >>>>>>>>>>>> would > > >>>>>>>>>>>>>>>>> just > > >>>>>>>>>>>>>>>>>>> like > > >>>>>>>>>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> confirm. The idea > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> is bullet proof and = it > > >>>>>>> must be > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> an implementation me= ss > > >>>>> up. > > >>>>>>> I > > >>>>>>>>> would > > >>>>>>>>>>>> like > > >>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>> clarify > > >>>>>>>>>>>>>>>>>>>>>>>> before > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> we draw a > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> conclusion. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> Repartitioning th= e > > >>>>>>>> scattered > > >>>>>>>>>>> events > > >>>>>>>>>>>>>>>>> back to > > >>>>>>>>>>>>>>>>>>>>> their > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> original > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> partitions is the onl= y > > >>>>> way I > > >>>>>>>> know > > >>>>>>>>>> how > > >>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>>>> conclusively > > >>>>>>>>>>>>>>>>>>>>>>>> deal > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> out-of-order events > in > > >>>>> a > > >>>>>>>> given > > >>>>>>>>>> time > > >>>>>>>>>>>>>> frame, > > >>>>>>>>>>>>>>>>>>> and to > > >>>>>>>>>>>>>>>>>>>>>>>> ensure > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > data > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> is > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> eventually consiste= nt > > >>>>> with > > >>>>>>>> the > > >>>>>>>>>>> input > > >>>>>>>>>>>>>>>>> events. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> If you have some co= de > > >>>>> to > > >>>>>>>> share > > >>>>>>>>>> that > > >>>>>>>>>>>>>>>>>>> illustrates > > >>>>>>>>>>>>>>>>>>>>> your > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> approach, I > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > would > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> be > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> very grateful as it > > >>>>> would > > >>>>>>>>> remove > > >>>>>>>>>>> any > > >>>>>>>>>>>>>>>>>>>>>>>> misunderstandings > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that I may > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > have. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> ah okay you were > looking > > >>>>>>> for > > >>>>>>>> my > > >>>>>>>>>>> code. > > >>>>>>>>>>>> I > > >>>>>>>>>>>>>>>>> don't > > >>>>>>>>>>>>>>>>>>> have > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> something easily > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> readable here as its > > >>>>>>> bloated > > >>>>>>>>> with > > >>>>>>>>>>>>>>>>> OO-patterns. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> its anyhow trivial: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> @Override > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> public T apply(= K > > >>>>>>> aggKey, > > >>>>>>>> V > > >>>>>>>>>>>> value, T > > >>>>>>>>>>>>>>>>>>>>> aggregate) > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> { > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> Map > > >>>>>>>>>>> currentStateAsMap =3D > > >>>>>>>>>>>>>>>>>>>>>>>> asMap(aggregate); > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> << > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> imaginary > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> U > toModifyKey =3D > > >>>>>>>>>>>>>>>>> mapper.apply(value); > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> << this > is > > >>>>> the > > >>>>>>>>> place > > >>>>>>>>>>>> where > > >>>>>>>>>>>>>>>>> people > > >>>>>>>>>>>>>>>>>>>>>>>> actually > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> gonna have > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > issues > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> and why you probably > > >>>>>>> couldn't > > >>>>>>>> do > > >>>>>>>>>> it. > > >>>>>>>>>>>> we > > >>>>>>>>>>>>>>>>> would > > >>>>>>>>>>>>>>>>>>> need > > >>>>>>>>>>>>>>>>>>>>>>>> to find > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a solution > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > here. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> I didn't realize tha= t > > >>>>> yet. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> << we > > >>>>>>> propagate > > >>>>>>>> the > > >>>>>>>>>>>> field in > > >>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>> joiner, so > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that we can > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > pick > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> it up in an aggregat= e. > > >>>>>>>> Probably > > >>>>>>>>>> you > > >>>>>>>>>>>> have > > >>>>>>>>>>>>>>>>> not > > >>>>>>>>>>>>>>>>>>>>> thought > > >>>>>>>>>>>>>>>>>>>>>>>> of > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this in your > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> approach right? > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> << I am > > >>>>> very > > >>>>>>> open > > >>>>>>>>> to > > >>>>>>>>>>>> find a > > >>>>>>>>>>>>>>>>>>> generic > > >>>>>>>>>>>>>>>>>>>>>>>> solution > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> here. In my > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> honest opinion this = is > > >>>>>>> broken > > >>>>>>>> in > > >>>>>>>>>>>>>>>>>>>>> KTableImpl.GroupBy > > >>>>>>>>>>>>>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> looses > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > the keys > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> and only maintains t= he > > >>>>>>>> aggregate > > >>>>>>>>>>> key. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> << I > > >>>>>>> abstracted > > >>>>>>>> it > > >>>>>>>>>> away > > >>>>>>>>>>>> back > > >>>>>>>>>>>>>>>>>>> then way > > >>>>>>>>>>>>>>>>>>>>>>>> before > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> i > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> was > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > thinking > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> of oneToMany join. > That > > >>>>> is > > >>>>>>>> why I > > >>>>>>>>>>>> didn't > > >>>>>>>>>>>>>>>>>>> realize > > >>>>>>>>>>>>>>>>>>>>> its > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> significance here. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> << > > >>>>> Opinions? > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> for (V m : > > >>>>>>> current) > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> { > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> > > >>>>>>>>>>> currentStateAsMap.put(mapper.apply(m), > > >>>>>>>>>>>>>> m); > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> } > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> if (isAdder= ) > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> { > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> > > >>>>>>>>> currentStateAsMap.put(toModifyKey, > > >>>>>>>>>>>>>> value); > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> } > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> else > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> { > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> > > >>>>>>>>>>> currentStateAsMap.remove(toModifyKey); > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> > > >>>>>>>> if(currentStateAsMap.isEmpty()){ > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> > return > > >>>>>>> null; > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> } > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> } > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> retrun > > >>>>>>>>>>>>>>>>>>> asAggregateType(currentStateAsMap) > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> } > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> Thanks, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> Adam > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> On Wed, Sep 5, 2018 > at > > >>>>>>> 3:35 > > >>>>>>>> PM, > > >>>>>>>>>> Jan > > >>>>>>>>>>>>>>>>> Filipiak > > >>>>>>>>>>>>>>>>>>> < > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > Jan.Filipiak@trivago.com > > >>>>>>> > >>>>>>>>>>>>>>>>>>>>> Jan.Filipiak@trivago.com > > >>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>> Thanks Adam for > > >>>>> bringing > > >>>>>>>>> Matthias > > >>>>>>>>>>> to > > >>>>>>>>>>>>>>>>> speed! > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> about the > > >>>>> differences. I > > >>>>>>>> think > > >>>>>>>>>>>>>> re-keying > > >>>>>>>>>>>>>>>>>>> back > > >>>>>>>>>>>>>>>>>>>>>>>> should be > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> optional at > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> best. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> I would say we > return > > >>>>> a > > >>>>>>>>>>>> KScatteredTable > > >>>>>>>>>>>>>>>>> with > > >>>>>>>>>>>>>>>>>>>>>>>> reshuffle() > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> returning > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> > > >>>>>>> KTable > > >>>>>>>> to > > >>>>>>>>>> make > > >>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> backwards > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartitioning > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> optional. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> I am also in a big > > >>>>>>> favour of > > >>>>>>>>>> doing > > >>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>> out > > >>>>>>>>>>>>>>>>>>> of > > >>>>>>>>>>>>>>>>>>>>> order > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> processing using > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> group > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> by instead high > water > > >>>>>>> mark > > >>>>>>>>>>> tracking. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> Just because > unbounded > > >>>>>>>> growth > > >>>>>>>>> is > > >>>>>>>>>>>> just > > >>>>>>>>>>>>>>>>> scary > > >>>>>>>>>>>>>>>>>>> + It > > >>>>>>>>>>>>>>>>>>>>>>>> saves > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> us > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the header > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> stuff. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> I think the > > >>>>> abstraction > > >>>>>>> of > > >>>>>>>>>> always > > >>>>>>>>>>>>>>>>>>> repartitioning > > >>>>>>>>>>>>>>>>>>>>>>>> back is > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> just not so > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> strong. Like the > work > > >>>>> has > > >>>>>>>> been > > >>>>>>>>>>> done > > >>>>>>>>>>>>>>>>> before > > >>>>>>>>>>>>>>>>>>> we > > >>>>>>>>>>>>>>>>>>>>>>>> partition > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> back and > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> grouping > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> by something else > > >>>>>>> afterwards > > >>>>>>>>> is > > >>>>>>>>>>>> really > > >>>>>>>>>>>>>>>>>>> common. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> On 05.09.2018 13:4= 9, > > >>>>> Adam > > >>>>>>>>>>> Bellemare > > >>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>> Hi Matthias > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> Thank you for you= r > > >>>>>>>> feedback, > > >>>>>>>>> I > > >>>>>>>>>> do > > >>>>>>>>>>>>>>>>>>> appreciate > > >>>>>>>>>>>>>>>>>>>>> it! > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> While name spacin= g > > >>>>>>> would be > > >>>>>>>>>>>> possible, > > >>>>>>>>>>>>>> it > > >>>>>>>>>>>>>>>>>>> would > > >>>>>>>>>>>>>>>>>>>>>>>> require > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > deserialize > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> user headers wha= t > > >>>>>>> implies > > >>>>>>>> a > > >>>>>>>>>>>> runtime > > >>>>>>>>>>>>>>>>>>> overhead. > > >>>>>>>>>>>>>>>>>>>>> I > > >>>>>>>>>>>>>>>>>>>>>>>> would > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> suggest to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > no > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> namespace for no= w > to > > >>>>>>> avoid > > >>>>>>>>> the > > >>>>>>>>>>>>>>>>> overhead. > > >>>>>>>>>>>>>>>>>>> If > > >>>>>>>>>>>>>>>>>>>>> this > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> becomes a > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > problem in > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> the future, we c= an > > >>>>>>> still > > >>>>>>>> add > > >>>>>>>>>>> name > > >>>>>>>>>>>>>>>>> spacing > > >>>>>>>>>>>>>>>>>>>>> later > > >>>>>>>>>>>>>>>>>>>>>>>> on. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> Agreed. I will g= o > > >>>>> with > > >>>>>>>>> using a > > >>>>>>>>>>>>>> reserved > > >>>>>>>>>>>>>>>>>>> string > > >>>>>>>>>>>>>>>>>>>>>>>> and > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> document it. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> My main concern > about > > >>>>>>> the > > >>>>>>>>>> design > > >>>>>>>>>>> it > > >>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> type of > > >>>>>>>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> result KTable: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > If > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> I > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> understood the > > >>>>> proposal > > >>>>>>>>>>> correctly, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> In your example, > you > > >>>>>>> have > > >>>>>>>>>> table1 > > >>>>>>>>>>>> and > > >>>>>>>>>>>>>>>>> table2 > > >>>>>>>>>>>>>>>>>>>>>>>> swapped. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Here is how it > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> works > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> currently: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> 1) table1 has the > > >>>>>>> records > > >>>>>>>>> that > > >>>>>>>>>>>> contain > > >>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>> foreign key > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> within their > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> value. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> table1 input > stream: > > >>>>>>>>>>>> , > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> , > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> table2 input > stream: > > >>>>>>> , > > >>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> 2) A Value mapper > is > > >>>>>>>> required > > >>>>>>>>>> to > > >>>>>>>>>>>>>> extract > > >>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>> foreign > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> key. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> table1 foreign ke= y > > >>>>>>> mapper: > > >>>>>>>> ( > > >>>>>>>>>>> value > > >>>>>>>>>>>> =3D> > > >>>>>>>>>>>>>>>>>>> value.fk > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ) > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> The mapper is > > >>>>> applied to > > >>>>>>>> each > > >>>>>>>>>>>> element > > >>>>>>>>>>>>>> in > > >>>>>>>>>>>>>>>>>>>>> table1, > > >>>>>>>>>>>>>>>>>>>>>>>> and a > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> new combined > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> key is > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> made: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> table1 mapped: > > >>>>>>>>>>> (fk=3DA,bar=3D1)>, > > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (fk=3DA,bar=3D2)>, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> (fk=3DB,bar=3D3)> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> 3) The rekeyed > events > > >>>>>>> are > > >>>>>>>>>>>>>> copartitioned > > >>>>>>>>>>>>>>>>>>> with > > >>>>>>>>>>>>>>>>>>>>>>>> table2: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> a) Stream Thread > with > > >>>>>>>>> Partition > > >>>>>>>>>>> 0: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > RepartitionedTable1: > > >>>>>>> > >>>>>>>>>>>>>>>>> (fk=3DA,bar=3D1)>, > > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (fk=3DA,bar=3D2)> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> Table2: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> b) Stream Thread > with > > >>>>>>>>> Partition > > >>>>>>>>>>> 1: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > RepartitionedTable1: > > >>>>>>> > >>>>>>>>>>>>>> (fk=3DB,bar=3D3)> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> Table2: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> 4) From here, the= y > > >>>>> can > > >>>>>>> be > > >>>>>>>>>> joined > > >>>>>>>>>>>>>>>>> together > > >>>>>>>>>>>>>>>>>>>>> locally > > >>>>>>>>>>>>>>>>>>>>>>>> by > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> applying the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> joiner > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> function. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> At this point, > Jan's > > >>>>>>> design > > >>>>>>>>> and > > >>>>>>>>>>> my > > >>>>>>>>>>>>>>>>> design > > >>>>>>>>>>>>>>>>>>>>>>>> deviate. My > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> design goes > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > on > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> repartition the > data > > >>>>>>>>> post-join > > >>>>>>>>>>> and > > >>>>>>>>>>>>>>>>> resolve > > >>>>>>>>>>>>>>>>>>>>>>>> out-of-order > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> arrival of > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> records, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> finally returning > the > > >>>>>>> data > > >>>>>>>>>> keyed > > >>>>>>>>>>>> just > > >>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>> original key. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I do not > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> expose > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> CombinedKey or an= y > of > > >>>>>>> the > > >>>>>>>>>>> internals > > >>>>>>>>>>>>>>>>>>> outside of > > >>>>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> joinOnForeignKey > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> function. This do= es > > >>>>> make > > >>>>>>>> for > > >>>>>>>>>>> larger > > >>>>>>>>>>>>>>>>>>> footprint, > > >>>>>>>>>>>>>>>>>>>>>>>> but it > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> removes all > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> agency > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> for resolving > > >>>>>>> out-of-order > > >>>>>>>>>>> arrivals > > >>>>>>>>>>>>>> and > > >>>>>>>>>>>>>>>>>>>>> handling > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> CombinedKeys from > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> user. I believe > that > > >>>>>>> this > > >>>>>>>>> makes > > >>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> function > > >>>>>>>>>>>>>>>>>>>>> much > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> easier > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to use. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> Let me know if th= is > > >>>>>>> helps > > >>>>>>>>>> resolve > > >>>>>>>>>>>> your > > >>>>>>>>>>>>>>>>>>>>> questions, > > >>>>>>>>>>>>>>>>>>>>>>>> and > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> please feel > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> free to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> add anything else > on > > >>>>>>> your > > >>>>>>>>> mind. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> Thanks again, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> Adam > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> On Tue, Sep 4, 20= 18 > > >>>>> at > > >>>>>>> 8:36 > > >>>>>>>>> PM, > > >>>>>>>>>>>>>>>>> Matthias J. > > >>>>>>>>>>>>>>>>>>>>> Sax < > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>> matthias@confluent.io > > >>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> matthias@confluent.io>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> Hi, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> I am just catchi= ng > > >>>>> up > > >>>>>>> on > > >>>>>>>>> this > > >>>>>>>>>>>>>> thread. I > > >>>>>>>>>>>>>>>>>>> did > > >>>>>>>>>>>>>>>>>>>>> not > > >>>>>>>>>>>>>>>>>>>>>>>> read > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> everything so > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> far, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> but want to shar= e > > >>>>>>> couple > > >>>>>>>> of > > >>>>>>>>>>>> initial > > >>>>>>>>>>>>>>>>>>> thoughts: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> Headers: I think > > >>>>> there > > >>>>>>> is > > >>>>>>>> a > > >>>>>>>>>>>>>> fundamental > > >>>>>>>>>>>>>>>>>>>>>>>> difference > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> between header > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> usage > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> in this KIP and > > >>>>> KP-258. > > >>>>>>>> For > > >>>>>>>>>> 258, > > >>>>>>>>>>>> we > > >>>>>>>>>>>>>> add > > >>>>>>>>>>>>>>>>>>>>> headers > > >>>>>>>>>>>>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> changelog topic > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> that > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> are owned by Kaf= ka > > >>>>>>> Streams > > >>>>>>>>> and > > >>>>>>>>>>>> nobody > > >>>>>>>>>>>>>>>>>>> else is > > >>>>>>>>>>>>>>>>>>>>>>>> supposed > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to write > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > into > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> them. In fact, n= o > > >>>>> user > > >>>>>>>>> header > > >>>>>>>>>>> are > > >>>>>>>>>>>>>>>>> written > > >>>>>>>>>>>>>>>>>>> into > > >>>>>>>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> changelog topic > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> and > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> thus, there are > not > > >>>>>>>>> conflicts. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> Nevertheless, I > > >>>>> don't > > >>>>>>> see > > >>>>>>>> a > > >>>>>>>>>> big > > >>>>>>>>>>>> issue > > >>>>>>>>>>>>>>>>> with > > >>>>>>>>>>>>>>>>>>>>> using > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> headers within > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> Streams. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> As long as we > > >>>>> document > > >>>>>>> it, > > >>>>>>>>> we > > >>>>>>>>>>> can > > >>>>>>>>>>>>>> have > > >>>>>>>>>>>>>>>>>>> some > > >>>>>>>>>>>>>>>>>>>>>>>> "reserved" > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> header keys > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> and > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> users are not > > >>>>> allowed > > >>>>>>> to > > >>>>>>>> use > > >>>>>>>>>>> when > > >>>>>>>>>>>>>>>>>>> processing > > >>>>>>>>>>>>>>>>>>>>>>>> data with > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kafka > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > Streams. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> IMHO, this shoul= d > be > > >>>>>>> ok. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> I think there is= a > > >>>>> safe > > >>>>>>>> way > > >>>>>>>>> to > > >>>>>>>>>>>> avoid > > >>>>>>>>>>>>>>>>>>>>> conflicts, > > >>>>>>>>>>>>>>>>>>>>>>>> since > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> these > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > headers > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> are > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> only needed in > > >>>>>>> internal > > >>>>>>>>>> topics > > >>>>>>>>>>> (I > > >>>>>>>>>>>>>>>>> think): > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> For internal an= d > > >>>>>>>> changelog > > >>>>>>>>>>>> topics, > > >>>>>>>>>>>>>> we > > >>>>>>>>>>>>>>>>> can > > >>>>>>>>>>>>>>>>>>>>>>>> namespace > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> all headers: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> * user-defined > > >>>>> headers > > >>>>>>>> are > > >>>>>>>>>>>>>> namespaced > > >>>>>>>>>>>>>>>>> as > > >>>>>>>>>>>>>>>>>>>>>>>> "external." > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> + > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> headerKey > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> * internal > headers > > >>>>> are > > >>>>>>>>>>>> namespaced as > > >>>>>>>>>>>>>>>>>>>>>>>> "internal." + > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> headerKey > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> While name > spacing > > >>>>>>> would > > >>>>>>>> be > > >>>>>>>>>>>>>> possible, > > >>>>>>>>>>>>>>>>> it > > >>>>>>>>>>>>>>>>>>>>> would > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> require > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> deserialize > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> user headers wha= t > > >>>>>>> implies > > >>>>>>>> a > > >>>>>>>>>>>> runtime > > >>>>>>>>>>>>>>>>>>> overhead. > > >>>>>>>>>>>>>>>>>>>>> I > > >>>>>>>>>>>>>>>>>>>>>>>> would > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> suggest to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > no > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> namespace for no= w > to > > >>>>>>> avoid > > >>>>>>>>> the > > >>>>>>>>>>>>>>>>> overhead. > > >>>>>>>>>>>>>>>>>>> If > > >>>>>>>>>>>>>>>>>>>>> this > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> becomes a > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > problem in > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> the future, we c= an > > >>>>>>> still > > >>>>>>>> add > > >>>>>>>>>>> name > > >>>>>>>>>>>>>>>>> spacing > > >>>>>>>>>>>>>>>>>>>>> later > > >>>>>>>>>>>>>>>>>>>>>>>> on. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> My main concern > > >>>>> about > > >>>>>>> the > > >>>>>>>>>> design > > >>>>>>>>>>>> it > > >>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> type > > >>>>>>>>>>>>>>>>>>>>> of > > >>>>>>>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> result KTable: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> If I > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> understood the > > >>>>> proposal > > >>>>>>>>>>> correctly, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> KTable > > >>>>> table1 =3D > > >>>>>>> ... > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> KTable > > >>>>> table2 =3D > > >>>>>>> ... > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> KTable > > >>>>>>> joinedTable > > >>>>>>>> =3D > > >>>>>>>>>>>>>>>>>>>>>>>> table1.join(table2,...); > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> implies that the > > >>>>>>>>> `joinedTable` > > >>>>>>>>>>> has > > >>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> same > > >>>>>>>>>>>>>>>>>>>>> key > > >>>>>>>>>>>>>>>>>>>>>>>> as the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> left input > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> table. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> IMHO, this does > not > > >>>>>>> work > > >>>>>>>>>> because > > >>>>>>>>>>>> if > > >>>>>>>>>>>>>>>>> table2 > > >>>>>>>>>>>>>>>>>>>>>>>> contains > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> multiple rows > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> that > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> join with a reco= rd > > >>>>> in > > >>>>>>>> table1 > > >>>>>>>>>>>> (what is > > >>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> main > > >>>>>>>>>>>>>>>>>>>>>>>> purpose > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> a > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > foreign > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> key > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> join), the resul= t > > >>>>> table > > >>>>>>>>> would > > >>>>>>>>>>> only > > >>>>>>>>>>>>>>>>>>> contain a > > >>>>>>>>>>>>>>>>>>>>>>>> single > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> join result, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > but > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> not > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> multiple. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> Example: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> table1 input > stream: > > >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> table2 input > stream: > > >>>>>>>>>> , > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> We use table2 > value > > >>>>> a > > >>>>>>>>> foreign > > >>>>>>>>>>> key > > >>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>> table1 > > >>>>>>>>>>>>>>>>>>>>> key > > >>>>>>>>>>>>>>>>>>>>>>>> (ie, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "A" joins). > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > If > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> result key is th= e > > >>>>> same > > >>>>>>> key > > >>>>>>>>> as > > >>>>>>>>>>> key > > >>>>>>>>>>>> of > > >>>>>>>>>>>>>>>>>>> table1, > > >>>>>>>>>>>>>>>>>>>>> this > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implies that the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> result can eithe= r > be > > >>>>>>> > >>>>>>>>>>>> join(X,1)> > > >>>>>>>>>>>>>> or > > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> join(X,2)> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> but not > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > both. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> Because the shar= e > > >>>>> the > > >>>>>>> same > > >>>>>>>>>> key, > > >>>>>>>>>>>>>>>>> whatever > > >>>>>>>>>>>>>>>>>>>>> result > > >>>>>>>>>>>>>>>>>>>>>>>> record > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> we emit > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > later, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> overwrite the > > >>>>> previous > > >>>>>>>>> result. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> This is the reas= on > > >>>>> why > > >>>>>>> Jan > > >>>>>>>>>>>> originally > > >>>>>>>>>>>>>>>>>>> proposed > > >>>>>>>>>>>>>>>>>>>>>>>> to use > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > combination > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> of > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> both primary key= s > of > > >>>>>>> the > > >>>>>>>>> input > > >>>>>>>>>>>> tables > > >>>>>>>>>>>>>>>>> as > > >>>>>>>>>>>>>>>>>>> key > > >>>>>>>>>>>>>>>>>>>>> of > > >>>>>>>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> output table. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> This > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> makes the keys o= f > > >>>>> the > > >>>>>>>> output > > >>>>>>>>>>> table > > >>>>>>>>>>>>>>>>> unique > > >>>>>>>>>>>>>>>>>>> and > > >>>>>>>>>>>>>>>>>>>>> we > > >>>>>>>>>>>>>>>>>>>>>>>> can > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> store both in > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> output table: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> Result would be > > >>>>> > >>>>>>>>>>> join(X,1)>, > > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> join(X,2)> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> Thoughts? > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> -Matthias > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> On 9/4/18 1:36 P= M, > > >>>>> Jan > > >>>>>>>>>> Filipiak > > >>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> Just on remark > here. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> The > high-watermark > > >>>>>>> could > > >>>>>>>> be > > >>>>>>>>>>>>>>>>> disregarded. > > >>>>>>>>>>>>>>>>>>> The > > >>>>>>>>>>>>>>>>>>>>>>>> decision > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> about the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> forward > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> depends on the > > >>>>> size of > > >>>>>>>> the > > >>>>>>>>>>>>>> aggregated > > >>>>>>>>>>>>>>>>>>> map. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> Only 1 element > long > > >>>>>>> maps > > >>>>>>>>>> would > > >>>>>>>>>>> be > > >>>>>>>>>>>>>>>>>>> unpacked > > >>>>>>>>>>>>>>>>>>>>> and > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> forwarded. 0 > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > element > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> maps > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> would be > published > > >>>>> as > > >>>>>>>>> delete. > > >>>>>>>>>>> Any > > >>>>>>>>>>>>>>>>> other > > >>>>>>>>>>>>>>>>>>> count > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> of map entries = is > > >>>>> in > > >>>>>>>>> "waiting > > >>>>>>>>>>> for > > >>>>>>>>>>>>>>>>> correct > > >>>>>>>>>>>>>>>>>>>>>>>> deletes to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > arrive"-state. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> On 04.09.2018 > > >>>>> 21:29, > > >>>>>>> Adam > > >>>>>>>>>>>> Bellemare > > >>>>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> It does look > like I > > >>>>>>> could > > >>>>>>>>>>> replace > > >>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> second > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> repartition store > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > and > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>> highwater stor= e > > >>>>> with > > >>>>>>> a > > >>>>>>>>>> groupBy > > >>>>>>>>>>>> and > > >>>>>>>>>>>>>>>>>>> reduce. > > >>>>>>>>>>>>>>>>>>>>>>>> However, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it looks > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > like > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>> I > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>> would > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>> still need to > > >>>>> store > > >>>>>>> the > > >>>>>>>>>>>> highwater > > >>>>>>>>>>>>>>>>> value > > >>>>>>>>>>>>>>>>>>>>> within > > >>>>>>>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> materialized > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>> store, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> compare the > > >>>>> arrival of > > >>>>>>>>>>>> out-of-order > > >>>>>>>>>>>>>>>>>>> records > > >>>>>>>>>>>>>>>>>>>>>>>> (assuming > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> my > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> understanding > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> of > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> THIS is > > >>>>> correct...). > > >>>>>>> This > > >>>>>>>>> in > > >>>>>>>>>>>> effect > > >>>>>>>>>>>>>> is > > >>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>> same > > >>>>>>>>>>>>>>>>>>>>>>>> as > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> design I > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > have > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> now, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> just with the t= wo > > >>>>>>> tables > > >>>>>>>>>> merged > > >>>>>>>>>>>>>>>>> together. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> -- > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> -- > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhanguozhang > > >>>>>>>> > > >>>>>>> > > >>>>>> > > >>>>> > > >>>>> -- > > >>>>> -- Guozhang > > >>>>> > > >>>> > > >>> > > >> > > > > > > --000000000000ea7f24057e81637c--