kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Liu Luying <luying...@outlook.com>
Subject Stopping All Tasks When a New Connector Added
Date Fri, 19 Jul 2019 03:00:08 GMT
Hi all,
I have noticed that Kafka Connect 2.3.0 will stop all existing tasks and then start all the
tasks, including the new tasks and the existing ones when adding a new connector or changing
a connector configuration. However, I do not think it is a must. Only the new connector and
tasks need to be started. As the rebalancing can be applied for both running and suspended
tasks.

The problem lies in the KafkaConfigBackingStore.ConsumeCallback.onCompletion() function (line
623 in KafkaConfigBackingStore.java). When record.key() startsWith "commit-", the tasks are
being committed, and the deferred tasks are processed, Some new tasks are added to the 'updatedTasks'(line
623 in KafkaConfigBackingStore.java), and the 'updatedTasks' are sent to updateListener to
complete the task configuration update(line 638 in KafkaConfigBackingStore.java). In the updateListener.onTaskConfigUpdate()
function, the  'updatedTasks' are added to the member variable, 'taskConfigUpdates', of class
DistributedHerder(line 1295 in DistributedHerder.java).

In another thread, 'taskConfigUpdates' is copied to 'taskConfigUpdatesCopy' in updateConfigsWithIncrementalCooperative()
(line 445 in DistributedHerder.java). The 'taskConfigUpdatesCopy' is subsequently used in
processTaskConfigUpdatesWithIncrementalCooperative() (line 345 in DistributedHerder.java).
This function then uses  'taskConfigUpdatesCopy' to find connectors to stop(line 492 in DistributedHerder.java),
and finally get the tasks to stop, which are all the tasks. The worker thread does the actual
job of stop(line 499 in DistributedHerder.java).

In the original code, all the tasks are added to the 'updatedTasks' (line 623 in KafkaConfigBackingStore.java),
which means all the active connectors are in the 'connectorsWhoseTasksToStop' set, and all
the tasks are in the 'tasksToStop' list. This causes the stops, and of course the subsequent
restarts, of all the tasks.

So, adding only the 'deferred' tasks to the  'updatedTasks' can avoid the stops and restarts
of unnecessary tasks.

Best,
Luying


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message