kafka-dev mailing list archives

From Jan Filipiak <Jan.Filip...@trivago.com>
Subject Re: [DISCUSS] KIP-213 Support non-key joining in KTable
Date Mon, 18 Dec 2017 11:59:13 GMT
Hi Guozhang,

  thanks for the update.

On 15.12.2017 22:54, Guozhang Wang wrote:
> Jan,
> Thanks for the updated KIP, and the raised questions. Here are my thoughts
> around the "back and forth mapper" approach on your wiki:
> 1) regarding the key-value types of KTableValueGetter, we do not
> necessarily enforce its template K, V to be the same as its calling
> Processor<K, V>, although today in all implementations we happen to do so.
> So I think it is ok to extend this internal implementation to allow getter
> and forwarding with different types.
I am not entirely sure what you mean by this. The dependency is there only
because the downstream processor is going to invoke the ValueGetter with the
downstream key. If this key is of a different type, we would run into a
runtime exception. We would have to introduce a fourth generic type: the
"new key type", which would also be the access type of the ValueGetter.
I like introducing this a lot, but I don't think it works without the
fourth generic type, and then we would have
KTableProcessorSupplier<KEY_IN,KEY_OUT,VALUE_IN,VALUE_OUT> extends 

view ValueGetterSupplier<KEY_OUT,VALUE_OUT>()
processor ProcessorSupplier<KEY_IN,VALUE_IN>()


This would conveniently allow for some flatmap() on KTable, which is a
neat thing IMO.
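
To make the fourth generic type concrete, here is a minimal sketch of what I have in mind. All names below (SketchValueGetter, SketchProcessor, SketchKTableProcessorSupplier, RekeySupplier) are hypothetical stand-ins written for this mail, not the real Kafka Streams internals, and the in-memory map only stands in for a state store. The point is just that the processor is typed on the incoming key while the view is typed on the outgoing key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for illustration only; not the Kafka Streams classes.
interface SketchValueGetter<K, V> {
    V get(K key);
}

interface SketchProcessor<K, V> {
    void process(K key, V value);
}

// Four type parameters: the processor consumes (KEY_IN, VALUE_IN),
// while the view is looked up by KEY_OUT and yields VALUE_OUT.
interface SketchKTableProcessorSupplier<KEY_IN, KEY_OUT, VALUE_IN, VALUE_OUT> {
    SketchProcessor<KEY_IN, VALUE_IN> processor();

    SketchValueGetter<KEY_OUT, VALUE_OUT> view();
}

// Example supplier that changes the key type. The backing map is keyed by the
// output key, so downstream lookups and forwarded records agree on the type.
final class RekeySupplier<KI, KO, V> implements SketchKTableProcessorSupplier<KI, KO, V, V> {
    private final Map<KO, V> store = new HashMap<>();
    private final Function<KI, KO> keyMapper;

    RekeySupplier(Function<KI, KO> keyMapper) {
        this.keyMapper = keyMapper;
    }

    @Override
    public SketchProcessor<KI, V> processor() {
        // Store each record under its mapped (output) key.
        return (key, value) -> store.put(keyMapper.apply(key), value);
    }

    @Override
    public SketchValueGetter<KO, V> view() {
        // Lookups use the output key type, never the input key type.
        return store::get;
    }
}
```

With only three type parameters, view() would have to be typed on KI, and a downstream caller holding a KO could not query it safely. Note that this sketch does nothing about key collisions after mapping; that concern is separate.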
> 2) regarding the KTableProcessorSupplier enforcing to return the same
> key/value types of its "KTableValueGetterSupplier<K, T> view();" the key
> observation to note is that "ProcessorSupplier<?, ?>" inside "KTableImpl<K,
> V>" does not enforce to have the same key-value types of the KTable, i.e.
> we can use a "ProcessorSupplier<K1, V1>" inside the impl of a `KTable<K,
> V>`. I think that should help getting around the issue.
I think it has nothing really to do with the places where the
ProcessorSupplier is referenced; the quirk is inside
KTableProcessorSupplier itself. Regardless of the scope of usage, I cannot
come up with a KTableProcessorSupplier that changes keys while
maintaining all invariants (being queryable). One can work around it with
a plain ProcessorSupplier, where it is obvious that you can't have a
ValueGetterSupplier, but this is rather a hack.

Why is it only inside KTableProcessorSupplier? We process key K and
then forward K1, but our ValueGetterSupplier can only have K as its
generic key type, and therefore it will crash if we invoke the

> 3) About the alternative KTable::mapKeys(), I think the major issue is that
> this mapKeys() cannot enforce users to always call it to get the
> "non-combined" Key, and hence users may still need to consider the serde of
> "CombinedKey" if they do not call mapKeys and then directly pipe it to the
> output, while this approach enforce them to always "map" it before trying
> to write it to anywhere externally exposable.
It cannot force them, but folks who want this can do it. People who are
fine with any CombinedKey type could just let it be forwarded as such.

A new aspect that I had not thought of yet is that, of course, in a
to() call they could pass in a CombinedKeySerde of their own. I think
this flexibility is a plus rather than a minus. What do you think?
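
A rough sketch of the two options, to show they can end up with the same bytes on the wire. Everything here is an assumption made up for illustration: SketchCombinedKey is a stand-in for the CombinedKey type from the KIP, the "|" delimiter is arbitrary, and plain functions stand in for serializers instead of the real Serde interfaces.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical stand-in for the CombinedKey type discussed in the KIP.
final class SketchCombinedKey<K1, K> {
    final K1 prefix; // key of the other table (the part a prefix scan uses)
    final K suffix;  // key of this table

    SketchCombinedKey(K1 prefix, K suffix) {
        this.prefix = prefix;
        this.suffix = suffix;
    }
}

final class CombinedKeyOutputSketch {
    // Option 1, "mapKeys" style: collapse the combined key into a plain
    // String first, then rely on an ordinary String serializer.
    static final Function<SketchCombinedKey<String, String>, String> SPLITTER =
            ck -> ck.prefix + "|" + ck.suffix;

    static byte[] stringSerializer(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Option 2: keep the combined key in the topology and hand a custom
    // combined-key serializer to the sink instead.
    static byte[] combinedKeySerializer(SketchCombinedKey<String, String> ck) {
        return (ck.prefix + "|" + ck.suffix).getBytes(StandardCharsets.UTF_8);
    }
}
```

Either way the user decides what ends up in the output topic; the difference is only whether the conversion happens in a mapping step or inside the serializer.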
> 4) A very minor comment on the wiki page itself, about the "back and forth
> mapper" section: the parameter names "customCombinedKey" and "combinedKey"
> seems a bit hard to understand to normal users; should we consider renaming
> them to something more understandable? For example, "outputKeyCombiner" and
> "outputKeySpliter"?
Yes, your naming is superior.
> Guozhang
> On Thu, Dec 7, 2017 at 3:58 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
> wrote:
>> On 05.12.2017 00:42, Matthias J. Sax wrote:
>>> Jan,
>>> The KTableValueGetter thing is a valid point. I think we would need a
>>> backwards mapper (or merge both into one and sacrifices lambdas?).
>>> Another alternative would be, to drop the optimization and materialize
>>> the KTable.operator() result... (not a great solution either). I am
>>> personally fine with a backwards mapper (we should call it KeySplitter).
>>> 2. I am not sure if we can pull it of w/o said forth generic type in
>>>>> KTable (that I am in favour of btw)
>>>> Not sure if I can follow here. I am personally not worried about the
>>> number of generic types -- it's just to have a clear definition what
>>> each passed parameter does.
>> I need to double check this again. It's good that we are open to
>> introducing a new one.
>> I think it will not work currently, as a KTableProcessorSupplier, when
>> asked for a ValueGetterSupplier, can only return a ValueGetterSupplier
>> that has the same key type as the key it receives in the process method,
>> even though it would forward a different key type; therefore the KTable's
>> key type can't change. I am thinking about how to pull this off, but I
>> see little chance.
>> But I am always in big favour of introducing the fourth type OutputKey; it
>> would become straightforward then. I hope you can follow.
>> + It won't solves peoples problem having CombinedKey on the wire and not
>>>> being able to inspect the topic with say there default tools.
>>> I see your point, but do we not have this issue always? To make range
>>> scan work, we need to serialize the prefix (K1) and suffix (K)
>>> independently from each other. IMHO, it would be too much of a burden to
>>> the user, to provide a single serialized for K0 that guaranteed the
>>> ordering we need. Still, advanced user can provide custom Serde for the
>>> changelog topic via `Joined` -- and they can serialize as they wish (ie,
>>> get CombinedKey<K1,K>, convert internally to K0 and serialized -- but
>>> this is an opt-in).
>>> I think, this actually aligns with what you are saying. However, I think
>>> the #prefix() call is not the best idea. We can just use Serde<K1> for
>>> this (if users overwrite CombinedKey-Serde, it must overwrite Serde<K1>
>>> too and can return the proper perfix (or do I miss something?).
>> I can't follow. For the stock implementation user would get
>> they wouldn't need prefix. Users would not have to define it; we can
>> implement that ourselves by just getting the K1 Serde.
>> But to override with a custom Serde, that prefix method is needed as an
>> indicator of whether only a prefix or the full thing is to be rendered.
>>>    - Id rather introduce KTable::mapKeys() or something (4th generic in
>>>> Ktable?) than overloading. It is better SOCs wise.
>>> What overload are you talking about? From my understanding, we want to
>>> add one single method (or maybe one for inner,left,outter each), but I
>>> don't see any overloads atm?
>> The back and forth mapper would get an overload
>>> Also, `KTable.mapKeys()` would have the issue, that one could create an
>>> invalid KTable with key collisions. I would rather shield users to shoot
>>> themselves in the foot.
>> This mapKeys() would not be used to remove the actual values but to get rid
>> of the CombinedKey type.
>> Users can shoot themselves in the foot with the proposed back and forth
>> mapper you suggested.
>>> Side remark:
>>> In the KIP, in the Step-by-Step table (that I really like a lot!) I
>>> think in line 5 (input A, with key A2 arrives, the columns "state B
>>> materialized" and "state B other task" should not be empty but the same
>>> as in line 4?
>> Will double check tonight. totally plausible i messed this up!
>> best Jan
>>> -Matthias
>>> On 11/25/17 8:56 PM, Jan Filipiak wrote:
>>>> Hi Matthias,
>>>> 2 things that pop into my mind sunday morning. Can we provide an
>>>> KTableValueGetter when key in the store is different from the key
>>>> forwarded?
>>>> 1. we would need a backwards mapper
>>>> 2. I am not sure if we can pull it of w/o said forth generic type in
>>>> KTable (that I am in favour of btw)
>>>> + It won't solves peoples problem having CombinedKey on the wire and not
>>>> beeing able to inspect the topic with say there default tools.
>>>>    - Id rather introduce KTable::mapKeys() or something (4th generic in
>>>> Ktable?) than overloading. It is better SOCs wise.
>>>> I am thinking more of an overload where we replace the CombinedKey
>>>> Serde. So people can use a default CombinedKey Serde
>>>> but could provide their own implementation that would internally use K0 for
>>>> serialisation and deserialisation. One could implement
>>>> a #prefix() into this call to make explicit that we only want the
>>>> prefix rendered. This would take CombinedKey logic out of publicly
>>>> visible data. A stock CombinedKey Serde that would be used by default
>>>> could also handle the JSON users correctly.
>>>> Users would still get CombinedKey back. The downside of getting these
>>>> nested deeply is probably mitigated by users doing a group by
>>>> in the very next step to get rid of A's key again.
>>>> That is what I was able to come up with so far.
>>>> Let me know what you think.
>>>> On 22.11.2017 00:14, Matthias J. Sax wrote:
>>>>> Jan,
>>>>> Thanks for explaining the Serde issue! This makes a lot of sense.
>>>>> I discussed with Guozhang about this issue and came up with the
>>>>> following idea that bridges both APIs:
>>>>> We still introduce CombinedKey as a public interface and exploit it to
>>>>> manage the key in the store and the changelog topic. For this case we
>>>>> can construct a suitable Serde internally based on the Serdes of both
>>>>> keys that are combined.
>>>>> However, the type of the result table is user defined and can be
>>>>> anything. To bridge between the CombinedKey and the user defined result
>>>>> type, users need to hand in a `ValueMapper<CombinedKey, KO>` that
>>>>> convert the CombinedKey into the desired result type.
>>>>> Thus, the method signature would be something like
>>>>> <KO, VO, K1, V1> KTable<KO,VO> oneToManyJoin(
>>>>>        KTable<K1, V1> other,
>>>>>        ValueMapper<V1, K> keyExtractor,
>>>>>        ValueJoiner<V, V1, VO> joiner,
>>>>>        ValueMapper<CombinedKey<K,K1>, KO> resultKeyMapper);
>>>>> The interface parameters are still easy to understand and don't leak
>>>>> implementation details IMHO.
>>>>> WDYT about this idea?
>>>>> -Matthias
>>>>> On 11/19/17 11:28 AM, Guozhang Wang wrote:
>>>>>> Hello Jan,
>>>>>> I think I get your point about the cumbersome that CombinedKey would
>>>>>> introduce for serialization and tooling based on serdes. What I'm
>>>>>> wondering is the underlying of joinPrefixFakers mapper: from your latest
>>>>>> comment it seems this mapper will be a one-time mapper: we use this to map
>>>>>> the original resulted KTable<combined<K1, K2>, V0> to KTable<K0, V0> and
>>>>>> then that mapper can be thrown away and be forgotten. Is that true?
>>>>>> original thought is that you propose to carry this mapper all the
>>>>>> along
>>>>>> the rest of the topology to "abstract" the underlying combined keys.
>>>>>> If it is the other way (i.e. the former approach), then the diagram
>>>>>> these two approaches would be different: for the less intrusive
>>>>>> approach we
>>>>>> would add one more step in this diagram to always do a mapping after
>>>>>> the
>>>>>> "task perform join" block.
>>>>>> Also another minor comment on the internal topic: I think many
>>>>>> readers may
>>>>>> not get the schema of this topic, so it is better to indicate that
>>>>>> would be the key of this internal topic used for compaction, and
>>>>>> would
>>>>>> be used as the partition-key.
>>>>>> Guozhang
>>>>>> On Sat, Nov 18, 2017 at 2:30 PM, Jan Filipiak<Jan.Filipiak@trivago.com
>>>>>> wrote:
>>>>>> -> it think the relationships between the different used types,
>>>>>>> K0,K1,KO
>>>>>>> should be explains explicitly (all information is there implicitly,
>>>>>>> but
>>>>>>> one need to think hard to figure it out)
>>>>>>> I'm probably blind for this. can you help me here? how would
>>>>>>> formulate
>>>>>>> this?
>>>>>>> Thanks,
>>>>>>> Jan
>>>>>>> On 16.11.2017 23:18, Matthias J. Sax wrote:
>>>>>>> Hi,
>>>>>>>> I am just catching up on this discussion and did re-read the KIP and
>>>>>>>> discussion thread.
>>>>>>>> In contrast to you, I prefer the second approach with CombinedKey
>>>>>>>> return type for the following reasons:
>>>>>>>>      1) the oneToManyJoin() method had less parameter
>>>>>>>>      2) those parameters are easy to understand
>>>>>>>>      3) we hide implementation details (joinPrefixFaker, leftKeyExtractor,
>>>>>>>> and the return type KO leaks internal implementation details from my
>>>>>>>> point of view)
>>>>>>>>      4) user can get their own KO type by extending CombinedKey interface
>>>>>>>> (this would also address the nesting issue Trevor pointed
>>>>>>>> That's unclear to me is, why you care about JSON serdes? What is the
>>>>>>>> problem with regard to prefix? It seems I am missing something
>>>>>>>> I also don't understand the argument about "the user can stick with his
>>>>>>>> default serde or his standard way of serializing"? If we
>>>>>>>> `CombinedKey` as output, the use just provide the serdes for both input
>>>>>>>> combined-key types individually, and we can reuse both internally to do
>>>>>>>> the rest. This seems to be a way simpler API. With the KO output type
>>>>>>>> approach, users need to write an entirely new serde for KO
>>>>>>>> contrast.
>>>>>>>> Finally, @Jan, there are still some open comments you did not address
>>>>>>>> and the KIP wiki page needs some updates. Would be great if you could do
>>>>>>>> this.
>>>>>>>> Can you also explicitly describe the data layout of the store that is
>>>>>>>> used to do the range scans?
>>>>>>>> Additionally:
>>>>>>>> -> some arrows in the algorithm diagram are missing
>>>>>>>> -> was are those XXX in the diagram
>>>>>>>> -> can you finish the "Step by Step" example
>>>>>>>> -> it think the relationships between the different used
>>>>>>>> K0,K1,KO
>>>>>>>> should be explains explicitly (all information is there implicitly,
>>>>>>>> but
>>>>>>>> one need to think hard to figure it out)
>>>>>>>> Last but not least:
>>>>>>>> But no one is really interested.
>>>>>>>> Don't understand this statement...
>>>>>>>> -Matthias
>>>>>>>> On 11/16/17 9:05 AM, Jan Filipiak wrote:
>>>>>>>> We are running this perfectly fine. for us the smaller table
>>>>>>>>> rather infrequent say. only a few times per day. The
>>>>>>>>> of the
>>>>>>>>> flush is way lower than the computing power you need to bring to the
>>>>>>>>> table to account for all the records being emitted after the one single
>>>>>>>>> update.
>>>>>>>>> On 16.11.2017 18:02, Trevor Huey wrote:
>>>>>>>>> Ah, I think I see the problem now. Thanks for the explanation.
>>>>>>>>>> That is tricky. As you said, it seems the easiest solution would just be to
>>>>>>>>>> flush the cache. I wonder how big of a performance hit that'd be...
>>>>>>>>>> On Thu, Nov 16, 2017 at 9:07 AM Jan Filipiak
>>>>>>>>>> <Jan.Filipiak@trivago.com
>>>>>>>>>> <mailto:Jan.Filipiak@trivago.com>> wrote:
>>>>>>>>>>         Hi Trevor,
>>>>>>>>>>         I am leaning towards the less intrusive approach. In fact,
>>>>>>>>>>         that is how we implemented our internal API for this and how we
>>>>>>>>>>         run it in production.
>>>>>>>>>>         Getting more voices towards this solution makes me really happy.
>>>>>>>>>>         The reason it's a problem for prefix and not for range is the
>>>>>>>>>>         following. Imagine the intrusive approach. The key of the RocksDB
>>>>>>>>>>         would be CombinedKey<A,B>, and the prefix scan would take an A, and
>>>>>>>>>>         the range scan would take a CombinedKey<A,B> still. As you can
>>>>>>>>>>         see, with the intrusive approach the keys are actually different
>>>>>>>>>>         types for different queries. With the less intrusive approach we
>>>>>>>>>>         use the same type and rely on Serde invariances. For us this works
>>>>>>>>>>         nicely (protobuf); it might bite some JSON users.
>>>>>>>>>>         Hope it makes it clear
>>>>>>>>>>         Best Jan
>>>>>>>>>>         On 16.11.2017 16:39, Trevor Huey wrote:
>>>>>>>>>>>         1. Going over KIP-213, I am leaning toward the "less intrusive"
>>>>>>>>>>>         approach. In my use case, I am planning on performing a sequence
>>>>>>>>>>>         of several oneToMany joins. From my understanding, the more
>>>>>>>>>>>         intrusive approach would result in several nested levels of
>>>>>>>>>>>         CombinedKey's. For example, consider Tables A, B, C, D with
>>>>>>>>>>>         corresponding keys KA, KB, KC. Joining A and B would produce
>>>>>>>>>>>         CombinedKey<KA, KB>. Then joining that result on C would produce
>>>>>>>>>>>         CombinedKey<KC, CombinedKey<KA, KB>>. My "keyOtherSerde" in this
>>>>>>>>>>>         case would need to be capable of deserializing CombinedKey<KA,
>>>>>>>>>>>         KB>. This would just get worse the more tables I join. I realize
>>>>>>>>>>>         that it's easier to shoot yourself in the foot with the less
>>>>>>>>>>>         intrusive approach, but as you said, "the user can stick with
>>>>>>>>>>>         his default serde or his standard way of serializing". In the
>>>>>>>>>>>         simplest case where the keys are just strings, they can do simple
>>>>>>>>>>>         string concatenation and Serdes.String(). It also allows the user
>>>>>>>>>>>         to create and use their own version of CombinedKey if they feel
>>>>>>>>>>>         so inclined.
>>>>>>>>>>>         2. Why is there a problem for prefix, but not for range?
>>>>>>>>>>>         https://github.com/apache/kafka/pull/3720/files#diff-8f863b74c3c5a0b989e89d00c149aef1L162
>>>>>>>>>>>         On Thu, Nov 16, 2017 at 2:57 AM Jan Filipiak
>>>>>>>>>>>         <Jan.Filipiak@trivago.com <mailto:Jan.Filipiak@trivago.com>> wrote:
>>>>>>>>>>>             Hi Trevor,
>>>>>>>>>>>             thank you very much for your interest. To keep the discussion
>>>>>>>>>>>             focused on the mailing list and not Jira or Confluence, I
>>>>>>>>>>>             decided to reply here.
>>>>>>>>>>>             1. It's tricky; activity is indeed very low. In the KIP-213
>>>>>>>>>>>             there are 2 proposals about the return type of the join. I
>>>>>>>>>>>             would like to settle on one.
>>>>>>>>>>>             Unfortunately it's controversial, and I don't want to have the
>>>>>>>>>>>             discussion after I settled on one way and implemented it. But
>>>>>>>>>>>             no one is really interested.
>>>>>>>>>>>             So discussing with YOU what your preferred return type would
>>>>>>>>>>>             look like would be very helpful already.
>>>>>>>>>>>             2.
>>>>>>>>>>>             The most difficult part is implementing
>>>>>>>>>>>             this
>>>>>>>>>>>             https://github.com/apache/kafka/pull/3720/files#diff-ac41b4dfb9fc6bb707d966477317783cR68
>>>>>>>>>>>             here
>>>>>>>>>>>             https://github.com/apache/kafka/pull/3720/files#diff-8f863b74c3c5a0b989e89d00c149aef1R244
>>>>>>>>>>>             and here
>>>>>>>>>>>             https://github.com/apache/kafka/pull/3720/files#diff-b1a1281dce5219fd0cb5afad380d9438R207
>>>>>>>>>>>             One can get an easy shot by just flushing the underlying
>>>>>>>>>>>             rocks and using Rocks for range scan.
>>>>>>>>>>>             But as you can see, the implementation depends on the API.
>>>>>>>>>>>             Depending on which way the API discussion goes,
>>>>>>>>>>>             I would implement this differently.
>>>>>>>>>>>             3.
>>>>>>>>>>>             I only have so and so much time to work on this. I filed the
>>>>>>>>>>>             KIP because I want to pull it through and I am pretty
>>>>>>>>>>>             confident that I can do it.
>>>>>>>>>>>             But I am still waiting for the full discussion to happen on
>>>>>>>>>>>             this. To move the discussion forward, it seems that I
>>>>>>>>>>>             need to fill out the table in
>>>>>>>>>>>             the KIP entirely (the one describing the events, change
>>>>>>>>>>>             modifications and output). Feel free to continue the
>>>>>>>>>>>             discussion w/o the table. I want
>>>>>>>>>>>             to finish the table during next week.
>>>>>>>>>>>             Best Jan, thank you for your interest!
>>>>>>>>>>>             _____ Jira Quote ______
>>>>>>>>>>>             Jan Filipiak
>>>>>>>>>>>             <https://issues.apache.org/jira/secure/ViewProfile.jspa?name=jfilipiak>
>>>>>>>>>>>             Please bear with me while I try to get caught up. I'm not yet
>>>>>>>>>>>             familiar with the Kafka code base. I have a few questions to
>>>>>>>>>>>             try to figure out how I can get involved:
>>>>>>>>>>>             1. It seems like we need to get buy-in on your KIP-213? It
>>>>>>>>>>>             doesn't seem like there's been much activity on it besides
>>>>>>>>>>>             yourself in a while. What's your current plan of attack for
>>>>>>>>>>>             getting that approved?
>>>>>>>>>>>             2. I know you said that the most difficult part is yet to be
>>>>>>>>>>>             done. Is there some code you can point me toward so I can
>>>>>>>>>>>             start digging in and better understand why this is so difficult?
>>>>>>>>>>>             3. This issue has been open since May '16. How far out do you
>>>>>>>>>>>             think we are from getting this implemented?
