kafka-dev mailing list archives

From "Guozhang Wang (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
Date Tue, 21 Jun 2016 01:01:09 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15340814#comment-15340814 ]

Guozhang Wang commented on KAFKA-3705:
--------------------------------------

Thanks for the feedback!

Re 1: Not sure I fully understand this. I thought you could pass a {{StreamPartitioner}} when
calling {{addSink}}, which should be sufficient?

Re 2: We are aware of this, and as discussed in the wiki, our current proposal is to use
something similar to what you mentioned, {{range(K1 prefix)}}, and to stop iterating as soon
as {{key.startsWith(prefix)}} no longer holds. RocksDB has some optimizations for prefix
seeking, but we would need to contribute back to RocksDB's JNI to make use of them.

https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins#Discussion:Non-keyKTable-KTableJoins-Simpleapproach:seekwithkeydirectly
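
The seek-and-stop idea from Re 2 can be sketched roughly as follows. This is only an illustration under assumptions, not the Kafka Streams or RocksDB API: a {{TreeMap}} stands in for the sorted store, keys are plain strings, and the class and method names ({{PrefixScanSketch}}, {{scanByPrefix}}) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

// Hypothetical stand-in for a sorted key-value store; not a Kafka Streams or RocksDB API.
final class PrefixScanSketch {

    // Returns the values of all entries whose key starts with the given prefix.
    static List<String> scanByPrefix(NavigableMap<String, String> store, String prefix) {
        List<String> matches = new ArrayList<>();
        // Seek to the first key >= prefix, then walk forward in key order.
        for (Map.Entry<String, String> entry : store.tailMap(prefix, true).entrySet()) {
            if (!entry.getKey().startsWith(prefix)) {
                break; // keys are sorted, so no later key can share the prefix
            }
            matches.add(entry.getValue());
        }
        return matches;
    }
}
```

Because the keys are sorted, all keys sharing a prefix are contiguous, so the scan touches only the matching entries plus one extra key.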

Re 3: The idea is that for the repartitioning, we first transform the old key-value
pair <AK, AV> into <PK, <AK, AV>>. When sending that pair to the re-partition topic, we
specify a {{StreamPartitioner}} that partitions on the combo <PK, AK> (remember that its
API takes both the key and the value), and we let the joiner after the repartitioning be
applied to <AV> only.
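
A minimal sketch of partitioning on the combo <PK, AK> might look like the following. It is an assumption-laden illustration rather than the actual {{StreamPartitioner}} interface: the {{Wrapped}} type, the string keys, and the hash-then-modulo scheme are all hypothetical choices made for this example.

```java
import java.util.Objects;

// Illustrative only: mimics a partitioner that can see both the key and the value.
final class ComboPartitionSketch {

    // Hypothetical wrapper for the re-keyed value <AK, AV>.
    static final class Wrapped {
        final String ak; // original key of table A
        final String av; // original value of table A

        Wrapped(String ak, String av) {
            this.ak = ak;
            this.av = av;
        }
    }

    // Chooses a partition from the combo <PK, AK>; AV does not influence the result.
    static int partition(String pk, Wrapped value, int numPartitions) {
        int hash = Objects.hash(pk, value.ak);
        return Math.floorMod(hash, numPartitions); // floorMod keeps the result non-negative
    }
}
```

With this scheme, two records that share PK and AK land in the same partition regardless of AV.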

When the old value needs to be sent as well, we are going to send the {new, old} pair separately
as two records: <PK-new, <AK, AV-new>>, and <PK-new, <AK, AV-old>>,
and still partition on the combos <PK-new, AK> and <PK-old, AK>. These two records
may be sent to two different partitions and hence processed by two different processors, which
is expected behavior. Does that look reasonable to you?

> Support non-key joining in KTable
> ---------------------------------
>
>                 Key: KAFKA-3705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Liquan Pei
>              Labels: api
>             Fix For: 0.10.1.0
>
>
> Today in the Kafka Streams DSL, KTable joins are based only on keys. If users want to join
> a KTable A keyed by {{a}} with another KTable B keyed by {{b}} but carrying a "foreign key" {{a}},
> and assuming they are read from two topics which are partitioned on {{a}} and {{b}} respectively,
> they need to use the following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already partitioned on {{a}},
> users still need to do the pre-aggregation in order to put the two joining streams
> on the same key. This is a drawback for programmability and we should fix it.
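
To make the pre-aggregation pattern above concrete, here is a rough stand-alone sketch that uses plain Java maps in place of KTables. Everything in it is an assumption for illustration: the {{BRow}} type, the list-collecting aggregation, and the string-concatenating joiner are hypothetical and are not part of the Kafka Streams DSL.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-map illustration of "re-key B by a, then join with A on a".
final class PreAggregationJoinSketch {

    // Hypothetical row of table B: keyed by "b" elsewhere, carrying foreign key "a".
    static final class BRow {
        final String a;
        final String payload;

        BRow(String a, String payload) {
            this.a = a;
            this.payload = payload;
        }
    }

    // Plays the role of tableB.groupBy(/* select "a" */).agg(...): re-keys B by "a".
    static Map<String, List<String>> groupByA(Map<String, BRow> tableB) {
        Map<String, List<String>> byA = new LinkedHashMap<>();
        for (BRow row : tableB.values()) {
            byA.computeIfAbsent(row.a, k -> new ArrayList<>()).add(row.payload);
        }
        return byA;
    }

    // Plays the role of tableA.join(tableB', joiner): an inner join on equal keys.
    static Map<String, String> join(Map<String, String> tableA,
                                    Map<String, List<String>> tableBByA) {
        Map<String, String> joined = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : tableA.entrySet()) {
            List<String> bValues = tableBByA.get(entry.getKey());
            if (bValues != null) {
                joined.put(entry.getKey(), entry.getValue() + "+" + bValues);
            }
        }
        return joined;
    }
}
```

The extra {{groupByA}} step is the overhead the issue describes: it exists only to bring table B onto the same key as table A before the join.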



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
