kafka-dev mailing list archives

From Damian Guy <damian....@gmail.com>
Subject Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams
Date Wed, 05 Jul 2017 08:39:59 GMT
Hi Jeyhun,

Is the intention that these methods are new overloads on the KStream,
KTable, etc?

It is worth noting that a ProcessorContext is not a RecordContext. A
RecordContext, as it stands, only exists during the processing of a single
record, whereas the ProcessorContext exists for the lifetime of the
Processor. So it doesn't make sense to cast a ProcessorContext to a
RecordContext.
In your last email you mentioned passing the InternalProcessorContext to the
init() calls. It is internal for a reason and I think it should remain that
way. It might be better to move the recordContext() method from
InternalProcessorContext to ProcessorContext.

In the KIP you have an example showing:
richMapper.init((RecordContext) processorContext);
But the interface is:
public interface RichValueMapper<V, VR> {
    VR apply(final V value, final RecordContext recordContext);
}
i.e., there is no init(...); and, as explained above, this wouldn't make
sense anyway.
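To make the distinction concrete, here is a minimal Java sketch. It assumes a pared-down, hypothetical RecordContext (only topic, partition, and offset) modeled on the KIP proposal; SimpleRecordContext and MapValuesDriver are illustrative names, not Kafka Streams classes. A new context is created for every record and handed straight to apply(), so the mapper needs no init(...) and can still be written as a lambda.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, pared-down RecordContext modeled on the KIP-159 proposal.
interface RecordContext {
    String topic();
    int partition();
    long offset();
}

// The context is passed on every call; there is no init(...) to invoke.
@FunctionalInterface
interface RichValueMapper<V, VR> {
    VR apply(final V value, final RecordContext recordContext);
}

// Immutable context describing exactly one record (illustrative helper).
final class SimpleRecordContext implements RecordContext {
    private final String topic;
    private final int partition;
    private final long offset;

    SimpleRecordContext(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    public String topic() { return topic; }
    public int partition() { return partition; }
    public long offset() { return offset; }
}

// Hypothetical driver standing in for a processor: one context per record,
// valid only for the duration of that apply() call.
final class MapValuesDriver {
    static <V, VR> List<VR> run(List<V> values, String topic, int partition,
                                long firstOffset, RichValueMapper<V, VR> mapper) {
        List<VR> out = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            RecordContext ctx = new SimpleRecordContext(topic, partition, firstOffset + i);
            out.add(mapper.apply(values.get(i), ctx));
        }
        return out;
    }
}
```

For example, `MapValuesDriver.run(Arrays.asList("a", "b"), "orders", 3, 42L, (v, ctx) -> v + "@" + ctx.topic() + "-" + ctx.partition() + ":" + ctx.offset())` yields `[a@orders-3:42, b@orders-3:43]`.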

Thanks,
Damian

On Tue, 4 Jul 2017 at 23:30 Jeyhun Karimov <je.karimov@gmail.com> wrote:

> Hi Matthias,
>
> Actually, my intent was to provide RichInitializer, and later on we could
> provide the context of the record, as you also mentioned.
> I removed that so as not to confuse users.
> Regarding the RecordContext and ProcessorContext interfaces, I just
> noticed the InternalProcessorContext class. Can't we pass this as a
> parameter to the init() method of processors? Then we would be able to get
> the RecordContext easily with just a method call.
>
>
> Cheers,
> Jeyhun
>
> On Thu, Jun 29, 2017 at 10:14 PM Matthias J. Sax <matthias@confluent.io>
> wrote:
>
> > One more thing:
> >
> > I don't think `RichInitializer` makes sense. As we don't have any
> > input record, there is also no context. We could of course provide the
> > context of the record that triggers the init call, but this seems to be
> > semantically questionable. Also, the context for this first record will
> > be provided by the subsequent call to aggregate anyway.
> >
> >
> > -Matthias
> >
> > On 6/29/17 1:11 PM, Matthias J. Sax wrote:
> > > Thanks for updating the KIP.
> > >
> > > I have one concern with regard to backward compatibility. You suggest
> > > to use RecordContext as the base interface for ProcessorContext. This
> > > will break compatibility.
> > >
> > > I think we should just have two independent interfaces. Our own
> > > ProcessorContextImpl class would implement both. This allows us to cast
> > > it to `RecordContext` and thus limit the visible scope.
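A minimal Java sketch of that idea, assuming heavily simplified, hypothetical versions of both interfaces (the real ProcessorContext has many more methods, and the setRecord(...) hook and ScopeDemo class are illustrative only): the two interfaces stay independent, one internal class implements both, and user code only ever receives the object through the narrower RecordContext type.

```java
// Hypothetical, simplified interfaces; neither extends the other, so the
// existing ProcessorContext contract is left untouched.
interface RecordContext {
    String topic();
    long offset();
}

interface ProcessorContext {
    String applicationId();
}

// Internal implementation that satisfies both contracts.
final class ProcessorContextImpl implements ProcessorContext, RecordContext {
    private final String applicationId;
    private String topic;  // refreshed before each record is processed
    private long offset;

    ProcessorContextImpl(String applicationId) {
        this.applicationId = applicationId;
    }

    // Illustrative hook the runtime would call as each new record arrives.
    void setRecord(String topic, long offset) {
        this.topic = topic;
        this.offset = offset;
    }

    @Override public String applicationId() { return applicationId; }
    @Override public String topic() { return topic; }
    @Override public long offset() { return offset; }
}

final class ScopeDemo {
    // User functions see only the RecordContext view of the object.
    static String describe(RecordContext ctx) {
        return ctx.topic() + "@" + ctx.offset();
    }

    public static void main(String[] args) {
        ProcessorContextImpl impl = new ProcessorContextImpl("my-app");
        impl.setRecord("orders", 7L);
        RecordContext view = impl;  // static type limits the visible scope
        System.out.println(describe(view));  // prints orders@7
    }
}
```

The restriction is only a static type view: the same object is refreshed for every record, so a function should not hold on to the context beyond the call that received it.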
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 6/27/17 1:35 PM, Jeyhun Karimov wrote:
> > >> Hi all,
> > >>
> > >> I updated the KIP w.r.t. the discussion and comments.
> > >> Basically, I eliminated the overloads for a particular method if there
> > >> were more than 3.
> > >> As we can see, there are a lot of overloads (and more will come with
> > >> KIP-149 :) ).
> > >> So, is it wise to
> > >> wait for the result of the constructive DSL thread, or
> > >> extend the KIP to address this issue as well, or
> > >> continue as it is?
> > >>
> > >> Cheers,
> > >> Jeyhun
> > >>
> > >> On Wed, Jun 14, 2017 at 11:29 PM Guozhang Wang <wangguoz@gmail.com>
> > >> wrote:
> > >>
> > >>> LGTM. Thanks!
> > >>>
> > >>>
> > >>> Guozhang
> > >>>
> > >>> On Tue, Jun 13, 2017 at 2:20 PM, Jeyhun Karimov <je.karimov@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Thanks for the comment, Matthias. After all the discussion (thanks to
> > >>>> all participants), I think this (a single method that passes in a
> > >>>> RecordContext object) is the best alternative.
> > >>>> Just a side note: I think KAFKA-3907 [1] can also be integrated into
> > >>>> the KIP by adding a related method to the RecordContext interface.
> > >>>>
> > >>>>
> > >>>> [1] https://issues.apache.org/jira/browse/KAFKA-3907
> > >>>>
> > >>>>
> > >>>> Cheers,
> > >>>> Jeyhun
> > >>>>
> > >>>> On Tue, Jun 13, 2017 at 7:50 PM Matthias J. Sax <matthias@confluent.io>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi,
> > >>>>>
> > >>>>> I would like to push this discussion further. It seems we got nice
> > >>>>> alternatives (thanks for the summary, Jeyhun!).
> > >>>>>
> > >>>>> With respect to RichFunctions and allowing them to be stateful, I
> > >>>>> have my doubts, as expressed already. From my understanding, the idea
> > >>>>> was to give access to record metadata information only. If you want
> > >>>>> to do a stateful computation, you should rather use #transform().
> > >>>>>
> > >>>>> Furthermore, as pointed out, we would need to switch to a
> > >>>>> supplier pattern, introducing many more overloads.
> > >>>>>
> > >>>>> For those reasons, I advocate for a simple interface with a single
> > >>>>> method that passes in a RecordContext object.
> > >>>>>
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>>
> > >>>>> On 6/6/17 5:15 PM, Guozhang Wang wrote:
> > >>>>>> Thanks for the comprehensive summary!
> > >>>>>>
> > >>>>>> Personally I'd prefer the option of passing RecordContext as an
> > >>>>>> additional parameter into the overloaded function. But I'm also open
> > >>>>>> to other arguments if there is something that I have overlooked.
> > >>>>>>
> > >>>>>> Guozhang
> > >>>>>>
> > >>>>>>
> > >>>>>> On Mon, Jun 5, 2017 at 3:19 PM, Jeyhun Karimov <je.karimov@gmail.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi,
> > >>>>>>>
> > >>>>>>> Thanks for your comments, Matthias and Guozhang.
> > >>>>>>>
> > >>>>>>> Below is a quick summary of the main alternatives we looked at for
> > >>>>>>> introducing Rich functions (I will refer to them as Rich functions
> > >>>>>>> until we find a better name). The initially proposed alternatives
> > >>>>>>> were not backwards-compatible, so I will not mention them.
> > >>>>>>> The related discussions are spread across the KIP-149 and KIP-159
> > >>>>>>> discussion threads.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> 1. The idea of rich functions came up in the KIP-149 discussion
> > >>>>>>> thread. As a result, we extended KIP-149 to support Rich functions
> > >>>>>>> as well.
> > >>>>>>>
> > >>>>>>> 2. As part of the Rich functions, we provided an init(ProcessorContext)
> > >>>>>>> method. Afterwards, Damian suggested that we should not provide
> > >>>>>>> ProcessorContext to users. As a result, we separated the two problems
> > >>>>>>> into two separate KIPs, as it seems they can be solved in parallel.
> > >>>>>>>
> > >>>>>>> - One approach we considered was:
> > >>>>>>>
> > >>>>>>> public interface ValueMapperWithKey<K, V, VR> {
> > >>>>>>>     VR apply(final K key, final V value);
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> public interface RichValueMapper<K, V, VR>
> > >>>>>>>         extends ValueMapperWithKey<K, V, VR>, RichFunction {
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> public interface RichFunction {
> > >>>>>>>     void init(RecordContext recordContext);
> > >>>>>>>     void close();
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> public interface RecordContext {
> > >>>>>>>     String applicationId();
> > >>>>>>>     TaskId taskId();
> > >>>>>>>     StreamsMetrics metrics();
> > >>>>>>>     String topic();
> > >>>>>>>     int partition();
> > >>>>>>>     long offset();
> > >>>>>>>     long timestamp();
> > >>>>>>>     Map<String, Object> appConfigs();
> > >>>>>>>     Map<String, Object> appConfigsWithPrefix(String prefix);
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> public interface ProcessorContext extends RecordContext {
> > >>>>>>>     // all methods except the ones inherited from RecordContext
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> As a result:
> > >>>>>>> * All "withKey" and "withoutKey" interfaces can be converted to their
> > >>>>>>>   Rich counterparts (with empty init() and close() methods).
> > >>>>>>> * All related Processors will accept Rich interfaces in their
> > >>>>>>>   constructors.
> > >>>>>>> * So, we convert the related "withKey" or "withoutKey" interfaces to
> > >>>>>>>   Rich interfaces while building the topology and initialize the
> > >>>>>>>   related processors with Rich interfaces only.
> > >>>>>>> * We will not need overloaded methods for rich functions, as Rich
> > >>>>>>>   interfaces extend withKey interfaces. We will just check the object
> > >>>>>>>   type and act accordingly.
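The conversion step described in that list could look roughly like the following minimal Java sketch. It assumes simplified, hypothetical interfaces modeled on the snippet (RecordContext trimmed to one method) and a hypothetical RichAdapters.toRich helper: a plain withKey mapper is wrapped with no-op init()/close(), and an object that is already Rich is detected by its type and used as-is.

```java
// Hypothetical, simplified interfaces modeled on the approach above.
interface RecordContext {
    long offset();
}

@FunctionalInterface
interface ValueMapperWithKey<K, V, VR> {
    VR apply(final K key, final V value);
}

interface RichFunction {
    void init(RecordContext recordContext);
    void close();
}

// Three abstract methods in total, so this is NOT a functional interface.
interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR>, RichFunction {
}

final class RichAdapters {
    private RichAdapters() { }

    // Check the object type once, while the topology is being built.
    static <K, V, VR> RichValueMapper<K, V, VR> toRich(final ValueMapperWithKey<K, V, VR> mapper) {
        if (mapper instanceof RichValueMapper) {
            return (RichValueMapper<K, V, VR>) mapper;  // already Rich: use unchanged
        }
        // Plain mapper: wrap it with empty init() and close().
        return new RichValueMapper<K, V, VR>() {
            @Override public void init(final RecordContext recordContext) { }
            @Override public void close() { }
            @Override public VR apply(final K key, final V value) {
                return mapper.apply(key, value);
            }
        };
    }
}
```

A processor would then hold only the RichValueMapper returned by toRich(...), giving it a single code path. The catch, as point 3 of the summary notes, is that a user-written Rich function can no longer be a lambda.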
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> 3. There were some thoughts that the above approach does not support
> > >>>>>>> lambdas, so we should support only one method, init(RecordContext), as
> > >>>>>>> part of Rich interfaces.
> > >>>>>>> This is still under discussion. Personally, I think Rich interfaces
> > >>>>>>> are by definition lambda-free and we should not care much about it.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> 4. Thanks to the discussion with Matthias, an alternative we
> > >>>>>>> considered was to pass in the RecordContext as a method parameter.
> > >>>>>>> This might even allow Lambdas, and we could keep the name RichFunction
> > >>>>>>> as we preserve the nature of being a function.
> > >>>>>>> "If you go with `init()` and `close()` we basically
> > >>>>>>> allow users to have an in-memory state for a function. Thus, we cannot
> > >>>>>>> share a single instance of RichValueMapper (etc) over multiple tasks
> > >>>>>>> and we would need a supplier pattern similar to #transform(). And this
> > >>>>>>> would "break the flow" of the API, as (Rich)ValueMapperSupplier would
> > >>>>>>> not inherit from ValueMapper and thus we would need many new overloads
> > >>>>>>> for KStream/KTable classes." (Copied from Matthias's email.)
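The quoted concern about in-memory state can be illustrated with a minimal Java sketch. It assumes a hypothetical stateful RichValueMapper with init()/apply()/close() and uses java.util.function.Supplier as a stand-in for the (Rich)ValueMapperSupplier mentioned above; Task and CountingMapper are illustrative names, not Kafka Streams classes. Each task asks the supplier for its own instance, so per-task state stays separate.

```java
import java.util.function.Supplier;

// Hypothetical, simplified interfaces for illustration only.
interface RecordContext {
    long offset();
}

interface RichValueMapper<V, VR> {
    void init(RecordContext recordContext);
    VR apply(V value);
    void close();
}

// Stateful mapper: numbers the values it has seen, per instance.
final class CountingMapper implements RichValueMapper<String, String> {
    private long seen = 0;  // in-memory state owned by this instance

    @Override public void init(final RecordContext recordContext) { }
    @Override public String apply(final String value) {
        seen++;
        return value + "#" + seen;
    }
    @Override public void close() { }
}

// Stand-in for a stream task that owns the mapper it was given.
final class Task {
    final RichValueMapper<String, String> mapper;

    Task(final Supplier<RichValueMapper<String, String>> supplier) {
        this.mapper = supplier.get();  // one get() call per task
    }
}
```

With `new Task(CountingMapper::new)` each task counts from 1, whereas passing `() -> shared` for a single CountingMapper makes two tasks interleave one counter. That extra supplier type, which is not a ValueMapper, is what would force the additional KStream/KTable overloads.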
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>> Jeyhun
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Mon, Jun 5, 2017 at 5:18 AM Matthias J. Sax <matthias@confluent.io>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Yes, we did consider this, and there is no consensus yet on what the
> > >>>>>>>> best alternative is.
> > >>>>>>>>
> > >>>>>>>> @Jeyhun: the email thread got pretty long. Maybe you can give a quick
> > >>>>>>>> summary of the current state of the discussion?
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> -Matthias
> > >>>>>>>>
> > >>>>>>>> On 6/4/17 6:04 PM, Guozhang Wang wrote:
> > >>>>>>>>> Thanks for the explanation, Jeyhun and Matthias.
> > >>>>>>>>>
> > >>>>>>>>> I have just read through both KIP-149 and KIP-159 and am wondering
> > >>>>>>>>> if you guys have considered a slightly different approach for rich
> > >>>>>>>>> functions, that is, to add the `RecordContext` into the apply
> > >>>>>>>>> functions as an additional parameter. For example:
> > >>>>>>>>>
> > >>>>>>>>> ---------------------------
> > >>>>>>>>>
> > >>>>>>>>> interface RichValueMapper<V, VR> {
> > >>>>>>>>>
> > >>>>>>>>>     VR apply(final V value, final RecordContext context);
> > >>>>>>>>>
> > >>>>>>>>> }
> > >>>>>>>>>
> > >>>>>>>>> ...
> > >>>>>>>>>
> > >>>>>>>>> // then in KStream
> > >>>>>>>>>
> > >>>>>>>>> <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
> > >>>>>>>>> <VR> KStream<K, VR> mapValuesWithContext(RichValueMapper<? super V, ? extends VR> mapper);
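A minimal, self-contained Java sketch of this overloading, assuming a hypothetical in-memory ToyStream in place of KStream and a one-method RecordContext; only the two method names follow the proposal above. Because ValueMapper and RichValueMapper each keep a single abstract method, callers can pass a lambda to either one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical one-method context and the two functional interfaces.
interface RecordContext {
    long offset();
}

@FunctionalInterface
interface ValueMapper<V, VR> {
    VR apply(final V value);
}

@FunctionalInterface
interface RichValueMapper<V, VR> {
    VR apply(final V value, final RecordContext context);
}

// Toy stand-in for KStream: values plus the offset of the first one.
final class ToyStream<V> {
    private final List<V> values;
    private final long firstOffset;

    ToyStream(final List<V> values, final long firstOffset) {
        this.values = values;
        this.firstOffset = firstOffset;
    }

    List<V> values() { return values; }

    <VR> ToyStream<VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper) {
        final List<VR> out = new ArrayList<>();
        for (final V v : values) {
            out.add(mapper.apply(v));
        }
        return new ToyStream<>(out, firstOffset);
    }

    <VR> ToyStream<VR> mapValuesWithContext(final RichValueMapper<? super V, ? extends VR> mapper) {
        final List<VR> out = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            final long offset = firstOffset + i;
            final RecordContext ctx = () -> offset;  // per-record context
            out.add(mapper.apply(values.get(i), ctx));
        }
        return new ToyStream<>(out, firstOffset);
    }
}
```

For instance, starting from `new ToyStream<>(Arrays.asList("a", "bb"), 100L)`, `mapValues(v -> v.length())` gives `[1, 2]` and `mapValuesWithContext((v, ctx) -> v + ":" + ctx.offset())` gives `[a:100, bb:101]`. The price is a second method per operator, which is the overload growth discussed in this thread.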
> > >>>>>>>>>
> > >>>>>>>>> -------------------------------
> > >>>>>>>>>
> > >>>>>>>>> The caveat is that it will introduce more overloads; but I think the
> > >>>>>>>>> number of overloads is mainly driven by 1) serde overrides and 2)
> > >>>>>>>>> state-store-supplier overrides, both of which can be reduced in the
> > >>>>>>>>> near future, and I feel this overloading is still worthwhile, as it
> > >>>>>>>>> has the following benefits:
> > >>>>>>>>>
> > >>>>>>>>> 1) It still allows lambda expressions.
> > >>>>>>>>> 2) A clearer code path (no need to "convert" from non-rich functions
> > >>>>>>>>> to rich functions).
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Maybe this approach has already been discussed and I may have
> > >>>>>>>>> overlooked it in the email thread; anyway, let me know.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Guozhang
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Thu, Jun 1, 2017 at 10:18 PM, Matthias J. Sax <matthias@confluent.io>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> I agree with Jeyhun. As already mentioned, the overall API
> > >>>>>>>>>> improvement ideas are overlapping and/or contradicting each other.
> > >>>>>>>>>> For this reason, not all ideas can be accomplished and some Jiras
> > >>>>>>>>>> might just be closed as "won't fix".
> > >>>>>>>>>>
> > >>>>>>>>>> For this reason, we try to do those KIP discussions with a large
> > >>>>>>>>>> scope, to get an overall picture and converge to an overall
> > >>>>>>>>>> consistent API.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> @Jeyhun: about the overloads. Yes, we might get more overloads. It
> > >>>>>>>>>> might be sufficient, though, to add a single xxxWithContext()
> > >>>>>>>>>> overload that provides key+value+context. Otherwise, it might get too
> > >>>>>>>>>> messy having ValueMapper, ValueMapperWithKey, ValueMapperWithContext,
> > >>>>>>>>>> ValueMapperWithKeyWithContext.
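One way to read the single xxxWithContext() suggestion, as a minimal Java sketch: assume one hypothetical three-argument shape carrying key, value, and context, and adapt the existing mapper flavours into it so the runtime keeps one internal code path. KeyValueContextMapper and Adapt are illustrative names, not a settled API.

```java
// Hypothetical interfaces for illustration; not a released Kafka Streams API.
interface RecordContext {
    long offset();
}

@FunctionalInterface
interface ValueMapper<V, VR> {
    VR apply(final V value);
}

@FunctionalInterface
interface ValueMapperWithKey<K, V, VR> {
    VR apply(final K key, final V value);
}

// The single "with context" shape: key + value + context in one call.
@FunctionalInterface
interface KeyValueContextMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext context);
}

final class Adapt {
    private Adapt() { }

    // Plain value mapper: ignore key and context.
    static <K, V, VR> KeyValueContextMapper<K, V, VR> fromValueMapper(
            final ValueMapper<V, VR> mapper) {
        return (key, value, context) -> mapper.apply(value);
    }

    // Key-aware mapper: ignore only the context.
    static <K, V, VR> KeyValueContextMapper<K, V, VR> fromValueMapperWithKey(
            final ValueMapperWithKey<K, V, VR> mapper) {
        return (key, value, context) -> mapper.apply(key, value);
    }
}
```

Under this reading only one new overload per operator is needed, since the older shapes can be adapted internally; the trade-off is that users of the rich variant always see all three parameters, even when they need only one.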
> > >>>>>>>>>>
> > >>>>>>>>>> On the other hand, we also have the "builder pattern" idea as an API
> > >>>>>>>>>> change, and this might mitigate the overload problem: not for simple
> > >>>>>>>>>> functions like map/flatMap etc., but for joins and aggregations.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On the other hand, as I mentioned in an older email, I am personally
> > >>>>>>>>>> fine to break the pure functional interface, and add
> > >>>>>>>>>>
> > >>>>>>>>>>   - interface WithRecordContext with method `open(RecordContext)`
> > >>>>>>>>>>     (or `init(...)`, or any better name), but not `close()`
> > >>>>>>>>>>
> > >>>>>>>>>>   - interface ValueMapperWithRecordContext extends ValueMapper,
> > >>>>>>>>>>     WithRecordContext
> > >>>>>>>>>>
> > >>>>>>>>>> This would allow us to avoid any overload. Of course, we don't get a
> > >>>>>>>>>> "pure function" interface and we also sacrifice Lambdas.
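A minimal Java sketch of this alternative, assuming simplified, hypothetical interfaces named as in the bullets above (RecordContext trimmed to topic()) plus an illustrative MapValuesRunner and TopicPrefixer: the entry point keeps its single ValueMapper parameter, and the runtime checks at call time whether the object also implements WithRecordContext.

```java
// Hypothetical, simplified interfaces named after the bullets above.
interface RecordContext {
    String topic();
}

@FunctionalInterface
interface ValueMapper<V, VR> {
    VR apply(final V value);
}

interface WithRecordContext {
    void open(final RecordContext recordContext);  // no close(), as suggested
}

// Two abstract methods, so this cannot be implemented by a lambda.
interface ValueMapperWithRecordContext<V, VR> extends ValueMapper<V, VR>, WithRecordContext {
}

final class MapValuesRunner {
    private MapValuesRunner() { }

    // Single entry point taking a plain ValueMapper: no extra overload.
    static <V, VR> VR mapOne(final ValueMapper<V, VR> mapper, final V value,
                             final RecordContext ctx) {
        if (mapper instanceof WithRecordContext) {
            ((WithRecordContext) mapper).open(ctx);  // hand over context before apply()
        }
        return mapper.apply(value);
    }
}

// User code must write a (non-lambda) class to get at the context.
final class TopicPrefixer implements ValueMapperWithRecordContext<String, String> {
    private String topic;  // stored context: this is what breaks "pure function"

    @Override public void open(final RecordContext recordContext) {
        this.topic = recordContext.topic();
    }

    @Override public String apply(final String value) {
        return topic + ":" + value;
    }
}
```

The trade-off is visible in the sketch: the runtime needs no new overload, but TopicPrefixer has to be a class rather than a lambda, and it keeps the last context in a field between open() and apply().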
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> I am personally a little bit undecided about what the better option
> > >>>>>>>>>> might be. Curious to hear what others think about this trade-off.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> -Matthias
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On 6/1/17 6:13 PM, Jeyhun Karimov wrote:
> > >>>>>>>>>>> Hi Guozhang,
> > >>>>>>>>>>>
> > >>>>>>>>>>> It partially subsumes it. Initially the idea was to support
> > >>>>>>>>>>> RichFunctions as a separate interface. Throughout the discussion,
> > >>>>>>>>>>> however, we considered that overloading the related methods (with a
> > >>>>>>>>>>> RecordContext param) may be a better approach than providing a
> > >>>>>>>>>>> separate RichFunction interface.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Cheers,
> > >>>>>>>>>>> Jeyhun
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Fri, Jun 2, 2017 at 2:27 AM Guozhang Wang <wangguoz@gmail.com>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Does this KIP subsume this ticket as well?
> > >>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-4125
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Sat, May 20, 2017 at 9:05 AM, Jeyhun Karimov <je.karimov@gmail.com>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Dear community,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> As we discussed in the KIP-149 [DISCUSS] thread [1], I would like
> > >>>>>>>>>>>>> to initiate a KIP for rich functions (interfaces) [2].
> > >>>>>>>>>>>>> I would like to get your comments.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> [1]
> > >>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=Re+DISCUSS+KIP+149+Enabling+key+access+in+ValueTransformer+ValueMapper+and+ValueJoiner
> > >>>>>>>>>>>>> [2]
> > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>> Jeyhun
> > >>>>>>>>>>>>> --
> > >>>>>>>>>>>>> -Cheers
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Jeyhun
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> --
> > >>>>>>>>>>>> -- Guozhang
> > >>>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >
> >
>
