kafka-jira mailing list archives

From "Tomas Forsman (Jira)" <j...@apache.org>
Subject [jira] [Comment Edited] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
Date Thu, 02 Sep 2021 14:35:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17408786#comment-17408786 ]

Tomas Forsman edited comment on KAFKA-13261 at 9/2/21, 2:34 PM:
----------------------------------------------------------------

Hi [~guozhang] and [~abellemare], thank you for your answers.

Yes, A and B use a partitioner that uses the same value from their respective topic keys.

In our scenario the output topic is compacted on the same key as topic A, so it would be fine
if intermediate events did not arrive; we only need the final result. What we're seeing, though,
is that some combinations are missing completely.

We've read all events from topics A and B and the output topic, as well as from the internal
topics created by the join. We expected every topic to contain 66 ids. As mentioned, running
the same data with one partition produces a perfect match, with all events passing through.
{noformat}
#Using 4 partitions
A                 : ids: 66
B                 : ids: 66
table.b-changelog : ids: 66
table.a-changelog : ids: 66
join.ab-changelog : ids: 20
output            : ids: 20{noformat}
We have attached KafkaTest.java. We have also written several JUnit test cases with the
TopologyTestDriver and different amounts of test data, but were unable to reproduce the problem
there. (Does the test driver take multiple partitions into account?)

However, in a local Docker environment we can reproduce the above scenario every time with
4 partitions, and the problem goes away with 1 partition.

Below is what we get when reading directly from the different topics for a specific id, "ID123".

Columns are 'offset, timestamp, partition | [key]value'

Using 4 partitions
{noformat}
table.a-changelog 
5 1612435945196 0 | [ID123, 202101] A01 
15 1614863137136 0 | [ID123, 202102] A02 
25 1617882052260 0 | [ID123, 202103] A03 
35 1620299210336 0 | [ID123, 202104] A04 
45 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
6 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
output 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
{noformat}
Using 1 partition
{noformat}
table.a-changelog 
28 1612435945196 0 | [ID123, 202101] A01 
88 1614863137136 0 | [ID123, 202102] A02 
149 1617882052260 0 | [ID123, 202103] A03 
210 1620299210336 0 | [ID123, 202104] A04 
269 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
7 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
28 1622617868856 0 | [ID123, 202101] A01, BBB 
88 1622617868856 0 | [ID123, 202102] A02, BBB 
149 1622617868856 0 | [ID123, 202103] A03, BBB 
210 1622617868856 0 | [ID123, 202104] A04, BBB 
269 1622804606823 0 | [ID123, 202105] A05, BBB 
output 
5 1622617868856 0 | [ID123, 202101] A01, BBB 
15 1622617868856 0 | [ID123, 202102] A02, BBB 
25 1622617868856 0 | [ID123, 202103] A03, BBB 
35 1622617868856 0 | [ID123, 202104] A04, BBB 
45 1622804606823 0 | [ID123, 202105] A05, BBB{noformat}
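
To make the gap in the 4-partition listing concrete, here is a minimal sketch that diffs the keys
seen in table.a-changelog against the keys seen in join.ab-changelog for ID123. The key strings
are copied from the listings above; the class and method names (MissingJoinKeys, missing) are
hypothetical and are not part of the attached KafkaTest.java.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not part of the attached KafkaTest.java.
final class MissingJoinKeys {

    // Returns the keys that appear in the source changelog but not in the join changelog,
    // keeping the order in which they appear in the source.
    static List<String> missing(List<String> sourceKeys, List<String> joinKeys) {
        Set<String> remaining = new LinkedHashSet<>(sourceKeys);
        remaining.removeAll(joinKeys);
        return new ArrayList<>(remaining);
    }

    public static void main(String[] args) {
        // Keys for ID123 as listed under table.a-changelog (identical in both runs).
        List<String> tableA = List.of(
                "[ID123, 202101]", "[ID123, 202102]", "[ID123, 202103]",
                "[ID123, 202104]", "[ID123, 202105]");
        // Keys for ID123 as listed under join.ab-changelog in each run.
        List<String> joinFourPartitions = List.of("[ID123, 202104]");
        List<String> joinOnePartition = tableA;

        System.out.println("missing with 4 partitions: " + missing(tableA, joinFourPartitions));
        System.out.println("missing with 1 partition:  " + missing(tableA, joinOnePartition));
    }
}
```

For ID123 this reports four of the five table A keys as missing from the join with 4 partitions,
and none with 1 partition.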
 



> KTable to KTable foreign key join loose events when using several partitions
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-13261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13261
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Tomas Forsman
>            Priority: Major
>         Attachments: KafkaTest.java
>
>
> Two incoming streams, A and B.
> Stream A uses a composite key [a, b].
> Stream B has key [b].
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put both A and
> B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it seems to have
> something to do with the foreign key join in combination with several partitions.
> One suspicion is that it is not possible to define which partitioner to use for
> the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> // Builds the topology: repartition A to 4 partitions, then foreign key join A -> B.
> static Topology createTopology() {
>     var builder = new StreamsBuilder();
>     KTable<String, String> tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         // EventA::getKeyB extracts the foreign key that points at table B.
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(), Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
> // State store configuration for table A.
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> // Repartitions A to 4 partitions using the custom partitioner below.
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
> // Partitions A by the keyB part of the composite key only.
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
> // State store configuration for the join result.
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}
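
The suspicion in the description, that the join cannot be told which partitioner to use, can be
illustrated with a minimal sketch. It compares the custom rule from topicAPartitioner() (hash of
keyB only) with a stand-in rule that hashes the whole composite key. The stand-in hash is an
assumption made purely for illustration: it is not Kafka's default partitioner, which hashes the
serialized key bytes. The names PartitionMismatchSketch, byKeyB, byWholeKey, distinctByKeyB and
distinctByWholeKey are hypothetical, and the period is modelled as a plain int.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch for illustration; not Kafka code and not part of KafkaTest.java.
final class PartitionMismatchSketch {

    // Same rule as topicAPartitioner() in the issue: only keyB decides the partition.
    static int byKeyB(String keyB, int numPartitions) {
        return Math.abs(keyB.hashCode()) % numPartitions;
    }

    // ASSUMPTION: stand-in for any partitioner that hashes the whole composite key.
    static int byWholeKey(String keyB, int period, int numPartitions) {
        return Math.floorMod(31 * keyB.hashCode() + period, numPartitions);
    }

    // Distinct partitions used for one keyB across all periods under the keyB-only rule.
    static Set<Integer> distinctByKeyB(String keyB, List<Integer> periods, int numPartitions) {
        Set<Integer> result = new TreeSet<>();
        for (int ignored : periods) {
            result.add(byKeyB(keyB, numPartitions));
        }
        return result;
    }

    // Distinct partitions used for one keyB across all periods under the whole-key rule.
    static Set<Integer> distinctByWholeKey(String keyB, List<Integer> periods, int numPartitions) {
        Set<Integer> result = new TreeSet<>();
        for (int period : periods) {
            result.add(byWholeKey(keyB, period, numPartitions));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> periods = List.of(202101, 202102, 202103, 202104, 202105);
        for (int period : periods) {
            System.out.printf("[ID123, %d] keyB-only=%d whole-key=%d%n",
                    period, byKeyB("ID123", 4), byWholeKey("ID123", period, 4));
        }
    }
}
```

With 4 partitions the keyB-only rule keeps all five ID123 records in one partition, while the
whole-key stand-in spreads them over several. If any topic inside the join were partitioned by a
rule other than the custom one, records for the same id could end up in different partitions.
Whether that is what happens here is the open question in this issue.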



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
