Date: Fri, 3 Jun 2016 08:17:59 +0000 (UTC)
From: "Yuto Kawamura (JIRA)"
To: dev@kafka.apache.org
Reply-To: dev@kafka.apache.org
Subject: [jira] [Comment Edited] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

    [ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15313828#comment-15313828 ]

Yuto Kawamura edited comment on KAFKA-3775 at 6/3/16 8:17 AM:
--------------------------------------------------------------

Thanks for the feedback, [~BigAndy]
> With the purposed design some partitions would remain with out a consumer. This seems like a fundamental switch away from Kafka's current model, and a risky one in IMHO.

Some partitions would remain without a consumer *only if the number of living instances becomes lower than {{num of partitions / max.tasks.assigned}}*.
Let's say you have 100 partitions and launch 50 KafkaStreams instances with {{max.tasks.assigned=5}}. Once all 50 instances have started, each instance would get 2 partitions assigned, which is the desired distribution.
Then what happens when an instance fails? The 2 partitions held by the dead instance are reassigned to the remaining instances without any problem, as the other instances still have plenty of headroom under {{max.tasks.assigned}}.
If more than 30 instances are dead at the same time, then yes, some partitions will remain unassigned (100 / 5 = 20 instances are needed to cover all partitions). But this case is out of consideration: the value of {{max.tasks.assigned}} was chosen with the available system resources in mind (CPU, memory, network bandwidth), which means these unassigned partitions could never be processed normally even if they were reassigned to the living instances, because hardware resources are limited.

> This seems like a fundamental switch away from Kafka's current model, and a risky one in IMHO.

BTW, may I ask what you meant by "Kafka's current model", and what risk you expect, more concretely? (That users won't notice the existence of unassigned partitions?)

> Could you also elaborate on why settings such as 'max.poll.records' don't help stop your initial instance going pop? Maybe there are other alternative solutions here...

Because even if I set {{max.poll.records}} lower, it only reduces the number of records fetched by a single Fetch request; the number of Fetch requests increases instead. That means the total throughput doesn't change, which still leads to traffic bursts.
At the same time, it doesn't make sense to me to tune {{max.poll.records}} on the assumption that a single instance gets all partitions assigned, since in practice I can set that value much higher once other instances join the group and the partitions are evenly distributed.


was (Author: kawamuray):
Thanks for the feedback, [~BigAndy]

> With the purposed design some partitions would remain with out a consumer. This seems like a fundamental switch away from Kafka's current model, and a risky one in IMHO.

Some partitions would remain without a consumer *only if the number of living instances becomes lower than {{num of partitions / max.tasks.assigned}}*.
Let's say you have 100 partitions and launch 50 KafkaStreams instances with {{max.tasks.assigned=5}}. Once all 50 instances have started, each instance would get 2 partitions assigned, which is the desired distribution.
Then what happens when an instance fails? The 2 partitions held by the dead instance are reassigned to the remaining instances without any problem, as the other instances still have plenty of headroom under {{max.tasks.assigned}}.
If more than 30 instances are dead at the same time, then yes, some partitions will remain unassigned (100 / 5 = 20 instances are needed to cover all partitions). But this case is out of consideration: the value of {{max.tasks.assigned}} was chosen with the available system resources in mind (CPU, memory, network bandwidth), which means these unassigned partitions could never be processed normally even if they were reassigned to the living instances, because hardware resources are limited.

> This seems like a fundamental switch away from Kafka's current model, and a risky one in IMHO.

BTW, may I ask what you meant by "Kafka's current model", and what risk you expect, more concretely? (That users won't notice the existence of unassigned partitions?)

> Could you also elaborate on why settings such as 'max.poll.records' don't help stop your initial instance going pop? Maybe there are other alternative solutions here...
Because even if I set {{max.poll.records}} lower, it only reduces the number of records fetched by a single Fetch request; the number of Fetch requests increases instead. That means the total throughput doesn't change, which still leads to traffic bursts.
At the same time, it doesn't make sense to me to tune {{max.poll.records}} on the assumption that a single gets all partitions assigned, since in practice I can set that value much higher once other instances join the group and the partitions are evenly distributed.


> Throttle maximum number of tasks assigned to a single KafkaStreams
> ------------------------------------------------------------------
>
>                 Key: KAFKA-3775
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3775
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>             Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which consists of a single KafkaStreams instance, that instance gets all partitions of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and heavy message traffic, it is a problem that we don't have a way of throttling the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic with more than 10MB/sec of traffic per partition, we saw all partitions assigned to the first instance, and soon the app died of OOM.
> I know that there are some workarounds to consider here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works.
>   But as Kafka Streams is a library and not an execution framework, there's no predefined procedure for starting Kafka Streams apps, so some users might want the option to start a single first instance and check whether it works as expected with a smaller number of partitions (I want :p)
> - Adjust config parameters such as {{buffered.records.per.partition}}, {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap pressure.
>   => Maybe works, but there are still two problems IMO:
>   - It still leads to a traffic explosion under high-throughput processing, as the instance accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed system principles, it's weird that users don't have a way to control the maximum number of "partitions" assigned to a single shard (an instance of KafkaStreams here). Users should be allowed to specify the maximum number of partitions that can be processed by a single instance (or host).
> Here, I'd like to introduce a new configuration parameter {{max.tasks.assigned}}, which limits the number of tasks (a notion of partition) assigned to a processId (which is the notion of a single KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to tolerate an incomplete assignment. That is, Kafka Streams should continue working for part of the partitions even if some partitions are left unassigned, in order to satisfy this: "users may want the option to start a single first instance and check whether it works as expected with a smaller number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will continue refining it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
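
To make the proposed {{max.tasks.assigned}} behaviour concrete, here is a minimal sketch in Python of a capped assignment that tolerates leftover partitions. It is only an illustration of the idea described above, not the POC and not the actual StreamPartitionAssignor logic. The function names {{min_instances}} and {{assign_with_cap}} are hypothetical, and the round-robin strategy is an assumption made for simplicity.

```python
def min_instances(num_partitions: int, max_tasks_assigned: int) -> int:
    """Smallest number of live instances that can cover every partition."""
    # Integer ceiling division: 100 partitions with a cap of 5 need 20 instances.
    return (num_partitions + max_tasks_assigned - 1) // max_tasks_assigned


def assign_with_cap(partitions, instances, max_tasks_assigned):
    """Distribute partitions round-robin, never exceeding the per-instance cap.

    Returns (assignment, unassigned). `unassigned` is non-empty when the live
    instances do not have enough total capacity; the assignment is then
    deliberately incomplete instead of overloading any instance.
    """
    assignment = {instance: [] for instance in instances}
    unassigned = []
    # Total number of partitions the group may hold under the cap.
    capacity = len(instances) * max_tasks_assigned
    for index, partition in enumerate(partitions):
        if index < capacity:
            # Round-robin keeps every instance at or below the cap.
            assignment[instances[index % len(instances)]].append(partition)
        else:
            unassigned.append(partition)
    return assignment, unassigned


if __name__ == "__main__":
    partitions = list(range(100))
    for live in (50, 20, 19, 1):
        instances = [f"instance-{i}" for i in range(live)]
        assignment, unassigned = assign_with_cap(partitions, instances, 5)
        largest = max(len(owned) for owned in assignment.values())
        print(live, "instances:", largest, "max per instance,",
              len(unassigned), "unassigned")
```

With the numbers used in the comment (100 partitions, {{max.tasks.assigned=5}}), 50 live instances hold 2 partitions each, 20 instances hold exactly 5 each, and with 19 instances 5 partitions stay unassigned. A single first instance takes 5 partitions and leaves the other 95 untouched, which is the "start one instance and check" scenario from the description.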