kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4738: Remove generic type of class ClientState
Date Fri, 10 Mar 2017 19:28:09 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c063561ce -> dfa2ef483


KAFKA-4738: Remove generic type of class ClientState

Remove generic type of class ClientState and generic T in TaskAssignor.

Author: sharad.develop <sharad.develop@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2616 from sharad-develop/KAFKA-4738


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dfa2ef48
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dfa2ef48
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dfa2ef48

Branch: refs/heads/trunk
Commit: dfa2ef4837259cd127b013e63ce26acdec8b94c5
Parents: c063561
Author: sharad.develop <sharad.develop@gmail.com>
Authored: Fri Mar 10 11:28:06 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Mar 10 11:28:06 2017 -0800

----------------------------------------------------------------------
 .../internals/StreamPartitionAssignor.java      | 10 +--
 .../internals/assignment/ClientState.java       | 60 +++++++------
 .../assignment/StickyTaskAssignor.java          | 50 +++++------
 .../internals/assignment/ClientStateTest.java   | 92 ++++++++++++--------
 .../assignment/StickyTaskAssignorTest.java      | 56 ++++++------
 5 files changed, 143 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa2ef48/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index e3f6698..859d661 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -79,7 +79,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     private static class ClientMetadata {
         final HostInfo hostInfo;
         final Set<String> consumers;
-        final ClientState<TaskId> state;
+        final ClientState state;
 
         ClientMetadata(final String endPoint) {
 
@@ -100,7 +100,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             consumers = new HashSet<>();
 
             // initialize the client state
-            state = new ClientState<>();
+            state = new ClientState();
         }
 
         void addConsumer(final String consumerMemberId, final SubscriptionInfo info) {
@@ -449,7 +449,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         // ---------------- Step Two ---------------- //
 
         // assign tasks to clients
-        Map<UUID, ClientState<TaskId>> states = new HashMap<>();
+        Map<UUID, ClientState> states = new HashMap<>();
         for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
             states.put(entry.getKey(), entry.getValue().state);
         }
@@ -471,7 +471,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
             if (hostInfo != null) {
                 final Set<TopicPartition> topicPartitions = new HashSet<>();
-                final ClientState<TaskId> state = entry.getValue().state;
+                final ClientState state = entry.getValue().state;
 
                 for (final TaskId id : state.activeTasks()) {
                     topicPartitions.addAll(partitionsForTask.get(id));
@@ -485,7 +485,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         Map<String, Assignment> assignment = new HashMap<>();
         for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
             final Set<String> consumers = entry.getValue().consumers;
-            final ClientState<TaskId> state = entry.getValue().state;
+            final ClientState state = entry.getValue().state;
 
             final ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTaskCount());
             final int numActiveTasks = state.activeTaskCount();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa2ef48/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index 99bd29e..15ee849 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -16,28 +16,30 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.streams.processor.TaskId;
 
 import java.util.HashSet;
 import java.util.Set;
 
-public class ClientState<T> {
-    private final Set<T> activeTasks;
-    private final Set<T> standbyTasks;
-    private final Set<T> assignedTasks;
-    private final Set<T> prevActiveTasks;
-    private final Set<T> prevAssignedTasks;
+public class ClientState {
+    private final Set<TaskId> activeTasks;
+    private final Set<TaskId> standbyTasks;
+    private final Set<TaskId> assignedTasks;
+    private final Set<TaskId> prevActiveTasks;
+    private final Set<TaskId> prevAssignedTasks;
 
     private int capacity;
 
+
     public ClientState() {
         this(0);
     }
 
     ClientState(final int capacity) {
-        this(new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), new
HashSet<T>(), new HashSet<T>(), capacity);
+        this(new HashSet<TaskId>(), new HashSet<TaskId>(), new HashSet<TaskId>(),
new HashSet<TaskId>(), new HashSet<TaskId>(), capacity);
     }
 
-    private ClientState(Set<T> activeTasks, Set<T> standbyTasks, Set<T>
assignedTasks, Set<T> prevActiveTasks, Set<T> prevAssignedTasks, int capacity)
{
+    private ClientState(Set<TaskId> activeTasks, Set<TaskId> standbyTasks, Set<TaskId>
assignedTasks, Set<TaskId> prevActiveTasks, Set<TaskId> prevAssignedTasks, int
capacity) {
         this.activeTasks = activeTasks;
         this.standbyTasks = standbyTasks;
         this.assignedTasks = assignedTasks;
@@ -46,12 +48,12 @@ public class ClientState<T> {
         this.capacity = capacity;
     }
 
-    public ClientState<T> copy() {
-        return new ClientState<>(new HashSet<>(activeTasks), new HashSet<>(standbyTasks),
new HashSet<>(assignedTasks),
+    public ClientState copy() {
+        return new ClientState(new HashSet<>(activeTasks), new HashSet<>(standbyTasks),
new HashSet<>(assignedTasks),
                 new HashSet<>(prevActiveTasks), new HashSet<>(prevAssignedTasks),
capacity);
     }
 
-    public void assign(final T taskId, final boolean active) {
+    public void assign(final TaskId taskId, final boolean active) {
         if (active) {
             activeTasks.add(taskId);
         } else {
@@ -61,11 +63,11 @@ public class ClientState<T> {
         assignedTasks.add(taskId);
     }
 
-    public Set<T> activeTasks() {
+    public Set<TaskId> activeTasks() {
         return activeTasks;
     }
 
-    public Set<T> standbyTasks() {
+    public Set<TaskId> standbyTasks() {
         return standbyTasks;
     }
 
@@ -81,31 +83,31 @@ public class ClientState<T> {
         return activeTasks.size();
     }
 
-    public void addPreviousActiveTasks(final Set<T> prevTasks) {
+    public void addPreviousActiveTasks(final Set<TaskId> prevTasks) {
         prevActiveTasks.addAll(prevTasks);
         prevAssignedTasks.addAll(prevTasks);
     }
 
-    public void addPreviousStandbyTasks(final Set<T> standbyTasks) {
+    public void addPreviousStandbyTasks(final Set<TaskId> standbyTasks) {
         prevAssignedTasks.addAll(standbyTasks);
     }
 
     @Override
     public String toString() {
         return "[activeTasks: (" + activeTasks +
-            ") standbyTasks: (" + standbyTasks +
-            ") assignedTasks: (" + assignedTasks +
-            ") prevActiveTasks: (" + prevActiveTasks +
-            ") prevAssignedTasks: (" + prevAssignedTasks +
-            ") capacity: " + capacity +
-            "]";
+                ") standbyTasks: (" + standbyTasks +
+                ") assignedTasks: (" + assignedTasks +
+                ") prevActiveTasks: (" + prevActiveTasks +
+                ") prevAssignedTasks: (" + prevAssignedTasks +
+                ") capacity: " + capacity +
+                "]";
     }
 
     boolean reachedCapacity() {
         return assignedTasks.size() >= capacity;
     }
 
-    boolean hasMoreAvailableCapacityThan(final ClientState<T> other) {
+    boolean hasMoreAvailableCapacityThan(final ClientState other) {
         if (this.capacity <= 0) {
             throw new IllegalStateException("Capacity of this ClientState must be greater
than 0.");
         }
@@ -125,26 +127,26 @@ public class ClientState<T> {
             return capacity > other.capacity;
     }
 
-    Set<T> previousStandbyTasks() {
-        final Set<T> standby = new HashSet<>(prevAssignedTasks);
+    Set<TaskId> previousStandbyTasks() {
+        final Set<TaskId> standby = new HashSet<>(prevAssignedTasks);
         standby.removeAll(prevActiveTasks);
         return standby;
     }
 
-    Set<T> previousActiveTasks() {
+    Set<TaskId> previousActiveTasks() {
         return prevActiveTasks;
     }
 
-    boolean hasAssignedTask(final T taskId) {
+    boolean hasAssignedTask(final TaskId taskId) {
         return assignedTasks.contains(taskId);
     }
 
     // Visible for testing
-    Set<T> assignedTasks() {
+    Set<TaskId> assignedTasks() {
         return assignedTasks;
     }
 
-    Set<T> previousAssignedTasks() {
+    Set<TaskId> previousAssignedTasks() {
         return prevAssignedTasks;
     }
 
@@ -155,4 +157,4 @@ public class ClientState<T> {
     boolean hasUnfulfilledQuota(final int tasksPerThread) {
         return activeTasks.size() < capacity * tasksPerThread;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa2ef48/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index 81c9305..91738e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -31,13 +31,13 @@ import java.util.Set;
 public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
 
     private static final Logger log = LoggerFactory.getLogger(StickyTaskAssignor.class);
-    private final Map<ID, ClientState<TaskId>> clients;
+    private final Map<ID, ClientState> clients;
     private final Set<TaskId> taskIds;
     private final Map<TaskId, ID> previousActiveTaskAssignment = new HashMap<>();
     private final Map<TaskId, Set<ID>> previousStandbyTaskAssignment = new HashMap<>();
     private final TaskPairs taskPairs;
 
-    public StickyTaskAssignor(final Map<ID, ClientState<TaskId>> clients, final
Set<TaskId> taskIds) {
+    public StickyTaskAssignor(final Map<ID, ClientState> clients, final Set<TaskId>
taskIds) {
         this.clients = clients;
         this.taskIds = taskIds;
         taskPairs = new TaskPairs(taskIds.size() * (taskIds.size() - 1) / 2);
@@ -78,7 +78,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
         for (final Map.Entry<TaskId, ID> entry : previousActiveTaskAssignment.entrySet())
{
             final TaskId taskId = entry.getKey();
             if (taskIds.contains(taskId)) {
-                final ClientState<TaskId> client = clients.get(entry.getValue());
+                final ClientState client = clients.get(entry.getValue());
                 if (client.hasUnfulfilledQuota(tasksPerThread)) {
                     assignTaskToClient(assigned, taskId, client);
                 }
@@ -95,7 +95,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
             final Set<ID> clientIds = previousStandbyTaskAssignment.get(taskId);
             if (clientIds != null) {
                 for (final ID clientId : clientIds) {
-                    final ClientState<TaskId> client = clients.get(clientId);
+                    final ClientState client = clients.get(clientId);
                     if (client.hasUnfulfilledQuota(tasksPerThread)) {
                         assignTaskToClient(assigned, taskId, client);
                         iterator.remove();
@@ -115,12 +115,12 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
 
 
     private void allocateTaskWithClientCandidates(final TaskId taskId, final Set<ID>
clientsWithin, final boolean active) {
-        final ClientState<TaskId> client = findClient(taskId, clientsWithin);
+        final ClientState client = findClient(taskId, clientsWithin);
         taskPairs.addPairs(taskId, client.assignedTasks());
         client.assign(taskId, active);
     }
 
-    private void assignTaskToClient(final Set<TaskId> assigned, final TaskId taskId,
final ClientState<TaskId> client) {
+    private void assignTaskToClient(final Set<TaskId> assigned, final TaskId taskId,
final ClientState client) {
         taskPairs.addPairs(taskId, client.assignedTasks());
         client.assign(taskId, true);
         assigned.add(taskId);
@@ -128,7 +128,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
 
     private Set<ID> findClientsWithoutAssignedTask(final TaskId taskId) {
         final Set<ID> clientIds = new HashSet<>();
-        for (final Map.Entry<ID, ClientState<TaskId>> client : clients.entrySet())
{
+        for (final Map.Entry<ID, ClientState> client : clients.entrySet()) {
             if (!client.getValue().hasAssignedTask(taskId)) {
                 clientIds.add(client.getKey());
             }
@@ -137,20 +137,20 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
     }
 
 
-    private ClientState<TaskId> findClient(final TaskId taskId,
-                                           final Set<ID> clientsWithin) {
+    private ClientState findClient(final TaskId taskId, final Set<ID> clientsWithin)
{
+
         // optimize the case where there is only 1 id to search within.
         if (clientsWithin.size() == 1) {
             return clients.get(clientsWithin.iterator().next());
         }
 
-        final ClientState<TaskId> previous = findClientsWithPreviousAssignedTask(taskId,
clientsWithin);
+        final ClientState previous = findClientsWithPreviousAssignedTask(taskId, clientsWithin);
         if (previous == null) {
             return leastLoaded(taskId, clientsWithin);
         }
 
         if (shouldBalanceLoad(previous)) {
-            final ClientState<TaskId> standby = findLeastLoadedClientWithPreviousStandByTask(taskId,
clientsWithin);
+            final ClientState standby = findLeastLoadedClientWithPreviousStandByTask(taskId,
clientsWithin);
             if (standby == null
                     || shouldBalanceLoad(standby)) {
                 return leastLoaded(taskId, clientsWithin);
@@ -161,12 +161,12 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
         return previous;
     }
 
-    private boolean shouldBalanceLoad(final ClientState<TaskId> client) {
+    private boolean shouldBalanceLoad(final ClientState client) {
         return client.reachedCapacity() && hasClientsWithMoreAvailableCapacity(client);
     }
 
-    private boolean hasClientsWithMoreAvailableCapacity(final ClientState<TaskId> client)
{
-        for (ClientState<TaskId> clientState : clients.values()) {
+    private boolean hasClientsWithMoreAvailableCapacity(final ClientState client) {
+        for (ClientState clientState : clients.values()) {
             if (clientState.hasMoreAvailableCapacityThan(client)) {
                 return true;
             }
@@ -174,7 +174,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
         return false;
     }
 
-    private ClientState<TaskId> findClientsWithPreviousAssignedTask(final TaskId taskId,
+    private ClientState findClientsWithPreviousAssignedTask(final TaskId taskId,
                                                                     final Set<ID> clientsWithin)
{
         final ID previous = previousActiveTaskAssignment.get(taskId);
         if (previous != null && clientsWithin.contains(previous)) {
@@ -183,7 +183,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
         return findLeastLoadedClientWithPreviousStandByTask(taskId, clientsWithin);
     }
 
-    private ClientState<TaskId> findLeastLoadedClientWithPreviousStandByTask(final
TaskId taskId, final Set<ID> clientsWithin) {
+    private ClientState findLeastLoadedClientWithPreviousStandByTask(final TaskId taskId,
final Set<ID> clientsWithin) {
         final Set<ID> ids = previousStandbyTaskAssignment.get(taskId);
         if (ids == null) {
             return null;
@@ -193,20 +193,20 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
         return leastLoaded(taskId, constrainTo);
     }
 
-    private ClientState<TaskId> leastLoaded(final TaskId taskId, final Set<ID>
clientIds) {
-        final ClientState<TaskId> leastLoaded = findLeastLoaded(taskId, clientIds,
true);
+    private ClientState leastLoaded(final TaskId taskId, final Set<ID> clientIds) {
+        final ClientState leastLoaded = findLeastLoaded(taskId, clientIds, true);
         if (leastLoaded == null) {
             return findLeastLoaded(taskId, clientIds, false);
         }
         return leastLoaded;
     }
 
-    private ClientState<TaskId> findLeastLoaded(final TaskId taskId,
+    private ClientState findLeastLoaded(final TaskId taskId,
                                                 final Set<ID> clientIds,
                                                 boolean checkTaskPairs) {
-        ClientState<TaskId> leastLoaded = null;
+        ClientState leastLoaded = null;
         for (final ID id : clientIds) {
-            final ClientState<TaskId> client = clients.get(id);
+            final ClientState client = clients.get(id);
             if (client.assignedTaskCount() == 0) {
                 return client;
             }
@@ -224,8 +224,8 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
 
     }
 
-    private void mapPreviousTaskAssignment(final Map<ID, ClientState<TaskId>>
clients) {
-        for (final Map.Entry<ID, ClientState<TaskId>> clientState : clients.entrySet())
{
+    private void mapPreviousTaskAssignment(final Map<ID, ClientState> clients) {
+        for (final Map.Entry<ID, ClientState> clientState : clients.entrySet()) {
             for (final TaskId activeTask : clientState.getValue().previousActiveTasks())
{
                 previousActiveTaskAssignment.put(activeTask, clientState.getKey());
             }
@@ -240,9 +240,9 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID,
TaskId> {
 
     }
 
-    private int sumCapacity(final Collection<ClientState<TaskId>> values) {
+    private int sumCapacity(final Collection<ClientState> values) {
         int capacity = 0;
-        for (ClientState<TaskId> client : values) {
+        for (ClientState client : values) {
             capacity += client.capacity();
         }
         return capacity;

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa2ef48/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
index af2c9e3..d0743f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals.assignment;
 
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.processor.TaskId;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -28,7 +29,7 @@ import static org.junit.Assert.assertTrue;
 
 public class ClientStateTest {
 
-    private final ClientState<Integer> client = new ClientState<>(1);
+    private final ClientState client = new ClientState(1);
 
     @Test
     public void shouldHaveNotReachedCapacityWhenAssignedTasksLessThanCapacity() throws Exception
{
@@ -37,86 +38,101 @@ public class ClientStateTest {
 
     @Test
     public void shouldHaveReachedCapacityWhenAssignedTasksGreaterThanOrEqualToCapacity()
throws Exception {
-        client.assign(1, true);
+        client.assign(new TaskId(0, 1), true);
         assertTrue(client.reachedCapacity());
     }
 
 
     @Test
     public void shouldAddActiveTasksToBothAssignedAndActive() throws Exception {
-        client.assign(1, true);
-        assertThat(client.activeTasks(), equalTo(Collections.singleton(1)));
-        assertThat(client.assignedTasks(), equalTo(Collections.singleton(1)));
+        final TaskId tid = new TaskId(0, 1);
+
+        client.assign(tid, true);
+        assertThat(client.activeTasks(), equalTo(Collections.singleton(tid)));
+        assertThat(client.assignedTasks(), equalTo(Collections.singleton(tid)));
         assertThat(client.assignedTaskCount(), equalTo(1));
         assertThat(client.standbyTasks().size(), equalTo(0));
     }
 
     @Test
     public void shouldAddStandbyTasksToBothStandbyAndActive() throws Exception {
-        client.assign(1, false);
-        assertThat(client.assignedTasks(), equalTo(Collections.singleton(1)));
-        assertThat(client.standbyTasks(), equalTo(Collections.singleton(1)));
+        final TaskId tid = new TaskId(0, 1);
+
+        client.assign(tid, false);
+        assertThat(client.assignedTasks(), equalTo(Collections.singleton(tid)));
+        assertThat(client.standbyTasks(), equalTo(Collections.singleton(tid)));
         assertThat(client.assignedTaskCount(), equalTo(1));
         assertThat(client.activeTasks().size(), equalTo(0));
     }
 
     @Test
     public void shouldAddPreviousActiveTasksToPreviousAssignedAndPreviousActive() throws
Exception {
-        client.addPreviousActiveTasks(Utils.mkSet(1, 2));
-        assertThat(client.previousActiveTasks(), equalTo(Utils.mkSet(1, 2)));
-        assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(1, 2)));
+        final TaskId tid1 = new TaskId(0, 1);
+        final TaskId tid2 = new TaskId(0, 2);
+
+        client.addPreviousActiveTasks(Utils.mkSet(tid1, tid2));
+        assertThat(client.previousActiveTasks(), equalTo(Utils.mkSet(tid1, tid2)));
+        assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1, tid2)));
     }
 
     @Test
     public void shouldAddPreviousStandbyTasksToPreviousAssigned() throws Exception {
-        client.addPreviousStandbyTasks(Utils.mkSet(1, 2));
+        final TaskId tid1 = new TaskId(0, 1);
+        final TaskId tid2 = new TaskId(0, 2);
+
+        client.addPreviousStandbyTasks(Utils.mkSet(tid1, tid2));
         assertThat(client.previousActiveTasks().size(), equalTo(0));
-        assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(1, 2)));
+        assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1, tid2)));
     }
 
     @Test
     public void shouldHaveAssignedTaskIfActiveTaskAssigned() throws Exception {
-        client.assign(2, true);
-        assertTrue(client.hasAssignedTask(2));
+        final TaskId tid = new TaskId(0, 2);
+
+        client.assign(tid, true);
+        assertTrue(client.hasAssignedTask(tid));
     }
 
     @Test
     public void shouldHaveAssignedTaskIfStandbyTaskAssigned() throws Exception {
-        client.assign(2, false);
-        assertTrue(client.hasAssignedTask(2));
+        final TaskId tid = new TaskId(0, 2);
+
+        client.assign(tid, false);
+        assertTrue(client.hasAssignedTask(tid));
     }
 
     @Test
     public void shouldNotHaveAssignedTaskIfTaskNotAssigned() throws Exception {
-        client.assign(2, true);
-        assertFalse(client.hasAssignedTask(3));
+
+        client.assign(new TaskId(0, 2), true);
+        assertFalse(client.hasAssignedTask(new TaskId(0, 3)));
     }
 
     @Test
     public void shouldHaveMoreAvailableCapacityWhenCapacityTheSameButFewerAssignedTasks()
throws Exception {
-        final ClientState<Integer> c2 = new ClientState<>(1);
-        client.assign(1, true);
+        final ClientState c2 = new ClientState(1);
+        client.assign(new TaskId(0, 1), true);
         assertTrue(c2.hasMoreAvailableCapacityThan(client));
         assertFalse(client.hasMoreAvailableCapacityThan(c2));
     }
 
     @Test
     public void shouldHaveMoreAvailableCapacityWhenCapacityHigherAndSameAssignedTaskCount()
throws Exception {
-        final ClientState<Integer> c2 = new ClientState<>(2);
+        final ClientState c2 = new ClientState(2);
         assertTrue(c2.hasMoreAvailableCapacityThan(client));
         assertFalse(client.hasMoreAvailableCapacityThan(c2));
     }
 
     @Test
     public void shouldUseMultiplesOfCapacityToDetermineClientWithMoreAvailableCapacity()
throws Exception {
-        final ClientState<Integer> c2 = new ClientState<>(2);
+        final ClientState c2 = new ClientState(2);
 
         for (int i = 0; i < 7; i++) {
-            c2.assign(i, true);
+            c2.assign(new TaskId(0, i), true);
         }
 
         for (int i = 7; i < 11; i++) {
-            client.assign(i, true);
+            client.assign(new TaskId(0, i), true);
         }
 
         assertTrue(c2.hasMoreAvailableCapacityThan(client));
@@ -124,39 +140,39 @@ public class ClientStateTest {
 
     @Test
     public void shouldHaveMoreAvailableCapacityWhenCapacityIsTheSameButAssignedTasksIsLess()
throws Exception {
-        final ClientState<Integer> c1 = new ClientState<>(3);
-        final ClientState<Integer> c2 = new ClientState<>(3);
+        final ClientState c1 = new ClientState(3);
+        final ClientState c2 = new ClientState(3);
         for (int i = 0; i < 4; i++) {
-            c1.assign(i, true);
-            c2.assign(i, true);
+            c1.assign(new TaskId(0, i), true);
+            c2.assign(new TaskId(0, i), true);
         }
-        c2.assign(5, true);
+        c2.assign(new TaskId(0, 5), true);
         assertTrue(c1.hasMoreAvailableCapacityThan(c2));
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldThrowIllegalStateExceptionIfCapacityOfThisClientStateIsZero() throws
Exception {
-        final ClientState<Integer> c1 = new ClientState<>(0);
-        c1.hasMoreAvailableCapacityThan(new ClientState<Integer>(1));
+        final ClientState c1 = new ClientState(0);
+        c1.hasMoreAvailableCapacityThan(new ClientState(1));
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldThrowIllegalStateExceptionIfCapacityOfOtherClientStateIsZero() throws
Exception {
-        final ClientState<Integer> c1 = new ClientState<>(1);
-        c1.hasMoreAvailableCapacityThan(new ClientState<Integer>(0));
+        final ClientState c1 = new ClientState(1);
+        c1.hasMoreAvailableCapacityThan(new ClientState(0));
     }
 
     @Test
     public void shouldHaveUnfulfilledQuotaWhenActiveTaskSizeLessThanCapacityTimesTasksPerThread()
throws Exception {
-        final ClientState<Integer> client = new ClientState<>(1);
-        client.assign(1, true);
+        final ClientState client = new ClientState(1);
+        client.assign(new TaskId(0, 1), true);
         assertTrue(client.hasUnfulfilledQuota(2));
     }
 
     @Test
     public void shouldNotHaveUnfulfilledQuotaWhenActiveTaskSizeGreaterEqualThanCapacityTimesTasksPerThread()
throws Exception {
-        final ClientState<Integer> client = new ClientState<>(1);
-        client.assign(1, true);
+        final ClientState client = new ClientState(1);
+        client.assign(new TaskId(0, 1), true);
         assertFalse(client.hasUnfulfilledQuota(1));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa2ef48/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index f37bf7d..449dabd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -44,7 +44,7 @@ public class StickyTaskAssignorTest {
     private final TaskId task03 = new TaskId(0, 3);
     private final TaskId task04 = new TaskId(0, 4);
     private final TaskId task05 = new TaskId(0, 5);
-    private final Map<Integer, ClientState<TaskId>> clients = new TreeMap<>();
+    private final Map<Integer, ClientState> clients = new TreeMap<>();
     private final Integer p1 = 1;
     private final Integer p2 = 2;
     private final Integer p3 = 3;
@@ -153,11 +153,11 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void shouldAssignTasksToClientWithPreviousStandbyTasks() throws Exception {
-        final ClientState<TaskId> client1 = createClient(p1, 1);
+        final ClientState client1 = createClient(p1, 1);
         client1.addPreviousStandbyTasks(Utils.mkSet(task02));
-        final ClientState<TaskId> client2 = createClient(p2, 1);
+        final ClientState client2 = createClient(p2, 1);
         client2.addPreviousStandbyTasks(Utils.mkSet(task01));
-        final ClientState<TaskId> client3 = createClient(p3, 1);
+        final ClientState client3 = createClient(p3, 1);
         client3.addPreviousStandbyTasks(Utils.mkSet(task00));
 
         final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02);
@@ -171,9 +171,9 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks() throws Exception
{
-        final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 1, task00);
+        final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00);
         c1.addPreviousStandbyTasks(Utils.mkSet(task01));
-        final ClientState<TaskId> c2 = createClientWithPreviousActiveTasks(p2, 2, task02);
+        final ClientState c2 = createClientWithPreviousActiveTasks(p2, 2, task02);
         c2.addPreviousStandbyTasks(Utils.mkSet(task01));
 
         final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02);
@@ -290,7 +290,7 @@ public class StickyTaskAssignorTest {
         final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task01, task02);
         taskAssignor.assign(1);
 
-        for (final ClientState<TaskId> clientState : clients.values()) {
+        for (final ClientState clientState : clients.values()) {
             assertThat(clientState.assignedTaskCount(), equalTo(1));
         }
     }
@@ -365,9 +365,9 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() throws Exception {
-        final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task02);
+        final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task02);
         c1.addPreviousStandbyTasks(Utils.mkSet(task03, task00));
-        final ClientState<TaskId> c2 = createClientWithPreviousActiveTasks(p2, 1, task03, task00);
+        final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task03, task00);
         c2.addPreviousStandbyTasks(Utils.mkSet(task01, task02));
 
         createClient(p3, 1);
@@ -487,14 +487,14 @@ public class StickyTaskAssignorTest {
         final TaskId task22 = new TaskId(2, 2);
         final TaskId task23 = new TaskId(2, 3);
 
-        final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task12, task13);
+        final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task12, task13);
         c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, task23));
-        final ClientState<TaskId> c2 = createClientWithPreviousActiveTasks(p2, 1, task00, task11, task22);
+        final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task00, task11, task22);
         c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23));
-        final ClientState<TaskId> c3 = createClientWithPreviousActiveTasks(p3, 1, task20, task21, task23);
+        final ClientState c3 = createClientWithPreviousActiveTasks(p3, 1, task20, task21, task23);
         c3.addPreviousStandbyTasks(Utils.mkSet(task02, task12));
 
-        final ClientState<TaskId> newClient = createClient(p4, 1);
+        final ClientState newClient = createClient(p4, 1);
         newClient.addPreviousStandbyTasks(Utils.mkSet(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23));
 
         final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23);
@@ -517,15 +517,15 @@ public class StickyTaskAssignorTest {
         final TaskId task22 = new TaskId(2, 2);
         final TaskId task23 = new TaskId(2, 3);
 
-        final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task12, task13);
+        final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task12, task13);
         c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, task23));
-        final ClientState<TaskId> c2 = createClientWithPreviousActiveTasks(p2, 1, task00, task11, task22);
+        final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task00, task11, task22);
         c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23));
 
-        final ClientState<TaskId> bounce1 = createClient(p3, 1);
+        final ClientState bounce1 = createClient(p3, 1);
         bounce1.addPreviousStandbyTasks(Utils.mkSet(task20, task21, task23));
 
-        final ClientState<TaskId> bounce2 = createClient(p4, 1);
+        final ClientState bounce2 = createClient(p4, 1);
         bounce2.addPreviousStandbyTasks(Utils.mkSet(task02, task03, task10));
 
         final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23);
@@ -547,9 +547,9 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingClients() throws Exception {
-        final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02);
-        final ClientState<TaskId> c2 = createClientWithPreviousActiveTasks(p2, 1, task03, task04, task05);
-        final ClientState<TaskId> newClient = createClient(p3, 1);
+        final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02);
+        final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task03, task04, task05);
+        final ClientState newClient = createClient(p3, 1);
 
         final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task01, task02, task03, task04, task05);
         taskAssignor.assign(0);
@@ -567,10 +567,10 @@ public class StickyTaskAssignorTest {
     @Test
     public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingAndBouncedClients() throws Exception {
         final TaskId task06 = new TaskId(0, 6);
-        final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02, task06);
-        final ClientState<TaskId> c2 = createClient(p2, 1);
+        final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02, task06);
+        final ClientState c2 = createClient(p2, 1);
         c2.addPreviousStandbyTasks(Utils.mkSet(task03, task04, task05));
-        final ClientState<TaskId> newClient = createClient(p3, 1);
+        final ClientState newClient = createClient(p3, 1);
 
         final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task01, task02, task03, task04, task05, task06);
         taskAssignor.assign(0);
@@ -594,7 +594,7 @@ public class StickyTaskAssignorTest {
 
     private List<TaskId> allActiveTasks() {
         final List<TaskId> allActive = new ArrayList<>();
-        for (final ClientState<TaskId> client : clients.values()) {
+        for (final ClientState client : clients.values()) {
             allActive.addAll(client.activeTasks());
         }
         Collections.sort(allActive);
@@ -603,19 +603,19 @@ public class StickyTaskAssignorTest {
 
     private List<TaskId> allStandbyTasks() {
         final List<TaskId> tasks = new ArrayList<>();
-        for (final ClientState<TaskId> client : clients.values()) {
+        for (final ClientState client : clients.values()) {
             tasks.addAll(client.standbyTasks());
         }
         Collections.sort(tasks);
         return tasks;
     }
 
-    private ClientState<TaskId> createClient(final Integer processId, final int capacity) {
+    private ClientState createClient(final Integer processId, final int capacity) {
         return createClientWithPreviousActiveTasks(processId, capacity);
     }
 
-    private ClientState<TaskId> createClientWithPreviousActiveTasks(final Integer processId, final int capacity, final TaskId... taskIds) {
-        final ClientState<TaskId> clientState = new ClientState<>(capacity);
+    private ClientState createClientWithPreviousActiveTasks(final Integer processId, final int capacity, final TaskId... taskIds) {
+        final ClientState clientState = new ClientState(capacity);
         clientState.addPreviousActiveTasks(Utils.mkSet(taskIds));
         clients.put(processId, clientState);
         return clientState;


Mime
View raw message