kafka-dev mailing list archives

From Jeyhun Karimov <je.kari...@gmail.com>
Subject Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams
Date Tue, 04 Jul 2017 22:29:48 GMT
Hi Matthias,

Actually, my intent was to provide RichInitializer, and later on we could
provide the context of the record, as you also mentioned.
I removed it so as not to confuse users.
Regarding the RecordContext and ProcessorContext interfaces, I just
noticed the InternalProcessorContext class. Can't we pass it as a
parameter to the init() method of processors? Then we would be able to get
the RecordContext easily, with just a method call.
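A minimal sketch of the idea above, assuming the internal context exposes the current record's metadata through a single `recordContext()` call. `RecordContext`, `InternalContext`, `RichValueMapper` and `MapValuesProcessor` here are simplified, hypothetical stand-ins, not the actual Kafka Streams classes:

```java
// Hypothetical, simplified types for illustration; not the real Kafka Streams API.
public class InitWithInternalContext {

    // Read-only record metadata (a tiny subset of the RecordContext proposed in the KIP).
    interface RecordContext {
        String topic();
        long offset();
    }

    // Stand-in for an internal context that hands out the current record's context.
    interface InternalContext {
        RecordContext recordContext();
    }

    // Rich mapper that receives the record context on every call.
    interface RichValueMapper<V, VR> {
        VR apply(V value, RecordContext context);
    }

    // Keeps the internal context from init() and passes only the narrower
    // RecordContext to the user function for each record.
    static final class MapValuesProcessor<V, VR> {
        private final RichValueMapper<V, VR> mapper;
        private InternalContext context;

        MapValuesProcessor(RichValueMapper<V, VR> mapper) {
            this.mapper = mapper;
        }

        void init(InternalContext context) {
            this.context = context;
        }

        VR process(V value) {
            return mapper.apply(value, context.recordContext());
        }
    }

    public static String demo() {
        RecordContext rc = new RecordContext() {
            public String topic() { return "orders"; }
            public long offset() { return 42L; }
        };
        InternalContext ctx = () -> rc;
        MapValuesProcessor<String, String> p = new MapValuesProcessor<String, String>(
            (v, c) -> v + "@" + c.topic() + ":" + c.offset());
        p.init(ctx);
        return p.process("a");
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints a@orders:42
    }
}
```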


Cheers,
Jeyhun

On Thu, Jun 29, 2017 at 10:14 PM Matthias J. Sax <matthias@confluent.io> wrote:

> One more thing:
>
> I don't think `RichInitializer` makes sense. As we don't have any
> input record, there is also no context. We could of course provide the
> context of the record that triggers the init call, but this seems to be
> semantically questionable. Also, the context for this first record will
> be provided by the subsequent call to aggregate anyway.
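To illustrate the point about `RichInitializer`, here is a hypothetical in-memory aggregation loop (not the Kafka Streams implementation): the initializer runs with no record at hand, and the first record's context reaches user code through the aggregator call that immediately follows. `RichAggregator` is an assumed name for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InitializerWithoutContext {

    interface RecordContext {
        long offset();
    }

    // No record exists yet when the initial aggregate is created, so there is no context.
    interface Initializer<VA> {
        VA apply();
    }

    // Hypothetical "rich" aggregator that receives the context of the record being aggregated.
    interface RichAggregator<K, V, VA> {
        VA apply(K key, V value, VA aggregate, RecordContext context);
    }

    // For a new key the initializer runs first, then the aggregator is called
    // for the very same record, with that record's context.
    public static Map<String, List<String>> aggregate(List<String[]> records) {
        Initializer<List<String>> init = ArrayList::new;
        RichAggregator<String, String, List<String>> agg = (k, v, acc, ctx) -> {
            acc.add(v + "@" + ctx.offset());
            return acc;
        };
        Map<String, List<String>> store = new HashMap<>();
        long offset = 0;
        for (String[] kv : records) {
            final long currentOffset = offset++;
            RecordContext ctx = () -> currentOffset;
            List<String> current = store.containsKey(kv[0]) ? store.get(kv[0]) : init.apply();
            store.put(kv[0], agg.apply(kv[0], kv[1], current, ctx));
        }
        return store;
    }

    public static void main(String[] args) {
        List<String[]> input = new ArrayList<>();
        input.add(new String[] {"k", "a"});
        input.add(new String[] {"k", "b"});
        System.out.println(aggregate(input));  // prints {k=[a@0, b@1]}
    }
}
```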
>
>
> -Matthias
>
> On 6/29/17 1:11 PM, Matthias J. Sax wrote:
> > Thanks for updating the KIP.
> >
> > I have one concern with regard to backward compatibility. You suggest
> > using RecordContext as the base interface for ProcessorContext. This will
> > break compatibility.
> >
> > I think we should just have two independent interfaces. Our own
> > ProcessorContextImpl class would implement both. This allows us to cast
> > it to `RecordContext` and thus limit the visible scope.
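A sketch of the two-independent-interfaces layout, with both interfaces trimmed to a few methods for illustration; `ProcessorContextImpl` mirrors the name used above but is a stand-in, not the real internal class:

```java
public class IndependentContextInterfaces {

    // Existing public interface, left unchanged (trimmed to two methods here).
    interface ProcessorContext {
        String applicationId();
        void forward(Object key, Object value);
    }

    // New, independent interface exposing record metadata only (a subset, for illustration).
    interface RecordContext {
        String topic();
        int partition();
        long offset();
    }

    // The internal implementation class implements both interfaces.
    static final class ProcessorContextImpl implements ProcessorContext, RecordContext {
        private final String topic;
        private final int partition;
        private final long offset;

        ProcessorContextImpl(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        public String applicationId() { return "my-app"; }
        public void forward(Object key, Object value) { /* would send downstream */ }
        public String topic() { return topic; }
        public int partition() { return partition; }
        public long offset() { return offset; }
    }

    // A user function typed against RecordContext sees only the metadata methods.
    static String describe(RecordContext context) {
        return context.topic() + "-" + context.partition() + "@" + context.offset();
    }

    public static String demo() {
        ProcessorContextImpl impl = new ProcessorContextImpl("orders", 3, 17L);
        RecordContext limited = impl;  // forward() is not callable through `limited`
        return describe(limited);
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints orders-3@17
    }
}
```

Note that the upcast only restricts what callers can see at compile time; a user could still downcast `limited` back to `ProcessorContext`.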
> >
> >
> > -Matthias
> >
> >
> >
> > On 6/27/17 1:35 PM, Jeyhun Karimov wrote:
> >> Hi all,
> >>
> >> I updated the KIP based on the discussion and comments.
> >> Basically, I eliminated the overloads for a particular method if there
> >> were more than 3.
> >> As we can see, there are a lot of overloads (and more will come with
> >> KIP-149 :) ).
> >> So, is it wise to
> >> wait for the result of the constructive DSL thread, or
> >> extend the KIP to address this issue as well, or
> >> continue as it is?
> >>
> >> Cheers,
> >> Jeyhun
> >>
> >> On Wed, Jun 14, 2017 at 11:29 PM Guozhang Wang <wangguoz@gmail.com> wrote:
> >>
> >>> LGTM. Thanks!
> >>>
> >>>
> >>> Guozhang
> >>>
> >>> On Tue, Jun 13, 2017 at 2:20 PM, Jeyhun Karimov <je.karimov@gmail.com>
> >>> wrote:
> >>>
> >>>> Thanks for the comment, Matthias. After all the discussion (thanks to all
> >>>> participants), I think this (a single method that passes in a
> >>>> RecordContext object) is the best alternative.
> >>>> Just a side note: I think KAFKA-3907 [1] can also be integrated into the
> >>>> KIP by adding a related method to the RecordContext interface.
> >>>>
> >>>>
> >>>> [1] https://issues.apache.org/jira/browse/KAFKA-3907
> >>>>
> >>>>
> >>>> Cheers,
> >>>> Jeyhun
> >>>>
> >>>> On Tue, Jun 13, 2017 at 7:50 PM Matthias J. Sax <matthias@confluent.io> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> I would like to push this discussion further. It seems we got nice
> >>>>> alternatives (thanks for the summary, Jeyhun!).
> >>>>>
> >>>>> With respect to RichFunctions and allowing them to be stateful, I have
> >>>>> my doubts, as expressed already. From my understanding, the idea was to
> >>>>> give access to record metadata information only. If you want to do a
> >>>>> stateful computation, you should rather use #transform().
> >>>>>
> >>>>> Furthermore, as pointed out, we would need to switch to a
> >>>>> supplier pattern, introducing many more overloads.
> >>>>>
> >>>>> For those reasons, I advocate for a simple interface with a single method
> >>>>> that passes in a RecordContext object.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>> On 6/6/17 5:15 PM, Guozhang Wang wrote:
> >>>>>> Thanks for the comprehensive summary!
> >>>>>>
> >>>>>> Personally, I'd prefer the option of passing RecordContext as an
> >>>>>> additional parameter into the overloaded function. But I'm also open to
> >>>>>> other arguments if there is something that I have overlooked.
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Jun 5, 2017 at 3:19 PM, Jeyhun Karimov <je.karimov@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> Thanks for your comments, Matthias and Guozhang.
> >>>>>>>
> >>>>>>> Below is a quick summary of the main alternatives we looked at to
> >>>>>>> introduce Rich functions (I will refer to them as Rich functions until
> >>>>>>> we find a better name). The alternatives proposed initially were not
> >>>>>>> backwards-compatible, so I will not mention them.
> >>>>>>> The related discussions are spread across the KIP-149 and KIP-159
> >>>>>>> discussion threads.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> 1. The idea of rich functions came onto the stage in the KIP-149
> >>>>>>> discussion thread. As a result, we extended KIP-149 to support Rich
> >>>>>>> functions as well.
> >>>>>>>
> >>>>>>> 2. As part of the Rich functions, we provided an init(ProcessorContext)
> >>>>>>> method. Afterwards, Damian suggested that we should not provide
> >>>>>>> ProcessorContext to users. As a result, we separated the two problems
> >>>>>>> into two separate KIPs, as it seems they can be solved in parallel.
> >>>>>>>
> >>>>>>> - One approach we considered was:
> >>>>>>>
> >>>>>>> public interface ValueMapperWithKey<K, V, VR> {
> >>>>>>>     VR apply(final K key, final V value);
> >>>>>>> }
> >>>>>>>
> >>>>>>> public interface RichValueMapper<K, V, VR>
> >>>>>>>         extends ValueMapperWithKey<K, V, VR>, RichFunction {
> >>>>>>> }
> >>>>>>>
> >>>>>>> public interface RichFunction {
> >>>>>>>     void init(RecordContext recordContext);
> >>>>>>>     void close();
> >>>>>>> }
> >>>>>>>
> >>>>>>> public interface RecordContext {
> >>>>>>>     String applicationId();
> >>>>>>>     TaskId taskId();
> >>>>>>>     StreamsMetrics metrics();
> >>>>>>>     String topic();
> >>>>>>>     int partition();
> >>>>>>>     long offset();
> >>>>>>>     long timestamp();
> >>>>>>>     Map<String, Object> appConfigs();
> >>>>>>>     Map<String, Object> appConfigsWithPrefix(String prefix);
> >>>>>>> }
> >>>>>>>
> >>>>>>> public interface ProcessorContext extends RecordContext {
> >>>>>>>     // all methods but the ones in RecordContext
> >>>>>>> }
> >>>>>>>
> >>>>>>> As a result:
> >>>>>>> * All "withKey" and "withoutKey" interfaces can be converted to their
> >>>>>>>   Rich counterparts (with empty init() and close() methods).
> >>>>>>> * All related Processors will accept Rich interfaces in their
> >>>>>>>   constructors.
> >>>>>>> * So, we convert the related "withKey" or "withoutKey" interfaces to
> >>>>>>>   Rich interfaces while building the topology and initialize the
> >>>>>>>   related processors with Rich interfaces only.
> >>>>>>> * We will not need overloaded methods for rich functions, as Rich
> >>>>>>>   interfaces extend withKey interfaces. We will just check the object
> >>>>>>>   type and act accordingly.
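The conversion step in this first approach might look roughly like the sketch below. All types are simplified stand-ins, and calling `init()` once per record in the hypothetical `runOnce` helper is a simplification for the demo, not something the KIP specifies:

```java
public class RichConversion {

    interface RecordContext { long offset(); }

    interface ValueMapperWithKey<K, V, VR> { VR apply(K key, V value); }

    interface RichFunction {
        void init(RecordContext recordContext);
        void close();
    }

    interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR>, RichFunction { }

    // Topology-build time: rich mappers pass through unchanged (object type check);
    // plain ones are wrapped with no-op init()/close().
    static <K, V, VR> RichValueMapper<K, V, VR> toRich(ValueMapperWithKey<K, V, VR> mapper) {
        if (mapper instanceof RichValueMapper) {
            return (RichValueMapper<K, V, VR>) mapper;
        }
        return new RichValueMapper<K, V, VR>() {
            public void init(RecordContext recordContext) { }
            public void close() { }
            public VR apply(K key, V value) { return mapper.apply(key, value); }
        };
    }

    // The processor only ever deals with the rich type.
    static <K, V, VR> VR runOnce(ValueMapperWithKey<K, V, VR> userMapper, K key, V value, long offset) {
        RichValueMapper<K, V, VR> rich = toRich(userMapper);
        rich.init(() -> offset);
        try {
            return rich.apply(key, value);
        } finally {
            rich.close();
        }
    }

    public static String demo() {
        ValueMapperWithKey<String, Integer, String> plain = (k, v) -> k + "=" + v;
        RichValueMapper<String, Integer, String> rich = new RichValueMapper<String, Integer, String>() {
            private long offset;
            public void init(RecordContext recordContext) { offset = recordContext.offset(); }
            public void close() { }
            public String apply(String k, Integer v) { return k + "=" + v + "@" + offset; }
        };
        return runOnce(plain, "a", 1, 10L) + "," + runOnce(rich, "b", 2, 20L);
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints a=1,b=2@20
    }
}
```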
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> 3. There were some thoughts that the above approach does not support
> >>>>>>> lambdas, so we should support only one method, init(RecordContext), as
> >>>>>>> part of Rich interfaces.
> >>>>>>> This is still under discussion. Personally, I think Rich interfaces are
> >>>>>>> by definition lambda-free, and we should not care much about it.
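The lambda question comes down to Java's functional-interface rule: a lambda can only target an interface with exactly one abstract method. A small sketch with simplified, hypothetical types:

```java
public class LambdaFriendliness {

    interface RecordContext { long offset(); }

    // Exactly one abstract method: a functional interface, so lambdas are allowed.
    @FunctionalInterface
    interface ValueMapperWithKey<K, V, VR> { VR apply(K key, V value); }

    // apply() plus init() gives two abstract methods: not a functional interface.
    interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR> {
        void init(RecordContext recordContext);
    }

    public static String demo() {
        ValueMapperWithKey<String, Integer, String> plain = (k, v) -> k + v;  // compiles

        // RichValueMapper<String, Integer, String> r = (k, v) -> k + v;     // would not compile
        RichValueMapper<String, Integer, String> rich = new RichValueMapper<String, Integer, String>() {
            private long offset = -1;  // filled in by init(); a lambda has nowhere to keep this
            public void init(RecordContext recordContext) { offset = recordContext.offset(); }
            public String apply(String k, Integer v) { return k + v + "@" + offset; }
        };
        rich.init(() -> 7L);
        return plain.apply("a", 1) + "," + rich.apply("b", 2);
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints a1,b2@7
    }
}
```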
> >>>>>>>
> >>>>>>>
> >>>>>>> 4. Thanks to Matthias's discussion, an alternative we considered was to
> >>>>>>> pass in the RecordContext as a method parameter. This might even allow
> >>>>>>> using lambdas, and we could keep the name RichFunction as we preserve
> >>>>>>> the nature of being a function.
> >>>>>>> "If you go with `init()` and `close()` we basically
> >>>>>>> allow users to have an in-memory state for a function. Thus, we cannot
> >>>>>>> share a single instance of RichValueMapper (etc) over multiple tasks and
> >>>>>>> we would need a supplier pattern similar to #transform(). And this would
> >>>>>>> "break the flow" of the API, as (Rich)ValueMapperSupplier would not
> >>>>>>> inherit from ValueMapper and thus we would need many new overloads for
> >>>>>>> KStream/KTable classes". (Copied from Matthias's email.)
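The supplier argument can be sketched as follows. `RichValueMapperSupplier` and `CountingMapper` are hypothetical names, loosely analogous to how `#transform()` takes a `TransformerSupplier`; the two loop iterations stand in for two tasks:

```java
import java.util.ArrayList;
import java.util.List;

public class SupplierPerTask {

    interface RecordContext { int partition(); }

    // Rich function with init()/close(), as in the first approach.
    interface RichValueMapper<V, VR> {
        void init(RecordContext context);
        VR apply(V value);
        void close();
    }

    // Hypothetical supplier, analogous to the supplier taken by #transform().
    interface RichValueMapperSupplier<V, VR> {
        RichValueMapper<V, VR> get();
    }

    // A mapper with in-memory state: it counts the records it has seen.
    static final class CountingMapper implements RichValueMapper<String, String> {
        private int partition;
        private int seen = 0;

        public void init(RecordContext context) { partition = context.partition(); }
        public String apply(String value) {
            seen++;
            return value + "#p" + partition + "n" + seen;
        }
        public void close() { }
    }

    // Simulates two tasks, each asking the supplier for a mapper and processing one record.
    static List<String> runTwoTasks(RichValueMapperSupplier<String, String> supplier) {
        List<String> out = new ArrayList<>();
        for (int p = 0; p < 2; p++) {
            final int partition = p;
            RichValueMapper<String, String> mapper = supplier.get();
            mapper.init(() -> partition);
            out.add(mapper.apply("x"));
            mapper.close();
        }
        return out;
    }

    // One fresh instance per task: no state is shared.
    public static List<String> demoSupplier() {
        return runTwoTasks(CountingMapper::new);
    }

    // A single shared instance: the counter leaks from the first task into the second.
    public static List<String> demoShared() {
        CountingMapper shared = new CountingMapper();
        return runTwoTasks(() -> shared);
    }

    public static void main(String[] args) {
        System.out.println(demoSupplier());  // prints [x#p0n1, x#p1n1]
        System.out.println(demoShared());    // prints [x#p0n1, x#p1n2]
    }
}
```

With the shared instance, the second task sees `n2`, which is exactly the cross-task state leak that forces a supplier once rich functions may hold state.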
> >>>>>>>
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Jeyhun
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Jun 5, 2017 at 5:18 AM Matthias J. Sax <matthias@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> Yes, we did consider this, and there is no consensus yet on what the
> >>>>>>>> best alternative is.
> >>>>>>>>
> >>>>>>>> @Jeyhun: the email thread got pretty long. Maybe you can give a quick
> >>>>>>>> summary of the current state of the discussion?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 6/4/17 6:04 PM, Guozhang Wang wrote:
> >>>>>>>>> Thanks for the explanation, Jeyhun and Matthias.
> >>>>>>>>>
> >>>>>>>>> I have just read through both KIP-149 and KIP-159 and am wondering if
> >>>>>>>>> you guys have considered a slightly different approach for rich
> >>>>>>>>> functions, that is, to add the `RecordContext` into the apply functions
> >>>>>>>>> as an additional parameter. For example:
> >>>>>>>>>
> >>>>>>>>> ---------------------------
> >>>>>>>>>
> >>>>>>>>> interface RichValueMapper<V, VR> {
> >>>>>>>>>
> >>>>>>>>>     VR apply(final V value, final RecordContext context);
> >>>>>>>>>
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> ...
> >>>>>>>>>
> >>>>>>>>> // then in KStreams
> >>>>>>>>>
> >>>>>>>>> <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
> >>>>>>>>> <VR> KStream<K, VR> mapValuesWithContext(RichValueMapper<? super V, ? extends VR> mapper);
> >>>>>>>>>
> >>>>>>>>> -------------------------------
> >>>>>>>>>
> >>>>>>>>> The caveat is that it will introduce more overloads; but I think the
> >>>>>>>>> number of overloads is mainly driven by 1) serde overrides and 2)
> >>>>>>>>> state-store-supplier overrides, both of which can be reduced in the
> >>>>>>>>> near future, and I feel this overloading is still worthwhile, as it
> >>>>>>>>> has the following benefits:
> >>>>>>>>>
> >>>>>>>>> 1) it still allows lambda expressions.
> >>>>>>>>> 2) a clearer code path (no need to "convert" from non-rich functions
> >>>>>>>>> to rich functions)
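A toy version of this proposal, operating on a `List` instead of a `KStream`; `Rec`, `mapValues` and `mapValuesWithContext` here are illustrative stand-ins, not the real DSL:

```java
import java.util.ArrayList;
import java.util.List;

public class ContextAsParameter {

    interface RecordContext {
        String topic();
        long offset();
    }

    interface ValueMapper<V, VR> { VR apply(V value); }

    // The proposal from the email: the context is just an extra argument to apply().
    interface RichValueMapper<V, VR> { VR apply(V value, RecordContext context); }

    // Toy record carrying its own metadata; it doubles as the RecordContext.
    static final class Rec<V> implements RecordContext {
        final V value;
        private final String topic;
        private final long offset;

        Rec(V value, String topic, long offset) {
            this.value = value;
            this.topic = topic;
            this.offset = offset;
        }

        public String topic() { return topic; }
        public long offset() { return offset; }
    }

    // Toy stand-ins for the two overloads, working on a List instead of a stream.
    static <V, VR> List<VR> mapValues(List<Rec<V>> in, ValueMapper<? super V, ? extends VR> mapper) {
        List<VR> out = new ArrayList<>();
        for (Rec<V> r : in) out.add(mapper.apply(r.value));
        return out;
    }

    static <V, VR> List<VR> mapValuesWithContext(List<Rec<V>> in,
                                                  RichValueMapper<? super V, ? extends VR> mapper) {
        List<VR> out = new ArrayList<>();
        for (Rec<V> r : in) out.add(mapper.apply(r.value, r));
        return out;
    }

    public static String demo() {
        List<Rec<Integer>> in = new ArrayList<>();
        in.add(new Rec<Integer>(5, "clicks", 100L));
        // Both overloads accept plain lambdas; nothing is converted between rich and non-rich types.
        List<Integer> doubled = mapValues(in, v -> v * 2);
        List<String> tagged = mapValuesWithContext(in, (v, ctx) -> v + "@" + ctx.topic() + "/" + ctx.offset());
        return doubled.get(0) + "," + tagged.get(0);
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints 10,5@clicks/100
    }
}
```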
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Maybe this approach has already been discussed and I may have
> >>>>>>>>> overlooked it in the email thread; anyways, lmk.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, Jun 1, 2017 at 10:18 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> >>>>>>>>>
> >>>>>>>>>> I agree with Jeyhun. As already mentioned, the overall API improvement
> >>>>>>>>>> ideas are overlapping and/or contradicting each other. For this
> >>>>>>>>>> reason, not all ideas can be accomplished, and some Jiras might just
> >>>>>>>>>> be closed as "won't fix".
> >>>>>>>>>>
> >>>>>>>>>> For this reason, we try to hold these KIP discussions with a large
> >>>>>>>>>> scope, to get an overall picture and converge to an overall
> >>>>>>>>>> consistent API.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> @Jeyhun: about the overloads. Yes, we might get more overloads. It
> >>>>>>>>>> might be sufficient, though, to do a single xxxWithContext() overload
> >>>>>>>>>> that will provide key+value+context. Otherwise, it might get too messy
> >>>>>>>>>> having ValueMapper, ValueMapperWithKey, ValueMapperWithContext,
> >>>>>>>>>> ValueMapperWithKeyWithContext.
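One way to read the single-overload suggestion is that every narrower mapper shape can be adapted into the widest one, so the runtime needs only one execution path. The adapter helpers `fromValueMapper`, `fromKeyMapper` and `run` below are hypothetical, and the types are simplified stand-ins:

```java
public class SingleWithContextOverload {

    interface RecordContext { long offset(); }

    interface ValueMapper<V, VR> { VR apply(V value); }

    interface ValueMapperWithKey<K, V, VR> { VR apply(K key, V value); }

    // The single "xxxWithContext" shape: key + value + context in one call.
    interface ValueMapperWithContext<K, V, VR> { VR apply(K key, V value, RecordContext context); }

    // Narrower shapes are adapted into the widest one.
    static <K, V, VR> ValueMapperWithContext<K, V, VR> fromValueMapper(ValueMapper<V, VR> m) {
        return (key, value, context) -> m.apply(value);
    }

    static <K, V, VR> ValueMapperWithContext<K, V, VR> fromKeyMapper(ValueMapperWithKey<K, V, VR> m) {
        return (key, value, context) -> m.apply(key, value);
    }

    // The one execution path the runtime would need.
    static <K, V, VR> VR run(ValueMapperWithContext<K, V, VR> m, K key, V value, long offset) {
        return m.apply(key, value, () -> offset);
    }

    public static String demo() {
        ValueMapper<String, Integer> length = String::length;
        ValueMapperWithKey<String, String, String> concat = (k, v) -> k + ":" + v;
        ValueMapperWithContext<String, String, String> withCtx = (k, v, c) -> k + ":" + v + "@" + c.offset();
        return run(SingleWithContextOverload.<String, String, Integer>fromValueMapper(length), "k", "abc", 1L)
            + "," + run(fromKeyMapper(concat), "k", "abc", 2L)
            + "," + run(withCtx, "k", "abc", 3L);
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints 3,k:abc,k:abc@3
    }
}
```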
> >>>>>>>>>>
> >>>>>>>>>> On the other hand, we also have the "builder pattern" idea as an API
> >>>>>>>>>> change, and this might mitigate the overload problem: not for simple
> >>>>>>>>>> functions like map/flatMap etc., but for joins and aggregations.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On the other hand, as I mentioned in an older email, I am personally
> >>>>>>>>>> fine with breaking the pure functional interface and adding
> >>>>>>>>>>
> >>>>>>>>>>   - interface WithRecordContext with method `open(RecordContext)` (or
> >>>>>>>>>> `init(...)`, or any better name) -- but not `close()`
> >>>>>>>>>>
> >>>>>>>>>>   - interface ValueMapperWithRecordContext extends ValueMapper,
> >>>>>>>>>> WithRecordContext
> >>>>>>>>>>
> >>>>>>>>>> This would allow us to avoid any overload. Of course, we don't get a
> >>>>>>>>>> "pure function" interface, and we also sacrifice lambdas.
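A sketch of the mix-in variant, assuming the runtime calls `open()` with the current record's context right before each `apply()` (the email does not specify when `open()` would be called). The `process` helper is hypothetical:

```java
public class WithRecordContextMixin {

    interface RecordContext { long offset(); }

    interface ValueMapper<V, VR> { VR apply(V value); }

    // Mix-in that receives the context of the record about to be processed (no close()).
    interface WithRecordContext { void open(RecordContext context); }

    interface ValueMapperWithRecordContext<V, VR> extends ValueMapper<V, VR>, WithRecordContext { }

    // The existing ValueMapper-based signature is reused; no new overload is needed.
    static <V, VR> VR process(ValueMapper<V, VR> mapper, V value, long offset) {
        if (mapper instanceof WithRecordContext) {
            ((WithRecordContext) mapper).open(() -> offset);
        }
        return mapper.apply(value);
    }

    public static String demo() {
        ValueMapper<String, String> plain = v -> v.toUpperCase();  // plain mappers keep lambdas
        ValueMapperWithRecordContext<String, String> rich = new ValueMapperWithRecordContext<String, String>() {
            private RecordContext context;
            public void open(RecordContext context) { this.context = context; }
            public String apply(String v) { return v + "@" + context.offset(); }
        };
        return process(plain, "a", 1L) + "," + process(rich, "b", 2L);
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints A,b@2
    }
}
```

The rich mapper needs an anonymous class (two abstract methods), and because it stores the context between `open()` and `apply()`, a single instance should not be shared across threads.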
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> I am personally a little bit undecided about what the better option
> >>>>>>>>>> might be. Curious to hear what others think about this trade-off.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 6/1/17 6:13 PM, Jeyhun Karimov wrote:
> >>>>>>>>>>> Hi Guozhang,
> >>>>>>>>>>>
> >>>>>>>>>>> It partially subsumes it. Initially the idea was to support
> >>>>>>>>>>> RichFunctions as a separate interface. Throughout the discussion,
> >>>>>>>>>>> however, we considered that overloading the related methods (with a
> >>>>>>>>>>> RecordContext param) may be a better approach than providing a
> >>>>>>>>>>> separate RichFunction interface.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Jeyhun
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, Jun 2, 2017 at 2:27 AM Guozhang Wang <wangguoz@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Does this KIP subsume this ticket as well?
> >>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-4125
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Sat, May 20, 2017 at 9:05 AM, Jeyhun Karimov <je.karimov@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Dear community,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> As we discussed in the KIP-149 [DISCUSS] thread [1], I would like to
> >>>>>>>>>>>>> initiate a KIP for rich functions (interfaces) [2].
> >>>>>>>>>>>>> I would like to get your comments.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1] http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=Re+DISCUSS+KIP+149+Enabling+key+access+in+ValueTransformer+ValueMapper+and+ValueJoiner
> >>>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>> --
> >>>>>>>>>>>>> -Cheers
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>> -Cheers
> >>>>>>>
> >>>>>>> Jeyhun
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>> --
> >>>> -Cheers
> >>>>
> >>>> Jeyhun
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >
>
> --
-Cheers

Jeyhun
