kafka-dev mailing list archives

From "Jan Filipiak (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (KAFKA-3705) Support non-key joining in KTable
Date Fri, 01 Jul 2016 07:28:11 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358566#comment-15358566 ]

Jan Filipiak edited comment on KAFKA-3705 at 7/1/16 7:28 AM:
-------------------------------------------------------------

Hi, yes, that is roughly where I am coming from, and I completely understand your position.
Logging Change<> objects (the changelog case) is just one implementation of
this repartitioning, and mine is another. I am very familiar with my approach because I wrote
some Samza apps with it. It has several benefits that may or may not be of interest:
repartition topics can also be used to bootstrap, there are fewer copies of the data (no need to make
state HA, see previous), etc. What we are still missing is a mutual understanding of
what I think key widening does and how to expose it to users in a sane manner.

Let me try it with your JSON syntax. This is the exact example we have, and this ticket's
feature would allow me to build it at the DSL level of the API.

So let's say I have 3 tables: A, B, and C. I want to reach a point where I have C => <C, List<Join<A,B>>>.
This will then be read by our application servers, which serve it as a faster way to retrieve
the data than, say, the original MySQL. B has foreign keys to A and C.

 
All tables start off as one topic each, keyed by the table's primary key:
Topic mysq__jadajadajada_A
A.PK => A
Topic mysq_B
B.PK => B
Topic mysq_C
C.PK => C

I am going to repartition B by A.PK now, in this first example without a widened key.
The layout stays B.PK => B, but the topic is partitioned by A.PK.
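
As a rough illustration of this repartitioning step, here is a minimal Java sketch. It assumes a plain hash-modulo partitioner (Kafka's default partitioner hashes the serialized key bytes instead), and `ForeignKeyPartitioner`, `BRecord` and `partitionFor` are hypothetical names, not part of the Streams API. The record key stays B.PK while the partition is derived from A.PK only, so an update to A.PK can move the record to another partition.

```java
final class ForeignKeyPartitioner {

    /** A row of table B: its own primary key plus the foreign keys to A and C. */
    static final class BRecord {
        final String pk;   // B.PK, stays the record key
        final String aPk;  // foreign key to A, used only to pick the partition
        final String cPk;  // foreign key to C, not used in this step

        BRecord(String pk, String aPk, String cPk) {
            this.pk = pk;
            this.aPk = aPk;
            this.cPk = cPk;
        }
    }

    /** Assumed partitioning rule: hash of A.PK modulo the partition count. */
    static int partitionFor(BRecord b, int numPartitions) {
        return Math.floorMod(b.aPk.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        BRecord before = new BRecord("b1", "a1", "c1");
        BRecord after = new BRecord("b1", "a2", "c1"); // same B.PK, new A.PK
        System.out.println(before.pk + " -> partition " + partitionFor(before, 4));
        System.out.println(after.pk + " -> partition " + partitionFor(after, 4));
    }
}
```

With 4 partitions, the two versions of b1 land on different partitions in this sketch even though the key B.PK is identical.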

Then I can do the join with A and get
B.PK => joined<B,A>

As in your previous comment:
{quote}
Then a join result of
\{a="a1", joined = join("a1-pre", "c1")} 
{quote}
Note the key stays B.PK (unwidened).
Now I am going to repartition based on C.PK, still maintaining
B.PK => joined<B,A>
as the topic layout.
Now the shit hits the fan, as I am doing my aggregation to get
C.PK => List<Joined<A,B>>

How would this aggregator look now?

{code:java}
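// Sketch only: BPk stands for the type of B.PK; listToMap and bKeyExtractor
// are helper names used for illustration, not Kafka Streams API.
List<Joined<A,B>> apply(BPk key, Joined<A,B> value, List<Joined<A,B>> current)
{
   // index the current list by B.PK so the entry for this key can be replaced
   Map<BPk, Joined<A,B>> m = listToMap(current, bKeyExtractor);
   if (value == null)
   {
      m.remove(key);      // null value: drop the entry for this B.PK
   } else
   {
      m.put(key, value);  // otherwise upsert the latest join result
   }
   return new ArrayList<>(m.values());
}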
{code}

This wouldn't be much different with logged Change<Joined<A,B>>; only the remove
and the add would be two methods. The problem is that it doesn't
look wrong, yet this code now has a race condition. Think about an update to the A.PK field
of a B record that forces it to switch partitions
(the C.PK value stays the same). We publish a delete to the old partition and the new value to
the new partition. Then we do the join. Then we repartition on the unchanged C.PK. This
makes our code above see B.PK => null (remove) and B.PK => Joined<A,B> (add) in
no particular order, hence the output is undefined. If the API had forcibly widened the
key to Joined<A.PK,B.PK>, the error would not happen, and users would be aware of what
happens on repartitioning. I thought this through, and it also happens with logging Change<>,
as that is really just another implementation.
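
The race can be reproduced in a small simulation. The Java sketch below is only an illustration under stated assumptions: keys and values are plain strings, the widened key is modelled as the string "A.PK|B.PK", and `KeyWideningRace`, `Update`, `aggregate` and `run` are hypothetical names, not Kafka Streams API. It replays the delete from the old partition and the add from the new partition in both arrival orders, once with the unwidened key and once with the widened key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class KeyWideningRace {

    /** One record arriving at the C.PK partition; a null value means delete. */
    static final class Update {
        final String key;
        final String value;

        Update(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Same logic as the aggregator above: remove on null, upsert otherwise. */
    static List<String> aggregate(Map<String, String> initial, List<Update> arrivals) {
        Map<String, String> state = new LinkedHashMap<>(initial);
        for (Update u : arrivals) {
            if (u.value == null) {
                state.remove(u.key);
            } else {
                state.put(u.key, u.value);
            }
        }
        return new ArrayList<>(state.values());
    }

    /** Record b1 moves from A.PK a1 to a2; C.PK is unchanged. */
    static List<String> run(boolean widened, boolean deleteFirst) {
        String oldKey = widened ? "a1|b1" : "b1";
        String newKey = widened ? "a2|b1" : "b1";
        Map<String, String> initial = new LinkedHashMap<>();
        initial.put(oldKey, "joined(a1,b1)");
        Update delete = new Update(oldKey, null);          // from the old partition
        Update add = new Update(newKey, "joined(a2,b1)");  // from the new partition
        List<Update> arrivals = deleteFirst
                ? Arrays.asList(delete, add)
                : Arrays.asList(add, delete);
        return aggregate(initial, arrivals);
    }

    public static void main(String[] args) {
        System.out.println("unwidened, delete first: " + run(false, true));
        System.out.println("unwidened, add first:    " + run(false, false));
        System.out.println("widened,   delete first: " + run(true, true));
        System.out.println("widened,   add first:    " + run(true, false));
    }
}
```

With the unwidened key the result depends on arrival order: one order keeps the new join result and the other loses it. With the widened key both orders produce the same list, because the delete and the add no longer target the same map entry.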

I hope this finally clarifies the key widening I am talking about. If not, maybe we should
have a short Skype call or something.
Further, my recommendation is not to implement these joins as logged Change<>, as that is
just more resource intensive and less efficient, and it also makes the API more complicated.

PS: Hive has seen all the join types, with MapJoins, skewed joins, you name it. All of these are
applicable to streams as well. Maybe keep them in the back of your head.








> Support non-key joining in KTable
> ---------------------------------
>
>                 Key: KAFKA-3705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Liquan Pei
>              Labels: api
>             Fix For: 0.10.1.0
>
>
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users want to join
> a KTable A by key {{a}} with another KTable B by key {{b}} but with a "foreign key" {{a}},
> and assuming they are read from two topics which are partitioned on {{a}} and {{b}} respectively,
> they need to do the following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already partitioned on {{a}},
> users still need to do the pre-aggregation in order to put the two joining streams
> on the same key. This is a drawback for programmability and we should fix it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
