kafka-dev mailing list archives

From "Jan Filipiak (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
Date Tue, 28 Jun 2016 00:12:57 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15352103#comment-15352103 ]

Jan Filipiak commented on KAFKA-3705:

I will just shoot a quick reply now; time has somehow become scarce recently. Anyhow, the bottom
line of our misunderstandings is always the same thing. My bad that I didn't see the wiki
page; if that range-query interface is addressed, that's nice :D.

Point 3 is the one that causes the most confusion, I guess. In the repartition case we follow
different paths, and I am not sure I was able to communicate mine well enough. I <3
the idea of having everything be a derived store. ITE all this is being used to tail -F mysql-XXXX.bin
| kafka | XXX | redis, so Redis becomes a derived store of MySQL which can be used for
NoSQL-style reads. I am in fact such a great fan of this concept that I tend to treat everything
as a derived store. For me this means a repartitioned topic is a derived store of the source
topic. This stands in contrast to making a changelog out of it and materializing the changelog
in, say, RocksDB. That leads to the "problem" that the changelog topic is not a derived store
anymore, which personally gives me a bad feeling; it just pushes me out of my comfort zone.
Confluent peeps seem to be in their comfort zone with change-logging topics. In my narrative,
shit hits the fan when the property of being a derived store is lost. It leads to all the
nasty things, like needing to change-log your, say, RocksDBs because the intermediate
topic won't hold stuff forever.

In contrast to having a change-logging topic that I re-materialize and then change-capture
again, I prefer to do the change capturing first and only maintain the state of which downstream
partitions a record is currently published to. This works cleanly and nicely but brings with it
what I call "key widening". Say I have KTable A and I want to repartition it to A' so that
the topic containing A' is a derived store and log-compacted. Then I can't use Key<A>
to do this, for two reasons. First, the StreamPartitioner can only access the key to determine
the partition to delete from (deletes come as null values), which means the fields that determine
the partition need to be in the key no matter what. Snippet:

		topology.addSink(name, repartitionTopicName, new StreamPartitioner<K, VR>() {
			private final Serializer<KL> intermediateSerializer = intermediateSerde.serializer();

			@Override
			public Integer partition(K key, VR value, int numPartitions) {
				// Extract the part of the widened key that determines the target partition.
				KL newKey = intermideateKeyExtractor.apply(key);
				// Copied from the default partitioner; didn't want to create a Cluster object here to reuse it.
				byte[] keyBytes = intermediateSerializer.serialize(repartitionTopicName, newKey);
				// Mask the sign bit before the modulo so the result stays in [0, numPartitions).
				return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
			}
		}, repartitionProcessorName);

As you can see, the resulting key K contains KL (the key of the not-repartitioned table).
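
The first reason can be illustrated without any Kafka dependency. The following is only a
sketch under assumptions: the class WidenedKey, its field names, and the use of
Arrays.hashCode as a stand-in for murmur2 are hypothetical and not taken from the actual
patch. It shows that a partitioner which looks only at the key routes an update and its
tombstone (null value) to the same partition, provided the partition-determining field is
part of the key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyWideningSketch {
    /** Hypothetical widened key: the partition-determining field plus the original key of A. */
    static final class WidenedKey {
        final String partitionField;
        final String originalKey;

        WidenedKey(String partitionField, String originalKey) {
            this.partitionField = partitionField;
            this.originalKey = originalKey;
        }
    }

    /**
     * Chooses a partition from the key alone; the value is deliberately ignored
     * because it is null for deletes. Arrays.hashCode stands in for murmur2 here.
     */
    static int partition(WidenedKey key, String value, int numPartitions) {
        byte[] bytes = key.partitionField.getBytes(StandardCharsets.UTF_8);
        // Mask the sign bit first so the modulo result is in [0, numPartitions).
        return (Arrays.hashCode(bytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        WidenedKey key = new WidenedKey("customer-42", "order-7");
        int forUpdate = partition(key, "some value", 8);
        int forDelete = partition(key, null, 8); // tombstone: no value to inspect
        System.out.println(forUpdate == forDelete); // prints true
    }
}
```

If the partition-determining field lived only in the value, the tombstone would carry no
information about which partition to delete from.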

The second reason why this key must be there is that one needs to be able to build a derived
stream A''. But since in A' a record can "move" from partition X to Y, there is a race condition
between the "insert" in Y and the delete in X. The repartitioner processor repartitioning
for A'' needs to treat them as different keys. If they were the same key, the delete might
wipe the new value. This also puts downstream consumers of A'' in the weird position
that at any point in time there can be as many A keys with the same value as there are A'
partitions - 1, or a specific A key might vanish completely and then reappear, which is sometimes
wonky to work around in the end application. But there are enough strategies to solve at least
the multiple-A-keys case, not so much for the complete-vanish case. I hope this clarifies
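
The race between the insert in Y and the delete in X can be sketched with plain maps. This
is an illustration under assumptions, not the processor from the patch: the class and
method names are hypothetical, the view is an in-memory map standing in for A'', and the
event order (insert from Y arriving before the late delete from X) is chosen to show the
bad case.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.LinkedHashMap;
import java.util.Map;

class RepartitionRaceSketch {
    /** Applies one changelog event to a materialized view; a null value is a delete. */
    static <K> void apply(Map<K, String> view, K key, String value) {
        if (value == null) {
            view.remove(key);
        } else {
            view.put(key, value);
        }
    }

    /** Hypothetical widened key: (partition-determining field, original A key). */
    static Map.Entry<String, String> key(String partitionField, String aKey) {
        return new SimpleImmutableEntry<>(partitionField, aKey);
    }

    /** A'' keyed by the plain A key: the late delete from X wipes the new value. */
    static Map<String, String> plainKeys() {
        Map<String, String> view = new LinkedHashMap<>();
        apply(view, "a1", "v1"); // original record, published via partition X
        apply(view, "a1", "v2"); // insert arriving from partition Y
        apply(view, "a1", null); // delete arriving late from partition X
        return view;
    }

    /** A'' keyed by the widened key: the delete only removes the old entry. */
    static Map<Map.Entry<String, String>, String> widenedKeys() {
        Map<Map.Entry<String, String>, String> view = new LinkedHashMap<>();
        apply(view, key("x", "a1"), "v1");
        apply(view, key("y", "a1"), "v2"); // both entries for a1 exist at this point
        apply(view, key("x", "a1"), null);
        return view;
    }

    public static void main(String[] args) {
        System.out.println("plain keys:   " + plainKeys());
        System.out.println("widened keys: " + widenedKeys());
    }
}
```

With plain keys the view ends up empty; with widened keys the new value survives. The
intermediate state in widenedKeys(), where a1 appears under two keys, corresponds to the
multiple-A-keys situation described above; if the delete arrived first, a1 would instead
vanish briefly and then reappear.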

> Support non-key joining in KTable
> ---------------------------------
>                 Key: KAFKA-3705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Liquan Pei
>              Labels: api
>             Fix For:
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users want to join
> a KTable A by key {{a}} with another KTable B by key {{b}} but with a "foreign key" {{a}},
> and assuming they are read from two topics which are partitioned on {{a}} and {{b}} respectively,
> they need to do the following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already partitioned on {{a}},
> users still need to do the pre-aggregation in order to put the two joining streams
> on the same key. This is a drawback for programmability and we should fix it.

This message was sent by Atlassian JIRA