kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Guozhang Wang (JIRA)" <j...@apache.org>
Subject [jira] [Created] (KAFKA-4117) Cleanup StreamPartitionAssignor behavior
Date Fri, 02 Sep 2016 17:06:20 GMT
Guozhang Wang created KAFKA-4117:

             Summary: Cleanup StreamPartitionAssignor behavior
                 Key: KAFKA-4117
                 URL: https://issues.apache.org/jira/browse/KAFKA-4117
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: Guozhang Wang

I went through the whole assignment logic once again and I feel the logic has now becomes
a bit lossy, and I want to clean them up probably in another PR but just dump my thoughts
here on the appropriate logic (also cc @enothereska @mjsax):

Some background:

1. Each KafkaStreams instance contains a clientId, and if not specified default value is applicationId-1/2/etc
if there are multiple instances inside the same JVM. One instance contains multiple threads
where the thread-clientId is constructed as clientId-StreamThread-1/2/etc, and the thread-clientId
is used as the embedded consumer clientId as well as metrics tag.

2. However, since one instance can contain multiple threads, and hence multiple consumers,
and when considering partition assignment, the streams library need to take the capacity into
consideration based on the granularity of instance not on threads. Therefore we create a 4byte
UUID.randomUUID() as the processId and encode that in the subscription metadata bytes, and
the leader then knows if multiple consumer members are actually belong to the same instance
(i.e. belong to threads of that instance), so that when assigning partitions it can balance
among instances. NOTE that in production we recommend one thread per instance, so consumersByClient
will only have one consumer per client (i.e. instance).

3. In addition, historically we hard-code the partition grouper logic, where for each task,
it is assigned only with one partition of its subscribed topic. For example, if we have topicA
with 5 partitions and topicB with 10 partitions, we will create 10 tasks, with the first five
tasks containing one of the partitions each, while the last five tasks contain only one partition
from topicB. And therefore the TaskId class contains the groupId of the sub-topology and the
partition, so that taskId(group, 1) gets partition1 of topicA and partition1 of topicB. We
later expose this to users to customize so that more than one partitions of the topic can
be assigned to the same task, so that the partition field in the TaskId no longer indicate
anything about which partitions are assigned, and we add AssignedPartitions to capture which
partitions are assigned to which tasks.

4. While doing the assignment, the leader is also responsible for creating these changelog
/ repartition topics, and the number of partitions of these topics are equal to the number
of tasks that needs to write to these topics, which are wrapped in stateChangelogTopicToTaskIds
and internalSourceTopicToTaskIds respectively. After such topics are created, the leader also
needs to "augment" the received cluster metadata with these topics to 1) check for copartitioning,
and 2) maintained for QueryableState's discovery function.

The current implementation is mixed with all these legacy logic and gets quite messy, and
I'm thinking to make a pass over the StreamPartitionAssignor and cleaning up it bit. More

1. Read and parse the subscription information to construct the clientMetadata map, where
each metadata contains the Set<String> consumerMemberIds, ClientState<TaskId>
state, and HostInfo hostInfo.

2. Access the (sub-)topology to create the corresponding changelog / repartition topics and
construct the stateChangelogTopicToTaskIds and internalSourceTopicToTaskIds.

Call streamThread.partitionGrouper.partitionGroups to get the map from created tasks to their
assigned partitions.

3. Call TaskAssignor.assign (which now takes the whole clientMetadata map) to assign tasks
to clients, and hence we get the assigned partitions to clients.

4. For each client, use some round-robin manner (as we did now) to assign tasks to their hosted
consumers with the clientMetadata.consumerMemberIds map.

5. Check co-partitioning of assigned partitions, and maintain the Cluster metadata locally
on the leader.

6. Construct the assignment info, where activeTasks is also a map from TaskId to list of TopicPartitions
since otherwise we will not know which partitions are assigned to which tasks.

7. For non-leaders, when getting the assignment, also construct the Cluster metadata from
the decoded assignment information; and also maintain the AssignmentInfo locally for constructing
the tasks.

And some minor improvements:

1. The default thread-clientIds applicationId-x-StreamThread-y" may still be conflicting to
each other with multiple JVMs / machines, which is bad for metrics collection / debugging
across hosts. We can modify the default clientId toapplicationId-processIdwhereprocessIdisUUID,
hence the default thread-clientId isapplicationId-UUID-StreamThread-y`.

2. The TaskId.partition field no longer indicate which partitions are actually assigned to
this task, but we still need to keep its topicGroupId field as it indicates which sub-topology
this task belongs to, hence helpful for debugging. So maybe we can rename the partition field
to sth. like sequence?

This message was sent by Atlassian JIRA

View raw message