kafka-dev mailing list archives

From "Yuto Kawamura (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
Date Fri, 03 Jun 2016 04:52:59 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15313577#comment-15313577 ]

Yuto Kawamura commented on KAFKA-3775:

Thanks for the feedback [~mjsax].

> 1) a KStreams application should process the whole topic and not parts of it – limiting
the number of partitions is kinda artificial from my point of view

So the question is what a "KStreams application" consists of. I know that Kafka Streams is designed
to work equally well standalone, but IIUC the main purpose of making it able to work standalone
is easy development and testing. Practically, if we try to run it against production traffic
consisting of hundreds of partitions, it is impossible to assign all partitions to a single
instance transparently. Indeed, restricting the maximum number of partitions per instance is an
artificial control, but that option should be given, since Kafka Streams is not an execution
framework, as I said. Users have almost full control over how to construct the Kafka Streams
app cluster; that is, it should be allowed to bring instances up gradually, one by one, instead
of starting the necessary number of instances at once, but that is impossible with the existing
implementation for the reason I described.

> 2) even if we limit the number of partitions, it is quite random which would get processed
which not – I would assume that users would like to have a more transparent assignment

I think the Kafka Streams partition assignment already isn't transparent. Unless the sticky partition
assignment strategy is enabled, StreamPartitionAssignor chooses which task (partition) is assigned
to which instance in round robin, with some randomness introduced. That is, by nature we have no
control over which partition is assigned to which instance.
At least you can ensure that all partitions get assigned if you start more instances than
{{partitions / max.assigned.tasks}}, and it also remains possible not to take this option by
leaving the configuration at its default value (Integer.MAX_VALUE), which guarantees that a
single instance still accepts all tasks (partitions) assigned to it.
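To make that bound concrete, the minimum instance count is the ceiling of that division. A tiny sketch (the config name {{max.assigned.tasks}} is the one proposed in this ticket; the helper name is made up for illustration):

```java
public class InstanceCount {
    // Minimum number of instances needed so that every partition gets
    // assigned when each instance accepts at most maxAssignedTasks
    // tasks (partitions). Plain ceiling division.
    static int minInstances(int partitions, int maxAssignedTasks) {
        return (partitions + maxAssignedTasks - 1) / maxAssignedTasks;
    }

    public static void main(String[] args) {
        // e.g. 300 partitions capped at 32 tasks per instance -> 10 instances
        System.out.println(minInstances(300, 32));
    }
}
```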

> 3) last but not least, under the hood we are using the standard Java KafkaConsumer: looking
at your patch (just briefly), it seems you changed the task assignment – however, this is
independent from the partitions assignment of the used consumer – thus, the consumer would
still poll all partitions but would not be able to assign records for some partitions as the
corresponding tasks are missing.

Hmm, I'm not sure I'm understanding your explanation correctly, but this sounds different from
what I know.
First, Kafka Streams provides a custom PartitionAssignor, StreamPartitionAssignor, which takes
full control of which partition is assigned to which consumer thread of which instance.
Second, the consumer polls only the partitions it has been assigned by the group coordinator,
which relies on the PartitionAssignor to decide the actual assignment. So an instance will
never receive a record from a partition that isn't assigned to it; therefore the scenario you're
concerned about will never happen, IIUC.
Am I misunderstanding something?
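To illustrate the point that the assignor, not the individual consumer, decides what gets polled, here is a much-simplified, hypothetical sketch of a capped round-robin assignment (the real logic lives in StreamPartitionAssignor and is considerably more involved; names here are made up):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CappedRoundRobinSketch {
    // Assign partitions 0..partitions-1 to instances round robin, but
    // never more than maxAssignedTasks per instance. Partitions that no
    // instance has room for are simply left unassigned, so a consumer
    // only ever polls what it was given here.
    static Map<String, List<Integer>> assign(List<String> instances,
                                             int partitions,
                                             int maxAssignedTasks) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        for (String id : instances) out.put(id, new ArrayList<>());
        int next = 0;
        for (int p = 0; p < partitions; p++) {
            // try each instance at most once for this partition
            for (int tried = 0; tried < instances.size(); tried++) {
                String id = instances.get(next);
                next = (next + 1) % instances.size();
                if (out.get(id).size() < maxAssignedTasks) {
                    out.get(id).add(p);
                    break;
                }
            }
        }
        return out;
    }
}
```

With two instances, five partitions, and a cap of two, each instance ends up with two partitions and the fifth stays unassigned, which is exactly the "incomplete assignment" this ticket asks StreamPartitionAssignor to tolerate.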

> Throttle maximum number of tasks assigned to a single KafkaStreams
> ------------------------------------------------------------------
>                 Key: KAFKA-3775
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3775
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions:
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>             Fix For:
> As of today, if I start a Kafka Streams app on a single machine consisting of a single
KafkaStreams instance, that instance gets all partitions of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and heavy message
traffic, it is a problem that we don't have a way of throttling the maximum number of partitions
assigned to a single instance.
> In fact, when we started a Kafka Streams app consuming a topic with more than 10MB/sec of
traffic on each partition, we saw all partitions assigned to the first instance, and soon the
app died from OOM.
> I know that there are some workarounds conceivable here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library, not an execution framework, there's
no predefined procedure for starting Kafka Streams apps, so some users might want to take the
option of starting a single first instance and checking whether it works as expected with a
lesser number of partitions (I want :p)
> - Adjust config parameters such as {{buffered.records.per.partition}}, {{max.partition.fetch.bytes}}
and {{max.poll.records}} to reduce the heap pressure.
>   => Maybe works, but there are still two problems IMO:
>   - It still leads to a traffic explosion with high-throughput processing, as it accepts all
incoming messages from hundreds of partitions.
>   - In the first place, by the principles of distributed systems, it's weird that users don't
have a way to control the maximum number of "partitions" assigned to a single shard (an instance
of KafkaStreams here). Users should be allowed to provide the maximum number of partitions
considered feasible to process with a single instance (or host).
> Here, I'd like to introduce a new configuration parameter {{max.tasks.assigned}}, which
limits the number of tasks (a notion of partition) assigned to the processId (the notion of a
single KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to tolerate
incomplete assignment. That is, Kafka Streams should continue working for the subset of
partitions even when some partitions are left unassigned, in order to satisfy this: "users may
want to take the option of starting a single first instance and checking whether it works as
expected with a lesser number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will continue sophisticating it.
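For reference, the config-tuning workaround mentioned above might look like this in code (the application id and all values are illustrative, not recommendations):

```java
import java.util.Properties;

public class HeapPressureProps {
    // Consumer/streams settings aimed at reducing heap pressure when one
    // instance receives many partitions; values are examples only.
    static Properties build() {
        Properties p = new Properties();
        p.put("application.id", "my-streams-app");    // hypothetical app id
        p.put("bootstrap.servers", "localhost:9092");
        // Buffer fewer records per partition inside the streams tasks
        p.put("buffered.records.per.partition", 100);
        // Fetch less data per partition per request to lower peak heap usage
        p.put("max.partition.fetch.bytes", 262144);   // 256 KiB
        // Cap the number of records a single poll() may return
        p.put("max.poll.records", 500);
        return p;
    }
}
```

Even with all three knobs turned down, the instance still subscribes to every partition, which is why the ticket argues for a hard cap on assigned tasks instead.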

This message was sent by Atlassian JIRA
