From: "Jan Filipiak (JIRA)"
To: dev@kafka.apache.org
Date: Fri, 1 Jul 2016 07:28:11 +0000 (UTC)
Subject: [jira] [Comment Edited] (KAFKA-3705) Support non-key joining in KTable

[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358566#comment-15358566 ]

Jan Filipiak edited comment on KAFKA-3705 at 7/1/16 7:28 AM:
-------------------------------------------------------------

Hi, yes, that is roughly where I am coming from, and I completely understand your position. The changelog case (logging Change<> objects) is just one implementation of this repartitioning; mine is another. I am very familiar with my approach because I have written several Samza applications with it, and it has benefits that may or may not be of interest here (the repartition topics can also be used for bootstrapping, there are fewer copies of the data since the state does not need to be made highly available, etc.).

What we are still missing is a mutual understanding of what I mean by key widening and how to expose it to users in a sane way. Let me try it with your JSON syntax. This is exactly the example we have, and the feature in this ticket would let me build it at the DSL level of the API.

Say I have three tables A, B and C, and I want to end up with C.PK => List<Joined<A,B>>. That result is then read by our application servers and serves as a faster way to retrieve the data than, say, the original MySQL. B has foreign keys into A and C. All tables start out as one topic each, keyed by that table's primary key:

Topic mysq__jadajadajada_A: A.PK => A
Topic mysq_B: B.PK => B
Topic mysq_C: C.PK => C

First I repartition B by A.PK, in this first example without a widened key. The records stay B.PK => B, but are now partitioned by A.PK. Then I can do the join with A and get B.PK => Joined<A,B>, as in your previous comment:

{quote}
Then a join result of {a="a1", joined = join("a1-pre", "c1")}
{quote}

Note that the key stays B.PK (unwidened).
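For concreteness, the value types this layout implies could look roughly like the following; the classes and field names (aPk, cPk, Joined) are illustrative only and are not taken from the ticket or from any Kafka API:

{code:java}
// Hypothetical data model for the A/B/C example above.
public final class ExampleModel {

    static final class A {
        long pk;            // A.PK, key of topic mysq__jadajadajada_A
        String payload;
    }

    static final class B {
        long pk;            // B.PK, key of topic mysq_B
        long aPk;           // foreign key into A, used for the first repartition
        long cPk;           // foreign key into C, used for the final aggregation
        String payload;
    }

    static final class C {
        long pk;            // C.PK, key of topic mysq_C
        String payload;
    }

    // Value type produced by joining a B record with its A parent,
    // i.e. the "B.PK => Joined<A,B>" layout described above.
    static final class Joined<L, R> {
        final L left;
        final R right;
        Joined(L left, R right) {
            this.left = left;
            this.right = right;
        }
    }
}
{code}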
Now I repartition based on C.PK, still keeping B.PK => Joined<A,B> as the topic layout, and this is where things go wrong. For the aggregation that produces C.PK => List<Joined<A,B>>, what would the aggregator look like?

{code:java}
List<Joined<A, B>> apply(B.PK key, Joined<A, B> value, List<Joined<A, B>> current) {
    // Index the current aggregate by the B primary key of each joined record.
    Map<B.PK, Joined<A, B>> m = listToMap(current, bKeyExtractorValueMapper);
    if (value == null) {
        m.remove(key);      // tombstone: drop the old joined record
    } else {
        m.put(key, value);  // upsert the new joined record
    }
    return new ArrayList<>(m.values());
}
{code}

This would not look much different with logged Change<Joined<A,B>> objects; only the remove and the add would become two separate methods. The problem is that this code does not look wrong, yet it now has a race condition. Think about an update to the A.PK field of a B record that forces it to switch partitions (while its C.PK value stays the same): we publish a delete to the old partition and the new value to the new partition, then we do the join, then we repartition on the unchanged C.PK. This makes the code above see B.PK => null (remove) and B.PK => Joined<A,B> (add) in no particular order, so the output is undefined.

If the API had forcefully widened the key to include the foreign key (e.g. Joined<A.PK, B.PK> instead of plain B.PK), this error could not happen and users would be aware of what repartitioning does to their keys. I have thought this through, and it also happens with logging Change<> objects, as that is really just another implementation. I hope this finally clarifies the key widening I am talking about; if not, maybe we should have a short Skype call or something.

My recommendation, further, is not to implement these joins as logged Change<> objects, as that is more resource intensive and less efficient, and it also makes the API more complicated.

PS: Hive has seen all the join types (MapJoins, skewed joins, you name it), and all of them are applicable to streams as well. It may be worth keeping them in the back of your head.
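To make the key-widening argument concrete, here is a minimal sketch of what such a widened key could look like; CombinedKey is a hypothetical name, not an existing Kafka Streams type:

{code:java}
import java.util.Objects;

// Hypothetical widened key: repartitioned records are keyed by the foreign
// key (A.PK) *and* the original primary key (B.PK) together. A B record whose
// A.PK changes from a1 to a2 then produces a tombstone for CombinedKey(a1, b1)
// and a new value for CombinedKey(a2, b1): two distinct keys, so the downstream
// aggregation cannot confuse them regardless of arrival order.
final class CombinedKey<FK, PK> {
    final FK foreignKey;   // e.g. A.PK, decides the partition
    final PK primaryKey;   // e.g. B.PK, identifies the source record

    CombinedKey(FK foreignKey, PK primaryKey) {
        this.foreignKey = foreignKey;
        this.primaryKey = primaryKey;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof CombinedKey)) return false;
        CombinedKey<?, ?> other = (CombinedKey<?, ?>) o;
        return Objects.equals(foreignKey, other.foreignKey)
                && Objects.equals(primaryKey, other.primaryKey);
    }

    @Override
    public int hashCode() {
        return Objects.hash(foreignKey, primaryKey);
    }
}
{code}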
> Support non-key joining in KTable
> ---------------------------------
>
>                 Key: KAFKA-3705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Liquan Pei
>              Labels: api
>             Fix For: 0.10.1.0
>
>
> Today in the Kafka Streams DSL, KTable joins are based only on keys. If users want to join a KTable A keyed by {{a}} with another KTable B keyed by {{b}} but carrying a "foreign key" {{a}}, and assuming the two tables are read from topics partitioned on {{a}} and {{b}} respectively, they need to use the following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics that are already partitioned on {{a}}, users still need to do the pre-aggregation to bring the two joining streams onto the same key. This is a drawback in programmability and we should fix it.
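For illustration, a one-step foreign-key join at the DSL level might be sketched as follows; the interface and method below are purely hypothetical, do not exist in Kafka Streams, and only restate the ticket's request in code form:

{code:java}
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical DSL extension: a one-step foreign-key join that would spare
// users the manual groupBy(...).agg(...) pre-aggregation. KTable here is a
// stand-in interface, not org.apache.kafka.streams.kstream.KTable.
interface KTable<K, V> {

    // Join this table (keyed by K) against another table keyed by KO, where
    // each of this table's values carries a foreign key into the other table.
    // The library would repartition this table by the extracted foreign key
    // (widening the key as discussed in the comment above), run a key-based
    // join, and repartition the result back to K.
    <KO, VO, VR> KTable<K, VR> joinOnForeignKey(
            KTable<KO, VO> other,
            Function<V, KO> foreignKeyExtractor,
            BiFunction<V, VO, VR> joiner);
}
{code}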