From: Theo Diefenthal
Subject: RE: Keyed stream, parallelism, load balancing and ensuring that the same key go to the same Task Manager and task slot
Date: Thu, 19 Dec 2019 18:59:43 +0100 (CET)

Hi Krzysztof,

You can just key your stream by transaction id. If you have lots of different transaction ids, you can expect the load to be evenly distributed. All events with the same key (== transaction id) will be processed by the same task slot.

If you only have a few Kafka partitions, you could key by transaction id as early as possible in order to fully utilize your cluster.
Remember, however, that each keyBy will cause a network shuffle, so it's probably not worth it to first key by transaction id, then by tradeId, and afterwards again by transaction id.

Best regards
Theo

-----Original Message-----
From: KristoffSC
Sent: Tuesday, 17 December 2019 23:35
To: user@flink.apache.org
Subject: Keyed stream, parallelism, load balancing and ensuring that the same key go to the same Task Manager and task slot

Hi community,

I'm trying to build a PoC pipeline for my project and I have a few questions regarding load balancing between task managers and ensuring that keyed stream events for the same key will go to the same Task Manager (hence the same task slot).

Let's assume that we have 3 task managers with 3 task slots each, which gives us 9 task slots in total. The source is a Kafka topic with N partitions. Events are "linked" with each other by a transactionId (long) field, so they can be keyed by this field. Events for a particular transactionId can be spread across many partitions (we don't have control over this).

The pipeline is:
1. Kafka Source -> produces RawEvents (map operator).
2. Enrichment with AsyncFunction (simple DB/cache call) produces EnrichedEvents with map operator.
3. Key EnrichedEvents by tradeId, buffer events for some time, sort them by sequenceNumber (Window aggregation) and emit a new event based on those. N sorted EnrichedEvents produce one TransactionEvent for this transactionId.
4. Sink TransactionEvents

Requirements:
1. Have high task slot utilization (low number of idle/un-addressed task slots).
2. EnrichedEvents for the same transactionId should go to the same TaskSlot (hence the same TaskManager).

Question:
How can this be achieved? How should the parallelism value for each operator be set?

Note:
Probably I can already key the original RawEvents on transactionId.

Thanks,
Krzysztof

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
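
To illustrate the point made in the reply (the same key always reaches the same task slot, and many distinct keys spread out roughly evenly), here is a minimal plain-Java sketch. It does not use the Flink API. It assumes Flink's general scheme of hashing a key into a "key group" bounded by the max parallelism and then mapping key groups onto parallel subtasks. The class name KeyGroupSketch, its methods, the stand-in hash function mix, and the max parallelism of 128 are all assumptions made for illustration; Flink's real hash differs, so actual subtask indices will not match.

```java
import java.util.Arrays;

// Simplified model of keyed routing; NOT Flink code.
final class KeyGroupSketch {

    // Stand-in bit mixer (hypothetical). Flink uses its own murmur-style hash
    // on key.hashCode(); only the overall scheme is modeled here.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE; // clear the sign bit so the modulo is non-negative
    }

    // Key -> key group in [0, maxParallelism).
    static int keyGroupFor(long transactionId, int maxParallelism) {
        return mix(Long.hashCode(transactionId)) % maxParallelism;
    }

    // Key group -> subtask index in [0, parallelism).
    static int subtaskFor(long transactionId, int maxParallelism, int parallelism) {
        return keyGroupFor(transactionId, maxParallelism) * parallelism / maxParallelism;
    }

    // Counts how many of `count` consecutive transaction ids land on each subtask.
    static int[] histogram(long firstId, int count, int maxParallelism, int parallelism) {
        int[] perSubtask = new int[parallelism];
        for (int i = 0; i < count; i++) {
            perSubtask[subtaskFor(firstId + i, maxParallelism, parallelism)]++;
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        int parallelism = 9;      // 3 task managers x 3 slots, as in the question
        int maxParallelism = 128; // assumed value for this sketch

        // Routing is a pure function of the key, so repeated events agree.
        System.out.println("transactionId 42 -> subtask "
                + subtaskFor(42L, maxParallelism, parallelism));

        // Many distinct keys: every subtask receives a similar share.
        System.out.println(Arrays.toString(
                histogram(0L, 90_000, maxParallelism, parallelism)));
    }
}
```

Under these assumptions, 128 key groups divided over 9 subtasks means each subtask owns 14 or 15 key groups, so the load is roughly but not perfectly even. With only a handful of distinct transaction ids, some subtasks could stay idle, which is why the reply ties even distribution to having lots of different keys.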