From: "Yuto Kawamura (JIRA)"
To: dev@kafka.apache.org
Date: Fri, 3 Jun 2016 04:52:59 +0000 (UTC)
Subject: [jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

    [ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15313577#comment-15313577 ]

Yuto Kawamura commented on KAFKA-3775:
--------------------------------------

Thanks for the feedback [~mjsax].
> 1) a KStreams application should process the whole topic and not parts of it – limiting the number of partitions is kinda artificial from my point of view

So the question is what a "KStreams application" consists of. I know that Kafka Streams is designed to work equally well standalone, but the main purpose of making it able to run standalone is easy development and testing, IIUC. Practically, if we try to run it against production traffic that spans hundreds of partitions, it is practically impossible to assign all partitions to a single instance transparently. Indeed, restricting the maximum number of partitions per instance is an artificial control, but that control should be offered, since Kafka Streams is not an execution framework, as I said. Users have almost full control over how to construct the Kafka Streams app cluster; that is, it should be possible to start instances gradually, one by one, instead of starting the necessary number of instances all at once, but that is impossible with the existing implementation for the reason I described.

> 2) even if we limit the number of partitions, it is quite random which would get processed which not – I would assume that users would like to have a more transparent assignment

I think Kafka Streams partition assignment already isn't transparent. Unless the sticky partition assignment strategy is enabled, StreamPartitionAssignor chooses which task (partition) is assigned to which instance in round robin, with some randomness introduced. That is, by nature we have no control over which partition is assigned to which instance.

At least you can ensure that all partitions get assigned if you start more than {{partitions / max.assigned.tasks}} instances, and it also remains possible to opt out of this behavior by leaving the configuration at its default value (Integer.MAX_VALUE), which guarantees that a single instance still accepts all tasks (partitions) assigned.
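As an aside, the instance-count arithmetic above can be made concrete. This is an illustration only (not Kafka code); the {{max.assigned.tasks}} cap is the configuration proposed in this ticket, and the point is that the required number of instances is the ceiling of the division, not plain integer division:

```java
// Sketch: minimum number of instances needed so every partition can be
// assigned when each instance accepts at most maxAssignedTasks tasks
// (the cap proposed as max.tasks.assigned in this ticket).
public class InstanceCountSketch {
    static int minInstances(int partitions, int maxAssignedTasks) {
        if (partitions < 0 || maxAssignedTasks <= 0) {
            throw new IllegalArgumentException("cap must be positive");
        }
        // ceil(partitions / cap); long arithmetic avoids overflow when the
        // cap is left at its default, Integer.MAX_VALUE
        return (int) (((long) partitions + maxAssignedTasks - 1) / maxAssignedTasks);
    }

    public static void main(String[] args) {
        // e.g. 100 partitions with a cap of 16 tasks per instance
        System.out.println(minInstances(100, 16)); // 7
        // default cap: a single instance may take everything
        System.out.println(minInstances(100, Integer.MAX_VALUE)); // 1
    }
}
```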
> 3) last but not least, under the hood we are using the standard Java KafkaConsumer: looking at your patch (just briefly), it seems you changed the task assignment – however, this is independent from the partitions assignment of the used consumer – thus, the consumer would still poll all partitions but would not be able to assign records for some partitions as the corresponding tasks are missing.

Hmm, I'm not sure I'm understanding your explanation correctly, but this sounds different from what I know.
First, Kafka Streams provides a custom PartitionAssignor, StreamPartitionAssignor, which takes full control of which partition is assigned to which consumer thread of which instance.
Second, a consumer polls only the partitions it was assigned by the group coordinator, which relies on the PartitionAssignor to decide the actual assignment. So an instance will never receive a record from a partition that isn't assigned to it; therefore, what you're concerned about will never happen, IIUC. Am I misunderstanding something?

> Throttle maximum number of tasks assigned to a single KafkaStreams
> ------------------------------------------------------------------
>
>                 Key: KAFKA-3775
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3775
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>             Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which consists of a single KafkaStreams instance, that instance gets all partitions of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and heavy message traffic, it is a problem that we don't have a way of throttling the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic with more than 10MB/sec of traffic per partition, we saw all partitions assigned to the first instance, and the app soon died from an OOM.
> I know there are some workarounds conceivable here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution framework, there's no predefined procedure for starting Kafka Streams apps, so some users might want the option to start a first single instance and check whether it works as expected with a smaller number of partitions (I want :p).
> - Adjust config parameters such as {{buffered.records.per.partition}}, {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap pressure.
>   => Maybe works, but it still has two problems IMO:
>   - It still leads to a traffic explosion under high-throughput processing, as it accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed-system principles, it's weird that users don't have a way to control the maximum number of "partitions" assigned to a single shard (an instance of KafkaStreams here). Users should be allowed to specify the maximum number of partitions that a single instance (or host) can reasonably be expected to process.
> Here, I'd like to introduce a new configuration parameter {{max.tasks.assigned}}, which limits the number of tasks (a notion of partition) assigned to a processId (the notion of a single KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to tolerate incomplete assignment.
That is, Kafka Streams should continue working for the assigned subset of partitions even when some partitions are left unassigned, in order to satisfy the point above: "some users might want the option to start a first single instance and check whether it works as expected with a smaller number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will continue polishing it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
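For illustration, the incomplete-assignment behavior proposed in the ticket can be sketched as a toy model. This is not the actual StreamPartitionAssignor logic (which involves sticky assignment and randomness, as discussed above); it is a minimal simulation assuming a hypothetical per-instance cap, showing tasks being skipped rather than the rebalance failing:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of capped round-robin task assignment: distribute task ids across
// instances in round robin, skip an instance once it holds maxTasksAssigned
// tasks (the hypothetical max.tasks.assigned cap), and leave the remaining
// tasks unassigned once every instance is full.
public class CappedRoundRobinSketch {
    static List<List<Integer>> assign(int tasks, int instances, int maxTasksAssigned) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < instances; i++) assignment.add(new ArrayList<>());
        int next = 0;
        for (int task = 0; task < tasks; task++) {
            int tried = 0;
            // advance past full instances; stop once every instance was tried
            while (tried < instances && assignment.get(next % instances).size() >= maxTasksAssigned) {
                next++;
                tried++;
            }
            if (tried == instances) break; // all full: remaining tasks stay unassigned
            assignment.get(next % instances).add(task);
            next++;
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 10 tasks, 2 instances, cap of 3: six tasks assigned, four left over
        System.out.println(assign(10, 2, 3)); // [[0, 2, 4], [1, 3, 5]]
    }
}
```

With the cap left at Integer.MAX_VALUE the model degenerates to plain round robin over all tasks, matching the backward-compatible default described in the comment.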