From dev-return-99783-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Wed Nov 14 02:58:04 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 25ADA18062B for ; Wed, 14 Nov 2018 02:58:03 +0100 (CET) Received: (qmail 20579 invoked by uid 500); 14 Nov 2018 01:58:02 -0000 Mailing-List: contact dev-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list dev@kafka.apache.org Received: (qmail 20568 invoked by uid 99); 14 Nov 2018 01:58:02 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Nov 2018 01:58:02 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 6A9F318C4C5 for ; Wed, 14 Nov 2018 01:58:02 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -110.301 X-Spam-Level: X-Spam-Status: No, score=-110.301 tagged_above=-999 required=6.31 tests=[ENV_AND_HDR_SPF_MATCH=-0.5, RCVD_IN_DNSWL_MED=-2.3, SPF_PASS=-0.001, USER_IN_DEF_SPF_WL=-7.5, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id dNpvlJpJr9hV for ; Wed, 14 Nov 2018 01:58:01 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTP id 011B362402 for ; Wed, 14 Nov 2018 01:58:01 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 85E4DE0C57 for ; Wed, 14 Nov 2018 01:58:00 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id 4486E2526C for ; Wed, 14 Nov 2018 01:58:00 +0000 (UTC) Date: Wed, 14 Nov 2018 01:58:00 +0000 (UTC) From: "Bill Bejeck (JIRA)" To: dev@kafka.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Resolved] (KAFKA-4601) Avoid duplicated repartitioning in KStream DSL MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: quoted-printable X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/KAFKA-4601?page=3Dcom.atlassia= n.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck resolved KAFKA-4601. -------------------------------- Resolution: Fixed Marking this resolved with=C2=A0[https://github.com/apache/kafka/pull/5451.= ] As=C2=A0[~guozhang]=C2=A0said we will address follow up work with individua= l tickets. > Avoid duplicated repartitioning in KStream DSL > ---------------------------------------------- > > Key: KAFKA-4601 > URL: https://issues.apache.org/jira/browse/KAFKA-4601 > Project: Kafka > Issue Type: Sub-task > Components: streams > Reporter: Guozhang Wang > Priority: Major > Labels: performance > > Consider the following DSL: > {code} > Stream source =3D builder.stream(Serdes.String(), Serdes.= String(), "topic1"); > Stream mapped =3D source.map(..); > KTable counts =3D mapped > .groupByKey() > .count("Counts"); > KStream sink =3D mapped.leftJoin(counts, ..); > {code} > The resulted topology looks like this: > {code} > ProcessorTopology: > =09=09=09=09KSTREAM-SOURCE-0000000000: > =09=09=09=09=09topics:=09=09[topic1] > =09=09=09=09=09children:=09[KSTREAM-MAP-0000000001] > =09=09=09=09KSTREAM-MAP-0000000001: > =09=09=09=09=09children:=09[KSTREAM-FILTER-0000000004, KSTREAM-FILTER-000= 0000007] > =09=09=09=09KSTREAM-FILTER-0000000004: > =09=09=09=09=09children:=09[KSTREAM-SINK-0000000003] > =09=09=09=09KSTREAM-SINK-0000000003: > =09=09=09=09=09topic:=09=09X-Counts-repartition > =09=09=09=09KSTREAM-FILTER-0000000007: > =09=09=09=09=09children:=09[KSTREAM-SINK-0000000006] > =09=09=09=09KSTREAM-SINK-0000000006: > =09=09=09=09=09topic:=09=09X-KSTREAM-MAP-0000000001-repartition > ProcessorTopology: > =09=09=09=09KSTREAM-SOURCE-0000000008: > =09=09=09=09=09topics:=09=09[X-KSTREAM-MAP-0000000001-repartition] > =09=09=09=09=09children:=09[KSTREAM-LEFTJOIN-0000000009] > =09=09=09=09KSTREAM-LEFTJOIN-0000000009: > =09=09=09=09=09states:=09=09[Counts] > =09=09=09=09KSTREAM-SOURCE-0000000005: > =09=09=09=09=09topics:=09=09[X-Counts-repartition] > =09=09=09=09=09children:=09[KSTREAM-AGGREGATE-0000000002] > =09=09=09=09KSTREAM-AGGREGATE-0000000002: > =09=09=09=09=09states:=09=09[Counts] > {code} > I.e. there are two repartition topics, one for the aggregate and one for = the join, which not only introduce unnecessary overheads but also mess up t= he processing ordering (users are expecting each record to go through aggre= gation first then the join operator). And in order to get the following sim= pler topology users today need to add a {{through}} operator after {{map}} = manually to enforce repartitioning. > {code} > Stream source =3D builder.stream(Serdes.String(), Serdes.= String(), "topic1"); > Stream repartitioned =3D source.map(..).through("topic2")= ; > KTable counts =3D repartitioned > .groupByKey() > .count("Counts"); > KStream sink =3D repartitioned.leftJoin(counts, .= .); > {code} > The resulted topology then will look like this: > {code} > ProcessorTopology: > =09=09=09=09KSTREAM-SOURCE-0000000000: > =09=09=09=09=09topics:=09=09[topic1] > =09=09=09=09=09children:=09[KSTREAM-MAP-0000000001] > =09=09=09=09KSTREAM-MAP-0000000001: > =09=09=09=09=09children:=09[KSTREAM-SINK-0000000002] > =09=09=09=09KSTREAM-SINK-0000000002: > =09=09=09=09=09topic:=09=09topic 2 > ProcessorTopology: > =09=09=09=09KSTREAM-SOURCE-0000000003: > =09=09=09=09=09topics:=09=09[topic 2] > =09=09=09=09=09children:=09[KSTREAM-AGGREGATE-0000000004, KSTREAM-LEFTJOI= N-0000000005] > =09=09=09=09KSTREAM-AGGREGATE-0000000004: > =09=09=09=09=09states:=09=09[Counts] > =09=09=09=09KSTREAM-LEFTJOIN-0000000005: > =09=09=09=09=09states:=09=09[Counts] > {code}=20 > This kind of optimization should be automatic in Streams, which we can co= nsider doing when extending from one-operator-at-a-time translation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)