kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-2507 KAFKA-2959; Remove legacy ControlledShutdown request/response objects
Date Thu, 03 Aug 2017 23:20:03 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 613971f8d -> 5a516fb28


KAFKA-2507 KAFKA-2959; Remove legacy ControlledShutdown request/response objects

This patch replaces the legacy ControlledShutdown objects in `kafka.api` with the alternatives in `org.apache.kafka.common.requests`. Since this was the last API that needed updating, we have also dropped the reference in `RequestChannel.Request` to the legacy object type.

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3612 from hachikuji/remove-old-controlled-shutdown-objects


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5a516fb2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5a516fb2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5a516fb2

Branch: refs/heads/trunk
Commit: 5a516fb28e6a2af559fcbac5df12f048a5916fc6
Parents: 613971f
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu Aug 3 16:18:26 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Thu Aug 3 16:18:26 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/protocol/Protocol.java  | 46 ++++++----
 .../requests/ControlledShutdownRequest.java     |  6 +-
 .../kafka/common/requests/FetchResponse.java    |  6 +-
 .../kafka/common/requests/RequestHeader.java    | 76 ++++++++++++-----
 .../kafka/clients/NodeApiVersionsTest.java      | 32 +++----
 .../common/requests/RequestHeaderTest.java      | 76 +++++++++++++++++
 .../common/requests/RequestResponseTest.java    | 35 +-------
 .../java/org/apache/kafka/test/TestUtils.java   |  8 ++
 .../kafka/api/ControlledShutdownRequest.scala   | 84 ------------------
 .../kafka/api/ControlledShutdownResponse.scala  | 72 ----------------
 .../scala/kafka/common/TopicAndPartition.scala  |  2 +
 .../scala/kafka/network/RequestChannel.scala    | 65 +++++---------
 .../src/main/scala/kafka/server/KafkaApis.scala | 74 ++++++----------
 .../kafka/server/KafkaRequestHandler.scala      |  2 +-
 .../main/scala/kafka/server/KafkaServer.scala   | 90 ++------------------
 .../unit/kafka/network/SocketServerTest.scala   |  4 +-
 16 files changed, 247 insertions(+), 431 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5a516fb2/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index ee40133..9f6ae3d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -42,15 +42,19 @@ import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
 
 public class Protocol {
 
-    public static final Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
-                                                           new Field("api_version", INT16, "The version of the API."),
-                                                           new Field("correlation_id",
-                                                                     INT32,
-                                                                     "A user-supplied integer value that will be passed back with the response"),
-                                                           new Field("client_id",
-                                                                     NULLABLE_STRING,
-                                                                     "A user specified identifier for the client making the request.",
-                                                                     ""));
+    public static final Schema REQUEST_HEADER = new Schema(
+            new Field("api_key", INT16, "The id of the request type."),
+            new Field("api_version", INT16, "The version of the API."),
+            new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"),
+            new Field("client_id", NULLABLE_STRING, "A user specified identifier for the client making the request.", ""));
+
+    // Version 0 of the controlled shutdown API used a non-standard request header (the clientId is missing).
+    // This can be removed once we drop support for that version.
+    public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V0_HEADER = new Schema(
+            new Field("api_key", INT16, "The id of the request type."),
+            new Field("api_version", INT16, "The version of the API."),
+            new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"));
+
 
     public static final Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
                                                                       INT32,
@@ -942,23 +946,25 @@ public class Protocol {
     public static final Schema[] FIND_COORDINATOR_RESPONSE = {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1};
 
     /* Controlled shutdown api */
-    public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id",
+    public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V0 = new Schema(new Field("broker_id",
                                                                                      INT32,
                                                                                      "The id of the broker for which controlled shutdown has been requested."));
 
-    public static final Schema CONTROLLED_SHUTDOWN_PARTITION_V1 = new Schema(new Field("topic", STRING),
+    public static final Schema CONTROLLED_SHUTDOWN_PARTITION_V0 = new Schema(new Field("topic", STRING),
                                                                              new Field("partition",
                                                                                        INT32,
                                                                                        "Topic partition id."));
 
-    public static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V1 = new Schema(new Field("error_code", INT16),
+    public static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
                                                                             new Field("partitions_remaining",
-                                                                                      new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V1),
+                                                                                      new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V0),
                                                                                       "The partitions that the broker still leads."));
 
-    /* V0 is not supported as it would require changes to the request header not to include `clientId` */
-    public static final Schema[] CONTROLLED_SHUTDOWN_REQUEST = {null, CONTROLLED_SHUTDOWN_REQUEST_V1};
-    public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = {null, CONTROLLED_SHUTDOWN_RESPONSE_V1};
+    public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = CONTROLLED_SHUTDOWN_REQUEST_V0;
+    public static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V1 = CONTROLLED_SHUTDOWN_RESPONSE_V0;
+
+    public static final Schema[] CONTROLLED_SHUTDOWN_REQUEST = {CONTROLLED_SHUTDOWN_REQUEST_V0, CONTROLLED_SHUTDOWN_REQUEST_V1};
+    public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = {CONTROLLED_SHUTDOWN_RESPONSE_V0, CONTROLLED_SHUTDOWN_RESPONSE_V1};
 
     /* Join group api */
     public static final Schema JOIN_GROUP_REQUEST_PROTOCOL_V0 = new Schema(new Field("protocol_name", STRING),
@@ -1983,6 +1989,14 @@ public class Protocol {
         return DELAYED_DEALLOCATION_REQUESTS.contains(ApiKeys.forId(apiKey));
     }
     
+    public static Schema requestHeaderSchema(short apiKey, short version) {
+        if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY.id && version == 0)
+            // This will be removed once we remove support for v0 of ControlledShutdownRequest, which
+            // depends on a non-standard request header (it does not have a clientId)
+            return CONTROLLED_SHUTDOWN_REQUEST_V0_HEADER;
+        return REQUEST_HEADER;
+    }
+
     private static String indentString(int size) {
         StringBuilder b = new StringBuilder(size);
         for (int i = 0; i < size; i++)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a516fb2/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index ee41665..fca2f63 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -31,7 +31,11 @@ public class ControlledShutdownRequest extends AbstractRequest {
         private final int brokerId;
 
         public Builder(int brokerId) {
-            super(ApiKeys.CONTROLLED_SHUTDOWN_KEY);
+            this(brokerId, null);
+        }
+
+        public Builder(int brokerId, Short desiredVersion) {
+            super(ApiKeys.CONTROLLED_SHUTDOWN_KEY, desiredVersion);
             this.brokerId = brokerId;
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a516fb2/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index f0f516c..b52b6f5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -239,7 +239,11 @@ public class FetchResponse extends AbstractResponse {
         return toSend(toStruct(requestHeader.apiVersion()), throttleTimeMs, dest, requestHeader);
     }
 
-    public Send toSend(Struct responseStruct, int throttleTimeMs, String dest, RequestHeader requestHeader) {
+    public Send toSend(int throttleTimeMs, String dest, RequestHeader requestHeader) {
+        return toSend(toStruct(requestHeader.apiVersion()), throttleTimeMs, dest, requestHeader);
+    }
+
+    private Send toSend(Struct responseStruct, int throttleTimeMs, String dest, RequestHeader requestHeader) {
         Struct responseHeader = new ResponseHeader(requestHeader.correlationId()).toStruct();
 
         // write the total size and the response header

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a516fb2/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
index 8a0a549..18ea576 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
@@ -16,23 +16,20 @@
  */
 package org.apache.kafka.common.requests;
 
-import static org.apache.kafka.common.protocol.Protocol.REQUEST_HEADER;
-
-import java.nio.ByteBuffer;
-
 import org.apache.kafka.common.protocol.Protocol;
-import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
+import java.nio.ByteBuffer;
+
 /**
  * The header for a request in the Kafka protocol
  */
 public class RequestHeader extends AbstractRequestResponse {
-
-    private static final Field API_KEY_FIELD = REQUEST_HEADER.get("api_key");
-    private static final Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version");
-    private static final Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id");
-    private static final Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id");
+    private static final String API_KEY_FIELD_NAME = "api_key";
+    private static final String API_VERSION_FIELD_NAME = "api_version";
+    private static final String CLIENT_ID_FIELD_NAME = "client_id";
+    private static final String CORRELATION_ID_FIELD_NAME = "correlation_id";
 
     private final short apiKey;
     private final short apiVersion;
@@ -40,25 +37,34 @@ public class RequestHeader extends AbstractRequestResponse {
     private final int correlationId;
 
     public RequestHeader(Struct struct) {
-        apiKey = struct.getShort(API_KEY_FIELD);
-        apiVersion = struct.getShort(API_VERSION_FIELD);
-        clientId = struct.getString(CLIENT_ID_FIELD);
-        correlationId = struct.getInt(CORRELATION_ID_FIELD);
+        apiKey = struct.getShort(API_KEY_FIELD_NAME);
+        apiVersion = struct.getShort(API_VERSION_FIELD_NAME);
+
+        // only v0 of the controlled shutdown request is missing the clientId
+        if (struct.hasField(CLIENT_ID_FIELD_NAME))
+            clientId = struct.getString(CLIENT_ID_FIELD_NAME);
+        else
+            clientId = "";
+        correlationId = struct.getInt(CORRELATION_ID_FIELD_NAME);
     }
 
-    public RequestHeader(short apiKey, short version, String client, int correlation) {
+    public RequestHeader(short apiKey, short version, String clientId, int correlation) {
         this.apiKey = apiKey;
         this.apiVersion = version;
-        this.clientId = client;
+        this.clientId = clientId;
         this.correlationId = correlation;
     }
 
     public Struct toStruct() {
-        Struct struct = new Struct(Protocol.REQUEST_HEADER);
-        struct.set(API_KEY_FIELD, apiKey);
-        struct.set(API_VERSION_FIELD, apiVersion);
-        struct.set(CLIENT_ID_FIELD, clientId);
-        struct.set(CORRELATION_ID_FIELD, correlationId);
+        Schema schema = Protocol.requestHeaderSchema(apiKey, apiVersion);
+        Struct struct = new Struct(schema);
+        struct.set(API_KEY_FIELD_NAME, apiKey);
+        struct.set(API_VERSION_FIELD_NAME, apiVersion);
+
+        // only v0 of the controlled shutdown request is missing the clientId
+        if (struct.hasField(CLIENT_ID_FIELD_NAME))
+            struct.set(CLIENT_ID_FIELD_NAME, clientId);
+        struct.set(CORRELATION_ID_FIELD_NAME, correlationId);
         return struct;
     }
 
@@ -83,11 +89,37 @@ public class RequestHeader extends AbstractRequestResponse {
     }
 
     public static RequestHeader parse(ByteBuffer buffer) {
-        return new RequestHeader(Protocol.REQUEST_HEADER.read(buffer));
+        short apiKey = buffer.getShort();
+        short apiVersion = buffer.getShort();
+        Schema schema = Protocol.requestHeaderSchema(apiKey, apiVersion);
+        buffer.rewind();
+        return new RequestHeader(schema.read(buffer));
     }
 
     @Override
     public String toString() {
         return toStruct().toString();
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        RequestHeader that = (RequestHeader) o;
+        return apiKey == that.apiKey &&
+                apiVersion == that.apiVersion &&
+                correlationId == that.correlationId &&
+                (clientId == null ? that.clientId == null : clientId.equals(that.clientId));
+    }
+
+    @Override
+    public int hashCode() {
+        int result = (int) apiKey;
+        result = 31 * result + (int) apiVersion;
+        result = 31 * result + (clientId != null ? clientId.hashCode() : 0);
+        result = 31 * result + correlationId;
+        return result;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a516fb2/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index 1f24e7a..41ac42a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -58,9 +57,7 @@ public class NodeApiVersionsTest {
     public void testVersionsToString() {
         List<ApiVersion> versionList = new ArrayList<>();
         for (ApiKeys apiKey : ApiKeys.values()) {
-            if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY) {
-                versionList.add(new ApiVersion(apiKey.id, (short) 0, (short) 0));
-            } else if (apiKey == ApiKeys.DELETE_TOPICS) {
+            if (apiKey == ApiKeys.DELETE_TOPICS) {
                 versionList.add(new ApiVersion(apiKey.id, (short) 10000, (short) 10001));
             } else {
                 versionList.add(new ApiVersion(apiKey));
@@ -71,9 +68,7 @@ public class NodeApiVersionsTest {
         String prefix = "(";
         for (ApiKeys apiKey : ApiKeys.values()) {
             bld.append(prefix);
-            if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY) {
-                bld.append("ControlledShutdown(7): 0 [unusable: node too old]");
-            } else if (apiKey == ApiKeys.DELETE_TOPICS) {
+            if (apiKey == ApiKeys.DELETE_TOPICS) {
                 bld.append("DeleteTopics(20): 10000 to 10001 [unusable: node too new]");
             } else {
                 bld.append(apiKey.name).append("(").
@@ -96,21 +91,6 @@ public class NodeApiVersionsTest {
     }
 
     @Test
-    public void testUsableVersionCalculation() {
-        List<ApiVersion> versionList = new ArrayList<>();
-        versionList.add(new ApiVersion(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, (short) 0, (short) 0));
-        versionList.add(new ApiVersion(ApiKeys.FETCH.id, (short) 1, (short) 2));
-        NodeApiVersions versions =  new NodeApiVersions(versionList);
-        try {
-            versions.usableVersion(ApiKeys.CONTROLLED_SHUTDOWN_KEY);
-            Assert.fail("expected UnsupportedVersionException");
-        } catch (UnsupportedVersionException e) {
-            // pass
-        }
-        assertEquals(2, versions.usableVersion(ApiKeys.FETCH));
-    }
-
-    @Test
     public void testUsableVersionNoDesiredVersionReturnsLatestUsable() {
         NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
                 new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 3)));
@@ -121,6 +101,7 @@ public class NodeApiVersionsTest {
     public void testDesiredVersion() {
         NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
                 new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 3)));
+        assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE));
         assertEquals(1, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 1));
         assertEquals(2, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 2));
         assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3));
@@ -147,6 +128,13 @@ public class NodeApiVersionsTest {
         versions.usableVersion(ApiKeys.FETCH);
     }
 
+    @Test(expected = UnsupportedVersionException.class)
+    public void testUsableVersionOutOfRange() {
+        NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
+                new ApiVersion(ApiKeys.PRODUCE.id, (short) 300, (short) 300)));
+        apiVersions.usableVersion(ApiKeys.PRODUCE);
+    }
+
     @Test
     public void testUsableVersionLatestVersions() {
         List<ApiVersion> versionList = new LinkedList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a516fb2/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
new file mode 100644
index 0000000..a1184d8
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.test.TestUtils.toBuffer;
+import static org.junit.Assert.assertEquals;
+
+public class RequestHeaderTest {
+
+    @Test
+    public void testSerdeControlledShutdownV0() {
+        // Verify that version 0 of controlled shutdown does not include the clientId field
+
+        int correlationId = 2342;
+        ByteBuffer rawBuffer = ByteBuffer.allocate(32);
+        rawBuffer.putShort(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id);
+        rawBuffer.putShort((short) 0);
+        rawBuffer.putInt(correlationId);
+        rawBuffer.flip();
+
+        RequestHeader deserialized = RequestHeader.parse(rawBuffer);
+        assertEquals(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, deserialized.apiKey());
+        assertEquals(0, deserialized.apiVersion());
+        assertEquals(correlationId, deserialized.correlationId());
+        assertEquals("", deserialized.clientId());
+
+        Struct serialized = deserialized.toStruct();
+        ByteBuffer serializedBuffer = toBuffer(serialized);
+
+        assertEquals(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, serializedBuffer.getShort(0));
+        assertEquals(0, serializedBuffer.getShort(2));
+        assertEquals(correlationId, serializedBuffer.getInt(4));
+        assertEquals(8, serializedBuffer.limit());
+    }
+
+    @Test
+    public void testRequestHeader() {
+        RequestHeader header = new RequestHeader((short) 10, (short) 1, "", 10);
+        ByteBuffer buffer = toBuffer(header.toStruct());
+        RequestHeader deserialized = RequestHeader.parse(buffer);
+        assertEquals(header, deserialized);
+    }
+
+    @Test
+    public void testRequestHeaderWithNullClientId() {
+        RequestHeader header = new RequestHeader((short) 10, (short) 1, null, 10);
+        Struct headerStruct = header.toStruct();
+        ByteBuffer buffer = toBuffer(headerStruct);
+        RequestHeader deserialized = RequestHeader.parse(buffer);
+        assertEquals(header.apiKey(), deserialized.apiKey());
+        assertEquals(header.apiVersion(), deserialized.apiVersion());
+        assertEquals(header.correlationId(), deserialized.correlationId());
+        assertEquals("", deserialized.clientId()); // null defaults to ""
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a516fb2/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c2c9a4d..10e73b6 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -68,6 +68,7 @@ import java.util.Set;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
+import static org.apache.kafka.test.TestUtils.toBuffer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -242,17 +243,6 @@ public class RequestResponseTest {
     }
 
     @Test
-    public void testRequestHeader() {
-        RequestHeader header = createRequestHeader();
-        ByteBuffer buffer = toBuffer(header.toStruct());
-        RequestHeader deserialized = RequestHeader.parse(buffer);
-        assertEquals(header.apiVersion(), deserialized.apiVersion());
-        assertEquals(header.apiKey(), deserialized.apiKey());
-        assertEquals(header.clientId(), deserialized.clientId());
-        assertEquals(header.correlationId(), deserialized.correlationId());
-    }
-
-    @Test
     public void testResponseHeader() {
         ResponseHeader header = createResponseHeader();
         ByteBuffer buffer = toBuffer(header.toStruct());
@@ -295,13 +285,6 @@ public class RequestResponseTest {
         return (AbstractRequestResponse) deserializer.invoke(null, buffer, version);
     }
 
-    private ByteBuffer toBuffer(Struct struct) {
-        ByteBuffer buffer = ByteBuffer.allocate(struct.sizeOf());
-        struct.writeTo(buffer);
-        buffer.rewind();
-        return buffer;
-    }
-
     @Test(expected = UnsupportedVersionException.class)
     public void cannotUseFindCoordinatorV0ToFindTransactionCoordinator() {
         FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
@@ -508,18 +491,6 @@ public class RequestResponseTest {
         assertEquals(response.partitionsRemaining(), deserialized.partitionsRemaining());
     }
 
-    @Test
-    public void testRequestHeaderWithNullClientId() {
-        RequestHeader header = new RequestHeader((short) 10, (short) 1, null, 10);
-        Struct headerStruct = header.toStruct();
-        ByteBuffer buffer = toBuffer(headerStruct);
-        RequestHeader deserialized = RequestHeader.parse(buffer);
-        assertEquals(header.apiKey(), deserialized.apiKey());
-        assertEquals(header.apiVersion(), deserialized.apiVersion());
-        assertEquals(header.correlationId(), deserialized.correlationId());
-        assertEquals("", deserialized.clientId()); // null is defaulted to ""
-    }
-
     @Test(expected = UnsupportedVersionException.class)
     public void testCreateTopicRequestV0FailsIfValidateOnly() {
         createCreateTopicRequest(0, true);
@@ -564,10 +535,6 @@ public class RequestResponseTest {
         assertTrue(string.contains("group1"));
     }
 
-    private RequestHeader createRequestHeader() {
-        return new RequestHeader((short) 10, (short) 1, "", 10);
-    }
-
     private ResponseHeader createResponseHeader() {
         return new ResponseHeader(10);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a516fb2/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index c1fc675..a27d5cb 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -323,4 +324,11 @@ public class TestUtils {
         return list;
     }
 
+    public static ByteBuffer toBuffer(Struct struct) {
+        ByteBuffer buffer = ByteBuffer.allocate(struct.sizeOf());
+        struct.writeTo(buffer);
+        buffer.rewind();
+        return buffer;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a516fb2/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
deleted file mode 100644
index a0ad6cf..0000000
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-
-import kafka.common.TopicAndPartition
-import kafka.api.ApiUtils._
-import kafka.network.{RequestOrResponseSend, RequestChannel}
-import kafka.network.RequestChannel.Response
-import kafka.utils.Logging
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-
-object ControlledShutdownRequest extends Logging {
-  val CurrentVersion = 1.shortValue
-  val DefaultClientId = ""
-
-  def readFrom(buffer: ByteBuffer): ControlledShutdownRequest = {
-    val versionId = buffer.getShort
-    val correlationId = buffer.getInt
-    val clientId = if (versionId > 0) Some(readShortString(buffer)) else None
-    val brokerId = buffer.getInt
-    new ControlledShutdownRequest(versionId, correlationId, clientId, brokerId)
-  }
-
-}
-
-case class ControlledShutdownRequest(versionId: Short,
-                                     correlationId: Int,
-                                     clientId: Option[String],
-                                     brokerId: Int)
-  extends RequestOrResponse(Some(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id)){
-
-  if (versionId > 0 && clientId.isEmpty)
-    throw new IllegalArgumentException("`clientId` must be defined if `versionId` > 0")
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putShort(versionId)
-    buffer.putInt(correlationId)
-    clientId.foreach(writeShortString(buffer, _))
-    buffer.putInt(brokerId)
-  }
-
-  def sizeInBytes: Int = {
-    2 + /* version id */
-      4 + /* correlation id */
-      clientId.fold(0)(shortStringLength) +
-      4 /* broker id */
-  }
-
-  override def toString: String = {
-    describe(true)
-  }
-
-  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val errorResponse = ControlledShutdownResponse(correlationId, Errors.forException(e), Set.empty[TopicAndPartition])
-    requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
-  }
-
-  override def describe(details: Boolean = false): String = {
-    val controlledShutdownRequest = new StringBuilder
-    controlledShutdownRequest.append("Name: " + this.getClass.getSimpleName)
-    controlledShutdownRequest.append("; Version: " + versionId)
-    controlledShutdownRequest.append("; CorrelationId: " + correlationId)
-    controlledShutdownRequest.append(";ClientId:" + clientId)
-    controlledShutdownRequest.append("; BrokerId: " + brokerId)
-    controlledShutdownRequest.toString()
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a516fb2/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
deleted file mode 100644
index 15992d2..0000000
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import kafka.common.TopicAndPartition
-import kafka.api.ApiUtils._
-import org.apache.kafka.common.protocol.Errors
-import collection.Set
-
-object ControlledShutdownResponse {
-  def readFrom(buffer: ByteBuffer): ControlledShutdownResponse = {
-    val correlationId = buffer.getInt
-    val error = Errors.forCode(buffer.getShort)
-    val numEntries = buffer.getInt
-
-    var partitionsRemaining = Set[TopicAndPartition]()
-    for (_ <- 0 until numEntries){
-      val topic = readShortString(buffer)
-      val partition = buffer.getInt
-      partitionsRemaining += new TopicAndPartition(topic, partition)
-    }
-    new ControlledShutdownResponse(correlationId, error, partitionsRemaining)
-  }
-}
-
-
-case class ControlledShutdownResponse(correlationId: Int,
-                                      error: Errors = Errors.NONE,
-                                      partitionsRemaining: Set[TopicAndPartition])
-  extends RequestOrResponse() {
-  def sizeInBytes: Int = {
-    var size =
-      4 /* correlation id */ +
-        2 /* error code */ +
-        4 /* number of responses */
-    for (topicAndPartition <- partitionsRemaining) {
-      size +=
-        2 + topicAndPartition.topic.length /* topic */ +
-        4 /* partition */
-    }
-    size
-  }
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putInt(correlationId)
-    buffer.putShort(error.code)
-    buffer.putInt(partitionsRemaining.size)
-    for (topicAndPartition:TopicAndPartition <- partitionsRemaining){
-      writeShortString(buffer, topicAndPartition.topic)
-      buffer.putInt(topicAndPartition.partition)
-    }
-  }
-
-  override def describe(details: Boolean):String = { toString }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a516fb2/core/src/main/scala/kafka/common/TopicAndPartition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index 4c94c73..07a2292 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -35,5 +35,7 @@ case class TopicAndPartition(topic: String, partition: Int) {
 
   def asTuple = (topic, partition)
 
+  def asTopicPartition = new TopicPartition(topic, partition)
+
   override def toString = "[%s,%d]".format(topic, partition)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a516fb2/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 4ebef7c..ec8e1eb 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -23,7 +23,6 @@ import java.util.Collections
 import java.util.concurrent._
 
 import com.yammer.metrics.core.Gauge
-import kafka.api.{ControlledShutdownRequest, RequestOrResponse}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaId
 import kafka.utils.{Logging, NotNothing}
@@ -69,56 +68,34 @@ object RequestChannel extends Logging {
     @volatile var apiRemoteCompleteTimeNanos = -1L
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
-    val requestId = buffer.getShort()
-
-    // TODO: this will be removed once we remove support for v0 of ControlledShutdownRequest (which
-    // depends on a non-standard request header)
-    val requestObj: RequestOrResponse = if (requestId == ApiKeys.CONTROLLED_SHUTDOWN_KEY.id)
-      ControlledShutdownRequest.readFrom(buffer)
-    else
-      null
-
-    // if we failed to find a server-side mapping, then try using the
-    // client-side request / response format
-    val header: RequestHeader =
-      if (requestObj == null) {
-        buffer.rewind
-        try RequestHeader.parse(buffer)
-        catch {
-          case ex: Throwable =>
-            throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: $requestId", ex)
-        }
-      } else
-        null
+    val header: RequestHeader = try {
+      RequestHeader.parse(buffer)
+    } catch {
+      case ex: Throwable =>
+        throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: ${buffer.getShort(0)}", ex)
+    }
+
     val bodyAndSize: RequestAndSize =
-      if (requestObj == null)
-        try {
-          // For unsupported version of ApiVersionsRequest, create a dummy request to enable an error response to be returned later
-          if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion)) {
-            new RequestAndSize(new ApiVersionsRequest.Builder().build(), 0)
-          }
-          else
-            AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
-        } catch {
-          case ex: Throwable =>
-            throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
+      try {
+        // For unsupported version of ApiVersionsRequest, create a dummy request to enable an error response to be returned later
+        if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion)) {
+          new RequestAndSize(new ApiVersionsRequest.Builder().build(), 0)
         }
-      else
-        null
+        else
+          AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
+      } catch {
+        case ex: Throwable =>
+          throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
+      }
 
     //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
     //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
     //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
-    if (!Protocol.requiresDelayedDeallocation(requestId)) {
+    if (!Protocol.requiresDelayedDeallocation(header.apiKey)) {
       dispose()
     }
 
-    def requestDesc(details: Boolean): String = {
-      if (requestObj != null)
-        requestObj.describe(details)
-      else
-        s"$header -- ${body[AbstractRequest].toString(details)}"
-    }
+    def requestDesc(details: Boolean): String = s"$header -- ${body[AbstractRequest].toString(details)}"
 
     def body[T <: AbstractRequest](implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
       bodyAndSize.request match {
@@ -158,7 +135,7 @@ object RequestChannel extends Logging {
       val responseSendTime = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
       val totalTime = nanosToMs(endTimeNanos - startTimeNanos)
       val fetchMetricNames =
-        if (requestId == ApiKeys.FETCH.id) {
+        if (header.apiKey == ApiKeys.FETCH.id) {
           val isFromFollower = body[FetchRequest].isFromFollower
           Seq(
             if (isFromFollower) RequestMetrics.followFetchMetricName
@@ -166,7 +143,7 @@ object RequestChannel extends Logging {
           )
         }
         else Seq.empty
-      val metricNames = fetchMetricNames :+ ApiKeys.forId(requestId).name
+      val metricNames = fetchMetricNames :+ ApiKeys.forId(header.apiKey).name
       metricNames.foreach { metricName =>
         val m = RequestMetrics.metricsMap(metricName)
         m.requestRate.mark()

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a516fb2/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1fb8901..4537898 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicInteger
 
 import kafka.admin.{AdminUtils, RackAwareMode}
-import kafka.api.{ApiVersion, ControlledShutdownRequest, ControlledShutdownResponse, KAFKA_0_11_0_IV0}
+import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.cluster.Partition
 import kafka.common.{OffsetAndMetadata, OffsetMetadata, TopicAndPartition}
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
@@ -33,7 +33,7 @@ import kafka.controller.KafkaController
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
 import kafka.log.{Log, LogManager, TimestampOffset}
-import kafka.network.{RequestChannel, RequestOrResponseSend}
+import kafka.network.RequestChannel
 import kafka.security.SecurityUtils
 import kafka.security.auth._
 import kafka.utils.{CoreUtils, Logging, ZKGroupTopicDirs, ZkUtils}
@@ -93,7 +93,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     try {
       trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
         format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
-      ApiKeys.forId(request.requestId) match {
+      ApiKeys.forId(request.header.apiKey) match {
         case ApiKeys.PRODUCE => handleProduceRequest(request)
         case ApiKeys.FETCH => handleFetchRequest(request)
         case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
@@ -228,19 +228,17 @@ class KafkaApis(val requestChannel: RequestChannel,
     // ensureTopicExists is only for client facing requests
     // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
     // stop serving data to clients for the topic being deleted
-    val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest]
-
+    val controlledShutdownRequest = request.body[ControlledShutdownRequest]
     authorizeClusterAction(request)
 
     def controlledShutdownCallback(controlledShutdownResult: Try[Set[TopicAndPartition]]): Unit = {
       controlledShutdownResult match {
         case Success(partitionsRemaining) =>
-          val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
-            Errors.NONE, partitionsRemaining)
-          sendResponseExemptThrottle(RequestChannel.Response(request,
-            new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
+          val controlledShutdownResponse = new ControlledShutdownResponse(Errors.NONE,
+            partitionsRemaining.map(_.asTopicPartition).asJava)
+          sendResponseExemptThrottle(RequestChannel.Response(request, controlledShutdownResponse))
         case Failure(throwable) =>
-          sendResponseExemptThrottle(request, () => controlledShutdownRequest.handleError(throwable, requestChannel, request))
+          sendResponseExemptThrottle(RequestChannel.Response(request, controlledShutdownRequest.getErrorResponse(throwable)))
       }
     }
     controller.shutdownBroker(controlledShutdownRequest.brokerId, controlledShutdownCallback)
@@ -552,16 +550,14 @@ class KafkaApis(val requestChannel: RequestChannel,
             convertedData.put(tp, convertedPartitionData(tp, partitionData))
           }
           val response = new FetchResponse(convertedData, 0)
-          val responseStruct = response.toStruct(versionId)
+          val responseSend = response.toSend(bandwidthThrottleTimeMs + requestThrottleTimeMs, request.connectionId, request.header)
 
-          trace(s"Sending fetch response to client $clientId of ${responseStruct.sizeOf} bytes.")
+          trace(s"Sending fetch response to client $clientId of ${responseSend.size} bytes.")
           response.responseData.asScala.foreach { case (topicPartition, data) =>
             // record the bytes out metrics only when the response is being sent
             brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
           }
 
-          val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs,
-            request.connectionId, request.header)
           RequestChannel.Response(request, responseSend)
         }
 
@@ -1884,39 +1880,23 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def handleError(request: RequestChannel.Request, e: Throwable) {
-    val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !ApiKeys.forId(request.requestId).clusterAction
-    if (request.requestObj != null) {
-      def sendResponseCallback(requestThrottleMs: Int) {
-        request.requestObj.handleError(e, requestChannel, request)
-        error("Error when handling request %s".format(request.requestObj), e)
-      }
-
-      if (mayThrottle) {
-        val clientId: String = request.requestObj match {
-          case r: ControlledShutdownRequest => r.clientId.getOrElse("")
-          case _ =>
-            throw new IllegalStateException("Old style requests should only be used for ControlledShutdownRequest")
-        }
-        sendResponseMaybeThrottle(request, clientId, sendResponseCallback)
-      } else
-        sendResponseExemptThrottle(request, () => sendResponseCallback(0))
-    } else {
-      def createResponse(requestThrottleMs: Int): RequestChannel.Response = {
-        val response = request.body[AbstractRequest].getErrorResponse(requestThrottleMs, e)
-        /* If request doesn't have a default error response, we just close the connection.
-           For example, when produce request has acks set to 0 */
-        if (response == null)
-          new RequestChannel.Response(request, None, RequestChannel.CloseConnectionAction)
-        else RequestChannel.Response(request, response)
-      }
-      error("Error when handling request %s".format(request.body[AbstractRequest]), e)
-      if (mayThrottle)
-        sendResponseMaybeThrottle(request, request.header.clientId, { requestThrottleMs =>
-          requestChannel.sendResponse(createResponse(requestThrottleMs))
-        })
-      else
-        sendResponseExemptThrottle(createResponse(0))
-    }
+    val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !ApiKeys.forId(request.header.apiKey).clusterAction
+
+    def createResponse(requestThrottleMs: Int): RequestChannel.Response = {
+      val response = request.body[AbstractRequest].getErrorResponse(requestThrottleMs, e)
+      /* If request doesn't have a default error response, we just close the connection.
+         For example, when produce request has acks set to 0 */
+      if (response == null)
+        new RequestChannel.Response(request, None, RequestChannel.CloseConnectionAction)
+      else RequestChannel.Response(request, response)
+    }
+    error("Error when handling request %s".format(request.body[AbstractRequest]), e)
+    if (mayThrottle)
+      sendResponseMaybeThrottle(request, request.header.clientId, { requestThrottleMs =>
+        requestChannel.sendResponse(createResponse(requestThrottleMs))
+      })
+    else
+      sendResponseExemptThrottle(createResponse(0))
   }
 
   def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a516fb2/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 512be67..b3f98d1 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -41,7 +41,7 @@ class KafkaRequestHandler(id: Int,
 
   def run() {
     while(true) {
-      var req : RequestChannel.Request = null
+      var req: RequestChannel.Request = null
       try {
         while (req == null) {
           // We use a single meter for aggregate idle percentage for the thread pool.

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a516fb2/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index fc9e4b8..a167e36 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -363,7 +363,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
     val socketTimeoutMs = config.controllerSocketTimeoutMs
 
-    def networkClientControlledShutdown(retries: Int): Boolean = {
+    def doControlledShutdown(retries: Int): Boolean = {
       val metadataUpdater = new ManualMetadataUpdater()
       val networkClient = {
         val channelBuilder = ChannelBuilders.clientChannelBuilder(
@@ -438,7 +438,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
                 throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
 
               // send the controlled shutdown request
-              val controlledShutdownRequest = new ControlledShutdownRequest.Builder(config.brokerId)
+              val controlledShutdownApiVersion: Short = if (config.interBrokerProtocolVersion < KAFKA_0_9_0) 0 else 1
+              val controlledShutdownRequest = new ControlledShutdownRequest.Builder(config.brokerId,
+                controlledShutdownApiVersion)
               val request = networkClient.newClientRequest(node(prevController).idString, controlledShutdownRequest,
                 time.milliseconds(), true)
               val clientResponse = NetworkClientUtils.sendAndReceive(networkClient, request, time)
@@ -472,82 +474,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
       shutdownSucceeded
     }
 
-    def blockingChannelControlledShutdown(retries: Int): Boolean = {
-      var remainingRetries = retries
-      var channel: BlockingChannel = null
-      var prevController: Broker = null
-      var shutdownSucceeded: Boolean = false
-      try {
-        while (!shutdownSucceeded && remainingRetries > 0) {
-          remainingRetries = remainingRetries - 1
-
-          // 1. Find the controller and establish a connection to it.
-
-          // Get the current controller info. This is to ensure we use the most recent info to issue the
-          // controlled shutdown request
-          val controllerId = zkUtils.getController()
-          //If this method returns None ignore and try again
-          zkUtils.getBrokerInfo(controllerId).foreach { broker =>
-            if (channel == null || prevController == null || !prevController.equals(broker)) {
-              // if this is the first attempt or if the controller has changed, create a channel to the most recent
-              // controller
-              if (channel != null)
-                channel.disconnect()
-
-              val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName)
-              channel = new BlockingChannel(brokerEndPoint.host,
-                brokerEndPoint.port,
-                BlockingChannel.UseDefaultBufferSize,
-                BlockingChannel.UseDefaultBufferSize,
-                config.controllerSocketTimeoutMs)
-              channel.connect()
-              prevController = broker
-            }
-          }
-
-          // 2. issue a controlled shutdown to the controller
-          if (channel != null) {
-            var response: NetworkReceive = null
-            try {
-              // send the controlled shutdown request
-              val request = new kafka.api.ControlledShutdownRequest(0, correlationId.getAndIncrement, None, config.brokerId)
-              channel.send(request)
-
-              response = channel.receive()
-              val shutdownResponse = kafka.api.ControlledShutdownResponse.readFrom(response.payload())
-              if (shutdownResponse.error == Errors.NONE && shutdownResponse.partitionsRemaining != null &&
-                shutdownResponse.partitionsRemaining.isEmpty) {
-                shutdownSucceeded = true
-                info ("Controlled shutdown succeeded")
-              }
-              else {
-                info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.mkString(",")))
-                info("Error code from controller: %d".format(shutdownResponse.error.code))
-              }
-            }
-            catch {
-              case ioe: java.io.IOException =>
-                channel.disconnect()
-                channel = null
-                warn("Error during controlled shutdown, possibly because leader movement took longer than the configured controller.socket.timeout.ms and/or request.timeout.ms: %s".format(ioe.getMessage))
-                // ignore and try again
-            }
-          }
-          if (!shutdownSucceeded) {
-            Thread.sleep(config.controlledShutdownRetryBackoffMs)
-            warn("Retrying controlled shutdown after the previous attempt failed...")
-          }
-        }
-      }
-      finally {
-        if (channel != null) {
-          channel.disconnect()
-          channel = null
-        }
-      }
-      shutdownSucceeded
-    }
-
     if (startupComplete.get() && config.controlledShutdownEnable) {
       // We request the controller to do a controlled shutdown. On failure, we backoff for a configured period
       // of time and try again for a configured number of retries. If all the attempt fails, we simply force
@@ -556,16 +482,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
       brokerState.newState(PendingControlledShutdown)
 
-      val shutdownSucceeded =
-        // Before 0.9.0.0, `ControlledShutdownRequest` did not contain `client_id` and it's a mandatory field in
-        // `RequestHeader`, which is used by `NetworkClient`
-        if (config.interBrokerProtocolVersion >= KAFKA_0_9_0)
-          networkClientControlledShutdown(config.controlledShutdownMaxRetries.intValue)
-        else blockingChannelControlledShutdown(config.controlledShutdownMaxRetries.intValue)
+      val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
 
       if (!shutdownSucceeded)
         warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed")
-
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a516fb2/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 4d969cf..f9a5ac2 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -348,7 +348,7 @@ class SocketServerTest extends JUnitSuite {
       val channel = overrideServer.requestChannel
       val request = channel.receiveRequest(2000)
 
-      val requestMetrics = RequestMetrics.metricsMap(ApiKeys.forId(request.requestId).name)
+      val requestMetrics = RequestMetrics.metricsMap(ApiKeys.forId(request.header.apiKey).name)
       def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
       val expectedTotalTimeCount = totalTimeHistCount() + 1
 
@@ -390,7 +390,7 @@ class SocketServerTest extends JUnitSuite {
       TestUtils.waitUntilTrue(() => overrideServer.processor(request.processor).channel(request.connectionId).isEmpty,
         s"Idle connection `${request.connectionId}` was not closed by selector")
 
-      val requestMetrics = RequestMetrics.metricsMap(ApiKeys.forId(request.requestId).name)
+      val requestMetrics = RequestMetrics.metricsMap(ApiKeys.forId(request.header.apiKey).name)
       def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
       val expectedTotalTimeCount = totalTimeHistCount() + 1
 


Mime
View raw message