kafka-dev mailing list archives

From "Matthias J. Sax (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4601) Avoid duplicated repartitioning in KStream DSL
Date Fri, 03 Mar 2017 17:57:45 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15894770#comment-15894770 ]

Matthias J. Sax commented on KAFKA-4601:
----------------------------------------

Thanks!

> Avoid duplicated repartitioning in KStream DSL
> ----------------------------------------------
>
>                 Key: KAFKA-4601
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4601
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: performance
>
> Consider the following DSL:
> {code}
> KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "topic1").map(..);
>         KTable<String, Long> counts = source
>                 .groupByKey()
>                 .count("Counts");
>         KStream<String, String> sink = source.leftJoin(counts, ..);
> {code}
> The resulting topology looks like this:
> {code}
> ProcessorTopology:
> 				KSTREAM-SOURCE-0000000000:
> 					topics:		[topic1]
> 					children:	[KSTREAM-MAP-0000000001]
> 				KSTREAM-MAP-0000000001:
> 					children:	[KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000007]
> 				KSTREAM-FILTER-0000000004:
> 					children:	[KSTREAM-SINK-0000000003]
> 				KSTREAM-SINK-0000000003:
> 					topic:		X-Counts-repartition
> 				KSTREAM-FILTER-0000000007:
> 					children:	[KSTREAM-SINK-0000000006]
> 				KSTREAM-SINK-0000000006:
> 					topic:		X-KSTREAM-MAP-0000000001-repartition
> ProcessorTopology:
> 				KSTREAM-SOURCE-0000000008:
> 					topics:		[X-KSTREAM-MAP-0000000001-repartition]
> 					children:	[KSTREAM-LEFTJOIN-0000000009]
> 				KSTREAM-LEFTJOIN-0000000009:
> 					states:		[Counts]
> 				KSTREAM-SOURCE-0000000005:
> 					topics:		[X-Counts-repartition]
> 					children:	[KSTREAM-AGGREGATE-0000000002]
> 				KSTREAM-AGGREGATE-0000000002:
> 					states:		[Counts]
> {code}
> That is, there are two repartition topics, one for the aggregate and one for the join. This
> not only introduces unnecessary overhead but also breaks the processing order (users expect
> each record to go through the aggregation first and then the join operator). To get the
> following simpler topology, users today need to manually add a {{through}} operator after
> {{map}} to enforce repartitioning.
> {code}
> ProcessorTopology:
> 				KSTREAM-SOURCE-0000000000:
> 					topics:		[topic1]
> 					children:	[KSTREAM-MAP-0000000001]
> 				KSTREAM-MAP-0000000001:
> 					children:	[KSTREAM-SINK-0000000002]
> 				KSTREAM-SINK-0000000002:
> 					topic:		topic 2
> ProcessorTopology:
> 				KSTREAM-SOURCE-0000000003:
> 					topics:		[topic 2]
> 					children:	[KSTREAM-AGGREGATE-0000000004, KSTREAM-LEFTJOIN-0000000005]
> 				KSTREAM-AGGREGATE-0000000004:
> 					states:		[Counts]
> 				KSTREAM-LEFTJOIN-0000000005:
> 					states:		[Counts]
> {code} 
> This kind of optimization should be automatic in Streams. We can consider doing it
> when extending beyond one-operator-at-a-time translation.
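
The difference between the two topologies quoted above can be illustrated with a small toy model. This is only a sketch and not Kafka Streams code: the class RepartitionSketch, its methods, and the topic-name format are hypothetical and exist purely for illustration. It contrasts a per-operator translation, where each key-dependent operator downstream of a key-changing node gets its own repartition topic, with a shared translation, where the key-changing node is repartitioned once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model, not Kafka Streams code: it only counts how many
// repartition topics a translation strategy would create.
public class RepartitionSketch {

    // Per-operator translation: every key-dependent operator downstream of
    // the key-changing node gets its own repartition topic.
    public static List<String> perOperator(String upstream, List<String> operators) {
        List<String> topics = new ArrayList<>();
        for (String op : operators) {
            topics.add(upstream + "-" + op + "-repartition");
        }
        return topics;
    }

    // Shared translation: the key-changing node is repartitioned once, and
    // all downstream operators read from that single topic.
    public static Map<String, List<String>> shared(String upstream, List<String> operators) {
        Map<String, List<String>> topics = new LinkedHashMap<>();
        topics.put(upstream + "-repartition", new ArrayList<>(operators));
        return topics;
    }

    public static void main(String[] args) {
        List<String> ops = Arrays.asList("AGGREGATE", "LEFTJOIN");
        // Two topics, one per downstream operator.
        System.out.println(perOperator("MAP", ops));
        // One topic feeding both operators.
        System.out.println(shared("MAP", ops));
    }
}
```

In this model the shared variant corresponds to the second topology, where a single topic feeds both the aggregate and the join, so both operators see records in the same order.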



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
