kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [3/5] kafka git commit: MINOR: Use an explicit `Errors` object when possible instead of a numeric error code
Date Fri, 10 Feb 2017 05:21:51 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index ecaade2..b27802d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -138,7 +138,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
 
-        client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
 
@@ -163,7 +163,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
 
-        client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
 
@@ -201,7 +201,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 1);
 
-        client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE, 100L, 0));
 
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
@@ -246,7 +246,7 @@ public class FetcherTest {
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         try {
             fetcher.fetchedRecords();
@@ -265,8 +265,8 @@ public class FetcherTest {
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 1);
 
-        client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
-        client.prepareResponse(matchesOffset(tp, 4), fetchResponse(this.nextRecords, Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE, 100L, 0));
+        client.prepareResponse(matchesOffset(tp, 4), fetchResponse(this.nextRecords, Errors.NONE, 100L, 0));
 
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
@@ -309,7 +309,7 @@ public class FetcherTest {
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(records, Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         consumerRecords = fetcher.fetchedRecords().get(tp);
         assertEquals(3, consumerRecords.size());
@@ -369,7 +369,7 @@ public class FetcherTest {
         assertFalse(fetcher.hasCompletedFetches());
         MemoryRecords partialRecord = MemoryRecords.readableRecords(
             ByteBuffer.wrap(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}));
-        client.prepareResponse(fetchResponse(partialRecord, Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(partialRecord, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
     }
@@ -381,7 +381,7 @@ public class FetcherTest {
 
         // resize the limit of the buffer to pretend it is only fetch-size large
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.TOPIC_AUTHORIZATION_FAILED, 100L, 0));
         consumerClient.poll(0);
         try {
             fetcher.fetchedRecords();
@@ -401,7 +401,7 @@ public class FetcherTest {
 
         // Now the rebalance happens and fetch positions are cleared
         subscriptions.assignFromSubscribed(singleton(tp));
-        client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
 
         // The active fetch should be ignored since its position is no longer valid
@@ -416,7 +416,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         subscriptions.pause(tp);
 
-        client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         assertNull(fetcher.fetchedRecords().get(tp));
     }
@@ -437,7 +437,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -449,7 +449,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -461,7 +461,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertTrue(subscriptions.isOffsetResetNeeded(tp));
@@ -476,7 +476,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         subscriptions.seek(tp, 1);
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -490,7 +490,7 @@ public class FetcherTest {
         subscriptionsNoAutoReset.seek(tp, 0);
 
         assertTrue(fetcherNoAutoReset.sendFetches() > 0);
-        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         consumerClient.poll(0);
         assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
         subscriptionsNoAutoReset.seek(tp, 2);
@@ -503,7 +503,7 @@ public class FetcherTest {
         subscriptionsNoAutoReset.seek(tp, 0);
 
         fetcherNoAutoReset.sendFetches();
-        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         consumerClient.poll(0);
 
         assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
@@ -523,7 +523,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0), true);
+        client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0), true);
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
 
@@ -743,7 +743,7 @@ public class FetcherTest {
             MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
             for (int v = 0; v < 3; v++)
                 builder.appendWithOffset((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
-            List<ConsumerRecord<byte[], byte[]>> records = fetchRecords(builder.build(), Errors.NONE.code(), 100L, 100 * i).get(tp);
+            List<ConsumerRecord<byte[], byte[]>> records = fetchRecords(builder.build(), Errors.NONE, 100L, 100 * i).get(tp);
             assertEquals(3, records.size());
         }
 
@@ -772,7 +772,7 @@ public class FetcherTest {
         assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON);
 
         // recordsFetchLagMax should be hw - fetchOffset after receiving an empty FetchResponse
-        fetchRecords(MemoryRecords.EMPTY, Errors.NONE.code(), 100L, 0);
+        fetchRecords(MemoryRecords.EMPTY, Errors.NONE, 100L, 0);
         assertEquals(100, recordsFetchLagMax.value(), EPSILON);
 
         KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
@@ -782,7 +782,7 @@ public class FetcherTest {
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
         for (int v = 0; v < 3; v++)
             builder.appendWithOffset((long) v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
-        fetchRecords(builder.build(), Errors.NONE.code(), 200L, 0);
+        fetchRecords(builder.build(), Errors.NONE, 200L, 0);
         assertEquals(197, recordsFetchLagMax.value(), EPSILON);
 
         // verify de-registration of partition lag
@@ -790,7 +790,7 @@ public class FetcherTest {
         assertFalse(allMetrics.containsKey(partitionLagMetric));
     }
 
-    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(MemoryRecords records, short error, long hw, int throttleTime) {
+    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(MemoryRecords records, Errors error, long hw, int throttleTime) {
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(records, error, hw, throttleTime));
         consumerClient.poll(0);
@@ -882,13 +882,13 @@ public class FetcherTest {
     }
 
     private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
-        ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error.code(), timestamp, offset);
+        ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error, timestamp, offset);
         Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
         allPartitionData.put(tp, partitionData);
         return new ListOffsetResponse(allPartitionData, 1);
     }
 
-    private FetchResponse fetchResponse(MemoryRecords records, short error, long hw, int throttleTime) {
+    private FetchResponse fetchResponse(MemoryRecords records, Errors error, long hw, int throttleTime) {
         return new FetchResponse(
                 new LinkedHashMap<>(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, records))),
                 throttleTime);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 7f5fe15..6e054d0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -99,7 +99,7 @@ public class SenderTest {
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
-        client.respond(produceResponse(tp, offset, Errors.NONE.code(), 0));
+        client.respond(produceResponse(tp, offset, Errors.NONE, 0));
         sender.run(time.milliseconds());
         assertEquals("All requests completed.", offset, (long) client.inFlightRequestCount());
         sender.run(time.milliseconds());
@@ -114,9 +114,9 @@ public class SenderTest {
     public void testQuotaMetrics() throws Exception {
         final long offset = 0;
         for (int i = 1; i <= 3; i++) {
-            Future<RecordMetadata> future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+            accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT);
             sender.run(time.milliseconds()); // send produce request
-            client.respond(produceResponse(tp, offset, Errors.NONE.code(), 100 * i));
+            client.respond(produceResponse(tp, offset, Errors.NONE, 100 * i));
             sender.run(time.milliseconds());
         }
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
@@ -158,7 +158,7 @@ public class SenderTest {
             sender.run(time.milliseconds()); // resend
             assertEquals(1, client.inFlightRequestCount());
             long offset = 0;
-            client.respond(produceResponse(tp, offset, Errors.NONE.code(), 0));
+            client.respond(produceResponse(tp, offset, Errors.NONE, 0));
             sender.run(time.milliseconds());
             assertTrue("Request should have retried and completed", future.isDone());
             assertEquals(offset, future.get().offset());
@@ -239,7 +239,7 @@ public class SenderTest {
         assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic()));
         metadata.update(cluster, time.milliseconds());
         sender.run(time.milliseconds());  // send produce request
-        client.respond(produceResponse(tp, offset++, Errors.NONE.code(), 0));
+        client.respond(produceResponse(tp, offset++, Errors.NONE, 0));
         sender.run(time.milliseconds());
         assertEquals("Request completed.", 0, client.inFlightRequestCount());
         sender.run(time.milliseconds());
@@ -254,7 +254,7 @@ public class SenderTest {
         assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic()));
         metadata.update(cluster, time.milliseconds());
         sender.run(time.milliseconds());  // send produce request
-        client.respond(produceResponse(tp, offset++, Errors.NONE.code(), 0));
+        client.respond(produceResponse(tp, offset++, Errors.NONE, 0));
         sender.run(time.milliseconds());
         assertEquals("Request completed.", 0, client.inFlightRequestCount());
         sender.run(time.milliseconds());
@@ -271,9 +271,8 @@ public class SenderTest {
         }
     }
 
-    private ProduceResponse produceResponse(TopicPartition tp, long offset, int error, int throttleTimeMs) {
-        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(Errors.forCode((short) error),
-                offset, Record.NO_TIMESTAMP);
+    private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
+        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, Record.NO_TIMESTAMP);
         Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
         return new ProduceResponse(partResp, throttleTimeMs);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 69f6276..699f6e2 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -204,7 +204,7 @@ public class RequestResponseTest {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
 
         MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
-        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, records));
+        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE, 1000000, records));
 
         FetchResponse v0Response = new FetchResponse(0, responseData, 0);
         FetchResponse v1Response = new FetchResponse(1, responseData, 10);
@@ -252,7 +252,7 @@ public class RequestResponseTest {
         response.writeTo(buffer);
         buffer.rewind();
         ControlledShutdownResponse deserialized = ControlledShutdownResponse.parse(buffer);
-        assertEquals(response.errorCode(), deserialized.errorCode());
+        assertEquals(response.error(), deserialized.error());
         assertEquals(response.partitionsRemaining(), deserialized.partitionsRemaining());
     }
 
@@ -287,7 +287,7 @@ public class RequestResponseTest {
     }
 
     private GroupCoordinatorResponse createGroupCoordinatorResponse() {
-        return new GroupCoordinatorResponse(Errors.NONE.code(), new Node(10, "host1", 2014));
+        return new GroupCoordinatorResponse(Errors.NONE, new Node(10, "host1", 2014));
     }
 
     private FetchRequest createFetchRequest(int version) {
@@ -301,7 +301,7 @@ public class RequestResponseTest {
     private FetchResponse createFetchResponse() {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
         MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
-        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, records));
+        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE, 1000000, records));
         return new FetchResponse(responseData, 25);
     }
 
@@ -310,7 +310,7 @@ public class RequestResponseTest {
     }
 
     private HeartbeatResponse createHeartBeatResponse() {
-        return new HeartbeatResponse(Errors.NONE.code());
+        return new HeartbeatResponse(Errors.NONE);
     }
 
     @SuppressWarnings("deprecation")
@@ -331,7 +331,7 @@ public class RequestResponseTest {
         Map<String, ByteBuffer> members = new HashMap<>();
         members.put("consumer1", ByteBuffer.wrap(new byte[]{}));
         members.put("consumer2", ByteBuffer.wrap(new byte[]{}));
-        return new JoinGroupResponse(Errors.NONE.code(), 1, "range", "consumer1", "leader", members);
+        return new JoinGroupResponse(Errors.NONE, 1, "range", "consumer1", "leader", members);
     }
 
     private ListGroupsRequest createListGroupsRequest() {
@@ -340,7 +340,7 @@ public class RequestResponseTest {
 
     private ListGroupsResponse createListGroupsResponse() {
         List<ListGroupsResponse.Group> groups = Arrays.asList(new ListGroupsResponse.Group("test-group", "consumer"));
-        return new ListGroupsResponse(Errors.NONE.code(), groups);
+        return new ListGroupsResponse(Errors.NONE, groups);
     }
 
     private DescribeGroupsRequest createDescribeGroupRequest() {
@@ -353,7 +353,7 @@ public class RequestResponseTest {
         ByteBuffer empty = ByteBuffer.allocate(0);
         DescribeGroupsResponse.GroupMember member = new DescribeGroupsResponse.GroupMember("memberId",
                 clientId, clientHost, empty, empty);
-        DescribeGroupsResponse.GroupMetadata metadata = new DescribeGroupsResponse.GroupMetadata(Errors.NONE.code(),
+        DescribeGroupsResponse.GroupMetadata metadata = new DescribeGroupsResponse.GroupMetadata(Errors.NONE,
                 "STABLE", "consumer", "roundrobin", Arrays.asList(member));
         return new DescribeGroupsResponse(Collections.singletonMap("test-group", metadata));
     }
@@ -363,7 +363,7 @@ public class RequestResponseTest {
     }
 
     private LeaveGroupResponse createLeaveGroupResponse() {
-        return new LeaveGroupResponse(Errors.NONE.code());
+        return new LeaveGroupResponse(Errors.NONE);
     }
 
     @SuppressWarnings("deprecation")
@@ -386,11 +386,11 @@ public class RequestResponseTest {
     private ListOffsetResponse createListOffsetResponse(int version) {
         if (version == 0) {
             Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
-            responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L)));
+            responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE, Arrays.asList(100L)));
             return new ListOffsetResponse(responseData);
         } else if (version == 1) {
             Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
-            responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), 10000L, 100L));
+            responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE, 10000L, 100L));
             return new ListOffsetResponse(responseData, 1);
         } else {
             throw new IllegalArgumentException("Illegal ListOffsetResponse version " + version);
@@ -430,8 +430,8 @@ public class RequestResponseTest {
     }
 
     private OffsetCommitResponse createOffsetCommitResponse() {
-        Map<TopicPartition, Short> responseData = new HashMap<>();
-        responseData.put(new TopicPartition("test", 0), Errors.NONE.code());
+        Map<TopicPartition, Errors> responseData = new HashMap<>();
+        responseData.put(new TopicPartition("test", 0), Errors.NONE);
         return new OffsetCommitResponse(responseData);
     }
 
@@ -467,9 +467,9 @@ public class RequestResponseTest {
     }
 
     private StopReplicaResponse createStopReplicaResponse() {
-        Map<TopicPartition, Short> responses = new HashMap<>();
-        responses.put(new TopicPartition("test", 0), Errors.NONE.code());
-        return new StopReplicaResponse(Errors.NONE.code(), responses);
+        Map<TopicPartition, Errors> responses = new HashMap<>();
+        responses.put(new TopicPartition("test", 0), Errors.NONE);
+        return new StopReplicaResponse(Errors.NONE, responses);
     }
 
     private ControlledShutdownRequest createControlledShutdownRequest() {
@@ -481,7 +481,7 @@ public class RequestResponseTest {
                 new TopicPartition("test2", 5),
                 new TopicPartition("test1", 10)
         ));
-        return new ControlledShutdownResponse(Errors.NONE.code(), topicPartitions);
+        return new ControlledShutdownResponse(Errors.NONE, topicPartitions);
     }
 
     private LeaderAndIsrRequest createLeaderAndIsrRequest() {
@@ -504,9 +504,9 @@ public class RequestResponseTest {
     }
 
     private LeaderAndIsrResponse createLeaderAndIsrResponse() {
-        Map<TopicPartition, Short> responses = new HashMap<>();
-        responses.put(new TopicPartition("test", 0), Errors.NONE.code());
-        return new LeaderAndIsrResponse(Errors.NONE.code(), responses);
+        Map<TopicPartition, Errors> responses = new HashMap<>();
+        responses.put(new TopicPartition("test", 0), Errors.NONE);
+        return new LeaderAndIsrResponse(Errors.NONE, responses);
     }
 
     private UpdateMetadataRequest createUpdateMetadataRequest(int version, String rack) {
@@ -545,7 +545,7 @@ public class RequestResponseTest {
     }
 
     private UpdateMetadataResponse createUpdateMetadataResponse() {
-        return new UpdateMetadataResponse(Errors.NONE.code());
+        return new UpdateMetadataResponse(Errors.NONE);
     }
 
     private SaslHandshakeRequest createSaslHandshakeRequest() {
@@ -553,7 +553,7 @@ public class RequestResponseTest {
     }
 
     private SaslHandshakeResponse createSaslHandshakeResponse() {
-        return new SaslHandshakeResponse(Errors.NONE.code(), singletonList("GSSAPI"));
+        return new SaslHandshakeResponse(Errors.NONE, singletonList("GSSAPI"));
     }
 
     private ApiVersionsRequest createApiVersionRequest() {
@@ -562,7 +562,7 @@ public class RequestResponseTest {
 
     private ApiVersionsResponse createApiVersionResponse() {
         List<ApiVersionsResponse.ApiVersion> apiVersions = Arrays.asList(new ApiVersionsResponse.ApiVersion((short) 0, (short) 0, (short) 2));
-        return new ApiVersionsResponse(Errors.NONE.code(), apiVersions);
+        return new ApiVersionsResponse(Errors.NONE, apiVersions);
     }
 
     private CreateTopicsRequest createCreateTopicRequest(int version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index bc967af..76fb9b3 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -393,7 +393,7 @@ public class SaslAuthenticatorTest {
         ByteBuffer responseBuffer = waitForResponse();
         ResponseHeader.parse(responseBuffer);
         ApiVersionsResponse response = ApiVersionsResponse.parse(responseBuffer);
-        assertEquals(Errors.UNSUPPORTED_VERSION.code(), response.errorCode());
+        assertEquals(Errors.UNSUPPORTED_VERSION, response.error());
 
         // Send ApiVersionsRequest with a supported version. This should succeed.
         sendVersionRequestReceiveResponse(node);
@@ -838,14 +838,14 @@ public class SaslAuthenticatorTest {
     private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String node) throws Exception {
         SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest("PLAIN");
         SaslHandshakeResponse response = (SaslHandshakeResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, handshakeRequest);
-        assertEquals(Errors.NONE.code(), response.errorCode());
+        assertEquals(Errors.NONE, response.error());
         return response;
     }
 
     private ApiVersionsResponse sendVersionRequestReceiveResponse(String node) throws Exception {
         ApiVersionsRequest handshakeRequest = new ApiVersionsRequest.Builder().build();
         ApiVersionsResponse response =  (ApiVersionsResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.API_VERSIONS, handshakeRequest);
-        assertEquals(Errors.NONE.code(), response.errorCode());
+        assertEquals(Errors.NONE, response.error());
         return response;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 92393a1..680f4ca 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -202,14 +202,14 @@ public class WorkerCoordinatorTest {
 
         final String consumerId = "leader";
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // normal join group
         Map<String, Long> memberConfigOffsets = new HashMap<>();
         memberConfigOffsets.put("leader", 1L);
         memberConfigOffsets.put("member", 1L);
-        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberConfigOffsets, Errors.NONE.code()));
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberConfigOffsets, Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -219,7 +219,7 @@ public class WorkerCoordinatorTest {
                         sync.groupAssignment().containsKey(consumerId);
             }
         }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1),
-                Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code()));
+                Collections.<ConnectorTaskId>emptyList(), Errors.NONE));
         coordinator.ensureActiveGroup();
 
         assertFalse(coordinator.needRejoin());
@@ -242,11 +242,11 @@ public class WorkerCoordinatorTest {
 
         final String memberId = "member";
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // normal join group
-        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -256,7 +256,7 @@ public class WorkerCoordinatorTest {
                         sync.groupAssignment().isEmpty();
             }
         }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.<String>emptyList(),
-                Collections.singletonList(taskId1x0), Errors.NONE.code()));
+                Collections.singletonList(taskId1x0), Errors.NONE));
         coordinator.ensureActiveGroup();
 
         assertFalse(coordinator.needRejoin());
@@ -283,11 +283,11 @@ public class WorkerCoordinatorTest {
 
         final String memberId = "member";
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // config mismatch results in assignment error
-        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE));
         MockClient.RequestMatcher matcher = new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -298,10 +298,10 @@ public class WorkerCoordinatorTest {
             }
         };
         client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.CONFIG_MISMATCH, "leader", 10L,
-                Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code()));
-        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
+                Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), Errors.NONE));
+        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE));
         client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L,
-                Collections.<String>emptyList(), Collections.singletonList(taskId1x0), Errors.NONE.code()));
+                Collections.<String>emptyList(), Collections.singletonList(taskId1x0), Errors.NONE));
         coordinator.ensureActiveGroup();
 
         PowerMock.verifyAll();
@@ -314,13 +314,13 @@ public class WorkerCoordinatorTest {
 
         PowerMock.replayAll();
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // join the group once
-        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.<String>emptyList(),
-                Collections.singletonList(taskId1x0), Errors.NONE.code()));
+                Collections.singletonList(taskId1x0), Errors.NONE));
         coordinator.ensureActiveGroup();
 
         assertEquals(0, rebalanceListener.revokedCount);
@@ -332,9 +332,9 @@ public class WorkerCoordinatorTest {
 
         // and join the group again
         coordinator.requestRejoin();
-        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1),
-                Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code()));
+                Collections.<ConnectorTaskId>emptyList(), Errors.NONE));
         coordinator.ensureActiveGroup();
 
         assertEquals(1, rebalanceListener.revokedCount);
@@ -459,12 +459,12 @@ public class WorkerCoordinatorTest {
     }
 
 
-    private GroupCoordinatorResponse groupCoordinatorResponse(Node node, short error) {
+    private GroupCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
         return new GroupCoordinatorResponse(error, node);
     }
 
     private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId,
-                                           Map<String, Long> configOffsets, short error) {
+                                           Map<String, Long> configOffsets, Errors error) {
         Map<String, ByteBuffer> metadata = new HashMap<>();
         for (Map.Entry<String, Long> configStateEntry : configOffsets.entrySet()) {
             // We need a member URL, but it doesn't matter for the purposes of this test. Just set it to the member ID
@@ -476,13 +476,13 @@ public class WorkerCoordinatorTest {
         return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata);
     }
 
-    private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) {
+    private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
         return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, leaderId,
                 Collections.<String, ByteBuffer>emptyMap());
     }
 
     private SyncGroupResponse syncGroupResponse(short assignmentError, String leader, long configOffset, List<String> connectorIds,
-                                     List<ConnectorTaskId> taskIds, short error) {
+                                     List<ConnectorTaskId> taskIds, Errors error) {
         ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(assignmentError, leader, LEADER_URL, configOffset, connectorIds, taskIds);
         ByteBuffer buf = ConnectProtocol.serializeAssignment(assignment);
         return new SyncGroupResponse(error, buf);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 13b6571..25e64eb 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -69,19 +69,19 @@ class AdminClient(val time: Time,
   def findCoordinator(groupId: String): Node = {
     val requestBuilder = new GroupCoordinatorRequest.Builder(groupId)
     val response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse]
-    Errors.forCode(response.errorCode).maybeThrow()
+    response.error.maybeThrow()
     response.node
   }
 
   def listGroups(node: Node): List[GroupOverview] = {
     val response = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest.Builder()).asInstanceOf[ListGroupsResponse]
-    Errors.forCode(response.errorCode).maybeThrow()
+    response.error.maybeThrow()
     response.groups.asScala.map(group => GroupOverview(group.groupId, group.protocolType)).toList
   }
 
   def getApiVersions(node: Node): List[ApiVersion] = {
     val response = send(node, ApiKeys.API_VERSIONS, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
-    Errors.forCode(response.errorCode).maybeThrow()
+    response.error.maybeThrow()
     response.apiVersions.asScala.toList
   }
 
@@ -164,7 +164,7 @@ class AdminClient(val time: Time,
     if (metadata.state != "Dead" && metadata.state != "Empty" && metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
       throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}' is not a valid consumer group")
 
-    Errors.forCode(metadata.errorCode()).maybeThrow()
+    metadata.error.maybeThrow()
     val consumers = metadata.members.asScala.map { consumer =>
       ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.state match {
         case "Stable" =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index c3d9f24..11f4f89 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -318,10 +318,10 @@ object ConsumerGroupCommand extends Logging {
               case z: ZkNoNodeException =>
                 printError(s"Could not fetch offset from zookeeper for group '$group' partition '$topicAndPartition' due to missing offset data in zookeeper.", Some(z))
             }
-          case offsetAndMetaData if offsetAndMetaData.error == Errors.NONE.code =>
+          case offsetAndMetaData if offsetAndMetaData.error == Errors.NONE =>
             offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
           case _ =>
-            printError(s"Could not fetch offset from kafka for group '$group' partition '$topicAndPartition' due to ${Errors.forCode(offsetAndMetadata.error).exception}.")
+            printError(s"Could not fetch offset from kafka for group '$group' partition '$topicAndPartition' due to ${offsetAndMetadata.error.message}.")
         }
       }
       channel.disconnect()

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index 52c8828..46ae1e7 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -67,8 +67,8 @@ case class ControlledShutdownRequest(versionId: Short,
     describe(true)
   }
 
-  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val errorResponse = ControlledShutdownResponse(correlationId, Errors.forException(e).code, Set.empty[TopicAndPartition])
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val errorResponse = ControlledShutdownResponse(correlationId, Errors.forException(e), Set.empty[TopicAndPartition])
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index 1ba5cfa..e0a03e8 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -26,7 +26,7 @@ import collection.Set
 object ControlledShutdownResponse {
   def readFrom(buffer: ByteBuffer): ControlledShutdownResponse = {
     val correlationId = buffer.getInt
-    val errorCode = buffer.getShort
+    val error = Errors.forCode(buffer.getShort)
     val numEntries = buffer.getInt
 
     var partitionsRemaining = Set[TopicAndPartition]()
@@ -35,13 +35,13 @@ object ControlledShutdownResponse {
       val partition = buffer.getInt
       partitionsRemaining += new TopicAndPartition(topic, partition)
     }
-    new ControlledShutdownResponse(correlationId, errorCode, partitionsRemaining)
+    new ControlledShutdownResponse(correlationId, error, partitionsRemaining)
   }
 }
 
 
 case class ControlledShutdownResponse(correlationId: Int,
-                                      errorCode: Short = Errors.NONE.code,
+                                      error: Errors = Errors.NONE,
                                       partitionsRemaining: Set[TopicAndPartition])
   extends RequestOrResponse() {
   def sizeInBytes(): Int ={
@@ -59,7 +59,7 @@ case class ControlledShutdownResponse(correlationId: Int,
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(correlationId)
-    buffer.putShort(errorCode)
+    buffer.putShort(error.code)
     buffer.putInt(partitionsRemaining.size)
     for (topicAndPartition:TopicAndPartition <- partitionsRemaining){
       writeShortString(buffer, topicAndPartition.topic)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 5e5360a..97da1f5 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -203,7 +203,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
     val responseData = new util.LinkedHashMap[TopicPartition, JFetchResponse.PartitionData]
     requestInfo.foreach { case (TopicAndPartition(topic, partition), _) =>
       responseData.put(new TopicPartition(topic, partition),
-        new JFetchResponse.PartitionData(Errors.forException(e).code, -1, MemoryRecords.EMPTY))
+        new JFetchResponse.PartitionData(Errors.forException(e), -1, MemoryRecords.EMPTY))
     }
     val errorResponse = new JFetchResponse(versionId, responseData, 0)
     // Magic value does not matter here because the message set is empty

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index cd346cb..0f677f1 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -28,7 +28,7 @@ import scala.collection._
 
 object FetchResponsePartitionData {
   def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
-    val error = buffer.getShort
+    val error = Errors.forCode(buffer.getShort)
     val hw = buffer.getLong
     val messageSetSize = buffer.getInt
     val messageSetBuffer = buffer.slice()
@@ -43,7 +43,7 @@ object FetchResponsePartitionData {
     4 /* messageSetSize */
 }
 
-case class FetchResponsePartitionData(error: Short = Errors.NONE.code, hw: Long = -1L, messages: MessageSet) {
+case class FetchResponsePartitionData(error: Errors = Errors.NONE, hw: Long = -1L, messages: MessageSet) {
   val sizeInBytes = FetchResponsePartitionData.headerSize + messages.sizeInBytes
 }
 
@@ -166,7 +166,7 @@ case class FetchResponse(correlationId: Int,
 
   def highWatermark(topic: String, partition: Int) = partitionDataFor(topic, partition).hw
 
-  def hasError = dataByTopicAndPartition.values.exists(_.error != Errors.NONE.code)
+  def hasError = dataByTopicAndPartition.values.exists(_.error != Errors.NONE)
 
-  def errorCode(topic: String, partition: Int) = partitionDataFor(topic, partition).error
+  def error(topic: String, partition: Int) = partitionDataFor(topic, partition).error
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
index 5f88136..2082e44 100644
--- a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
@@ -64,7 +64,7 @@ case class GroupCoordinatorRequest(group: String,
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     // return ConsumerCoordinatorNotAvailable for all uncaught errors
-    val errorResponse = GroupCoordinatorResponse(None, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, correlationId)
+    val errorResponse = GroupCoordinatorResponse(None, Errors.GROUP_COORDINATOR_NOT_AVAILABLE, correlationId)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
index 83ba96e..782eebb 100644
--- a/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
@@ -28,19 +28,19 @@ object GroupCoordinatorResponse {
 
   def readFrom(buffer: ByteBuffer) = {
     val correlationId = buffer.getInt
-    val errorCode = buffer.getShort
+    val error = Errors.forCode(buffer.getShort)
     val broker = BrokerEndPoint.readFrom(buffer)
-    val coordinatorOpt = if (errorCode == Errors.NONE.code)
+    val coordinatorOpt = if (error == Errors.NONE)
       Some(broker)
     else
       None
 
-    GroupCoordinatorResponse(coordinatorOpt, errorCode, correlationId)
+    GroupCoordinatorResponse(coordinatorOpt, error, correlationId)
   }
 
 }
 
-case class GroupCoordinatorResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int)
+case class GroupCoordinatorResponse (coordinatorOpt: Option[BrokerEndPoint], error: Errors, correlationId: Int)
   extends RequestOrResponse() {
 
   def sizeInBytes =
@@ -50,7 +50,7 @@ case class GroupCoordinatorResponse (coordinatorOpt: Option[BrokerEndPoint], err
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(correlationId)
-    buffer.putShort(errorCode)
+    buffer.putShort(error.code)
     coordinatorOpt.orElse(GroupCoordinatorResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 95663af..b9693f6 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -161,8 +161,8 @@ case class OffsetCommitRequest(groupId: String,
     })
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val errorCode = Errors.forException(e).code
-    val commitStatus = requestInfo.mapValues(_ => errorCode)
+    val error = Errors.forException(e)
+    val commitStatus = requestInfo.mapValues(_ => error)
     val commitResponse = OffsetCommitResponse(commitStatus, correlationId)
 
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, commitResponse)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index 94223c7..ace480a 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -34,7 +34,7 @@ object OffsetCommitResponse extends Logging {
       val partitionCount = buffer.getInt
       (1 to partitionCount).map(_ => {
         val partitionId = buffer.getInt
-        val error = buffer.getShort
+        val error = Errors.forCode(buffer.getShort)
         (TopicAndPartition(topic, partitionId), error)
       })
     })
@@ -42,13 +42,13 @@ object OffsetCommitResponse extends Logging {
   }
 }
 
-case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
+case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Errors],
                                 correlationId: Int = 0)
     extends RequestOrResponse() {
 
   lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)
 
-  def hasError = commitStatus.values.exists(_ != Errors.NONE.code)
+  def hasError = commitStatus.values.exists(_ != Errors.NONE)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(correlationId)
@@ -56,9 +56,9 @@ case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
     commitStatusGroupedByTopic.foreach { case(topic, statusMap) =>
       ApiUtils.writeShortString(buffer, topic)
       buffer.putInt(statusMap.size) // partition count
-      statusMap.foreach { case(topicAndPartition, errorCode) =>
+      statusMap.foreach { case(topicAndPartition, error) =>
         buffer.putInt(topicAndPartition.partition)
-        buffer.putShort(errorCode)
+        buffer.putShort(error.code)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index dac4cc5..33d3795 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -97,13 +97,13 @@ case class OffsetFetchRequest(groupId: String,
     val responseMap =
       if (requestVersion < 2) {
         requestInfo.map {
-          topicAndPartition => (topicAndPartition, OffsetMetadataAndError(thrownError.code))
+          topicAndPartition => (topicAndPartition, OffsetMetadataAndError(thrownError))
         }.toMap
       } else {
         Map[kafka.common.TopicAndPartition, kafka.common.OffsetMetadataAndError]()
       }
 
-    val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId, errorCode=thrownError.code)
+    val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId, error=thrownError)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
index 791cd6c..0691503 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
@@ -41,24 +41,24 @@ object OffsetFetchResponse extends Logging {
         val partitionId = buffer.getInt
         val offset = buffer.getLong
         val metadata = readShortString(buffer)
-        val error = buffer.getShort
+        val error = Errors.forCode(buffer.getShort)
         (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata, error))
       })
     })
 
-    val errorCode = requestVersion match {
-      case 0 | 1 => Errors.NONE.code
-      case _ => buffer.getShort
+    val error = requestVersion match {
+      case 0 | 1 => Errors.NONE
+      case _ => Errors.forCode(buffer.getShort)
     }
 
-    OffsetFetchResponse(Map(pairs:_*), requestVersion, correlationId, errorCode)
+    OffsetFetchResponse(Map(pairs:_*), requestVersion, correlationId, error)
   }
 }
 
 case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
                                requestVersion: Int = OffsetFetchRequest.CurrentVersion,
                                correlationId: Int = 0,
-                               errorCode: Short = Errors.NONE.code)
+                               error: Errors = Errors.NONE)
     extends RequestOrResponse() {
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
@@ -73,13 +73,13 @@ case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadat
         buffer.putInt(t2._1.partition)
         buffer.putLong(t2._2.offset)
         writeShortString(buffer, t2._2.metadata)
-        buffer.putShort(t2._2.error)
+        buffer.putShort(t2._2.error.code)
       })
     })
 
     // the top level error_code was introduced in v2
     if (requestVersion > 1)
-      buffer.putShort(errorCode)
+      buffer.putShort(error.code)
   }
 
   override def sizeInBytes =

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 416dd73..879d60d 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -115,7 +115,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val partitionOffsetResponseMap = requestInfo.map { case (topicAndPartition, _) =>
-        (topicAndPartition, PartitionOffsetsResponse(Errors.forException(e).code, Nil))
+        (topicAndPartition, PartitionOffsetsResponse(Errors.forException(e), Nil))
     }
     val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/OffsetResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala
index b767c08..8d23d41 100644
--- a/core/src/main/scala/kafka/api/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetResponse.scala
@@ -33,7 +33,7 @@ object OffsetResponse {
       val numPartitions = buffer.getInt
       (1 to numPartitions).map(_ => {
         val partition = buffer.getInt
-        val error = buffer.getShort
+        val error = Errors.forCode(buffer.getShort)
         val numOffsets = buffer.getInt
         val offsets = (1 to numOffsets).map(_ => buffer.getLong)
         (TopicAndPartition(topic, partition), PartitionOffsetsResponse(error, offsets))
@@ -45,9 +45,9 @@ object OffsetResponse {
 }
 
 
-case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long]) {
+case class PartitionOffsetsResponse(error: Errors, offsets: Seq[Long]) {
   override def toString: String = {
-    new String("error: " + Errors.forCode(error).exceptionName + " offsets: " + offsets.mkString)
+    new String("error: " + error.exceptionName + " offsets: " + offsets.mkString)
   }
 }
 
@@ -58,7 +58,7 @@ case class OffsetResponse(correlationId: Int,
 
   lazy val offsetsGroupedByTopic = partitionErrorAndOffsets.groupBy(_._1.topic)
 
-  def hasError = partitionErrorAndOffsets.values.exists(_.error != Errors.NONE.code)
+  def hasError = partitionErrorAndOffsets.values.exists(_.error != Errors.NONE)
 
   val sizeInBytes = {
     4 + /* correlation id */
@@ -88,7 +88,7 @@ case class OffsetResponse(correlationId: Int,
         errorAndOffsetsMap.foreach {
           case((TopicAndPartition(_, partition), errorAndOffsets)) =>
             buffer.putInt(partition)
-            buffer.putShort(errorAndOffsets.error)
+            buffer.putShort(errorAndOffsets.error.code)
             buffer.putInt(errorAndOffsets.offsets.size) // offset array length
             errorAndOffsets.offsets.foreach(buffer.putLong(_))
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index f6e4475..a87cdc9 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -134,7 +134,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
     }
     else {
       val producerResponseStatus = data.map { case (topicAndPartition, _) =>
-        (topicAndPartition, ProducerResponseStatus(Errors.forException(e).code, -1l, Message.NoTimestamp))
+        (topicAndPartition, ProducerResponseStatus(Errors.forException(e), -1l, Message.NoTimestamp))
       }
       val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
       requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/ProducerResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 89d7538..b09c73b 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -35,7 +35,7 @@ object ProducerResponse {
       val partitionCount = buffer.getInt
       (1 to partitionCount).map(_ => {
         val partition = buffer.getInt
-        val error = buffer.getShort
+        val error = Errors.forCode(buffer.getShort)
         val offset = buffer.getLong
         val timestamp = buffer.getLong
         (TopicAndPartition(topic, partition), ProducerResponseStatus(error, offset, timestamp))
@@ -47,7 +47,7 @@ object ProducerResponse {
   }
 }
 
-case class ProducerResponseStatus(var error: Short, offset: Long, timestamp: Long = Message.NoTimestamp)
+case class ProducerResponseStatus(var error: Errors, offset: Long, timestamp: Long = Message.NoTimestamp)
 
 case class ProducerResponse(correlationId: Int,
                             status: Map[TopicAndPartition, ProducerResponseStatus],
@@ -60,7 +60,7 @@ case class ProducerResponse(correlationId: Int,
    */
   private lazy val statusGroupedByTopic = status.groupBy(_._1.topic)
 
-  def hasError = status.values.exists(_.error != Errors.NONE.code)
+  def hasError = status.values.exists(_.error != Errors.NONE)
 
   val sizeInBytes = {
     val throttleTimeSize = if (requestVersion > 0) 4 else 0
@@ -93,7 +93,7 @@ case class ProducerResponse(correlationId: Int,
       errorsAndOffsets.foreach {
         case((TopicAndPartition(_, partition), ProducerResponseStatus(error, nextOffset, timestamp))) =>
           buffer.putInt(partition)
-          buffer.putShort(error)
+          buffer.putShort(error.code)
           buffer.putLong(nextOffset)
           buffer.putLong(timestamp)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 7b05a8b..e4d730c 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -28,7 +28,7 @@ object TopicMetadata {
   val NoLeaderNodeId = -1
 
   def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndPoint]): TopicMetadata = {
-    val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
+    val error = Errors.forCode(readShortInRange(buffer, "error code", (-1, Short.MaxValue)))
     val topic = readShortString(buffer)
     val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
     val partitionsMetadata: Array[PartitionMetadata] = new Array[PartitionMetadata](numPartitions)
@@ -36,11 +36,11 @@ object TopicMetadata {
       val partitionMetadata = PartitionMetadata.readFrom(buffer, brokers)
       partitionsMetadata(i) = partitionMetadata
     }
-    new TopicMetadata(topic, partitionsMetadata, errorCode)
+    new TopicMetadata(topic, partitionsMetadata, error)
   }
 }
 
-case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = Errors.NONE.code) extends Logging {
+case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], error: Errors = Errors.NONE) extends Logging {
   def sizeInBytes: Int = {
     2 /* error code */ +
     shortStringLength(topic) +
@@ -49,7 +49,7 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat
 
   def writeTo(buffer: ByteBuffer) {
     /* error code */
-    buffer.putShort(errorCode)
+    buffer.putShort(error.code)
     /* topic */
     writeShortString(buffer, topic)
     /* number of partitions */
@@ -60,10 +60,10 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat
   override def toString: String = {
     val topicMetadataInfo = new StringBuilder
     topicMetadataInfo.append("{TopicMetadata for topic %s -> ".format(topic))
-    Errors.forCode(errorCode) match {
+    error match {
       case Errors.NONE =>
         partitionsMetadata.foreach { partitionMetadata =>
-          Errors.forCode(partitionMetadata.errorCode) match {
+          partitionMetadata.error match {
             case Errors.NONE =>
               topicMetadataInfo.append("\nMetadata for partition [%s,%d] is %s".format(topic,
                 partitionMetadata.partitionId, partitionMetadata.toString()))
@@ -89,7 +89,7 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat
 object PartitionMetadata {
 
   def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndPoint]): PartitionMetadata = {
-    val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
+    val error = Errors.forCode(readShortInRange(buffer, "error code", (-1, Short.MaxValue)))
     val partitionId = readIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */
     val leaderId = buffer.getInt
     val leader = brokers.get(leaderId)
@@ -104,7 +104,7 @@ object PartitionMetadata {
     val isrIds = (0 until numIsr).map(_ => buffer.getInt)
     val isr = isrIds.map(brokers)
 
-    new PartitionMetadata(partitionId, leader, replicas, isr, errorCode)
+    new PartitionMetadata(partitionId, leader, replicas, isr, error)
   }
 }
 
@@ -112,7 +112,7 @@ case class PartitionMetadata(partitionId: Int,
                              leader: Option[BrokerEndPoint],
                              replicas: Seq[BrokerEndPoint],
                              isr: Seq[BrokerEndPoint] = Seq.empty,
-                             errorCode: Short = Errors.NONE.code) extends Logging {
+                             error: Errors = Errors.NONE) extends Logging {
   def sizeInBytes: Int = {
     2 /* error code */ +
     4 /* partition id */ +
@@ -122,7 +122,7 @@ case class PartitionMetadata(partitionId: Int,
   }
 
   def writeTo(buffer: ByteBuffer) {
-    buffer.putShort(errorCode)
+    buffer.putShort(error.code)
     buffer.putInt(partitionId)
 
     /* leader */

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index c64b268..403152a 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -61,7 +61,7 @@ case class TopicMetadataRequest(versionId: Short,
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val topicMetadata = topics.map {
-      topic => TopicMetadata(topic, Nil, Errors.forException(e).code)
+      topic => TopicMetadata(topic, Nil, Errors.forException(e))
     }
     val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index dbb8a76..5508549 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -168,7 +168,7 @@ object ClientUtils extends Logging{
            val response = queryChannel.receive()
            val consumerMetadataResponse =  GroupCoordinatorResponse.readFrom(response.payload())
            debug("Consumer metadata response: " + consumerMetadataResponse.toString)
-           if (consumerMetadataResponse.errorCode == Errors.NONE.code)
+           if (consumerMetadataResponse.error == Errors.NONE)
              coordinatorOpt = consumerMetadataResponse.coordinatorOpt
            else {
              debug("Query to %s:%d to locate offset manager for %s failed - will retry in %d milliseconds."

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index be5fed2..9f29073 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -30,14 +30,14 @@ import scala.Predef._
 object ErrorMapping {
   val EmptyByteBuffer = ByteBuffer.allocate(0)
 
-  val UnknownCode : Short = -1
-  val NoError : Short = 0
-  val OffsetOutOfRangeCode : Short = 1
-  val InvalidMessageCode : Short = 2
-  val UnknownTopicOrPartitionCode : Short = 3
-  val InvalidFetchSizeCode  : Short = 4
-  val LeaderNotAvailableCode : Short = 5
-  val NotLeaderForPartitionCode : Short = 6
+  val UnknownCode: Short = -1
+  val NoError: Short = 0
+  val OffsetOutOfRangeCode: Short = 1
+  val InvalidMessageCode: Short = 2
+  val UnknownTopicOrPartitionCode: Short = 3
+  val InvalidFetchSizeCode: Short = 4
+  val LeaderNotAvailableCode: Short = 5
+  val NotLeaderForPartitionCode: Short = 6
   val RequestTimedOutCode: Short = 7
   val BrokerNotAvailableCode: Short = 8
   val ReplicaNotAvailableCode: Short = 9
@@ -48,9 +48,9 @@ object ErrorMapping {
   val OffsetsLoadInProgressCode: Short = 14
   val ConsumerCoordinatorNotAvailableCode: Short = 15
   val NotCoordinatorForConsumerCode: Short = 16
-  val InvalidTopicCode : Short = 17
+  val InvalidTopicCode: Short = 17
   val MessageSetSizeTooLargeCode: Short = 18
-  val NotEnoughReplicasCode : Short = 19
+  val NotEnoughReplicasCode: Short = 19
   val NotEnoughReplicasAfterAppendCode: Short = 20
   // 21: InvalidRequiredAcks
   // 22: IllegalConsumerGeneration

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 4d9ae40..46c0881 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -53,7 +53,7 @@ object OffsetAndMetadata {
   def apply(offset: Long) = new OffsetAndMetadata(OffsetMetadata(offset, OffsetMetadata.NoMetadata))
 }
 
-case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = Errors.NONE.code) {
+case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Errors = Errors.NONE) {
   def offset = offsetMetadata.offset
 
   def metadata = offsetMetadata.metadata
@@ -62,19 +62,19 @@ case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short =
 }
 
 object OffsetMetadataAndError {
-  val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE.code)
-  val GroupLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_LOAD_IN_PROGRESS.code)
-  val UnknownMember = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_MEMBER_ID.code)
-  val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_GROUP.code)
-  val GroupCoordinatorNotAvailable = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
-  val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-  val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION.code)
+  val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE)
+  val GroupLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_LOAD_IN_PROGRESS)
+  val UnknownMember = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_MEMBER_ID)
+  val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_GROUP)
+  val GroupCoordinatorNotAvailable = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+  val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+  val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION)
 
-  def apply(offset: Long) = new OffsetMetadataAndError(OffsetMetadata(offset, OffsetMetadata.NoMetadata), Errors.NONE.code)
+  def apply(offset: Long) = new OffsetMetadataAndError(OffsetMetadata(offset, OffsetMetadata.NoMetadata), Errors.NONE)
 
-  def apply(error: Short) = new OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, error)
+  def apply(error: Errors) = new OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, error)
 
-  def apply(offset: Long, metadata: String, error: Short) = new OffsetMetadataAndError(OffsetMetadata(offset, metadata), error)
+  def apply(offset: Long, metadata: String, error: Errors) = new OffsetMetadataAndError(OffsetMetadata(offset, metadata), error)
 }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index f702b9d..2827a2f 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -26,6 +26,7 @@ import kafka.common.{ErrorMapping, TopicAndPartition}
 import scala.collection.Map
 import ConsumerFetcherThread._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 
 class ConsumerFetcherThread(name: String,
@@ -124,11 +125,11 @@ object ConsumerFetcherThread {
   }
 
   class PartitionData(val underlying: FetchResponsePartitionData) extends AbstractFetcherThread.PartitionData {
-    def errorCode: Short = underlying.error
+    def error = underlying.error
     def toRecords: MemoryRecords = underlying.messages.asInstanceOf[ByteBufferMessageSet].asRecords
     def highWatermark: Long = underlying.hw
     def exception: Option[Throwable] =
-      if (errorCode == ErrorMapping.NoError) None else Some(ErrorMapping.exceptionFor(errorCode))
+      if (error == Errors.NONE) None else Some(ErrorMapping.exceptionFor(error.code))
 
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index f7e8bcb..e93f08c 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -26,6 +26,7 @@ import kafka.network._
 import kafka.utils._
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import org.apache.kafka.common.network.{NetworkReceive}
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.utils.Utils._
 
 /**
@@ -187,8 +188,8 @@ class SimpleConsumer(val host: String,
                                 replicaId = consumerId)
     val partitionErrorAndOffset = getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
     val offset = partitionErrorAndOffset.error match {
-      case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
-      case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
+      case Errors.NONE => partitionErrorAndOffset.offsets.head
+      case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error.code)
     }
     offset
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 9bf0d20..fde4710 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -356,25 +356,25 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
               trace("Offset commit response: %s.".format(offsetCommitResponse))
 
               val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = {
-                offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case (folded, (topicPartition, errorCode)) =>
+                offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case (folded, (topicPartition, error)) =>
 
-                  if (errorCode == Errors.NONE.code && config.dualCommitEnabled) {
+                  if (error == Errors.NONE && config.dualCommitEnabled) {
                     val offset = offsetsToCommit(topicPartition).offset
                     commitOffsetToZooKeeper(topicPartition, offset)
                   }
 
                   (folded._1 || // update commitFailed
-                    errorCode != Errors.NONE.code,
+                    error != Errors.NONE,
 
                     folded._2 || // update retryableIfFailed - (only metadata too large is not retryable)
-                      (errorCode != Errors.NONE.code && errorCode != Errors.OFFSET_METADATA_TOO_LARGE.code),
+                      (error != Errors.NONE && error != Errors.OFFSET_METADATA_TOO_LARGE),
 
                     folded._3 || // update shouldRefreshCoordinator
-                      errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code ||
-                      errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code,
+                      error == Errors.NOT_COORDINATOR_FOR_GROUP ||
+                      error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE,
 
                     // update error count
-                    folded._4 + (if (errorCode != Errors.NONE.code) 1 else 0))
+                    folded._4 + (if (error != Errors.NONE) 1 else 0))
                 }
               }
               debug(errorCount + " errors in offset commit response.")
@@ -444,8 +444,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
             val (leaderChanged, loadInProgress) =
               offsetFetchResponse.requestInfo.values.foldLeft(false, false) { case (folded, offsetMetadataAndError) =>
-                (folded._1 || (offsetMetadataAndError.error == Errors.NOT_COORDINATOR_FOR_GROUP.code),
-                 folded._2 || (offsetMetadataAndError.error == Errors.GROUP_LOAD_IN_PROGRESS.code))
+                (folded._1 || (offsetMetadataAndError.error == Errors.NOT_COORDINATOR_FOR_GROUP),
+                 folded._2 || (offsetMetadataAndError.error == Errors.GROUP_LOAD_IN_PROGRESS))
               }
 
             if (leaderChanged) {
@@ -465,7 +465,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                 val mostRecentOffsets = kafkaOffsets.map { case (topicPartition, kafkaOffset) =>
                   val zkOffset = fetchOffsetFromZooKeeper(topicPartition)._2.offset
                   val mostRecentOffset = zkOffset.max(kafkaOffset.offset)
-                  (topicPartition, OffsetMetadataAndError(mostRecentOffset, kafkaOffset.metadata, Errors.NONE.code))
+                  (topicPartition, OffsetMetadataAndError(mostRecentOffset, kafkaOffset.metadata, Errors.NONE))
                 }
                 Some(OffsetFetchResponse(mostRecentOffsets))
               }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 823f9b4..fadc736 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -384,8 +384,8 @@ class TopicDeletionManager(controller: KafkaController,
     debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
     val responseMap = stopReplicaResponse.responses.asScala
     val partitionsInError =
-      if (stopReplicaResponse.errorCode != Errors.NONE.code) responseMap.keySet
-      else responseMap.filter { case (_, error) => error != Errors.NONE.code }.keySet
+      if (stopReplicaResponse.error != Errors.NONE) responseMap.keySet
+      else responseMap.filter { case (_, error) => error != Errors.NONE }.keySet
     val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
     inLock(controllerContext.controllerLock) {
       // move all the failed replicas to ReplicaDeletionIneligible


Mime
View raw message