kafka-commits mailing list archives

From j...@apache.org
Subject [2/2] kafka git commit: KAFKA-3853; Extend OffsetFetch API to allow fetching all offsets for a consumer group (KIP-88)
Date Sat, 14 Jan 2017 01:19:44 GMT
KAFKA-3853; Extend OffsetFetch API to allow fetching all offsets for a consumer group (KIP-88)

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>, Jason Gustafson <jason@confluent.io>

Closes #2074 from vahidhashemian/KAFKA-3853


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c2d9b95f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c2d9b95f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c2d9b95f

Branch: refs/heads/trunk
Commit: c2d9b95f36a9bb853bb3851696161d7f7f790bec
Parents: 88fdca2
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Authored: Fri Jan 13 17:00:03 2017 -0800
Committer: Jason Gustafson <jason@confluent.io>
Committed: Fri Jan 13 17:16:28 2017 -0800

----------------------------------------------------------------------
 .../consumer/internals/ConsumerCoordinator.java |  28 ++-
 .../apache/kafka/common/protocol/Protocol.java  |  24 +-
 .../common/requests/OffsetFetchRequest.java     |  93 ++++---
 .../common/requests/OffsetFetchResponse.java    | 140 +++++++++--
 .../clients/consumer/KafkaConsumerTest.java     |  10 +-
 .../internals/ConsumerCoordinatorTest.java      |  22 +-
 .../common/requests/RequestResponseTest.java    |   7 +-
 .../main/scala/kafka/admin/AdminClient.scala    |  12 +
 .../kafka/admin/ConsumerGroupCommand.scala      |  66 +++--
 core/src/main/scala/kafka/api/ApiVersion.scala  |   2 +-
 .../scala/kafka/api/OffsetFetchRequest.scala    |  32 ++-
 .../scala/kafka/api/OffsetFetchResponse.scala   |  30 ++-
 .../scala/kafka/consumer/SimpleConsumer.scala   |   6 +-
 .../kafka/coordinator/GroupCoordinator.scala    |  22 +-
 .../coordinator/GroupMetadataManager.scala      |  22 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 118 +++++----
 .../kafka/api/AuthorizerIntegrationTest.scala   |   4 +-
 .../kafka/admin/DescribeConsumerGroupTest.scala | 249 ++++++++++++++++++-
 .../api/RequestResponseSerializationTest.scala  |   6 +-
 .../coordinator/GroupMetadataManagerTest.scala  |  16 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |   4 +-
 21 files changed, 693 insertions(+), 220 deletions(-)
----------------------------------------------------------------------
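
In short, KIP-88 adds a third version of the OffsetFetch API: a null topic list asks the coordinator for every offset the group has committed, and group or coordinator failures now come back as a single top-level error code instead of being copied into every partition. A minimal sketch of the new client-side entry point added in this patch (the group name is illustrative):

    import org.apache.kafka.common.requests.OffsetFetchRequest;

    public class FetchAllOffsetsSketch {
        public static void main(String[] args) {
            // New in this patch: a v2 request covering all topic partitions of the group.
            OffsetFetchRequest request = OffsetFetchRequest.forAllPartitions("my-group");
            System.out.println(request.isAllPartitions()); // true
        }
    }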


http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 3d16736..5455506 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -744,22 +744,32 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> {
         @Override
         public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+            if (response.hasError()) {
+                Errors error = response.error();
+                log.debug("Offset fetch for group {} failed: {}", groupId, error.message());
+
+                if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
+                    // just retry
+                    future.raise(error);
+                } else if (error == Errors.NOT_COORDINATOR_FOR_GROUP) {
+                    // re-discover the coordinator and retry
+                    coordinatorDead();
+                    future.raise(error);
+                } else {
+                    future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
+                }
+                return;
+            }
+
             Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
             for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                 TopicPartition tp = entry.getKey();
                 OffsetFetchResponse.PartitionData data = entry.getValue();
                 if (data.hasError()) {
-                    Errors error = Errors.forCode(data.errorCode);
+                    Errors error = data.error;
                     log.debug("Group {} failed to fetch offset for partition {}: {}", groupId, tp, error.message());
 
-                    if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
-                        // just retry
-                        future.raise(error);
-                    } else if (error == Errors.NOT_COORDINATOR_FOR_GROUP) {
-                        // re-discover the coordinator and retry
-                        coordinatorDead();
-                        future.raise(error);
-                    } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                    if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                         future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
                     } else {
                         future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
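
The reworked handler distinguishes the two error surfaces. A hedged sketch of the same routing for other consumers of the Java response object (class and method names are illustrative):

    import java.util.Map;

    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.OffsetFetchResponse;

    public class ResponseErrorRoutingSketch {
        // Route group/coordinator errors (now surfaced once, at the top level)
        // before inspecting individual partitions, as the handler above does.
        static void check(OffsetFetchResponse response) {
            if (response.hasError())
                throw response.error().exception();   // group or coordinator level
            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> e
                    : response.responseData().entrySet())
                if (e.getValue().hasError())          // partition level
                    throw new KafkaException("Partition " + e.getKey() + " failed: "
                            + e.getValue().error.message());
        }
    }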

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 4946960..dc0a860 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -323,8 +323,14 @@ public class Protocol {
 
     /*
      * Wire formats of version 0 and 1 are the same, but with different functionality.
-     * Version 0 will read the offsets from ZK;
+     * Wire format of version 2 is similar to version 1, with the exception of
+     * - accepting 'null' as list of topics
+     * - returning a top level error code
+     * Version 0 will read the offsets from ZK.
      * Version 1 will read the offsets from Kafka.
+     * Version 2 will read the offsets from Kafka, and will return all associated topic partition offsets if
+     * 'null' is passed instead of a list of specific topic partitions. It also returns a top level error code
+     * for group or coordinator level errors.
      */
     public static final Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
                                                                                         INT32,
@@ -365,8 +371,20 @@ public class Protocol {
     public static final Schema OFFSET_FETCH_REQUEST_V1 = OFFSET_FETCH_REQUEST_V0;
     public static final Schema OFFSET_FETCH_RESPONSE_V1 = OFFSET_FETCH_RESPONSE_V0;
 
-    public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1};
-    public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1};
+    public static final Schema OFFSET_FETCH_REQUEST_V2 = new Schema(new Field("group_id",
+                                                                              STRING,
+                                                                              "The consumer group id."),
+                                                                              new Field("topics",
+                                                                                        ArrayOf.nullable(OFFSET_FETCH_REQUEST_TOPIC_V0),
+                                                                                        "Topics to fetch offsets. If the topic array is null fetch offsets for all topics."));
+
+    public static final Schema OFFSET_FETCH_RESPONSE_V2 = new Schema(new Field("responses",
+                                                                               new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)),
+                                                                     new Field("error_code",
+                                                                               INT16));
+
+    public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2};
+    public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2};
 
     /* List offset api */
     public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
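
A small sketch of the new nullable-array semantics, assuming the public Struct and Schema types in org.apache.kafka.common.protocol.types (the group name is illustrative):

    import org.apache.kafka.common.protocol.Protocol;
    import org.apache.kafka.common.protocol.types.Struct;

    public class NullableTopicsSketch {
        public static void main(String[] args) {
            // A v2 request whose "topics" array is null asks for offsets of every
            // topic partition the group has committed; a non-null array keeps the
            // v0/v1 behavior of fetching only the listed topics.
            Struct request = new Struct(Protocol.OFFSET_FETCH_REQUEST_V2);
            request.set("group_id", "my-group"); // illustrative group name
            request.set("topics", null);         // null == all topic partitions
            System.out.println(request.hasField("topics")); // true
        }
    }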

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 4b08a60..0ff49be 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -13,6 +13,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ObsoleteBrokerException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
@@ -22,7 +23,6 @@ import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -38,6 +38,7 @@ public class OffsetFetchRequest extends AbstractRequest {
     private static final String PARTITION_KEY_NAME = "partition";
 
     public static class Builder extends AbstractRequest.Builder<OffsetFetchRequest> {
+        private static final List<TopicPartition> ALL_TOPIC_PARTITIONS = null;
         private final String groupId;
         private final List<TopicPartition> partitions;
 
@@ -47,8 +48,18 @@ public class OffsetFetchRequest extends AbstractRequest {
             this.partitions = partitions;
         }
 
+        public static Builder allTopicPartitions(String groupId) {
+            return new Builder(groupId, ALL_TOPIC_PARTITIONS);
+        }
+
+        public boolean isAllTopicPartitions() {
+            return this.partitions == ALL_TOPIC_PARTITIONS;
+        }
+
         @Override
         public OffsetFetchRequest build() {
+            if (isAllTopicPartitions() && version() < 2)
+                throw new ObsoleteBrokerException("The broker is too old to send this request.");
             return new OffsetFetchRequest(groupId, partitions, version());
         }
 
@@ -66,60 +77,68 @@ public class OffsetFetchRequest extends AbstractRequest {
     private final String groupId;
     private final List<TopicPartition> partitions;
 
-    // v0 and v1 have the same fields.
+    public static OffsetFetchRequest forAllPartitions(String groupId) {
+        return new OffsetFetchRequest.Builder(groupId, (List<TopicPartition>) null).setVersion((short) 2).build();
+    }
+
+    // v0, v1, and v2 have the same fields.
     private OffsetFetchRequest(String groupId, List<TopicPartition> partitions, short version) {
         super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_FETCH.id, version)), version);
-        Map<String, List<Integer>> topicsData = CollectionUtils.groupDataByTopic(partitions);
-
         struct.set(GROUP_ID_KEY_NAME, groupId);
-        List<Struct> topicArray = new ArrayList<>();
-        for (Map.Entry<String, List<Integer>> entries: topicsData.entrySet()) {
-            Struct topicData = struct.instance(TOPICS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, entries.getKey());
-            List<Struct> partitionArray = new ArrayList<>();
-            for (Integer partiitonId : entries.getValue()) {
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partiitonId);
-                partitionArray.add(partitionData);
+        if (partitions != null) {
+            Map<String, List<Integer>> topicsData = CollectionUtils.groupDataByTopic(partitions);
+
+            List<Struct> topicArray = new ArrayList<>();
+            for (Map.Entry<String, List<Integer>> entries : topicsData.entrySet()) {
+                Struct topicData = struct.instance(TOPICS_KEY_NAME);
+                topicData.set(TOPIC_KEY_NAME, entries.getKey());
+                List<Struct> partitionArray = new ArrayList<>();
+                for (Integer partitionId : entries.getValue()) {
+                    Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                    partitionData.set(PARTITION_KEY_NAME, partitionId);
+                    partitionArray.add(partitionData);
+                }
+                topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+                topicArray.add(topicData);
             }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+            struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+        } else
+            struct.set(TOPICS_KEY_NAME, null);
+
         this.groupId = groupId;
         this.partitions = partitions;
     }
 
     public OffsetFetchRequest(Struct struct, short versionId) {
         super(struct, versionId);
-        partitions = new ArrayList<>();
-        for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                partitions.add(new TopicPartition(topic, partition));
+
+        Object[] topicArray = struct.getArray(TOPICS_KEY_NAME);
+        if (topicArray != null) {
+            partitions = new ArrayList<>();
+            for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
+                Struct topicResponse = (Struct) topicResponseObj;
+                String topic = topicResponse.getString(TOPIC_KEY_NAME);
+                for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                    Struct partitionResponse = (Struct) partitionResponseObj;
+                    int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                    partitions.add(new TopicPartition(topic, partition));
+                }
             }
-        }
+        } else
+            partitions = null;
+
+
         groupId = struct.getString(GROUP_ID_KEY_NAME);
     }
 
     @Override
     public AbstractResponse getErrorResponse(Throwable e) {
-        Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
-
-        for (TopicPartition partition: partitions) {
-            responseData.put(partition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
-                    OffsetFetchResponse.NO_METADATA,
-                    Errors.forException(e).code()));
-        }
-
         short versionId = version();
         switch (versionId) {
             case 0:
             case 1:
-                return new OffsetFetchResponse(responseData);
+            case 2:
+                return new OffsetFetchResponse(Errors.forException(e), partitions, versionId);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id)));
@@ -142,4 +161,8 @@ public class OffsetFetchRequest extends AbstractRequest {
     public static OffsetFetchRequest parse(ByteBuffer buffer) {
         return parse(buffer, ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id));
     }
+
+    public boolean isAllPartitions() {
+        return partitions == null;
+    }
 }
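
A short sketch of the builder path above, assuming the setVersion helper inherited from AbstractRequest.Builder (the group name is illustrative):

    import org.apache.kafka.common.requests.OffsetFetchRequest;

    public class BuilderVersionGuardSketch {
        public static void main(String[] args) {
            OffsetFetchRequest.Builder builder =
                    OffsetFetchRequest.Builder.allTopicPartitions("my-group");

            // An all-partitions fetch is only expressible on the v2 wire format;
            // with version < 2, build() throws ObsoleteBrokerException instead of
            // silently sending a request the broker cannot interpret.
            OffsetFetchRequest request = builder.setVersion((short) 2).build();
            System.out.println(request.isAllPartitions()); // true
        }
    }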

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index ae4e066..6be568b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -14,6 +14,8 @@ package org.apache.kafka.common.requests;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -27,9 +29,11 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
 public class OffsetFetchResponse extends AbstractResponse {
-    
+
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_FETCH.id);
+    private static final short CURRENT_VERSION = ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id);
     private static final String RESPONSES_KEY_NAME = "responses";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     // topic level fields
     private static final String TOPIC_KEY_NAME = "topic";
@@ -39,47 +43,52 @@ public class OffsetFetchResponse extends AbstractResponse {
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String COMMIT_OFFSET_KEY_NAME = "offset";
     private static final String METADATA_KEY_NAME = "metadata";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     public static final long INVALID_OFFSET = -1L;
     public static final String NO_METADATA = "";
 
     /**
-     * Possible error codeS:
+     * Possible error codes:
      *
-     *  UNKNOWN_TOPIC_OR_PARTITION (3)
-     *  GROUP_LOAD_IN_PROGRESS (14)
-     *  NOT_COORDINATOR_FOR_GROUP (16)
-     *  TOPIC_AUTHORIZATION_FAILED (29)
-     *  GROUP_AUTHORIZATION_FAILED (30)
+     * - Partition errors:
+     *   - UNKNOWN_TOPIC_OR_PARTITION (3)
+     *   - TOPIC_AUTHORIZATION_FAILED (29)
+     *
+     * - Group or coordinator errors:
+     *   - GROUP_LOAD_IN_PROGRESS (14)
+     *   - NOT_COORDINATOR_FOR_GROUP (16)
+     *   - GROUP_AUTHORIZATION_FAILED (30)
      */
 
+    public static final List<Errors> PARTITION_ERRORS = Arrays.asList(
+            Errors.UNKNOWN_TOPIC_OR_PARTITION,
+            Errors.TOPIC_AUTHORIZATION_FAILED);
+
     private final Map<TopicPartition, PartitionData> responseData;
+    private final Errors error;
 
     public static final class PartitionData {
         public final long offset;
         public final String metadata;
-        public final short errorCode;
+        public final Errors error;
 
-        public PartitionData(long offset, String metadata, short errorCode) {
+        public PartitionData(long offset, String metadata, Errors error) {
             this.offset = offset;
             this.metadata = metadata;
-            this.errorCode = errorCode;
+            this.error = error;
         }
 
         public boolean hasError() {
-            return this.errorCode != Errors.NONE.code();
+            return this.error != Errors.NONE;
         }
     }
 
-    public OffsetFetchResponse(Map<TopicPartition, PartitionData> responseData) {
-        super(new Struct(CURRENT_SCHEMA));
-
+    private List<Struct> getTopicArray(Map<TopicPartition, PartitionData> responseData) {
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
 
         List<Struct> topicArray = new ArrayList<Struct>();
         for (Map.Entry<String, Map<Integer, PartitionData>> entries : topicsData.entrySet()) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+            Struct topicData = this.struct.instance(RESPONSES_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, entries.getKey());
             List<Struct> partitionArray = new ArrayList<Struct>();
             for (Map.Entry<Integer, PartitionData> partitionEntry : entries.getValue().entrySet()) {
@@ -88,19 +97,73 @@ public class OffsetFetchResponse extends AbstractResponse {
                 partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
                 partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
                 partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
-                partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
+                partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.error.code());
                 partitionArray.add(partitionData);
             }
             topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
             topicArray.add(topicData);
         }
-        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+
+        return topicArray;
+    }
+
+    /**
+     * Unified constructor
+     * @param topLevelError Potential coordinator or group level error (for API version 2 and later)
+     * @param responseData Fetched offset information grouped by topic-partition
+     * @param version The request API version
+     */
+    public OffsetFetchResponse(Errors topLevelError, Map<TopicPartition, PartitionData> responseData, int version) {
+        super(new Struct(ProtoUtils.responseSchema(ApiKeys.OFFSET_FETCH.id, version)));
+
+        this.struct.set(RESPONSES_KEY_NAME, getTopicArray(responseData).toArray());
         this.responseData = responseData;
+        this.error = topLevelError;
+        if (version > 1)
+            this.struct.set(ERROR_CODE_KEY_NAME, this.error.code());
+    }
+
+    /**
+     * Unified constructor (used only if there are errors in the response)
+     * @param topLevelError The error to be reported in the response
+     * @param partitions Partitions to be included in the response
+     * @param version The request API version
+     */
+    public OffsetFetchResponse(Errors topLevelError, List<TopicPartition> partitions, int version) {
+        super(new Struct(ProtoUtils.responseSchema(ApiKeys.OFFSET_FETCH.id, version)));
+
+        assert topLevelError != Errors.NONE;
+        this.responseData = new HashMap<>();
+        this.error = topLevelError;
+        if (version < 2) {
+            for (TopicPartition partition : partitions) {
+                this.responseData.put(partition, new OffsetFetchResponse.PartitionData(
+                        OffsetFetchResponse.INVALID_OFFSET,
+                        OffsetFetchResponse.NO_METADATA,
+                        topLevelError));
+            }
+        } else
+            this.struct.set(ERROR_CODE_KEY_NAME, this.error.code());
+
+        this.struct.set(RESPONSES_KEY_NAME, getTopicArray(this.responseData).toArray());
+    }
+
+    public OffsetFetchResponse(Map<TopicPartition, PartitionData> responseData) {
+        this(Errors.NONE, responseData, CURRENT_VERSION);
+    }
+
+    /**
+     * Constructor for version 2 and above when there is a coordinator or group level error
+     * @param topLevelError Coordinator or group level error
+     */
+    public OffsetFetchResponse(Errors topLevelError) {
+        this(topLevelError, new ArrayList<TopicPartition>(), CURRENT_VERSION);
     }
 
     public OffsetFetchResponse(Struct struct) {
         super(struct);
-        responseData = new HashMap<TopicPartition, PartitionData>();
+        Errors topLevelError = Errors.NONE;
+        this.responseData = new HashMap<TopicPartition, PartitionData>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
             String topic = topicResponse.getString(TOPIC_KEY_NAME);
@@ -109,17 +172,46 @@ public class OffsetFetchResponse extends AbstractResponse {
                 int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
                 long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME);
                 String metadata = partitionResponse.getString(METADATA_KEY_NAME);
-                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
-                PartitionData partitionData = new PartitionData(offset, metadata, errorCode);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
+                Errors error = Errors.forCode(partitionResponse.getShort(ERROR_CODE_KEY_NAME));
+                if (error != Errors.NONE && !PARTITION_ERRORS.contains(error))
+                    topLevelError = error;
+                PartitionData partitionData = new PartitionData(offset, metadata, error);
+                this.responseData.put(new TopicPartition(topic, partition), partitionData);
             }
         }
+
+        // for version 2 and later use the top-level error code (in ERROR_CODE_KEY_NAME) from the response.
+        // for older versions there is no top-level error in the response and all errors are partition errors,
+        // so if there is a group or coordinator error at the partition level use that as the top-level error.
+        // this way clients can depend on the top-level error regardless of the offset fetch version.
+        this.error = struct.hasField(ERROR_CODE_KEY_NAME) ? Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)) : topLevelError;
+    }
+
+    public void maybeThrowFirstPartitionError() {
+        Collection<PartitionData> partitionsData = this.responseData.values();
+        for (PartitionData data : partitionsData) {
+            if (data.hasError())
+                throw data.error.exception();
+        }
+    }
+
+    public boolean hasError() {
+        return this.error != Errors.NONE;
+    }
+
+    public Errors error() {
+        return this.error;
     }
 
     public Map<TopicPartition, PartitionData> responseData() {
         return responseData;
     }
 
+    public static OffsetFetchResponse parse(ByteBuffer buffer, int version) {
+        Schema schema = ProtoUtils.responseSchema(ApiKeys.OFFSET_FETCH.id, version);
+        return new OffsetFetchResponse(schema.read(buffer));
+    }
+
     public static OffsetFetchResponse parse(ByteBuffer buffer) {
         return new OffsetFetchResponse(CURRENT_SCHEMA.read(buffer));
     }
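
The two error encodings can be compared directly with the unified constructor above; a brief sketch (topic, partition, and error values are illustrative):

    import java.util.Collections;
    import java.util.List;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.OffsetFetchResponse;

    public class ErrorResponseSketch {
        public static void main(String[] args) {
            List<TopicPartition> partitions =
                    Collections.singletonList(new TopicPartition("t", 0));

            // v1: no top-level field exists, so the same error is fanned out to
            // every requested partition (offset = INVALID_OFFSET, metadata = "").
            OffsetFetchResponse v1 =
                    new OffsetFetchResponse(Errors.NOT_COORDINATOR_FOR_GROUP, partitions, 1);

            // v2: the error is carried once, in the new top-level error_code
            // field, and the partition map stays empty.
            OffsetFetchResponse v2 =
                    new OffsetFetchResponse(Errors.NOT_COORDINATOR_FOR_GROUP, partitions, 2);

            System.out.println(v1.responseData().size()); // 1
            System.out.println(v2.responseData().size()); // 0
            System.out.println(v2.error());               // NOT_COORDINATOR_FOR_GROUP
        }
    }

Older clients keep receiving the fanned-out form, so nothing changes on the v0/v1 wire format.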

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index bf91c28..2eeed55 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -497,7 +497,7 @@ public class KafkaConsumerTest {
 
         // fetch offset for one topic
         client.prepareResponseFrom(
-                offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE.code()),
+                offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE),
                 coordinator);
 
         assertEquals(offset1, consumer.committed(tp0).offset());
@@ -508,7 +508,7 @@ public class KafkaConsumerTest {
         Map<TopicPartition, Long> offsets = new HashMap<>();
         offsets.put(tp0, offset1);
         offsets.put(tp1, offset2);
-        client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE.code()), coordinator);
+        client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
 
         assertEquals(offset1, consumer.committed(tp0).offset());
         assertEquals(offset2, consumer.committed(tp1).offset());
@@ -999,7 +999,7 @@ public class KafkaConsumerTest {
 
         // fetch offset for one topic
         client.prepareResponseFrom(
-                offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE.code()),
+                offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE),
                 coordinator);
         assertEquals(0, consumer.committed(tp0).offset());
 
@@ -1064,7 +1064,7 @@ public class KafkaConsumerTest {
 
         // fetch offset for one topic
         client.prepareResponseFrom(
-                offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE.code()),
+                offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE),
                 coordinator);
         assertEquals(0, consumer.committed(tp0).offset());
 
@@ -1365,7 +1365,7 @@ public class KafkaConsumerTest {
         return new SyncGroupResponse(error, buf);
     }
 
-    private OffsetFetchResponse offsetResponse(Map<TopicPartition, Long> offsets, short error) {
+    private OffsetFetchResponse offsetResponse(Map<TopicPartition, Long> offsets, Errors error) {
         Map<TopicPartition, OffsetFetchResponse.PartitionData> partitionData = new HashMap<>();
         for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
             partitionData.put(entry.getKey(), new OffsetFetchResponse.PartitionData(entry.getValue(), "", error));

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index fa2752c..1ce1af0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1143,7 +1143,7 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.needRefreshCommits();
-        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
+        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
         assertFalse(subscriptions.refreshCommitsNeeded());
         assertEquals(100L, subscriptions.committed(tp).offset());
@@ -1156,8 +1156,8 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.needRefreshCommits();
-        client.prepareResponse(offsetFetchResponse(tp, Errors.GROUP_LOAD_IN_PROGRESS.code(), "", 100L));
-        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
+        client.prepareResponse(offsetFetchResponse(Errors.GROUP_LOAD_IN_PROGRESS));
+        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
         assertFalse(subscriptions.refreshCommitsNeeded());
         assertEquals(100L, subscriptions.committed(tp).offset());
@@ -1170,7 +1170,7 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.needRefreshCommits();
-        client.prepareResponse(offsetFetchResponse(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "", 100L));
+        client.prepareResponse(offsetFetchResponse(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
     }
 
@@ -1181,9 +1181,9 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.needRefreshCommits();
-        client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code(), "", 100L));
+        client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR_FOR_GROUP));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
-        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
+        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
         assertFalse(subscriptions.refreshCommitsNeeded());
         assertEquals(100L, subscriptions.committed(tp).offset());
@@ -1196,7 +1196,7 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.needRefreshCommits();
-        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
+        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE, "", -1L));
         coordinator.refreshCommittedOffsetsIfNeeded();
         assertFalse(subscriptions.refreshCommitsNeeded());
         assertEquals(null, subscriptions.committed(tp));
@@ -1465,8 +1465,12 @@ public class ConsumerCoordinatorTest {
         return new OffsetCommitResponse(responseData);
     }
 
-    private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Short error, String metadata, long offset) {
-        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, metadata, error);
+    private OffsetFetchResponse offsetFetchResponse(Errors topLevelError) {
+        return new OffsetFetchResponse(topLevelError);
+    }
+
+    private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors partitionLevelError, String metadata, long offset) {
+        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, metadata, partitionLevelError);
         return new OffsetFetchResponse(Collections.singletonMap(tp, data));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index de676c7..005514c 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -15,6 +15,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.NotCoordinatorForGroupException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Send;
@@ -90,6 +91,8 @@ public class RequestResponseTest {
         checkSerialization(createOffsetCommitRequest(2));
         checkSerialization(createOffsetCommitRequest(2).getErrorResponse(new UnknownServerException()), null);
         checkSerialization(createOffsetCommitResponse(), null);
+        checkSerialization(OffsetFetchRequest.forAllPartitions("group1"));
+        checkSerialization(OffsetFetchRequest.forAllPartitions("group1").getErrorResponse(new NotCoordinatorForGroupException()), 2);
         checkSerialization(createOffsetFetchRequest());
         checkSerialization(createOffsetFetchRequest().getErrorResponse(new UnknownServerException()), null);
         checkSerialization(createOffsetFetchResponse(), null);
@@ -426,8 +429,8 @@ public class RequestResponseTest {
 
     private OffsetFetchResponse createOffsetFetchResponse() {
         Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
-        responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", Errors.NONE.code()));
-        responseData.put(new TopicPartition("test", 1), new OffsetFetchResponse.PartitionData(100L, null, Errors.NONE.code()));
+        responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", Errors.NONE));
+        responseData.put(new TopicPartition("test", 1), new OffsetFetchResponse.PartitionData(100L, null, Errors.NONE));
         return new OffsetFetchResponse(responseData);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index e43de5d..33089d1 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -27,6 +27,8 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.Selector
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
+import org.apache.kafka.common.requests.OffsetFetchResponse
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 
@@ -114,6 +116,16 @@ class AdminClient(val time: Time,
     listAllGroupsFlattened.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
   }
 
+  def listGroupOffsets(groupId: String): Map[TopicPartition, Long] = {
+    val coordinator = findCoordinator(groupId)
+    val responseBody = send(coordinator, ApiKeys.OFFSET_FETCH, OffsetFetchRequest.Builder.allTopicPartitions(groupId))
+    val response = responseBody.asInstanceOf[OffsetFetchResponse]
+    if (response.hasError)
+      throw response.error.exception
+    response.maybeThrowFirstPartitionError
+    response.responseData().asScala.map { responseData => (responseData._1, responseData._2.offset) }.toMap
+  }
+
   /**
    * Case class used to represent a consumer of a consumer group
    */
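
The same flow re-sketched in Java for client authors; sendToCoordinator is a hypothetical stand-in for the admin client's coordinator lookup and network plumbing:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.OffsetFetchRequest;
    import org.apache.kafka.common.requests.OffsetFetchResponse;

    public class ListGroupOffsetsSketch {
        static Map<TopicPartition, Long> listGroupOffsets(String groupId) {
            OffsetFetchResponse response = sendToCoordinator(
                    groupId, OffsetFetchRequest.Builder.allTopicPartitions(groupId));
            if (response.hasError())
                throw response.error().exception();    // group/coordinator error
            response.maybeThrowFirstPartitionError();  // any partition-level error
            Map<TopicPartition, Long> offsets = new HashMap<>();
            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> e
                    : response.responseData().entrySet())
                offsets.put(e.getKey(), e.getValue().offset);
            return offsets;
        }

        // Hypothetical: locating the coordinator and sending the request is elided.
        static OffsetFetchResponse sendToCoordinator(String groupId,
                                                     OffsetFetchRequest.Builder builder) {
            throw new UnsupportedOperationException("network plumbing elided");
        }
    }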

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index a832c38..d4e407a 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -20,11 +20,13 @@ package kafka.admin
 import java.util.Properties
 
 import joptsimple.{OptionParser, OptionSpec}
+
 import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo}
 import kafka.client.ClientUtils
 import kafka.common.{TopicAndPartition, _}
 import kafka.consumer.SimpleConsumer
 import kafka.utils._
+
 import org.I0Itec.zkclient.exception.ZkNoNodeException
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
@@ -82,7 +84,8 @@ object ConsumerGroupCommand extends Logging {
                 case Some("Dead") =>
                   printError(s"Consumer group '$groupId' does not exist.")
                 case Some("Empty") =>
-                  printError(s"Consumer group '$groupId' has no active members.")
+                  System.err.println(s"Consumer group '$groupId' has no active members.")
+                  printAssignment(assignments, true)
                 case Some("PreparingRebalance") | Some("AwaitingSync") =>
                   System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.")
                   printAssignment(assignments, true)
@@ -108,6 +111,8 @@ object ConsumerGroupCommand extends Logging {
     }
   }
 
+  val MISSING_COLUMN_VALUE = "-"
+
   def printError(msg: String, e: Option[Throwable] = None): Unit = {
     println(s"Error: $msg")
     e.foreach(debug("Exception in consumer group command", _))
@@ -121,11 +126,11 @@ object ConsumerGroupCommand extends Logging {
 
     groupAssignment.foreach { consumerAssignment =>
       print("%-30s %-10s %-15s %-15s %-10s %-50s".format(
-        consumerAssignment.topic.getOrElse("-"), consumerAssignment.partition.getOrElse("-"),
-        consumerAssignment.offset.getOrElse("-"), consumerAssignment.logEndOffset.getOrElse("-"),
-        consumerAssignment.lag.getOrElse("-"), consumerAssignment.consumerId.getOrElse("-")))
+        consumerAssignment.topic.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.partition.getOrElse(MISSING_COLUMN_VALUE),
+        consumerAssignment.offset.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.logEndOffset.getOrElse(MISSING_COLUMN_VALUE),
+        consumerAssignment.lag.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE)))
       if (useNewConsumer)
-        print("%-30s %s".format(consumerAssignment.host.getOrElse("-"), consumerAssignment.clientId.getOrElse("-")))
+        print("%-30s %s".format(consumerAssignment.host.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.clientId.getOrElse(MISSING_COLUMN_VALUE)))
       println()
     }
   }
@@ -237,9 +242,7 @@ object ConsumerGroupCommand extends Logging {
       // mapping of topic partition -> consumer id
       val consumerIdByTopicPartition = topicPartitions.map { topicPartition =>
         val owner = zkUtils.readDataMaybeNull(new ZKGroupTopicDirs(group, topicPartition.topic).consumerOwnerDir + "/" + topicPartition.partition)._1
-        var consumerId = ""
-        owner.foreach(o => consumerId = o.substring(0, o.lastIndexOf('-')))
-        topicPartition -> consumerId
+        topicPartition -> owner.map(o => o.substring(0, o.lastIndexOf('-'))).getOrElse(MISSING_COLUMN_VALUE)
       }.toMap
 
       // mapping of consumer id -> list of topic partitions
@@ -398,23 +401,36 @@ object ConsumerGroupCommand extends Logging {
           case None =>
             None
           case Some(consumers) =>
-            if (consumers.isEmpty)
-              Some(Array[PartitionAssignmentState]())
-            else {
-              val consumer = getConsumer()
-              Some(consumers.sortWith(_.assignment.size > _.assignment.size).flatMap { consumerSummary =>
-                val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
-                val partitionOffsets = topicPartitions.flatMap { topicPartition =>
-                  Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
-                    topicPartition -> offsetAndMetadata.offset
-                  }
-                }.toMap
-                collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), topicPartitions,
-                  partitionOffsets.get, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
-                  Some(s"${consumerSummary.clientId}"))
-              })
-            }
-        }
+            var assignedTopicPartitions = Array[TopicPartition]()
+            val offsets = adminClient.listGroupOffsets(group)
+            val rowsWithConsumer =
+              if (offsets.isEmpty)
+                List[PartitionAssignmentState]()
+              else {
+                consumers.sortWith(_.assignment.size > _.assignment.size).flatMap { consumerSummary =>
+                  val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
+                  assignedTopicPartitions = assignedTopicPartitions ++ consumerSummary.assignment
+                  val partitionOffsets: Map[TopicAndPartition, Option[Long]] = consumerSummary.assignment.map { topicPartition =>
+                    new TopicAndPartition(topicPartition) -> offsets.get(topicPartition)
+                  }.toMap
+                  collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), topicPartitions,
+                    partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
+                    Some(s"${consumerSummary.clientId}"))
+                }
+              }
+
+            val rowsWithoutConsumer = offsets.filterNot {
+              case (topicPartition, offset) => assignedTopicPartitions.contains(topicPartition)
+              }.flatMap {
+                case (topicPartition, offset) =>
+                  val topicAndPartition = new TopicAndPartition(topicPartition)
+                  collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), Seq(topicAndPartition),
+                      Map(topicAndPartition -> Some(offset)), Some(MISSING_COLUMN_VALUE),
+                      Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
+                }
+
+            Some(rowsWithConsumer ++ rowsWithoutConsumer)
+      }
       )
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 4cd10f4..730f313 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.record.Record
  *
  * Note that the ID we initialize for each version is important.
  * We consider a version newer than another, if it has a higher ID (to avoid depending on lexicographic order)
- * 
+ *
  * Since the api protocol may change more than once within the same release and to facilitate people deploying code from
  * trunk, we have the concept of internal versions (first introduced during the 0.10.0 development cycle). For example,
  * the first time we introduce a version change in a release, say 0.10.0, we will add a config value "0.10.0-IV0" and a

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index d78fbf3..2908901 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -25,9 +25,10 @@ import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.OffsetFetchResponse.PARTITION_ERRORS
 
 object OffsetFetchRequest extends Logging {
-  val CurrentVersion: Short = 1
+  val CurrentVersion: Short = 2
   val DefaultClientId = ""
 
   def readFrom(buffer: ByteBuffer): OffsetFetchRequest = {
@@ -59,7 +60,7 @@ case class OffsetFetchRequest(groupId: String,
     extends RequestOrResponse(Some(ApiKeys.OFFSET_FETCH.id)) {
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic)
-  
+
   def writeTo(buffer: ByteBuffer) {
     // Write envelope
     buffer.putShort(versionId)
@@ -82,7 +83,7 @@ case class OffsetFetchRequest(groupId: String,
     2 + /* versionId */
     4 + /* correlationId */
     shortStringLength(clientId) +
-    shortStringLength(groupId) + 
+    shortStringLength(groupId) +
     4 + /* topic count */
     requestInfoGroupedByTopic.foldLeft(0)((count, t) => {
       count + shortStringLength(t._1) + /* topic */
@@ -90,13 +91,20 @@ case class OffsetFetchRequest(groupId: String,
       t._2.size * 4 /* partition */
     })
 
-  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val responseMap = requestInfo.map {
-      case (topicAndPartition) => (topicAndPartition, OffsetMetadataAndError(
-        Errors.forException(e).code
-      ))
-    }.toMap
-    val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId)
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val requestVersion = request.header.apiVersion
+
+    val thrownError = Errors.forException(e)
+    val responseMap =
+      if (requestVersion < 2) {
+        requestInfo.map {
+          topicAndPartition => (topicAndPartition, OffsetMetadataAndError(thrownError.code))
+        }.toMap
+      } else {
+        Map[kafka.common.TopicAndPartition, kafka.common.OffsetMetadataAndError]()
+      }
+
+    val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId, errorCode=thrownError.code)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 
@@ -107,7 +115,7 @@ case class OffsetFetchRequest(groupId: String,
     offsetFetchRequest.append("; CorrelationId: " + correlationId)
     offsetFetchRequest.append("; ClientId: " + clientId)
     offsetFetchRequest.append("; GroupId: " + groupId)
-    if(details)
+    if (details)
       offsetFetchRequest.append("; RequestInfo: " + requestInfo.mkString(","))
     offsetFetchRequest.toString()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
index e3523f8..791cd6c 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -23,10 +23,15 @@ import kafka.api.ApiUtils._
 import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
 import kafka.utils.Logging
 
+import org.apache.kafka.common.protocol.Errors
+
 object OffsetFetchResponse extends Logging {
-  val CurrentVersion: Short = 0
 
   def readFrom(buffer: ByteBuffer): OffsetFetchResponse = {
+    readFrom(buffer, OffsetFetchRequest.CurrentVersion)
+  }
+
+  def readFrom(buffer: ByteBuffer, requestVersion: Int): OffsetFetchResponse = {
     val correlationId = buffer.getInt
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
@@ -40,12 +45,20 @@ object OffsetFetchResponse extends Logging {
         (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata, error))
       })
     })
-    OffsetFetchResponse(Map(pairs:_*), correlationId)
+
+    val errorCode = requestVersion match {
+      case 0 | 1 => Errors.NONE.code
+      case _ => buffer.getShort
+    }
+
+    OffsetFetchResponse(Map(pairs:_*), requestVersion, correlationId, errorCode)
   }
 }
 
 case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
-                               correlationId: Int = 0)
+                               requestVersion: Int = OffsetFetchRequest.CurrentVersion,
+                               correlationId: Int = 0,
+                               errorCode: Short = Errors.NONE.code)
     extends RequestOrResponse() {
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
@@ -56,13 +69,17 @@ case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadat
     requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, OffsetMetadataAndError]
       writeShortString(buffer, t1._1) // topic
       buffer.putInt(t1._2.size)       // number of partitions for this topic
-      t1._2.foreach( t2 => { // TopicAndPartition -> OffsetMetadataAndError 
+      t1._2.foreach( t2 => { // TopicAndPartition -> OffsetMetadataAndError
         buffer.putInt(t2._1.partition)
         buffer.putLong(t2._2.offset)
         writeShortString(buffer, t2._2.metadata)
         buffer.putShort(t2._2.error)
       })
     })
+
+    // the top level error_code was introduced in v2
+    if (requestVersion > 1)
+      buffer.putShort(errorCode)
   }
 
   override def sizeInBytes =
@@ -80,7 +97,8 @@ case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadat
         shortStringLength(offsetsAndMetadata._2.metadata) +
         2 /* error */
       })
-    })
+    }) +
+    (if (requestVersion > 1) 2 else 0) /* error */
 
   override def describe(details: Boolean):String = { toString }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 5d219ff..f7e8bcb 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -76,7 +76,7 @@ class SimpleConsumer(val host: String,
       isClosed = true
     }
   }
-  
+
   private def sendRequest(request: RequestOrResponse): NetworkReceive = {
     lock synchronized {
       var response: NetworkReceive = null
@@ -166,7 +166,7 @@ class SimpleConsumer(val host: String,
    * @param request a [[kafka.api.OffsetFetchRequest]] object.
    * @return a [[kafka.api.OffsetFetchResponse]] object.
    */
-  def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).payload())
+  def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).payload(), request.versionId)
 
   private def getOrMakeConnection() {
     if(!isClosed && !blockingChannel.isConnected) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index db40482..4cbfad6 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -454,23 +454,19 @@ class GroupCoordinator(val brokerId: Int,
     delayedOffsetStore.foreach(groupManager.store)
   }
 
-
   def handleFetchOffsets(groupId: String,
-                         partitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
-    if (!isActive.get) {
-      partitions.map { topicPartition =>
-        (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))}.toMap
-    } else if (!isCoordinatorForGroup(groupId)) {
+                         partitions: Option[Seq[TopicPartition]]): (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
+    if (!isActive.get)
+      (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Map())
+    else if (!isCoordinatorForGroup(groupId)) {
       debug("Could not fetch offsets for group %s (not group coordinator).".format(groupId))
-      partitions.map { topicPartition =>
-        (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code))}.toMap
-    } else if (isCoordinatorLoadingInProgress(groupId)) {
-      partitions.map { topicPartition =>
-        (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_LOAD_IN_PROGRESS.code))}.toMap
-    } else {
+      (Errors.NOT_COORDINATOR_FOR_GROUP, Map())
+    } else if (isCoordinatorLoadingInProgress(groupId))
+      (Errors.GROUP_LOAD_IN_PROGRESS, Map())
+    else {
      // return offsets blindly regardless of the current group state since the group may be using
       // Kafka commit storage without automatic group management
-      groupManager.getOffsets(groupId, partitions)
+      (Errors.NONE, groupManager.getOffsets(groupId, partitions))
     }
   }
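
The signature change does two things at once: None now stands for "every partition with a committed offset", and group-level failures come back once, as the first element of the pair, instead of being copied onto each partition. A caller-side sketch (the coordinator value is assumed to be in scope, as it is in KafkaApis below):

    val (error, partitionData) = coordinator.handleFetchOffsets("my-group", None)
    if (error != Errors.NONE)
      println(s"offset fetch failed at the group level: $error")  // no per-partition data in this case
    else
      partitionData.foreach { case (tp, data) => println(s"$tp -> ${data.offset}") }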
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 637b0c4..74b46ad 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -328,31 +328,31 @@ class GroupMetadataManager(val brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(groupId: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
-    trace("Getting offsets %s for group %s.".format(topicPartitions, groupId))
+  def getOffsets(groupId: String, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitions.map { topicPartition =>
-        (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code))
+      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+        (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE))
       }.toMap
     } else {
       group synchronized {
         if (group.is(Dead)) {
-          topicPartitions.map { topicPartition =>
-            (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code))
+          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+            (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE))
           }.toMap
         } else {
-            if (topicPartitions.isEmpty) {
+            if (topicPartitionsOpt.isEmpty) {
              // Return offsets for all partitions owned by this consumer group. (This only applies to consumers that commit offsets to Kafka.)
               group.allOffsets.map { case (topicPartition, offsetAndMetadata) =>
-                (topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code))
+                (topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE))
               }
             } else {
-              topicPartitions.map { topicPartition =>
+              topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
                 group.offset(topicPartition) match {
-                  case None => (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code))
+                  case None => (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE))
                   case Some(offsetAndMetadata) =>
-                    (topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code))
+                    (topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE))
                 }
               }.toMap
             }
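
getOffsets now treats the Option as a wildcard: None returns group.allOffsets, while Some(partitions) looks each one up and substitutes an INVALID_OFFSET placeholder for anything unknown. The same pattern in isolation, over plain Maps (a sketch, not the Kafka API):

    // None = everything; Some(keys) = only these, with a placeholder for misses.
    def select[K, V](all: Map[K, V], keysOpt: Option[Seq[K]], missing: V): Map[K, V] =
      keysOpt match {
        case None       => all
        case Some(keys) => keys.map(k => k -> all.getOrElse(k, missing)).toMap
      }

    select(Map("a" -> 1, "b" -> 2), Some(Seq("a", "c")), missing = -1)
    // => Map(a -> 1, c -> -1)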

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5c0201b..3a4b9ea 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -639,7 +639,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (offsetRequest.duplicatePartitions().contains(topicPartition)) {
         debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
             s"failed because the partition is duplicated in the request.")
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST.code(),
+        (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST.code,
                                                               ListOffsetResponse.UNKNOWN_TIMESTAMP,
                                                               ListOffsetResponse.UNKNOWN_OFFSET))
       } else {
@@ -894,51 +894,85 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetFetchRequest = request.body.asInstanceOf[OffsetFetchRequest]
 
     val offsetFetchResponse =
-    // reject the request if not authorized to the group
-    if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))) {
-      val unauthorizedGroupResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_AUTHORIZATION_FAILED.code)
-      val results = offsetFetchRequest.partitions.asScala.map { topicPartition => (topicPartition, unauthorizedGroupResponse)}.toMap
-      new OffsetFetchResponse(results.asJava)
-    } else {
-      val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.partitions.asScala.partition { topicPartition =>
-        authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
-      }
-      val unknownTopicPartitionResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-      val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unknownTopicPartitionResponse)).toMap
-
-      if (header.apiVersion == 0) {
-        // version 0 reads offsets from ZK
-        val responseInfo = authorizedTopicPartitions.map { topicPartition =>
-          val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
-          try {
-            if (!metadataCache.contains(topicPartition.topic))
-              (topicPartition, unknownTopicPartitionResponse)
-            else {
-              val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
-              payloadOpt match {
-                case Some(payload) =>
-                  (topicPartition, new OffsetFetchResponse.PartitionData(payload.toLong, "", Errors.NONE.code))
-                case None =>
-                  (topicPartition, unknownTopicPartitionResponse)
+      // reject the request if not authorized to the group
+      if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId)))
+        new OffsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED, offsetFetchRequest.partitions, header.apiVersion)
+      else {
+        val partitions =
+          if (offsetFetchRequest.isAllPartitions)
+            List[TopicPartition]()
+          else
+            offsetFetchRequest.partitions.asScala.toList
+
+        val (authorizedPartitions, unauthorizedPartitions) =
+          partitions.partition { partition => authorize(request.session, Describe, new Resource(auth.Topic, partition.topic)) }
+
+        val unknownTopicPartitionResponse = new OffsetFetchResponse.PartitionData(
+            OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        val unauthorizedStatus = unauthorizedPartitions.map(topicPartition => (topicPartition, unknownTopicPartitionResponse)).toMap
+
+        if (header.apiVersion == 0) {
+          // version 0 reads offsets from ZK
+          val responseInfo = authorizedPartitions.map { topicPartition =>
+            val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
+            try {
+              if (!metadataCache.contains(topicPartition.topic))
+                (topicPartition, unknownTopicPartitionResponse)
+              else {
+                val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
+                payloadOpt match {
+                  case Some(payload) =>
+                    (topicPartition, new OffsetFetchResponse.PartitionData(
+                        payload.toLong, OffsetFetchResponse.NO_METADATA, Errors.NONE))
+                  case None =>
+                    (topicPartition, unknownTopicPartitionResponse)
+                }
               }
+            } catch {
+              case e: Throwable =>
+                (topicPartition, new OffsetFetchResponse.PartitionData(
+                    OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
             }
-          } catch {
-            case e: Throwable =>
-              (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "",
-                Errors.forException(e).code))
+          }.toMap
+          new OffsetFetchResponse(Errors.NONE, (responseInfo ++ unauthorizedStatus).asJava, header.apiVersion)
+        }
+        else {
+          // versions 1 and above read offsets from Kafka
+          val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId,
+            if (offsetFetchRequest.isAllPartitions)
+              None
+            else
+              Some(authorizedPartitions))
+
+          // Note that we do not need to filter the partitions in the
+          // metadata cache as the topic partitions will be filtered
+          // in the coordinator's offset manager through the offset cache
+          if (header.apiVersion == 1) {
+            val authorizedStatus =
+              if (offsets._1 != Errors.NONE) {
+                authorizedPartitions.map { partition =>
+                  (partition, new OffsetFetchResponse.PartitionData(
+                      OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, offsets._1))}.toMap
+              }
+              else
+                offsets._2.toMap
+            new OffsetFetchResponse(Errors.NONE, (authorizedStatus ++ unauthorizedStatus).asJava, header.apiVersion)
           }
-        }.toMap
-        new OffsetFetchResponse((responseInfo ++ unauthorizedStatus).asJava)
-      } else {
-        // version 1 reads offsets from Kafka;
-        val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, authorizedTopicPartitions).toMap
-
-        // Note that we do not need to filter the partitions in the
-        // metadata cache as the topic partitions will be filtered
-        // in coordinator's offset manager through the offset cache
-        new OffsetFetchResponse((offsets ++ unauthorizedStatus).asJava)
+          else if (offsets._1 == Errors.NONE) {
+            if (offsetFetchRequest.isAllPartitions) {
+              // filter out unauthorized topics when all group offsets are requested
+              val authorizedStatus = offsets._2.filter {
+                case (partition, _) => authorize(request.session, Describe, new Resource(auth.Topic, partition.topic))
+              }
+              new OffsetFetchResponse(authorizedStatus.asJava)
+            }
+            else
+              new OffsetFetchResponse((offsets._2.toMap ++ unauthorizedStatus).asJava)
+          }
+          else
+            new OffsetFetchResponse(offsets._1)
+        }
       }
-    }
 
     trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
     requestChannel.sendResponse(new Response(request, offsetFetchResponse))
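
The rewritten handler now branches three ways: v0 still reads from ZooKeeper, v1 copies any group-level error onto every authorized partition (its schema has nowhere else to put it), and v2 uses the new top-level field and, for all-partition requests, filters out topics the caller may not Describe. A condensed sketch of just the error propagation (a hypothetical helper; the real handler also merges in the unauthorized-partition placeholders):

    import scala.collection.JavaConverters._
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.protocol.Errors
    import org.apache.kafka.common.requests.OffsetFetchResponse

    def groupErrorResponse(version: Short, error: Errors,
                           partitions: List[TopicPartition]): OffsetFetchResponse =
      if (version == 1) {
        // v1: replicate the group error onto each partition entry
        val data = new OffsetFetchResponse.PartitionData(
          OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, error)
        new OffsetFetchResponse(Errors.NONE, partitions.map(_ -> data).toMap.asJava, version)
      } else
        new OffsetFetchResponse(error)  // v2+: a single top-level error suffices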

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 9e1efa6..18f7fc8 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -105,12 +105,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   )
 
   val RequestKeyToErrorCode = Map[ApiKeys, (Nothing) => Short](
-    ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code()),
+    ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code),
     ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.errorCode),
     ApiKeys.FETCH -> ((resp: requests.FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
     ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
     ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
-    ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
+    ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error.code),
     ApiKeys.GROUP_COORDINATOR -> ((resp: requests.GroupCoordinatorResponse) => resp.errorCode()),
     ApiKeys.UPDATE_METADATA_KEY -> ((resp: requests.UpdateMetadataResponse) => resp.errorCode()),
     ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.errorCode()),
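
With the group-level error surfaced once on the response, the test no longer scans per-partition entries for OFFSET_FETCH. A minimal sketch of the same check, built with the single-error constructor used in KafkaApis above:

    import org.apache.kafka.common.protocol.Errors
    import org.apache.kafka.common.requests.OffsetFetchResponse

    val resp = new OffsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED)
    assert(resp.error == Errors.GROUP_AUTHORIZATION_FAILED)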

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index a6ebbe3..8e10a87 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -16,6 +16,10 @@
  */
 package kafka.admin
 
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+import java.util.Collections
 import java.util.Properties
 
 import org.easymock.EasyMock
@@ -23,6 +27,7 @@ import org.junit.Before
 import org.junit.Test
 
 import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
+import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService
 import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService
 import kafka.consumer.OldConsumer
 import kafka.consumer.Whitelist
@@ -30,6 +35,11 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 
+import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException
+import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.clients.consumer.KafkaConsumer
+
 
 class DescribeConsumerGroupTest extends KafkaServerTestHarness {
 
@@ -48,12 +58,12 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
 
     AdminUtils.createTopic(zkUtils, topic, 1, 1)
     props.setProperty("group.id", group)
-    props.setProperty("zookeeper.connect", zkConnect)
   }
 
   @Test
   def testDescribeNonExistingGroup() {
     // mocks
+    props.setProperty("zookeeper.connect", zkConnect)
     val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
 
     // stubs
@@ -74,6 +84,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   @Test
   def testDescribeExistingGroup() {
     // mocks
+    props.setProperty("zookeeper.connect", zkConnect)
     val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
 
     // stubs
@@ -88,8 +99,8 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
         val (_, assignments) = consumerGroupCommand.describeGroup()
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 1 &&
-        assignments.get.filter(_.group == group).head.consumerId.exists(_.trim.nonEmpty)
-      }, "Expected rows and a member id column in describe group results.")
+        assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
+      }, "Expected rows and a consumer id column in describe group results.")
 
     // cleanup
     consumerGroupCommand.close()
@@ -99,6 +110,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   @Test
   def testDescribeExistingGroupWithNoMembers() {
     // mocks
+    props.setProperty("zookeeper.connect", zkConnect)
     val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
 
     // stubs
@@ -109,14 +121,19 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     EasyMock.replay(consumerMock)
 
     // action/test
-    val (_, a1) = consumerGroupCommand.describeGroup() // there should be a member here
+    TestUtils.waitUntilTrue(() => {
+        val (_, assignments) = consumerGroupCommand.describeGroup()
+        assignments.isDefined &&
+        assignments.get.count(_.group == group) == 1 &&
+        assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
+      }, "Expected rows and a consumer id column in describe group results.")
     consumerMock.stop()
+
     TestUtils.waitUntilTrue(() => {
         val (_, assignments) = consumerGroupCommand.describeGroup()
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 1 &&
-        assignments.get.filter(_.group == group).head.consumerId.isDefined &&
-        assignments.get.filter(_.group == group).head.consumerId.exists(_.trim.isEmpty) // the member should be gone
+        assignments.get.filter(_.group == group).head.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) // the member should be gone
       }, "Expected no active member in describe group results.")
 
     // cleanup
@@ -126,6 +143,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   @Test
   def testDescribeConsumersWithNoAssignedPartitions() {
     // mocks
+    props.setProperty("zookeeper.connect", zkConnect)
     val consumer1Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
     val consumer2Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
 
@@ -150,4 +168,223 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     consumer1Mock.stop()
     consumer2Mock.stop()
   }
+
+  @Test
+  def testDescribeNonExistingGroupWithNewConsumer() {
+    // run one consumer in the group consuming from a single-partition topic
+    val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+
+    // note the group to be queried is a different (non-existing) group
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "missing.group")
+    val opts = new ConsumerGroupCommandOptions(cgcArgs)
+    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+    TestUtils.waitUntilTrue(() => {
+        try {
+          val (state, assignments) = consumerGroupCommand.describeGroup()
+          state == Some("Dead") && assignments == Some(List())
+        } catch {
+          case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
+            // Do nothing while the group initializes
+            false
+          case e: Throwable =>
+            e.printStackTrace()
+            throw e
+        }
+      }, "Expected the state to be 'Dead' with no members in the group.")
+
+    consumerGroupCommand.close()
+  }
+
+  @Test
+  def testDescribeExistingGroupWithNewConsumer() {
+    // run one consumer in the group consuming from a single-partition topic
+    val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
+    val opts = new ConsumerGroupCommandOptions(cgcArgs)
+    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+    TestUtils.waitUntilTrue(() => {
+        try {
+          val (state, assignments) = consumerGroupCommand.describeGroup()
+          state == Some("Stable") &&
+          assignments.isDefined &&
+          assignments.get.count(_.group == group) == 1 &&
+          assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
+          assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
+          assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
+        } catch {
+          case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
+            // Do nothing while the group initializes
+            false
+          case e: Throwable =>
+            throw e
+        }
+      }, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe group results.")
+
+    consumerGroupCommand.close()
+  }
+
+  @Test
+  def testDescribeExistingGroupWithNoMembersWithNewConsumer() {
+    // run one consumer in the group consuming from a single-partition topic
+    val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
+    val opts = new ConsumerGroupCommandOptions(cgcArgs)
+    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+    TestUtils.waitUntilTrue(() => {
+        try {
+          val (state, _) = consumerGroupCommand.describeGroup()
+          state == Some("Stable")
+        } catch {
+          case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
+            // Do nothing while the group initializes
+            false
+          case e: Throwable =>
+            throw e
+        }
+      }, "Expected the group to initially become stable.")
+
+    // stop the consumer so the group has no active member anymore
+    executor.shutdown()
+
+    TestUtils.waitUntilTrue(() => {
+        try {
+          val (state, assignments) = consumerGroupCommand.describeGroup()
+          state == Some("Empty") &&
+          assignments.isDefined &&
+          assignments.get.count(_.group == group) == 1 &&
+          assignments.get.filter(_.group == group).head.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone
+          assignments.get.filter(_.group == group).head.clientId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
+          assignments.get.filter(_.group == group).head.host.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE)
+        } catch {
+          case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
+            // Do nothing while the group initializes
+            false
+          case e: Throwable =>
+            throw e
+        } finally {
+          consumerGroupCommand.close()
+        }
+      }, "Expected no active member in describe group results.")
+  }
+
+  @Test
+  def testDescribeConsumersWithNoAssignedPartitionsWithNewConsumer() {
+    // run two consumers in the group consuming from a single-partition topic
+    val executor = new ConsumerGroupExecutor(brokerList, 2, group, topic)
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
+    val opts = new ConsumerGroupCommandOptions(cgcArgs)
+    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+    TestUtils.waitUntilTrue(() => {
+        try {
+          val (state, assignments) = consumerGroupCommand.describeGroup()
+          state == Some("Stable") &&
+          assignments.isDefined &&
+          assignments.get.count(_.group == group) == 2 &&
+          assignments.get.count { x => x.group == group && x.partition.isDefined } == 1 &&
+          assignments.get.count { x => x.group == group && x.partition.isEmpty } == 1
+        } catch {
+          case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
+            // Do nothing while the group initializes
+            false
+          case e: Throwable =>
+            throw e
+        }
+      }, "Expected rows for consumers with no assigned partitions in describe group results.")
+
+    consumerGroupCommand.close()
+  }
+
+  @Test
+  def testDescribeWithMultiPartitionTopicAndMultipleConsumersWithNewConsumer() {
+    val topic2 = "foo2"
+    AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+
+    // run two consumers in the group consuming from a two-partition topic
+    val executor = new ConsumerGroupExecutor(brokerList, 2, group, topic2)
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
+    val opts = new ConsumerGroupCommandOptions(cgcArgs)
+    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+    TestUtils.waitUntilTrue(() => {
+        try {
+          val (state, assignments) = consumerGroupCommand.describeGroup()
+          state == Some("Stable") &&
+          assignments.isDefined &&
+          assignments.get.count(_.group == group) == 2 &&
+          assignments.get.count { x => x.group == group && x.partition.isDefined } == 2 &&
+          assignments.get.count { x => x.group == group && x.partition.isEmpty } == 0
+        } catch {
+          case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
+            // Do nothing while the group initializes
+            false
+          case e: Throwable =>
+            throw e
+        }
+      }, "Expected two rows (one row per consumer) in describe group results.")
+
+    consumerGroupCommand.close()
+  }
+}
+
+
+class ConsumerThread(broker: String, id: Int, groupId: String, topic: String) extends Runnable {
+  val props = new Properties
+  props.put("bootstrap.servers", broker)
+  props.put("group.id", groupId)
+  props.put("key.deserializer", classOf[StringDeserializer].getName)
+  props.put("value.deserializer", classOf[StringDeserializer].getName)
+  val consumer = new KafkaConsumer[String, String](props)
+
+  def run() {
+    try {
+      consumer.subscribe(Collections.singleton(topic))
+      while (true)
+        consumer.poll(Long.MaxValue)
+    } catch {
+      case _: WakeupException => // expected: shutdown() uses wakeup() to interrupt poll()
+    } finally {
+      consumer.close()
+    }
+  }
+
+  def shutdown() {
+    consumer.wakeup()
+  }
+}
+
+
+class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String) {
+  val executor: ExecutorService = Executors.newFixedThreadPool(numConsumers)
+  var consumers = List[ConsumerThread]()
+  for (i <- 1 to numConsumers) {
+    val consumer = new ConsumerThread(broker, i, groupId, topic)
+    consumers ++= List(consumer)
+    executor.submit(consumer)
+  }
+
+  Runtime.getRuntime().addShutdownHook(new Thread() {
+    override def run() {
+      shutdown()
+    }
+  })
+
+  def shutdown() {
+    consumers.foreach(_.shutdown)
+    executor.shutdown()
+    try {
+      executor.awaitTermination(5000, TimeUnit.MILLISECONDS)
+    } catch {
+      case e: InterruptedException =>
+        e.printStackTrace()
+    }
+  }
 }
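
The two helper classes above give the new tests live group members without blocking the test thread. A usage sketch (assumes a reachable broker and an existing single-partition topic):

    val executor = new ConsumerGroupExecutor("localhost:9092", 2, "test.group", "foo")
    // ... describe the group via KafkaConsumerGroupService until assertions hold ...
    executor.shutdown()  // wakes up every consumer and drains the thread pool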

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 7806765..e6090c1 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -150,7 +150,7 @@ object SerializationTestUtils {
     new OffsetFetchResponse(collection.immutable.Map(
       TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(42L, "some metadata", Errors.NONE.code),
       TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetMetadata.NoMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-    ))
+    ), errorCode = Errors.NONE.code)
   }
 
   def createConsumerMetadataRequest: GroupCoordinatorRequest = GroupCoordinatorRequest("group 1", clientId = "client 1")
@@ -183,8 +183,8 @@ class RequestResponseSerializationTest extends JUnitSuite {
     val requestsAndResponses =
       collection.immutable.Seq(producerRequest, producerResponse,
                                fetchRequest, offsetRequest, offsetResponse,
-                               offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitRequestV2,
-                               offsetCommitResponse, offsetFetchRequest, offsetFetchResponse,
+                               offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitRequestV2, offsetCommitResponse,
+                               offsetFetchRequest, offsetFetchResponse,
                                consumerMetadataRequest, consumerMetadataResponse,
                                consumerMetadataResponseNoCoordinator)
 

