From: jgus@apache.org
To: commits@kafka.apache.org
Date: Fri, 10 Feb 2017 05:21:51 -0000
Message-Id: <6942c99500424d07992e1b540c3b5342@git.apache.org>
Subject: [3/5] kafka git commit: MINOR: Use an explicit `Errors` object when possible instead of a numeric error code
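The change applied across every file below follows one pattern: decode the wire-format `short` once at the serialization boundary with `Errors.forCode`, pass the resulting `org.apache.kafka.common.protocol.Errors` value through the rest of the call chain, and call `error.code()` only when writing back to the wire. A minimal standalone sketch of the before/after shape (the `handle*` methods here are hypothetical illustrations, not code from this patch; `Errors.forCode`, `Errors.code()` and `Errors.exception()` are the real client API):

import org.apache.kafka.common.protocol.Errors;

public class ErrorsPatternSketch {

    // Before: a raw short travels through the call chain and every consumer
    // must remember to decode it with Errors.forCode before acting on it.
    static void handleByCode(short errorCode) {
        if (errorCode != Errors.NONE.code())
            throw Errors.forCode(errorCode).exception();
    }

    // After: the short is decoded once where the response is parsed, so the
    // rest of the code compares type-safe enum constants instead of numbers.
    static void handleByEnum(Errors error) {
        if (error != Errors.NONE)
            throw error.exception();
    }

    public static void main(String[] args) {
        short wireCode = Errors.NONE.code();     // as read off the wire
        handleByEnum(Errors.forCode(wireCode));  // decoded exactly once
    }
}

This is why the response constructors in the diff take `Errors` directly and only the `writeTo`/serialization paths call `error.code`.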

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index ecaade2..b27802d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -138,7 +138,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());

-        client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
@@ -163,7 +163,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());

-        client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
@@ -201,7 +201,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 1);

-        client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE, 100L, 0));
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
@@ -246,7 +246,7 @@ public class FetcherTest {
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         try {
             fetcher.fetchedRecords();
@@ -265,8 +265,8 @@ public class FetcherTest {
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 1);

-        client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
-        client.prepareResponse(matchesOffset(tp, 4), fetchResponse(this.nextRecords, Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE, 100L, 0));
+        client.prepareResponse(matchesOffset(tp, 4), fetchResponse(this.nextRecords, Errors.NONE, 100L, 0));
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
@@ -309,7 +309,7 @@ public class FetcherTest {
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(records, Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         consumerRecords = fetcher.fetchedRecords().get(tp);
         assertEquals(3, consumerRecords.size());
@@ -369,7 +369,7 @@ public class FetcherTest {
         assertFalse(fetcher.hasCompletedFetches());
         MemoryRecords partialRecord = MemoryRecords.readableRecords(
             ByteBuffer.wrap(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}));
-        client.prepareResponse(fetchResponse(partialRecord, Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(partialRecord, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
     }
@@ -381,7 +381,7 @@ public class FetcherTest {
         // resize the limit of the buffer to pretend it is only fetch-size large
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.TOPIC_AUTHORIZATION_FAILED, 100L, 0));
         consumerClient.poll(0);
         try {
             fetcher.fetchedRecords();
@@ -401,7 +401,7 @@ public class FetcherTest {
         // Now the rebalance happens and fetch positions are cleared
         subscriptions.assignFromSubscribed(singleton(tp));
-        client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);

         // The active fetch should be ignored since its position is no longer valid
@@ -416,7 +416,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         subscriptions.pause(tp);

-        client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         assertNull(fetcher.fetchedRecords().get(tp));
     }
@@ -437,7 +437,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);

         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -449,7 +449,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);

         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -461,7 +461,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);

         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertTrue(subscriptions.isOffsetResetNeeded(tp));
@@ -476,7 +476,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);

         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         subscriptions.seek(tp, 1);
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -490,7 +490,7 @@ public class FetcherTest {
         subscriptionsNoAutoReset.seek(tp, 0);

         assertTrue(fetcherNoAutoReset.sendFetches() > 0);
-        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         consumerClient.poll(0);
         assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
         subscriptionsNoAutoReset.seek(tp, 2);
@@ -503,7 +503,7 @@ public class FetcherTest {
         subscriptionsNoAutoReset.seek(tp, 0);

         fetcherNoAutoReset.sendFetches();
-        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         consumerClient.poll(0);

         assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
@@ -523,7 +523,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);

         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0), true);
+        client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0), true);
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -743,7 +743,7 @@ public class FetcherTest {
             MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
             for (int v = 0; v < 3; v++)
                 builder.appendWithOffset((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
-            List<ConsumerRecord<byte[], byte[]>> records = fetchRecords(builder.build(), Errors.NONE.code(), 100L, 100 * i).get(tp);
+            List<ConsumerRecord<byte[], byte[]>> records = fetchRecords(builder.build(), Errors.NONE, 100L, 100 * i).get(tp);
             assertEquals(3, records.size());
         }
@@ -772,7 +772,7 @@ public class FetcherTest {
         assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON);

         // recordsFetchLagMax should be hw - fetchOffset after receiving an empty FetchResponse
-        fetchRecords(MemoryRecords.EMPTY, Errors.NONE.code(), 100L, 0);
+        fetchRecords(MemoryRecords.EMPTY, Errors.NONE, 100L, 0);
         assertEquals(100, recordsFetchLagMax.value(), EPSILON);

         KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
@@ -782,7 +782,7 @@ public class FetcherTest {
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
         for (int v = 0; v < 3; v++)
             builder.appendWithOffset((long) v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
-        fetchRecords(builder.build(), Errors.NONE.code(), 200L, 0);
+        fetchRecords(builder.build(), Errors.NONE, 200L, 0);
         assertEquals(197, recordsFetchLagMax.value(), EPSILON);

         // verify de-registration of partition lag
@@ -790,7 +790,7 @@ public class FetcherTest {
         assertFalse(allMetrics.containsKey(partitionLagMetric));
     }

-    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(MemoryRecords records, short error, long hw, int throttleTime) {
+    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(MemoryRecords records, Errors error, long hw, int throttleTime) {
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(records, error, hw, throttleTime));
         consumerClient.poll(0);
@@ -882,13 +882,13 @@ public class FetcherTest {
     }

     private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
-        ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error.code(), timestamp, offset);
+        ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error, timestamp, offset);
         Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
         allPartitionData.put(tp, partitionData);
         return new ListOffsetResponse(allPartitionData, 1);
     }

-    private FetchResponse fetchResponse(MemoryRecords records, short error, long hw, int throttleTime) {
+    private FetchResponse fetchResponse(MemoryRecords records, Errors error, long hw, int throttleTime) {
         return new FetchResponse(
                 new LinkedHashMap<>(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, records))),
                 throttleTime);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 7f5fe15..6e054d0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -99,7 +99,7 @@ public class SenderTest {
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
-        client.respond(produceResponse(tp, offset, Errors.NONE.code(), 0));
+        client.respond(produceResponse(tp, offset, Errors.NONE, 0));
         sender.run(time.milliseconds());
         assertEquals("All requests completed.", offset, (long) client.inFlightRequestCount());
         sender.run(time.milliseconds());
@@ -114,9 +114,9 @@ public class SenderTest {
     public void testQuotaMetrics() throws Exception {
         final long offset = 0;
         for (int i = 1; i <= 3; i++) {
-            Future<RecordMetadata> future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+            accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT);
             sender.run(time.milliseconds()); // send produce request
-            client.respond(produceResponse(tp, offset, Errors.NONE.code(), 100 * i));
+            client.respond(produceResponse(tp, offset, Errors.NONE, 100 * i));
             sender.run(time.milliseconds());
         }
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
@@ -158,7 +158,7 @@ public class SenderTest {
         sender.run(time.milliseconds()); // resend
         assertEquals(1, client.inFlightRequestCount());
         long offset = 0;
-        client.respond(produceResponse(tp, offset, Errors.NONE.code(), 0));
+        client.respond(produceResponse(tp, offset, Errors.NONE, 0));
         sender.run(time.milliseconds());
         assertTrue("Request should have retried and completed", future.isDone());
         assertEquals(offset, future.get().offset());
@@ -239,7 +239,7 @@ public class SenderTest {
         assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic()));
         metadata.update(cluster, time.milliseconds());
         sender.run(time.milliseconds()); // send produce request
-        client.respond(produceResponse(tp, offset++, Errors.NONE.code(), 0));
+        client.respond(produceResponse(tp, offset++, Errors.NONE, 0));
         sender.run(time.milliseconds());
         assertEquals("Request completed.", 0, client.inFlightRequestCount());
         sender.run(time.milliseconds());
@@ -254,7 +254,7 @@ public class SenderTest {
         assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic()));
         metadata.update(cluster, time.milliseconds());
         sender.run(time.milliseconds()); // send produce request
-        client.respond(produceResponse(tp, offset++, Errors.NONE.code(), 0));
+        client.respond(produceResponse(tp, offset++, Errors.NONE, 0));
         sender.run(time.milliseconds());
         assertEquals("Request completed.", 0, client.inFlightRequestCount());
         sender.run(time.milliseconds());
@@ -271,9 +271,8 @@ public class SenderTest {
         }
     }

-    private ProduceResponse produceResponse(TopicPartition tp, long offset, int error, int throttleTimeMs) {
-        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(Errors.forCode((short) error),
-                offset, Record.NO_TIMESTAMP);
+    private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
+        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, Record.NO_TIMESTAMP);
         Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
         return new ProduceResponse(partResp, throttleTimeMs);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 69f6276..699f6e2 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -204,7 +204,7 @@ public class RequestResponseTest {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
         MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
-        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, records));
+        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE, 1000000, records));

         FetchResponse v0Response = new FetchResponse(0, responseData, 0);
         FetchResponse v1Response = new FetchResponse(1, responseData, 10);
@@ -252,7 +252,7 @@ public class RequestResponseTest {
         response.writeTo(buffer);
         buffer.rewind();
         ControlledShutdownResponse deserialized = ControlledShutdownResponse.parse(buffer);
-        assertEquals(response.errorCode(), deserialized.errorCode());
+        assertEquals(response.error(), deserialized.error());
         assertEquals(response.partitionsRemaining(), deserialized.partitionsRemaining());
     }
@@ -287,7 +287,7 @@ public class RequestResponseTest {
     }

     private GroupCoordinatorResponse createGroupCoordinatorResponse() {
-        return new GroupCoordinatorResponse(Errors.NONE.code(), new Node(10, "host1", 2014));
+        return new GroupCoordinatorResponse(Errors.NONE, new Node(10, "host1", 2014));
     }

     private FetchRequest createFetchRequest(int version) {
@@ -301,7 +301,7 @@ public class RequestResponseTest {
     private FetchResponse createFetchResponse() {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
         MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
-        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, records));
+        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE, 1000000, records));
         return new FetchResponse(responseData, 25);
     }
@@ -310,7 +310,7 @@ public class RequestResponseTest {
     }

     private HeartbeatResponse createHeartBeatResponse() {
-        return new HeartbeatResponse(Errors.NONE.code());
+        return new HeartbeatResponse(Errors.NONE);
     }

     @SuppressWarnings("deprecation")
@@ -331,7 +331,7 @@ public class RequestResponseTest {
         Map<String, ByteBuffer> members = new HashMap<>();
         members.put("consumer1", ByteBuffer.wrap(new byte[]{}));
         members.put("consumer2", ByteBuffer.wrap(new byte[]{}));
-        return new JoinGroupResponse(Errors.NONE.code(), 1, "range", "consumer1", "leader", members);
+        return new JoinGroupResponse(Errors.NONE, 1, "range", "consumer1", "leader", members);
     }

     private ListGroupsRequest createListGroupsRequest() {
@@ -340,7 +340,7 @@ public class RequestResponseTest {

     private ListGroupsResponse createListGroupsResponse() {
         List<ListGroupsResponse.Group> groups = Arrays.asList(new ListGroupsResponse.Group("test-group", "consumer"));
-        return new ListGroupsResponse(Errors.NONE.code(), groups);
+        return new ListGroupsResponse(Errors.NONE, groups);
     }

     private DescribeGroupsRequest createDescribeGroupRequest() {
@@ -353,7 +353,7 @@ public class RequestResponseTest {
         ByteBuffer empty = ByteBuffer.allocate(0);
         DescribeGroupsResponse.GroupMember member = new DescribeGroupsResponse.GroupMember("memberId",
                 clientId, clientHost, empty, empty);
-        DescribeGroupsResponse.GroupMetadata metadata = new DescribeGroupsResponse.GroupMetadata(Errors.NONE.code(),
+        DescribeGroupsResponse.GroupMetadata metadata = new DescribeGroupsResponse.GroupMetadata(Errors.NONE,
                 "STABLE", "consumer", "roundrobin", Arrays.asList(member));
         return new DescribeGroupsResponse(Collections.singletonMap("test-group", metadata));
     }
@@ -363,7 +363,7 @@ public class RequestResponseTest {
     }

     private LeaveGroupResponse createLeaveGroupResponse() {
-        return new LeaveGroupResponse(Errors.NONE.code());
+        return new LeaveGroupResponse(Errors.NONE);
     }

     @SuppressWarnings("deprecation")
@@ -386,11 +386,11 @@ public class RequestResponseTest {
     private ListOffsetResponse createListOffsetResponse(int version) {
         if (version == 0) {
             Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
-            responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L)));
+            responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE, Arrays.asList(100L)));
             return new ListOffsetResponse(responseData);
         } else if (version == 1) {
             Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
-            responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), 10000L, 100L));
+            responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE, 10000L, 100L));
             return new ListOffsetResponse(responseData, 1);
         } else {
             throw new IllegalArgumentException("Illegal ListOffsetResponse version " + version);
@@ -430,8 +430,8 @@ public class RequestResponseTest {
     }

     private OffsetCommitResponse createOffsetCommitResponse() {
-        Map<TopicPartition, Short> responseData = new HashMap<>();
-        responseData.put(new TopicPartition("test", 0), Errors.NONE.code());
+        Map<TopicPartition, Errors> responseData = new HashMap<>();
+        responseData.put(new TopicPartition("test", 0), Errors.NONE);
         return new OffsetCommitResponse(responseData);
     }
@@ -467,9 +467,9 @@ public class RequestResponseTest {
     }

     private StopReplicaResponse createStopReplicaResponse() {
-        Map<TopicPartition, Short> responses = new HashMap<>();
-        responses.put(new TopicPartition("test", 0), Errors.NONE.code());
-        return new StopReplicaResponse(Errors.NONE.code(), responses);
+        Map<TopicPartition, Errors> responses = new HashMap<>();
+        responses.put(new TopicPartition("test", 0), Errors.NONE);
+        return new StopReplicaResponse(Errors.NONE, responses);
     }

     private ControlledShutdownRequest createControlledShutdownRequest() {
@@ -481,7 +481,7 @@ public class RequestResponseTest {
             new TopicPartition("test2", 5),
             new TopicPartition("test1", 10)
         ));
-        return new ControlledShutdownResponse(Errors.NONE.code(), topicPartitions);
+        return new ControlledShutdownResponse(Errors.NONE, topicPartitions);
     }

     private LeaderAndIsrRequest createLeaderAndIsrRequest() {
@@ -504,9 +504,9 @@ public class RequestResponseTest {
     }

     private LeaderAndIsrResponse createLeaderAndIsrResponse() {
-        Map<TopicPartition, Short> responses = new HashMap<>();
-        responses.put(new TopicPartition("test", 0), Errors.NONE.code());
-        return new LeaderAndIsrResponse(Errors.NONE.code(), responses);
+        Map<TopicPartition, Errors> responses = new HashMap<>();
+        responses.put(new TopicPartition("test", 0), Errors.NONE);
+        return new LeaderAndIsrResponse(Errors.NONE, responses);
     }

     private UpdateMetadataRequest createUpdateMetadataRequest(int version, String rack) {
@@ -545,7 +545,7 @@ public class RequestResponseTest {
     }

     private UpdateMetadataResponse createUpdateMetadataResponse() {
-        return new UpdateMetadataResponse(Errors.NONE.code());
+        return new UpdateMetadataResponse(Errors.NONE);
     }

     private SaslHandshakeRequest createSaslHandshakeRequest() {
@@ -553,7 +553,7 @@ public class RequestResponseTest {
     }

     private SaslHandshakeResponse createSaslHandshakeResponse() {
-        return new SaslHandshakeResponse(Errors.NONE.code(), singletonList("GSSAPI"));
+        return new SaslHandshakeResponse(Errors.NONE, singletonList("GSSAPI"));
     }

     private ApiVersionsRequest createApiVersionRequest() {
@@ -562,7 +562,7 @@ public class RequestResponseTest {

     private ApiVersionsResponse createApiVersionResponse() {
         List<ApiVersionsResponse.ApiVersion> apiVersions = Arrays.asList(new ApiVersionsResponse.ApiVersion((short) 0, (short) 0, (short) 2));
-        return new ApiVersionsResponse(Errors.NONE.code(), apiVersions);
+        return new ApiVersionsResponse(Errors.NONE, apiVersions);
     }

     private CreateTopicsRequest createCreateTopicRequest(int version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index bc967af..76fb9b3 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -393,7 +393,7 @@ public class SaslAuthenticatorTest {
         ByteBuffer responseBuffer = waitForResponse();
         ResponseHeader.parse(responseBuffer);
         ApiVersionsResponse response = ApiVersionsResponse.parse(responseBuffer);
-        assertEquals(Errors.UNSUPPORTED_VERSION.code(), response.errorCode());
+        assertEquals(Errors.UNSUPPORTED_VERSION, response.error());

         // Send ApiVersionsRequest with a supported version. This should succeed.
         sendVersionRequestReceiveResponse(node);
@@ -838,14 +838,14 @@ public class SaslAuthenticatorTest {
     private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String node) throws Exception {
         SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest("PLAIN");
         SaslHandshakeResponse response = (SaslHandshakeResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, handshakeRequest);
-        assertEquals(Errors.NONE.code(), response.errorCode());
+        assertEquals(Errors.NONE, response.error());
         return response;
     }

     private ApiVersionsResponse sendVersionRequestReceiveResponse(String node) throws Exception {
         ApiVersionsRequest handshakeRequest = new ApiVersionsRequest.Builder().build();
         ApiVersionsResponse response = (ApiVersionsResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.API_VERSIONS, handshakeRequest);
-        assertEquals(Errors.NONE.code(), response.errorCode());
+        assertEquals(Errors.NONE, response.error());
         return response;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 92393a1..680f4ca 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -202,14 +202,14 @@ public class WorkerCoordinatorTest {

         final String consumerId = "leader";

-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();

         // normal join group
         Map<String, Long> memberConfigOffsets = new HashMap<>();
         memberConfigOffsets.put("leader", 1L);
         memberConfigOffsets.put("member", 1L);
-        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberConfigOffsets, Errors.NONE.code()));
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberConfigOffsets, Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -219,7 +219,7 @@ public class WorkerCoordinatorTest {
                         sync.groupAssignment().containsKey(consumerId);
             }
         }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1),
-                Collections.emptyList(), Errors.NONE.code()));
+                Collections.emptyList(), Errors.NONE));
         coordinator.ensureActiveGroup();

         assertFalse(coordinator.needRejoin());
@@ -242,11 +242,11 @@ public class WorkerCoordinatorTest {

         final String memberId = "member";

-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();

         // normal join group
-        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -256,7 +256,7 @@ public class WorkerCoordinatorTest {
                         sync.groupAssignment().isEmpty();
             }
         }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.emptyList(),
-                Collections.singletonList(taskId1x0), Errors.NONE.code()));
+                Collections.singletonList(taskId1x0), Errors.NONE));
         coordinator.ensureActiveGroup();

         assertFalse(coordinator.needRejoin());
@@ -283,11 +283,11 @@ public class WorkerCoordinatorTest {

         final String memberId = "member";

-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();

         // config mismatch results in assignment error
-        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE));
         MockClient.RequestMatcher matcher = new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -298,10 +298,10 @@ public class WorkerCoordinatorTest {
             }
         };
         client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.CONFIG_MISMATCH, "leader", 10L,
-                Collections.emptyList(), Collections.emptyList(), Errors.NONE.code()));
-        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
+                Collections.emptyList(), Collections.emptyList(), Errors.NONE));
+        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE));
         client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L,
-                Collections.emptyList(), Collections.singletonList(taskId1x0), Errors.NONE.code()));
+                Collections.emptyList(), Collections.singletonList(taskId1x0), Errors.NONE));
         coordinator.ensureActiveGroup();

         PowerMock.verifyAll();
@@ -314,13 +314,13 @@ public class WorkerCoordinatorTest {

         PowerMock.replayAll();

-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();

         // join the group once
-        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.emptyList(),
-                Collections.singletonList(taskId1x0), Errors.NONE.code()));
+                Collections.singletonList(taskId1x0), Errors.NONE));
         coordinator.ensureActiveGroup();

         assertEquals(0, rebalanceListener.revokedCount);
@@ -332,9 +332,9 @@ public class WorkerCoordinatorTest {

         // and join the group again
         coordinator.requestRejoin();
-        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1),
-                Collections.emptyList(), Errors.NONE.code()));
+                Collections.emptyList(), Errors.NONE));
         coordinator.ensureActiveGroup();

         assertEquals(1, rebalanceListener.revokedCount);
@@ -459,12 +459,12 @@ public class WorkerCoordinatorTest {
     }


-    private GroupCoordinatorResponse groupCoordinatorResponse(Node node, short error) {
+    private GroupCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
         return new GroupCoordinatorResponse(error, node);
     }

     private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId,
-                                                      Map<String, Long> configOffsets, short error) {
+                                                      Map<String, Long> configOffsets, Errors error) {
         Map<String, ByteBuffer> metadata = new HashMap<>();
         for (Map.Entry<String, Long> configStateEntry : configOffsets.entrySet()) {
            // We need a member URL, but it doesn't matter for the purposes of this test. Just set it to the member ID
@@ -476,13 +476,13 @@ public class WorkerCoordinatorTest {
         return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata);
     }

-    private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) {
+    private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
         return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, leaderId, Collections.emptyMap());
     }

     private SyncGroupResponse syncGroupResponse(short assignmentError, String leader, long configOffset, List<String> connectorIds,
-                                                List<ConnectorTaskId> taskIds, short error) {
+                                                List<ConnectorTaskId> taskIds, Errors error) {
         ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(assignmentError, leader, LEADER_URL, configOffset, connectorIds, taskIds);
         ByteBuffer buf = ConnectProtocol.serializeAssignment(assignment);
         return new SyncGroupResponse(error, buf);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 13b6571..25e64eb 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -69,19 +69,19 @@ class AdminClient(val time: Time,
   def findCoordinator(groupId: String): Node = {
     val requestBuilder = new GroupCoordinatorRequest.Builder(groupId)
     val response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse]
-    Errors.forCode(response.errorCode).maybeThrow()
+    response.error.maybeThrow()
     response.node
   }

   def listGroups(node: Node): List[GroupOverview] = {
     val response = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest.Builder()).asInstanceOf[ListGroupsResponse]
-    Errors.forCode(response.errorCode).maybeThrow()
+    response.error.maybeThrow()
     response.groups.asScala.map(group => GroupOverview(group.groupId, group.protocolType)).toList
   }

   def getApiVersions(node: Node): List[ApiVersion] = {
     val response = send(node, ApiKeys.API_VERSIONS, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
-    Errors.forCode(response.errorCode).maybeThrow()
+    response.error.maybeThrow()
     response.apiVersions.asScala.toList
   }
@@ -164,7 +164,7 @@ class AdminClient(val time: Time,
     if (metadata.state != "Dead" && metadata.state != "Empty" && metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
       throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}' is not a valid consumer group")

-    Errors.forCode(metadata.errorCode()).maybeThrow()
+    metadata.error.maybeThrow()
     val consumers = metadata.members.asScala.map { consumer =>
       ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.state match {
         case "Stable" =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index c3d9f24..11f4f89 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -318,10 +318,10 @@ object ConsumerGroupCommand extends Logging {
             case z: ZkNoNodeException =>
               printError(s"Could not fetch offset from zookeeper for group '$group' partition '$topicAndPartition' due to missing offset data in zookeeper.", Some(z))
           }
-        case offsetAndMetaData if offsetAndMetaData.error == Errors.NONE.code =>
+        case offsetAndMetaData if offsetAndMetaData.error == Errors.NONE =>
           offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
         case _ =>
-          printError(s"Could not fetch offset from kafka for group '$group' partition '$topicAndPartition' due to ${Errors.forCode(offsetAndMetadata.error).exception}.")
+          printError(s"Could not fetch offset from kafka for group '$group' partition '$topicAndPartition' due to ${offsetAndMetadata.error.message}.")
       }
     }
     channel.disconnect()

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index 52c8828..46ae1e7 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -67,8 +67,8 @@ case class ControlledShutdownRequest(versionId: Short,
     describe(true)
   }

-  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val errorResponse = ControlledShutdownResponse(correlationId, Errors.forException(e).code, Set.empty[TopicAndPartition])
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val errorResponse = ControlledShutdownResponse(correlationId, Errors.forException(e), Set.empty[TopicAndPartition])
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index 1ba5cfa..e0a03e8 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -26,7 +26,7 @@ import collection.Set
 object ControlledShutdownResponse {
   def readFrom(buffer: ByteBuffer): ControlledShutdownResponse = {
     val correlationId = buffer.getInt
-    val errorCode = buffer.getShort
+    val error = Errors.forCode(buffer.getShort)
     val numEntries = buffer.getInt

     var partitionsRemaining = Set[TopicAndPartition]()
@@ -35,13 +35,13 @@ object ControlledShutdownResponse {
       val partition = buffer.getInt
       partitionsRemaining += new TopicAndPartition(topic, partition)
     }
-    new ControlledShutdownResponse(correlationId, errorCode, partitionsRemaining)
+    new ControlledShutdownResponse(correlationId, error, partitionsRemaining)
   }
 }

 case class ControlledShutdownResponse(correlationId: Int,
-                                      errorCode: Short = Errors.NONE.code,
+                                      error: Errors = Errors.NONE,
                                       partitionsRemaining: Set[TopicAndPartition])
   extends RequestOrResponse() {

   def sizeInBytes(): Int ={
@@ -59,7 +59,7 @@ case class ControlledShutdownResponse(correlationId: Int,
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(correlationId)
-    buffer.putShort(errorCode)
+    buffer.putShort(error.code)
     buffer.putInt(partitionsRemaining.size)
     for (topicAndPartition: TopicAndPartition <- partitionsRemaining) {
       writeShortString(buffer, topicAndPartition.topic)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 5e5360a..97da1f5 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -203,7 +203,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
     val responseData = new util.LinkedHashMap[TopicPartition, JFetchResponse.PartitionData]
     requestInfo.foreach { case (TopicAndPartition(topic, partition), _) =>
       responseData.put(new TopicPartition(topic, partition),
-        new JFetchResponse.PartitionData(Errors.forException(e).code, -1, MemoryRecords.EMPTY))
+        new JFetchResponse.PartitionData(Errors.forException(e), -1, MemoryRecords.EMPTY))
     }
     val errorResponse = new JFetchResponse(versionId, responseData, 0)
     // Magic value does not matter here because the message set is empty

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index cd346cb..0f677f1 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -28,7 +28,7 @@ import scala.collection._

 object FetchResponsePartitionData {
   def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
-    val error = buffer.getShort
+    val error = Errors.forCode(buffer.getShort)
     val hw = buffer.getLong
     val messageSetSize = buffer.getInt
     val messageSetBuffer = buffer.slice()
@@ -43,7 +43,7 @@ object FetchResponsePartitionData {
     4 /* messageSetSize */
 }

-case class FetchResponsePartitionData(error: Short = Errors.NONE.code, hw: Long = -1L, messages: MessageSet) {
+case class FetchResponsePartitionData(error: Errors = Errors.NONE, hw: Long = -1L, messages: MessageSet) {
   val sizeInBytes = FetchResponsePartitionData.headerSize + messages.sizeInBytes
 }
@@ -166,7 +166,7 @@ case class FetchResponse(correlationId: Int,
   def highWatermark(topic: String, partition: Int) = partitionDataFor(topic, partition).hw

-  def hasError = dataByTopicAndPartition.values.exists(_.error != Errors.NONE.code)
+  def hasError = dataByTopicAndPartition.values.exists(_.error != Errors.NONE)

-  def errorCode(topic: String, partition: Int) = partitionDataFor(topic, partition).error
+  def error(topic: String, partition: Int) = partitionDataFor(topic, partition).error
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
index 5f88136..2082e44 100644
--- a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
@@ -64,7 +64,7 @@ case class GroupCoordinatorRequest(group: String,

   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     // return ConsumerCoordinatorNotAvailable for all uncaught errors
-    val errorResponse = GroupCoordinatorResponse(None, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, correlationId)
+    val errorResponse = GroupCoordinatorResponse(None, Errors.GROUP_COORDINATOR_NOT_AVAILABLE, correlationId)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
index 83ba96e..782eebb 100644
--- a/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
@@ -28,19 +28,19 @@ object GroupCoordinatorResponse {

   def readFrom(buffer: ByteBuffer) = {
     val correlationId = buffer.getInt
-    val errorCode = buffer.getShort
+    val error = Errors.forCode(buffer.getShort)
     val broker = BrokerEndPoint.readFrom(buffer)
-    val coordinatorOpt = if (errorCode == Errors.NONE.code)
+    val coordinatorOpt = if (error == Errors.NONE)
       Some(broker)
     else
       None

-    GroupCoordinatorResponse(coordinatorOpt, errorCode, correlationId)
+    GroupCoordinatorResponse(coordinatorOpt, error, correlationId)
   }
 }

-case class GroupCoordinatorResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int)
+case class GroupCoordinatorResponse (coordinatorOpt: Option[BrokerEndPoint], error: Errors, correlationId: Int)
   extends RequestOrResponse() {

   def sizeInBytes =
@@ -50,7 +50,7 @@ case class GroupCoordinatorResponse (coordinatorOpt: Option[BrokerEndPoint], err

   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(correlationId)
-    buffer.putShort(errorCode)
+    buffer.putShort(error.code)
     coordinatorOpt.orElse(GroupCoordinatorResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 95663af..b9693f6 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -161,8 +161,8 @@ case class OffsetCommitRequest(groupId: String,
   })

   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val errorCode = Errors.forException(e).code
-    val commitStatus = requestInfo.mapValues(_ => errorCode)
+    val error = Errors.forException(e)
+    val commitStatus = requestInfo.mapValues(_ => error)
     val commitResponse = OffsetCommitResponse(commitStatus, correlationId)

     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, commitResponse)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index 94223c7..ace480a 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -34,7 +34,7 @@ object OffsetCommitResponse extends Logging {
       val partitionCount = buffer.getInt
       (1 to partitionCount).map(_ => {
         val partitionId = buffer.getInt
-        val error = buffer.getShort
+        val error = Errors.forCode(buffer.getShort)
         (TopicAndPartition(topic, partitionId), error)
       })
     })
@@ -42,13 +42,13 @@ object OffsetCommitResponse extends Logging {
   }
 }

-case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
+case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Errors],
                                 correlationId: Int = 0)
     extends RequestOrResponse() {

   lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)

-  def hasError = commitStatus.values.exists(_ != Errors.NONE.code)
+  def hasError = commitStatus.values.exists(_ != Errors.NONE)

   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(correlationId)
@@ -56,9 +56,9 @@ case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
     commitStatusGroupedByTopic.foreach { case(topic, statusMap) =>
       ApiUtils.writeShortString(buffer, topic)
       buffer.putInt(statusMap.size) // partition count
-      statusMap.foreach { case(topicAndPartition, errorCode) =>
+      statusMap.foreach { case(topicAndPartition, error) =>
         buffer.putInt(topicAndPartition.partition)
-        buffer.putShort(errorCode)
+        buffer.putShort(error.code)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index dac4cc5..33d3795 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -97,13 +97,13 @@ case class OffsetFetchRequest(groupId: String,

     val responseMap = if (requestVersion < 2) {
       requestInfo.map {
-        topicAndPartition => (topicAndPartition, OffsetMetadataAndError(thrownError.code))
+        topicAndPartition => (topicAndPartition, OffsetMetadataAndError(thrownError))
       }.toMap
     } else {
       Map[kafka.common.TopicAndPartition, kafka.common.OffsetMetadataAndError]()
     }

-    val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId, errorCode=thrownError.code)
+    val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId, error=thrownError)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
index 791cd6c..0691503 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
@@ -41,24 +41,24 @@ object OffsetFetchResponse extends Logging {
         val partitionId = buffer.getInt
         val offset = buffer.getLong
         val metadata = readShortString(buffer)
-        val error = buffer.getShort
+        val error = Errors.forCode(buffer.getShort)
         (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata, error))
       })
     })

-    val errorCode = requestVersion match {
-      case 0 | 1 => Errors.NONE.code
-      case _ => buffer.getShort
+    val error = requestVersion match {
+      case 0 | 1 => Errors.NONE
+      case _ => Errors.forCode(buffer.getShort)
     }

-    OffsetFetchResponse(Map(pairs:_*), requestVersion, correlationId, errorCode)
+    OffsetFetchResponse(Map(pairs:_*), requestVersion, correlationId, error)
   }
 }

 case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
                                requestVersion: Int = OffsetFetchRequest.CurrentVersion,
                                correlationId: Int = 0,
-                               errorCode: Short = Errors.NONE.code)
+                               error: Errors = Errors.NONE)
     extends RequestOrResponse() {

   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
@@ -73,13 +73,13 @@ case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadat
         buffer.putInt(t2._1.partition)
         buffer.putLong(t2._2.offset)
         writeShortString(buffer, t2._2.metadata)
-        buffer.putShort(t2._2.error)
+        buffer.putShort(t2._2.error.code)
       })
     })

     // the top level error_code was introduced in v2
     if (requestVersion > 1)
-      buffer.putShort(errorCode)
+      buffer.putShort(error.code)
   }

   override def sizeInBytes =

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 416dd73..879d60d 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -115,7 +115,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ

   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val partitionOffsetResponseMap = requestInfo.map { case (topicAndPartition, _) =>
-      (topicAndPartition, PartitionOffsetsResponse(Errors.forException(e).code, Nil))
+      (topicAndPartition, PartitionOffsetsResponse(Errors.forException(e), Nil))
     }
     val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/OffsetResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala
index b767c08..8d23d41 100644
--- a/core/src/main/scala/kafka/api/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetResponse.scala
@@ -33,7 +33,7 @@ object OffsetResponse {
       val numPartitions = buffer.getInt
       (1 to numPartitions).map(_ => {
         val partition = buffer.getInt
-        val error = buffer.getShort
+        val error = Errors.forCode(buffer.getShort)
         val numOffsets = buffer.getInt
         val offsets = (1 to numOffsets).map(_ => buffer.getLong)
         (TopicAndPartition(topic, partition), PartitionOffsetsResponse(error, offsets))
@@ -45,9 +45,9 @@ object OffsetResponse {
 }


-case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long]) {
+case class PartitionOffsetsResponse(error: Errors, offsets: Seq[Long]) {
   override def toString: String = {
-    new String("error: " + Errors.forCode(error).exceptionName + " offsets: " + offsets.mkString)
+    new String("error: " + error.exceptionName + " offsets: " + offsets.mkString)
   }
 }
@@ -58,7 +58,7 @@ case class OffsetResponse(correlationId: Int,

   lazy val offsetsGroupedByTopic = partitionErrorAndOffsets.groupBy(_._1.topic)

-  def hasError = partitionErrorAndOffsets.values.exists(_.error != Errors.NONE.code)
+  def hasError = partitionErrorAndOffsets.values.exists(_.error != Errors.NONE)

   val sizeInBytes = {
     4 + /* correlation id */
@@ -88,7 +88,7 @@ case class OffsetResponse(correlationId: Int,
       errorAndOffsetsMap.foreach { case((TopicAndPartition(_, partition), errorAndOffsets)) =>
         buffer.putInt(partition)
-        buffer.putShort(errorAndOffsets.error)
+        buffer.putShort(errorAndOffsets.error.code)
         buffer.putInt(errorAndOffsets.offsets.size) // offset array length
         errorAndOffsets.offsets.foreach(buffer.putLong(_))
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index f6e4475..a87cdc9 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -134,7 +134,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
     } else {
       val producerResponseStatus = data.map { case (topicAndPartition, _) =>
-        (topicAndPartition, ProducerResponseStatus(Errors.forException(e).code, -1l, Message.NoTimestamp))
+        (topicAndPartition, ProducerResponseStatus(Errors.forException(e), -1l, Message.NoTimestamp))
       }
       val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
       requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/ProducerResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 89d7538..b09c73b 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -35,7 +35,7 @@ object ProducerResponse {
       val partitionCount = buffer.getInt
       (1 to partitionCount).map(_ => {
         val partition = buffer.getInt
-        val error = buffer.getShort
+        val error = Errors.forCode(buffer.getShort)
         val offset = buffer.getLong
         val timestamp = buffer.getLong
         (TopicAndPartition(topic, partition), ProducerResponseStatus(error, offset, timestamp))
@@ -47,7 +47,7 @@ object ProducerResponse {
   }
 }

-case class ProducerResponseStatus(var error: Short, offset: Long, timestamp: Long = Message.NoTimestamp)
+case class ProducerResponseStatus(var error: Errors, offset: Long, timestamp: Long = Message.NoTimestamp)

 case class ProducerResponse(correlationId: Int,
                             status: Map[TopicAndPartition, ProducerResponseStatus],
@@ -60,7 +60,7 @@ case class ProducerResponse(correlationId: Int,
   */
   private lazy val statusGroupedByTopic = status.groupBy(_._1.topic)

-  def hasError = status.values.exists(_.error != Errors.NONE.code)
+  def hasError = status.values.exists(_.error != Errors.NONE)

   val sizeInBytes = {
     val throttleTimeSize = if (requestVersion > 0) 4 else 0
@@ -93,7 +93,7 @@ case class ProducerResponse(correlationId: Int,
       errorsAndOffsets.foreach { case((TopicAndPartition(_, partition), ProducerResponseStatus(error, nextOffset, timestamp))) =>
         buffer.putInt(partition)
-        buffer.putShort(error)
+        buffer.putShort(error.code)
         buffer.putLong(nextOffset)
         buffer.putLong(timestamp)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 7b05a8b..e4d730c 100644
a/core/src/main/scala/kafka/api/TopicMetadata.scala +++ b/core/src/main/scala/kafka/api/TopicMetadata.scala @@ -28,7 +28,7 @@ object TopicMetadata { val NoLeaderNodeId = -1 def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndPoint]): TopicMetadata = { - val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue)) + val error = Errors.forCode(readShortInRange(buffer, "error code", (-1, Short.MaxValue))) val topic = readShortString(buffer) val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue)) val partitionsMetadata: Array[PartitionMetadata] = new Array[PartitionMetadata](numPartitions) @@ -36,11 +36,11 @@ object TopicMetadata { val partitionMetadata = PartitionMetadata.readFrom(buffer, brokers) partitionsMetadata(i) = partitionMetadata } - new TopicMetadata(topic, partitionsMetadata, errorCode) + new TopicMetadata(topic, partitionsMetadata, error) } } -case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = Errors.NONE.code) extends Logging { +case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], error: Errors = Errors.NONE) extends Logging { def sizeInBytes: Int = { 2 /* error code */ + shortStringLength(topic) + @@ -49,7 +49,7 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat def writeTo(buffer: ByteBuffer) { /* error code */ - buffer.putShort(errorCode) + buffer.putShort(error.code) /* topic */ writeShortString(buffer, topic) /* number of partitions */ @@ -60,10 +60,10 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat override def toString: String = { val topicMetadataInfo = new StringBuilder topicMetadataInfo.append("{TopicMetadata for topic %s -> ".format(topic)) - Errors.forCode(errorCode) match { + error match { case Errors.NONE => partitionsMetadata.foreach { partitionMetadata => - Errors.forCode(partitionMetadata.errorCode) match { + partitionMetadata.error match { case Errors.NONE => topicMetadataInfo.append("\nMetadata for partition [%s,%d] is %s".format(topic, partitionMetadata.partitionId, partitionMetadata.toString())) @@ -89,7 +89,7 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat object PartitionMetadata { def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndPoint]): PartitionMetadata = { - val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue)) + val error = Errors.forCode(readShortInRange(buffer, "error code", (-1, Short.MaxValue))) val partitionId = readIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */ val leaderId = buffer.getInt val leader = brokers.get(leaderId) @@ -104,7 +104,7 @@ object PartitionMetadata { val isrIds = (0 until numIsr).map(_ => buffer.getInt) val isr = isrIds.map(brokers) - new PartitionMetadata(partitionId, leader, replicas, isr, errorCode) + new PartitionMetadata(partitionId, leader, replicas, isr, error) } } @@ -112,7 +112,7 @@ case class PartitionMetadata(partitionId: Int, leader: Option[BrokerEndPoint], replicas: Seq[BrokerEndPoint], isr: Seq[BrokerEndPoint] = Seq.empty, - errorCode: Short = Errors.NONE.code) extends Logging { + error: Errors = Errors.NONE) extends Logging { def sizeInBytes: Int = { 2 /* error code */ + 4 /* partition id */ + @@ -122,7 +122,7 @@ case class PartitionMetadata(partitionId: Int, } def writeTo(buffer: ByteBuffer) { - buffer.putShort(errorCode) + buffer.putShort(error.code) buffer.putInt(partitionId) /* leader */ 
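Every hunk above follows the same shape: the in-memory field becomes an `Errors` value, and conversion to the numeric short happens only at the ByteBuffer boundary. A minimal sketch of that round-trip (the helper names here are illustrative, not from the patch):

    import java.nio.ByteBuffer
    import org.apache.kafka.common.protocol.Errors

    // Write side: convert the enum to its wire code only when serializing.
    def writeError(buffer: ByteBuffer, error: Errors): Unit =
      buffer.putShort(error.code)

    // Read side: map the wire code back to the enum immediately after reading,
    // so the rest of the code never handles raw shorts.
    def readError(buffer: ByteBuffer): Errors =
      Errors.forCode(buffer.getShort)

Since `Errors.forCode` falls back to `Errors.UNKNOWN` for an unrecognized code, callers no longer need their own range checks.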
http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index c64b268..403152a 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -61,7 +61,7 @@ case class TopicMetadataRequest(versionId: Short,
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val topicMetadata = topics.map {
-      topic => TopicMetadata(topic, Nil, Errors.forException(e).code)
+      topic => TopicMetadata(topic, Nil, Errors.forException(e))
     }
     val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index dbb8a76..5508549 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -168,7 +168,7 @@ object ClientUtils extends Logging{
             val response = queryChannel.receive()
             val consumerMetadataResponse = GroupCoordinatorResponse.readFrom(response.payload())
             debug("Consumer metadata response: " + consumerMetadataResponse.toString)
-            if (consumerMetadataResponse.errorCode == Errors.NONE.code)
+            if (consumerMetadataResponse.error == Errors.NONE)
               coordinatorOpt = consumerMetadataResponse.coordinatorOpt
             else {
               debug("Query to %s:%d to locate offset manager for %s failed - will retry in %d milliseconds."
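The `ClientUtils` hunk shows the gain on the comparison side: `error == Errors.NONE` replaces `errorCode == Errors.NONE.code`, and the enum also works directly with Scala pattern matching. A short sketch under assumed names (`CoordinatorResult` is invented for illustration, standing in for GroupCoordinatorResponse):

    import org.apache.kafka.common.protocol.Errors

    // Hypothetical response holder used only for this example.
    case class CoordinatorResult(error: Errors, coordinatorHost: Option[String])

    def handle(result: CoordinatorResult): Unit = result.error match {
      case Errors.NONE =>
        println("coordinator: " + result.coordinatorHost.getOrElse("?"))
      case other =>
        // The enum knows its own exception name; no separate lookup table needed.
        println("lookup failed: " + other.exceptionName)
    }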
http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index be5fed2..9f29073 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -30,14 +30,14 @@ import scala.Predef._
 object ErrorMapping {
   val EmptyByteBuffer = ByteBuffer.allocate(0)
 
-  val UnknownCode : Short = -1
-  val NoError : Short = 0
-  val OffsetOutOfRangeCode : Short = 1
-  val InvalidMessageCode : Short = 2
-  val UnknownTopicOrPartitionCode : Short = 3
-  val InvalidFetchSizeCode : Short = 4
-  val LeaderNotAvailableCode : Short = 5
-  val NotLeaderForPartitionCode : Short = 6
+  val UnknownCode: Short = -1
+  val NoError: Short = 0
+  val OffsetOutOfRangeCode: Short = 1
+  val InvalidMessageCode: Short = 2
+  val UnknownTopicOrPartitionCode: Short = 3
+  val InvalidFetchSizeCode: Short = 4
+  val LeaderNotAvailableCode: Short = 5
+  val NotLeaderForPartitionCode: Short = 6
   val RequestTimedOutCode: Short = 7
   val BrokerNotAvailableCode: Short = 8
   val ReplicaNotAvailableCode: Short = 9
@@ -48,9 +48,9 @@ object ErrorMapping {
   val OffsetsLoadInProgressCode: Short = 14
   val ConsumerCoordinatorNotAvailableCode: Short = 15
   val NotCoordinatorForConsumerCode: Short = 16
-  val InvalidTopicCode : Short = 17
+  val InvalidTopicCode: Short = 17
   val MessageSetSizeTooLargeCode: Short = 18
-  val NotEnoughReplicasCode : Short = 19
+  val NotEnoughReplicasCode: Short = 19
   val NotEnoughReplicasAfterAppendCode: Short = 20
   // 21: InvalidRequiredAcks
   // 22: IllegalConsumerGeneration

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 4d9ae40..46c0881 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -53,7 +53,7 @@ object OffsetAndMetadata {
   def apply(offset: Long) = new OffsetAndMetadata(OffsetMetadata(offset, OffsetMetadata.NoMetadata))
 }
 
-case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = Errors.NONE.code) {
+case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Errors = Errors.NONE) {
   def offset = offsetMetadata.offset
 
   def metadata = offsetMetadata.metadata
@@ -62,19 +62,19 @@ case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short =
 }
 
 object OffsetMetadataAndError {
-  val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE.code)
-  val GroupLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_LOAD_IN_PROGRESS.code)
-  val UnknownMember = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_MEMBER_ID.code)
-  val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_GROUP.code)
-  val GroupCoordinatorNotAvailable = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
-  val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-  val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION.code)
+  val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE)
+  val GroupLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_LOAD_IN_PROGRESS)
+  val UnknownMember = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_MEMBER_ID)
+  val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_GROUP)
+  val GroupCoordinatorNotAvailable = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+  val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+  val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION)
 
-  def apply(offset: Long) = new OffsetMetadataAndError(OffsetMetadata(offset, OffsetMetadata.NoMetadata), Errors.NONE.code)
+  def apply(offset: Long) = new OffsetMetadataAndError(OffsetMetadata(offset, OffsetMetadata.NoMetadata), Errors.NONE)
 
-  def apply(error: Short) = new OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, error)
+  def apply(error: Errors) = new OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, error)
 
-  def apply(offset: Long, metadata: String, error: Short) = new OffsetMetadataAndError(OffsetMetadata(offset, metadata), error)
+  def apply(offset: Long, metadata: String, error: Errors) = new OffsetMetadataAndError(OffsetMetadata(offset, metadata), error)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index f702b9d..2827a2f 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -26,6 +26,7 @@ import kafka.common.{ErrorMapping, TopicAndPartition}
 import scala.collection.Map
 import ConsumerFetcherThread._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 
 class ConsumerFetcherThread(name: String,
@@ -124,11 +125,11 @@ object ConsumerFetcherThread {
   }
 
   class PartitionData(val underlying: FetchResponsePartitionData) extends AbstractFetcherThread.PartitionData {
-    def errorCode: Short = underlying.error
+    def error = underlying.error
     def toRecords: MemoryRecords = underlying.messages.asInstanceOf[ByteBufferMessageSet].asRecords
     def highWatermark: Long = underlying.hw
     def exception: Option[Throwable] =
-      if (errorCode == ErrorMapping.NoError) None else Some(ErrorMapping.exceptionFor(errorCode))
+      if (error == Errors.NONE) None else Some(ErrorMapping.exceptionFor(error.code))
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index f7e8bcb..e93f08c 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -26,6 +26,7 @@ import kafka.network._
 import kafka.utils._
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import org.apache.kafka.common.network.{NetworkReceive}
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.utils.Utils._
 
 /**
@@ -187,8 +188,8 @@ class SimpleConsumer(val host: String,
                                        replicaId = consumerId)
     val partitionErrorAndOffset = getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
     val offset = partitionErrorAndOffset.error match {
-      case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
-      case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
+      case Errors.NONE => partitionErrorAndOffset.offsets.head
+      case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error.code)
     }
     offset
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 9bf0d20..fde4710 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -356,25 +356,25 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           trace("Offset commit response: %s.".format(offsetCommitResponse))
 
           val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = {
-            offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case (folded, (topicPartition, errorCode)) =>
+            offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case (folded, (topicPartition, error)) =>
 
-              if (errorCode == Errors.NONE.code && config.dualCommitEnabled) {
+              if (error == Errors.NONE && config.dualCommitEnabled) {
                 val offset = offsetsToCommit(topicPartition).offset
                 commitOffsetToZooKeeper(topicPartition, offset)
               }
 
               (folded._1 || // update commitFailed
-                errorCode != Errors.NONE.code,
+                error != Errors.NONE,
 
               folded._2 || // update retryableIfFailed - (only metadata too large is not retryable)
-                (errorCode != Errors.NONE.code && errorCode != Errors.OFFSET_METADATA_TOO_LARGE.code),
+                (error != Errors.NONE && error != Errors.OFFSET_METADATA_TOO_LARGE),
 
               folded._3 || // update shouldRefreshCoordinator
-                errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code ||
-                errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code,
+                error == Errors.NOT_COORDINATOR_FOR_GROUP ||
+                error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE,
 
               // update error count
-              folded._4 + (if (errorCode != Errors.NONE.code) 1 else 0))
+              folded._4 + (if (error != Errors.NONE) 1 else 0))
             }
           }
           debug(errorCount + " errors in offset commit response.")
@@ -444,8 +444,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
           val (leaderChanged, loadInProgress) =
             offsetFetchResponse.requestInfo.values.foldLeft(false, false) { case (folded, offsetMetadataAndError) =>
-              (folded._1 || (offsetMetadataAndError.error == Errors.NOT_COORDINATOR_FOR_GROUP.code),
-               folded._2 || (offsetMetadataAndError.error == Errors.GROUP_LOAD_IN_PROGRESS.code))
+              (folded._1 || (offsetMetadataAndError.error == Errors.NOT_COORDINATOR_FOR_GROUP),
+               folded._2 || (offsetMetadataAndError.error == Errors.GROUP_LOAD_IN_PROGRESS))
             }
 
           if (leaderChanged) {
@@ -465,7 +465,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         val mostRecentOffsets = kafkaOffsets.map { case (topicPartition, kafkaOffset) =>
           val zkOffset = fetchOffsetFromZooKeeper(topicPartition)._2.offset
           val mostRecentOffset = zkOffset.max(kafkaOffset.offset)
-          (topicPartition, OffsetMetadataAndError(mostRecentOffset, kafkaOffset.metadata, Errors.NONE.code))
+          (topicPartition, OffsetMetadataAndError(mostRecentOffset, kafkaOffset.metadata, Errors.NONE))
         }
         Some(OffsetFetchResponse(mostRecentOffsets))
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 823f9b4..fadc736 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -384,8 +384,8 @@ class TopicDeletionManager(controller: KafkaController,
     debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
     val responseMap = stopReplicaResponse.responses.asScala
     val partitionsInError =
-      if (stopReplicaResponse.errorCode != Errors.NONE.code) responseMap.keySet
-      else responseMap.filter { case (_, error) => error != Errors.NONE.code }.keySet
+      if (stopReplicaResponse.error != Errors.NONE) responseMap.keySet
+      else responseMap.filter { case (_, error) => error != Errors.NONE }.keySet
     val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
     inLock(controllerContext.controllerLock) {
       // move all the failed replicas to ReplicaDeletionIneligible
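The `handleError` overrides and the deletion callback both lean on `Errors.forException`, which classifies a throwable into its protocol error once so the enum can be passed around afterwards. A sketch of that flow, with a made-up per-partition status map mirroring the `responseMap` filtering above:

    import org.apache.kafka.common.protocol.Errors
    import org.apache.kafka.common.errors.NotLeaderForPartitionException

    // Classify the exception once; everything downstream compares enums.
    val error: Errors = Errors.forException(new NotLeaderForPartitionException("leader moved"))

    // Hypothetical partition -> error map, for illustration only.
    val statuses: Map[Int, Errors] = Map(0 -> Errors.NONE, 1 -> error)

    val partitionsInError = statuses.filter { case (_, e) => e != Errors.NONE }.keySet
    println("hasError=" + statuses.values.exists(_ != Errors.NONE) + ", failed=" + partitionsInError)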