kafka-dev mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.
Date Wed, 05 Sep 2018 00:36:34 GMT

I am just catching up on this thread. I have not read everything so far,
but I want to share a couple of initial thoughts:

Headers: I think there is a fundamental difference between header usage
in this KIP and KIP-258. For KIP-258, we add headers to changelog topics that
are owned by Kafka Streams, and nobody else is supposed to write into
them. In fact, no user headers are written into the changelog topic and
thus there are no conflicts.

Nevertheless, I don't see a big issue with using headers within Streams.
As long as we document it, we can have some "reserved" header keys that
users are not allowed to use when processing data with Kafka Streams.
IMHO, this should be ok.

> I think there is a safe way to avoid conflicts, since these headers are
> only needed in internal topics (I think):
> For internal and changelog topics, we can namespace all headers:
> * user-defined headers are namespaced as "external." + headerKey
> * internal headers are namespaced as "internal." + headerKey

While namespacing would be possible, it would require deserializing
user headers, which implies a runtime overhead. I would suggest not
namespacing for now to avoid the overhead. If this becomes a problem in
the future, we can still add namespacing later on.
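To make the overhead concrete, here is a minimal sketch of the quoted namespacing scheme in plain Java. It assumes headers are modelled as (key, value) entries rather than Kafka's `Headers` interface, and the class and method names are hypothetical. The point is that every user header has to be copied under a new key on write and stripped again on read, for every record.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of the quoted namespacing scheme for internal/changelog topics.
// Headers are modelled as (key, value) entries, NOT Kafka's Headers API;
// class and method names are hypothetical.
class HeaderNamespacing {
    static final String EXTERNAL = "external.";
    static final String INTERNAL = "internal.";

    // Write path: every user header is copied under a prefixed key, followed by
    // the internal headers. This per-record, per-header copy is the overhead.
    static List<Map.Entry<String, byte[]>> namespace(
            List<Map.Entry<String, byte[]>> userHeaders,
            List<Map.Entry<String, byte[]>> internalHeaders) {
        List<Map.Entry<String, byte[]>> out = new ArrayList<>();
        for (Map.Entry<String, byte[]> h : userHeaders) {
            out.add(new AbstractMap.SimpleEntry<>(EXTERNAL + h.getKey(), h.getValue()));
        }
        for (Map.Entry<String, byte[]> h : internalHeaders) {
            out.add(new AbstractMap.SimpleEntry<>(INTERNAL + h.getKey(), h.getValue()));
        }
        return out;
    }

    // Read path: strip the prefix from user headers and drop internal ones.
    static List<Map.Entry<String, byte[]>> userHeadersOf(List<Map.Entry<String, byte[]>> namespaced) {
        List<Map.Entry<String, byte[]>> out = new ArrayList<>();
        for (Map.Entry<String, byte[]> h : namespaced) {
            if (h.getKey().startsWith(EXTERNAL)) {
                String original = h.getKey().substring(EXTERNAL.length());
                out.add(new AbstractMap.SimpleEntry<>(original, h.getValue()));
            }
        }
        return out;
    }
}
```

Without namespacing, both loops disappear and user headers pass through untouched, which is the cheaper option suggested above.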

My main concern about the design is the type of the result KTable. If I
understood the proposal correctly,

KTable<K1,V1> table1 = ...
KTable<K2,V2> table2 = ...

KTable<K1,V3> joinedTable = table1.join(table2,...);

implies that `joinedTable` has the same key as the left input table.
IMHO, this does not work: if table2 contains multiple rows that
join with a record in table1 (which is the main purpose of a foreign-key
join), the result table would only contain a single join result, not
multiple ones. For example:

table1 input stream: <A,X>
table2 input stream: <a,(A,1)>, <b,(A,2)>

We use the table2 value as a foreign key to the table1 key (i.e., "A" joins).
If the result key is the same as the key of table1, the result can be
either <A, join(X,1)> or <A, join(X,2)>, but not both. Because they
share the same key, whichever result record we emit later overwrites
the previous result.
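The overwrite can be reproduced without Kafka by modelling the result KTable as a key-value map. This is a simplification (a real KTable is backed by a state store and changelog), and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// A KTable behaves like an upsert table: a later record with the same key
// replaces the earlier one. Here it is modelled with a plain HashMap.
class SameKeyResult {
    static Map<String, String> resultKeyedByTable1Key() {
        Map<String, String> joinedTable = new HashMap<>();
        // table2 rows <a,(A,1)> and <b,(A,2)> both join with table1 row <A,X>
        joinedTable.put("A", "join(X,1)"); // result for table2 key "a"
        joinedTable.put("A", "join(X,2)"); // result for table2 key "b" replaces it
        return joinedTable;
    }

    public static void main(String[] args) {
        System.out.println(resultKeyedByTable1Key()); // prints {A=join(X,2)}
    }
}
```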

This is the reason why Jan originally proposed to use a combination of
both primary keys of the input tables as the key of the output table. This
makes the keys of the output table unique, and we can store both results
in the output table:

Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
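The same toy model with a combined key keeps both results. The `combinedKey` helper and the `-` separator are hypothetical and mirror the `A-a` notation above; they are not the proposed implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CombinedKeyResult {
    // Hypothetical helper: result key built from both primary keys, as in <A-a, ...>.
    static String combinedKey(String table1Key, String table2Key) {
        return table1Key + "-" + table2Key;
    }

    static Map<String, String> resultKeyedByBothKeys() {
        Map<String, String> joinedTable = new LinkedHashMap<>();
        joinedTable.put(combinedKey("A", "a"), "join(X,1)"); // distinct keys,
        joinedTable.put(combinedKey("A", "b"), "join(X,2)"); // so nothing is overwritten
        return joinedTable;
    }

    public static void main(String[] args) {
        System.out.println(resultKeyedByBothKeys()); // prints {A-a=join(X,1), A-b=join(X,2)}
    }
}
```

Plain string concatenation is only for readability: it is ambiguous if keys can themselves contain "-", so a real implementation would need an unambiguous encoding, such as a length-prefixed or structured composite key.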



On 9/4/18 1:36 PM, Jan Filipiak wrote:
> Just one remark here.
> The high-watermark could be disregarded. The decision whether to forward
> depends on the size of the aggregated map.
> Only maps with exactly one element would be unpacked and forwarded. Maps with
> zero elements would be published as a delete. Any other count
> of map entries is in "waiting for correct deletes to arrive"-state.
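Jan's rule can be written as a small decision function. This is a sketch that assumes the aggregated value for a key is a map of candidate join results; the enum and method names are hypothetical.

```java
import java.util.Map;

// The forwarding decision depends only on how many entries the
// aggregated map for a key currently holds.
class ForwardDecision {
    enum Action { FORWARD, DELETE, WAIT }

    static Action decide(Map<?, ?> aggregated) {
        switch (aggregated.size()) {
            case 0:
                return Action.DELETE;  // publish a delete (tombstone)
            case 1:
                return Action.FORWARD; // unpack the single entry and forward it
            default:
                return Action.WAIT;    // waiting for correct deletes to arrive
        }
    }
}
```

No high-watermark appears anywhere in the decision, which is the point of the remark.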
> On 04.09.2018 21:29, Adam Bellemare wrote:
>> It does look like I could replace the second repartition store and
>> highwater store with a groupBy and reduce. However, it looks like I
>> would still need to store the highwater value within the materialized store, to
>> compare the arrival of out-of-order records (assuming my understanding of
>> THIS is correct...). This in effect is the same as the design I have now,
>> just with the two tables merged together.
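For comparison, here is a minimal sketch of keeping the highwater value next to the value in the same store, as Adam describes. It assumes each record carries a hypothetical, monotonically increasing sequence number (for example the offset of the originating record). It uses an in-memory `HashMap` instead of a Kafka Streams state store, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical merged store: each value is kept together with the highest
// sequence number seen for its key, so out-of-order records can be detected.
class HighwaterStore<K, V> {
    private static final class Stamped<T> {
        final long highwater;
        final T value;

        Stamped(long highwater, T value) {
            this.highwater = highwater;
            this.value = value;
        }
    }

    private final Map<K, Stamped<V>> store = new HashMap<>();

    // Applies the update unless it is strictly older than the stored highwater.
    // Returns true if applied, false if dropped as out of order.
    boolean putIfNotStale(K key, long sequence, V value) {
        Stamped<V> current = store.get(key);
        if (current != null && sequence < current.highwater) {
            return false;
        }
        store.put(key, new Stamped<>(sequence, value));
        return true;
    }

    V get(K key) {
        Stamped<V> current = store.get(key);
        return current == null ? null : current.value;
    }
}
```

Keeping the highwater inside the stored value means one lookup serves both the staleness check and the join, which is what "the two tables merged together" amounts to.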
