kafka-dev mailing list archives

From "Yuto Kawamura (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
Date Fri, 03 Jun 2016 08:15:59 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15313828#comment-15313828
] 

Yuto Kawamura commented on KAFKA-3775:
--------------------------------------

Thanks for the feedback [~BigAndy].

> With the purposed design some partitions would remain with out a consumer. This seems
like a fundamental switch away from Kafka's current model, and a risky one in IMHO.

Some partitions would remain without a consumer *if the number of live instances becomes lower than {{num of partitions / max.tasks.assigned}}*.
Let's say you have 100 partitions and launch 50 KafkaStreams instances with {{max.tasks.assigned=5}}.
When all 50 instances are started, each instance gets 2 partitions assigned, which is the desired distribution.
Then what happens when an instance fails? The 2 partitions held by the dead instance will be reassigned to the remaining instances without any problem, as the other instances still have plenty of headroom under {{max.tasks.assigned}}.
If 31 or more instances are dead at that moment then, yes, some partitions will remain unassigned. But that case is out of consideration: the value of {{max.tasks.assigned}} was determined based on the available system resources (CPU, memory, network bandwidth), which means those unassigned partitions could never be processed normally even if they were reassigned to the living instances, because the hardware resources are limited.
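To make the failure tolerance above concrete, here is a toy calculation (plain Python, not Kafka code; the numbers match the example in this comment):

```python
# Toy model of capped assignment: partitions are spread over live
# instances, but no instance may exceed the max.tasks.assigned cap.
import math

def unassigned_partitions(num_partitions, live_instances, max_tasks_assigned):
    """Return how many partitions would be left without a consumer."""
    capacity = live_instances * max_tasks_assigned
    return max(0, num_partitions - capacity)

# The scenario from the comment: 100 partitions, 50 instances, cap of 5.
assert unassigned_partitions(100, 50, 5) == 0   # 2 tasks each, well under the cap
assert unassigned_partitions(100, 20, 5) == 0   # 30 dead: exactly at capacity
assert unassigned_partitions(100, 19, 5) == 5   # 31 dead: 5 partitions orphaned

# Minimum number of live instances for complete assignment:
assert math.ceil(100 / 5) == 20
```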

> This seems like a fundamental switch away from Kafka's current model, and a risky one
in IMHO.

BTW, may I ask what you meant by "Kafka's current model", and more concretely what risk you expect? (That users won't notice the existence of unassigned partitions?)

> Could you also elaborate on why settings such as 'max.poll.records' don't help stop your
initial instance going pop? Maybe there are other alternative solutions here...

Because even if I set {{max.poll.records}} lower, it only reduces the number of records returned by a single poll; the number of fetch requests increases instead. That means the total throughput wouldn't change, which still leads to traffic bursting.
At the same time, it doesn't make sense to me to tune {{max.poll.records}} on the assumption that a single instance gets all partitions assigned, as I could practically set that value much higher once other instances join the group and the partitions are evenly distributed.
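A back-of-the-envelope sketch of that point (the incoming rate below is an assumed number, not a measurement): {{max.poll.records}} caps records per poll, not the inbound rate, so halving it just doubles the number of polls needed while total traffic stays the same.

```python
# If the topic produces records at a fixed rate, the consumer must still
# take in the same total volume regardless of max.poll.records; a lower
# cap only means more polls (and more fetch requests) per second.

def polls_needed(records_per_sec, max_poll_records):
    """Polls per second required to keep up with the incoming rate."""
    return records_per_sec / max_poll_records

incoming = 100_000  # records/sec across all assigned partitions (assumed)

for cap in (500, 250, 125):
    rate = polls_needed(incoming, cap)
    # Sustained throughput is identical in every case:
    assert rate * cap == incoming
```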


> Throttle maximum number of tasks assigned to a single KafkaStreams
> ------------------------------------------------------------------
>
>                 Key: KAFKA-3775
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3775
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>             Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine consisting of a single KafkaStreams instance, that instance gets all partitions of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and heavy message traffic, it is a problem that we have no way of throttling the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app consuming a topic with more than 10MB/sec of traffic per partition, we saw all partitions assigned to the first instance, and soon the app died from OOM.
> I know there are some conceivable workarounds here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution framework, there's no predefined procedure for starting Kafka Streams apps, so some users might want the option to start a first single instance and check whether it works as expected with a smaller number of partitions (I want :p)
> - Adjust config parameters such as {{buffered.records.per.partition}}, {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap pressure.
>   => Maybe works, but there are still two problems IMO:
>   - It still leads to a traffic explosion under high-throughput processing, as the instance accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed-system principles, it's weird that users don't have a way to control the maximum number of "partitions" assigned to a single shard (an instance of KafkaStreams here). Users should be allowed to specify the maximum number of partitions that a single instance (or host) can reasonably be expected to process.
> Here, I'd like to introduce a new configuration parameter {{max.tasks.assigned}}, which limits the number of tasks (a notion of partitions) assigned to a processId (the notion of a single KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor(TaskAssignor) to tolerate incomplete assignment. That is, Kafka Streams should continue working for the subset of partitions that did get assigned even when some partitions are left unassigned, in order to satisfy this: "users may want the option to start a first single instance and check whether it works as expected with a smaller number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will continue polishing it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
