kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6275: Add DeleteGroups API (KIP-229) (#4479)
Date Wed, 31 Jan 2018 21:23:18 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1ed6da7  KAFKA-6275: Add DeleteGroups API (KIP-229) (#4479)
1ed6da7 is described below

commit 1ed6da7cc8eb2231f73509c907c2e67af2f249d2
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
AuthorDate: Wed Jan 31 13:23:12 2018 -0800

    KAFKA-6275: Add DeleteGroups API (KIP-229) (#4479)
    
    Reviewers: Manikumar Reddy O <manikumar.reddy@gmail.com>, Jason Gustafson <jason@confluent.io>
---
 .../common/errors/GroupIdNotFoundException.java    |  31 +++
 .../common/errors/GroupNotEmptyException.java      |  31 +++
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   5 +-
 .../org/apache/kafka/common/protocol/Errors.java   |  24 +-
 .../kafka/common/requests/AbstractRequest.java     |   2 +
 .../kafka/common/requests/AbstractResponse.java    |   2 +
 .../kafka/common/requests/DeleteGroupsRequest.java | 117 ++++++++++
 .../common/requests/DeleteGroupsResponse.java      | 129 +++++++++++
 .../kafka/common/requests/RequestResponseTest.java |  13 ++
 core/src/main/scala/kafka/admin/AclCommand.scala   |   2 +-
 core/src/main/scala/kafka/admin/AdminClient.scala  |  43 ++++
 core/src/main/scala/kafka/admin/AdminUtils.scala   |   3 +-
 .../scala/kafka/admin/ConsumerGroupCommand.scala   |  84 ++++---
 core/src/main/scala/kafka/api/ApiVersion.scala     |  10 +-
 .../kafka/coordinator/group/GroupCoordinator.scala |  48 +++-
 .../coordinator/group/GroupMetadataManager.scala   |  17 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  20 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  40 +++-
 .../kafka/admin/DeleteConsumerGroupsTest.scala     | 251 +++++++++++++++++++++
 .../coordinator/group/GroupCoordinatorTest.scala   |  88 ++++++++
 .../group/GroupMetadataManagerTest.scala           |  20 ++
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   8 +-
 22 files changed, 941 insertions(+), 47 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
new file mode 100644
index 0000000..1ff30f1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class GroupIdNotFoundException extends ApiException {
+    private final String groupId;
+
+    public GroupIdNotFoundException(String groupId) {
+        super("The group id " + groupId + " was not found");
+        this.groupId = groupId;
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
new file mode 100644
index 0000000..264e613
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class GroupNotEmptyException extends ApiException {
+    private final String groupId;
+
+    public GroupNotEmptyException(String groupId) {
+        super("The group " + groupId + " is not empty");
+        this.groupId = groupId;
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index b408e80..e0cdfd9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -43,6 +43,8 @@ import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsRequest;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
+import org.apache.kafka.common.requests.DeleteGroupsRequest;
+import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsRequest;
 import org.apache.kafka.common.requests.DeleteRecordsResponse;
 import org.apache.kafka.common.requests.DeleteTopicsRequest;
@@ -183,7 +185,8 @@ public enum ApiKeys {
     CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequest.schemaVersions(), CreateDelegationTokenResponse.schemaVersions()),
     RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()),
     EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()),
-    DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions());
+    DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()),
+    DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequest.schemaVersions(), DeleteGroupsResponse.schemaVersions());
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index bd5b800..e2b8aea 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -17,8 +17,6 @@
 package org.apache.kafka.common.protocol;
 
 import org.apache.kafka.common.errors.ApiException;
-import org.apache.kafka.common.errors.InvalidPrincipalTypeException;
-import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.ConcurrentTransactionsException;
@@ -26,12 +24,15 @@ import org.apache.kafka.common.errors.ControllerMovedException;
 import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.errors.LogDirNotFoundException;
 import org.apache.kafka.common.errors.DuplicateSequenceException;
+import org.apache.kafka.common.errors.DelegationTokenAuthorizationException;
 import org.apache.kafka.common.errors.DelegationTokenDisabledException;
+import org.apache.kafka.common.errors.DelegationTokenExpiredException;
 import org.apache.kafka.common.errors.DelegationTokenNotFoundException;
 import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
@@ -41,6 +42,7 @@ import org.apache.kafka.common.errors.InvalidFetchSizeException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidPartitionsException;
 import org.apache.kafka.common.errors.InvalidPidMappingException;
+import org.apache.kafka.common.errors.InvalidPrincipalTypeException;
 import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
 import org.apache.kafka.common.errors.InvalidReplicationFactorException;
 import org.apache.kafka.common.errors.InvalidRequestException;
@@ -52,6 +54,7 @@ import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
 import org.apache.kafka.common.errors.KafkaStorageException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.LogDirNotFoundException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.NotControllerException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
@@ -70,10 +73,9 @@ import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.ReplicaNotAvailableException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.DelegationTokenAuthorizationException;
-import org.apache.kafka.common.errors.DelegationTokenExpiredException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
@@ -594,6 +596,18 @@ public enum Errors {
         public ApiException build(String message) {
             return new InvalidPrincipalTypeException(message);
         }
+    }),
+    NON_EMPTY_GROUP(68, "The group is not empty", new ApiExceptionBuilder() {
+        @Override
+        public ApiException build(String message) {
+            return new GroupNotEmptyException(message);
+        }
+    }),
+    GROUP_ID_NOT_FOUND(69, "The group id does not exist", new ApiExceptionBuilder() {
+        @Override
+        public ApiException build(String message) {
+            return new GroupIdNotFoundException(message);
+        }
     });
 
     private interface ApiExceptionBuilder {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index cd213d9..d2b93c4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -222,6 +222,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return new ExpireDelegationTokenRequest(struct, apiVersion);
             case DESCRIBE_DELEGATION_TOKEN:
                 return new DescribeDelegationTokenRequest(struct, apiVersion);
+            case DELETE_GROUPS:
+                return new DeleteGroupsRequest(struct, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index fb01298..608f6c3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -154,6 +154,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new ExpireDelegationTokenResponse(struct);
             case DESCRIBE_DELEGATION_TOKEN:
                 return new DescribeDelegationTokenResponse(struct);
+            case DELETE_GROUPS:
+                return new DeleteGroupsResponse(struct);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
new file mode 100644
index 0000000..bda6b33
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
+public class DeleteGroupsRequest extends AbstractRequest {
+    private static final String GROUPS_KEY_NAME = "groups";
+
+    /* DeleteGroups api */
+    private static final Schema DELETE_GROUPS_REQUEST_V0 = new Schema(
+            new Field(GROUPS_KEY_NAME, new ArrayOf(STRING), "An array of groups to be deleted."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{DELETE_GROUPS_REQUEST_V0};
+    }
+
+    private final Set<String> groups;
+
+    public static class Builder extends AbstractRequest.Builder<DeleteGroupsRequest> {
+        private final Set<String> groups;
+
+        public Builder(Set<String> groups) {
+            super(ApiKeys.DELETE_GROUPS);
+            this.groups = groups;
+        }
+
+        @Override
+        public DeleteGroupsRequest build(short version) {
+            return new DeleteGroupsRequest(groups, version);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=DeleteGroupsRequest").
+                append(", groups=(").append(Utils.join(groups, ", ")).append(")").
+                append(")");
+            return bld.toString();
+        }
+    }
+
+    private DeleteGroupsRequest(Set<String> groups, short version) {
+        super(version);
+        this.groups = groups;
+    }
+
+    public DeleteGroupsRequest(Struct struct, short version) {
+        super(version);
+        Object[] groupsArray = struct.getArray(GROUPS_KEY_NAME);
+        Set<String> groups = new HashSet<>(groupsArray.length);
+        for (Object group : groupsArray)
+            groups.add((String) group);
+
+        this.groups = groups;
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.DELETE_GROUPS.requestSchema(version()));
+        struct.set(GROUPS_KEY_NAME, groups.toArray());
+        return struct;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        Errors error = Errors.forException(e);
+        Map<String, Errors> groupErrors = new HashMap<>(groups.size());
+        for (String group : groups)
+            groupErrors.put(group, error);
+
+        switch (version()) {
+            case 0:
+                return new DeleteGroupsResponse(throttleTimeMs, groupErrors);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                    version(), ApiKeys.DELETE_GROUPS.name, ApiKeys.DELETE_GROUPS.latestVersion()));
+        }
+    }
+
+    public Set<String> groups() {
+        return groups;
+    }
+
+    public static DeleteGroupsRequest parse(ByteBuffer buffer, short version) {
+        return new DeleteGroupsRequest(ApiKeys.DELETE_GROUPS.parseRequest(version, buffer), version);
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java
new file mode 100644
index 0000000..d97bb0d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+
+public class DeleteGroupsResponse extends AbstractResponse {
+    private static final String GROUP_ERROR_CODES_KEY_NAME = "group_error_codes";
+
+    private static final Schema GROUP_ERROR_CODE = new Schema(
+            GROUP_ID,
+            ERROR_CODE);
+
+    private static final Schema DELETE_GROUPS_RESPONSE_V0 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(GROUP_ERROR_CODES_KEY_NAME, new ArrayOf(GROUP_ERROR_CODE), "An array of per group error codes."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{DELETE_GROUPS_RESPONSE_V0};
+    }
+
+
+    /**
+     * Possible error codes:
+     *
+     * COORDINATOR_LOAD_IN_PROGRESS (14)
+     * COORDINATOR_NOT_AVAILABLE(15)
+     * NOT_COORDINATOR (16)
+     * INVALID_GROUP_ID(24)
+     * GROUP_AUTHORIZATION_FAILED(30)
+     * NON_EMPTY_GROUP(68)
+     * GROUP_ID_NOT_FOUND(69)
+     */
+
+    private final Map<String, Errors> errors;
+    private final int throttleTimeMs;
+
+    public DeleteGroupsResponse(Map<String, Errors> errors) {
+        this(DEFAULT_THROTTLE_TIME, errors);
+    }
+
+    public DeleteGroupsResponse(int throttleTimeMs, Map<String, Errors> errors) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.errors = errors;
+    }
+
+    public DeleteGroupsResponse(Struct struct) {
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+        Object[] groupErrorCodesStructs = struct.getArray(GROUP_ERROR_CODES_KEY_NAME);
+        Map<String, Errors> errors = new HashMap<>();
+        for (Object groupErrorCodeStructObj : groupErrorCodesStructs) {
+            Struct groupErrorCodeStruct = (Struct) groupErrorCodeStructObj;
+            String group = groupErrorCodeStruct.get(GROUP_ID);
+            Errors error = Errors.forCode(groupErrorCodeStruct.get(ERROR_CODE));
+            errors.put(group, error);
+        }
+
+        this.errors = errors;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.DELETE_GROUPS.responseSchema(version));
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+        List<Struct> groupErrorCodeStructs = new ArrayList<>(errors.size());
+        for (Map.Entry<String, Errors> groupError : errors.entrySet()) {
+            Struct groupErrorCodeStruct = struct.instance(GROUP_ERROR_CODES_KEY_NAME);
+            groupErrorCodeStruct.set(GROUP_ID, groupError.getKey());
+            groupErrorCodeStruct.set(ERROR_CODE, groupError.getValue().code());
+            groupErrorCodeStructs.add(groupErrorCodeStruct);
+        }
+        struct.set(GROUP_ERROR_CODES_KEY_NAME, groupErrorCodeStructs.toArray());
+        return struct;
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    public Map<String, Errors> errors() {
+        return errors;
+    }
+
+    public boolean hasError(String group) {
+        return errors.containsKey(group) && errors.get(group) != Errors.NONE;
+    }
+
+    public Errors get(String group) {
+        return errors.get(group);
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(errors);
+    }
+
+    public static DeleteGroupsResponse parse(ByteBuffer buffer, short version) {
+        return new DeleteGroupsResponse(ApiKeys.DELETE_GROUPS.responseSchema(version).read(buffer));
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 2740616..b5420b5 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -112,6 +112,9 @@ public class RequestResponseTest {
         checkRequest(createDescribeGroupRequest());
         checkErrorResponse(createDescribeGroupRequest(), new UnknownServerException());
         checkResponse(createDescribeGroupResponse(), 0);
+        checkRequest(createDeleteGroupsRequest());
+        checkErrorResponse(createDeleteGroupsRequest(), new UnknownServerException());
+        checkResponse(createDeleteGroupsResponse(), 0);
         checkRequest(createListOffsetRequest(1));
         checkErrorResponse(createListOffsetRequest(1), new UnknownServerException());
         checkResponse(createListOffsetResponse(1), 1);
@@ -641,6 +644,16 @@ public class RequestResponseTest {
         return new LeaveGroupResponse(Errors.NONE);
     }
 
+    private DeleteGroupsRequest createDeleteGroupsRequest() {
+        return new DeleteGroupsRequest.Builder(Collections.singleton("test-group")).build();
+    }
+
+    private DeleteGroupsResponse createDeleteGroupsResponse() {
+        Map<String, Errors> result = new HashMap<>();
+        result.put("test-group", Errors.NONE);
+        return new DeleteGroupsResponse(result);
+    }
+
     @SuppressWarnings("deprecation")
     private ListOffsetRequest createListOffsetRequest(int version) {
         if (version == 0) {
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index fa6333c..6dd2272 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -32,7 +32,7 @@ object AclCommand extends Logging {
   val Newline = scala.util.Properties.lineSeparator
   val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
     Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All),
-    Group -> Set(Read, Describe, All),
+    Group -> Set(Read, Describe, Delete, All),
     Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All),
     TransactionalId -> Set(Describe, Write, All),
     DelegationToken -> Set(Describe, All)
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index c092169..772277f 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -369,6 +369,49 @@ class AdminClient(val time: Time,
     (response.error, response.tokens().asScala.toList)
   }
 
+  def deleteConsumerGroups(groups: List[String]): Map[String, Errors] = {
+
+    def coordinatorLookup(group: String): Either[Node, Errors] = {
+      try {
+        Left(findCoordinator(group))
+      } catch {
+        case e: Throwable =>
+          if (e.isInstanceOf[TimeoutException])
+            Right(Errors.COORDINATOR_NOT_AVAILABLE)
+          else
+            Right(Errors.forException(e))
+      }
+    }
+
+    var errors: Map[String, Errors] = Map()
+    var groupsPerCoordinator: Map[Node, List[String]] = Map()
+
+    groups.foreach { group =>
+      coordinatorLookup(group) match {
+        case Right(error) =>
+          errors += group -> error
+        case Left(coordinator) =>
+          groupsPerCoordinator.get(coordinator) match {
+            case Some(gList) =>
+              val gListNew = group :: gList
+              groupsPerCoordinator += coordinator -> gListNew
+            case None =>
+              groupsPerCoordinator += coordinator -> List(group)
+          }
+      }
+    }
+
+    groupsPerCoordinator.foreach { case (coordinator, groups) =>
+      val responseBody = send(coordinator, ApiKeys.DELETE_GROUPS, new DeleteGroupsRequest.Builder(groups.toSet.asJava))
+      val response = responseBody.asInstanceOf[DeleteGroupsResponse]
+      groups.foreach {
+        case group if response.hasError(group) => errors += group -> response.errors.get(group)
+        case group => errors += group -> Errors.NONE
+      }
+    }
+
+    errors
+  }
 
   def close() {
     running = false
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index f21b942..2ae03aa 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -429,9 +429,10 @@ object AdminUtils extends Logging with AdminUtilities {
    * @param topic Topic of the consumer group information we wish to delete
    */
   @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
-  def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String) {
+  def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String): Set[String] = {
     val groups = zkUtils.getAllConsumerGroupsForTopic(topic)
     groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
+    groups
   }
 
   def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 3aa821c..77c5b4d 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -75,12 +75,8 @@ object ConsumerGroupCommand extends Logging {
         consumerGroupService.listGroups().foreach(println(_))
       else if (opts.options.has(opts.describeOpt))
         consumerGroupService.describeGroup()
-      else if (opts.options.has(opts.deleteOpt)) {
-        consumerGroupService match {
-          case service: ZkConsumerGroupService => service.deleteGroups()
-          case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService.")
-        }
-      }
+      else if (opts.options.has(opts.deleteOpt))
+        consumerGroupService.deleteGroups()
       else if (opts.options.has(opts.resetOffsetsOpt)) {
         val offsetsToReset = consumerGroupService.resetOffsets()
         if (opts.options.has(opts.exportOpt)) {
@@ -344,6 +340,8 @@ object ConsumerGroupCommand extends Logging {
     def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = throw new UnsupportedOperationException
 
     def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = throw new UnsupportedOperationException
+
+    def deleteGroups(): Map[String, Errors]
   }
 
   @deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
@@ -362,13 +360,15 @@ object ConsumerGroupCommand extends Logging {
       zkUtils.getConsumerGroups().toList
     }
 
-    def deleteGroups() {
+    def deleteGroups(): Map[String, Errors] = {
       if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt))
-        deleteForTopic()
+        deleteGroupsInfoForTopic()
       else if (opts.options.has(opts.groupOpt))
-        deleteForGroup()
+        deleteGroupsInfo()
       else if (opts.options.has(opts.topicOpt))
-        deleteAllForTopic()
+        deleteAllGroupsInfoForTopic()
+
+      Map()
     }
 
     def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
@@ -476,45 +476,57 @@ object ConsumerGroupCommand extends Logging {
       }.toMap
     }
 
-    private def deleteForGroup() {
+    private def deleteGroupsInfo(): Map[String, Errors] = {
       val groups = opts.options.valuesOf(opts.groupOpt)
-      groups.asScala.foreach { group =>
+      groups.asScala.map { group =>
         try {
-          if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group))
+          if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group)) {
             println(s"Deleted all consumer group information for group '$group' in zookeeper.")
-          else
+            group -> Errors.NONE
+          }
+          else {
             printError(s"Delete for group '$group' failed because its consumers are still active.")
+            group -> Errors.NON_EMPTY_GROUP
+          }
         }
         catch {
           case e: ZkNoNodeException =>
             printError(s"Delete for group '$group' failed because group does not exist.", Some(e))
+            group -> Errors.forException(e)
         }
-      }
+      }.toMap
     }
 
-    private def deleteForTopic() {
+    private def deleteGroupsInfoForTopic(): Map[String, Errors] = {
       val groups = opts.options.valuesOf(opts.groupOpt)
       val topic = opts.options.valueOf(opts.topicOpt)
       Topic.validate(topic)
-      groups.asScala.foreach { group =>
+      groups.asScala.map { group =>
         try {
-          if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
+          if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic)) {
             println(s"Deleted consumer group information for group '$group' topic '$topic' in zookeeper.")
-          else
+            group -> Errors.NONE
+          }
+          else {
             printError(s"Delete for group '$group' topic '$topic' failed because its consumers are still active.")
+            group -> Errors.NON_EMPTY_GROUP
+          }
         }
         catch {
           case e: ZkNoNodeException =>
             printError(s"Delete for group '$group' topic '$topic' failed because group does not exist.", Some(e))
+            group -> Errors.forException(e)
         }
-      }
+      }.toMap
     }
 
-    private def deleteAllForTopic() {
+    private def deleteAllGroupsInfoForTopic(): Map[String, Errors] = {
       val topic = opts.options.valueOf(opts.topicOpt)
       Topic.validate(topic)
-      AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic)
+      val deletedGroups = AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic)
       println(s"Deleted consumer group information for all inactive consumer groups for topic '$topic' in zookeeper.")
+      deletedGroups.map(_ -> Errors.NONE).toMap
+
     }
 
     private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = {
@@ -830,6 +842,27 @@ object ConsumerGroupCommand extends Logging {
       rows.foldRight("")(_ + "\n" + _)
     }
 
+    override def deleteGroups(): Map[String, Errors] = {
+      val groupsToDelete = opts.options.valuesOf(opts.groupOpt).asScala.toList
+      val result = adminClient.deleteConsumerGroups(groupsToDelete)
+      val successfullyDeleted = result.filter {
+        case (_, error) => error == Errors.NONE
+      }.keySet
+
+      if (successfullyDeleted.size == result.size)
+        println(s"Deletion of requested consumer groups (${successfullyDeleted.mkString("'", ", ", "'")}) was successful.")
+      else {
+        printError("Deletion of some consumer groups failed:")
+        result.foreach {
+          case (group, error) if error != Errors.NONE => println(s"* Group '$group' could not be deleted due to: ${error.toString}")
+          case _ => // no need to print successful deletions individually
+        }
+        if (successfullyDeleted.nonEmpty)
+          println(s"\nThese consumer groups were deleted successfully: ${successfullyDeleted.mkString("'", ", ", "'")}")
+      }
+
+      result
+    }
   }
 
   sealed trait LogOffsetResult
@@ -987,10 +1020,9 @@ object ConsumerGroupCommand extends Logging {
             s"The new consumer is used by default if the $bootstrapServerOpt option is provided.")
         }
 
-        if (options.has(deleteOpt))
-          CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is only valid with $zkConnectOpt. Note that " +
-            "there is no need to delete group metadata for the new consumer as the group is deleted when the last " +
-            "committed offset for that group expires.")
+        if (options.has(deleteOpt) && options.has(topicOpt))
+          CommandLineUtils.printUsageAndDie(parser, s"When deleting a consumer group the option $topicOpt is only " +
+            s"valid with $zkConnectOpt. The new consumer does not support topic-specific offset deletion from a consumer group.")
       }
 
       if (describeOptPresent)
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index e509fc5..f95fb89 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -72,7 +72,9 @@ object ApiVersion {
     "0.11.0" -> KAFKA_0_11_0_IV2,
     // Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and FetchRequest V6 via KIP-112
     "1.0-IV0" -> KAFKA_1_0_IV0,
-    "1.0" -> KAFKA_1_0_IV0
+    "1.0" -> KAFKA_1_0_IV0,
+    // Introduced DeleteGroupsRequest V0 via KIP-229
+    "1.1-IV0" -> KAFKA_1_1_IV0
   )
 
   private val versionPattern = "\\.".r
@@ -184,3 +186,9 @@ case object KAFKA_1_0_IV0 extends ApiVersion {
   val id: Int = 13
 }
 
+case object KAFKA_1_1_IV0 extends ApiVersion {
+  val version: String = "1.1-IV0"
+  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val id: Int = 14
+}
+
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index ee4fc4b..5ae8552 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -338,6 +338,52 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
+  def handleDeleteGroups(groupIds: Set[String]): Map[String, Errors] = {
+    if (!isActive.get) {
+      groupIds.map(_ -> Errors.COORDINATOR_NOT_AVAILABLE).toMap
+    } else {
+      var groupErrors: Map[String, Errors] = Map()
+      var eligibleGroups: Seq[GroupMetadata] = Seq()
+
+      groupIds.foreach { groupId =>
+        if (!validGroupId(groupId))
+          groupErrors += groupId -> Errors.INVALID_GROUP_ID
+        else if (!isCoordinatorForGroup(groupId))
+          groupErrors += groupId -> Errors.NOT_COORDINATOR
+        else if (isCoordinatorLoadInProgress(groupId))
+          groupErrors += groupId -> Errors.COORDINATOR_LOAD_IN_PROGRESS
+        else {
+          groupManager.getGroup(groupId) match {
+            case None =>
+              groupErrors += groupId ->
+                (if (groupManager.groupNotExists(groupId)) Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR)
+            case Some(group) =>
+              group.inLock {
+                group.currentState match {
+                  case Dead =>
+                    groupErrors += groupId ->
+                      (if (groupManager.groupNotExists(groupId)) Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR)
+                  case Empty =>
+                    group.transitionTo(Dead)
+                    eligibleGroups :+= group
+                  case _ =>
+                    groupErrors += groupId -> Errors.NON_EMPTY_GROUP
+                }
+              }
+          }
+        }
+      }
+
+      if (eligibleGroups.nonEmpty) {
+        groupManager.cleanupGroupMetadata(None, eligibleGroups, Long.MaxValue)
+        groupErrors ++= eligibleGroups.map(_.groupId -> Errors.NONE).toMap
+        info(s"The following groups were deleted: ${eligibleGroups.map(_.groupId).mkString(", ")}")
+      }
+
+      groupErrors
+    }
+  }
+
   def handleHeartbeat(groupId: String,
                       memberId: String,
                       generationId: Int,
@@ -522,7 +568,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def handleDeletedPartitions(topicPartitions: Seq[TopicPartition]) {
-    groupManager.cleanupGroupMetadata(Some(topicPartitions))
+    groupManager.cleanupGroupMetadata(Some(topicPartitions), groupManager.currentGroups, time.milliseconds())
   }
 
   private def validateGroup(groupId: String): Option[Errors] = {
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 6599698..3391fc3 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -158,6 +158,13 @@ class GroupMetadataManager(brokerId: Int,
 
   def isLoading(): Boolean = inLock(partitionLock) { loadingPartitions.nonEmpty }
 
+  // return true iff group is owned and the group doesn't exist
+  def groupNotExists(groupId: String) = inLock(partitionLock) {
+    isGroupLocal(groupId) && getGroup(groupId).forall { group =>
+      group.inLock(group.is(Dead))
+    }
+  }
+
   // visible for testing
   private[group] def isGroupOpenForProducer(producerId: Long, groupId: String) = openGroupsForProducer.get(producerId) match {
     case Some(groups) =>
@@ -706,14 +713,16 @@ class GroupMetadataManager(brokerId: Int,
 
   // visible for testing
   private[group] def cleanupGroupMetadata(): Unit = {
-    cleanupGroupMetadata(None)
+    cleanupGroupMetadata(None, groupMetadataCache.values, time.milliseconds())
   }
 
-  def cleanupGroupMetadata(deletedTopicPartitions: Option[Seq[TopicPartition]]) {
-    val startMs = time.milliseconds()
+  def cleanupGroupMetadata(deletedTopicPartitions: Option[Seq[TopicPartition]],
+                           groups: Iterable[GroupMetadata],
+                           startMs: Long) {
     var offsetsRemoved = 0
 
-    groupMetadataCache.foreach { case (groupId, group) =>
+    groups.foreach { group =>
+      val groupId = group.groupId
       val (removedOffsets, groupIsDead, generation) = group.inLock {
         val removedOffsets = deletedTopicPartitions match {
           case Some(topicPartitions) => group.removeOffsets(topicPartitions)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 13f5164..2dd6951 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -141,7 +141,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
         case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
         case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
-        case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> handleDescribeTokensRequest(request)
+        case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
+        case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
       }
     } catch {
       case e: FatalExitError => throw e
@@ -488,7 +489,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val nonExistingTopicResponseData = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData)]()
     val authorizedRequestInfo = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
 
-    if (fetchRequest.isFromFollower() && !authorize(request.session, ClusterAction, Resource.ClusterResource))    
+    if (fetchRequest.isFromFollower() && !authorize(request.session, ClusterAction, Resource.ClusterResource))
       for (topicPartition <- fetchRequest.fetchData.asScala.keys)
         unauthorizedTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.CLUSTER_AUTHORIZATION_FAILED,
           FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
@@ -1221,6 +1222,21 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleDeleteGroupsRequest(request: RequestChannel.Request): Unit = {
+    val deleteGroupsRequest = request.body[DeleteGroupsRequest]
+    var groups = deleteGroupsRequest.groups.asScala.toSet
+
+    val (authorizedGroups, unauthorizedGroups) = groups.partition { group =>
+      authorize(request.session, Delete, new Resource(Group, group))
+    }
+
+    val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups) ++
+      unauthorizedGroups.map(_ -> Errors.GROUP_AUTHORIZATION_FAILED)
+
+    sendResponseMaybeThrottle(request, requestThrottleMs =>
+      new DeleteGroupsResponse(requestThrottleMs, groupDeletionResult.asJava))
+  }
+
   def handleHeartbeatRequest(request: RequestChannel.Request) {
     val heartbeatRequest = request.body[HeartbeatRequest]
 
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 39c1ea3..248d219 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -78,6 +78,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   val groupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
   val groupDescribeAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
+  val groupDeleteAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
   val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
   val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)))
   val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)))
@@ -125,6 +126,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.DESCRIBE_GROUPS -> classOf[DescribeGroupsResponse],
       ApiKeys.HEARTBEAT -> classOf[HeartbeatResponse],
       ApiKeys.LEAVE_GROUP -> classOf[LeaveGroupResponse],
+      ApiKeys.DELETE_GROUPS -> classOf[DeleteGroupsResponse],
       ApiKeys.LEADER_AND_ISR -> classOf[requests.LeaderAndIsrResponse],
       ApiKeys.STOP_REPLICA -> classOf[requests.StopReplicaResponse],
       ApiKeys.CONTROLLED_SHUTDOWN -> classOf[requests.ControlledShutdownResponse],
@@ -162,6 +164,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => resp.groups.get(group).error),
     ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.error),
     ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error),
+    ApiKeys.DELETE_GROUPS -> ((resp: DeleteGroupsResponse) => resp.get(group)),
     ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
     ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
     ApiKeys.CONTROLLED_SHUTDOWN -> ((resp: requests.ControlledShutdownResponse) => resp.error),
@@ -202,6 +205,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.DESCRIBE_GROUPS -> groupDescribeAcl,
     ApiKeys.HEARTBEAT -> groupReadAcl,
     ApiKeys.LEAVE_GROUP -> groupReadAcl,
+    ApiKeys.DELETE_GROUPS -> groupDeleteAcl,
     ApiKeys.LEADER_AND_ISR -> clusterAcl,
     ApiKeys.STOP_REPLICA -> clusterAcl,
     ApiKeys.CONTROLLED_SHUTDOWN -> clusterAcl,
@@ -343,6 +347,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def leaveGroupRequest = new LeaveGroupRequest.Builder(group, "").build()
 
+  private def deleteGroupsRequest = new DeleteGroupsRequest.Builder(Set(group).asJava).build()
+
   private def leaderAndIsrRequest = {
     new requests.LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, brokerId, Int.MaxValue,
       Map(tp -> new LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, false)).asJava,
@@ -468,7 +474,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.DELETE_RECORDS -> deleteRecordsRequest,
       ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
       ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
-      ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest
+      ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
+      ApiKeys.DELETE_GROUPS -> deleteGroupsRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {
@@ -495,7 +502,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   def testFetchFollowerRequest() {
     val key = ApiKeys.FETCH
     val request = createFetchFollowerRequest
-      
+
     removeAllAcls()
     val resources = Set(topicResource.resourceType, Resource.ClusterResource.resourceType)
     sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
@@ -503,7 +510,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val readAcls = topicReadAcl.get(topicResource).get
     addAndVerifyAcls(readAcls, topicResource)
     sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
- 
+
     val clusterAcls = clusterAcl.get(Resource.ClusterResource).get
     addAndVerifyAcls(clusterAcls, Resource.ClusterResource)
     sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true)
@@ -961,6 +968,33 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @Test
+  def testDeleteGroupApiWithDeleteGroupAcl() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)), groupResource)
+    this.consumers.head.assign(List(tp).asJava)
+    this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
+    val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group))
+    assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NONE))
+  }
+
+  @Test
+  def testDeleteGroupApiWithNoDeleteGroupAcl() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    this.consumers.head.assign(List(tp).asJava)
+    this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
+    val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group))
+    assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED))
+  }
+
+  @Test
+  def testDeleteGroupApiWithNoDeleteGroupAcl2() {
+    val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group))
+    assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED))
+  }
+
+  @Test
   def testUnauthorizedDeleteTopicsWithoutDescribe() {
     val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
     val version = ApiKeys.DELETE_TOPICS.latestVersion
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
new file mode 100644
index 0000000..cc236d5
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.admin
+
+import kafka.admin.ConsumerGroupCommandTest
+import kafka.utils.TestUtils
+import org.apache.kafka.common.protocol.Errors
+import org.junit.Assert._
+import org.junit.Test
+
+class DeleteConsumerGroupTest extends ConsumerGroupCommandTest {
+
+  @Test(expected = classOf[joptsimple.OptionException])
+  def testDeleteWithTopicOption() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group, "--topic")
+    getConsumerGroupService(cgcArgs)
+    fail("Expected an error due to presence of mutually exclusive options")
+  }
+
+  @Test
+  def testDeleteCmdNonExistingGroup() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+    val missingGroup = "missing.group"
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", missingGroup)
+    val service = getConsumerGroupService(cgcArgs)
+
+    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
+    assertTrue(s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected while deleting consumer group",
+        output.contains(s"Group '$missingGroup' could not be deleted due to: ${Errors.GROUP_ID_NOT_FOUND.toString}"))
+  }
+
+  @Test
+  def testDeleteNonExistingGroup() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+    val missingGroup = "missing.group"
+
+    // note the group to be deleted is a different (non-existing) group
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", missingGroup)
+    val service = getConsumerGroupService(cgcArgs)
+
+    val result = service.deleteGroups()
+    assertTrue(s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected while deleting consumer group",
+      result.size == 1 && result.keySet.contains(missingGroup) && result.get(missingGroup).contains(Errors.GROUP_ID_NOT_FOUND))
+  }
+
+  @Test
+  def testDeleteCmdInvalidGroupId() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+    val invalidGroupId = ""
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId)
+    val service = getConsumerGroupService(cgcArgs)
+
+    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
+    assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting consumer group",
+      output.contains(s"Group '$invalidGroupId' could not be deleted due to: ${Errors.INVALID_GROUP_ID.toString}"))
+  }
+
+  @Test
+  def testDeleteInvalidGroupId() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+    val invalidGroupId = ""
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId)
+    val service = getConsumerGroupService(cgcArgs)
+
+    val result = service.deleteGroups()
+    assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting consumer group",
+      result.size == 1 && result.keySet.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID))
+  }
+
+  @Test
+  def testDeleteCmdNonEmptyGroup() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+
+    // run one consumer in the group
+    addConsumerGroupExecutor(numConsumers = 1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      service.listGroups().contains(group)
+    }, "The group did not initialize as expected.")
+
+    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
+    assertTrue(s"The expected error (${Errors.NON_EMPTY_GROUP}) was not detected while deleting consumer group",
+      output.contains(s"Group '$group' could not be deleted due to: ${Errors.NON_EMPTY_GROUP}"))
+  }
+
+  @Test
+  def testDeleteNonEmptyGroup() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+
+    // run one consumer in the group
+    addConsumerGroupExecutor(numConsumers = 1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      service.listGroups().contains(group)
+    }, "The group did not initialize as expected.")
+
+    val result = service.deleteGroups()
+    assertTrue(s"The expected error (${Errors.NON_EMPTY_GROUP}) was not detected while deleting consumer group",
+      result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NON_EMPTY_GROUP))
+  }
+
+  @Test
+  def testDeleteCmdEmptyGroup() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+
+    // run one consumer in the group
+    val executor = addConsumerGroupExecutor(numConsumers = 1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      service.listGroups().contains(group)
+    }, "The group did not initialize as expected.")
+
+    executor.shutdown()
+
+    TestUtils.waitUntilTrue(() => {
+      service.collectGroupState().state == "Empty"
+    }, "The group did become empty as expected.")
+
+    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
+    assertTrue(s"The consumer group could not be deleted as expected",
+      output.contains(s"Deletion of requested consumer groups ('$group') was successful."))
+  }
+
+  @Test
+  def testDeleteEmptyGroup() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+
+    // run one consumer in the group
+    val executor = addConsumerGroupExecutor(numConsumers = 1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      service.listGroups().contains(group)
+    }, "The group did not initialize as expected.")
+
+    executor.shutdown()
+
+    TestUtils.waitUntilTrue(() => {
+      service.collectGroupState().state == "Empty"
+    }, "The group did become empty as expected.")
+
+    val result = service.deleteGroups()
+    assertTrue(s"The consumer group could not be deleted as expected",
+      result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NONE))
+  }
+
+  @Test
+  def testDeleteCmdWithMixOfSuccessAndError() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+    val missingGroup = "missing.group"
+
+    // run one consumer in the group
+    val executor = addConsumerGroupExecutor(numConsumers = 1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      service.listGroups().contains(group)
+    }, "The group did not initialize as expected.")
+
+    executor.shutdown()
+
+    TestUtils.waitUntilTrue(() => {
+      service.collectGroupState().state == "Empty"
+    }, "The group did become empty as expected.")
+
+    val service2 = getConsumerGroupService(cgcArgs ++ Array("--group", missingGroup))
+    val output = TestUtils.grabConsoleOutput(service2.deleteGroups())
+    assertTrue(s"The consumer group deletion did not work as expected",
+      output.contains(s"Group '$missingGroup' could not be deleted due to: ${Errors.GROUP_ID_NOT_FOUND}") &&
+      output.contains(s"These consumer groups were deleted successfully: '$group'"))
+  }
+
+  @Test
+  def testDeleteWithMixOfSuccessAndError() {
+    TestUtils.createOffsetsTopic(zkClient, servers)
+    val missingGroup = "missing.group"
+
+    // run one consumer in the group
+    val executor = addConsumerGroupExecutor(numConsumers = 1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      service.listGroups().contains(group)
+    }, "The group did not initialize as expected.")
+
+    executor.shutdown()
+
+    TestUtils.waitUntilTrue(() => {
+      service.collectGroupState().state == "Empty"
+    }, "The group did become empty as expected.")
+
+    val service2 = getConsumerGroupService(cgcArgs ++ Array("--group", missingGroup))
+    val result = service2.deleteGroups()
+    assertTrue(s"The consumer group deletion did not work as expected",
+      result.size == 2 &&
+        result.keySet.contains(group) && result.get(group).contains(Errors.NONE) &&
+        result.keySet.contains(missingGroup) && result.get(missingGroup).contains(Errors.GROUP_ID_NOT_FOUND))
+  }
+
+  @Test
+  def testDeleteCmdWithShortInitialization() {
+    // run one consumer in the group
+    val executor = addConsumerGroupExecutor(numConsumers = 1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
+    assertTrue(s"The consumer group deletion did not work as expected",
+      output.contains(s"Group '$group' could not be deleted due to: ${Errors.COORDINATOR_NOT_AVAILABLE}"))
+  }
+
+  @Test
+  def testDeleteWithShortInitialization() {
+    // run one consumer in the group
+    val executor = addConsumerGroupExecutor(numConsumers = 1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    val result = service.deleteGroups()
+    assertTrue(s"The consumer group deletion did not work as expected",
+      result.size == 1 &&
+        result.keySet.contains(group) && result.get(group).contains(Errors.COORDINATOR_NOT_AVAILABLE))
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index a62e7aa..2c9e81d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -30,6 +30,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
+import kafka.cluster.Partition
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.internals.Topic
 import org.junit.Assert._
@@ -1271,6 +1272,93 @@ class GroupCoordinatorTest extends JUnitSuite {
   }
 
   @Test
+  def testDeleteNonEmptyGroup() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+
+    val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet)
+    assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NON_EMPTY_GROUP))
+  }
+
+  @Test
+  def testDeleteGroupWithInvalidGroupId() {
+    val invalidGroupId = ""
+    val result = groupCoordinator.handleDeleteGroups(Set(invalidGroupId).toSet)
+    assert(result.size == 1 && result.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID))
+  }
+
+  @Test
+  def testDeleteGroupWithWrongCoordinator() {
+    val result = groupCoordinator.handleDeleteGroups(Set(otherGroupId).toSet)
+    assert(result.size == 1 && result.contains(otherGroupId) && result.get(otherGroupId).contains(Errors.NOT_COORDINATOR))
+  }
+
+  @Test
+  def testDeleteEmptyGroup() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+
+    EasyMock.reset(replicaManager)
+    val leaveGroupResult = leaveGroup(groupId, joinGroupResult.memberId)
+    assertEquals(Errors.NONE, leaveGroupResult)
+
+    val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val partition = EasyMock.niceMock(classOf[Partition])
+
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
+    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
+    EasyMock.replay(replicaManager, partition)
+
+    val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet)
+    assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE))
+  }
+
+  @Test
+  def testDeleteEmptyGroupWithStoredOffsets() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val assignedMemberId = joinGroupResult.memberId
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
+    val syncGroupError = syncGroupResult._2
+    assertEquals(Errors.NONE, syncGroupError)
+
+    EasyMock.reset(replicaManager)
+    val tp = new TopicPartition("topic", 0)
+    val offset = OffsetAndMetadata(0)
+    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, immutable.Map(tp -> offset))
+    assertEquals(Errors.NONE, commitOffsetResult(tp))
+
+    val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId)
+    assertEquals(Stable.toString, describeGroupResult._2.state)
+    assertEquals(assignedMemberId, describeGroupResult._2.members.head.memberId)
+
+    EasyMock.reset(replicaManager)
+    val leaveGroupResult = leaveGroup(groupId, assignedMemberId)
+    assertEquals(Errors.NONE, leaveGroupResult)
+
+    val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val partition = EasyMock.niceMock(classOf[Partition])
+
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
+    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
+    EasyMock.replay(replicaManager, partition)
+
+    val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet)
+    assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE))
+
+    assertEquals(Dead.toString, groupCoordinator.handleDescribeGroup(groupId)._2.state)
+  }
+
+  @Test
   def shouldDelayInitialRebalanceByGroupInitialRebalanceDelayOnEmptyGroup() {
     val firstJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
     timer.advanceClock(GroupInitialRebalanceDelay / 2)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 62ebf29..b358c4e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -497,6 +497,26 @@ class GroupMetadataManagerTest {
     }
   }
 
+  @Test
+  def testGroupNotExists() {
+    // group is not owned
+    assertFalse(groupMetadataManager.groupNotExists(groupId))
+
+    groupMetadataManager.addPartitionOwnership(groupPartitionId)
+    // group is owned but does not exist yet
+    assertTrue(groupMetadataManager.groupNotExists(groupId))
+
+    val group = new GroupMetadata(groupId, initialState = Empty)
+    groupMetadataManager.addGroup(group)
+
+    // group is owned but not Dead
+    assertFalse(groupMetadataManager.groupNotExists(groupId))
+
+    group.transitionTo(Dead)
+    // group is owned and Dead
+    assertTrue(groupMetadataManager.groupNotExists(groupId))
+  }
+
   private def appendConsumerOffsetCommit(buffer: ByteBuffer, baseOffset: Long, offsets: Map[TopicPartition, Long]) = {
     val builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, baseOffset)
     val commitRecords = createCommittedOffsetRecords(offsets)
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index a90fa64..bfbae2b 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -312,12 +312,15 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.EXPIRE_DELEGATION_TOKEN =>
           new ExpireDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
 
-        case ApiKeys.DESCRIBE_DELEGATION_TOKEN=>
+        case ApiKeys.DESCRIBE_DELEGATION_TOKEN =>
           new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")))
 
-        case ApiKeys.RENEW_DELEGATION_TOKEN=>
+        case ApiKeys.RENEW_DELEGATION_TOKEN =>
           new RenewDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
 
+        case ApiKeys.DELETE_GROUPS =>
+          new DeleteGroupsRequest.Builder(Collections.singleton("test-group"))
+
         case _ =>
           throw new IllegalArgumentException("Unsupported API key " + apiKey)
     }
@@ -416,6 +419,7 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> new DescribeDelegationTokenResponse(response).throttleTimeMs
       case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response).throttleTimeMs
       case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs
+      case ApiKeys.DELETE_GROUPS => new DeleteGroupsResponse(response).throttleTimeMs
       case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
     }
   }

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message