From: Adam Bellemare
Date: Wed, 3 Jul 2019 11:03:53 -0400
Subject: Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins
To: dev@kafka.apache.org

Hi Matthias

Do you happen to recall what the impact was of having unnecessary tombstones? I am wondering if the negative impact is still relevant today, and if so, if you can recall the PRs or KIPs related to it.

That being said, I think that S-1 is too complex in terms of synchronization. It seems to me that the processor would need to block while it waits for the unsubscribe to propagate and return, which would cause throughput to drop significantly. Alternatively, we would need to maintain state anyway about which events were sent and which responses returned, while being sure to respect the offset order in which they're emitted. I think this would only reduce blocking slightly while increasing complexity. If I am wrong in understanding this, please let me know where my thinking is erroneous.

S-2 could probably be simplified to "for a given key, was the previously propagated result a null/tombstone or not?" It would act very similarly to the hash value mechanism, where we discard any events that do not carry the correct hash. In this case, we simply store (key, wasLastOutputATombstone) right before the event is output downstream of the Join + Resolver. This ignores all the complexities of which event is propagating over which wire and simply squelches any extra tombstones. For storage, we need to use the full primary key and a boolean. However, the table will grow indefinitely, as we can never remove keys from it.
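Here is a minimal sketch of the (key, wasLastOutputATombstone) store described above. It assumes a plain java.util.HashMap in place of a real Kafka Streams state store. The class and method names (TombstoneSquelcher, shouldForward) are hypothetical. The main method replays the INNER-join trace from Adam's 6/28 message, which is quoted further down.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Kafka Streams API: a HashMap stands in for the
// LHS state store holding (primary key -> wasLastOutputATombstone).
class TombstoneSquelcher<K, V> {
    private final Map<K, Boolean> lastWasTombstone = new HashMap<>();

    // Returns true if (key, value) should be forwarded; value == null is a tombstone.
    boolean shouldForward(K key, V value) {
        boolean isTombstone = value == null;
        if (isTombstone && Boolean.TRUE.equals(lastWasTombstone.get(key))) {
            return false; // the last output for this key was already a tombstone
        }
        // Entries are never removed: removing one would let a later unmatched
        // update for the same key emit a duplicate tombstone again.
        lastWasTombstone.put(key, isTombstone);
        return true;
    }

    public static void main(String[] args) {
        // INNER-join trace from the thread: RHS holds only (1, bar) and the
        // LHS record K moves through FKs 1 -> 2 -> 3 -> 9000.
        TombstoneSquelcher<String, String> squelcher = new TombstoneSquelcher<>();
        String[] joinOutputs = {"(K, 1, bar)", null, null, null};
        for (String output : joinOutputs) {
            System.out.println(output + " -> forwarded: " + squelcher.shouldForward("K", output));
        }
    }
}
```

The map never shrinks, which is exactly the storage concern raised here. The sketch accepts unbounded growth in exchange for never re-sending a tombstone for a known key.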
If we delete key=k from the table and propagate a tombstone, but later (say 3 weeks, 3 months, etc.) we publish (k, baz) and baz does not exist on the RHS, we will end up publishing an extra tombstone, because we have no idea what the previously sent record was for k. For this reason I think it's worth asking whether we really can maintain this state, and whether it's even necessary (again, a full understanding of the impact of extra tombstones may help us figure out a better solution).

As it stands, I don't think either of these will work well. That being said, I myself do not have any better ideas at the moment, but I would still like to better understand the circumstances where it has a negative impact downstream, as that may provide some insights.

Thanks
Adam

On Tue, Jul 2, 2019 at 11:18 PM Matthias J. Sax wrote:

> Thanks for the example. I was thinking about the problem a little bit, and I believe we should look at it in some more detail.
>
> Basically, there are 3 cases:
>
> a) insert new record LHS
> b) delete record LHS
> c) update existing record LHS
>
> For those cases we want different things to happen:
>
> a-1) send subscribe message to RHS
> a-2) RHS lookup and send result back if there is one
> a-3) emit result on LHS if any is returned
>
> b-1) delete subscription from RHS
> b-2) if there was a previous result (can easily be decided by looking up RHS table for an existing key), send tombstone back
> b-3) emit tombstone on LHS if any is returned
>
> c-1) delete old subscription from RHS
> c-2) send new subscription to RHS
> c-3) if there was no previous result and there is no new result, emit nothing
> c-4) if there was a previous result and there is no new result, emit a tombstone LHS
> c-5) if there is a new result (old result may or may not exist), emit only new result LHS (don't emit a tombstone)
>
>
> Case (a) and (b) are simple and could be implemented with a "fire and forget" strategy.
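The emit rules in cases (a) to (c) can be condensed into a single decision. This sketch makes the hypothetical assumption that both facts (was there a previous result, is there a new result) are available in one place, which is precisely what the sharding described next prevents. It assumes inner-join semantics. FkJoinEmitRules and Emit are illustrative names only.

```java
import java.util.Optional;

// Hypothetical sketch of the LHS emit rules for cases (a), (b) and (c),
// assuming inner-join semantics and that both facts are known in one place.
class FkJoinEmitRules {
    enum Emit { NOTHING, TOMBSTONE, RESULT }

    // hadPreviousResult: a join result was previously emitted for this LHS key.
    // newResult: join result for the new LHS value; empty for a delete (b) or no RHS match.
    static Emit decide(boolean hadPreviousResult, Optional<String> newResult) {
        if (newResult.isPresent()) {
            return Emit.RESULT;    // a-3 and c-5: emit only the new result, no tombstone first
        }
        if (hadPreviousResult) {
            return Emit.TOMBSTONE; // b-2/b-3 and c-4: retract the old result
        }
        return Emit.NOTHING;       // c-3 (and an insert with no RHS match): nothing to retract
    }

    public static void main(String[] args) {
        System.out.println(decide(false, Optional.of("(K, 1, bar)"))); // insert with a match
        System.out.println(decide(true, Optional.empty()));            // delete, or update losing its match
        System.out.println(decide(false, Optional.empty()));           // update with neither old nor new match
    }
}
```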
> The LHS just "blindly" updates the subscription; the RHS can process the subscription with local knowledge and may send something back. If something is sent back, the LHS blindly emits it.
>
> We know that for both cases, we never miss anything and we never emit anything unnecessary.
>
> However, using this pattern for (c), we don't get our expected result:
>
> Issues: LHS sends both subscription updates in parallel. It does not know if zero, one, or two result records will be produced by RHS. If RHS produces two results, their order is not known (however, LHS would need to emit them in the right order; also, forcing RHS to always send a result back is not a sufficient solution). If only one result comes back, it's unclear if a second result may arrive later, and thus the result may need to be buffered... Overall, local knowledge does not seem to be sufficient to tackle the case.
>
> The current proposal tries to address the issue by encoding additional information that tells the RHS whether or not to send a tombstone back. But it does not seem to be perfect, and it might result in unnecessary tombstones, as it still uses local knowledge only and thus misses some information.
>
> I think the main problem is that the knowledge about a potential previous result and a potential new result is sharded on the RHS. Hence, the "unsubscribe" does not know if it needs to send a tombstone back for the case that there was an old result but there is no new result. Similarly, the "new subscribe" cannot know if it needs to send a tombstone or not when it does not match (as it does not know if there was a previous result).
>
> To really solve the issue, I see two possible solutions (both are not great, but I wanted to discuss them anyway):
>
> S-1: First unsubscribe, and send the new subscription after the result comes back. For this case, the RHS must always send something back to the LHS on unsubscribe.
> The answer to "does a previous result exist or not" can be added to the new subscription, and hence RHS can return nothing, a tombstone, or a new result. The LHS can blindly emit whatever RHS returns. This would also cover cases (a) and (b). However, the overall time to emit the join result is doubled for the (common) update case... (we need two consecutive round-trips to the RHS).
>
> S-2: Remember/store on the LHS whether a previous result exists. For this case, (a) is handled straightforwardly, (b) is handled by telling RHS to send a tombstone if a previous result exists, and (c) can send both requests in parallel, letting the unsubscribe never return anything, while the subscribe is handled as in (b). However, we need a second store on the LHS to remember if there was a previous result. (Also not sure how interleaving/inflight computation might affect the algorithm...)
>
> I think sending unnecessary tombstones is quite bad (in very old releases we had a similar issue and fixed it). However, I am also not 100% sure if the solutions I came up with are good enough to justify them. (Personally, I slightly tend to prefer S-2 because I think that the additional store is less of an issue than the increased processing time.)
>
> Would love to hear your thoughts.
>
>
> -Matthias
>
>
> On 6/28/19 6:19 AM, Adam Bellemare wrote:
> > Hi Matthias
> >
> > Yes, thanks for the questions - I know it's hard to keep up with all of the various KIPs and everything.
> >
> > The instructions are not stored anywhere, but are simply a way of letting the RHS know how to handle the subscription and reply accordingly.
> >
> > The only case where we send an unnecessary tombstone is (that I can tell...)
> > when we do the following:
> > RHS:
> > (1, bar)
> >
> > LHS
> > (K,1) -> Results in (K, 1, bar) being output
> > (K,1) -> (K,2) -> Results in (K, null) being output for INNER (no matching element on RHS)
> > (K,2) -> (K,3) -> Results in (K, null) being output for INNER (because we don't maintain state to know we already output the tombstone on the previous transition).
> > (K,3) -> (K,9000) -> Results in (K, null)... etc.
> >
> > Byte versioning is going in today, then I hope to get back to addressing a number of John's previous questions in the PR.
> >
> > Adam
> >
> >
> > On Thu, Jun 27, 2019 at 5:47 PM Matthias J. Sax wrote:
> >
> >> Thanks for bringing this issue to our attention. Great find @Joe!
> >>
> >> Adding the instruction field to the `subscription` sounds like a good solution. What I don't understand atm: for which case would we need to send an unnecessary tombstone? I thought that the `instruction` field helps to avoid any unnecessary tombstone? Seems I am missing a case?
> >>
> >> Also for my own understanding: the `instruction` is only part of the message? It is not necessary to store it in the RHS auxiliary store, right?
> >>
> >> About right/full-outer joins. Agreed. Getting left-joins would be awesome!
> >>
> >> About upgrading: Good call John! Adding a version byte for subscription and response is good forward thinking. I personally prefer version numbers, too, as they carry more information.
> >>
> >> Thanks for all the hard work to everybody involved!
> >>
> >>
> >> -Matthias
> >>
> >> On 6/27/19 1:44 PM, John Roesler wrote:
> >>> Hi Adam,
> >>>
> >>> Hah! Yeah, I felt a headache coming on myself when I realized this would be a concern.
> >>>
> >>> For what it's worth, I'd also lean toward versioning. It seems more explicit and more likely to keep us all sane in the long run.
> >>> Since we don't _think_ our wire protocol will be subject to a lot of revisions, we can just use one byte. The worst case is that we run out of numbers and reserve the last one to mean, "consult another field for the actual version number". It seems like a single byte on each message isn't too much to pay.
> >>>
> >>> Since you point it out, we might as well put a version number on the SubscriptionResponseWrapper as well. It may not be needed, but if we ever need it, even just once, we'll be glad we have it.
> >>>
> >>> Regarding the instructions field, we can also serialize the enum very compactly as a single byte (which is the same size a boolean takes anyway), so it seems like an Enum in Java-land and a byte on the wire is a good choice.
> >>>
> >>> Agreed on the right and full outer joins; it doesn't seem necessary right now, although I am happy to see the left join "join" the party, since as you said, we were so close to it anyway. Can you also add it to the KIP?
> >>>
> >>> Thanks as always for your awesome efforts on this,
> >>> -John
> >>>
> >>> On Thu, Jun 27, 2019 at 3:04 PM Adam Bellemare <adam.bellemare@gmail.com> wrote:
> >>>>
> >>>> You're stretching my brain, John!
> >>>>
> >>>> I prefer STRATEGY 1 because it solves the problem in a simple way, and allows us to deprecate support for older message types as we go (i.e., we only support the previous 3 versions, so V5, V4, V3, but not V2 or V1).
> >>>>
> >>>> STRATEGY 2 is akin to Avro schemas between two microservices - there are indeed cases where a breaking change must be made, and forward compatibility will provide us with no out other than requiring a full stop and full upgrade for all nodes, shifting us back towards STRATEGY 1.
> >>>>
> >>>> My preference is STRATEGY 1 with instructions as an ENUM, and we can certainly include a version.
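Here is a rough sketch of the layout under discussion. It puts a one-byte protocol version at the start of the payload (STRATEGY1, as laid out in John's message further down), followed by the instruction enum encoded as a single byte. The byte layout, the class name and the instruction constant names are hypothetical. They are not the KIP's actual SubscriptionWrapper format.

```java
import java.nio.ByteBuffer;

// Hypothetical STRATEGY 1 layout: [1-byte version][1-byte instruction][payload].
// Layout and constant names are illustrative assumptions.
class SubscriptionWireSketch {
    static final byte MAX_SUPPORTED_VERSION = 1;

    enum Instruction {
        PROPAGATE_ONLY_IF_MATCH,    // hypothetical
        PROPAGATE_NULL_IF_NO_MATCH, // hypothetical
        DELETE_NO_PROPAGATE;        // hypothetical; append new constants, never remove or reorder

        byte toByte() { return (byte) ordinal(); }

        static Instruction fromByte(byte b) {
            Instruction[] all = values();
            if (b < 0 || b >= all.length) {
                // Unknown instruction: choke rather than guess.
                throw new IllegalArgumentException("Unknown instruction byte: " + b);
            }
            return all[b];
        }
    }

    static byte[] serialize(Instruction instruction, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
        buf.put(MAX_SUPPORTED_VERSION); // the writer stamps the version it speaks
        buf.put(instruction.toByte());
        buf.put(payload);
        return buf.array();
    }

    static Instruction readInstruction(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte version = buf.get();
        if (version > MAX_SUPPORTED_VERSION) {
            // STRATEGY 1: an older worker refuses messages from a newer protocol.
            throw new UnsupportedOperationException("Unsupported subscription version " + version);
        }
        return Instruction.fromByte(buf.get());
    }

    public static void main(String[] args) {
        byte[] msg = serialize(Instruction.PROPAGATE_NULL_IF_NO_MATCH, new byte[] {42});
        System.out.println(readInstruction(msg));
        msg[0] = 2; // pretend a newer node wrote version 2
        try {
            readInstruction(msg);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Encoding by ordinal keeps the enum to one byte. It only stays safe if constants are appended and never removed or reordered, which matches the advice in this thread about never removing an instruction type.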
> >>>> Would it make sense to include a version number in SubscriptionResponseWrapper as well? Currently we don't have any instructions in there, as I removed the boolean, but it is certainly plausible that it could happen in the future. I don't *think* we'll need it, but I also didn't think we'd need it for SubscriptionWrapper and here we are.
> >>>>
> >>>> Thanks for the thoughts, and the info on the right-key. That was enlightening, though I can't think of a use-case for it *at this point in time*. :)
> >>>>
> >>>> Adam
> >>>>
> >>>>
> >>>>
> >>>> On Thu, Jun 27, 2019 at 12:29 PM John Roesler wrote:
> >>>>
> >>>>> I think I agree with you, right joins (and therefore full outer joins) don't make sense here, because the result is a keyed table, where the key is the PK of the left-hand side. So, when you have a right-hand-side record with no incoming FK references, you would want to produce a join result like `nullKey: (null, rhsValue)`, but we don't currently allow null keys in Streams. It actually is possible to define them, and therefore to add right- and full-outer foreign-key joins later, but it's non-trivial in a streaming context with continuously updated results. (See the PS if you're curious what I'm thinking.) You're correct, right- and full-outer joins are trivial on our current 1:1 table joins because they are equi-joins.
> >>>>>
> >>>>> Regarding the transition, it sounds like what you're proposing is that we would say, "adding a foreign-key join to your topology requires a full application reset (or a new application id)". This is also an acceptable constraint to place on the feature, but not strictly necessary. Since 2.3, it's now possible to give all the state in your application stable names.
> >>>>> This means that it's no longer true that adding a node to your topology graph would break its structure, and it does become possible to add new operators and simply restart the app. Revisiting my prior thought, though, I think the problem is not specific to your feature. For example, adding a new grouped aggregation would produce a new repartition topic, but the repartition topic partitions might get assigned to old nodes in the middle of a rolling bounce, and they would need to just ignore them. This requirement is the same for the repartition topics in the FK join, so it's orthogonal to your design.
> >>>>>
> >>>>> Back to the first concern, though, I'm not sure I followed the explanation. As a thought experiment, let's imagine that Joe hadn't taken the time to experiment with your feature branch. We wouldn't have noticed the problem until the feature was already released in 2.4. So the wire protocol on that PK->FK subscription topic would have been V1: "FK,PK,HASH,BOOLEAN". Then, Joe would have let us know the problem once they picked up the feature, so we would want to implement your proposed fix and change the wire protocol to V2: "FK,PK,HASH,INSTRUCTIONS" in 2.5. Upon rolling out the update, we would see both 2.4 nodes encountering V2 messages and 2.5 nodes encountering V1 messages. How can they both detect that they are attempting to process a newer or older protocol? If they can detect it, then what should they do?
> >>>>>
> >>>>> From experience, there are two basic solutions to this problem:
> >>>>>
> >>>>> STRATEGY1. Add a protocol version to the message (could be a number at the start of the message payload, or it could be a number in the message headers, not sure if it matters much. Payload is probably more compact, since the header would need a name.)
> >>>>> In this case, the 2.4 worker would know that its max protocol version is V1, and when it sees the V2 message, it knows that it can't handle it properly. Rather than doing something wrong, it would just not do anything. This means it would stop the task, if not shut down the whole instance. On the other hand, a 2.5 worker would have some defined logic for how to handle all versions (V1 and V2), so once the upgrade is complete, all messages can be processed.
> >>>>>
> >>>>> STRATEGY2. Make the schema forward-compatible. Basically, we ensure that new fields can only be appended to the message schema, and that older workers using only a prefix of the full message would still behave correctly. Using the example above, we'd instead evolve the schema to V2': "FK,PK,HASH,BOOLEAN,INSTRUCTIONS", and continue to set the boolean field to true for the "new" foreign key. Then, 2.4 workers encountering a "new FK" message would just see the prefix of the payload that makes sense to them, and they would still continue processing the messages as they always have. Only after the 2.5 code is fully rolled out to the cluster would we be sure to see the desired behavior. Note: in the reverse case, a 2.5 worker knows how to fully parse the new message format, even if it plans to ignore the BOOLEAN field.
> >>>>>
> >>>>> There are some tradeoffs between these strategies: STRATEGY1 ensures that all messages are only handled by workers that can properly handle them, although it results in processing stalls while there are still old nodes in the cluster. STRATEGY2 ensures that all messages can be processed by all nodes, so there are no stalls, but we can never remove fields from the message, so if there are a lot of revisions in the future, the payloads will become bloated.
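To make STRATEGY2 concrete, here is a sketch of the V1 "FK,PK,HASH,BOOLEAN" message evolving to V2' by appending an instruction byte. Encoding the keys and the hash as fixed-width longs is a simplifying assumption; real keys would be serialized bytes. All names are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical STRATEGY 2 sketch: V2' = V1 prefix + appended instruction byte.
// Fixed-width long fields are a simplifying assumption.
class PrefixCompatibleSketch {
    static final int V1_LENGTH = 3 * Long.BYTES + 1; // FK, PK, HASH + one BOOLEAN byte

    static final class V1View {
        final long fk, pk, hash;
        final boolean newFk;
        V1View(long fk, long pk, long hash, boolean newFk) {
            this.fk = fk; this.pk = pk; this.hash = hash; this.newFk = newFk;
        }
    }

    static byte[] writeV1(long fk, long pk, long hash, boolean newFk) {
        return ByteBuffer.allocate(V1_LENGTH)
                .putLong(fk).putLong(pk).putLong(hash)
                .put((byte) (newFk ? 1 : 0))
                .array();
    }

    // The BOOLEAN is still written so that old readers keep behaving as before.
    static byte[] writeV2Prime(long fk, long pk, long hash, boolean newFk, byte instruction) {
        return ByteBuffer.allocate(V1_LENGTH + 1)
                .put(writeV1(fk, pk, hash, newFk)) // unchanged V1 prefix
                .put(instruction)                  // new field appended at the end
                .array();
    }

    // "2.4" reader: only knows the V1 prefix and never looks at trailing bytes.
    static V1View readAsV1(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        return new V1View(buf.getLong(), buf.getLong(), buf.getLong(), buf.get() == 1);
    }

    // "2.5" reader: uses the appended field if present, else a default for V1 messages.
    static byte readInstructionOrDefault(byte[] message, byte defaultInstruction) {
        return message.length > V1_LENGTH ? message[V1_LENGTH] : defaultInstruction;
    }

    public static void main(String[] args) {
        byte[] v2 = writeV2Prime(7L, 42L, 123456789L, true, (byte) 3);
        V1View seenByOldWorker = readAsV1(v2);
        System.out.println(seenByOldWorker.fk + " " + seenByOldWorker.pk + " " + seenByOldWorker.newFk);
        System.out.println(readInstructionOrDefault(v2, (byte) 0));
        System.out.println(readInstructionOrDefault(writeV1(7L, 42L, 123456789L, true), (byte) 0));
    }
}
```

The old reader never looks past V1_LENGTH, so appended fields are invisible to it. The cost is that the BOOLEAN slot has to keep being written, which is the payload-bloat tradeoff mentioned in this message.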
> >>>>> Also, it's not clear that you can actually pull off STRATEGY2 in all cases. If there's some new kind of message you want to send that has no way to be correctly processed at all under the 2.4 code paths, the prefix thing simply doesn't work. Etc.
> >>>>>
> >>>>> Also, note that you can modify the above strategies by instead designing the message fields for extensibility. E.g., if you make the instructions field an enum, then you can make sure that the default case is handled sensibly (probably similarly to STRATEGY1, just choke on unknown instructions) and that you never remove an instruction type from the enum in future versions.
> >>>>>
> >>>>> Does this make sense?
> >>>>> -John
> >>>>>
> >>>>>
> >>>>> PS:
> >>>>> We can define null keys for streaming tables, but it's tricky.
> >>>>>
> >>>>> Specifically, you'd want to define some concept of null keys that allows all null keys to be unique, but _also_ to have a fixed identity, so that a particular null-key can be updated later. One example could be to union the existing keyspace with a new null-keyspace, where normal keys are like "key" and null-keys are like "null(identity)".
> >>>>> Then given a query like "KTable.outerJoin(KTable)", and inputs like:
> >>>>> LHS:
> >>>>> "a": 1
> >>>>> "b": 2
> >>>>>
> >>>>> RHS:
> >>>>> 1: true
> >>>>> 3: false
> >>>>>
> >>>>> a full outer join would produce:
> >>>>> "a": (1, true)
> >>>>> "b": (2, null)
> >>>>> null(3): (null, false)
> >>>>>
> >>>>> which can be correctly updated later if we get an update on the LHS:
> >>>>> PUT("c": 3)
> >>>>>
> >>>>> We'd emit for the results:
> >>>>> DELETE(null(3))
> >>>>> EMIT("c": (3, false))
> >>>>>
> >>>>> Resulting in the correct result table of:
> >>>>> "a": (1, true)
> >>>>> "b": (2, null)
> >>>>> "c": (3, false)
> >>>>>
> >>>>> As mentioned, this is tricky, and I would avoid it until we have evidence that it's actually useful to cover this part of the design space. Certainly, it would be a separate KIP if it came to that.
> >>>>>
> >>>>> On Wed, Jun 26, 2019 at 8:57 PM Adam Bellemare <adam.bellemare@gmail.com> wrote:
> >>>>>>
> >>>>>> Hi John
> >>>>>>
> >>>>>> Good thinking with regards to the upgrade path between versions for the over-the-wire instructions in SubscriptionWrapper. At this point in time I can't think of any new wire message instructions, but I would appreciate as many eyes on it as possible. I have just included the LEFT join in the last commit (about 10 min ago) along with the INNER join. I do not think that RIGHT join and OUTER are possible given that there is no LHS key available, so LHSTable.outerJoinOnForeignKey(RHSTable) wouldn't even make sense. This is in contrast to the current LHSTable.outerJoin(RHSTable), as they are both keyed on the same key. I have buffed up the integration tests and have tried to make them more readable to ensure that we're covering all the scenarios.
> >>>>>> I think that if we can get more eyes on the workflow showing the various LHS and RHS events and outputs, then that may help us validate that we have all the scenarios covered.
> >>>>>>
> >>>>>> With regards to the 2.3->2.4 scenario you described, I'm not entirely sure I follow. If they want to add a FK-join, they will need to rework their code in the KStreams app and make a new release, since the underlying topology would be different and new internal topics would need to be created. In other words, I don't think a rolling upgrade where the user introduces a FK join would be possible, since their topology would necessitate a full KStreams reset. Is this what you meant?
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Wed, Jun 26, 2019 at 4:10 PM John Roesler wrote:
> >>>>>>
> >>>>>>> Thanks, Adam!
> >>>>>>>
> >>>>>>> One unrelated thought that has just now occurred to me is that (unlike the equi-joins we currently have), this join logic is potentially spread over multiple Streams instances, which in general means that the instances may be running different versions of Kafka Streams.
> >>>>>>>
> >>>>>>> This means that if we discover a bug that requires us to again change the wire message (as you did in this proposal update), we need to consider what should happen if the PK instance is newer than the FK instance, or vice-versa, during a rolling upgrade. We should think ahead to this condition and make sure the logic is forward compatible.
> >>>>>>>
> >>>>>>> Related: what about the initial case, when we release this feature (let's say in 2.4)? What will happen if I decide to adopt 2.4 and add a FK join together in one upgrade?
> >>>>>>> Thus, the 2.4 member of the cluster is producing the SubscriptionWrapper messages, and some 2.3 members get the subscription topic assigned to them, but they have no idea what to do with it? I'm not sure this is a problem; hopefully they just do nothing. If it is a problem, it would be fine to say you have to upgrade completely to 2.4 before deploying a FK join.
> >>>>>>>
> >>>>>>> Just want to make sure we anticipate these issues in case it affects the design at all.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> -John
> >>>>>>>
> >>>>>>> On Wed, Jun 26, 2019 at 2:38 PM Adam Bellemare <adam.bellemare@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Sigh... Forgot the link:
> >>>>>>>>
> >>>>>>>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=78&selectedPageVersions=74
> >>>>>>>>
> >>>>>>>> I'll update it when I validate that there are no issues with removing the SubscriptionResponseWrapper boolean.
> >>>>>>>>
> >>>>>>>> On Wed, Jun 26, 2019 at 3:37 PM Adam Bellemare <adam.bellemare@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>>> Maybe just call it as (k, leftval, null) or (k, null, rightval)?
> >>>>>>>>> Done.
> >>>>>>>>>
> >>>>>>>>>> if you update the KIP, you might want to send a new "diff link" to this thread
> >>>>>>>>> Here it is:
> >>>>>>>>>
> >>>>>>>>>> Looking closely at the proposal, can you explain more about the propagateIfNull field in SubscriptionResponseWrapper? It sort of looks like it's always going to be equal to (RHS-result != null).
> >>>>>>>>> I believe you are correct, and I missed the forest for the trees. They are effectively the same thing, and I can simply remove the flag. I will code it up and try it out locally just to be sure.
> >>>>>>>>>
> >>>>>>>>> Thanks again for your help, it is greatly appreciated!
> >>>>>>>>>
> >>>>>>>>> On Wed, Jun 26, 2019 at 2:54 PM John Roesler wrote:
> >>>>>>>>>
> >>>>>>>>>> I think the "scenario trace" is very nice, but has one point that I found confusing:
> >>>>>>>>>>
> >>>>>>>>>> You indicate a retraction in the join output as (k,null) and a join result as (k, leftval, rightval), but confusingly, you also write a join result as (k, JoinResult) when one side is null. Maybe just call it as (k, leftval, null) or (k, null, rightval)? That way the readers can more easily determine if the results meet their expectations for each join type.
> >>>>>>>>>>
> >>>>>>>>>> (procedural note: if you update the KIP, you might want to send a new "diff link" to this thread, since the one I posted at the beginning would not automatically show your latest changes)
> >>>>>>>>>>
> >>>>>>>>>> I was initially concerned that the proposed algorithm would wind up propagating something that looks like a left join (k, leftval, null) under the case that Joe pointed out, but after reviewing your scenario, I see that it will emit a tombstone (k, null) instead. This is appropriate, and unavoidable, since we have to retract the join result from the logical view (the join result is a logical Table).
> >>>>>>>>>>
> >>>>>>>>>> Looking closely at the proposal, can you explain more about the propagateIfNull field in SubscriptionResponseWrapper? It sort of looks like it's always going to be equal to (RHS-result != null).
> >>>>>>>>>>
> >>>>>>>>>> In other words, can we drop that field and just send back RHS-result or null, and then handle it on the left-hand side like:
> >>>>>>>>>> if (rhsOriginalValueHash doesn't match) {
> >>>>>>>>>>     emit nothing, just drop the update
> >>>>>>>>>> } else if (joinType==inner && rhsValue == null) {
> >>>>>>>>>>     emit tombstone
> >>>>>>>>>> } else {
> >>>>>>>>>>     emit joiner(lhsValue, rhsValue)
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> To your concern about emitting extra tombstones, personally, I think it's fine. Clearly, we should try to avoid unnecessary tombstones, but all things considered, it's not harmful to emit some unnecessary tombstones: their payload is small, and they are trivial to handle downstream. If users want to, they can materialize the join result to suppress any extra tombstones, so there's a way out.
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the awesome idea. It's better than what I was thinking.
> >>>>>>>>>> -john
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Jun 26, 2019 at 11:37 AM Adam Bellemare wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks John.
> >>>>>>>>>>>
> >>>>>>>>>>> I'm looking forward to any feedback on this. In the meantime I will work on the unit tests to ensure that we have well-defined and readable coverage.
> >>>>>>>>>>>
> >>>>>>>>>>> At the moment I cannot see a way around emitting (k,null) whenever we emit an event that lacks a matching foreign key on the RHS, except in the (k,null) -> (k,fk) case. If the LHS oldValue=null, we know we would have emitted a deletion, and so (k,null) would be emitted out of the join. In this case we don't need to send another null.
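Here is a runnable rendering of the left-hand-side pseudocode above. It assumes propagateIfNull is dropped, so the RHS replies with either its value or null. Several details are illustrative assumptions, not the PR's code: Objects.hashCode stands in for the hash, the Outcome type is invented for this sketch, and a deleted LHS row is treated as stale.

```java
import java.util.Objects;
import java.util.function.BiFunction;

// Hypothetical sketch of resolving an RHS response on the LHS.
class ResponseResolverSketch {
    enum JoinType { INNER, LEFT }
    enum Kind { DROP, TOMBSTONE, EMIT }

    static final class Outcome {
        final Kind kind;
        final String value; // join result, only set when kind == EMIT
        Outcome(Kind kind, String value) { this.kind = kind; this.value = value; }
        @Override public String toString() { return kind == Kind.EMIT ? "EMIT " + value : kind.name(); }
    }

    // Stand-in for hashing the LHS value the subscription was sent for (assumption).
    static int hashOf(String lhsValue) { return Objects.hashCode(lhsValue); }

    static Outcome resolve(int rhsOriginalValueHash, String currentLhsValue, String rhsValue,
                           JoinType joinType, BiFunction<String, String, String> joiner) {
        // A deleted LHS row is treated like a hash mismatch here (assumption).
        if (currentLhsValue == null || rhsOriginalValueHash != hashOf(currentLhsValue)) {
            return new Outcome(Kind.DROP, null); // stale response: the LHS changed after subscribing
        }
        if (joinType == JoinType.INNER && rhsValue == null) {
            return new Outcome(Kind.TOMBSTONE, null); // inner join with no RHS match: retract
        }
        // A LEFT join with no match reaches here with rhsValue == null.
        return new Outcome(Kind.EMIT, joiner.apply(currentLhsValue, rhsValue));
    }

    public static void main(String[] args) {
        BiFunction<String, String, String> joiner = (l, r) -> l + "|" + r;
        String lhs = "fk=1";
        int sentHash = hashOf(lhs);
        System.out.println(resolve(sentHash, lhs, "bar", JoinType.INNER, joiner));
        System.out.println(resolve(sentHash, lhs, null, JoinType.INNER, joiner));
        System.out.println(resolve(sentHash, lhs, null, JoinType.LEFT, joiner));
        System.out.println(resolve(sentHash, "fk=2", "bar", JoinType.INNER, joiner));
    }
}
```

Under INNER semantics, every update whose new FK has no match yields TOMBSTONE, with no memory of earlier outputs. This is where the repeated (K, null) outputs discussed in this thread come from.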
> >>>>>>>>>>>
> >>>>>>>>>>> Adam
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Jun 26, 2019 at 11:53 AM John Roesler <john@confluent.io> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Adam,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the proposed revision to your KIP
> >>>>>>>>>>>> (https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=77&selectedPageVersions=74)
> >>>>>>>>>>>>
> >>>>>>>>>>>> in response to the concern pointed out during code review
> >>>>>>>>>>>> (https://github.com/apache/kafka/pull/5527#issuecomment-505137962)
> >>>>>>>>>>>>
> >>>>>>>>>>>> We should have a brief discussion thread (here) in the mailing list to make sure everyone who wants to gets a chance to consider the modification to the design.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> -John