kafka-dev mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream
Date Sun, 17 Sep 2017 20:39:58 GMT
Thanks for updating the KIP.

You are of course right that we need internal access to
InternalStreamsBuilder, but that should not be too hard, and it is
effectively an internal implementation detail.

Two more comments:

The new method should be

> KStream<K,V> merge(KStream<K,V> stream);

and not

> <K,V> KStream<K,V> merge(KStream<K,V> streams);

as in the KIP. The prefix `<K,V>` is not required for non-static methods,
and the parameter name should be singular (`stream`, not `streams`).
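A minimal sketch of why the `<K,V>` prefix matters, using hypothetical, dependency-free interfaces `Checked` and `Shadowed` (stand-ins only, not the Kafka Streams API). On an instance method, `<K,V>` declares new type parameters that shadow the interface's own `K` and `V`, so the compiler no longer requires the argument to match the receiver's key and value types:

```java
// Hypothetical interfaces (NOT the Kafka API), used only to compare signatures.
interface Checked<K, V> {
    // Reuses the interface's K and V: the argument must match the receiver.
    Checked<K, V> merge(Checked<K, V> stream);
}

interface Shadowed<K, V> {
    // The method-level <K, V> shadows the interface's K and V, so this
    // accepts a stream with unrelated key/value types.
    <K, V> Shadowed<K, V> merge(Shadowed<K, V> streams);
}

class CheckedImpl<K, V> implements Checked<K, V> {
    @Override
    public Checked<K, V> merge(Checked<K, V> stream) {
        return this; // placeholder body; only the signature matters here
    }
}

class ShadowedImpl<K, V> implements Shadowed<K, V> {
    @Override
    public <K2, V2> Shadowed<K2, V2> merge(Shadowed<K2, V2> streams) {
        return streams; // placeholder body; only the signature matters here
    }
}

class SignatureDemo {
    public static void main(String[] args) {
        Checked<String, Integer> a = new CheckedImpl<>();
        Checked<String, Integer> b = new CheckedImpl<>();
        a.merge(b); // OK: same key and value types

        // Checked<Long, Long> c = new CheckedImpl<>();
        // a.merge(c); // does not compile: the type mismatch is caught

        Shadowed<String, Integer> x = new ShadowedImpl<>();
        Shadowed<Long, Long> y = new ShadowedImpl<>();
        Shadowed<Long, Long> z = x.merge(y); // compiles: mismatch goes unnoticed
        System.out.println(z == y); // true
    }
}
```

Dropping the method-level `<K,V>` restores the type check; the singular parameter name `stream` simply matches the single-argument signature.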

Can you also add an explicit sentence stating that, in contrast to the old
method, the new method no longer uses varargs but takes a single KStream
parameter? Please also mention that this is not a limitation, because calls
to the new merge() can be chained.
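To illustrate why a single-parameter merge() is not a limitation, here is a minimal sketch using a hypothetical `ToyStream<K,V>` class (not the Kafka `KStream`; a stream here is only the list of its source names). Chaining covers a fixed number of streams, and a plain loop covers a number known only at runtime:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free stand-in for KStream (NOT the Kafka API).
// K and V are unused phantom types that only mimic KStream<K, V>.
final class ToyStream<K, V> {
    private final List<String> sources;

    ToyStream(List<String> sources) {
        this.sources = new ArrayList<>(sources);
    }

    List<String> sources() {
        return sources;
    }

    // Single-parameter merge, mirroring the proposed signature: the result
    // reads from this stream's sources followed by the argument's sources.
    ToyStream<K, V> merge(ToyStream<K, V> stream) {
        List<String> all = new ArrayList<>(sources);
        all.addAll(stream.sources);
        return new ToyStream<>(all);
    }
}

class ChainingDemo {
    // Hypothetical helper: merges a list whose size is only known at runtime
    // by looping over the single-argument merge(). Assumes a non-empty list.
    static <K, V> ToyStream<K, V> mergeAll(List<ToyStream<K, V>> streams) {
        ToyStream<K, V> result = streams.get(0);
        for (int i = 1; i < streams.size(); i++) {
            result = result.merge(streams.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        ToyStream<String, String> s1 = new ToyStream<>(List.of("t1"));
        ToyStream<String, String> s2 = new ToyStream<>(List.of("t2"));
        ToyStream<String, String> s3 = new ToyStream<>(List.of("t3"));
        ToyStream<String, String> s4 = new ToyStream<>(List.of("t4"));

        // Fixed number of streams: chain the calls.
        System.out.println(s1.merge(s2).merge(s3).merge(s4).sources()); // [t1, t2, t3, t4]

        // Dynamic number of streams: loop over them.
        System.out.println(mergeAll(List.of(s1, s2, s3, s4)).sources()); // [t1, t2, t3, t4]
    }
}
```

`streams.stream().reduce(ToyStream::merge)` would be an equivalent alternative to the loop that returns an `Optional` instead of assuming a non-empty list.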

Thanks a lot!


On 9/17/17 10:32 AM, Richard Yu wrote:
> Correction: When the current merge() method is called with multiple
> streams, a warning will be printed (or logged), but this should not hinder
> the ability to read the log.
> The old method is missing an unchecked-warning suppression.
> However, this is not a high priority, since the old merge() method is
> deprecated.
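Assuming the warning under discussion is javac's unchecked warning for generic varargs (the thread does not quote the exact message), the difference can be sketched without Kafka by using `List<T>` in place of `KStream<K,V>`. The class and method names below are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

class VarargsWarnings {
    // Generic varargs on a method that cannot carry @SafeVarargs (here a
    // public, non-final instance method). @SuppressWarnings("unchecked")
    // silences the "possible heap pollution" warning at this declaration
    // only; callers passing several lists still get an "unchecked generic
    // array creation" warning.
    @SuppressWarnings("unchecked")
    public <T> List<T> concatInstance(List<T>... lists) {
        List<T> out = new ArrayList<>();
        for (List<T> list : lists) {
            out.addAll(list);
        }
        return out;
    }

    // @SafeVarargs is permitted on static (and final) methods; it removes
    // the warning both at the declaration and at call sites.
    @SafeVarargs
    static <T> List<T> concatSafe(List<T>... lists) {
        List<T> out = new ArrayList<>();
        for (List<T> list : lists) {
            out.addAll(list);
        }
        return out;
    }

    // Single-parameter form: no generic array is created, so there is no
    // warning to suppress, and repeated calls cover any number of lists.
    static <T> List<T> concat(List<T> left, List<T> right) {
        List<T> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }

    public static void main(String[] args) {
        List<String> a = List.of("a");
        List<String> b = List.of("b");
        List<String> c = List.of("c");
        System.out.println(concatSafe(a, b, c));     // [a, b, c]
        System.out.println(concat(concat(a, b), c)); // [a, b, c]
    }
}
```

Calling `new VarargsWarnings().concatInstance(a, b)` from another method and compiling with `javac -Xlint:unchecked` should surface the call-site warning that the declaration-level `@SuppressWarnings` does not cover; the single-parameter `concat` never produces it.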
> On Sun, Sep 17, 2017 at 9:37 AM, Richard Yu <yohan.richard.yu@gmail.com>
> wrote:
>> With regard to Xavier's comment, I do not think this practice applies to
>> this PR. There is not much potential here for warnings to be raised. Note
>> that StreamsBuilder's merge has no @SuppressWarnings("unchecked"),
>> indicating that warnings are sparse, if not nonexistent.
>> On Sun, Sep 17, 2017 at 9:10 AM, Richard Yu <yohan.richard.yu@gmail.com>
>> wrote:
>>> KIP-202 has been updated according to your suggestions.
>>> On Sun, Sep 17, 2017 at 8:51 AM, Richard Yu <yohan.richard.yu@gmail.com>
>>> wrote:
>>>> I added StreamsBuilder under the assumption that InternalStreamsBuilder
>>>> would be required to merge two streams. However, if that is not the
>>>> case, I would still need a few things:
>>>> 1) An InternalStreamsBuilder instance to instantiate a new KStream
>>>> 2) The merge_name that the merged stream will be given
>>>> 3) Access to the corresponding InternalStreamsBuilder's
>>>> InternalTopologyBuilder to add a processor (for the new KStream)
>>>> All of these are associated with InternalStreamsBuilder, so it is
>>>> essential for merging the streams.
>>>> That leaves three options (given that InternalStreamsBuilder is mostly
>>>> only visible within the org.apache.kafka.streams.kstream.internals
>>>> package):
>>>> a) Find a way to pass InternalStreamsBuilder into the class indirectly
>>>> (using StreamsBuilder).
>>>> b) Within the method, find the InternalStreamsBuilder that corresponds
>>>> to the streams about to be merged.
>>>> c) Use the local InternalStreamsBuilder inherited from
>>>> AbstractStream, assuming that it is the correct builder.
>>>> Your suggestion corresponds to option c. This implementation works,
>>>> but it carries the risk that the local InternalStreamsBuilder might not
>>>> be the correct one (just something to keep in mind, since I will make
>>>> that change).
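Option c can be sketched as follows, with a fail-fast check added to address the risk that the local builder might not be the correct one. All names here (`SketchBuilder`, `SketchAbstractStream`, `SketchStream`, the `MERGE-` prefix and the zero-padded naming format) are hypothetical stand-ins for InternalStreamsBuilder, AbstractStream and the KStream implementation, not the actual Kafka Streams internals:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for InternalStreamsBuilder: hands out processor
// names and records which parent nodes each processor reads from.
class SketchBuilder {
    private int index = 0;
    final Map<String, List<String>> processors = new LinkedHashMap<>();

    String newProcessorName(String prefix) {
        return String.format("%s%010d", prefix, index++);
    }

    void addProcessor(String name, List<String> parents) {
        processors.put(name, parents);
    }
}

// Hypothetical stand-in for AbstractStream: every stream remembers the
// builder it was created from and the name of the node that produces it.
abstract class SketchAbstractStream {
    final SketchBuilder builder;
    final String name;

    SketchAbstractStream(SketchBuilder builder, String name) {
        this.builder = builder;
        this.name = name;
    }
}

class SketchStream extends SketchAbstractStream {
    static final String MERGE_NAME = "MERGE-"; // hypothetical prefix

    SketchStream(SketchBuilder builder, String name) {
        super(builder, name);
    }

    // Option c: reuse the builder inherited from the base class, but fail
    // fast if the other stream was created by a different builder.
    SketchStream merge(SketchStream stream) {
        if (stream.builder != this.builder) {
            throw new IllegalArgumentException(
                "Cannot merge streams created by different builders");
        }
        String mergeName = builder.newProcessorName(MERGE_NAME);
        builder.addProcessor(mergeName, List.of(this.name, stream.name));
        return new SketchStream(builder, mergeName);
    }
}

class BuilderCheckDemo {
    public static void main(String[] args) {
        SketchBuilder builder = new SketchBuilder();
        SketchStream s1 = new SketchStream(builder, "source-1");
        SketchStream s2 = new SketchStream(builder, "source-2");
        SketchStream merged = s1.merge(s2);
        System.out.println(merged.name);                         // MERGE-0000000000
        System.out.println(builder.processors.get(merged.name)); // [source-1, source-2]

        SketchStream foreign = new SketchStream(new SketchBuilder(), "other");
        try {
            s1.merge(foreign);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Whether the real implementation performs such a check is not stated in this thread; the sketch only shows one way to make a wrong builder fail loudly instead of silently producing a broken topology.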
>>>> On Sun, Sep 17, 2017 at 12:06 AM, Matthias J. Sax <matthias@confluent.io>
>>>> wrote:
>>>>> Hi Richard,
>>>>> Thanks a lot for the KIP!
>>>>> I have three questions:
>>>>>  - why is the new merge() method static?
>>>>>  - why does the new merge() method take StreamsBuilder as a parameter?
>>>>>  - have you considered Xavier's comment (see the JIRA in case you have
>>>>> not noticed it yet) about varargs vs. adding some overloads to merge
>>>>> streams?
>>>>> My personal take is that merge() should neither be static nor take
>>>>> StreamsBuilder. The idea of the JIRA was to get a more natural API:
>>>>> // old
>>>>> KStream merged = StreamsBuilder.merge(stream1, stream2);
>>>>> // new
>>>>> KStream merged = stream1.merge(stream2);
>>>>> Given the second pattern, it should actually be fine to get rid of
>>>>> varargs in merge() entirely, as users could chain multiple calls to
>>>>> merge():
>>>>> KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4);
>>>>> -Matthias
>>>>> On 9/16/17 9:36 PM, Richard Yu wrote:
>>>>>> Hi,
>>>>>> Please take a look at:
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-202+Move+merge%28%29+from+StreamsBuilder+to+KStream
>>>>>> Thanks
