kafka-dev mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.
Date Mon, 11 Mar 2019 03:31:43 GMT
I agree that the LHS must encode this information and tell the RHS
whether a tombstone requires a reply or not.
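
A minimal sketch of what that encoding could look like, assuming a
wrapper type on the repartition topic (all names here -- Instruction,
SubscriptionWrapper -- are hypothetical illustrations, not the KIP's
actual API):

```java
// Hypothetical sketch: the LHS wraps each repartition-topic record with
// an instruction so the RHS knows whether a delete should trigger a
// reply back to the LHS.
enum Instruction {
    PROPAGATE_DELETE,   // normal tombstone: RHS deletes state and replies
    DELETE_STATE_ONLY   // "don't propagate me, but please delete state"
}

final class SubscriptionWrapper {
    final Instruction instruction;

    SubscriptionWrapper(Instruction instruction) {
        this.instruction = instruction;
    }

    // The RHS checks this flag instead of inferring intent from a null value.
    boolean requiresReply() {
        return instruction == Instruction.PROPAGATE_DELETE;
    }
}
```

Since the instruction rides along with the record, the value is indeed
non-null in the internal topic, which is exactly the verbosity question
Adam raises below.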

>> Would this pose some sort of verbosity problem in the internal topics,
>> especially if we have to rebuild state off of them?

I don't see an issue atm. Can you elaborate on how this relates to
rebuilding state?


-Matthias

On 3/10/19 12:25 PM, Adam Bellemare wrote:
> Hi Matthias
> 
> I have been mulling over the unsubscribe / delete optimization, and I have
> one main concern. I believe that the RHS can only determine whether to
> propagate the tombstone or not based on the value passed over from the LHS.
> This value would need to be non-null, and so wouldn't the internal
> repartition topics end up containing many non-null "tombstone" values?
> 
> ie:
> Normal tombstone (propagate):     (key=123, value=null)
> Don't-propagate-tombstone:          (key=123, value=("don't propagate me,
> but please delete state"))
> 
> Would this pose some sort of verbosity problem in the internal topics,
> especially if we have to rebuild state off of them?
> 
> Thanks
> 
> Adam
> 
> 
> 
> 
> 
> On Fri, Mar 8, 2019 at 10:14 PM Matthias J. Sax <matthias@confluent.io>
> wrote:
> 
>> SGTM.
>>
>> I also had the impression that those duplicates are rather an error than
>> a case of eventual consistency. Using hashing to avoid sending the
>> payload is a good idea IMHO.
>>
>> @Adam: can you update the KIP accordingly?
>>
>>  - add the optimization to not send a reply from RHS to LHS on
>> unsubscribe (if not a tombstone)
>>  - explain why using offsets to avoid duplicates does not work
>>  - add hashing to avoid duplicates
>>
>> Beside this, I don't have any further comments. Excited to finally get
>> this in!
>>
>> Let us know when you have updated the KIP so we can move forward with
>> the VOTE. Thanks a lot for your patience! This was a very loooong shot!
>>
>>
>> -Matthias
>>
>> On 3/8/19 8:47 AM, John Roesler wrote:
>>> Hi all,
>>>
>>> This proposal sounds good to me, especially since we observe that people
>>> are already confused when they see duplicate results coming out of 1:1
>>> joins (which is a bug). I take this as "evidence" that we're better off
>>> eliminating those duplicates from the start. Guozhang's proposal seems
>>> like a lightweight solution to the problem, so FWIW, I'm in favor.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <adam.bellemare@gmail.com>
>>> wrote:
>>>
>>>> Hi Guozhang
>>>>
>>>> That would certainly work for eliminating those duplicate values. As it
>>>> stands right now, this would be consistent with swallowing changes due
>>>> to out-of-order processing with multiple threads, and seems like a very
>>>> reasonable way forward. Thank you for the suggestion!
>>>>
>>>> I have been trying to think if there are any other scenarios where we
>>>> can end up with duplicates, though I have not been able to identify any
>>>> others at the moment. I will think on it a bit more, but if anyone else
>>>> has any ideas, please chime in.
>>>>
>>>> Thanks,
>>>> Adam
>>>>
>>>>
>>>>
>>>> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangguoz@gmail.com>
>>>> wrote:
>>>>
>>>>> One more thought regarding *c-P2: Duplicates)*: first I want to
>>>>> separate this issue from the more general issue that today (not only
>>>>> foreign-key, but also co-partitioned primary-key) table-table joins
>>>>> still do not strictly respect the timestamp ordering, since the two
>>>>> changelog streams may be fetched and hence processed out-of-order, and
>>>>> we do not yet allow a record to be joined with the other table at any
>>>>> given time snapshot. So ideally, when there are two changelog records
>>>>> (k1, (f-k1, v1)), (k1, (f-k1, v2)) coming at the left-hand table and
>>>>> one record (f-k1, v3) at the right-hand table, depending on the
>>>>> processing ordering we may get:
>>>>>
>>>>> (k1, (f-k1, v2-v3))
>>>>>
>>>>> or
>>>>>
>>>>> (k1, (f-k1, v1-v3))
>>>>> (k1, (f-k1, v2-v3))
>>>>>
>>>>> And this is not to be addressed by this KIP.
>>>>>
>>>>> What I would advocate is to fix the issue that is introduced in this
>>>>> KIP alone, that is, we may have
>>>>>
>>>>> (k1, (f-k1, v2-v3))   // this should actually be v1-v3
>>>>> (k1, (f-k1, v2-v3))
>>>>>
>>>>> I admit that it does not have a correctness issue from the semantics
>>>>> alone, compared with "discarding the first result", but it may be
>>>>> confusing to users who do not expect to see the seemingly duplicated
>>>>> results.
>>>>>
>>>>> On the other hand, I think there's a lightweight solution to avoid it:
>>>>> we can still optimize away sending the full payload of "v1" from the
>>>>> left-hand side to the right-hand side, but instead of just trimming
>>>>> off the whole bytes, we can send, e.g., an MD5 hash of the bytes (I'm
>>>>> using MD5 here just as an example; we can definitely replace it with
>>>>> other functions). By doing this we can discard the join operation if
>>>>> the hash value sent back from the right-hand side no longer matches
>>>>> the left-hand side, i.e. we will send:
>>>>>
>>>>> (k1, (f-k1, v2-v3))
>>>>>
>>>>> downstream only once.
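
To make the hash-comparison idea concrete, here is a rough sketch (MD5
as in Guozhang's example; the ValueHasher name and method signatures are
illustrative, not proposed API):

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

final class ValueHasher {
    // Instead of shipping the full serialized LHS value to the RHS,
    // ship only a fixed-size digest of it.
    static byte[] hash(byte[] serializedValue) {
        try {
            return MessageDigest.getInstance("MD5").digest(serializedValue);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // On the LHS, when a reply comes back from the RHS: forward the join
    // result only if the hash in the reply still matches the hash of the
    // LHS's current value; stale replies (e.g. for v1 after v2 arrived)
    // are discarded, so only one result is emitted downstream.
    static boolean shouldForward(byte[] currentValueHash, byte[] replyHash) {
        return Arrays.equals(currentValueHash, replyHash);
    }
}
```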
>>>>>
>>>>> WDYT?
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare <adam.bellemare@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Ah yes, I recall it all now. That answers that question as to why I
>>>>>> had caching disabled. I can certainly re-enable it since I believe
>>>>>> the main concern was simply about reconciling those two iterators. A
>>>>>> lack of knowledge there on my part.
>>>>>>
>>>>>> Thank you John for weighing in - we certainly both do appreciate it.
>>>>>> I think that John hits it on the head though with his comment of "If
>>>>>> it turns out we're wrong about this, then it should be possible to
>>>>>> fix the semantics in place, without messing with the API."
>>>>>>
>>>>>> If anyone else would like to weigh in, your thoughts would be greatly
>>>>>> appreciated.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax <matthias@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>>>> I don't know how to range scan over a caching store, probably one
>>>>>>>>> had to open 2 iterators and merge them.
>>>>>>>
>>>>>>> That happens automatically. If you query a cached KTable, it ranges
>>>>>>> over the cache and the underlying RocksDB and performs the merging
>>>>>>> under the hood.
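
For intuition, the merge the cache layer performs is essentially a range
scan over two sorted views where the cache shadows the store on matching
keys. A simplified sketch (not the actual Streams internals, which walk
two iterators lazily rather than materializing maps):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class MergedRangeScan {
    // Merge a range scan over the in-memory cache and the persistent
    // store. On duplicate keys the cache entry wins, because the cache
    // holds the newest (possibly not yet flushed) value.
    static List<Map.Entry<String, String>> range(
            NavigableMap<String, String> cache,
            NavigableMap<String, String> store,
            String from, String to) {
        TreeMap<String, String> merged =
                new TreeMap<>(store.subMap(from, true, to, true));
        merged.putAll(cache.subMap(from, true, to, true)); // cache shadows store
        return new ArrayList<>(merged.entrySet());
    }
}
```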
>>>>>>>
>>>>>>>>> Other than that, I still think even the regular join is broken
>>>>>>>>> with caching enabled right?
>>>>>>>
>>>>>>> Why? To me, using the word "broken" implies it is conceptually
>>>>>>> incorrect; I don't see this.
>>>>>>>
>>>>>>>>> I once filed a ticket, because with caching enabled it would
>>>>>>>>> return values that haven't been published downstream yet.
>>>>>>>
>>>>>>> For the bug report, I found
>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6599. We still need to
>>>>>>> fix this, but it is a regular bug as any other, and we should not
>>>>>>> change a design because of a bug.
>>>>>>>
>>>>>>> That range() returns values that have not been published downstream
>>>>>>> if caching is enabled is how caching works and is intended behavior.
>>>>>>> Not sure why you say it's incorrect?
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>> On 3/5/19 1:49 AM, Jan Filipiak wrote:
>>>>>>>>
>>>>>>>>
>>>>>>>> On 04.03.2019 19:14, Matthias J. Sax wrote:
>>>>>>>>> Thanks Adam,
>>>>>>>>>
>>>>>>>>> *> Q) Range scans work with caching enabled, too. Thus, there is
>>>>>>>>> no functional/correctness requirement to disable caching. I cannot
>>>>>>>>> remember why Jan's proposal added this? It might be an
>>>>>>>>> implementation detail though (maybe just remove it from the KIP?
>>>>>>>>> -- might be misleading).
>>>>>>>>
>>>>>>>> I don't know how to range scan over a caching store, probably one
>>>>>>>> had to open 2 iterators and merge them.
>>>>>>>>
>>>>>>>> Other than that, I still think even the regular join is broken with
>>>>>>>> caching enabled right? I once filed a ticket, because with caching
>>>>>>>> enabled it would return values that haven't been published
>>>>>>>> downstream yet.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>>
>>>>
>>>
>>
>>
> 

