kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor
Date Tue, 30 Jan 2018 23:32:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345980#comment-16345980 ]

ASF GitHub Bot commented on KAFKA-4969:
---------------------------------------

guozhangwang closed pull request #4410: KAFKA-4969: Attempt to evenly distribute load of tasks
URL: https://github.com/apache/kafka/pull/4410
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 6709419c2bc..2a08308a2fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -45,6 +45,7 @@
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -505,27 +506,26 @@ public Subscription subscription(Set<String> topics) {
             final Set<String> consumers = entry.getValue().consumers;
             final ClientState state = entry.getValue().state;
 
-            final ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTaskCount());
-            final int numActiveTasks = state.activeTaskCount();
+            final List<List<TaskId>> interleavedActive = interleaveTasksByGroupId(state.activeTasks(), consumers.size());
+            final List<List<TaskId>> interleavedStandby = interleaveTasksByGroupId(state.standbyTasks(), consumers.size());
 
-            taskIds.addAll(state.activeTasks());
-            taskIds.addAll(state.standbyTasks());
+            int consumerTaskIndex = 0;
 
-            final int numConsumers = consumers.size();
-
-            int i = 0;
             for (String consumer : consumers) {
                 final Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
                 final ArrayList<AssignedPartition> assignedPartitions = new ArrayList<>();
 
-                final int numTaskIds = taskIds.size();
-                for (int j = i; j < numTaskIds; j += numConsumers) {
-                    final TaskId taskId = taskIds.get(j);
-                    if (j < numActiveTasks) {
-                        for (TopicPartition partition : partitionsForTask.get(taskId)) {
-                            assignedPartitions.add(new AssignedPartition(taskId, partition));
-                        }
-                    } else {
+                final List<TaskId> assignedActiveList = interleavedActive.get(consumerTaskIndex);
+
+                for (final TaskId taskId : assignedActiveList) {
+                    for (final TopicPartition partition : partitionsForTask.get(taskId)) {
+                        assignedPartitions.add(new AssignedPartition(taskId, partition));
+                    }
+                }
+
+                if (!state.standbyTasks().isEmpty()) {
+                    final List<TaskId> assignedStandbyList = interleavedStandby.get(consumerTaskIndex);
+                    for (final TaskId taskId : assignedStandbyList) {
                         Set<TopicPartition> standbyPartitions = standby.get(taskId);
                         if (standbyPartitions == null) {
                             standbyPartitions = new HashSet<>();
@@ -535,6 +535,8 @@ public Subscription subscription(Set<String> topics) {
                     }
                 }
 
+                consumerTaskIndex++;
+
                 Collections.sort(assignedPartitions);
                 final List<TaskId> active = new ArrayList<>();
                 final List<TopicPartition> activePartitions = new ArrayList<>();
@@ -545,13 +547,32 @@ public Subscription subscription(Set<String> topics) {
 
                 // finally, encode the assignment before sending back to coordinator
                assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode()));
-                i++;
             }
         }
 
         return assignment;
     }
 
+    // visible for testing
+    List<List<TaskId>> interleaveTasksByGroupId(final Collection<TaskId> taskIds, final int numberThreads) {
+        final LinkedList<TaskId> sortedTasks = new LinkedList<>(taskIds);
+        Collections.sort(sortedTasks);
+        final List<List<TaskId>> taskIdsForConsumerAssignment = new ArrayList<>(numberThreads);
+        for (int i = 0; i < numberThreads; i++) {
+            taskIdsForConsumerAssignment.add(new ArrayList<TaskId>());
+        }
+        while (!sortedTasks.isEmpty()) {
+            for (final List<TaskId> taskIdList : taskIdsForConsumerAssignment) {
+                final TaskId taskId = sortedTasks.poll();
+                if (taskId == null) {
+                    break;
+                }
+                taskIdList.add(taskId);
+            }
+        }
+        return taskIdsForConsumerAssignment;
+    }
+
     /**
      * @throws TaskAssignmentException if there is no task id for one of the partitions specified
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index 91738e0da20..de8fa57e36a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -115,7 +115,7 @@ private void assignActive() {
 
 
     private void allocateTaskWithClientCandidates(final TaskId taskId, final Set<ID> clientsWithin, final boolean active) {
-        final ClientState client = findClient(taskId, clientsWithin);
+        final ClientState client = findClient(taskId, clientsWithin, active);
         taskPairs.addPairs(taskId, client.assignedTasks());
         client.assign(taskId, active);
     }
@@ -137,7 +137,7 @@ private void assignTaskToClient(final Set<TaskId> assigned, final TaskId taskId,
     }
 
 
-    private ClientState findClient(final TaskId taskId, final Set<ID> clientsWithin) {
+    private ClientState findClient(final TaskId taskId, final Set<ID> clientsWithin, boolean active) {
 
         // optimize the case where there is only 1 id to search within.
         if (clientsWithin.size() == 1) {
@@ -146,14 +146,14 @@ private ClientState findClient(final TaskId taskId, final Set<ID> clientsWithin)
 
         final ClientState previous = findClientsWithPreviousAssignedTask(taskId, clientsWithin);
         if (previous == null) {
-            return leastLoaded(taskId, clientsWithin);
+            return leastLoaded(taskId, clientsWithin, active);
         }
 
         if (shouldBalanceLoad(previous)) {
             final ClientState standby = findLeastLoadedClientWithPreviousStandByTask(taskId, clientsWithin);
             if (standby == null
                     || shouldBalanceLoad(standby)) {
-                return leastLoaded(taskId, clientsWithin);
+                return leastLoaded(taskId, clientsWithin, active);
             }
             return standby;
         }
@@ -190,20 +190,21 @@ private ClientState findLeastLoadedClientWithPreviousStandByTask(final TaskId ta
         }
         final HashSet<ID> constrainTo = new HashSet<>(ids);
         constrainTo.retainAll(clientsWithin);
-        return leastLoaded(taskId, constrainTo);
+        return leastLoaded(taskId, constrainTo, false);
     }
 
-    private ClientState leastLoaded(final TaskId taskId, final Set<ID> clientIds) {
-        final ClientState leastLoaded = findLeastLoaded(taskId, clientIds, true);
+    private ClientState leastLoaded(final TaskId taskId, final Set<ID> clientIds, final boolean active) {
+        final ClientState leastLoaded = findLeastLoaded(taskId, clientIds, true, active);
         if (leastLoaded == null) {
-            return findLeastLoaded(taskId, clientIds, false);
+            return findLeastLoaded(taskId, clientIds, false, active);
         }
         return leastLoaded;
     }
 
     private ClientState findLeastLoaded(final TaskId taskId,
-                                                final Set<ID> clientIds,
-                                                boolean checkTaskPairs) {
+                                        final Set<ID> clientIds,
+                                        final boolean checkTaskPairs,
+                                        final boolean active) {
         ClientState leastLoaded = null;
         for (final ID id : clientIds) {
             final ClientState client = clients.get(id);
@@ -214,7 +215,7 @@ private ClientState findLeastLoaded(final TaskId taskId,
             if (leastLoaded == null || client.hasMoreAvailableCapacityThan(leastLoaded)) {
                 if (!checkTaskPairs) {
                     leastLoaded = client;
-                } else if (taskPairs.hasNewPair(taskId, client.assignedTasks())) {
+                } else if (taskPairs.hasNewPair(taskId, client.assignedTasks(), active)) {
                     leastLoaded = client;
                 }
             }
@@ -257,12 +258,17 @@ private int sumCapacity(final Collection<ClientState> values) {
             this.pairs = new HashSet<>(maxPairs);
         }
 
-        boolean hasNewPair(final TaskId task1, final Set<TaskId> taskIds) {
+        boolean hasNewPair(final TaskId task1,
+                           final Set<TaskId> taskIds,
+                           final boolean active) {
             if (pairs.size() == maxPairs) {
                 return false;
             }
             for (final TaskId taskId : taskIds) {
-                if (!pairs.contains(pair(task1, taskId))) {
+                if (!active && !pairs.contains(pair(task1, taskId))) {
+                    return true;
+                }
+                if (!pairs.contains(pair(task1, taskId)) && task1.topicGroupId != taskId.topicGroupId) {
                     return true;
                 }
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 582c70b3dfd..02ab803735a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -70,9 +70,11 @@
     private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
     private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
     private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
+    private final TopicPartition t1p3 = new TopicPartition("topic1", 3);
     private final TopicPartition t2p0 = new TopicPartition("topic2", 0);
     private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
     private final TopicPartition t2p2 = new TopicPartition("topic2", 2);
+    private final TopicPartition t2p3 = new TopicPartition("topic2", 3);
     private final TopicPartition t3p0 = new TopicPartition("topic3", 0);
     private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
     private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
@@ -137,6 +139,33 @@ private void mockTaskManager(final Set<TaskId> prevTasks,
         EasyMock.replay(taskManager);
     }
 
+    @Test
+    public void shouldInterleaveTasksByGroupId() {
+        final TaskId taskIdA0 = new TaskId(0, 0);
+        final TaskId taskIdA1 = new TaskId(0, 1);
+        final TaskId taskIdA2 = new TaskId(0, 2);
+        final TaskId taskIdA3 = new TaskId(0, 3);
+
+        final TaskId taskIdB0 = new TaskId(1, 0);
+        final TaskId taskIdB1 = new TaskId(1, 1);
+        final TaskId taskIdB2 = new TaskId(1, 2);
+
+        final TaskId taskIdC0 = new TaskId(2, 0);
+        final TaskId taskIdC1 = new TaskId(2, 1);
+
+        final List<TaskId> expectedSubList1 = Arrays.asList(taskIdA0, taskIdA3, taskIdB2);
+        final List<TaskId> expectedSubList2 = Arrays.asList(taskIdA1, taskIdB0, taskIdC0);
+        final List<TaskId> expectedSubList3 = Arrays.asList(taskIdA2, taskIdB1, taskIdC1);
+        final List<List<TaskId>> embeddedList = Arrays.asList(expectedSubList1, expectedSubList2, expectedSubList3);
+
+        List<TaskId> tasks = Arrays.asList(taskIdC0, taskIdC1, taskIdB0, taskIdB1, taskIdB2, taskIdA0, taskIdA1, taskIdA2, taskIdA3);
+        Collections.shuffle(tasks);
+
+        final List<List<TaskId>> interleavedTaskIds = partitionAssignor.interleaveTasksByGroupId(tasks, 3);
+
+        assertThat(interleavedTaskIds, equalTo(embeddedList));
+    }
+
     @Test
     public void testSubscription() throws Exception {
         builder.addSource(null, "source1", null, null, null, "topic1");
@@ -229,6 +258,74 @@ public void testAssignBasic() throws Exception {
         assertEquals(allTasks, allActiveTasks);
     }
 
+    @Test
+    public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() throws Exception {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addSource(null, "source2", null, null, null, "topic2");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source1");
+        builder.addProcessor("processorII", new MockProcessorSupplier(),  "source2");
+
+        final List<PartitionInfo> localInfos = Arrays.asList(
+            new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic1", 3, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic2", 3, Node.noNode(), new Node[0], new Node[0])
+        );
+
+        final Cluster localMetadata = new Cluster(
+            "cluster",
+            Collections.singletonList(Node.noNode()),
+            localInfos, Collections.<String>emptySet(),
+            Collections.<String>emptySet());
+
+        final List<String> topics = Utils.mkList("topic1", "topic2");
+
+        final TaskId taskIdA0 = new TaskId(0, 0);
+        final TaskId taskIdA1 = new TaskId(0, 1);
+        final TaskId taskIdA2 = new TaskId(0, 2);
+        final TaskId taskIdA3 = new TaskId(0, 3);
+
+        final TaskId taskIdB0 = new TaskId(1, 0);
+        final TaskId taskIdB1 = new TaskId(1, 1);
+        final TaskId taskIdB2 = new TaskId(1, 2);
+        final TaskId taskIdB3 = new TaskId(1, 3);
+
+        final UUID uuid1 = UUID.randomUUID();
+
+        mockTaskManager(new HashSet<TaskId>(), new HashSet<TaskId>(), uuid1, builder);
+        configurePartitionAssignor(Collections.<String, Object>emptyMap());
+
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
+
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("consumer10",
+                          new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1,
new HashSet<TaskId>(), new HashSet<TaskId>(), userEndPoint).encode()));
+        subscriptions.put("consumer11",
+                          new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1,
new HashSet<TaskId>(), new HashSet<TaskId>(), userEndPoint).encode()));
+
+        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(localMetadata, subscriptions);
+
+        // check assigned partitions
+        assertEquals(Utils.mkSet(Utils.mkSet(t2p2, t1p0, t1p2, t2p0), Utils.mkSet(t1p1, t2p1, t1p3, t2p3)),
+                     Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
+
+        // the first consumer
+        final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+
+        final List<TaskId> expectedInfo10TaskIds = Arrays.asList(taskIdA1, taskIdA3, taskIdB1, taskIdB3);
+        assertEquals(expectedInfo10TaskIds, info10.activeTasks);
+
+        // the second consumer
+        final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+        final List<TaskId> expectedInfo11TaskIds = Arrays.asList(taskIdA0, taskIdA2, taskIdB0, taskIdB2);
+
+        assertEquals(expectedInfo11TaskIds, info11.activeTasks);
+    }
+
     @Test
     public void testAssignWithPartialTopology() throws Exception {
         builder.addSource(null, "source1", null, null, null, "topic1");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 86af0be1ec9..4f770c86239 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -44,6 +44,16 @@
     private final TaskId task03 = new TaskId(0, 3);
     private final TaskId task04 = new TaskId(0, 4);
     private final TaskId task05 = new TaskId(0, 5);
+
+    private final TaskId task10 = new TaskId(1, 0);
+    private final TaskId task11 = new TaskId(1, 1);
+    private final TaskId task12 = new TaskId(1, 2);
+    private final TaskId task20 = new TaskId(2, 0);
+    private final TaskId task21 = new TaskId(2, 1);
+    private final TaskId task22 = new TaskId(2, 2);
+
+    private final List<Integer> expectedTopicGroupIds = Arrays.asList(1, 2);
+
     private final Map<Integer, ClientState> clients = new TreeMap<>();
     private final Integer p1 = 1;
     private final Integer p2 = 2;
@@ -64,6 +74,28 @@ public void shouldAssignOneActiveTaskToEachProcessWhenTaskCountSameAsProcessCoun
         }
     }
 
+    @Test
+    public void shouldAssignTopicGroupIdEvenlyAcrossClientsWithNoStandByTasks() {
+        createClient(p1, 2);
+        createClient(p2, 2);
+        createClient(p3, 2);
+
+        final StickyTaskAssignor taskAssignor = createTaskAssignor(task10, task11, task22, task20, task21, task12);
+        taskAssignor.assign(0);
+        assertActiveTaskTopicGroupIdsEvenlyDistributed();
+    }
+
+    @Test
+    public void shouldAssignTopicGroupIdEvenlyAcrossClientsWithStandByTasks() {
+        createClient(p1, 2);
+        createClient(p2, 2);
+        createClient(p3, 2);
+
+        final StickyTaskAssignor taskAssignor = createTaskAssignor(task20, task11, task12, task10, task21, task22);
+        taskAssignor.assign(1);
+        assertActiveTaskTopicGroupIdsEvenlyDistributed();
+    }
+
     @Test
     public void shouldNotMigrateActiveTaskToOtherProcess() {
         createClientWithPreviousActiveTasks(p1, 1, task00);
@@ -621,4 +653,16 @@ private ClientState createClientWithPreviousActiveTasks(final Integer processId,
         return clientState;
     }
 
+    private void assertActiveTaskTopicGroupIdsEvenlyDistributed() {
+        for (final Map.Entry<Integer, ClientState> clientStateEntry : clients.entrySet()) {
+            final List<Integer> topicGroupIds = new ArrayList<>();
+            final Set<TaskId> activeTasks = clientStateEntry.getValue().activeTasks();
+            for (final TaskId activeTask : activeTasks) {
+                topicGroupIds.add(activeTask.topicGroupId);
+            }
+            Collections.sort(topicGroupIds);
+            assertThat(topicGroupIds, equalTo(expectedTopicGroupIds));
+        }
+    }
+
 }
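
For readers of this archive, a minimal self-contained sketch of the interleaving logic the patch introduces may help. The TaskId class below is a simplified stand-in for org.apache.kafka.streams.processor.TaskId, and the assignor plumbing around the method is omitted; the idea is that tasks are sorted by topic group id and then partition, and dealt round-robin across consumer threads, so each thread receives a mix of topic groups rather than a contiguous run of a single group.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

public class InterleaveSketch {

    // Simplified stand-in for org.apache.kafka.streams.processor.TaskId:
    // ordered by topic group id first, then by partition.
    static final class TaskId implements Comparable<TaskId> {
        final int topicGroupId;
        final int partition;

        TaskId(final int topicGroupId, final int partition) {
            this.topicGroupId = topicGroupId;
            this.partition = partition;
        }

        @Override
        public int compareTo(final TaskId other) {
            final int result = Integer.compare(topicGroupId, other.topicGroupId);
            return result != 0 ? result : Integer.compare(partition, other.partition);
        }

        @Override
        public String toString() {
            return topicGroupId + "_" + partition;
        }
    }

    // Same shape as the method added by the patch: sort all tasks, then
    // deal them one at a time to each thread's list in turn.
    static List<List<TaskId>> interleaveTasksByGroupId(final List<TaskId> taskIds,
                                                       final int numberThreads) {
        final LinkedList<TaskId> sortedTasks = new LinkedList<>(taskIds);
        Collections.sort(sortedTasks);
        final List<List<TaskId>> taskIdsForConsumerAssignment = new ArrayList<>(numberThreads);
        for (int i = 0; i < numberThreads; i++) {
            taskIdsForConsumerAssignment.add(new ArrayList<TaskId>());
        }
        while (!sortedTasks.isEmpty()) {
            for (final List<TaskId> taskIdList : taskIdsForConsumerAssignment) {
                final TaskId taskId = sortedTasks.poll();
                if (taskId == null) {
                    break;
                }
                taskIdList.add(taskId);
            }
        }
        return taskIdsForConsumerAssignment;
    }

    public static void main(final String[] args) {
        final List<TaskId> tasks = Arrays.asList(
            new TaskId(2, 0), new TaskId(2, 1),
            new TaskId(1, 0), new TaskId(1, 1), new TaskId(1, 2),
            new TaskId(0, 0), new TaskId(0, 1), new TaskId(0, 2), new TaskId(0, 3));
        // Prints [[0_0, 0_3, 1_2], [0_1, 1_0, 2_0], [0_2, 1_1, 2_1]],
        // matching the shouldInterleaveTasksByGroupId test in the diff.
        System.out.println(interleaveTasksByGroupId(tasks, 3));
    }
}

The StickyTaskAssignor change is complementary: it threads an "active" flag down to TaskPairs.hasNewPair so that, for active tasks, the least-loaded search prefers clients where the candidate task's topicGroupId differs from the tasks they already hold, which is what the new shouldAssignTopicGroupIdEvenlyAcrossClients tests exercise.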


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> State-store workload-aware StreamsPartitionAssignor
> ---------------------------------------------------
>
>                 Key: KAFKA-4969
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4969
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Bill Bejeck
>            Priority: Major
>
> Currently, {{StreamPartitionAssignor}} does not distinguish between different "types" of tasks. For example, a task can be stateless or have one or multiple stores.
> This can lead to suboptimal task placement: assume there are 2 stateless and 2 stateful tasks and the app is running with 2 instances. To share the "store load" it would be good to place one stateless and one stateful task per instance. Right now, there is no guarantee of this, and it can happen that one instance processes both stateless tasks while the other processes both stateful tasks. (A concrete sketch of this scenario follows the quoted description below.)
> We should improve {{StreamPartitionAssignor}} and introduce "task types" including a cost model for task placement. We should consider the following parameters:
>  - number of stores
>  - number of sources/sinks
>  - number of processors
>  - regular task vs standby task
> This improvement should be backed by a design document in the project wiki (no KIP required though) as it's a fairly complex change.
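
To make the quoted scenario concrete, here is a hypothetical sketch (task and instance names are invented for illustration; this is not code from the project). Splitting the task list into contiguous halves can put all store load on one instance, while dealing the tasks out round-robin, as the merged patch does per topic group, mixes the types:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PlacementSketch {
    public static void main(final String[] args) {
        // 2 stateless and 2 stateful tasks to be spread over 2 instances.
        final List<String> tasks = Arrays.asList(
            "stateless-1", "stateless-2", "stateful-1", "stateful-2");

        // Type-unaware split into contiguous halves: instance B carries all store load.
        System.out.println("naive:    A=" + tasks.subList(0, 2) + " B=" + tasks.subList(2, 4));

        // Round-robin dealing: each instance gets one stateless and one stateful task.
        final List<String> a = new ArrayList<>();
        final List<String> b = new ArrayList<>();
        for (int i = 0; i < tasks.size(); i++) {
            (i % 2 == 0 ? a : b).add(tasks.get(i));
        }
        System.out.println("balanced: A=" + a + " B=" + b);
    }
}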



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
