kafka-dev mailing list archives

From "Jan Filipiak (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (KAFKA-3705) Support non-key joining in KTable
Date Tue, 21 Jun 2016 00:36:57 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15340779#comment-15340779 ]

Jan Filipiak edited comment on KAFKA-3705 at 6/21/16 12:36 AM:
---------------------------------------------------------------

A few things I came across while building the current implementation on top of the Processor API.

1. Partitioning
I ended up needing to pass an additional ValueMapper<K,K1> into the method.
I had to use it in the sink's partitioner to extract the _partition/join-key_ from the key
that is used for the repartition topic. It had to be extracted from the key because I still need
to be able to send null values to the correct partition for deletes. The number of partitions
is not known in the processor, only in the partitioner, and this made the "API"
rather complicated.
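
To illustrate the partitioning point, here is a minimal, self-contained sketch in plain Java. It is not the actual implementation and does not use the Kafka Streams interfaces: the class name JoinKeyPartitioner, the use of java.util.function.Function in place of ValueMapper<K,K1>, the "|"-separated string key in main, and hashing with Arrays.hashCode are all assumptions made for illustration. The point it shows is that the partition is derived from the key alone, so a delete (null value) lands on the same partition as the earlier insert.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical sketch: none of these names are Kafka Streams API.
final class JoinKeyPartitioner<K, K1> {
    // Stands in for the ValueMapper<K,K1> that extracts the partition/join-key.
    private final Function<K, K1> joinKeyExtractor;
    // Stands in for the serializer of the join key (assumed, not from the source).
    private final Function<K1, byte[]> joinKeySerializer;

    JoinKeyPartitioner(Function<K, K1> joinKeyExtractor,
                       Function<K1, byte[]> joinKeySerializer) {
        this.joinKeyExtractor = joinKeyExtractor;
        this.joinKeySerializer = joinKeySerializer;
    }

    // The value is deliberately ignored: deletes carry a null value, so the
    // partition can only be computed from the repartition-topic key.
    int partition(K key, Object value, int numPartitions) {
        byte[] joinKeyBytes = joinKeySerializer.apply(joinKeyExtractor.apply(key));
        return Math.floorMod(Arrays.hashCode(joinKeyBytes), numPartitions);
    }

    public static void main(String[] args) {
        // Assumed key layout for the demo: "<join-key>|<original-key>".
        JoinKeyPartitioner<String, String> p = new JoinKeyPartitioner<>(
                k -> k.substring(0, k.indexOf('|')),
                s -> s.getBytes(StandardCharsets.UTF_8));
        int insert = p.partition("kp1|ak1", "some value", 8);
        int delete = p.partition("kp1|ak1", null, 8);
        System.out.println(insert == delete); // prints "true"
    }
}
```

The number of partitions appears only as an argument to partition(...), which mirrors the constraint described above: the processor itself never sees it.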

2. Range Select
The ValueMapper mentioned above also had to be passed into the RocksDBIterator. Having KeyValueIterator<K,V>
range(K from, K to) is not "natural" for prefix range queries. A better fit would be KeyValueIterator<K,V>
range(K1 prefix), where Serde<K1> needs to produce bytes that are a prefix of the bytes produced by Serde<K>.
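
A rough sketch of what such a prefix range could look like, using an in-memory sorted map as a stand-in for RocksDB. Everything here is an assumption for illustration: the class PrefixScan, its method names, and the "|"-separated keys are hypothetical, and the comparator orders keys as unsigned bytes (RocksDB's default bytewise ordering is similar, but this code does not touch RocksDB). It requires Java 9 or later.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of range(K1 prefix) over serialized keys.
final class PrefixScan {
    // Sorted store keyed by serialized bytes, ordered lexicographically (unsigned).
    static NavigableMap<byte[], String> newStore() {
        Comparator<byte[]> unsignedLexicographic = Arrays::compareUnsigned;
        return new TreeMap<>(unsignedLexicographic);
    }

    // True if the serialized key begins with the serialized prefix.
    static boolean startsWith(byte[] key, byte[] prefix) {
        return key.length >= prefix.length
                && Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length);
    }

    // Seek to the first key >= prefix, then read until the prefix no longer matches.
    static List<String> range(NavigableMap<byte[], String> store, byte[] prefix) {
        List<String> values = new ArrayList<>();
        for (Map.Entry<byte[], String> e : store.tailMap(prefix, true).entrySet()) {
            if (!startsWith(e.getKey(), prefix)) {
                break; // keys are sorted, so no later key can match
            }
            values.add(e.getValue());
        }
        return values;
    }
}
```

This only works if the bytes of the prefix type really are a leading slice of the bytes of the full key, which is exactly the requirement on Serde<K1> and Serde<K> stated above.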

3. Key expansion
After a join in this fashion, the key is what I have started referring to as widened. Say you have
KTable<AK,AV>, it is the table that needs to be repartitioned, and KP is the repartition
key. Then, independently of the other table, the new key of the table must include KP and AK,
which is a weird thing compared to the traditional relational database way. Imagine having a
result table KTable<Pair<AK,KP>,Pair<AV,XV>>: the formerly unique
key AK is not unique anymore, because the processor might see the insert in one partition before
the delete in the other (e.g. when the row's KP was updated). I think this should be embraced,
because that is how it is. It should just be made apparent to the user, as it needs to be
dealt with in downstream processors.
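
The following toy example tries to make the widened-key effect concrete. It is only a sketch: the class WidenedKeyDemo is hypothetical, Pair<AK,KP> is modelled with java.util.AbstractMap.SimpleImmutableEntry, values are plain strings, and a HashMap plays the role of the result table. It replays the case where the insert under the new KP is seen before the delete under the old KP.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a result table keyed by the widened key (AK, KP).
final class WidenedKeyDemo {
    // Builds the widened key; SimpleImmutableEntry provides equals/hashCode.
    static Map.Entry<String, String> key(String ak, String kp) {
        return new SimpleImmutableEntry<>(ak, kp);
    }

    // Applies one changelog record; a null value means delete.
    static void apply(Map<Map.Entry<String, String>, String> table,
                      Map.Entry<String, String> key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    // Counts how many rows currently exist for a given original key AK.
    static long rowsFor(Map<Map.Entry<String, String>, String> table, String ak) {
        return table.keySet().stream().filter(k -> k.getKey().equals(ak)).count();
    }

    public static void main(String[] args) {
        Map<Map.Entry<String, String>, String> table = new HashMap<>();
        apply(table, key("a1", "kp1"), "joined-under-kp1");

        // The row's KP changes from kp1 to kp2. The insert and the delete travel
        // through different partitions, so the insert may be seen first.
        apply(table, key("a1", "kp2"), "joined-under-kp2");
        System.out.println(rowsFor(table, "a1")); // prints "2": AK is not unique here

        apply(table, key("a1", "kp1"), null);
        System.out.println(rowsFor(table, "a1")); // prints "1"
    }
}
```

Between the two records a downstream processor sees two rows for the same AK, which is the situation users would need to be made aware of.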

Unrelated to the topic of joining: the Processor API is not necessarily comfortable. I appreciate
the beauty of the threading model, but stitching graphs together based on processor names and
strings is trickier than I thought. Anyhow, a really nice stream processing framework. It feels
and looks so much better than what is out there, such as Spark or Storm. Watching their desperate attempts
to put state in is a joy. Nice work. As soon as our implementation is hardened in production,
I can probably share it.



> Support non-key joining in KTable
> ---------------------------------
>
>                 Key: KAFKA-3705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Liquan Pei
>              Labels: api
>             Fix For: 0.10.1.0
>
>
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users want to join
> a KTable A by key {{a}} with another KTable B by key {{b}} but with a "foreign key" {{a}},
> and assuming they are read from two topics which are partitioned on {{a}} and {{b}} respectively,
> they need to do the following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already partitioned on {{a}},
> users still need to do the pre-aggregation in order to put the two joining streams
> on the same key. This is a drawback for programmability and we should fix it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
