kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [2/3] kafka git commit: KAFKA-2411; remove usage of blocking channel
Date Wed, 02 Sep 2015 18:55:24 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
new file mode 100644
index 0000000..f9f76be
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class UpdateMetadataRequest extends AbstractRequest {
+
+    public static final class PartitionState {
+        public final int controllerEpoch;
+        public final int leader;
+        public final int leaderEpoch;
+        public final List<Integer> isr;
+        public final int zkVersion;
+        public final Set<Integer> replicas;
+
+        public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) {
+            this.controllerEpoch = controllerEpoch;
+            this.leader = leader;
+            this.leaderEpoch = leaderEpoch;
+            this.isr = isr;
+            this.zkVersion = zkVersion;
+            this.replicas = replicas;
+        }
+
+    }
+
+    public static final class Broker {
+        public final int id;
+        public final Map<SecurityProtocol, EndPoint> endPoints;
+
+        public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints) {
+            this.id = id;
+            this.endPoints = endPoints;
+        }
+    }
+
+    public static final class EndPoint {
+        public final String host;
+        public final int port;
+
+        public EndPoint(String host, int port) {
+            this.host = host;
+            this.port = port;
+        }
+    }
+
+    @Deprecated
+    public static final class BrokerEndPoint {
+        public final int id;
+        public final String host;
+        public final int port;
+
+        public BrokerEndPoint(int id, String host, int port) {
+            this.id = id;
+            this.host = host;
+            this.port = port;
+        }
+    }
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.UPDATE_METADATA_KEY.id);
+
+    private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
+    private static final String CONTROLLER_EPOCH_KEY_NAME = "controller_epoch";
+    private static final String PARTITION_STATES_KEY_NAME = "partition_states";
+    private static final String LIVE_BROKERS_KEY_NAME = "live_brokers";
+
+    // PartitionState key names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String LEADER_KEY_NAME = "leader";
+    private static final String LEADER_EPOCH_KEY_NAME = "leader_epoch";
+    private static final String ISR_KEY_NAME = "isr";
+    private static final String ZK_VERSION_KEY_NAME = "zk_version";
+    private static final String REPLICAS_KEY_NAME = "replicas";
+
+    // Broker key names
+    private static final String BROKER_ID_KEY_NAME = "id";
+    private static final String ENDPOINTS_KEY_NAME = "end_points";
+
+    // EndPoint key names
+    private static final String HOST_KEY_NAME = "host";
+    private static final String PORT_KEY_NAME = "port";
+    private static final String SECURITY_PROTOCOL_TYPE_KEY_NAME = "security_protocol_type";
+
+    private final int controllerId;
+    private final int controllerEpoch;
+    private final Map<TopicPartition, PartitionState> partitionStates;
+    private final Set<Broker> liveBrokers;
+
+    /**
+     * Constructor for version 0.
+     */
+    @Deprecated
+    public UpdateMetadataRequest(int controllerId, int controllerEpoch, Set<BrokerEndPoint> liveBrokers,
+                                 Map<TopicPartition, PartitionState> partitionStates) {
+        this(0, controllerId, controllerEpoch, partitionStates,
+             brokerEndPointsToBrokers(liveBrokers));
+    }
+
+    private static Set<Broker> brokerEndPointsToBrokers(Set<BrokerEndPoint> brokerEndPoints) {
+        Set<Broker> brokers = new HashSet<>(brokerEndPoints.size());
+        for (BrokerEndPoint brokerEndPoint : brokerEndPoints) {
+            Map<SecurityProtocol, EndPoint> endPoints = Collections.singletonMap(SecurityProtocol.PLAINTEXT,
+                    new EndPoint(brokerEndPoint.host, brokerEndPoint.port));
+            brokers.add(new Broker(brokerEndPoint.id, endPoints));
+        }
+        return brokers;
+    }
+
+    /**
+     * Constructor for version 1.
+     */
+    public UpdateMetadataRequest(int controllerId, int controllerEpoch, Map<TopicPartition,
+            PartitionState> partitionStates, Set<Broker> liveBrokers) {
+        this(1, controllerId, controllerEpoch, partitionStates, liveBrokers);
+    }
+
+    private UpdateMetadataRequest(int version, int controllerId, int controllerEpoch, Map<TopicPartition,
+            PartitionState> partitionStates, Set<Broker> liveBrokers) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.UPDATE_METADATA_KEY.id, version)));
+        struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
+        struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
+
+        List<Struct> partitionStatesData = new ArrayList<>(partitionStates.size());
+        for (Map.Entry<TopicPartition, PartitionState> entry : partitionStates.entrySet()) {
+            Struct partitionStateData = struct.instance(PARTITION_STATES_KEY_NAME);
+            TopicPartition topicPartition = entry.getKey();
+            partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic());
+            partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition());
+            PartitionState partitionState = entry.getValue();
+            partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.controllerEpoch);
+            partitionStateData.set(LEADER_KEY_NAME, partitionState.leader);
+            partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.leaderEpoch);
+            partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray());
+            partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion);
+            partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray());
+            partitionStatesData.add(partitionStateData);
+        }
+        struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray());
+
+        List<Struct> brokersData = new ArrayList<>(liveBrokers.size());
+        for (Broker broker : liveBrokers) {
+            Struct brokerData = struct.instance(LIVE_BROKERS_KEY_NAME);
+            brokerData.set(BROKER_ID_KEY_NAME, broker.id);
+
+            if (version == 0) {
+                EndPoint endPoint = broker.endPoints.get(SecurityProtocol.PLAINTEXT);
+                brokerData.set(HOST_KEY_NAME, endPoint.host);
+                brokerData.set(PORT_KEY_NAME, endPoint.port);
+            } else {
+                List<Struct> endPointsData = new ArrayList<>(broker.endPoints.size());
+                for (Map.Entry<SecurityProtocol, EndPoint> entry : broker.endPoints.entrySet()) {
+                    Struct endPointData = brokerData.instance(ENDPOINTS_KEY_NAME);
+                    endPointData.set(PORT_KEY_NAME, entry.getValue().port);
+                    endPointData.set(HOST_KEY_NAME, entry.getValue().host);
+                    endPointData.set(SECURITY_PROTOCOL_TYPE_KEY_NAME, entry.getKey().id);
+                    endPointsData.add(endPointData);
+
+                }
+                brokerData.set(ENDPOINTS_KEY_NAME, endPointsData.toArray());
+            }
+
+            brokersData.add(brokerData);
+        }
+        struct.set(LIVE_BROKERS_KEY_NAME, brokersData.toArray());
+
+        this.controllerId = controllerId;
+        this.controllerEpoch = controllerEpoch;
+        this.partitionStates = partitionStates;
+        this.liveBrokers = liveBrokers;
+    }
+
+    public UpdateMetadataRequest(Struct struct) {
+        super(struct);
+
+        Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
+        for (Object partitionStateDataObj : struct.getArray(PARTITION_STATES_KEY_NAME)) {
+            Struct partitionStateData = (Struct) partitionStateDataObj;
+            String topic = partitionStateData.getString(TOPIC_KEY_NAME);
+            int partition = partitionStateData.getInt(PARTITION_KEY_NAME);
+            int controllerEpoch = partitionStateData.getInt(CONTROLLER_EPOCH_KEY_NAME);
+            int leader = partitionStateData.getInt(LEADER_KEY_NAME);
+            int leaderEpoch = partitionStateData.getInt(LEADER_EPOCH_KEY_NAME);
+
+            Object[] isrArray = partitionStateData.getArray(ISR_KEY_NAME);
+            List<Integer> isr = new ArrayList<>(isrArray.length);
+            for (Object r : isrArray)
+                isr.add((Integer) r);
+
+            int zkVersion = partitionStateData.getInt(ZK_VERSION_KEY_NAME);
+
+            Object[] replicasArray = partitionStateData.getArray(REPLICAS_KEY_NAME);
+            Set<Integer> replicas = new HashSet<>(replicasArray.length);
+            for (Object r : replicasArray)
+                replicas.add((Integer) r);
+
+            PartitionState partitionState = new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas);
+            partitionStates.put(new TopicPartition(topic, partition), partitionState);
+
+        }
+
+        Set<Broker> liveBrokers = new HashSet<>();
+
+        for (Object brokerDataObj : struct.getArray(LIVE_BROKERS_KEY_NAME)) {
+            Struct brokerData = (Struct) brokerDataObj;
+            int brokerId = brokerData.getInt(BROKER_ID_KEY_NAME);
+
+            // V0
+            if (brokerData.hasField(HOST_KEY_NAME)) {
+                String host = brokerData.getString(HOST_KEY_NAME);
+                int port = brokerData.getInt(PORT_KEY_NAME);
+                Map<SecurityProtocol, EndPoint> endPoints = new HashMap<>(1);
+                endPoints.put(SecurityProtocol.PLAINTEXT, new EndPoint(host, port));
+                liveBrokers.add(new Broker(brokerId, endPoints));
+            } else { // V1
+                Map<SecurityProtocol, EndPoint> endPoints = new HashMap<>();
+                for (Object endPointDataObj : brokerData.getArray(ENDPOINTS_KEY_NAME)) {
+                    Struct endPointData = (Struct) endPointDataObj;
+                    int port = endPointData.getInt(PORT_KEY_NAME);
+                    String host = endPointData.getString(HOST_KEY_NAME);
+                    short protocolTypeId = endPointData.getShort(SECURITY_PROTOCOL_TYPE_KEY_NAME);
+                    endPoints.put(SecurityProtocol.forId(protocolTypeId), new EndPoint(host, port));
+                }
+                liveBrokers.add(new Broker(brokerId, endPoints));
+            }
+
+        }
+
+        controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME);
+        controllerEpoch = struct.getInt(CONTROLLER_EPOCH_KEY_NAME);
+        this.partitionStates = partitionStates;
+        this.liveBrokers = liveBrokers;
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+            case 1:
+                return new UpdateMetadataResponse(Errors.forException(e).code());
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)));
+        }
+    }
+
+    public int controllerId() {
+        return controllerId;
+    }
+
+    public int controllerEpoch() {
+        return controllerEpoch;
+    }
+
+    public Map<TopicPartition, PartitionState> partitionStates() {
+        return partitionStates;
+    }
+
+    public Set<Broker> liveBrokers() {
+        return liveBrokers;
+    }
+
+    public static UpdateMetadataRequest parse(ByteBuffer buffer, int versionId) {
+        return new UpdateMetadataRequest(ProtoUtils.parseRequest(ApiKeys.UPDATE_METADATA_KEY.id, versionId, buffer));
+    }
+
+    public static UpdateMetadataRequest parse(ByteBuffer buffer) {
+        return new UpdateMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
new file mode 100644
index 0000000..5bec437
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class UpdateMetadataResponse extends AbstractRequestResponse {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.UPDATE_METADATA_KEY.id);
+
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    /**
+     * Possible error code:
+     *
+     * STALE_CONTROLLER_EPOCH (11)
+     */
+    private final short errorCode;
+
+    public UpdateMetadataResponse(short errorCode) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        this.errorCode = errorCode;
+    }
+
+    public UpdateMetadataResponse(Struct struct) {
+        super(struct);
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public static UpdateMetadataResponse parse(ByteBuffer buffer) {
+        return new UpdateMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+
+    public static UpdateMetadataResponse parse(ByteBuffer buffer, int version) {
+        return new UpdateMetadataResponse(ProtoUtils.parseResponse(ApiKeys.UPDATE_METADATA_KEY.id, version, buffer));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 9133d85..e5815f5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -201,6 +201,11 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
+    public RequestHeader nextRequestHeader(ApiKeys key, short version) {
+        return new RequestHeader(key.id, version, "mock", correlation++);
+    }
+
+    @Override
     public void wakeup() {
     }
 
@@ -209,6 +214,11 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
+    public void close(String nodeId) {
+        ready.remove(Integer.valueOf(nodeId));
+    }
+
+    @Override
     public Node leastLoadedNode(long now) {
         return this.node;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 43238ce..ce6328a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -52,6 +52,8 @@ public class NetworkClientTest {
     private Cluster cluster = TestUtils.singletonCluster("test", nodeId);
     private Node node = cluster.nodes().get(0);
     private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024);
+    private NetworkClient clientWithStaticNodes = new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
+            "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024);
 
     @Before
     public void setup() {
@@ -84,15 +86,24 @@ public class NetworkClientTest {
 
     @Test
     public void testSimpleRequestResponse() {
+        checkSimpleRequestResponse(client);
+    }
+
+    @Test
+    public void testSimpleRequestResponseWithStaticNodes() {
+        checkSimpleRequestResponse(clientWithStaticNodes);
+    }
+
+    private void checkSimpleRequestResponse(NetworkClient networkClient) {
         ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap());
-        RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
+        RequestHeader reqHeader = networkClient.nextRequestHeader(ApiKeys.PRODUCE);
         RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct());
         TestCallbackHandler handler = new TestCallbackHandler();
         ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler);
-        awaitReady(client, node);
-        client.send(request);
-        client.poll(1, time.milliseconds());
-        assertEquals(1, client.inFlightRequestCount());
+        awaitReady(networkClient, node);
+        networkClient.send(request);
+        networkClient.poll(1, time.milliseconds());
+        assertEquals(1, networkClient.inFlightRequestCount());
         ResponseHeader respHeader = new ResponseHeader(reqHeader.correlationId());
         Struct resp = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
         resp.set("responses", new Object[0]);
@@ -102,7 +113,7 @@ public class NetworkClientTest {
         resp.writeTo(buffer);
         buffer.flip();
         selector.completeReceive(new NetworkReceive(node.idString(), buffer));
-        List<ClientResponse> responses = client.poll(1, time.milliseconds());
+        List<ClientResponse> responses = networkClient.poll(1, time.milliseconds());
         assertEquals(1, responses.size());
         assertTrue("The handler should have executed.", handler.executed);
         assertTrue("Should have a response body.", handler.response.hasResponse());

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 353d621..b668013 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -21,10 +21,12 @@ import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.junit.Test;
 
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -44,6 +46,9 @@ public class RequestResponseTest {
                 createConsumerMetadataRequest(),
                 createConsumerMetadataRequest().getErrorResponse(0, new UnknownServerException()),
                 createConsumerMetadataResponse(),
+                createControlledShutdownRequest(),
+                createControlledShutdownResponse(),
+                createControlledShutdownRequest().getErrorResponse(1, new UnknownServerException()),
                 createFetchRequest(),
                 createFetchRequest().getErrorResponse(0, new UnknownServerException()),
                 createFetchResponse(),
@@ -70,18 +75,37 @@ public class RequestResponseTest {
                 createProduceResponse(),
                 createStopReplicaRequest(),
                 createStopReplicaRequest().getErrorResponse(0, new UnknownServerException()),
-                createStopReplicaResponse());
+                createStopReplicaResponse(),
+                createUpdateMetadataRequest(1),
+                createUpdateMetadataRequest(1).getErrorResponse(1, new UnknownServerException()),
+                createUpdateMetadataResponse(),
+                createLeaderAndIsrRequest(),
+                createLeaderAndIsrRequest().getErrorResponse(0, new UnknownServerException()),
+                createLeaderAndIsrResponse()
+        );
+
+        for (AbstractRequestResponse req : requestResponseList)
+            checkSerialization(req, null);
+
+        checkSerialization(createUpdateMetadataRequest(0), 0);
+        checkSerialization(createUpdateMetadataRequest(0).getErrorResponse(0, new UnknownServerException()), 0);
+    }
 
-        for (AbstractRequestResponse req: requestResponseList) {
-            ByteBuffer buffer = ByteBuffer.allocate(req.sizeOf());
-            req.writeTo(buffer);
-            buffer.rewind();
+    private void checkSerialization(AbstractRequestResponse req, Integer version) throws Exception {
+        ByteBuffer buffer = ByteBuffer.allocate(req.sizeOf());
+        req.writeTo(buffer);
+        buffer.rewind();
+        AbstractRequestResponse deserialized;
+        if (version == null) {
             Method deserializer = req.getClass().getDeclaredMethod("parse", ByteBuffer.class);
-            AbstractRequestResponse deserialized = (AbstractRequestResponse) deserializer.invoke(null, buffer);
-            assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + " should be the same.", req, deserialized);
-            assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + " should have the same hashcode.",
-                    req.hashCode(), deserialized.hashCode());
+            deserialized = (AbstractRequestResponse) deserializer.invoke(null, buffer);
+        } else {
+            Method deserializer = req.getClass().getDeclaredMethod("parse", ByteBuffer.class, Integer.TYPE);
+            deserialized = (AbstractRequestResponse) deserializer.invoke(null, buffer, version);
         }
+        assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + " should be the same.", req, deserialized);
+        assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + " should have the same hashcode.",
+                req.hashCode(), deserialized.hashCode());
     }
 
     @Test
@@ -233,4 +257,81 @@ public class RequestResponseTest {
         responses.put(new TopicPartition("test", 0), Errors.NONE.code());
         return new StopReplicaResponse(Errors.NONE.code(), responses);
     }
+
+    private AbstractRequest createControlledShutdownRequest() {
+        return new ControlledShutdownRequest(10);
+    }
+
+    private AbstractRequestResponse createControlledShutdownResponse() {
+        HashSet<TopicPartition> topicPartitions = new HashSet<>(Arrays.asList(
+                new TopicPartition("test2", 5),
+                new TopicPartition("test1", 10)
+        ));
+        return new ControlledShutdownResponse(Errors.NONE.code(), topicPartitions);
+    }
+
+    private AbstractRequest createLeaderAndIsrRequest() {
+        Map<TopicPartition, LeaderAndIsrRequest.PartitionState> partitionStates = new HashMap<>();
+        List<Integer> isr = Arrays.asList(1, 2);
+        List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
+        partitionStates.put(new TopicPartition("topic5", 105),
+                new LeaderAndIsrRequest.PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+        partitionStates.put(new TopicPartition("topic5", 1),
+                new LeaderAndIsrRequest.PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+        partitionStates.put(new TopicPartition("topic20", 1),
+                new LeaderAndIsrRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+
+        Set<LeaderAndIsrRequest.EndPoint> leaders = new HashSet<>(Arrays.asList(
+                new LeaderAndIsrRequest.EndPoint(0, "test0", 1223),
+                new LeaderAndIsrRequest.EndPoint(1, "test1", 1223)
+        ));
+
+        return new LeaderAndIsrRequest(1, 10, partitionStates, leaders);
+    }
+
+    private AbstractRequestResponse createLeaderAndIsrResponse() {
+        Map<TopicPartition, Short> responses = new HashMap<>();
+        responses.put(new TopicPartition("test", 0), Errors.NONE.code());
+        return new LeaderAndIsrResponse(Errors.NONE.code(), responses);
+    }
+
+    private AbstractRequest createUpdateMetadataRequest(int version) {
+        Map<TopicPartition, UpdateMetadataRequest.PartitionState> partitionStates = new HashMap<>();
+        List<Integer> isr = Arrays.asList(1, 2);
+        List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
+        partitionStates.put(new TopicPartition("topic5", 105),
+                new UpdateMetadataRequest.PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+        partitionStates.put(new TopicPartition("topic5", 1),
+                new UpdateMetadataRequest.PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+        partitionStates.put(new TopicPartition("topic20", 1),
+                new UpdateMetadataRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+
+        if (version == 0) {
+            Set<UpdateMetadataRequest.BrokerEndPoint> liveBrokers = new HashSet<>(Arrays.asList(
+                    new UpdateMetadataRequest.BrokerEndPoint(0, "host1", 1223),
+                    new UpdateMetadataRequest.BrokerEndPoint(1, "host2", 1234)
+            ));
+
+            return new UpdateMetadataRequest(1, 10, liveBrokers, partitionStates);
+        } else {
+            Map<SecurityProtocol, UpdateMetadataRequest.EndPoint> endPoints1 = new HashMap<>();
+            endPoints1.put(SecurityProtocol.PLAINTEXT, new UpdateMetadataRequest.EndPoint("host1", 1223));
+
+            Map<SecurityProtocol, UpdateMetadataRequest.EndPoint> endPoints2 = new HashMap<>();
+            endPoints2.put(SecurityProtocol.PLAINTEXT, new UpdateMetadataRequest.EndPoint("host1", 1244));
+            endPoints2.put(SecurityProtocol.SSL, new UpdateMetadataRequest.EndPoint("host2", 1234));
+
+            Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList(new UpdateMetadataRequest.Broker(0, endPoints1),
+                    new UpdateMetadataRequest.Broker(1, endPoints2)
+            ));
+
+            return new UpdateMetadataRequest(1, 10, partitionStates, liveBrokers);
+        }
+    }
+
+    private AbstractRequestResponse createUpdateMetadataResponse() {
+        return new UpdateMetadataResponse(Errors.NONE.code());
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/test/java/org/apache/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index 7257cad..f83fd9b 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -57,6 +57,10 @@ public class MockSelector implements Selectable {
     public void close() {
     }
 
+    @Override
+    public void close(String id) {
+    }
+
     public void clear() {
         this.completedSends.clear();
         this.completedReceives.clear();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index 8092007..33c107f 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -20,39 +20,45 @@ package kafka.api
 import java.nio.ByteBuffer
 
 import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.api.ApiUtils._
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
 
 object ControlledShutdownRequest extends Logging {
-  val CurrentVersion = 0.shortValue
+  val CurrentVersion = 1.shortValue
   val DefaultClientId = ""
 
   def readFrom(buffer: ByteBuffer): ControlledShutdownRequest = {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
+    // Version 1 adds a client id encoded as a short string; version 0 carries none.
+    val clientId = if (versionId > 0) Some(readShortString(buffer)) else None
     val brokerId = buffer.getInt
-    new ControlledShutdownRequest(versionId, correlationId, brokerId)
+    new ControlledShutdownRequest(versionId, correlationId, clientId, brokerId)
   }
+
 }
 
 case class ControlledShutdownRequest(versionId: Short,
                                      correlationId: Int,
+                                     clientId: Option[String],
                                      brokerId: Int)
   extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey)){
 
-  def this(correlationId: Int, brokerId: Int) =
-    this(ControlledShutdownRequest.CurrentVersion, correlationId, brokerId)
+  if (versionId > 0 && clientId.isEmpty)
+    throw new IllegalArgumentException("`clientId` must be defined if `versionId` > 0")
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     buffer.putInt(correlationId)
+    // Serialized only when defined; the constructor check guarantees it is defined for versionId > 0.
+    clientId.foreach(writeShortString(buffer, _))
     buffer.putInt(brokerId)
   }
 
-  def sizeInBytes(): Int = {
-    2 +  /* version id */
+  // Total serialized size of this request in bytes.
+  def sizeInBytes: Int = {
+    2 + /* version id */
       4 + /* correlation id */
+      clientId.fold(0)(shortStringLength) + /* client id (only when versionId > 0) */
       4 /* broker id */
   }
 
@@ -70,6 +76,7 @@ case class ControlledShutdownRequest(versionId: Short,
     controlledShutdownRequest.append("Name: " + this.getClass.getSimpleName)
     controlledShutdownRequest.append("; Version: " + versionId)
     controlledShutdownRequest.append("; CorrelationId: " + correlationId)
+    controlledShutdownRequest.append(";ClientId:" + clientId)
     controlledShutdownRequest.append("; BrokerId: " + brokerId)
     controlledShutdownRequest.toString()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 4396b6e..da1cff0 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -16,20 +16,27 @@
 */
 package kafka.controller
 
-import kafka.network.BlockingChannel
-import kafka.utils.{CoreUtils, Logging, ShutdownableThread}
-import org.apache.kafka.common.network.NetworkReceive
+import kafka.api.{LeaderAndIsr, KAFKA_083, PartitionStateInfo}
+import kafka.utils._
+import org.apache.kafka.clients.{ClientResponse, ClientRequest, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.common.{TopicPartition, Node}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.{Selectable, ChannelBuilders, Selector, NetworkReceive}
+import org.apache.kafka.common.protocol.{SecurityProtocol, ApiKeys}
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.security.ssl.SSLFactory
+import org.apache.kafka.common.utils.Time
 import collection.mutable.HashMap
 import kafka.cluster.Broker
+import java.net.{SocketTimeoutException}
 import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
 import kafka.server.KafkaConfig
 import collection.mutable
-import kafka.api._
-import kafka.common.TopicAndPartition
-import kafka.api.RequestOrResponse
+import kafka.common.{KafkaException, TopicAndPartition}
 import collection.Set
+import collection.JavaConverters._
 
-class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging {
+class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics) extends Logging {
   protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
   private val brokerLock = new Object
   this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
@@ -44,16 +51,16 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
 
   def shutdown() = {
     brokerLock synchronized {
-      brokerStateInfo.foreach(brokerState => removeExistingBroker(brokerState._1))
+      // Tears down every broker's connection, message queue and send thread.
+      brokerStateInfo.values.foreach(removeExistingBroker)
     }
   }
 
-  def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
+  def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) {
     brokerLock synchronized {
       val stateInfoOpt = brokerStateInfo.get(brokerId)
       stateInfoOpt match {
         case Some(stateInfo) =>
-          stateInfo.messageQueue.put((request, callback))
+          stateInfo.messageQueue.put(QueueItem(apiKey, apiVersion, request, callback))
         case None =>
           warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
       }
@@ -72,30 +79,48 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
 
   def removeBroker(brokerId: Int) {
     brokerLock synchronized {
-      removeExistingBroker(brokerId)
+      // NOTE(review): brokerStateInfo(brokerId) throws NoSuchElementException if the
+      // broker was already removed — confirm callers never race on removal.
+      removeExistingBroker(brokerStateInfo(brokerId))
     }
   }
 
   private def addNewBroker(broker: Broker) {
-    val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]()
-    debug("Controller %d trying to connect to broker %d".format(config.brokerId,broker.id))
+    // One unbounded queue plus a dedicated send thread per broker keeps the
+    // controller's requests to that broker strictly ordered.
+    val messageQueue = new LinkedBlockingQueue[QueueItem]
+    debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))
     val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
-    val channel = new BlockingChannel(brokerEndPoint.host, brokerEndPoint.port,
-      BlockingChannel.UseDefaultBufferSize,
-      BlockingChannel.UseDefaultBufferSize,
-      config.controllerSocketTimeoutMs)
-    val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker, messageQueue, channel)
+    val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port)
+    // NOTE(review): the client is cached in controllerContext.networkClientMap, but
+    // removeExistingBroker only closes the connection and never removes the map entry —
+    // confirm that a re-added broker is expected to reuse the cached NetworkClient.
+    val networkClient = controllerContext.networkClientMap.getOrElseUpdate(broker.id, {
+      val selector = new Selector(
+        NetworkReceive.UNLIMITED,
+        config.connectionsMaxIdleMs,
+        metrics,
+        time,
+        "controller-channel",
+        Map("broker-id" -> broker.id.toString).asJava,
+        false,
+        ChannelBuilders.create(config.interBrokerSecurityProtocol, SSLFactory.Mode.CLIENT, config.channelConfigs)
+      )
+      new NetworkClient(
+        selector,
+        new ManualMetadataUpdater(Seq(brokerNode).asJava),
+        config.brokerId.toString,
+        1,
+        0,
+        Selectable.USE_DEFAULT_BUFFER_SIZE,
+        Selectable.USE_DEFAULT_BUFFER_SIZE
+      )
+    })
+    val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker, messageQueue, networkClient, brokerNode, config, time)
     requestThread.setDaemon(false)
-    brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(channel, broker, messageQueue, requestThread))
+    brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, broker, messageQueue, requestThread))
   }
 
-  private def removeExistingBroker(brokerId: Int) {
+  // Closes the broker's connection, drains its queue, stops its send thread and
+  // forgets the broker. Any error is logged and swallowed so removal of the
+  // remaining brokers can proceed.
+  private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
     try {
-      brokerStateInfo(brokerId).channel.disconnect()
-      brokerStateInfo(brokerId).messageQueue.clear()
-      brokerStateInfo(brokerId).requestSendThread.shutdown()
-      brokerStateInfo.remove(brokerId)
-    }catch {
+      brokerState.networkClient.close(brokerState.brokerNode.idString)
+      brokerState.messageQueue.clear()
+      brokerState.requestSendThread.shutdown()
+      brokerStateInfo.remove(brokerState.broker.id)
+    } catch {
       case e: Throwable => error("Error while removing broker by the controller", e)
     }
   }
@@ -107,21 +132,29 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
   }
 }
 
+// A single controller-to-broker request plus the callback to invoke on its response.
+case class QueueItem(apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit)
+
 class RequestSendThread(val controllerId: Int,
                         val controllerContext: ControllerContext,
                         val toBroker: Broker,
-                        val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
-                        val channel: BlockingChannel)
+                        val queue: BlockingQueue[QueueItem],
+                        val networkClient: NetworkClient,
+                        val brokerNode: Node,
+                        val config: KafkaConfig,
+                        val time: Time)
   extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBroker.id)) {
+
   private val lock = new Object()
   private val stateChangeLogger = KafkaController.stateChangeLogger
-  connectToBroker(toBroker, channel)
+  private val socketTimeoutMs = config.controllerSocketTimeoutMs
 
   override def doWork(): Unit = {
-    val queueItem = queue.take()
-    val request = queueItem._1
-    val callback = queueItem._2
-    var receive: NetworkReceive = null
+
+    def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(300))
+
+    val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
+    import NetworkClientBlockingOps._
+    var clientResponse: ClientResponse = null
     try {
       lock synchronized {
         var isSendSuccessful = false
@@ -129,30 +162,35 @@ class RequestSendThread(val controllerId: Int,
           // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
           // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
           try {
-            channel.send(request)
-            receive = channel.receive()
-            isSendSuccessful = true
+            if (!brokerReady()) {
+              isSendSuccessful = false
+              backoff()
+            }
+            else {
+              val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
+              val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct)
+              val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
+              clientResponse = networkClient.blockingSendAndReceive(clientRequest, socketTimeoutMs)(time).getOrElse {
+                throw new SocketTimeoutException(s"No response received within $socketTimeoutMs ms")
+              }
+              isSendSuccessful = true
+            }
           } catch {
             case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
               warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
                 "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
                   request.toString, toBroker.toString()), e)
-              channel.disconnect()
-              connectToBroker(toBroker, channel)
+              networkClient.close(brokerNode.idString)
               isSendSuccessful = false
-              // backoff before retrying the connection and send
-              CoreUtils.swallowTrace(Thread.sleep(300))
+              backoff()
           }
         }
-        if (receive != null) {
-          var response: RequestOrResponse = null
-          request.requestId.get match {
-            case RequestKeys.LeaderAndIsrKey =>
-              response = LeaderAndIsrResponse.readFrom(receive.payload())
-            case RequestKeys.StopReplicaKey =>
-              response = StopReplicaResponse.readFrom(receive.payload())
-            case RequestKeys.UpdateMetadataKey =>
-              response = UpdateMetadataResponse.readFrom(receive.payload())
+        if (clientResponse != null) {
+          val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match {
+            case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody)
+            case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody)
+            case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody)
+            case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")
           }
           stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
             .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
@@ -165,70 +203,79 @@ class RequestSendThread(val controllerId: Int,
     } catch {
       case e: Throwable =>
         error("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
-        // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
-        channel.disconnect()
+        // If there is any socket error (eg, socket timeout), the connection is no longer usable and needs to be recreated.
+        networkClient.close(brokerNode.idString)
     }
   }
 
-  private def connectToBroker(broker: Broker, channel: BlockingChannel) {
+  // Returns true once a connection to the target broker is ready, blocking up to
+  // controllerSocketTimeoutMs to establish one. On failure the connection is closed
+  // and false is returned so the caller can back off and retry.
+  private def brokerReady(): Boolean = {
+    import NetworkClientBlockingOps._
     try {
-      channel.connect()
-      info("Controller %d connected to %s for sending state change requests".format(controllerId, broker.toString()))
-    } catch {
-      case e: Throwable => {
-        channel.disconnect()
-        error("Controller %d's connection to broker %s was unsuccessful".format(controllerId, broker.toString()), e)
+
+      if (networkClient.isReady(brokerNode, time.milliseconds()))
+        true
+      else {
+        val ready = networkClient.blockingReady(brokerNode, socketTimeoutMs)(time)
+
+        if (!ready)
+          throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
+
+        info("Controller %d connected to %s for sending state change requests".format(controllerId, toBroker.toString()))
+        true
       }
+    } catch {
+      case e: Throwable =>
+        error("Controller %d's connection to broker %s was unsuccessful".format(controllerId, toBroker.toString()), e)
+        networkClient.close(brokerNode.idString)
+        false
     }
   }
+
 }
 
 class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging {
   val controllerContext = controller.controllerContext
   val controllerId: Int = controller.config.brokerId
-  val clientId: String = controller.clientId
-  val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
-  val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[StopReplicaRequestInfo]]
-  val updateMetadataRequestMap = new mutable.HashMap[Int, mutable.HashMap[TopicAndPartition, PartitionStateInfo]]
+  val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
+  val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
+  val updateMetadataRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
   private val stateChangeLogger = KafkaController.stateChangeLogger
 
+  // Fails fast if any request map from the previous batch was never sent, since
+  // silently starting a new batch could lose those state changes.
   def newBatch() {
     // raise error if the previous batch is not empty
-    if(leaderAndIsrRequestMap.size > 0)
+    if (leaderAndIsrRequestMap.size > 0)
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
         "a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
-    if(stopReplicaRequestMap.size > 0)
+    if (stopReplicaRequestMap.size > 0)
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
         "new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString()))
-    if(updateMetadataRequestMap.size > 0)
+    if (updateMetadataRequestMap.size > 0)
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
         "new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
   }
 
+  // Queues a LeaderAndIsr entry for every non-negative broker id and additionally
+  // queues an UpdateMetadata request for all live/shutting-down brokers for this partition.
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                        leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
-                                       replicas: Seq[Int], callback: (RequestOrResponse) => Unit = null) {
-    val topicAndPartition: TopicAndPartition = TopicAndPartition(topic, partition)
-
-    brokerIds.filter(b => b >= 0).foreach {
-      brokerId =>
-        leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo])
-        leaderAndIsrRequestMap(brokerId).put((topic, partition),
-          PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
+                                       replicas: Seq[Int], callback: AbstractRequestResponse => Unit = null) {
+    val topicPartition = new TopicPartition(topic, partition)
+
+    brokerIds.filter(_ >= 0).foreach { brokerId =>
+      val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
+      result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
     }
 
     addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
-                                       Set(topicAndPartition))
+                                       Set(TopicAndPartition(topic, partition)))
   }
 
   def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
-                                      callback: (RequestOrResponse, Int) => Unit = null) {
+                                      callback: (AbstractRequestResponse, Int) => Unit = null) {
     brokerIds.filter(b => b >= 0).foreach { brokerId =>
       stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
       val v = stopReplicaRequestMap(brokerId)
       if(callback != null)
         stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
-          deletePartition, (r: RequestOrResponse) => { callback(r, brokerId) })
+          deletePartition, (r: AbstractRequestResponse) => callback(r, brokerId))
       else
         stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
           deletePartition)
@@ -238,7 +285,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
   /** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */
   def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
                                          partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
-                                         callback: (RequestOrResponse) => Unit = null) {
+                                         callback: AbstractRequestResponse => Unit = null) {
     def updateMetadataRequestMapFor(partition: TopicAndPartition, beingDeleted: Boolean) {
       val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
       leaderIsrAndControllerEpochOpt match {
@@ -251,8 +298,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
             PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
           }
           brokerIds.filter(b => b >= 0).foreach { brokerId =>
-            updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo])
-            updateMetadataRequestMap(brokerId).put(partition, partitionStateInfo)
+            updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
+            updateMetadataRequestMap(brokerId).put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
           }
         case None =>
           info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
@@ -269,9 +316,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
       else
         givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted
     }
-    if(filteredPartitions.isEmpty)
+    if (filteredPartitions.isEmpty)
       brokerIds.filter(b => b >= 0).foreach { brokerId =>
-        updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo])
+        updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
       }
     else
       filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false))
@@ -279,38 +326,71 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
     controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = true))
   }
 
-  def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) {
+  def sendRequestsToBrokers(controllerEpoch: Int) {
     try {
-      leaderAndIsrRequestMap.foreach { m =>
-        val broker = m._1
-        val partitionStateInfos = m._2.toMap
-        val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
-        val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map(b => b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol))
-        val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId)
-        for (p <- partitionStateInfos) {
-          val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
-          stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " +
+      leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) =>
+        partitionStateInfos.foreach { case (topicPartition, state) =>
+          val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
+          stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +
                                    "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
-                                                                   p._2.leaderIsrAndControllerEpoch, correlationId, broker,
-                                                                   p._1._1, p._1._2))
+                                                                   state.leaderIsrAndControllerEpoch, broker,
+                                                                   topicPartition.topic, topicPartition.partition))
+        }
+        val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
+        val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map { b =>
+          val brokerEndPoint = b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol)
+          new LeaderAndIsrRequest.EndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
+        }
+        val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
+          val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
+          val partitionState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leaderIsr.leader,
+            leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
+            partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
+          )
+          topicPartition -> partitionState
         }
-        controller.sendRequest(broker, leaderAndIsrRequest, null)
+        val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)
+        controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest, null)
       }
       leaderAndIsrRequestMap.clear()
-      updateMetadataRequestMap.foreach { m =>
-        val broker = m._1
-        val partitionStateInfos = m._2.toMap
-
-        val versionId = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0
-        val updateMetadataRequest = new UpdateMetadataRequest(versionId = versionId.toShort, controllerId = controllerId, controllerEpoch = controllerEpoch,
-          correlationId = correlationId, clientId = clientId, partitionStateInfos = partitionStateInfos, aliveBrokers = controllerContext.liveOrShuttingDownBrokers)
-        partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " +
-          "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
-          correlationId, broker, p._1)))
-        controller.sendRequest(broker, updateMetadataRequest, null)
+      updateMetadataRequestMap.foreach { case (broker, partitionStateInfos) =>
+
+        partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +
+          "to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
+          broker, p._1)))
+        val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
+          val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
+          val partitionState = new UpdateMetadataRequest.PartitionState(controllerEpoch, leaderIsr.leader,
+            leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
+            partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
+          )
+          topicPartition -> partitionState
+        }
+
+        val version = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) (1: Short) else (0: Short)
+
+        val updateMetadataRequest =
+          if (version == 0) {
+            val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>
+              val brokerEndPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)
+              new UpdateMetadataRequest.BrokerEndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
+            }
+            new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava)
+          }
+          else {
+            val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>
+              val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>
+                securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)
+              }
+              new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava)
+            }
+            new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
+          }
+
+        controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, Some(version), updateMetadataRequest, null)
       }
       updateMetadataRequestMap.clear()
-      stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
+      stopReplicaRequestMap.foreach { case (broker, replicaInfoList) =>
         val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
         val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
         debug("The stop replica request (delete = true) sent to broker %d is %s"
@@ -318,23 +398,23 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
         debug("The stop replica request (delete = false) sent to broker %d is %s"
           .format(broker, stopReplicaWithoutDelete.mkString(",")))
         replicaInfoList.foreach { r =>
-          val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,
-            Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId)
-          controller.sendRequest(broker, stopReplicaRequest, r.callback)
+          val stopReplicaRequest = new StopReplicaRequest(controllerId, controllerEpoch, r.deletePartition,
+            Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)
+          controller.sendRequest(broker, ApiKeys.STOP_REPLICA, None, stopReplicaRequest, r.callback)
         }
       }
       stopReplicaRequestMap.clear()
     } catch {
       case e : Throwable => {
-        if(leaderAndIsrRequestMap.size > 0) {
+        if (leaderAndIsrRequestMap.size > 0) {
           error("Haven't been able to send leader and isr requests, current state of " +
               s"the map is $leaderAndIsrRequestMap")
         }
-        if(updateMetadataRequestMap.size > 0) {
+        if (updateMetadataRequestMap.size > 0) {
           error("Haven't been able to send metadata update requests, current state of " +
               s"the map is $updateMetadataRequestMap")
         }
-        if(stopReplicaRequestMap.size > 0) {
+        if (stopReplicaRequestMap.size > 0) {
           error("Haven't been able to send stop replica requests, current state of " +
               s"the map is $stopReplicaRequestMap")
         }
@@ -344,34 +424,35 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
   }
 }
 
-case class ControllerBrokerStateInfo(channel: BlockingChannel,
+case class ControllerBrokerStateInfo(networkClient: NetworkClient,
+                                     brokerNode: Node,
                                      broker: Broker,
-                                     messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
+                                     messageQueue: BlockingQueue[QueueItem],
                                      requestSendThread: RequestSendThread)
 
-case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: (RequestOrResponse) => Unit = null)
+case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractRequestResponse => Unit = null)
 
-class Callbacks private (var leaderAndIsrResponseCallback:(RequestOrResponse) => Unit = null,
-                         var updateMetadataResponseCallback:(RequestOrResponse) => Unit = null,
-                         var stopReplicaResponseCallback:(RequestOrResponse, Int) => Unit = null)
+class Callbacks private (var leaderAndIsrResponseCallback: AbstractRequestResponse => Unit = null,
+                         var updateMetadataResponseCallback: AbstractRequestResponse => Unit = null,
+                         var stopReplicaResponseCallback: (AbstractRequestResponse, Int) => Unit = null)
 
 object Callbacks {
   class CallbackBuilder {
-    var leaderAndIsrResponseCbk:(RequestOrResponse) => Unit = null
-    var updateMetadataResponseCbk:(RequestOrResponse) => Unit = null
-    var stopReplicaResponseCbk:(RequestOrResponse, Int) => Unit = null
+    var leaderAndIsrResponseCbk: AbstractRequestResponse => Unit = null
+    var updateMetadataResponseCbk: AbstractRequestResponse => Unit = null
+    var stopReplicaResponseCbk: (AbstractRequestResponse, Int) => Unit = null
 
-    def leaderAndIsrCallback(cbk: (RequestOrResponse) => Unit): CallbackBuilder = {
+    def leaderAndIsrCallback(cbk: AbstractRequestResponse => Unit): CallbackBuilder = {
       leaderAndIsrResponseCbk = cbk
       this
     }
 
-    def updateMetadataCallback(cbk: (RequestOrResponse) => Unit): CallbackBuilder = {
+    def updateMetadataCallback(cbk: AbstractRequestResponse => Unit): CallbackBuilder = {
       updateMetadataResponseCbk = cbk
       this
     }
 
-    def stopReplicaCallback(cbk: (RequestOrResponse, Int) => Unit): CallbackBuilder = {
+    def stopReplicaCallback(cbk: (AbstractRequestResponse, Int) => Unit): CallbackBuilder = {
       stopReplicaResponseCbk = cbk
       this
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4c37616..2d0845d 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -18,6 +18,10 @@ package kafka.controller
 
 import java.util
 
+import org.apache.kafka.clients.NetworkClient
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse}
+
 import scala.collection._
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.TimeUnit
@@ -32,9 +36,10 @@ import kafka.utils.ZkUtils._
 import kafka.utils._
 import kafka.utils.CoreUtils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
 import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
-import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.ReentrantLock
 import kafka.server._
 import kafka.common.TopicAndPartition
@@ -47,12 +52,24 @@ class ControllerContext(val zkClient: ZkClient,
   val brokerShutdownLock: Object = new Object
   var epoch: Int = KafkaController.InitialControllerEpoch - 1
   var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
-  val correlationId: AtomicInteger = new AtomicInteger(0)
   var allTopics: Set[String] = Set.empty
   var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty
   var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
-  var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
-  var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet
+  val partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
+  val partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet
+
+  /**
+   * This map is used to ensure the following invariant: at most one `NetworkClient`/`Selector` instance should be
+   * created per broker during the lifetime of the `metrics` parameter received by `KafkaController` (which has the same
+   * lifetime as `KafkaController` since they are both shut down during `KafkaServer.shutdown()`).
+   *
+   * If we break this invariant, an exception is thrown during the instantiation of `Selector` due to the usage of
+ * two equal `MetricName` instances for two `Selector` instantiations. This approach also helps to keep the metrics sane.
+   *
+   * In the future, we should consider redesigning `ControllerChannelManager` so that we can use a single
+   * `NetworkClient`/`Selector` for multiple broker connections as that is the intended usage and it may help simplify this code.
+   */
+  private[controller] val networkClientMap = mutable.Map[Int, NetworkClient]()
 
   private var liveBrokersUnderlying: Set[Broker] = Set.empty
   private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
@@ -117,6 +134,12 @@ class ControllerContext(val zkClient: ZkClient,
     partitionReplicaAssignment = partitionReplicaAssignment.filter{ case (topicAndPartition, _) => topicAndPartition.topic != topic }
     allTopics -= topic
   }
+
+  private[controller] def closeNetworkClients(): Unit = {
+    networkClientMap.values.foreach(_.close())
+    networkClientMap.clear()
+  }
+
 }
 
 
@@ -149,7 +172,7 @@ object KafkaController extends Logging {
   }
 }
 
-class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup {
+class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState, time: Time, metrics: Metrics) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
   private val stateChangeLogger = KafkaController.stateChangeLogger
@@ -267,7 +290,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
                     brokerRequestBatch.newBatch()
                     brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
                       topicAndPartition.partition, deletePartition = false)
-                    brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
+                    brokerRequestBatch.sendRequestsToBrokers(epoch)
                   } catch {
                     case e : IllegalStateException => {
                       // Resign if the controller is in an illegal state
@@ -688,10 +711,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
       isRunning = false
     }
     onControllerResignation()
+    controllerContext.closeNetworkClients()
   }
 
-  def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = {
-    controllerContext.controllerChannelManager.sendRequest(brokerId, request, callback)
+  def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) = {
+    controllerContext.controllerChannelManager.sendRequest(brokerId, apiKey, apiVersion, request, callback)
   }
 
   def incrementControllerEpoch(zkClient: ZkClient) = {
@@ -811,7 +835,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
   }
 
   private def startChannelManager() {
-    controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext, config)
+    controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics)
     controllerContext.controllerChannelManager.startup()
   }
 
@@ -901,7 +925,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
         try {
           brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
             topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
-          brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement)
+          brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
         } catch {
           case e : IllegalStateException => {
             // Resign if the controller is in an illegal state
@@ -1020,7 +1044,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
     try {
       brokerRequestBatch.newBatch()
       brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
-      brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
+      brokerRequestBatch.sendRequestsToBrokers(epoch)
     } catch {
       case e : IllegalStateException => {
         // Resign if the controller is in an illegal state

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 5b616f3..675a807 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -119,7 +119,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector,
                             (new CallbackBuilder).build)
       }
-      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
     } catch {
       case e: Throwable => error("Error while moving some partitions to the online state", e)
       // TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions
@@ -144,7 +144,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       partitions.foreach { topicAndPartition =>
         handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
       }
-      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
     }catch {
       case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
       // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 3a44fdc..acad83a 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -112,7 +112,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
       try {
         brokerRequestBatch.newBatch()
         replicas.foreach(r => handleStateChange(r, targetState, callbacks))
-        brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
+        brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
       }catch {
         case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 64b11df..9e39dd5 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -18,13 +18,15 @@ package kafka.controller
 
 
 import kafka.server.ConfigType
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{StopReplicaResponse, AbstractRequestResponse}
 
 import collection.mutable
+import collection.JavaConverters._
 import kafka.utils.{ShutdownableThread, Logging, ZkUtils}
 import kafka.utils.CoreUtils._
 import collection.Set
-import kafka.common.{ErrorMapping, TopicAndPartition}
-import kafka.api.{StopReplicaResponse, RequestOrResponse}
+import kafka.common.TopicAndPartition
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.atomic.AtomicBoolean
 
@@ -363,20 +365,20 @@ class TopicDeletionManager(controller: KafkaController,
     startReplicaDeletion(replicasPerPartition)
   }
 
-  private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: RequestOrResponse, replicaId: Int) {
+  private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractRequestResponse, replicaId: Int) {
     val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
     debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
-    val partitionsInError = if(stopReplicaResponse.errorCode != ErrorMapping.NoError) {
-      stopReplicaResponse.responseMap.keySet
-    } else
-      stopReplicaResponse.responseMap.filter(p => p._2 != ErrorMapping.NoError).map(_._1).toSet
+    val responseMap = stopReplicaResponse.responses.asScala
+    val partitionsInError =
+      if (stopReplicaResponse.errorCode != Errors.NONE.code) responseMap.keySet
+      else responseMap.filter { case (_, error) => error != Errors.NONE.code }.map(_._1).toSet
     val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
     inLock(controllerContext.controllerLock) {
       // move all the failed replicas to ReplicaDeletionIneligible
       failReplicaDeletion(replicasInError)
-      if(replicasInError.size != stopReplicaResponse.responseMap.size) {
+      if (replicasInError.size != responseMap.size) {
         // some replicas could have been successfully deleted
-        val deletedReplicas = stopReplicaResponse.responseMap.keySet -- partitionsInError
+        val deletedReplicas = responseMap.keySet -- partitionsInError
         completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index 1197259..5408e0d 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -117,11 +117,17 @@ class BlockingChannel( val host: String,
     if(!connected)
       throw new ClosedChannelException()
 
-    val response = new NetworkReceive()
-    response.readCompletely(readChannel)
+    val response = readCompletely(readChannel)
     response.payload().rewind()
 
     response
   }
 
+  private def readCompletely(channel: ReadableByteChannel): NetworkReceive = {
+    val response = new NetworkReceive
+    while (!response.complete())
+      response.readFromReadableChannel(channel)
+    response
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index af02c4e..d46603b 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -32,8 +32,7 @@ import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.network.{InvalidReceiveException, ChannelBuilder,
-                                        PlaintextChannelBuilder, SSLChannelBuilder}
+import org.apache.kafka.common.network.{ChannelBuilders, InvalidReceiveException, ChannelBuilder, PlaintextChannelBuilder, SSLChannelBuilder}
 import org.apache.kafka.common.security.ssl.SSLFactory
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.protocol.types.SchemaException
@@ -368,7 +367,7 @@ private[kafka] class Processor(val id: Int,
 
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
   private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
-  private val channelBuilder = createChannelBuilder
+  private val channelBuilder = ChannelBuilders.create(protocol, SSLFactory.Mode.SERVER, channelConfigs)
   private val metricTags = new util.HashMap[String, String]()
   metricTags.put("networkProcessor", id.toString)
 
@@ -514,14 +513,6 @@ private[kafka] class Processor(val id: Int,
     }
   }
 
-  private def createChannelBuilder(): ChannelBuilder = {
-    val channelBuilder: ChannelBuilder = if (protocol == SecurityProtocol.SSL)  new SSLChannelBuilder(SSLFactory.Mode.SERVER)
-                                        else new PlaintextChannelBuilder()
-
-    channelBuilder.configure(channelConfigs)
-    channelBuilder
-  }
-
   /**
    * Close all open connections
    */


Mime
View raw message