Date: Fri, 3 Jun 2016 18:55:59 +0000 (UTC)
From: "Guozhang Wang (JIRA)"
To: dev@kafka.apache.org
Reply-To: dev@kafka.apache.org
Subject: [jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15314606#comment-15314606 ]

Guozhang Wang commented on KAFKA-3775:
--------------------------------------

Hi [~mjsax] [~kawamuray], great discussion regarding 1) above!
Personally I think it is flexible to let users specify which partitions, among all the topics the topology defines as sources, should be processed, which is better from a user experience point of view. Actually it is not completely true that "users don't have a way to control maximum "partitions" assigned to a single shard(an instance of KafkaStreams here)." In fact, the user-customizable {{PartitionGrouper}} is used exactly for that: it takes the list of all topic-partitions as input and generates the tasks, with some topic-partitions assigned to each task. The {{DefaultPartitionGrouper}} of course tries to capture all topic-partitions and generates multiple tasks for them. But users can also customize it by, for example, generating only one task which takes one partition from each input topic, and this single task will be assigned to the ONLY instance in your case.

NOTE that this partition grouper is global: if you have two instances, both of them will execute the same {{PartitionGrouper}}, and if only one task is generated, one of the instances will become completely idle. This needs to be communicated clearly to users.

Does that sound good?

> Throttle maximum number of tasks assigned to a single KafkaStreams
> ------------------------------------------------------------------
>
>                 Key: KAFKA-3775
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3775
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>             Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which consists of a single KafkaStreams instance, that instance gets all partitions of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and heavy message traffic, it is a problem that we don't have a way of throttling the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic with more than 10MB/sec of traffic per partition, we saw that all partitions were assigned to the first instance and soon the app died with an OOM.
> I know that there are some workarounds to consider here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution framework, there's no predefined procedure for starting Kafka Streams apps, so some users might want the option to start a single instance first and check that it works as expected with a smaller number of partitions (I want :p)
> - Adjust config parameters such as {{buffered.records.per.partition}}, {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap pressure.
>   => Maybe works, but it still has two problems IMO:
>   - It still leads to a traffic explosion with high-throughput processing, as it accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed-system principles, it's weird that users don't have a way to control the maximum number of "partitions" assigned to a single shard (an instance of KafkaStreams here). Users should be allowed to specify the maximum number of partitions that a single instance (or host) is considered able to process.
> Here, I'd like to introduce a new configuration parameter {{max.tasks.assigned}}, which limits the number of tasks (a notion of partition) assigned to the processId (which is the notion of a single KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to tolerate incomplete assignment. That is, Kafka Streams should continue working for part of the partitions even if some partitions are left unassigned, in order to satisfy this:
> "user may want to take an option to start the first single instance and check if it works as expected with a smaller number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will continue refining it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
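
To make the grouper suggestion in the comment concrete, here is a minimal sketch of the idea in plain Java. It deliberately does not use any Kafka classes: {{GrouperSketch}}, {{groupAll}}, {{groupLimited}}, {{assign}} and the topic and instance names are all hypothetical, and the round-robin assignment is only a stand-in for whatever the real assignor does. A real implementation would go through the {{PartitionGrouper}} interface that the comment refers to.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the partition-grouper idea. None of these types are
// Kafka classes; all names are hypothetical and exist only for illustration.
class GrouperSketch {

    // Default-like grouping (assumption): one task per partition number, where
    // task p takes partition p of every source topic that has such a partition.
    static Map<Integer, List<String>> groupAll(Map<String, Integer> partitionCounts) {
        Map<Integer, List<String>> tasks = new TreeMap<>();
        for (Map.Entry<String, Integer> topic : partitionCounts.entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                tasks.computeIfAbsent(p, k -> new ArrayList<>())
                     .add(topic.getKey() + "-" + p);
            }
        }
        return tasks;
    }

    // Customized grouping: keep only the first maxTasks tasks. Partitions that
    // belong to the dropped tasks are not processed by anyone.
    static Map<Integer, List<String>> groupLimited(Map<String, Integer> partitionCounts,
                                                   int maxTasks) {
        Map<Integer, List<String>> limited = new TreeMap<>();
        for (Map.Entry<Integer, List<String>> task : groupAll(partitionCounts).entrySet()) {
            if (limited.size() >= maxTasks) {
                break;
            }
            limited.put(task.getKey(), task.getValue());
        }
        return limited;
    }

    // Stand-in for task assignment: hand out task ids round-robin.
    // The grouping is global, so every instance sees the same task set.
    static Map<String, List<Integer>> assign(Map<Integer, List<String>> tasks,
                                             List<String> instances) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String instance : instances) {
            assignment.put(instance, new ArrayList<>());
        }
        int next = 0;
        for (Integer taskId : tasks.keySet()) {
            assignment.get(instances.get(next % instances.size())).add(taskId);
            next++;
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("clicks", 4);
        topics.put("views", 4);

        Map<Integer, List<String>> oneTask = groupLimited(topics, 1);
        System.out.println(oneTask);
        // prints {0=[clicks-0, views-0]}

        System.out.println(assign(oneTask, Arrays.asList("instance-A", "instance-B")));
        // prints {instance-A=[0], instance-B=[]}
    }
}
```

With a single generated task, the second instance in this model ends up with nothing to do, which is the idle-instance caveat from the NOTE above.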