kafka-dev mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams
Date Fri, 27 Oct 2017 10:39:36 GMT
I am personally still not convinced that we should add `commit()` at all.

@Guozhang: you created the original Jira. Can you elaborate a little
bit? Isn't requesting commits a low level API that should not be exposed
in the DSL? Just want to understand the motivation better. Why would
anybody who uses the DSL ever want to request a commit? To me,
requesting commits is useful if you manipulate state explicitly, i.e.,
via the Processor API.

Also, for the solution: it seems rather unnatural to me that we add
`commit()` to `RecordContext` -- from my understanding, `RecordContext`
is a helper object that provides access to record metadata. Requesting
a commit is something quite different. Additionally, a commit does not
commit a specific record, but a `RecordContext` is for a specific record.

To me, this does not seem to be a sound API design if we follow this path.
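
To make the mismatch concrete, here is a minimal sketch. It assumes
simplified stand-in types (`RecordHandler`, `SimpleRecordContext`,
`ToyTask`, and cut-down `RecordContext` / `ProcessorContext` interfaces
are all hypothetical and are not the real Kafka Streams classes). It only
illustrates that a commit requested while handling one record covers
everything the task has processed so far, not just that record.

```java
// Simplified stand-ins for the types discussed in this thread; they are
// illustrative assumptions, NOT the real Kafka Streams interfaces.
interface RecordContext {
    long offset();
    String topic();
    int partition();
}

interface ProcessorContext {
    void commit(); // asks the task to commit; not tied to any one record
}

// Hypothetical callback shape, loosely modelled on a "rich" function.
interface RecordHandler {
    void handle(RecordContext record, ProcessorContext context);
}

// Immutable metadata holder for a single record.
final class SimpleRecordContext implements RecordContext {
    private final long offset;
    private final String topic;
    private final int partition;

    SimpleRecordContext(final long offset, final String topic, final int partition) {
        this.offset = offset;
        this.topic = topic;
        this.partition = partition;
    }

    @Override public long offset() { return offset; }
    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
}

// Toy task that tracks how far it has committed.
final class ToyTask implements ProcessorContext {
    private long committedUpTo = -1L; // offset of the last record covered by a commit
    private boolean commitRequested = false;

    @Override
    public void commit() {
        commitRequested = true;
    }

    long committedUpTo() {
        return committedUpTo;
    }

    // Processes one record, then honours any pending commit request.
    void process(final long offset, final RecordHandler handler) {
        handler.handle(new SimpleRecordContext(offset, "toy-topic", 0), this);
        if (commitRequested) {
            committedUpTo = offset; // covers every record processed so far
            commitRequested = false;
        }
    }
}

final class CommitScopeDemo {
    public static void main(final String[] args) {
        final ToyTask task = new ToyTask();
        for (long offset = 0; offset < 5; offset++) {
            task.process(offset, (record, context) -> {
                if (record.offset() == 3L) {
                    context.commit(); // requested while handling record 3 only
                }
            });
        }
        System.out.println("committed up to offset " + task.committedUpTo());
    }
}
```

In this toy model the commit is requested once, while record 3 is being
handled, yet the committed position also covers records 0 through 2. That
is why the call reads more naturally on the task-level context than on a
per-record metadata object.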


-Matthias



On 10/26/17 10:41 PM, Jeyhun Karimov wrote:
> Hi,
> 
> Thanks for your suggestions.
> 
> I have some comments, to make sure that there is no misunderstanding.
> 
> 
> 1. Maybe we can deprecate the `commit()` in ProcessorContext, to enforce
>> user to consolidate this call as
>> "processorContext.recordContext().commit()". And internal implementation
>> of
>> `ProcessorContext.commit()` in `ProcessorContextImpl` is also changed to
>> this call.
> 
> 
> - I think we should not deprecate `ProcessorContext.commit()`. The main
> intuition that we introduce `commit()` in `RecordContext` is that,
> `RecordContext` is the one which is provided in Rich interfaces. So if user
> wants to commit, then there should be some method inside `RecordContext` to
> do so. Internally, `RecordContext.commit()` calls
> `ProcessorContext.commit()`  (see the last code snippet in KIP-159):
> 
> @Override
>     public void process(final K1 key, final V1 value) {
> 
>         recordContext = new RecordContext() {   // recordContext initialization is added in this KIP
>             @Override
>             public void commit() {
>                 context().commit();
>             }
> 
>             @Override
>             public long offset() {
>                 return context().recordContext().offset();
>             }
> 
>             @Override
>             public long timestamp() {
>                 return context().recordContext().timestamp();
>             }
> 
>             @Override
>             public String topic() {
>                 return context().recordContext().topic();
>             }
> 
>             @Override
>             public int partition() {
>                 return context().recordContext().partition();
>             }
>       };
> 
> 
> So, we cannot deprecate `ProcessorContext.commit()` in this case IMO.
> 
> 
> 2. Add the `task` reference to the impl class, `ProcessorRecordContext`, so
>> that it can implement the commit call itself.
> 
> 
> - Actually, I don't think that we need `commit()` in
> `ProcessorRecordContext`. The main intuition is to "transfer"
> `ProcessorContext.commit()` call to Rich interfaces, to support
> user-specific committing.
>  To do so, we introduce a `commit()` method in `RecordContext` only to
> call `ProcessorContext.commit()` inside (see the above code snippet).
> So, in Rich interfaces, we are not dealing with `ProcessorRecordContext`
> at all, and we leave all its methods as they are.
> In this KIP, we made `RecordContext` the parent class of
> `ProcessorRecordContext` simply because they share a fair number of
> methods, and it is logical to enable inheritance between the two.
> 
> 3. In the wiki page, the statement that "However, call to a commit() method,
>> is valid only within RecordContext interface (at least for now), we throw
>> an exception in ProcessorRecordContext.commit()." and the code snippet
>> below would need to be updated as well.
> 
> 
> - I think above explanation covers this as well.
> 
> 
> I would like to speed up progress on this KIP, as it has gone through many
> changes based on user/developer needs, in both documentation and implementation.
> 
> 
> Cheers,
> Jeyhun
> 
> 
> 
> On Tue, Oct 24, 2017 at 1:41 AM Guozhang Wang <wangguoz@gmail.com> wrote:
> 
>> Thanks for the information, Jeyhun. I had also forgotten about KAFKA-3907
>> in connection with this KIP.
>>
>> Thinking a bit more, I'm now inclined to go with what we agreed before, to
>> add the commit() call to `RecordContext`. A few minor tweaks on its
>> implementation:
>>
>> 1. Maybe we can deprecate the `commit()` in ProcessorContext, to enforce
>> user to consolidate this call as
>> "processorContext.recordContext().commit()". And internal implementation
>> of
>> `ProcessorContext.commit()` in `ProcessorContextImpl` is also changed to
>> this call.
>>
>> 2. Add the `task` reference to the impl class, `ProcessorRecordContext`, so
>> that it can implement the commit call itself.
>>
>> 3. In the wiki page, the statement that "However, call to a commit()
>> method,
>> is valid only within RecordContext interface (at least for now), we throw
>> an exception in ProcessorRecordContext.commit()." and the code snippet
>> below would need to be updated as well.
>>
>>
>> Guozhang
>>
>>
>>
>> On Mon, Oct 23, 2017 at 1:40 PM, Matthias J. Sax <matthias@confluent.io>
>> wrote:
>>
>>> Fair point. This is a long discussion and I totally forgot that we
>>> discussed this.
>>>
>>> Seems I changed my opinion about including KAFKA-3907...
>>>
>>> Happy to hear what others think.
>>>
>>>
>>> -Matthias
>>>
>>> On 10/23/17 1:20 PM, Jeyhun Karimov wrote:
>>>> Hi Matthias,
>>>>
>>>> It is probably my bad, the discussion was a bit long in this thread. I
>>>> proposed the related issue in the related KIP discuss thread [1] and
>> got
>>> an
>>>> approval [2,3].
>>>> Maybe I misunderstood.
>>>>
>>>> [1]
>>>> http://search-hadoop.com/m/Kafka/uyzND19Asmg1GKKXT1?subj=
>>> Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
>>>> [2]
>>>> http://search-hadoop.com/m/Kafka/uyzND1kpct22GKKXT1?subj=
>>> Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
>>>> [3]
>>>> http://search-hadoop.com/m/Kafka/uyzND1G6TGIGKKXT1?subj=
>>> Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
>>>>
>>>>
>>>> On Mon, Oct 23, 2017 at 8:44 PM Matthias J. Sax <matthias@confluent.io
>>>
>>>> wrote:
>>>>
>>>>> Interesting.
>>>>>
>>>>> I thought that https://issues.apache.org/jira/browse/KAFKA-4125 is
>> the
>>>>> main motivation for this KIP :)
>>>>>
>>>>> I also think, that we should not expose the full ProcessorContext at
>> DSL
>>>>> level.
>>>>>
>>>>> Thus, overall I am not even sure if we should fix KAFKA-3907 at all.
>>>>> Manual commits are something DSL users should not worry about -- and
>> if
>>>>> one really needs this, an advanced user can still insert a dummy
>>>>> `transform` to request a commit from there.
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 10/18/17 5:39 AM, Jeyhun Karimov wrote:
>>>>>> Hi,
>>>>>>
>>>>>> The main intuition is to solve [1], which is part of this KIP.
>>>>>> I agree with you that this might not seem semantically correct as we
>>> are
>>>>>> not committing record state.
>>>>>> Alternatively, we can remove commit() from RecordContext and add
>>>>>> ProcessorContext (which has commit() method) as an extra argument to
>>> Rich
>>>>>> methods:
>>>>>>
>>>>>> instead of
>>>>>> public interface RichValueMapper<V, VR, K> {
>>>>>>     VR apply(final V value,
>>>>>>              final K key,
>>>>>>              final RecordContext recordContext);
>>>>>> }
>>>>>>
>>>>>> we can adopt
>>>>>>
>>>>>> public interface RichValueMapper<V, VR, K> {
>>>>>>     VR apply(final V value,
>>>>>>              final K key,
>>>>>>              final RecordContext recordContext,
>>>>>>              final ProcessorContext processorContext);
>>>>>> }
>>>>>>
>>>>>>
>>>>>> However, in this case, a user can get confused as ProcessorContext
>> and
>>>>>> RecordContext share some methods with the same name.
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>> Jeyhun
>>>>>>
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/KAFKA-3907
>>>>>>
>>>>>>
>>>>>> On Tue, Oct 17, 2017 at 3:19 AM Guozhang Wang <wangguoz@gmail.com>
>>>>> wrote:
>>>>>>
>>>>>>> Regarding #6 above, I'm still not clear why we would need `commit()`
>>> in
>>>>>>> both ProcessorContext and RecordContext, could you elaborate a bit
>>> more?
>>>>>>>
>>>>>>> To me `commit()` is really a processor context not a record context
>>>>>>> logically: when you call that function, it means we would commit the
>>>>> state
>>>>>>> of the whole task up to this processed record, not only that single
>>>>> record
>>>>>>> itself.
>>>>>>>
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>> On Mon, Oct 16, 2017 at 9:19 AM, Jeyhun Karimov <
>> je.karimov@gmail.com
>>>>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thanks for the feedback.
>>>>>>>>
>>>>>>>>
>>>>>>>> 0. RichInitializer definition seems missing.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> - Fixed.
>>>>>>>>
>>>>>>>>
>>>>>>>>  I'd suggest moving the key parameter in the RichValueXX and
>>>>> RichReducer
>>>>>>>>> after the value parameters, as well as in the templates; e.g.
>>>>>>>>> public interface RichValueJoiner<V1, V2, VR, K> {
>>>>>>>>>     VR apply(final V1 value1, final V2 value2, final K key, final
>>>>>>>>> RecordContext
>>>>>>>>> recordContext);
>>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> - Fixed.
>>>>>>>>
>>>>>>>>
>>>>>>>> 2. Some of the listed functions are not necessary since their
>> pairing
>>>>>>> APIs
>>>>>>>>> are being deprecated in 1.0 already:
>>>>>>>>> <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<?
>> super
>>> K,
>>>>>>> ?
>>>>>>>>> super V, KR> selector,
>>>>>>>>>                                    final Serde<KR> keySerde,
>>>>>>>>>                                    final Serde<V> valSerde);
>>>>>>>>> <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
>>>>>>>>>                                  final RichValueJoiner<? super K,
>> ?
>>>>>>> super
>>>>>>>>> V,
>>>>>>>>> ? super VT, ? extends VR> joiner,
>>>>>>>>>                                  final Serde<K> keySerde,
>>>>>>>>>                                  final Serde<V> valSerde);
>>>>>>>>
>>>>>>>>
>>>>>>>> -Fixed
>>>>>>>>
>>>>>>>> 3. For a few functions where we are adding three APIs for a combo
>> of
>>>>> both
>>>>>>>>> mapper / joiner, or both initializer / aggregator, or adder /
>>>>>>> subtractor,
>>>>>>>>> I'm wondering if we can just keep one that use "rich" functions
>> for
>>>>>>> both;
>>>>>>>>> so that we can have less overloads and let users who only want to
>>>>>>> access
>>>>>>>>> one of them to just use dummy parameter declarations. For example:
>>>>>>>>>
>>>>>>>>> <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV>
>>>>>>> globalKTable,
>>>>>>>>>                                  final RichKeyValueMapper<? super
>>> K, ?
>>>>>>>>> super
>>>>>>>>>  V, ? extends GK> keyValueMapper,
>>>>>>>>>                                  final RichValueJoiner<? super K,
>> ?
>>>>>>> super
>>>>>>>>> V,
>>>>>>>>> ? super GV, ? extends RV> joiner);
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -Agreed. Fixed.
>>>>>>>>
>>>>>>>>
>>>>>>>> 4. For TimeWindowedKStream, I'm wondering why we do not make its
>>>>>>>>> Initializer also "rich" functions? I.e.
>>>>>>>>
>>>>>>>>
>>>>>>>> - It was a typo. Fixed.
>>>>>>>>
>>>>>>>>
>>>>>>>> 5. We need to move "RecordContext" from o.a.k.processor.internals
>> to
>>>>>>>>> o.a.k.processor.
>>>>>>>>>
>>>>>>>>> 6. I'm not clear why we want to move `commit()` from
>>> ProcessorContext
>>>>>>> to
>>>>>>>>> RecordContext?
>>>>>>>>>
>>>>>>>>
>>>>>>>> -
>>>>>>>> Because it makes sense logically and  to reduce code maintenance
>>> (both
>>>>>>>> interfaces have offset() timestamp() topic() partition()
>> methods),  I
>>>>>>>> inherit ProcessorContext from RecordContext.
>>>>>>>> Since we need commit() method both in ProcessorContext and in
>>>>>>> RecordContext
>>>>>>>> I move commit() method to parent class (RecordContext).
>>>>>>>>
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Jeyhun
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Oct 11, 2017 at 12:59 AM, Guozhang Wang <
>> wangguoz@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Jeyhun,
>>>>>>>>>
>>>>>>>>> Thanks for the updated KIP, here are my comments.
>>>>>>>>>
>>>>>>>>> 0. RichInitializer definition seems missing.
>>>>>>>>>
>>>>>>>>> 1. I'd suggest moving the key parameter in the RichValueXX and
>>>>>>>> RichReducer
>>>>>>>>> after the value parameters, as well as in the templates; e.g.
>>>>>>>>>
>>>>>>>>> public interface RichValueJoiner<V1, V2, VR, K> {
>>>>>>>>>     VR apply(final V1 value1, final V2 value2, final K key, final
>>>>>>>>> RecordContext
>>>>>>>>> recordContext);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> My motivation is that for lambda expression in J8, users that
>> would
>>>>> not
>>>>>>>>> care about the key but only the context, or vice versa, is likely
>> to
>>>>>>>> write
>>>>>>>>> it as (value1, value2, dummy, context) -> ... than putting the
>> dummy
>>>>> at
>>>>>>>> the
>>>>>>>>> beginning of the parameter list. Generally speaking we'd like to
>>> make
>>>>>>> all
>>>>>>>>> the "necessary" parameters prior to optional ones.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2. Some of the listed functions are not necessary since their
>>> pairing
>>>>>>>> APIs
>>>>>>>>> are being deprecated in 1.0 already:
>>>>>>>>>
>>>>>>>>> <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<?
>> super
>>> K,
>>>>>>> ?
>>>>>>>>> super V, KR> selector,
>>>>>>>>>                                    final Serde<KR> keySerde,
>>>>>>>>>                                    final Serde<V> valSerde);
>>>>>>>>>
>>>>>>>>> <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
>>>>>>>>>                                  final RichValueJoiner<? super K,
>> ?
>>>>>>> super
>>>>>>>>> V,
>>>>>>>>> ? super VT, ? extends VR> joiner,
>>>>>>>>>                                  final Serde<K> keySerde,
>>>>>>>>>                                  final Serde<V> valSerde);
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 3. For a few functions where we are adding three APIs for a combo
>> of
>>>>>>> both
>>>>>>>>> mapper / joiner, or both initializer / aggregator, or adder /
>>>>>>> subtractor,
>>>>>>>>> I'm wondering if we can just keep one that use "rich" functions
>> for
>>>>>>> both;
>>>>>>>>> so that we can have less overloads and let users who only want to
>>>>>>> access
>>>>>>>>> one of them to just use dummy parameter declarations. For example:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV>
>>>>>>> globalKTable,
>>>>>>>>>                                  final RichKeyValueMapper<? super
>>> K, ?
>>>>>>>>> super
>>>>>>>>>  V, ? extends GK> keyValueMapper,
>>>>>>>>>                                  final RichValueJoiner<? super K,
>> ?
>>>>>>> super
>>>>>>>>> V,
>>>>>>>>> ? super GV, ? extends RV> joiner);
>>>>>>>>>
>>>>>>>>> <VR> KTable<K, VR> aggregate(final RichInitializer<K, VR>
>>> initializer,
>>>>>>>>>                              final RichAggregator<? super K, ?
>> super
>>>>> V,
>>>>>>>> VR>
>>>>>>>>> aggregator,
>>>>>>>>>                              final Materialized<K, VR,
>>>>>>>> KeyValueStore<Bytes,
>>>>>>>>> byte[]>> materialized);
>>>>>>>>>
>>>>>>>>> Similarly for KGroupedTable, a bunch of aggregate() are deprecated
>>> so
>>>>>>> we
>>>>>>>> do
>>>>>>>>> not need to add its rich functions any more.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 4. For TimeWindowedKStream, I'm wondering why we do not make its
>>>>>>>>> Initializer also "rich" functions? I.e.
>>>>>>>>>
>>>>>>>>> <VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR,
>> K>
>>>>>>>>> initializer,
>>>>>>>>>                                        final RichAggregator<?
>> super
>>> K,
>>>>>>> ?
>>>>>>>>> super V, VR> aggregator);
>>>>>>>>> <VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR,
>> K>
>>>>>>>>> initializer,
>>>>>>>>>                                        final RichAggregator<?
>> super
>>> K,
>>>>>>> ?
>>>>>>>>> super V, VR> aggregator,
>>>>>>>>>                                        final Materialized<K, VR,
>>>>>>>>> WindowStore<Bytes, byte[]>> materialized);
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 5. We need to move "RecordContext" from o.a.k.processor.internals
>> to
>>>>>>>>> o.a.k.processor.
>>>>>>>>>
>>>>>>>>> 6. I'm not clear why we want to move `commit()` from
>>> ProcessorContext
>>>>>>> to
>>>>>>>>> RecordContext? Conceptually I think it would better staying in the
>>>>>>>>> ProcessorContext. Do you find this not doable in the internal
>>>>>>>>> implementations?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Sep 22, 2017 at 1:09 PM, Ted Yu <yuzhihong@gmail.com>
>>> wrote:
>>>>>>>>>
>>>>>>>>>>    recordContext = new RecordContext() {               //
>>>>>>> recordContext
>>>>>>>>>> initialization is added in this KIP
>>>>>>>>>>
>>>>>>>>>> This code snippet seems to be standard - would it make sense to
>>> pull
>>>>>>> it
>>>>>>>>>> into a (sample) RecordContext implementation ?
>>>>>>>>>>
>>>>>>>>>> Cheers
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 22, 2017 at 12:14 PM, Jeyhun Karimov <
>>>>>>> je.karimov@gmail.com
>>>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Ted,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your comments. I added a couple of comments in KIP to
>>>>>>>>> clarify
>>>>>>>>>>> some points.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> bq. provides a hybrd solution
>>>>>>>>>>>> Typo in hybrid.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> - My bad. Thanks for the correction.
>>>>>>>>>>>
>>>>>>>>>>> It would be nice if you can name some Value operator as
>> examples.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>> - I added the corresponding interface names to KIP.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>>>>>>>>>>>                              final Aggregator<? super K, ?
>> super
>>>>>>> V,
>>>>>>>>> VR>
>>>>>>>>>>>> adder,
>>>>>>>>>>>> The adder doesn't need to be RichAggregator ?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> - Exactly. However, there are 2 Aggregator-type arguments in the
>>>>>>>>> related
>>>>>>>>>>> method. So, I had to overload all possible their Rich
>>> counterparts:
>>>>>>>>>>>
>>>>>>>>>>> // adder is non-rich, subtractor is rich
>>>>>>>>>>> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>>>>>>>>>>                              final Aggregator<? super K, ? super
>>> V,
>>>>>>>> VR>
>>>>>>>>>>> adder,
>>>>>>>>>>>                              final RichAggregator<? super K, ?
>>>>>>> super
>>>>>>>> V,
>>>>>>>>>> VR>
>>>>>>>>>>> subtractor,
>>>>>>>>>>>                              final Materialized<K, VR,
>>>>>>>>>> KeyValueStore<Bytes,
>>>>>>>>>>> byte[]>> materialized);
>>>>>>>>>>>
>>>>>>>>>>> // adder is rich, subtractor is non-rich
>>>>>>>>>>> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>>>>>>>>>>                              final RichAggregator<? super K, ?
>>>>>>> super
>>>>>>>> V,
>>>>>>>>>> VR>
>>>>>>>>>>> adder,
>>>>>>>>>>>                              final Aggregator<? super K, ? super
>>> V,
>>>>>>>> VR>
>>>>>>>>>>> subtractor,
>>>>>>>>>>>                              final Materialized<K, VR,
>>>>>>>>>> KeyValueStore<Bytes,
>>>>>>>>>>> byte[]>> materialized);
>>>>>>>>>>>
>>>>>>>>>>> // both adder and subtractor are rich
>>>>>>>>>>> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>>>>>>>>>>                              final RichAggregator<? super K, ?
>>>>>>> super
>>>>>>>> V,
>>>>>>>>>> VR>
>>>>>>>>>>> adder,
>>>>>>>>>>>                              final RichAggregator<? super K, ?
>>>>>>> super
>>>>>>>> V,
>>>>>>>>>> VR>
>>>>>>>>>>> subtractor,
>>>>>>>>>>>                              final Materialized<K, VR,
>>>>>>>>>> KeyValueStore<Bytes,
>>>>>>>>>>> byte[]>> materialized);
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Can you explain a bit about the above implementation ?
>>>>>>>>>>>>    void commit () {
>>>>>>>>>>>>      throw new UnsupportedOperationException("commit() is not
>>>>>>>>>> supported
>>>>>>>>>>> in
>>>>>>>>>>>> this context");
>>>>>>>>>>>> Is the exception going to be replaced with real code in the PR
>> ?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> - I added some comments both inside and outside the code
>> snippets
>>>>>>> in
>>>>>>>>> KIP.
>>>>>>>>>>> Specifically, for the code snippet above, we add *commit()*
>> method
>>>>>>> to
>>>>>>>>>>> *RecordContext* interface.
>>>>>>>>>>> However, we want  *commit()* method to be used only for
>>>>>>>> *RecordContext*
>>>>>>>>>>> instances (at least for now), so we add
>>>>>>> UnsupportedOperationException
>>>>>>>>> in
>>>>>>>>>>> all classes/interfaces that extend/implement *RecordContext.*
>>>>>>>>>>> In general, 1) we make RecordContext publicly available within
>>>>>>>>>>> ProcessorContext,  2) initialize its instance within all
>> required
>>>>>>>>>>> Processors and 3) pass it as an argument to the related Rich
>>>>>>>> interfaces
>>>>>>>>>>> inside Processors.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Jeyhun
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 22, 2017 at 6:44 PM Ted Yu <yuzhihong@gmail.com>
>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> bq. provides a hybrd solution
>>>>>>>>>>>>
>>>>>>>>>>>> Typo in hybrid.
>>>>>>>>>>>>
>>>>>>>>>>>> bq. accessing read-only keys within XXXValues operators
>>>>>>>>>>>>
>>>>>>>>>>>> It would be nice if you can name some Value operator as
>> examples.
>>>>>>>>>>>>
>>>>>>>>>>>> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>>>>>>>>>>>                              final Aggregator<? super K, ?
>> super
>>>>>>> V,
>>>>>>>>> VR>
>>>>>>>>>>>> adder,
>>>>>>>>>>>>
>>>>>>>>>>>> The adder doesn't need to be RichAggregator ?
>>>>>>>>>>>>
>>>>>>>>>>>>   public RecordContext recordContext() {
>>>>>>>>>>>>     return this.recordContext();
>>>>>>>>>>>>
>>>>>>>>>>>> Can you explain a bit about the above implementation ?
>>>>>>>>>>>>
>>>>>>>>>>>>    void commit () {
>>>>>>>>>>>>      throw new UnsupportedOperationException("commit() is not
>>>>>>>>>> supported
>>>>>>>>>>> in
>>>>>>>>>>>> this context");
>>>>>>>>>>>>
>>>>>>>>>>>> Is the exception going to be replaced with real code in the PR
>> ?
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Sep 22, 2017 at 9:28 AM, Jeyhun Karimov <
>>>>>>>>> je.karimov@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Dear community,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I updated the related KIP [1]. Please feel free to comment.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>>>> 159%3A+Introducing+Rich+functions+to+Streams
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Sep 22, 2017 at 12:20 AM Jeyhun Karimov <
>>>>>>>>>> je.karimov@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Damian,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the update. I am working on it and will provide an
>>>>>>>> update
>>>>>>>>>>> soon.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Sep 21, 2017 at 4:50 PM Damian Guy <
>>>>>>>> damian.guy@gmail.com
>>>>>>>>>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> All KIP-182 API PRs have now been merged. So you can
>>>>>>> consider
>>>>>>>> it
>>>>>>>>>> as
>>>>>>>>>>>>>>> stable.
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, 21 Sep 2017 at 15:23 Jeyhun Karimov <
>>>>>>>>> je.karimov@gmail.com
>>>>>>>>>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks a lot for your comments. For the single interface
>>>>>>>>>> (RichXXX
>>>>>>>>>>>> and
>>>>>>>>>>>>>>>> XXXWithKey) solution, I have already submitted a PR but
>>>>>>>>> probably
>>>>>>>>>>> it
>>>>>>>>>>>> is
>>>>>>>>>>>>>>>> outdated (when the KIP first proposed), I need to revisit
>>>>>>>> that
>>>>>>>>>>> one.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> @Guozhang, from our (offline) discussion, I understood
>>>>>>> that
>>>>>>>> we
>>>>>>>>>> may
>>>>>>>>>>>> not
>>>>>>>>>>>>>>> make
>>>>>>>>>>>>>>>> it merge this KIP into the upcoming release, as KIP-159 is
>>>>>>>> not
>>>>>>>>>>> voted
>>>>>>>>>>>>> yet
>>>>>>>>>>>>>>>> (because we want both KIP-149 and KIP-159 to be as an
>>>>>>>> "atomic"
>>>>>>>>>>>> merge).
>>>>>>>>>>>>>>> So
>>>>>>>>>>>>>>>> I decided to wait until KIP-182 gets stable (there are
>>>>>>> some
>>>>>>>>>> minor
>>>>>>>>>>>>>>> updates
>>>>>>>>>>>>>>>> AFAIK) and update the KIP accordingly. Please correct me
>>>>>>> if
>>>>>>>> I
>>>>>>>>> am
>>>>>>>>>>>> wrong
>>>>>>>>>>>>>>> or I
>>>>>>>>>>>>>>>> misunderstood.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Sep 21, 2017 at 4:11 PM Damian Guy <
>>>>>>>>>> damian.guy@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> +1
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, 21 Sep 2017 at 13:46 Guozhang Wang <
>>>>>>>>>> wangguoz@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> +1 for me as well for collapsing.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Jeyhun, could you update the wiki accordingly to show
>>>>>>>>> what's
>>>>>>>>>>> the
>>>>>>>>>>>>>>> final
>>>>>>>>>>>>>>>>>> updates post KIP-182 that needs to be done in KIP-159
>>>>>>>>>>> including
>>>>>>>>>>>>>>>> KIP-149?
>>>>>>>>>>>>>>>>>> The child page I made is just a suggestion, but you
>>>>>>>> would
>>>>>>>>>>> still
>>>>>>>>>>>>>>> need to
>>>>>>>>>>>>>>>>>> update your proposal for people to comment and vote
>>>>>>> on.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Sep 14, 2017 at 10:37 PM, Ted Yu <
>>>>>>>>>> yuzhihong@gmail.com
>>>>>>>>>>>>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> +1
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> One interface is cleaner.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, Sep 14, 2017 at 7:26 AM, Bill Bejeck <
>>>>>>>>>>>> bbejeck@gmail.com
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> +1 for me on collapsing the RichXXXX and
>>>>>>>>>> ValueXXXXWithKey
>>>>>>>>>>>>>>>> interfaces
>>>>>>>>>>>>>>>>>>> into 1
>>>>>>>>>>>>>>>>>>>> interface.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Wed, Sep 13, 2017 at 11:31 AM, Jeyhun Karimov <
>>>>>>>>>>>>>>>>> je.karimov@gmail.com
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Damian,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks for your feedback. Actually, this (what
>>>>>>> you
>>>>>>>>>>>> propose)
>>>>>>>>>>>>>>> was
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> first
>>>>>>>>>>>>>>>>>>>>> idea of KIP-149. Then we decided to divide it
>>>>>>> into
>>>>>>>>> two
>>>>>>>>>>>>> KIPs. I
>>>>>>>>>>>>>>>> also
>>>>>>>>>>>>>>>>>>>>> expressed my opinion that keeping the two
>>>>>>>> interfaces
>>>>>>>>>>> (Rich
>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> withKey)
>>>>>>>>>>>>>>>>>>>>> separate would add more overloads. So, email
>>>>>>>>>> discussion
>>>>>>>>>>>>>>> resulted
>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>> would not be a problem.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Our initial idea was similar to :
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> public abstract class RichValueMapper<K, V, VR>
>>>>>>>>>>>> implements
>>>>>>>>>>>>>>>>>>>>> ValueMapperWithKey<K, V, VR>, RichFunction {
>>>>>>>>>>>>>>>>>>>>> ......
>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> So, we check the type of object, whether it is
>>>>>>>>> RichXXX
>>>>>>>>>>> or
>>>>>>>>>>>>>>>>> XXXWithKey
>>>>>>>>>>>>>>>>>>>> inside
>>>>>>>>>>>>>>>>>>>>> the called method and continue accordingly.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> If this is ok with the community, I would like
>>>>>>> to
>>>>>>>>>> revert
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> current
>>>>>>>>>>>>>>>>>>>> design
>>>>>>>>>>>>>>>>>>>>> to this again.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Wed, Sep 13, 2017 at 3:02 PM Damian Guy <
>>>>>>>>>>>>>>> damian.guy@gmail.com
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks for sending out the update. I guess i
>>>>>>> was
>>>>>>>>>>>> thinking
>>>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>> along
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> lines of option 2 where we collapse the
>>>>>>> RichXXXX
>>>>>>>>> and
>>>>>>>>>>>>>>>>>> ValueXXXXWithKey
>>>>>>>>>>>>>>>>>>>> etc
>>>>>>>>>>>>>>>>>>>>>> interfaces into 1 interface that has all of
>>>>>>> the
>>>>>>>>>>>>> arguments. I
>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>>>>>>> only need to add one additional overload for
>>>>>>>> each
>>>>>>>>>>>>> operator?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Wed, 13 Sep 2017 at 10:59 Jeyhun Karimov <
>>>>>>>>>>>>>>>>> je.karimov@gmail.com>
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Dear all,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I would like to resume the discussion on
>>>>>>>>> KIP-159.
>>>>>>>>>> I
>>>>>>>>>>>> (and
>>>>>>>>>>>>>>>>>> Guozhang)
>>>>>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>>>>>> that releasing KIP-149 and KIP-159 in the
>>>>>>> same
>>>>>>>>>>> release
>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>> make
>>>>>>>>>>>>>>>>>>>> sense
>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>> avoid a release with "partial" public APIs.
>>>>>>>>> There
>>>>>>>>>>> is a
>>>>>>>>>>>>> KIP
>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>> proposed
>>>>>>>>>>>>>>>>>>>>>> by
>>>>>>>>>>>>>>>>>>>>>>> Guozhang (and approved by me) to unify both
>>>>>>>>> KIPs.
>>>>>>>>>>>>>>>>>>>>>>> Please feel free to comment on this.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/
>>>>>>>>> confluence/pages/viewpage.
>>>>>>>>>>>>>>>>>>>>> action?pageId=73637757
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jul 21, 2017 at 2:00 AM Jeyhun
>>>>>>>> Karimov <
>>>>>>>>>>>>>>>>>>> je.karimov@gmail.com
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hi Matthias, Damian, all,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your comments and sorry for
>>>>>>>>>> super-late
>>>>>>>>>>>>>>> update.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Sure, the DSL refactoring is not blocking
>>>>>>>> for
>>>>>>>>>> this
>>>>>>>>>>>>> KIP.
>>>>>>>>>>>>>>>>>>>>>>>> I made some changes to KIP document based
>>>>>>> on
>>>>>>>>> my
>>>>>>>>>>>>>>> prototype.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Please feel free to comment.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jul 7, 2017 at 9:35 PM Matthias J.
>>>>>>>>> Sax <
>>>>>>>>>>>>>>>>>>>>> matthias@confluent.io>
>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> I would not block this KIP with regard to DSL refactoring. IMHO, we can
>>>>>>>>>>>>>>>>>>>>>>>>> just finish this one and the DSL refactoring will help later on to
>>>>>>>>>>>>>>>>>>>>>>>>> reduce the number of overloads.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On 7/7/17 5:28 AM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>> I am following the related thread on the mailing list and looking
>>>>>>>>>>>>>>>>>>>>>>>>>> forward to a one-shot solution for the overloads issue.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jul 7, 2017 at 10:32 AM Damian Guy <damian.guy@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> About overrides, what other alternatives do we have? For
>>>>>>>>>>>>>>>>>>>>>>>>>>>> backwards-compatibility we have to add extra methods to the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> existing ones.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> It wasn't clear to me in the KIP if these are new methods or
>>>>>>>>>>>>>>>>>>>>>>>>>>> replacing existing ones.
>>>>>>>>>>>>>>>>>>>>>>>>>>> Also, we are currently discussing options for replacing the
>>>>>>>>>>>>>>>>>>>>>>>>>>> overrides.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> About ProcessorContext vs RecordContext, you are right. I think I
>>>>>>>>>>>>>>>>>>>>>>>>>>>> need to implement a prototype to understand the full picture as
>>>>>>>>>>>>>>>>>>>>>>>>>>>> some parts of the KIP might not be as straightforward as I thought.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 5, 2017 at 10:40 AM Damian Guy <damian.guy@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Is the intention that these methods are new overloads on the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KStream, KTable, etc?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> It is worth noting that a ProcessorContext is not a RecordContext.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> A RecordContext, as it stands, only exists during the processing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of a single record, whereas the ProcessorContext exists for the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> lifetime of the Processor. So it doesn't make sense to cast a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ProcessorContext to a RecordContext.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> You mentioned above passing the InternalProcessorContext to the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> init() calls. It is internal for a reason and I think it should
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> remain that way.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> It might be better to move the recordContext() method from
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> InternalProcessorContext to ProcessorContext.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In the KIP you have an example showing:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> richMapper.init((RecordContext) processorContext);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> But the interface is:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public interface RichValueMapper<V, VR> {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     VR apply(final V value, final RecordContext recordContext);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> i.e., there is no init(...), besides as above this wouldn't make
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sense.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, 4 Jul 2017 at 23:30 Jeyhun Karimov <je.karimov@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Actually, my intent was to provide to RichInitializer and later
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> on we could provide the context of the record as you also
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mentioned.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I removed that so as not to confuse the users.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding the RecordContext and ProcessorContext interfaces, I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> just noticed the InternalProcessorContext class. Can't we pass
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this as a parameter to the init() method of processors? Then we
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would be able to get the RecordContext easily with just a method
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> call.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Jun 29, 2017 at 10:14 PM Matthias J. Sax <matthias@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> One more thing:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I don't think `RichInitializer` makes sense. As we don't have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> any input record, there is also no context. We could of course
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> provide the context of the record that triggers the init call,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> but this seems to be semantically questionable. Also, the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> context for this first record will be provided by the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> subsequent call to aggregate anyway.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 6/29/17 1:11 PM, Matthias J. Sax wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for updating the KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I have one concern with regard to backward compatibility. You
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> suggest using RecordContext as the base interface for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ProcessorContext. This will break compatibility.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think we should just have two independent interfaces. Our
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> own ProcessorContextImpl class would implement both. This
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> allows us to cast it to `RecordContext` and thus limit the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> visible scope.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 6/27/17 1:35 PM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I updated the KIP w.r.t. the discussion and comments.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Basically, I eliminated overloads for a particular method if
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> there are more than 3.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As we can see, there are a lot of overloads (and more will
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> come with KIP-149 :) )
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> So, is it wise to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wait for the result of the constructive DSL thread, or
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> extend the KIP to address this issue as well, or
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> continue as it is?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 14, 2017 at 11:29 PM Guozhang Wang <wangguoz@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> LGTM. Thanks!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 13, 2017 at 2:20 PM, Jeyhun Karimov <je.karimov@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the comment, Matthias. After all the discussion
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (thanks to all participants), I think this (a single method
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that passes in a RecordContext object) is the best
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> alternative.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Just a side note: I think KAFKA-3907 [1] can also be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> integrated into the KIP by adding a related method to the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> RecordContext interface.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/KAFKA-3907
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 13, 2017 at 7:50 PM Matthias J. Sax <matthias@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I would like to push this discussion further. It seems we got
> 

