Date: Wed, 8 Nov 2017 18:57:42 +0800
From: "Tzu-Li (Gordon) Tai"
To: user@flink.apache.org, yunfan123
Subject: Re: What happened if my parallelism more than kafka partitions.

The `KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks)` method returns the index of the target subtask for a given Kafka partition. Its implementation is deterministic: the same partition always yields the same subtask index.

Each consumer subtask invokes this assignment method locally for every Kafka partition. If the returned index doesn’t equal the subtask’s own index, that partition is filtered out and is not read by that subtask. Because every partition maps to exactly one subtask index, a job with more subtasks than partitions has at most as many active subtasks as there are partitions; the remaining subtasks are assigned no partitions and simply stay idle.

On 8 November 2017 at 6:38:54 PM, yunfan123 (yunfanfighting@foxmail.com) wrote:

The code for the Kafka partition assignment looks like this:

    public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
        int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

        // here, the assumption is that the ids of Kafka partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset clockwise from the start index
        return (startIndex + partition.getPartition()) % numParallelSubtasks;
    }

It seems that this will assign partitions to multiple subtasks. How does Flink ensure that some subtasks simply remain idle?

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
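To illustrate the local filtering described in the reply, here is a minimal Java sketch. It is a simplification under stated assumptions, not Flink's actual code: the hypothetical class `PartitionAssignmentSketch` reimplements the quoted formula on a plain topic name and partition id instead of Flink's `KafkaTopicPartition`, and the hypothetical `partitionsFor` method mimics a subtask keeping only the partitions whose assigned index equals its own. With 3 partitions and a parallelism of 5, partition ids 0, 1 and 2 map to three distinct consecutive indices (modulo 5), so two subtasks end up with nothing to read.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    // Same arithmetic as the quoted assign() method, but taking the topic name
    // and partition id directly (assumption: simplified stand-in for KafkaTopicPartition).
    static int assign(String topic, int partition, int numParallelSubtasks) {
        // Mask off the sign bit so the start index is never negative.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // Partition ids are assumed to start at 0, so they act as a clockwise offset.
        return (startIndex + partition) % numParallelSubtasks;
    }

    // Hypothetical helper: what one subtask does locally. It evaluates assign()
    // for every partition and keeps only those mapped to its own index.
    static List<Integer> partitionsFor(int subtaskIndex, String topic,
                                       int numPartitions, int numParallelSubtasks) {
        List<Integer> owned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (assign(topic, p, numParallelSubtasks) == subtaskIndex) {
                owned.add(p);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        String topic = "my-topic"; // hypothetical topic name
        int numPartitions = 3;
        int parallelism = 5;       // more subtasks than partitions

        int idle = 0;
        for (int i = 0; i < parallelism; i++) {
            List<Integer> owned = partitionsFor(i, topic, numPartitions, parallelism);
            if (owned.isEmpty()) {
                idle++;
            }
            System.out.println("subtask " + i + " reads partitions " + owned);
        }
        System.out.println("idle subtasks: " + idle);
    }
}
```

Running `main` prints the partitions each subtask would read, followed by `idle subtasks: 2`. Which three subtask indices are active depends on the hash of the topic name, but because the formula is deterministic, every subtask computes the same mapping without any coordination.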