kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Marcel Silberhorn (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index
Date Thu, 22 Jun 2017 06:41:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16058849#comment-16058849
] 

Marcel Silberhorn commented on KAFKA-5488:
------------------------------------------

Hi, I will start some scribbles before publishing it on cwiki ... https://gitlab.com/childno.de/apache_kafka/snippets/1665751
But yet, I don't have a "great idea" to solve this resp. want some feedback from you ... so
I should start a discussion on the mailinglist ... another todo ;)

> KStream.branch should not return a Array of streams we have to access by known index
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5488
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5488
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.11.0.0, 0.10.2.1
>            Reporter: Marcel Silberhorn
>              Labels: needs-kip
>
> long story short: it's a mess to get a {{KStream<>[]}} out from {{KStream<>branch(Predicate<>...)}}.
It breaks the fluent API and it produces bad code which is not that good to maintain since
you have to know the right index for an unnamed branching stream.
> Example
> {code:lang=java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> public class StreamAppWithBranches {
>     public static void main(String... args) {
>         KStream<byte[], EventType>[] branchedStreams= new KStreamBuilder()
>         .<byte[], EventType>stream("eventTopic")
>         .branch(
>             (k, v) -> EventType::validData
>             (k, v) -> true
>         );
>         
>         branchedStreams[0]
>         .to("topicValidData");
>         
>         branchedStreams[1]
>         .to("topicInvalidData");
>     }
> }
> {code}
> Quick idea, s.th. like {{void branch(final BranchDefinition<Predicate<>, Consumer<KStream<>>>...
branchPredicatesAndHandlers);}} where you can write branches/streams code nested where it
belongs to
> so it would be possible to write code like
> {code:lang=java}
>         new KStreamBuilder()
>         .<byte[], EventType>stream("eventTopic")
>         .branch(
>             Branch.create(
>                 (k, v) -> EventType::validData,
>                 stream -> stream.to("topicValidData")
>             ),
>             Branch.create(
>                 (k, v) -> true,
>                 stream -> stream.to("topicInvalidData")
>             )
>         );
> {code}
> I'll go forward to evaluate some ideas:
> https://gitlab.com/childno.de/apache_kafka/snippets/1665655



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message