kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/5] kafka git commit: MINOR: Use an explicit `Errors` object when possible instead of a numeric error code
Date Fri, 10 Feb 2017 05:21:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a15fcea79 -> 9898d665d


http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 11f9102..7b40187 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -41,14 +41,14 @@ import scala.concurrent.{Await, Future, Promise}
  */
 class GroupCoordinatorResponseTest extends JUnitSuite {
   type JoinGroupCallback = JoinGroupResult => Unit
-  type SyncGroupCallbackParams = (Array[Byte], Short)
-  type SyncGroupCallback = (Array[Byte], Short) => Unit
-  type HeartbeatCallbackParams = Short
-  type HeartbeatCallback = Short => Unit
-  type CommitOffsetCallbackParams = Map[TopicPartition, Short]
-  type CommitOffsetCallback = Map[TopicPartition, Short] => Unit
-  type LeaveGroupCallbackParams = Short
-  type LeaveGroupCallback = Short => Unit
+  type SyncGroupCallbackParams = (Array[Byte], Errors)
+  type SyncGroupCallback = (Array[Byte], Errors) => Unit
+  type HeartbeatCallbackParams = Errors
+  type HeartbeatCallback = Errors => Unit
+  type CommitOffsetCallbackParams = Map[TopicPartition, Errors]
+  type CommitOffsetCallback = Map[TopicPartition, Errors] => Unit
+  type LeaveGroupCallbackParams = Errors
+  type LeaveGroupCallback = Errors => Unit
 
   val ClientId = "consumer-test"
   val ClientHost = "localhost"
@@ -115,8 +115,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
     val joinGroupResult = joinGroup(otherGroupId, memberId, protocolType, protocols)
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, joinGroupError)
   }
 
   @Test
@@ -124,8 +124,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = ConsumerMinSessionTimeout - 1)
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
   }
 
   @Test
@@ -133,15 +133,15 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = ConsumerMaxSessionTimeout + 1)
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
   }
 
   @Test
   def testJoinGroupUnknownConsumerNewGroup() {
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupError)
   }
 
   @Test
@@ -150,7 +150,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
-    assertEquals(Errors.INVALID_GROUP_ID.code, joinGroupResult.errorCode)
+    assertEquals(Errors.INVALID_GROUP_ID, joinGroupResult.error)
   }
 
   @Test
@@ -158,8 +158,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
   }
 
   @Test
@@ -168,11 +168,11 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
-    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+    assertEquals(Errors.NONE, joinGroupResult.error)
 
     EasyMock.reset(replicaManager)
     val otherJoinGroupResult = joinGroup(groupId, otherMemberId, "connect", protocols)
-    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code, otherJoinGroupResult.errorCode)
+    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, otherJoinGroupResult.error)
   }
 
   @Test
@@ -182,11 +182,11 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, List(("range", metadata)))
-    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+    assertEquals(Errors.NONE, joinGroupResult.error)
 
     EasyMock.reset(replicaManager)
     val otherJoinGroupResult = joinGroup(groupId, otherMemberId, protocolType, List(("roundrobin", metadata)))
-    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code, otherJoinGroupResult.errorCode)
+    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, otherJoinGroupResult.error)
   }
 
   @Test
@@ -195,25 +195,25 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val otherMemberId = "memberId"
 
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
-    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+    assertEquals(Errors.NONE, joinGroupResult.error)
 
     EasyMock.reset(replicaManager)
     val otherJoinGroupResult = joinGroup(groupId, otherMemberId, protocolType, protocols)
-    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, otherJoinGroupResult.errorCode)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, otherJoinGroupResult.error)
   }
 
   @Test
   def testHeartbeatWrongCoordinator() {
 
     val heartbeatResult = heartbeat(otherGroupId, memberId, -1)
-    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, heartbeatResult)
+    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, heartbeatResult)
   }
 
   @Test
   def testHeartbeatUnknownGroup() {
 
     val heartbeatResult = heartbeat(groupId, memberId, -1)
-    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, heartbeatResult)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
   }
 
   @Test
@@ -223,17 +223,17 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
     val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
-    val syncGroupErrorCode = syncGroupResult._2
-    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+    val syncGroupError = syncGroupResult._2
+    assertEquals(Errors.NONE, syncGroupError)
 
     EasyMock.reset(replicaManager)
     val heartbeatResult = heartbeat(groupId, otherMemberId, 1)
-    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, heartbeatResult)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
   }
 
   @Test
@@ -242,12 +242,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
     val heartbeatResult = heartbeat(groupId, assignedMemberId, 2)
-    assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatResult)
+    assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
   }
 
   @Test
@@ -256,17 +256,17 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
     val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
-    val syncGroupErrorCode = syncGroupResult._2
-    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+    val syncGroupError = syncGroupResult._2
+    assertEquals(Errors.NONE, syncGroupError)
 
     EasyMock.reset(replicaManager)
     val heartbeatResult = heartbeat(groupId, assignedMemberId, 2)
-    assertEquals(Errors.ILLEGAL_GENERATION.code, heartbeatResult)
+    assertEquals(Errors.ILLEGAL_GENERATION, heartbeatResult)
   }
 
   @Test
@@ -276,17 +276,17 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
     val assignedConsumerId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
     val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
-    val syncGroupErrorCode = syncGroupResult._2
-    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+    val syncGroupError = syncGroupResult._2
+    assertEquals(Errors.NONE, syncGroupError)
 
     EasyMock.reset(replicaManager)
     val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
-    assertEquals(Errors.NONE.code, heartbeatResult)
+    assertEquals(Errors.NONE, heartbeatResult)
   }
 
   @Test
@@ -296,12 +296,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
     val assignedConsumerId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
-    val (_, syncGroupErrorCode) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
-    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+    val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+    assertEquals(Errors.NONE, syncGroupError)
 
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None)
@@ -313,7 +313,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     EasyMock.reset(replicaManager)
     val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
-    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, heartbeatResult)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
   }
 
   @Test
@@ -325,24 +325,24 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout)
     val assignedConsumerId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
-    val (_, syncGroupErrorCode) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
-    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+    val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+    assertEquals(Errors.NONE, syncGroupError)
 
     timer.advanceClock(sessionTimeout / 2)
 
     EasyMock.reset(replicaManager)
     var heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
-    assertEquals(Errors.NONE.code, heartbeatResult)
+    assertEquals(Errors.NONE, heartbeatResult)
 
     timer.advanceClock(sessionTimeout / 2 + 100)
 
     EasyMock.reset(replicaManager)
     heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
-    assertEquals(Errors.NONE.code, heartbeatResult)
+    assertEquals(Errors.NONE, heartbeatResult)
   }
 
   @Test
@@ -356,24 +356,24 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout)
     val assignedConsumerId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
-    val (_, syncGroupErrorCode) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
-    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+    val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+    assertEquals(Errors.NONE, syncGroupError)
 
     timer.advanceClock(sessionTimeout / 2)
 
     EasyMock.reset(replicaManager)
     val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, immutable.Map(tp -> offset))
-    assertEquals(Errors.NONE.code, commitOffsetResult(tp))
+    assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     timer.advanceClock(sessionTimeout / 2 + 100)
 
     EasyMock.reset(replicaManager)
     val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
-    assertEquals(Errors.NONE.code, heartbeatResult)
+    assertEquals(Errors.NONE, heartbeatResult)
   }
 
   @Test
@@ -384,11 +384,11 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val firstMemberId = firstJoinResult.memberId
     val firstGenerationId = firstJoinResult.generationId
     assertEquals(firstMemberId, firstJoinResult.leaderId)
-    assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
+    assertEquals(Errors.NONE, firstJoinResult.error)
 
     EasyMock.reset(replicaManager)
     val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
-    assertEquals(Errors.NONE.code, firstSyncResult._2)
+    assertEquals(Errors.NONE, firstSyncResult._2)
 
     // now have a new member join to trigger a rebalance
     EasyMock.reset(replicaManager)
@@ -398,18 +398,18 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     EasyMock.reset(replicaManager)
     var heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
-    assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatResult)
+    assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
 
     // letting the session expire should make the member fall out of the group
     timer.advanceClock(1100)
 
     EasyMock.reset(replicaManager)
     heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
-    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, heartbeatResult)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
 
     // and the rebalance should complete with only the new member
     val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
-    assertEquals(Errors.NONE.code, otherJoinResult.errorCode)
+    assertEquals(Errors.NONE, otherJoinResult.error)
   }
 
   @Test
@@ -420,11 +420,11 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val firstMemberId = firstJoinResult.memberId
     val firstGenerationId = firstJoinResult.generationId
     assertEquals(firstMemberId, firstJoinResult.leaderId)
-    assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
+    assertEquals(Errors.NONE, firstJoinResult.error)
 
     EasyMock.reset(replicaManager)
     val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
-    assertEquals(Errors.NONE.code, firstSyncResult._2)
+    assertEquals(Errors.NONE, firstSyncResult._2)
 
     // now have a new member join to trigger a rebalance
     EasyMock.reset(replicaManager)
@@ -434,18 +434,18 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     timer.advanceClock(500)
     EasyMock.reset(replicaManager)
     var heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
-    assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatResult)
+    assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
 
     timer.advanceClock(500)
     EasyMock.reset(replicaManager)
     heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
-    assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatResult)
+    assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
 
     // now timeout the rebalance, which should kick the unjoined member out of the group
     // and let the rebalance finish with only the new member
     timer.advanceClock(500)
     val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
-    assertEquals(Errors.NONE.code, otherJoinResult.errorCode)
+    assertEquals(Errors.NONE, otherJoinResult.error)
   }
 
   @Test
@@ -455,18 +455,18 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
     val assignedConsumerId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
     val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map())
-    val syncGroupErrorCode = syncGroupResult._2
-    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+    val syncGroupError = syncGroupResult._2
+    assertEquals(Errors.NONE, syncGroupError)
     assertTrue(syncGroupResult._1.isEmpty)
 
     EasyMock.reset(replicaManager)
     val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
-    assertEquals(Errors.NONE.code, heartbeatResult)
+    assertEquals(Errors.NONE, heartbeatResult)
   }
 
   @Test
@@ -474,7 +474,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val generation = 1
 
     val syncGroupResult = syncGroupFollower(otherGroupId, generation, memberId)
-    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, syncGroupResult._2)
+    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, syncGroupResult._2)
   }
 
   @Test
@@ -482,7 +482,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val generation = 1
 
     val syncGroupResult = syncGroupFollower(groupId, generation, memberId)
-    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, syncGroupResult._2)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, syncGroupResult._2)
   }
 
   @Test
@@ -492,17 +492,17 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
     val assignedConsumerId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
-    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+    assertEquals(Errors.NONE, joinGroupResult.error)
 
     EasyMock.reset(replicaManager)
     val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
-    val syncGroupErrorCode = syncGroupResult._2
-    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+    val syncGroupError = syncGroupResult._2
+    assertEquals(Errors.NONE, syncGroupError)
 
     EasyMock.reset(replicaManager)
     val unknownMemberId = "blah"
     val unknownMemberSyncResult = syncGroupFollower(groupId, generationId, unknownMemberId)
-    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, unknownMemberSyncResult._2)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, unknownMemberSyncResult._2)
   }
 
   @Test
@@ -512,12 +512,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
     val assignedConsumerId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
-    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+    assertEquals(Errors.NONE, joinGroupResult.error)
 
     EasyMock.reset(replicaManager)
     // send the sync group with an invalid generation
     val syncGroupResult = syncGroupLeader(groupId, generationId+1, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
-    assertEquals(Errors.ILLEGAL_GENERATION.code, syncGroupResult._2)
+    assertEquals(Errors.ILLEGAL_GENERATION, syncGroupResult._2)
   }
 
   @Test
@@ -530,11 +530,11 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val firstMemberId = firstJoinResult.memberId
     val firstGenerationId = firstJoinResult.generationId
     assertEquals(firstMemberId, firstJoinResult.leaderId)
-    assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
+    assertEquals(Errors.NONE, firstJoinResult.error)
 
     EasyMock.reset(replicaManager)
     val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
-    assertEquals(Errors.NONE.code, firstSyncResult._2)
+    assertEquals(Errors.NONE, firstSyncResult._2)
 
     EasyMock.reset(replicaManager)
     val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
@@ -544,8 +544,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     val joinResult = await(joinFuture, DefaultSessionTimeout+100)
     val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
-    assertEquals(Errors.NONE.code, joinResult.errorCode)
-    assertEquals(Errors.NONE.code, otherJoinResult.errorCode)
+    assertEquals(Errors.NONE, joinResult.error)
+    assertEquals(Errors.NONE, otherJoinResult.error)
     assertTrue(joinResult.generationId == otherJoinResult.generationId)
 
     assertEquals(firstMemberId, joinResult.leaderId)
@@ -557,7 +557,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     EasyMock.reset(replicaManager)
     val followerJoinResult = joinGroup(groupId, otherJoinResult.memberId, protocolType, protocols)
 
-    assertEquals(Errors.NONE.code, followerJoinResult.errorCode)
+    assertEquals(Errors.NONE, followerJoinResult.error)
     assertEquals(nextGenerationId, followerJoinResult.generationId)
   }
 
@@ -567,11 +567,11 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val firstMemberId = firstJoinResult.memberId
     val firstGenerationId = firstJoinResult.generationId
     assertEquals(firstMemberId, firstJoinResult.leaderId)
-    assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
+    assertEquals(Errors.NONE, firstJoinResult.error)
 
     EasyMock.reset(replicaManager)
     val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
-    assertEquals(Errors.NONE.code, firstSyncResult._2)
+    assertEquals(Errors.NONE, firstSyncResult._2)
 
     // join groups from the leader should force the group to rebalance, which allows the
     // leader to push new assignments when local metadata changes
@@ -579,7 +579,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     EasyMock.reset(replicaManager)
     val secondJoinResult = joinGroup(groupId, firstMemberId, protocolType, protocols)
 
-    assertEquals(Errors.NONE.code, secondJoinResult.errorCode)
+    assertEquals(Errors.NONE, secondJoinResult.error)
     assertNotEquals(firstGenerationId, secondJoinResult.generationId)
   }
 
@@ -593,11 +593,11 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val firstMemberId = firstJoinResult.memberId
     val firstGenerationId = firstJoinResult.generationId
     assertEquals(firstMemberId, firstJoinResult.leaderId)
-    assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
+    assertEquals(Errors.NONE, firstJoinResult.error)
 
     EasyMock.reset(replicaManager)
     val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
-    assertEquals(Errors.NONE.code, firstSyncResult._2)
+    assertEquals(Errors.NONE, firstSyncResult._2)
 
     EasyMock.reset(replicaManager)
     val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
@@ -607,8 +607,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     val joinResult = await(joinFuture, DefaultSessionTimeout+100)
     val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
-    assertEquals(Errors.NONE.code, joinResult.errorCode)
-    assertEquals(Errors.NONE.code, otherJoinResult.errorCode)
+    assertEquals(Errors.NONE, joinResult.error)
+    assertEquals(Errors.NONE, otherJoinResult.error)
     assertTrue(joinResult.generationId == otherJoinResult.generationId)
 
     assertEquals(firstMemberId, joinResult.leaderId)
@@ -624,7 +624,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     timer.advanceClock(DefaultSessionTimeout + 100)
 
     val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100)
-    assertEquals(Errors.REBALANCE_IN_PROGRESS.code, followerSyncResult._2)
+    assertEquals(Errors.REBALANCE_IN_PROGRESS, followerSyncResult._2)
   }
 
   @Test
@@ -637,11 +637,11 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val firstMemberId = firstJoinResult.memberId
     val firstGenerationId = firstJoinResult.generationId
     assertEquals(firstMemberId, firstJoinResult.leaderId)
-    assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
+    assertEquals(Errors.NONE, firstJoinResult.error)
 
     EasyMock.reset(replicaManager)
     val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
-    assertEquals(Errors.NONE.code, firstSyncResult._2)
+    assertEquals(Errors.NONE, firstSyncResult._2)
 
     EasyMock.reset(replicaManager)
     val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
@@ -651,8 +651,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     val joinResult = await(joinFuture, DefaultSessionTimeout+100)
     val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
-    assertEquals(Errors.NONE.code, joinResult.errorCode)
-    assertEquals(Errors.NONE.code, otherJoinResult.errorCode)
+    assertEquals(Errors.NONE, joinResult.error)
+    assertEquals(Errors.NONE, otherJoinResult.error)
     assertTrue(joinResult.generationId == otherJoinResult.generationId)
 
     assertEquals(firstMemberId, joinResult.leaderId)
@@ -667,12 +667,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     EasyMock.reset(replicaManager)
     val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId,
       Map(leaderId -> leaderAssignment, followerId -> followerAssignment))
-    assertEquals(Errors.NONE.code, leaderSyncResult._2)
+    assertEquals(Errors.NONE, leaderSyncResult._2)
     assertEquals(leaderAssignment, leaderSyncResult._1)
 
     EasyMock.reset(replicaManager)
     val followerSyncResult = syncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId)
-    assertEquals(Errors.NONE.code, followerSyncResult._2)
+    assertEquals(Errors.NONE, followerSyncResult._2)
     assertEquals(followerAssignment, followerSyncResult._1)
   }
 
@@ -686,12 +686,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val firstMemberId = joinGroupResult.memberId
     val firstGenerationId = joinGroupResult.generationId
     assertEquals(firstMemberId, joinGroupResult.leaderId)
-    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+    assertEquals(Errors.NONE, joinGroupResult.error)
 
     EasyMock.reset(replicaManager)
     val syncGroupResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
-    val syncGroupErrorCode = syncGroupResult._2
-    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+    val syncGroupError = syncGroupResult._2
+    assertEquals(Errors.NONE, syncGroupError)
 
     EasyMock.reset(replicaManager)
     val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
@@ -701,8 +701,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     val joinResult = await(joinFuture, DefaultSessionTimeout+100)
     val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
-    assertEquals(Errors.NONE.code, joinResult.errorCode)
-    assertEquals(Errors.NONE.code, otherJoinResult.errorCode)
+    assertEquals(Errors.NONE, joinResult.error)
+    assertEquals(Errors.NONE, otherJoinResult.error)
     assertTrue(joinResult.generationId == otherJoinResult.generationId)
 
     val nextGenerationId = joinResult.generationId
@@ -720,11 +720,11 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     EasyMock.reset(replicaManager)
     val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId,
       Map(leaderId -> leaderAssignment, followerId -> followerAssignment))
-    assertEquals(Errors.NONE.code, leaderSyncResult._2)
+    assertEquals(Errors.NONE, leaderSyncResult._2)
     assertEquals(leaderAssignment, leaderSyncResult._1)
 
     val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100)
-    assertEquals(Errors.NONE.code, followerSyncResult._2)
+    assertEquals(Errors.NONE, followerSyncResult._2)
     assertEquals(followerAssignment, followerSyncResult._1)
   }
 
@@ -735,7 +735,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val offset = OffsetAndMetadata(0)
 
     val commitOffsetResult = commitOffsets(groupId, memberId, generationId, immutable.Map(tp -> offset))
-    assertEquals(Errors.ILLEGAL_GENERATION.code, commitOffsetResult(tp))
+    assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp))
   }
 
   @Test
@@ -745,7 +745,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
       OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
-    assertEquals(Errors.NONE.code, commitOffsetResult(tp))
+    assertEquals(Errors.NONE, commitOffsetResult(tp))
   }
 
   @Test
@@ -755,7 +755,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
       OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
-    assertEquals(Errors.NONE.code, commitOffsetResult(tp))
+    assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
     assertEquals(Errors.NONE, error)
@@ -791,9 +791,9 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
       OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp1 -> offset1, tp2 -> offset2, tp3 -> offset3))
-    assertEquals(Errors.NONE.code, commitOffsetResult(tp1))
-    assertEquals(Errors.NONE.code, commitOffsetResult(tp2))
-    assertEquals(Errors.NONE.code, commitOffsetResult(tp3))
+    assertEquals(Errors.NONE, commitOffsetResult(tp1))
+    assertEquals(Errors.NONE, commitOffsetResult(tp2))
+    assertEquals(Errors.NONE, commitOffsetResult(tp3))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId)
     assertEquals(Errors.NONE, error)
@@ -813,12 +813,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
     val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, immutable.Map(tp -> offset))
-    assertEquals(Errors.REBALANCE_IN_PROGRESS.code, commitOffsetResult(tp))
+    assertEquals(Errors.REBALANCE_IN_PROGRESS, commitOffsetResult(tp))
   }
 
   @Test
@@ -827,8 +827,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
     val assignedConsumerId = joinGroupResult.memberId
     val initialGenerationId = joinGroupResult.generationId
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
 
     // Then join with a new consumer to trigger a rebalance
     EasyMock.reset(replicaManager)
@@ -837,29 +837,29 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     // We should be in the middle of a rebalance, so the heartbeat should return rebalance in progress
     EasyMock.reset(replicaManager)
     val heartbeatResult = heartbeat(groupId, assignedConsumerId, initialGenerationId)
-    assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatResult)
+    assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
   }
 
   @Test
   def testGenerationIdIncrementsOnRebalance() {
     val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
     val initialGenerationId = joinGroupResult.generationId
-    val joinGroupErrorCode = joinGroupResult.errorCode
+    val joinGroupError = joinGroupResult.error
     val memberId = joinGroupResult.memberId
     assertEquals(1, initialGenerationId)
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+    assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
     val syncGroupResult = syncGroupLeader(groupId, initialGenerationId, memberId, Map(memberId -> Array[Byte]()))
-    val syncGroupErrorCode = syncGroupResult._2
-    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+    val syncGroupError = syncGroupResult._2
+    assertEquals(Errors.NONE, syncGroupError)
 
     EasyMock.reset(replicaManager)
     val otherJoinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
     val nextGenerationId = otherJoinGroupResult.generationId
-    val otherJoinGroupErrorCode = otherJoinGroupResult.errorCode
+    val otherJoinGroupError = otherJoinGroupResult.error
     assertEquals(2, nextGenerationId)
-    assertEquals(Errors.NONE.code, otherJoinGroupErrorCode)
+    assertEquals(Errors.NONE, otherJoinGroupError)
   }
 
   @Test
@@ -867,14 +867,14 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
     val leaveGroupResult = leaveGroup(otherGroupId, memberId)
-    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, leaveGroupResult)
+    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, leaveGroupResult)
   }
 
   @Test
   def testLeaveGroupUnknownGroup() {
 
     val leaveGroupResult = leaveGroup(groupId, memberId)
-    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, leaveGroupResult)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, leaveGroupResult)
   }
 
   @Test
@@ -883,12 +883,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val otherMemberId = "consumerId"
 
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
     val leaveGroupResult = leaveGroup(groupId, otherMemberId)
-    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, leaveGroupResult)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, leaveGroupResult)
   }
 
   @Test
@@ -897,12 +897,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
     val leaveGroupResult = leaveGroup(groupId, assignedMemberId)
-    assertEquals(Errors.NONE.code, leaveGroupResult)
+    assertEquals(Errors.NONE, leaveGroupResult)
   }
 
   @Test
@@ -911,12 +911,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
-    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+    assertEquals(Errors.NONE, joinGroupResult.error)
 
     EasyMock.reset(replicaManager)
     val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
-    val syncGroupErrorCode = syncGroupResult._2
-    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+    val syncGroupError = syncGroupResult._2
+    assertEquals(Errors.NONE, syncGroupError)
 
     val (error, groups) = groupCoordinator.handleListGroups()
     assertEquals(Errors.NONE, error)
@@ -928,7 +928,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   def testListGroupsIncludesRebalancingGroups() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
-    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+    assertEquals(Errors.NONE, joinGroupResult.error)
 
     val (error, groups) = groupCoordinator.handleListGroups()
     assertEquals(Errors.NONE, error)
@@ -957,14 +957,14 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
     val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
 
-    val syncGroupErrorCode = syncGroupResult._2
-    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+    val syncGroupError = syncGroupResult._2
+    assertEquals(Errors.NONE, syncGroupError)
 
     EasyMock.reset(replicaManager)
     val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
@@ -978,8 +978,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   def testDescribeGroupRebalancing() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
-    val joinGroupErrorCode = joinGroupResult.errorCode
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
     val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
@@ -1002,15 +1002,15 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   private def setupSyncGroupCallback: (Future[SyncGroupCallbackParams], SyncGroupCallback) = {
     val responsePromise = Promise[SyncGroupCallbackParams]
     val responseFuture = responsePromise.future
-    val responseCallback: SyncGroupCallback = (assignment, errorCode) =>
-      responsePromise.success((assignment, errorCode))
+    val responseCallback: SyncGroupCallback = (assignment, error) =>
+      responsePromise.success((assignment, error))
     (responseFuture, responseCallback)
   }
 
   private def setupHeartbeatCallback: (Future[HeartbeatCallbackParams], HeartbeatCallback) = {
     val responsePromise = Promise[HeartbeatCallbackParams]
     val responseFuture = responsePromise.future
-    val responseCallback: HeartbeatCallback = errorCode => responsePromise.success(errorCode)
+    val responseCallback: HeartbeatCallback = error => responsePromise.success(error)
     (responseFuture, responseCallback)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 30dfc63..86189aa 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -364,8 +364,8 @@ class GroupMetadataManagerTest {
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
-    var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None
-    def callback(errors: immutable.Map[TopicPartition, Short]) {
+    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicPartition, Errors]) {
       commitErrors = Some(errors)
     }
 
@@ -376,7 +376,7 @@ class GroupMetadataManagerTest {
 
     assertFalse(commitErrors.isEmpty)
     val maybeError = commitErrors.get.get(topicPartition)
-    assertEquals(Some(Errors.NONE.code), maybeError)
+    assertEquals(Some(Errors.NONE), maybeError)
     assertTrue(group.hasOffsets)
 
     val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition)))
@@ -405,8 +405,8 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None
-    def callback(errors: immutable.Map[TopicPartition, Short]) {
+    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicPartition, Errors]) {
       commitErrors = Some(errors)
     }
 
@@ -414,7 +414,7 @@ class GroupMetadataManagerTest {
 
     assertFalse(commitErrors.isEmpty)
     val maybeError = commitErrors.get.get(topicPartition)
-    assertEquals(Some(Errors.NOT_COORDINATOR_FOR_GROUP.code), maybeError)
+    assertEquals(Some(Errors.NOT_COORDINATOR_FOR_GROUP), maybeError)
     EasyMock.verify(replicaManager)
   }
 
@@ -448,8 +448,8 @@ class GroupMetadataManagerTest {
     expectAppendMessage(appendError)
     EasyMock.replay(replicaManager)
 
-    var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None
-    def callback(errors: immutable.Map[TopicPartition, Short]) {
+    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicPartition, Errors]) {
       commitErrors = Some(errors)
     }
 
@@ -460,7 +460,7 @@ class GroupMetadataManagerTest {
 
     assertFalse(commitErrors.isEmpty)
     val maybeError = commitErrors.get.get(topicPartition)
-    assertEquals(Some(expectedError.code), maybeError)
+    assertEquals(Some(expectedError), maybeError)
     assertFalse(group.hasOffsets)
 
     val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition)))
@@ -492,8 +492,8 @@ class GroupMetadataManagerTest {
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
-    var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None
-    def callback(errors: immutable.Map[TopicPartition, Short]) {
+    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicPartition, Errors]) {
       commitErrors = Some(errors)
     }
 
@@ -502,7 +502,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.store(delayedStore)
     assertFalse(commitErrors.isEmpty)
-    assertEquals(Some(Errors.NONE.code), commitErrors.get.get(topicPartition1))
+    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
 
     // expire only one of the offsets
     time.sleep(2)
@@ -641,8 +641,8 @@ class GroupMetadataManagerTest {
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
-    var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None
-    def callback(errors: immutable.Map[TopicPartition, Short]) {
+    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicPartition, Errors]) {
       commitErrors = Some(errors)
     }
 
@@ -651,7 +651,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.store(delayedStore)
     assertFalse(commitErrors.isEmpty)
-    assertEquals(Some(Errors.NONE.code), commitErrors.get.get(topicPartition1))
+    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
 
     // expire all of the offsets
     time.sleep(4)
@@ -717,8 +717,8 @@ class GroupMetadataManagerTest {
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
-    var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None
-    def callback(errors: immutable.Map[TopicPartition, Short]) {
+    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicPartition, Errors]) {
       commitErrors = Some(errors)
     }
 
@@ -727,7 +727,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.store(delayedStore)
     assertFalse(commitErrors.isEmpty)
-    assertEquals(Some(Errors.NONE.code), commitErrors.get.get(topicPartition1))
+    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
 
     // expire all of the offsets
     time.sleep(4)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
index b402b25..e53d348 100644
--- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
@@ -70,8 +70,8 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
 
     val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
       2000, 0).topicsMetadata
-    assertEquals(Errors.NONE.code, topicsMetadata.head.errorCode)
-    assertEquals(Errors.NONE.code, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    assertEquals(Errors.NONE, topicsMetadata.head.error)
+    assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
     assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
     assertEquals("Expecting metadata for the test topic", "test", topicsMetadata.head.topic)
     val partitionMetadata = topicsMetadata.head.partitionsMetadata
@@ -91,10 +91,10 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
     // issue metadata request with empty list of topics
     val topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokerEndPoints, "TopicMetadataTest-testGetAllTopicMetadata",
       2000, 0).topicsMetadata
-    assertEquals(Errors.NONE.code, topicsMetadata.head.errorCode)
+    assertEquals(Errors.NONE, topicsMetadata.head.error)
     assertEquals(2, topicsMetadata.size)
-    assertEquals(Errors.NONE.code, topicsMetadata.head.partitionsMetadata.head.errorCode)
-    assertEquals(Errors.NONE.code, topicsMetadata.last.partitionsMetadata.head.errorCode)
+    assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
+    assertEquals(Errors.NONE, topicsMetadata.last.partitionsMetadata.head.error)
     val partitionMetadataTopic1 = topicsMetadata.head.partitionsMetadata
     val partitionMetadataTopic2 = topicsMetadata.last.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic1.size)
@@ -111,7 +111,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
     val topic = "testAutoCreateTopic"
     var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
       2000,0).topicsMetadata
-    assertEquals(Errors.LEADER_NOT_AVAILABLE.code, topicsMetadata.head.errorCode)
+    assertEquals(Errors.LEADER_NOT_AVAILABLE, topicsMetadata.head.error)
     assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
     assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
     assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
@@ -123,8 +123,8 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
     // retry the metadata for the auto created topic
     topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
       2000,0).topicsMetadata
-    assertEquals(Errors.NONE.code, topicsMetadata.head.errorCode)
-    assertEquals(Errors.NONE.code, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    assertEquals(Errors.NONE, topicsMetadata.head.error)
+    assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
     val partitionMetadata = topicsMetadata.head.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
@@ -149,7 +149,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
     val topic = "testAutoCreateTopic"
     val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), Seq(adHocEndpoint), "TopicMetadataTest-testAutoCreateTopic",
       2000, 0).topicsMetadata
-    assertEquals(Errors.INVALID_REPLICATION_FACTOR.code, topicsMetadata.head.errorCode)
+    assertEquals(Errors.INVALID_REPLICATION_FACTOR, topicsMetadata.head.error)
     assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
     assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
     assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
@@ -166,9 +166,9 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
       2000, 0).topicsMetadata
     assertEquals("Expecting metadata for 2 topics", 2, topicsMetadata.size)
     assertEquals("Expecting metadata for topic1", topic1, topicsMetadata.head.topic)
-    assertEquals(Errors.LEADER_NOT_AVAILABLE.code, topicsMetadata.head.errorCode)
+    assertEquals(Errors.LEADER_NOT_AVAILABLE, topicsMetadata.head.error)
     assertEquals("Expecting metadata for topic2", topic2, topicsMetadata(1).topic)
-    assertEquals("Expecting InvalidTopicCode for topic2 metadata", Errors.INVALID_TOPIC_EXCEPTION.code, topicsMetadata(1).errorCode)
+    assertEquals("Expecting InvalidTopicCode for topic2 metadata", Errors.INVALID_TOPIC_EXCEPTION, topicsMetadata(1).error)
 
     // wait for leader to be elected
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 0)
@@ -177,8 +177,8 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
     // retry the metadata for the first auto created topic
     topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
       2000, 0).topicsMetadata
-    assertEquals(Errors.NONE.code, topicsMetadata.head.errorCode)
-    assertEquals(Errors.NONE.code, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    assertEquals(Errors.NONE, topicsMetadata.head.error)
+    assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
     var partitionMetadata = topicsMetadata.head.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 761cac8..ff573bc 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -153,7 +153,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        response.data.foreach(pdata => ErrorMapping.maybeThrowException(pdata._2.error))
+        response.data.foreach(pdata => ErrorMapping.maybeThrowException(pdata._2.error.code))
         fail("Expected exception when fetching message with invalid offset")
       } catch {
         case _: OffsetOutOfRangeException => // This is good.
@@ -169,7 +169,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        response.data.foreach(pdata => ErrorMapping.maybeThrowException(pdata._2.error))
+        response.data.foreach(pdata => ErrorMapping.maybeThrowException(pdata._2.error.code))
         fail("Expected exception when fetching message with invalid partition")
       } catch {
         case _: UnknownTopicOrPartitionException => // This is good.

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 6358bdc..de0f901 100755
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -395,12 +395,12 @@ class AsyncProducerTest {
     val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1,
       correlationId = 17, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
     val response1 = ProducerResponse(0,
-      Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NOT_LEADER_FOR_PARTITION.code, 0L)),
-          (TopicAndPartition("topic1", 1), ProducerResponseStatus(Errors.NONE.code, 0L))))
+      Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NOT_LEADER_FOR_PARTITION, 0L)),
+          (TopicAndPartition("topic1", 1), ProducerResponseStatus(Errors.NONE, 0L))))
     val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 21,
       timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
     val response2 = ProducerResponse(0,
-      Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NONE.code, 0L))))
+      Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NONE, 0L))))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
     // don't care about config mock
     EasyMock.expect(mockSyncProducer.config).andReturn(EasyMock.anyObject()).anyTimes()

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index c20aab3..189e21b 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -115,8 +115,8 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
     val response1 = producer.send(produceRequest("test", 0, messageSet1, acks = 1))
 
-    assertEquals(1, response1.status.count(_._2.error != Errors.NONE.code))
-    assertEquals(Errors.MESSAGE_TOO_LARGE.code, response1.status(TopicAndPartition("test", 0)).error)
+    assertEquals(1, response1.status.count(_._2.error != Errors.NONE))
+    assertEquals(Errors.MESSAGE_TOO_LARGE, response1.status(TopicAndPartition("test", 0)).error)
     assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).offset)
 
     val safeSize = configs.head.messageMaxBytes - Message.MinMessageOverhead - Message.TimestampLength - MessageSet.LogOverhead - 1
@@ -124,8 +124,8 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
     val response2 = producer.send(produceRequest("test", 0, messageSet2, acks = 1))
 
-    assertEquals(1, response1.status.count(_._2.error != Errors.NONE.code))
-    assertEquals(Errors.NONE.code, response2.status(TopicAndPartition("test", 0)).error)
+    assertEquals(1, response1.status.count(_._2.error != Errors.NONE))
+    assertEquals(Errors.NONE, response2.status(TopicAndPartition("test", 0)).error)
     assertEquals(0, response2.status(TopicAndPartition("test", 0)).offset)
   }
 
@@ -174,7 +174,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     assertEquals(3, response.status.size)
     response.status.values.foreach {
       case ProducerResponseStatus(error, nextOffset, timestamp) =>
-        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, error)
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, error)
         assertEquals(-1L, nextOffset)
         assertEquals(Message.NoTimestamp, timestamp)
     }
@@ -191,13 +191,13 @@ class SyncProducerTest extends KafkaServerTestHarness {
     assertEquals(3, response2.status.size)
 
     // the first and last message should have been accepted by broker
-    assertEquals(Errors.NONE.code, response2.status(TopicAndPartition("topic1", 0)).error)
-    assertEquals(Errors.NONE.code, response2.status(TopicAndPartition("topic3", 0)).error)
+    assertEquals(Errors.NONE, response2.status(TopicAndPartition("topic1", 0)).error)
+    assertEquals(Errors.NONE, response2.status(TopicAndPartition("topic3", 0)).error)
     assertEquals(0, response2.status(TopicAndPartition("topic1", 0)).offset)
     assertEquals(0, response2.status(TopicAndPartition("topic3", 0)).offset)
 
     // the middle message should have been rejected because broker doesn't lead partition
-    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
                         response2.status(TopicAndPartition("topic2", 0)).error)
     assertEquals(-1, response2.status(TopicAndPartition("topic2", 0)).offset)
   }
@@ -262,6 +262,6 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val response = producer.send(produceRequest(topicName, 0,
       new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)),-1))
 
-    assertEquals(Errors.NOT_ENOUGH_REPLICAS.code, response.status(TopicAndPartition(topicName, 0)).error)
+    assertEquals(Errors.NOT_ENOUGH_REPLICAS, response.status(TopicAndPartition(topicName, 0)).error)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index 32e69c7..ef98531 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -126,7 +126,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
     val metadata = sendMetadataRequest(
       new MetadataRequest.Builder(List(topic).asJava).build()).topicMetadata.asScala
-    assertTrue("The topic should be created", metadata.exists(p => p.topic.equals(topic) && p.error() == Errors.NONE))
+    assertTrue("The topic should be created", metadata.exists(p => p.topic.equals(topic) && p.error == Errors.NONE))
   }
 
   protected def replicaAssignmentToJava(assignments: Map[Int, List[Int]]) = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 625ff6c..f279c85 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -92,7 +92,7 @@ class AbstractFetcherThreadTest {
   }
 
   class TestPartitionData(records: MemoryRecords = MemoryRecords.EMPTY) extends PartitionData {
-    override def errorCode: Short = Errors.NONE.code
+    override def error: Errors = Errors.NONE
 
     override def toRecords: MemoryRecords = records
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 8f6faf9..ffe82d1 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -54,7 +54,7 @@ class ApiVersionsRequestTest extends BaseRequestTest {
     val apiVersionsRequest = new ApiVersionsRequest(
       new Struct(ProtoUtils.currentRequestSchema(ApiKeys.API_VERSIONS.id)), Short.MaxValue)
     val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest)
-    assertEquals(Errors.UNSUPPORTED_VERSION.code(), apiVersionsResponse.errorCode)
+    assertEquals(Errors.UNSUPPORTED_VERSION, apiVersionsResponse.error)
   }
 
   private def sendApiVersionsRequest(request: ApiVersionsRequest): ApiVersionsResponse = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index 92546a6..9a092d0 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -107,7 +107,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
   private def validateTopicIsDeleted(topic: String): Unit = {
     val metadata = sendMetadataRequest(new MetadataRequest.
         Builder(List(topic).asJava).build).topicMetadata.asScala
-    TestUtils.waitUntilTrue (() => !metadata.exists(p => p.topic.equals(topic) && p.error() == Errors.NONE),
+    TestUtils.waitUntilTrue (() => !metadata.exists(p => p.topic.equals(topic) && p.error == Errors.NONE),
       s"The topic $topic should not exist")
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index c18d949..3811360 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -125,7 +125,7 @@ class FetchRequestTest extends BaseRequestTest {
     }.sum
     assertTrue(responseSize3 <= maxResponseBytes)
     val partitionData3 = fetchResponse3.responseData.get(partitionWithLargeMessage1)
-    assertEquals(Errors.NONE.code, partitionData3.errorCode)
+    assertEquals(Errors.NONE, partitionData3.error)
     assertTrue(partitionData3.highWatermark > 0)
     val size3 = logEntries(partitionData3).map(_.sizeInBytes).sum
     assertTrue(s"Expected $size3 to be smaller than $maxResponseBytes", size3 <= maxResponseBytes)
@@ -143,7 +143,7 @@ class FetchRequestTest extends BaseRequestTest {
     }
     assertEquals(Seq(partitionWithLargeMessage2), nonEmptyPartitions4)
     val partitionData4 = fetchResponse4.responseData.get(partitionWithLargeMessage2)
-    assertEquals(Errors.NONE.code, partitionData4.errorCode)
+    assertEquals(Errors.NONE, partitionData4.error)
     assertTrue(partitionData4.highWatermark > 0)
     val size4 = logEntries(partitionData4).map(_.sizeInBytes).sum
     assertTrue(s"Expected $size4 to be larger than $maxResponseBytes", size4 > maxResponseBytes)
@@ -161,7 +161,7 @@ class FetchRequestTest extends BaseRequestTest {
       setVersion(2)
     val fetchResponse = sendFetchRequest(leaderId, fetchRequestBuilder.build())
     val partitionData = fetchResponse.responseData.get(topicPartition)
-    assertEquals(Errors.NONE.code, partitionData.errorCode)
+    assertEquals(Errors.NONE, partitionData.error)
     assertTrue(partitionData.highWatermark > 0)
     assertEquals(maxPartitionBytes, partitionData.records.sizeInBytes)
     assertEquals(0, logEntries(partitionData).map(_.sizeInBytes).sum)
@@ -180,7 +180,7 @@ class FetchRequestTest extends BaseRequestTest {
 
     expectedPartitions.foreach { tp =>
       val partitionData = fetchResponse.responseData.get(tp)
-      assertEquals(Errors.NONE.code, partitionData.errorCode)
+      assertEquals(Errors.NONE, partitionData.error)
       assertTrue(partitionData.highWatermark > 0)
 
       val records = partitionData.records

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 0ceb71b..4eeb515 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -162,7 +162,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
 
   private def staleControllerEpochCallback(response: AbstractResponse): Unit = {
     val leaderAndIsrResponse = response.asInstanceOf[LeaderAndIsrResponse]
-    staleControllerEpochDetected = Errors.forCode(leaderAndIsrResponse.errorCode) match {
+    staleControllerEpochDetected = leaderAndIsrResponse.error match {
       case Errors.STALE_CONTROLLER_EPOCH => true
       case _ => false
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index f056476..4075a07 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -71,7 +71,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val request = OffsetRequest(
       Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)))
     val offsetResponse = simpleConsumer.getOffsetsBefore(request)
-    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
                  offsetResponse.partitionErrorAndOffsets(topicAndPartition).error)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 355acde..ebfbe89 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -86,13 +86,13 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     val commitRequest = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(offset = 42L)))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
 
-    assertEquals(Errors.NONE.code, commitResponse.commitStatus.get(topicAndPartition).get)
+    assertEquals(Errors.NONE, commitResponse.commitStatus.get(topicAndPartition).get)
 
     // Fetch it and verify
     val fetchRequest = OffsetFetchRequest(group, Seq(topicAndPartition))
     val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
 
-    assertEquals(Errors.NONE.code, fetchResponse.requestInfo.get(topicAndPartition).get.error)
+    assertEquals(Errors.NONE, fetchResponse.requestInfo.get(topicAndPartition).get.error)
     assertEquals(OffsetMetadata.NoMetadata, fetchResponse.requestInfo.get(topicAndPartition).get.metadata)
     assertEquals(42L, fetchResponse.requestInfo.get(topicAndPartition).get.offset)
 
@@ -103,13 +103,13 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     )))
     val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
 
-    assertEquals(Errors.NONE.code, commitResponse1.commitStatus.get(topicAndPartition).get)
+    assertEquals(Errors.NONE, commitResponse1.commitStatus.get(topicAndPartition).get)
 
     // Fetch it and verify
     val fetchRequest1 = OffsetFetchRequest(group, Seq(topicAndPartition))
     val fetchResponse1 = simpleConsumer.fetchOffsets(fetchRequest1)
 
-    assertEquals(Errors.NONE.code, fetchResponse1.requestInfo.get(topicAndPartition).get.error)
+    assertEquals(Errors.NONE, fetchResponse1.requestInfo.get(topicAndPartition).get.error)
     assertEquals("some metadata", fetchResponse1.requestInfo.get(topicAndPartition).get.metadata)
     assertEquals(100L, fetchResponse1.requestInfo.get(topicAndPartition).get.offset)
 
@@ -142,10 +142,10 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
       TopicAndPartition(topic2, 1) -> OffsetAndMetadata(offset=45L)
     ))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
-    assertEquals(Errors.NONE.code, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
-    assertEquals(Errors.NONE.code, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
-    assertEquals(Errors.NONE.code, commitResponse.commitStatus.get(TopicAndPartition(topic3, 0)).get)
-    assertEquals(Errors.NONE.code, commitResponse.commitStatus.get(TopicAndPartition(topic2, 1)).get)
+    assertEquals(Errors.NONE, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
+    assertEquals(Errors.NONE, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
+    assertEquals(Errors.NONE, commitResponse.commitStatus.get(TopicAndPartition(topic3, 0)).get)
+    assertEquals(Errors.NONE, commitResponse.commitStatus.get(TopicAndPartition(topic2, 1)).get)
 
     val fetchRequest = OffsetFetchRequest(group, Seq(
       TopicAndPartition(topic1, 0),
@@ -158,19 +158,19 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     ))
     val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
 
-    assertEquals(Errors.NONE.code, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.error)
+    assertEquals(Errors.NONE, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.error)
 
-    assertEquals(Errors.NONE.code, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.error)
-    assertEquals(Errors.NONE.code, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error)
+    assertEquals(Errors.NONE, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.error)
+    assertEquals(Errors.NONE, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error)
 
-    assertEquals(Errors.NONE.code, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error)
-    assertEquals(Errors.NONE.code, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error)
+    assertEquals(Errors.NONE, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error)
+    assertEquals(Errors.NONE, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error)
     assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get)
 
-    assertEquals(Errors.NONE.code, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error)
+    assertEquals(Errors.NONE, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error)
     assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get)
 
-    assertEquals(Errors.NONE.code, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error)
+    assertEquals(Errors.NONE, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error)
     assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get)
 
     assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.metadata)
@@ -205,7 +205,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     )))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
 
-    assertEquals(Errors.NONE.code, commitResponse.commitStatus.get(topicAndPartition).get)
+    assertEquals(Errors.NONE, commitResponse.commitStatus.get(topicAndPartition).get)
 
     val commitRequest1 = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(
       offset=42L,
@@ -213,7 +213,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     )))
     val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
 
-    assertEquals(Errors.OFFSET_METADATA_TOO_LARGE.code, commitResponse1.commitStatus.get(topicAndPartition).get)
+    assertEquals(Errors.OFFSET_METADATA_TOO_LARGE, commitResponse1.commitStatus.get(topicAndPartition).get)
   }
 
   @Test
@@ -232,7 +232,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
       requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(1L, "metadata")),
       versionId = 0
     )
-    assertEquals(Errors.NONE.code, simpleConsumer.commitOffsets(commitRequest0).commitStatus.get(topicPartition).get)
+    assertEquals(Errors.NONE, simpleConsumer.commitOffsets(commitRequest0).commitStatus.get(topicPartition).get)
     assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
 
     // committed offset should exist with fetch version 0
@@ -248,7 +248,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
       requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(2L, "metadata", -1L)),
       versionId = 1
     )
-    assertEquals(Errors.NONE.code, simpleConsumer.commitOffsets(commitRequest1).commitStatus.get(topicPartition).get)
+    assertEquals(Errors.NONE, simpleConsumer.commitOffsets(commitRequest1).commitStatus.get(topicPartition).get)
     Thread.sleep(retentionCheckInterval * 2)
     assertEquals(2L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
 
@@ -259,7 +259,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
       requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(3L, "metadata", Time.SYSTEM.milliseconds - 2*24*60*60*1000L)),
       versionId = 1
     )
-    assertEquals(Errors.NONE.code, simpleConsumer.commitOffsets(commitRequest2).commitStatus.get(topicPartition).get)
+    assertEquals(Errors.NONE, simpleConsumer.commitOffsets(commitRequest2).commitStatus.get(topicPartition).get)
     Thread.sleep(retentionCheckInterval * 2)
     assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
 
@@ -271,7 +271,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
       versionId = 2,
       retentionMs = 1000 * 60 * 60L
     )
-    assertEquals(Errors.NONE.code, simpleConsumer.commitOffsets(commitRequest3).commitStatus.get(topicPartition).get)
+    assertEquals(Errors.NONE, simpleConsumer.commitOffsets(commitRequest3).commitStatus.get(topicPartition).get)
     Thread.sleep(retentionCheckInterval * 2)
     assertEquals(4L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
 
@@ -283,7 +283,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
       versionId = 2,
       retentionMs = 0L
     )
-    assertEquals(Errors.NONE.code, simpleConsumer.commitOffsets(commitRequest4).commitStatus.get(topicPartition).get)
+    assertEquals(Errors.NONE, simpleConsumer.commitOffsets(commitRequest4).commitStatus.get(topicPartition).get)
     Thread.sleep(retentionCheckInterval * 2)
     assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
 
@@ -303,8 +303,8 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     ))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
 
-    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
-    assertEquals(Errors.NONE.code, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
+    assertEquals(Errors.NONE, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
   }
 
   @Test
@@ -317,7 +317,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     val commitRequest = OffsetCommitRequest(group, immutable.Map(topicPartition -> OffsetAndMetadata(offset = 42L)))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
 
-    assertEquals(Errors.NONE.code, commitResponse.commitStatus.get(topicPartition).get)
+    assertEquals(Errors.NONE, commitResponse.commitStatus.get(topicPartition).get)
 
     // start topic deletion
     AdminUtils.deleteTopic(zkUtils, topic)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index d5075aa..0259bad 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -228,10 +228,10 @@ class ReplicaManagerTest {
           responseCallback = produceCallback)
       
       var fetchCallbackFired = false
-      var fetchError = 0
+      var fetchError = Errors.NONE
       var fetchedRecords: Records = null
       def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = {
-        fetchError = responseStatus.map(_._2).head.error.code
+        fetchError = responseStatus.map(_._2).head.error
         fetchedRecords = responseStatus.map(_._2).head.records
         fetchCallbackFired = true
       }
@@ -248,7 +248,7 @@ class ReplicaManagerTest {
         
       
       assertTrue(fetchCallbackFired)
-      assertEquals("Should not give an exception", Errors.NONE.code, fetchError)
+      assertEquals("Should not give an exception", Errors.NONE, fetchError)
       assertTrue("Should return some data", fetchedRecords.shallowEntries.iterator.hasNext)
       fetchCallbackFired = false
       
@@ -263,7 +263,7 @@ class ReplicaManagerTest {
         responseCallback = fetchCallback)
           
         assertTrue(fetchCallbackFired)
-        assertEquals("Should not give an exception", Errors.NONE.code, fetchError)
+        assertEquals("Should not give an exception", Errors.NONE, fetchError)
         assertEquals("Should return empty response", MemoryRecords.EMPTY, fetchedRecords)
     } finally {
       rm.shutdown(checkpointHW = false)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 07e03e3..927ace9 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -75,7 +75,7 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
       val apiVersionsRequest = new ApiVersionsRequest(
         new Struct(ProtoUtils.requestSchema(ApiKeys.API_VERSIONS.id, 0)), Short.MaxValue);
       val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket, apiVersionsRequest)
-      assertEquals(Errors.UNSUPPORTED_VERSION.code(), apiVersionsResponse.errorCode)
+      assertEquals(Errors.UNSUPPORTED_VERSION, apiVersionsResponse.error)
       val apiVersionsResponse2 = sendApiVersionsRequest(plaintextSocket,
           new ApiVersionsRequest.Builder().setVersion(0).build())
       ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse2)
@@ -93,7 +93,7 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
   private def sendSaslHandshakeRequestValidateResponse(socket: Socket) {
     val response = send(new SaslHandshakeRequest("PLAIN"), ApiKeys.SASL_HANDSHAKE, socket)
     val handshakeResponse = SaslHandshakeResponse.parse(response)
-    assertEquals(Errors.NONE.code, handshakeResponse.errorCode())
+    assertEquals(Errors.NONE, handshakeResponse.error)
     assertEquals(Collections.singletonList("PLAIN"), handshakeResponse.enabledMechanisms())
   }
 }


Mime
View raw message