kafka-dev mailing list archives

From Damian Guy <damian....@gmail.com>
Subject Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner
Date Mon, 15 May 2017 09:35:48 GMT
Thanks for the KIP.

I'm not convinced by the `RichFunction` approach. Do we really want to give
every DSL method access to the `ProcessorContext`? It has a bunch of
methods on it that seem inappropriate for some of the DSL methods, e.g.,
`register`, `getStateStore`, `forward`, `schedule`, etc. It is far too
broad. I think it would be better to have a narrower interface like the
`RecordContext`; remember that it is easier to add methods/interfaces later
than to remove them.
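Here is a minimal sketch of the narrower alternative. It assumes a hypothetical read-only `RecordMetadata` interface that exposes only the topic, partition, offset, and timestamp; this is not an existing Kafka Streams type. `ValueMapperWithContext`, `FixedMetadata`, and `NarrowContextDemo` are also illustrative names. The callback still has a single abstract method, so lambdas keep working. Nothing like `forward` or `schedule` is reachable from user code.

```java
// Hypothetical read-only view of the current record's metadata.
// Unlike ProcessorContext, it has no register/getStateStore/forward/schedule.
interface RecordMetadata {
    String topic();
    int partition();
    long offset();
    long timestamp();
}

// Hypothetical DSL callback: key, value, and the narrow metadata view.
@FunctionalInterface
interface ValueMapperWithContext<K, V, VR> {
    VR apply(K readOnlyKey, V value, RecordMetadata metadata);
}

// Immutable implementation used only to drive the example.
final class FixedMetadata implements RecordMetadata {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;

    FixedMetadata(String topic, int partition, long offset, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public long offset() { return offset; }
    @Override public long timestamp() { return timestamp; }
}

class NarrowContextDemo {
    static String describe() {
        ValueMapperWithContext<String, Integer, String> mapper = (key, value, meta) ->
            key + "=" + value + " @" + meta.topic() + "-" + meta.partition() + ":" + meta.offset();
        return mapper.apply("user-1", 42, new FixedMetadata("clicks", 3, 17L, 0L));
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints user-1=42 @clicks-3:17
    }
}
```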

On Sat, 13 May 2017 at 22:26 Matthias J. Sax <matthias@confluent.io> wrote:

> Jeyhun,
>
> I am not an expert on lambdas. Can you elaborate a little? I cannot
> follow the explanation in the KIP well enough to see what the problem is.
>
> As for updating the KIP title, I don't know; I guess it's up to you. Maybe a
> committer can comment on this?
>
>
> Further comments:
>
>  - The KIP is getting a little hard to read. Can you maybe reformat the wiki
> page a little? I think using `CodeBlock` would help.
>
>  - What about KStream-KTable joins? You haven't added overloads for
> them. Why? (Even though I still hope that we don't need to add any new
> overloads.)
>
>  - Why do we need `AbstractRichFunction`?
>
>  - What about the interfaces Initializer, ForeachAction, Merger, Predicate,
> and Reducer? I don't want to say we should/need to add all of them, but we should
> discuss each of them and add them where it makes sense (e.g.,
> RichForeachAction does make sense IMHO).
>
>
> Btw: I like the hierarchy `ValueXX` -- `ValueXXWithKey` -- `RichValueXX`
> in general -- but why can't we do all this with interfaces only?
>
>
>
> -Matthias
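One possible interfaces-only shape is sketched below. It assumes Java 8 default methods are usable; whether they are depends on the minimum Java version the project supports. The names follow the thread, but these are illustrative declarations, not a released API. `init()` and `close()` have empty defaults, so `RichValueMapper` still has exactly one abstract method. That means it remains a lambda target and needs no abstract base class.

```java
// Illustrative interfaces-only hierarchy: ValueMapper, ValueMapperWithKey, RichValueMapper.
@FunctionalInterface
interface ValueMapper<V, VR> {
    VR apply(V value);
}

@FunctionalInterface
interface ValueMapperWithKey<K, V, VR> {
    VR apply(K readOnlyKey, V value);
}

// Lifecycle hooks with empty defaults, so implementers override only what they need.
interface RichFunction {
    default void init() { }
    default void close() { }
}

// Keyed mapper plus lifecycle; still a single abstract method, hence lambda-friendly.
@FunctionalInterface
interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR>, RichFunction { }

class InterfacesOnlyDemo {
    static final StringBuilder LOG = new StringBuilder();

    static String run() {
        LOG.setLength(0);
        // A plain lambda is enough when no lifecycle is needed.
        RichValueMapper<String, Integer, String> simple = (key, value) -> key + ":" + (value * 2);

        // An anonymous class can override the lifecycle defaults.
        RichValueMapper<String, Integer, String> withLifecycle = new RichValueMapper<String, Integer, String>() {
            @Override public void init() { LOG.append("init;"); }
            @Override public String apply(String key, Integer value) { return key + ":" + (value + 1); }
            @Override public void close() { LOG.append("close;"); }
        };

        withLifecycle.init();
        String result = simple.apply("a", 5) + "," + withLifecycle.apply("b", 5);
        withLifecycle.close();
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run() + " " + LOG); // prints a:10,b:6 init;close;
    }
}
```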
>
>
>
> On 5/11/17 7:00 AM, Jeyhun Karimov wrote:
> > Hi,
> >
> > Thanks for your comments. I think we cannot make ValueMapperWithKey
> > extend ValueMapper if we want to keep lambdas. I updated the KIP [1].
> > Maybe I should change the title, because now we are not limiting the KIP
> > to only ValueMapper, ValueTransformer and ValueJoiner.
> > Please feel free to comment.
> >
> > [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
> >
> >
> > Cheers,
> > Jeyhun
> >
> > On Tue, May 9, 2017 at 7:36 PM Matthias J. Sax <matthias@confluent.io> wrote:
> >
> >> If `ValueMapperWithKey` extends `ValueMapper`, we don't need the new
> >> overload.
> >>
> >> And yes, we need to do one check -- but this happens when building the
> >> topology. At runtime (I mean after `KafkaStreams#start()`) we don't need any
> >> check, as we will always use `ValueMapperWithKey`.
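Here is a minimal sketch of the "`ValueMapperWithKey` extends `ValueMapper`" variant, assuming Java 8 default methods. The interfaces and `TopologyBuilderSketch` are illustrative, not Kafka's actual classes. The sketch shows both sides of the lambda question raised in this thread:

- Without the `default` body, `ValueMapperWithKey` would inherit a second abstract method, and `(key, value) -> ...` would not compile.
- With the `default` body, the single `instanceof` check can run once while the topology is built.

```java
// Illustrative interfaces, not Kafka's own declarations.
@FunctionalInterface
interface ValueMapper<V, VR> {
    VR apply(V value);
}

@FunctionalInterface
interface ValueMapperWithKey<K, V, VR> extends ValueMapper<V, VR> {
    VR apply(K readOnlyKey, V value);

    // Without this default, the interface would have two abstract methods and
    // could not be a lambda target. There is no key to pass on, so it refuses.
    @Override
    default VR apply(V value) {
        throw new UnsupportedOperationException("this mapper needs the key");
    }
}

class TopologyBuilderSketch {
    // One DSL entry point; the type check runs once, while building the topology.
    @SuppressWarnings("unchecked")
    static <K, V, VR> ValueMapperWithKey<K, V, VR> normalize(ValueMapper<V, VR> mapper) {
        if (mapper instanceof ValueMapperWithKey) {
            return (ValueMapperWithKey<K, V, VR>) mapper;
        }
        // Plain mapper: wrap it so the runtime path always calls apply(key, value).
        return (key, value) -> mapper.apply(value);
    }

    static String run() {
        ValueMapperWithKey<String, Integer, String> keyed = (key, value) -> key + "=" + value;
        ValueMapper<Integer, String> plain = value -> "v" + value;

        ValueMapperWithKey<String, Integer, String> a = normalize(keyed);
        ValueMapperWithKey<String, Integer, String> b = normalize(plain);
        return a.apply("k", 1) + "," + b.apply("k", 2);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints k=1,v2
    }
}
```

The catch is the default `apply(V)`. Any code that treats a keyed mapper as a plain `ValueMapper` and calls it directly gets an exception. This only works if Streams itself never makes that call.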
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 5/9/17 2:55 AM, Jeyhun Karimov wrote:
> >>> Hi,
> >>>
> >>> Thanks for the feedback.
> >>> Then we need to overload the method
> >>>   <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
> >>> with
> >>>   <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
> >>>
> >>> and at runtime (inside the processor) we still have to check whether it is a
> >>> ValueMapper or a ValueMapperWithKey before wrapping it into the rich function.
> >>>
> >>>
> >>> Please correct me if I am wrong.
> >>>
> >>> Cheers,
> >>> Jeyhun
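The overload route can also avoid a per-record check. The sketch below assumes two independent functional interfaces and a hypothetical in-memory `MiniStream` standing in for `KStream`. Implicitly typed lambdas are resolved by arity at compile time: `v -> ...` only fits `ValueMapper`, and `(k, v) -> ...` only fits `ValueMapperWithKey`. The key-less overload adapts its argument once, so the per-record loop only ever calls the keyed form.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@FunctionalInterface
interface ValueMapper<V, VR> {
    VR apply(V value);
}

// Independent of ValueMapper, so both stay ordinary lambda targets.
@FunctionalInterface
interface ValueMapperWithKey<K, V, VR> {
    VR apply(K readOnlyKey, V value);
}

// Hypothetical stand-in for KStream over an in-memory list of records.
final class MiniStream<K, V> {
    private final List<Map.Entry<K, V>> records;

    MiniStream(List<Map.Entry<K, V>> records) {
        this.records = records;
    }

    // Key-less overload: adapt once, while "building", then delegate.
    <VR> MiniStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper) {
        ValueMapperWithKey<K, V, VR> keyed = (key, value) -> mapper.apply(value);
        return mapValues(keyed);
    }

    // Keyed overload: the per-record loop only ever sees this form.
    <VR> MiniStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
        List<Map.Entry<K, VR>> out = new ArrayList<>();
        for (Map.Entry<K, V> record : records) {
            VR mapped = mapper.apply(record.getKey(), record.getValue());
            out.add(new SimpleEntry<K, VR>(record.getKey(), mapped));
        }
        return new MiniStream<>(out);
    }

    List<Map.Entry<K, V>> records() {
        return records;
    }
}

class OverloadDemo {
    static List<String> run() {
        List<Map.Entry<String, Integer>> input = new ArrayList<>();
        input.add(new SimpleEntry<>("a", 1));
        input.add(new SimpleEntry<>("b", 2));

        // One parameter: only the ValueMapper overload has matching arity.
        MiniStream<String, Integer> doubled = new MiniStream<>(input).mapValues(v -> v * 2);
        // Two parameters: only the ValueMapperWithKey overload matches.
        MiniStream<String, String> labelled = doubled.mapValues((k, v) -> k + v);

        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : labelled.records()) {
            result.add(e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a2, b4]
    }
}
```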
> >>>
> >>>
> >>>
> >>>
> >>> On Tue, May 9, 2017 at 10:56 AM Michal Borowiecki <michal.borowiecki@openbet.com> wrote:
> >>>
> >>>> +1 :)
> >>>>
> >>>>
> >>>> On 08/05/17 23:52, Matthias J. Sax wrote:
> >>>>> Hi,
> >>>>>
> >>>>> I was reading the updated KIP and I am wondering if we should do the
> >>>>> design a little differently.
> >>>>>
> >>>>> Instead of distinguishing between a RichFunction and a non-RichFunction at
> >>>>> the runtime level, we would use RichFunctions all the time. Thus, at the DSL
> >>>>> entry level, if a user provides a non-RichFunction, we wrap it in a
> >>>>> RichFunction that is fully implemented by Streams. This RichFunction
> >>>>> would just forward the call, omitting the key:
> >>>>>
> >>>>> Just to sketch the idea (incomplete code snippet):
> >>>>>
> >>>>>> public class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
> >>>>>>    private final ValueMapper<V, VR> userProvidedMapper; // set by constructor
> >>>>>>
> >>>>>>    public VR apply(final K key, final V value) {
> >>>>>>        return userProvidedMapper.apply(value); // the key is omitted
> >>>>>>    }
> >>>>>> }
> >>>>>
> >>>>> From a performance point of view, I am not sure if the
> >>>>> "if(isRichFunction)" check, including casts etc., would have more overhead
> >>>>> than this approach (we would do more nested method calls for
> >>>>> non-RichFunctions, which should be more common than RichFunctions).
> >>>>>
> >>>>> This approach should not affect lambdas (or am I missing something?) and
> >>>>> might be cleaner, as we could have one more top-level interface
> >>>>> `RichFunction` with methods `init()` and `close()` and also interfaces
> >>>>> for `RichValueMapper` etc. (thus, no abstract classes required).
> >>>>>
> >>>>>
> >>>>> Any thoughts?
> >>>>>
> >>>>>
> >>>>> -Matthias
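Below is a runnable version of the wrapper idea, assuming lifecycle methods without defaults. `RichFunction`, `RichValueMapper`, `StreamsRichValueMapper`, and `MapValuesProcessorSketch` are illustrative names, not Kafka classes. The processor is written only against `RichValueMapper`. A user's plain `ValueMapper` lambda is therefore wrapped once up front, and the per-record path has no `if (isRichFunction)` branch. The cost is one extra delegating call per record, which is the trade-off weighed in the email.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

@FunctionalInterface
interface ValueMapper<V, VR> {
    VR apply(V value);
}

// Hypothetical lifecycle contract that Streams would always call.
interface RichFunction {
    void init();
    void close();
}

interface RichValueMapper<K, V, VR> extends RichFunction {
    VR apply(K readOnlyKey, V value);
}

// Streams-side wrapper: forwards to the user's mapper and drops the key.
final class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
    private final ValueMapper<? super V, ? extends VR> userProvidedMapper;

    StreamsRichValueMapper(ValueMapper<? super V, ? extends VR> userProvidedMapper) {
        this.userProvidedMapper = userProvidedMapper;
    }

    @Override public void init() { }   // a plain mapper has nothing to set up
    @Override public void close() { }  // ...or to release

    @Override
    public VR apply(K readOnlyKey, V value) {
        return userProvidedMapper.apply(value);
    }
}

// Hypothetical processor that only knows RichValueMapper: no per-record type check.
final class MapValuesProcessorSketch<K, V, VR> {
    private final RichValueMapper<K, V, VR> mapper;

    MapValuesProcessorSketch(RichValueMapper<K, V, VR> mapper) {
        this.mapper = mapper;
    }

    List<VR> processAll(List<K> keys, List<V> values) {
        mapper.init();                       // once, before any record
        try {
            List<VR> out = new ArrayList<>();
            for (int i = 0; i < keys.size(); i++) {
                out.add(mapper.apply(keys.get(i), values.get(i)));
            }
            return out;
        } finally {
            mapper.close();                  // once, after the last record
        }
    }
}

class RichWrapperDemo {
    static List<String> run() {
        ValueMapper<Integer, String> plain = v -> "#" + v;
        RichValueMapper<String, Integer, String> wrapped = new StreamsRichValueMapper<String, Integer, String>(plain);
        return new MapValuesProcessorSketch<>(wrapped).processAll(Arrays.asList("a", "b"), Arrays.asList(1, 2));
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [#1, #2]
    }
}
```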
> >>>>>
> >>>>>
> >>>>> On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
> >>>>>> Hi,
> >>>>>>
> >>>>>> Thanks for the comments. I extended the PR and the KIP to include rich
> >>>>>> functions. I will still have to evaluate the cost of deep copying keys.
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Jeyhun
> >>>>>>
> >>>>>> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <mathieu.fenniak@replicon.com> wrote:
> >>>>>>
> >>>>>>> Hey Matthias,
> >>>>>>>
> >>>>>>> My opinion would be that documenting the immutability of the key is the
> >>>>>>> best approach available. It is better than requiring the key to be
> >>>>>>> serializable (as with Jeyhun's last pass at the PR), and there is no
> >>>>>>> performance risk.
> >>>>>>>
> >>>>>>> It'd be different if Java had immutable type constraints of some kind. :-)
> >>>>>>>
> >>>>>>> Mathieu
> >>>>>>>
> >>>>>>>
> >>>>>>> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <matthias@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> Agreed about RichFunction. If we follow this path, it should cover
> >>>>>>>> all(?) DSL interfaces.
> >>>>>>>>
> >>>>>>>> About guarding the key -- I am still not sure what to do about it...
> >>>>>>>> Maybe it is enough to document it (and name the key parameter something
> >>>>>>>> like `readOnlyKey` to make it very clear). Ultimately, I would prefer to
> >>>>>>>> guard against any modification, but I have no good idea how to do this.
> >>>>>>>> Not sure what others think about the risk of corrupted partitioning
> >>>>>>>> (which would be a user error, and we could say: well, bad luck, you have a
> >>>>>>>> bug in your code, that's not our fault) vs. a deep copy with a potential
> >>>>>>>> performance hit (which we can't quantify at the moment without any
> >>>>>>>> performance test).
> >>>>>>>>
> >>>>>>>> We do have a performance system test. Maybe it's worth applying the deep
> >>>>>>>> copy strategy and running the test. It's only a very basic performance
> >>>>>>>> test, but it might give some insight. If you want to do this, look into
> >>>>>>>> the folder "tests" for the general test setup, and into
> >>>>>>>> "tests/kafkatest/benchmarks/streams" to find the perf test.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
> >>>>>>>>> Hi Matthias,
> >>>>>>>>>
> >>>>>>>>> I think extending the KIP to include RichFunctions totally makes sense.
> >>>>>>>>> So we don't want to guard the keys, because it is costly.
> >>>>>>>>> If we introduce RichFunctions, I think they should not be limited to only
> >>>>>>>>> the 3 interfaces proposed in the KIP but should cover a wide range of
> >>>>>>>>> interfaces.
> >>>>>>>>> Please correct me if I am wrong.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Jeyhun
> >>>>>>>>>
> >>>>>>>>> On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matthias@confluent.io> wrote:
> >>>>>>>>>
> >>>>>>>>>> One follow-up. There was this email on the user list:
> >>>>>>>>>>
> >>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
> >>>>>>>>>>
> >>>>>>>>>> It might make sense to include the Initializer, Adder, and Subtractor
> >>>>>>>>>> interfaces, too.
> >>>>>>>>>>
> >>>>>>>>>> And we should double-check whether there are other interfaces we might be
> >>>>>>>>>> missing at the moment.
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
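For reference, Kafka Streams' `Aggregator` already receives the key, while `Initializer` does not. That gap is what the linked user-list question is about. Here is a minimal sketch of a keyed initializer, with a few assumptions:

- `InitializerWithKey` is a hypothetical interface.
- `AggregateSketch` is an in-memory stand-in for a real grouped stream.
- The `Initializer` and `Aggregator` declarations are local stand-ins mirroring the shape of Kafka's.

The initial aggregate may depend on the key. An existing key-less `Initializer` can be adapted by ignoring the key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local stand-ins with the same shape as Kafka's Initializer and Aggregator.
@FunctionalInterface
interface Initializer<VA> {
    VA apply();
}

@FunctionalInterface
interface Aggregator<K, V, VA> {
    VA apply(K key, V value, VA aggregate);
}

// Hypothetical keyed initializer: the starting aggregate may depend on the key.
@FunctionalInterface
interface InitializerWithKey<K, VA> {
    VA apply(K readOnlyKey);
}

final class AggregateSketch {
    // In-memory stand-in for a grouped-stream aggregate.
    static <K, V, VA> Map<K, VA> aggregate(List<Map.Entry<K, V>> records,
                                           InitializerWithKey<K, VA> initializer,
                                           Aggregator<K, V, VA> aggregator) {
        Map<K, VA> store = new HashMap<>();
        for (Map.Entry<K, V> r : records) {
            K key = r.getKey();
            VA current = store.containsKey(key) ? store.get(key) : initializer.apply(key);
            store.put(key, aggregator.apply(key, r.getValue(), current));
        }
        return store;
    }

    // Adapter so existing key-less initializers keep working.
    static <K, VA> InitializerWithKey<K, VA> withKey(Initializer<VA> initializer) {
        return key -> initializer.apply();
    }
}

class InitializerDemo {
    static List<Map.Entry<String, Integer>> input() {
        List<Map.Entry<String, Integer>> records = new ArrayList<>();
        records.add(new SimpleEntry<>("eu", 1));
        records.add(new SimpleEntry<>("us", 2));
        records.add(new SimpleEntry<>("eu", 3));
        return records;
    }

    static Map<String, Integer> keyedStart() {
        // Per-key starting value: "eu" starts at 100, everything else at 0.
        InitializerWithKey<String, Integer> init = key -> "eu".equals(key) ? 100 : 0;
        Aggregator<String, Integer, Integer> sum = (key, value, agg) -> agg + value;
        return AggregateSketch.aggregate(input(), init, sum);
    }

    static Map<String, Integer> keylessStart() {
        InitializerWithKey<String, Integer> init = AggregateSketch.withKey(() -> 0);
        Aggregator<String, Integer, Integer> sum = (key, value, agg) -> agg + value;
        return AggregateSketch.aggregate(input(), init, sum);
    }

    public static void main(String[] args) {
        System.out.println(keyedStart().get("eu") + " " + keylessStart().get("eu")); // prints 104 4
    }
}
```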
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
> >>>>>>>>>>> Thanks for updating the KIP.
> >>>>>>>>>>>
> >>>>>>>>>>> Deep copying the key will work for sure, but I am actually a little bit
> >>>>>>>>>>> worried about the performance impact... We might want to run some tests
> >>>>>>>>>>> to quantify this impact.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Btw: this reminds me of the idea of a `RichFunction` interface that
> >>>>>>>>>>> would allow users to access record metadata (like timestamp, offset,
> >>>>>>>>>>> partition, etc.) within the DSL. This would be a similar concept. Thus,
> >>>>>>>>>>> I am wondering if it would make sense to enlarge the scope of this KIP
> >>>>>>>>>>> accordingly. WDYT?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
> >>>>>>>>>>>> Hi Mathieu,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the feedback. I followed a similar approach and updated the
> >>>>>>>>>>>> PR and the KIP accordingly. I tried to guard the key in the Processors
> >>>>>>>>>>>> by sending them a copy of the actual key.
> >>>>>>>>>>>> Because I am doing a deep copy of an object, I think memory can be a
> >>>>>>>>>>>> bottleneck in some use cases.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <mathieu.fenniak@replicon.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Jeyhun,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> This approach would change ValueMapper (...etc) to be classes rather than
> >>>>>>>>>>>>> interfaces, which is also a backward-incompatible change. An alternative
> >>>>>>>>>>>>> approach that would be backward compatible would be to define new
> >>>>>>>>>>>>> interfaces, and provide overloads where those interfaces are used.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Unfortunately, making the key parameter "final" doesn't change much
> >>>>>>>>>>>>> about guarding against key changes. It only prevents the parameter
> >>>>>>>>>>>>> variable from being reassigned. If the key type is a mutable object
> >>>>>>>>>>>>> (e.g., byte[]), it can still be mutated (e.g., key[0] = 0). But I'm not
> >>>>>>>>>>>>> really sure there's much that can be done about that.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Mathieu
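Mathieu's point about `final` can be checked directly. In the sketch below (illustrative names only), `final` on the parameter rejects reassignment at compile time but does not stop writes into the array. Handing user code a copy, on the other hand, protects the caller's key. For `byte[]`, `Arrays.copyOf` is already a full copy. For arbitrary key types, a deep copy needs something like a serializer round trip, which is where the performance concern in this thread comes from.

```java
import java.util.Arrays;

class FinalKeyDemo {
    // 'final' stops reassigning the parameter, not mutating what it points to.
    static void carelessUserCode(final byte[] key) {
        // key = new byte[0];   // would not compile: key is final
        key[0] = 0;             // compiles, and changes the caller's array
    }

    static byte[] withoutCopy() {
        byte[] key = {1, 2, 3};
        carelessUserCode(key);
        return key;             // now {0, 2, 3}
    }

    // Defensive copy: user code mutates its own copy; costs one allocation per call.
    static byte[] withCopy() {
        byte[] key = {1, 2, 3};
        carelessUserCode(Arrays.copyOf(key, key.length));
        return key;             // still {1, 2, 3}
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(withoutCopy()) + " " + Arrays.toString(withCopy()));
        // prints [0, 2, 3] [1, 2, 3]
    }
}
```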
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <je.karimov@gmail.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the comments.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The concerns make sense. Although we can guard against key modification
> >>>>>>>>>>>>>> in the current implementation (with a few changes), I didn't consider
> >>>>>>>>>>>>>> backward compatibility.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> In this case, 2 solutions come to my mind. In both cases, the user
> >>>>>>>>>>>>>> accesses the key as type Object, as passing an extra type parameter
> >>>>>>>>>>>>>> would break backward compatibility. So the user has to cast it to the
> >>>>>>>>>>>>>> actual key type.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. Firstly, we can overload the apply method with 2 arguments (key and
> >>>>>>>>>>>>>> value) and force the key to be *final*. By doing this, I think we can
> >>>>>>>>>>>>>> address both backward compatibility and guarding against key changes.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2. Secondly, we can create a class KeyAccess like:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> public class KeyAccess {
> >>>>>>>>>>>>>>     Object key;
> >>>>>>>>>>>>>>     public void beforeApply(final Object key) {
> >>>>>>>>>>>>>>         this.key = key;
> >>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>     public Object getKey() {
> >>>>>>>>>>>>>>         return key;
> >>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> We can make *ValueMapper, ValueJoiner* and *ValueTransformer* extend
> >>>>>>>>>>>>>> *KeyAccess*. Inside the processor (for example
> >>>>>>>>>>>>>> *KTableMapValuesProcessor*), before calling *mapper.apply(value)*, we
> >>>>>>>>>>>>>> can set the *key* via *mapper.beforeApply(key)*. As a result, the user
> >>>>>>>>>>>>>> can use *getKey()* to access the key inside the *apply(value)* method.
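Here is a runnable rendering of option 2, for illustration only. A Java interface cannot extend a class, so `ValueMapper` has to become an abstract class here. That is the backward-compatibility (and lambda) problem Mathieu's reply points out. `KeyAccessDemo` and its `process` method are hypothetical names.

```java
// KeyAccess from option 2 (field made private).
class KeyAccess {
    private Object key;

    public void beforeApply(final Object key) {
        this.key = key;
    }

    public Object getKey() {
        return key;
    }
}

// An interface cannot extend a class, so ValueMapper becomes an abstract class
// and can no longer be implemented with a lambda.
abstract class ValueMapper<V, VR> extends KeyAccess {
    public abstract VR apply(V value);
}

final class KeyAccessDemo {
    // Hypothetical processor step: stash the key, then call the one-argument apply.
    static <K, V, VR> VR process(ValueMapper<V, VR> mapper, K key, V value) {
        mapper.beforeApply(key);
        return mapper.apply(value);
    }

    static String run() {
        ValueMapper<Integer, String> mapper = new ValueMapper<Integer, String>() {
            @Override
            public String apply(Integer value) {
                String key = (String) getKey(); // key is an Object, so user code must cast
                return key + ":" + value;
            }
        };
        return process(mapper, "user-7", 3);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints user-7:3
    }
}
```

Note also that the key now lives in mutable instance state between `beforeApply` and `apply`. In this sketch, a single mapper instance could not safely be shared across threads.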
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <matthias@confluent.io> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jeyhun,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> thanks a lot for the KIP!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I think there are two issues we need to address:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> (1) The KIP does not consider backward compatibility. Users have
> >>>>>>>>>>>>>>> already complained about this in past releases, and as the user base
> >>>>>>>>>>>>>>> grows, we should not break backward compatibility in future releases
> >>>>>>>>>>>>>>> anymore. Thus, we should think of a better way to allow key access.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Mathieu's comment goes in the same direction:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On the other hand, the number of compile failures that would need
> >>>>>>>>>>>>>>>>> to be fixed from this change is unfortunate. :-)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> (2) Another concern is that there is no guard to prevent user code from
> >>>>>>>>>>>>>>> modifying the key. This might corrupt partitioning if users do alter the
> >>>>>>>>>>>>>>> key (accidentally -- or because they are just not aware that they are
> >>>>>>>>>>>>>>> not allowed to modify the provided key object) and thus break the
> >>>>>>>>>>>>>>> application. (This was the original motivation for not providing the
> >>>>>>>>>>>>>>> key in the first place -- it guards against modification.)
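The partitioning risk can be illustrated with a simplified key-based partitioner. This sketch assumes `Arrays.hashCode` as the hash; Kafka's default partitioner applies murmur2 to the serialized key instead. The record is routed using the original key. If user code then edits the key in place, any later computation from the key, such as a repartition or a keyed lookup, points at a different partition.

```java
import java.util.Arrays;

class PartitionDriftDemo {
    static final int NUM_PARTITIONS = 4;

    // Simplified stand-in for a key-based partitioner.
    static int partitionFor(byte[] key) {
        return Math.floorMod(Arrays.hashCode(key), NUM_PARTITIONS);
    }

    static int[] run() {
        byte[] key = {1, 2, 3};
        int before = partitionFor(key);  // partition the record was routed to
        key[0] = 0;                      // user code edits the key in place
        int after = partitionFor(key);   // partition the key now maps to
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [1, 0]
    }
}
```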
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
> >>>>>>>>>>>>>>>> Hi Jeyhun,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I just want to add my voice: I, too, have wished for access to the
> >>>>>>>>>>>>>>>> record key during a mapValues or similar operation.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On the other hand, the number of compile failures that would need to be
> >>>>>>>>>>>>>>>> fixed from this change is unfortunate. :-)  But at least it would all be
> >>>>>>>>>>>>>>>> a pretty clear and easy change.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Mathieu
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <je.karimov@gmail.com> wrote:
> >>>>>>>>>>>>>>>>> Dear community,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I want to share KIP-149 [1], based on issues KAFKA-4218 [2], KAFKA-4726 [3],
> >>>>>>>>>>>>>>>>> and KAFKA-3745 [4]. The related PR can be found at [5].
> >>>>>>>>>>>>>>>>> I would like to get your comments.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
> >>>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-4218
> >>>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/KAFKA-4726
> >>>>>>>>>>>>>>>>> [4] https://issues.apache.org/jira/browse/KAFKA-3745
> >>>>>>>>>>>>>>>>> [5] https://github.com/apache/kafka/pull/2946
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>> -Cheers
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>>>>>
>
>
