kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-5526; Additional `--describe` views for ConsumerGroupCommand (KIP-175)
Date Fri, 15 Dec 2017 18:30:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 69777260e -> 529786638


KAFKA-5526; Additional `--describe` views for ConsumerGroupCommand (KIP-175)

The `--describe` option of ConsumerGroupCommand is expanded, as proposed in [KIP-175](https://cwiki.apache.org/confluence/display/KAFKA/KIP-175%3A+Additional+%27--describe%27+views+for+ConsumerGroupCommand), to support:
* `--describe` or `--describe --offsets`: listing of current group offsets
* `--describe --members` or `--describe --members --verbose`: listing of group members
* `--describe --state`: group status

Example: With a single partition topic `test1` and a double partition topic `test2`, consumers `consumer1` and `consumer11` subscribed to `test`, consumers `consumer2` and `consumer22` and `consumer222` subscribed to `test2`, and all consumers belonging to group `test-group`, this is an output example of the new options above for `test-group`:

```
--describe, or --describe --offsets:

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
test2           0          0               0               0               consumer2-bad9496d-0889-47ab-98ff-af17d9460382  /127.0.0.1      consumer2
test2           1          0               0               0               consumer22-c45e6ee2-0c7d-44a3-94a8-9627f63fb411 /127.0.0.1      consumer22
test1           0          0               0               0               consumer1-d51b0345-3194-4305-80db-81a68fa6c5bf  /127.0.0.1      consumer1
```

```
--describe --members

CONSUMER-ID                                      HOST            CLIENT-ID       #PARTITIONS
consumer2-bad9496d-0889-47ab-98ff-af17d9460382   /127.0.0.1      consumer2       1
consumer222-ed2108cd-d368-41f1-8514-5b72aa835bcc /127.0.0.1      consumer222     0
consumer11-dc8295d7-8f3f-4438-9b11-7270bab46760  /127.0.0.1      consumer11      0
consumer22-c45e6ee2-0c7d-44a3-94a8-9627f63fb411  /127.0.0.1      consumer22      1
consumer1-d51b0345-3194-4305-80db-81a68fa6c5bf   /127.0.0.1      consumer1       1
```

```
--describe --members --verbose

CONSUMER-ID                                      HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT
consumer2-bad9496d-0889-47ab-98ff-af17d9460382   /127.0.0.1      consumer2       1               test2(0)
consumer222-ed2108cd-d368-41f1-8514-5b72aa835bcc /127.0.0.1      consumer222     0               -
consumer11-dc8295d7-8f3f-4438-9b11-7270bab46760  /127.0.0.1      consumer11      0               -
consumer22-c45e6ee2-0c7d-44a3-94a8-9627f63fb411  /127.0.0.1      consumer22      1               test2(1)
consumer1-d51b0345-3194-4305-80db-81a68fa6c5bf   /127.0.0.1      consumer1       1               test1(0)
```

```
--describe --state

COORDINATOR (ID)         ASSIGNMENT-STRATEGY       STATE                #MEMBERS
localhost:9092 (0)       range                     Stable               5
```

Note that this PR also addresses the issue reported in [KAFKA-6158](https://issues.apache.org/jira/browse/KAFKA-6158) by dynamically setting the width of columns `TOPIC`, `CONSUMER-ID`, `HOST`, `CLIENT-ID` and `COORDINATOR (ID)`. This avoid truncation of column values when they go over the current fixed width of these columns.

The code has been restructured to better support testing of individual values and also the console output. Unit tests have been updated and extended to take advantage of this restructuring.

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #4271 from vahidhashemian/KAFKA-5526


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/52978663
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/52978663
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/52978663

Branch: refs/heads/trunk
Commit: 529786638baeb4335065df4e2b240aad42caca9a
Parents: 6977726
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Authored: Fri Dec 15 10:26:00 2017 -0800
Committer: Jason Gustafson <jason@confluent.io>
Committed: Fri Dec 15 10:26:00 2017 -0800

----------------------------------------------------------------------
 .../kafka/admin/ConsumerGroupCommand.scala      | 309 +++++++---
 .../kafka/admin/DescribeConsumerGroupTest.scala | 597 ++++++++++++++++---
 .../admin/ResetConsumerGroupOffsetTest.scala    |   6 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  26 +-
 4 files changed, 764 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/52978663/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 9e35ebc..918593b 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -20,14 +20,15 @@ package kafka.admin
 import java.text.{ParseException, SimpleDateFormat}
 import java.util.{Date, Properties}
 import javax.xml.datatype.DatatypeFactory
-
 import joptsimple.{OptionParser, OptionSpec}
+
 import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo}
 import kafka.client.ClientUtils
 import kafka.common.{OffsetMetadataAndError, TopicAndPartition}
-import kafka.utils.Implicits._
 import kafka.consumer.SimpleConsumer
 import kafka.utils._
+import kafka.utils.Implicits._
+
 import org.I0Itec.zkclient.exception.ZkNoNodeException
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
@@ -53,7 +54,7 @@ object ConsumerGroupCommand extends Logging {
       CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.")
 
     // should have exactly one action
-    val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt).count(opts.options.has _)
+    val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt).count(opts.options.has)
     if (actions != 1)
       CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offset")
 
@@ -61,10 +62,10 @@ object ConsumerGroupCommand extends Logging {
 
     val consumerGroupService = {
       if (opts.useOldConsumer) {
-        System.err.println("Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).\n")
+        Console.err.println("Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).")
         new ZkConsumerGroupService(opts)
       } else {
-        System.err.println("Note: This will not show information about old Zookeeper-based consumers.\n")
+        Console.err.println("Note: This will not show information about old Zookeeper-based consumers.")
         new KafkaConsumerGroupService(opts)
       }
     }
@@ -72,34 +73,8 @@ object ConsumerGroupCommand extends Logging {
     try {
       if (opts.options.has(opts.listOpt))
         consumerGroupService.listGroups().foreach(println(_))
-      else if (opts.options.has(opts.describeOpt)) {
-        val (state, assignments) = consumerGroupService.describeGroup()
-        val groupId = opts.options.valuesOf(opts.groupOpt).asScala.head
-        assignments match {
-          case None =>
-            // applies to both old and new consumer
-            printError(s"The consumer group '$groupId' does not exist.")
-          case Some(assignments) =>
-            if (opts.useOldConsumer)
-              printAssignment(assignments, false)
-            else
-              state match {
-                case Some("Dead") =>
-                  printError(s"Consumer group '$groupId' does not exist.")
-                case Some("Empty") =>
-                  System.err.println(s"Consumer group '$groupId' has no active members.")
-                  printAssignment(assignments, true)
-                case Some("PreparingRebalance") | Some("CompletingRebalance") =>
-                  System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.")
-                  printAssignment(assignments, true)
-                case Some("Stable") =>
-                  printAssignment(assignments, true)
-                case other =>
-                  // the control should never reach here
-                  throw new KafkaException(s"Expected a valid consumer group state, but found '${other.getOrElse("NONE")}'.")
-              }
-        }
-      }
+      else if (opts.options.has(opts.describeOpt))
+        consumerGroupService.describeGroup()
       else if (opts.options.has(opts.deleteOpt)) {
         consumerGroupService match {
           case service: ZkConsumerGroupService => service.deleteGroups()
@@ -129,23 +104,6 @@ object ConsumerGroupCommand extends Logging {
     e.foreach(debug("Exception in consumer group command", _))
   }
 
-  def printAssignment(groupAssignment: Seq[PartitionAssignmentState], useNewConsumer: Boolean): Unit = {
-    print("\n%-30s %-10s %-15s %-15s %-10s %-50s".format("TOPIC", "PARTITION", "CURRENT-OFFSET", "LOG-END-OFFSET", "LAG", "CONSUMER-ID"))
-    if (useNewConsumer)
-      print("%-30s %s".format("HOST", "CLIENT-ID"))
-    println()
-
-    groupAssignment.foreach { consumerAssignment =>
-      print("%-30s %-10s %-15s %-15s %-10s %-50s".format(
-        consumerAssignment.topic.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.partition.getOrElse(MISSING_COLUMN_VALUE),
-        consumerAssignment.offset.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.logEndOffset.getOrElse(MISSING_COLUMN_VALUE),
-        consumerAssignment.lag.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE)))
-      if (useNewConsumer)
-        print("%-30s %s".format(consumerAssignment.host.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.clientId.getOrElse(MISSING_COLUMN_VALUE)))
-      println()
-    }
-  }
-
   def printOffsetsToReset(groupAssignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): Unit = {
     print("\n%-30s %-10s %-15s".format("TOPIC", "PARTITION", "NEW-OFFSET"))
     println()
@@ -165,12 +123,151 @@ object ConsumerGroupCommand extends Logging {
                                                 consumerId: Option[String], host: Option[String],
                                                 clientId: Option[String], logEndOffset: Option[Long])
 
+  protected case class MemberAssignmentState(group: String, consumerId: String, host: String, clientId: String,
+                                             numPartitions: Int, assignment: List[TopicPartition])
+
+  protected case class GroupState(group: String, coordinator: Node, assignmentStrategy: String, state: String, numMembers: Int)
+
   sealed trait ConsumerGroupService {
 
     def listGroups(): List[String]
 
-    def describeGroup(): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
-      collectGroupAssignment(opts.options.valueOf(opts.groupOpt))
+    private def shouldPrintMemberState(group: String, state: Option[String], numRows: Option[Int]): Boolean = {
+      // numRows contains the number of data rows, if any, compiled from the API call in the caller method.
+      // if it's undefined or 0, there is no relevant group information to display.
+      numRows match {
+        case None =>
+          // applies to both old and new consumer
+          printError(s"The consumer group '$group' does not exist.")
+          false
+        case Some(num) =>
+          opts.useOldConsumer || {
+            state match {
+              case Some("Dead") =>
+                printError(s"Consumer group '$group' does not exist.")
+              case Some("Empty") =>
+                Console.err.println(s"Consumer group '$group' has no active members.")
+              case Some("PreparingRebalance") | Some("CompletingRebalance") =>
+                Console.err.println(s"Warning: Consumer group '$group' is rebalancing.")
+              case Some("Stable") =>
+              case other =>
+                // the control should never reach here
+                throw new KafkaException(s"Expected a valid consumer group state, but found '${other.getOrElse("NONE")}'.")
+            }
+            state != Some("Dead") && num > 0
+          }
+      }
+    }
+
+    private def size(colOpt: Option[Seq[Object]]): Option[Int] = colOpt.map(_.size)
+
+    private def printOffsets(group: String, state: Option[String], assignments: Option[Seq[PartitionAssignmentState]]): Unit = {
+      if (shouldPrintMemberState(group, state, size(assignments))) {
+        // find proper columns width
+        var (maxTopicLen, maxConsumerIdLen, maxHostLen) = (15, 15, 15)
+        assignments match {
+          case None => // do nothing
+          case Some(consumerAssignments) =>
+            consumerAssignments.foreach { consumerAssignment =>
+              maxTopicLen = Math.max(maxTopicLen, consumerAssignment.topic.getOrElse(MISSING_COLUMN_VALUE).length)
+              maxConsumerIdLen = Math.max(maxConsumerIdLen, consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE).length)
+              if (!opts.useOldConsumer)
+                maxHostLen = Math.max(maxHostLen, consumerAssignment.host.getOrElse(MISSING_COLUMN_VALUE).length)
+            }
+        }
+
+        print(s"\n%${-maxTopicLen}s %-10s %-15s %-15s %-15s %${-maxConsumerIdLen}s "
+          .format("TOPIC", "PARTITION", "CURRENT-OFFSET", "LOG-END-OFFSET", "LAG", "CONSUMER-ID"))
+
+        if (!opts.useOldConsumer)
+          print(s"%${-maxHostLen}s %s".format("HOST", "CLIENT-ID"))
+        println()
+
+        assignments match {
+          case None => // do nothing
+          case Some(consumerAssignments) =>
+            consumerAssignments.foreach { consumerAssignment =>
+              print(s"%${-maxTopicLen}s %-10s %-15s %-15s %-15s %${-maxConsumerIdLen}s ".format(
+                consumerAssignment.topic.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.partition.getOrElse(MISSING_COLUMN_VALUE),
+                consumerAssignment.offset.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.logEndOffset.getOrElse(MISSING_COLUMN_VALUE),
+                consumerAssignment.lag.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE)))
+              if (!opts.useOldConsumer)
+                print(s"%${-maxHostLen}s %s".format(consumerAssignment.host.getOrElse(MISSING_COLUMN_VALUE),
+                  consumerAssignment.clientId.getOrElse(MISSING_COLUMN_VALUE)))
+              println()
+            }
+        }
+      }
+    }
+
+    private def printMembers(group: String, state: Option[String], assignments: Option[Seq[MemberAssignmentState]], verbose: Boolean): Unit = {
+      if (shouldPrintMemberState(group, state, size(assignments))) {
+        // find proper columns width
+        var (maxConsumerIdLen, maxHostLen, maxClientIdLen) = (15, 15, 15)
+        assignments match {
+          case None => // do nothing
+          case Some(memberAssignments) =>
+            memberAssignments.foreach { memberAssignment =>
+              maxConsumerIdLen = Math.max(maxConsumerIdLen, memberAssignment.consumerId.length)
+              maxHostLen = Math.max(maxHostLen, memberAssignment.host.length)
+              maxClientIdLen = Math.max(maxClientIdLen, memberAssignment.clientId.length)
+            }
+        }
+
+        print(s"\n%${-maxConsumerIdLen}s %${-maxHostLen}s %${-maxClientIdLen}s %-15s "
+          .format("CONSUMER-ID", "HOST", "CLIENT-ID", "#PARTITIONS"))
+        if (verbose)
+          print("%s".format("ASSIGNMENT"))
+        println()
+
+        assignments match {
+          case None => // do nothing
+          case Some(memberAssignments) =>
+            memberAssignments.foreach { memberAssignment =>
+              print(s"%${-maxConsumerIdLen}s %${-maxHostLen}s %${-maxClientIdLen}s %-15s ".format(
+                memberAssignment.consumerId, memberAssignment.host, memberAssignment.clientId, memberAssignment.numPartitions))
+              if (verbose) {
+                val partitions = memberAssignment.assignment match {
+                  case List() => MISSING_COLUMN_VALUE
+                  case assignment =>
+                    assignment.groupBy(_.topic).map {
+                      case (topic, partitionList) => topic + partitionList.map(_.partition).sorted.mkString("(", ",", ")")
+                    }.toList.sorted.mkString(", ")
+                }
+                print("%s".format(partitions))
+              }
+              println()
+            }
+        }
+      }
+    }
+
+    private def printState(group: String, state: GroupState): Unit = {
+      // this method is reachable only for the new consumer option (where the given state is always defined)
+      if (shouldPrintMemberState(group, Some(state.state), Some(1))) {
+        val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"
+        val coordinatorColLen = Math.max(25, coordinator.length)
+        print(s"\n%${-coordinatorColLen}s %-25s %-20s %s".format("COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
+        print(s"\n%${-coordinatorColLen}s %-25s %-20s %s".format(coordinator, state.assignmentStrategy, state.state, state.numMembers))
+        println()
+      }
+    }
+
+    def describeGroup(): Unit = {
+      val group = opts.options.valuesOf(opts.groupOpt).asScala.head
+      val membersOptPresent = opts.options.has(opts.membersOpt)
+      val stateOptPresent = opts.options.has(opts.stateOpt)
+      val offsetsOptPresent = opts.options.has(opts.offsetsOpt)
+      val subActions = Seq(membersOptPresent, offsetsOptPresent, stateOptPresent).count(_ == true)
+
+      if (subActions == 0 || offsetsOptPresent) {
+        val offsets = collectGroupOffsets()
+        printOffsets(group, offsets._1, offsets._2)
+      } else if (membersOptPresent) {
+        val members = collectGroupMembers(opts.options.has(opts.verboseOpt))
+        printMembers(group, members._1, members._2, opts.options.has(opts.verboseOpt))
+      } else
+        printState(group, collectGroupState())
     }
 
     def close(): Unit
@@ -179,7 +276,11 @@ object ConsumerGroupCommand extends Logging {
 
     protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult
 
-    protected def collectGroupAssignment(group: String): (Option[String], Option[Seq[PartitionAssignmentState]])
+    def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]])
+
+    def collectGroupMembers(verbose: Boolean): (Option[String], Option[Seq[MemberAssignmentState]]) = throw new UnsupportedOperationException
+
+    def collectGroupState(): GroupState = throw new UnsupportedOperationException
 
     protected def collectConsumerAssignment(group: String,
                                             coordinator: Option[Node],
@@ -204,7 +305,7 @@ object ConsumerGroupCommand extends Logging {
       }
     }
 
-    protected def getLag(offset: Option[Long], logEndOffset: Option[Long]): Option[Long] =
+    private def getLag(offset: Option[Long], logEndOffset: Option[Long]): Option[Long] =
       offset.filter(_ != -1).flatMap(offset => logEndOffset.map(_ - offset))
 
     private def describePartition(group: String,
@@ -258,7 +359,8 @@ object ConsumerGroupCommand extends Logging {
         deleteAllForTopic()
     }
 
-    protected def collectGroupAssignment(group: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
+    def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
+      val group = opts.options.valueOf(opts.groupOpt)
       val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties()
       val channelSocketTimeoutMs = props.getProperty("channelSocketTimeoutMs", "600").toInt
       val channelRetryBackoffMs = props.getProperty("channelRetryBackoffMsOpt", "300").toInt
@@ -418,13 +520,14 @@ object ConsumerGroupCommand extends Logging {
     private val adminClient = createAdminClient()
 
     // `consumer` is only needed for `describe`, so we instantiate it lazily
-    private var consumer: KafkaConsumer[String, String] = null
+    private var consumer: KafkaConsumer[String, String] = _
 
     def listGroups(): List[String] = {
       adminClient.listAllConsumerGroupsFlattened().map(_.groupId)
     }
 
-    protected def collectGroupAssignment(group: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
+    def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
+      val group = opts.options.valueOf(opts.groupOpt)
       val consumerGroupSummary = adminClient.describeConsumerGroup(group, opts.options.valueOf(opts.timeoutMsOpt))
       (Some(consumerGroupSummary.state),
         consumerGroupSummary.consumers match {
@@ -437,7 +540,7 @@ object ConsumerGroupCommand extends Logging {
               if (offsets.isEmpty)
                 List[PartitionAssignmentState]()
               else {
-                consumers.sortWith(_.assignment.size > _.assignment.size).flatMap { consumerSummary =>
+                consumers.filter(_.assignment.nonEmpty).sortWith(_.assignment.size > _.assignment.size).flatMap { consumerSummary =>
                   val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
                   assignedTopicPartitions = assignedTopicPartitions ++ consumerSummary.assignment
                   val partitionOffsets: Map[TopicAndPartition, Option[Long]] = consumerSummary.assignment.map { topicPartition =>
@@ -449,37 +552,51 @@ object ConsumerGroupCommand extends Logging {
                 }
               }
 
-            val rowsWithoutConsumer = offsets.filterNot {
-              case (topicPartition, offset) => assignedTopicPartitions.contains(topicPartition)
-              }.flatMap {
-                case (topicPartition, offset) =>
-                  val topicAndPartition = new TopicAndPartition(topicPartition)
-                  collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), Seq(topicAndPartition),
-                      Map(topicAndPartition -> Some(offset)), Some(MISSING_COLUMN_VALUE),
-                      Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
-                }
+            val rowsWithoutConsumer = offsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
+              case (topicPartition, offset) =>
+                val topicAndPartition = new TopicAndPartition(topicPartition)
+                collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), Seq(topicAndPartition),
+                    Map(topicAndPartition -> Some(offset)), Some(MISSING_COLUMN_VALUE),
+                    Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
+            }
 
             Some(rowsWithConsumer ++ rowsWithoutConsumer)
       }
       )
     }
 
+    override def collectGroupMembers(verbose: Boolean): (Option[String], Option[Seq[MemberAssignmentState]]) = {
+      val group = opts.options.valueOf(opts.groupOpt)
+      val consumerGroupSummary = adminClient.describeConsumerGroup(group, opts.options.valueOf(opts.timeoutMsOpt))
+      (Some(consumerGroupSummary.state),
+        consumerGroupSummary.consumers.map(_.map {
+          consumer => MemberAssignmentState(group, consumer.consumerId, consumer.host, consumer.clientId, consumer.assignment.length,
+            if (verbose) consumer.assignment else List())
+        })
+      )
+    }
+
+    override def collectGroupState(): GroupState = {
+      val group = opts.options.valueOf(opts.groupOpt)
+      val consumerGroupSummary = adminClient.describeConsumerGroup(group, opts.options.valueOf(opts.timeoutMsOpt))
+      GroupState(group, consumerGroupSummary.coordinator, consumerGroupSummary.assignmentStrategy,
+        consumerGroupSummary.state, consumerGroupSummary.consumers.get.size)
+    }
+
     protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult = {
-      val consumer = getConsumer()
-      val offsets = consumer.endOffsets(List(topicPartition).asJava)
+      val offsets = getConsumer.endOffsets(List(topicPartition).asJava)
       val logStartOffset = offsets.get(topicPartition)
       LogOffsetResult.LogOffset(logStartOffset)
     }
 
     protected def getLogStartOffset(topicPartition: TopicPartition): LogOffsetResult = {
-      val consumer = getConsumer()
-      val offsets = consumer.beginningOffsets(List(topicPartition).asJava)
+      val offsets = getConsumer.beginningOffsets(List(topicPartition).asJava)
       val logStartOffset = offsets.get(topicPartition)
       LogOffsetResult.LogOffset(logStartOffset)
     }
 
     protected def getLogTimestampOffset(topicPartition: TopicPartition, timestamp: java.lang.Long): LogOffsetResult = {
-      val consumer = getConsumer()
+      val consumer = getConsumer
       consumer.assign(List(topicPartition).asJava)
       val offsetsForTimes = consumer.offsetsForTimes(Map(topicPartition -> timestamp).asJava)
       if (offsetsForTimes != null && !offsetsForTimes.isEmpty && offsetsForTimes.get(topicPartition) != null)
@@ -500,7 +617,7 @@ object ConsumerGroupCommand extends Logging {
       AdminClient.create(props)
     }
 
-    private def getConsumer() = {
+    private def getConsumer = {
       if (consumer == null)
         consumer = createNewConsumer()
       consumer
@@ -531,7 +648,7 @@ object ConsumerGroupCommand extends Logging {
           val preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset)
           val dryRun = opts.options.has(opts.dryRunOpt)
           if (!dryRun)
-            getConsumer().commitSync(preparedOffsets.asJava)
+            getConsumer.commitSync(preparedOffsets.asJava)
           preparedOffsets
         case currentState =>
           printError(s"Assignments can only be reset if the group '$groupId' is inactive, but the current state is $currentState.")
@@ -544,7 +661,7 @@ object ConsumerGroupCommand extends Logging {
         val topicAndPartitions = topicArg.split(":")
         val topic = topicAndPartitions(0)
         topicAndPartitions(1).split(",").map(partition => new TopicPartition(topic, partition.toInt))
-      case topic => getConsumer().partitionsFor(topic).asScala
+      case topic => getConsumer.partitionsFor(topic).asScala
         .map(partitionInfo => new TopicPartition(topic, partitionInfo.partition))
     }
 
@@ -677,7 +794,7 @@ object ConsumerGroupCommand extends Logging {
         try {
           new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(datetime)
         } catch {
-          case e: ParseException => new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(datetime)
+          case _: ParseException => new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(datetime)
         }
       }
       date.getTime
@@ -736,7 +853,16 @@ object ConsumerGroupCommand extends Logging {
     val ResetToEarliestDoc = "Reset offsets to earliest offset."
     val ResetToLatestDoc = "Reset offsets to latest offset."
     val ResetToCurrentDoc = "Reset offsets to current offset."
-    val ResetShiftByDoc = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative"
+    val ResetShiftByDoc = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative."
+    val MembersDoc = "Describe members of the group. This option may be used with '--describe' and '--bootstrap-server' options only." + nl +
+      "Example: --bootstrap-server localhost:9092 --describe --group group1 --members"
+    val VerboseDoc = "Provide additional information, if any, when describing the group. This option may be used " +
+      "with '--offsets'/'--members'/'--state' and '--bootstrap-server' options only." + nl + "Example: --bootstrap-server localhost:9092 --describe --group group1 --members --verbose"
+    val OffsetsDoc = "Describe the group and list all topic partitions in the group along with their offset lag. " +
+      "This is the default sub-action of and may be used with '--describe' and '--bootstrap-server' options only." + nl +
+      "Example: --bootstrap-server localhost:9092 --describe --group group1 --offsets"
+    val StateDoc = "Describe the group state. This option may be used with '--describe' and '--bootstrap-server' options only." + nl +
+      "Example: --bootstrap-server localhost:9092 --describe --group group1 --state"
 
     val parser = new OptionParser(false)
     val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc)
@@ -795,6 +921,20 @@ object ConsumerGroupCommand extends Logging {
                              .withRequiredArg()
                              .describedAs("number-of-offsets")
                              .ofType(classOf[Long])
+    val membersOpt = parser.accepts("members", MembersDoc)
+                           .availableIf(describeOpt)
+                           .availableUnless(zkConnectOpt)
+    val verboseOpt = parser.accepts("verbose", VerboseDoc)
+                           .availableIf(describeOpt)
+                           .availableUnless(zkConnectOpt)
+    val offsetsOpt = parser.accepts("offsets", OffsetsDoc)
+                           .availableIf(describeOpt)
+                           .availableUnless(zkConnectOpt)
+    val stateOpt = parser.accepts("state", StateDoc)
+                         .availableIf(describeOpt)
+                         .availableUnless(zkConnectOpt)
+    parser.mutuallyExclusive(membersOpt, offsetsOpt, stateOpt)
+
     val options = parser.parse(args : _*)
 
     val useOldConsumer = options.has(zkConnectOpt)
@@ -807,29 +947,30 @@ object ConsumerGroupCommand extends Logging {
     def checkArgs() {
       // check required args
       if (options.has(timeoutMsOpt) && (!describeOptPresent || useOldConsumer))
-        debug(s"Option '$timeoutMsOpt' is applicable only when both '$bootstrapServerOpt' and '$describeOpt' are used.")
+        debug(s"Option $timeoutMsOpt is applicable only when both $bootstrapServerOpt and $describeOpt are used.")
 
       if (useOldConsumer) {
         if (options.has(bootstrapServerOpt))
-          CommandLineUtils.printUsageAndDie(parser, s"Option '$bootstrapServerOpt' is not valid with '$zkConnectOpt'.")
+          CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is not valid with $zkConnectOpt.")
         else if (options.has(newConsumerOpt))
-          CommandLineUtils.printUsageAndDie(parser, s"Option '$newConsumerOpt' is not valid with '$zkConnectOpt'.")
+          CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with $zkConnectOpt.")
       } else {
         CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
 
         if (options.has(newConsumerOpt)) {
-          Console.err.println("The --new-consumer option is deprecated and will be removed in a future major release." +
-            "The new consumer is used by default if the --bootstrap-server option is provided.")
+          Console.err.println(s"The $newConsumerOpt option is deprecated and will be removed in a future major release." +
+            s"The new consumer is used by default if the $bootstrapServerOpt option is provided.")
         }
 
         if (options.has(deleteOpt))
-          CommandLineUtils.printUsageAndDie(parser, s"Option '$deleteOpt' is only valid with '$zkConnectOpt'. Note that " +
-            "there's no need to delete group metadata for the new consumer as the group is deleted when the last " +
+          CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is only valid with $zkConnectOpt. Note that " +
+            "there is no need to delete group metadata for the new consumer as the group is deleted when the last " +
             "committed offset for that group expires.")
       }
 
       if (describeOptPresent)
         CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
+
       if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt))
         CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, %s, or both".format(deleteOpt, groupOpt, topicOpt))
       if (options.has(resetOffsetsOpt))

http://git-wip-us.apache.org/repos/asf/kafka/blob/52978663/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index e367372..11f2865 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -22,29 +22,36 @@ import java.util.concurrent.TimeUnit
 import java.util.Collections
 import java.util.Properties
 
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
 import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService, KafkaConsumerGroupService, ZkConsumerGroupService}
 import kafka.consumer.OldConsumer
 import kafka.consumer.Whitelist
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
+
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.serialization.StringDeserializer
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
 
 class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   private val topic = "foo"
   private val group = "test.group"
 
+  private val describeTypeOffsets = Array(Array(""), Array("--offsets"))
+  private val describeTypeMembers = Array(Array("--members"), Array("--members", "--verbose"))
+  private val describeTypeState = Array(Array("--state"))
+  private val describeTypes = describeTypeOffsets ++ describeTypeMembers ++ describeTypeState
+
   @deprecated("This field will be removed in a future release", "0.11.0.0")
   private val oldConsumers = new ArrayBuffer[OldConsumer]
-  private var consumerGroupService: ConsumerGroupService = _
-  private var consumerGroupExecutor: ConsumerGroupExecutor = _
+  private var consumerGroupService: List[ConsumerGroupService] = List()
+  private var consumerGroupExecutor: List[ConsumerGroupExecutor] = List()
 
   // configure the servers and clients
   override def generateConfigs = {
@@ -61,33 +68,30 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
 
   @After
   override def tearDown(): Unit = {
-    if (consumerGroupService != null)
-      consumerGroupService.close()
-    if (consumerGroupExecutor != null)
-      consumerGroupExecutor.shutdown()
+    consumerGroupService.foreach(_.close)
+    consumerGroupExecutor.foreach(_.shutdown)
     oldConsumers.foreach(_.stop())
     super.tearDown()
   }
 
   @Test
   @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
-  def testDescribeNonExistingGroup() {
+  def testDescribeNonExistingGroupWithOldConsumer() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     createOldConsumer()
-    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", "missing.group"))
-    consumerGroupService = new ZkConsumerGroupService(opts)
-    TestUtils.waitUntilTrue(() => consumerGroupService.describeGroup()._2.isEmpty, "Expected no rows in describe group results.")
+    val service = getConsumerGroupService(Array("--zookeeper", zkConnect, "--describe", "--group", "missing.group"))
+    TestUtils.waitUntilTrue(() => service.collectGroupOffsets()._2.isEmpty, "Expected no rows in describe group results.")
   }
 
   @Test
   @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
-  def testDescribeExistingGroup() {
+  def testDescribeExistingGroupWithOldConsumer() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     createOldConsumer()
-    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
-    consumerGroupService = new ZkConsumerGroupService(opts)
+    val service = getConsumerGroupService(Array("--zookeeper", zkConnect, "--describe", "--group", group))
+
     TestUtils.waitUntilTrue(() => {
-      val (_, assignments) = consumerGroupService.describeGroup()
+      val (_, assignments) = service.collectGroupOffsets()
       assignments.isDefined &&
       assignments.get.count(_.group == group) == 1 &&
       assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
@@ -96,14 +100,13 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
 
   @Test
   @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
-  def testDescribeExistingGroupWithNoMembers() {
+  def testDescribeExistingGroupWithNoMembersWithOldConsumer() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     createOldConsumer()
-    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
-    consumerGroupService = new ZkConsumerGroupService(opts)
+    val service = getConsumerGroupService(Array("--zookeeper", zkConnect, "--describe", "--group", group))
 
     TestUtils.waitUntilTrue(() => {
-      val (_, assignments) = consumerGroupService.describeGroup()
+      val (_, assignments) = service.collectGroupOffsets()
       assignments.isDefined &&
       assignments.get.count(_.group == group) == 1 &&
       assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
@@ -111,7 +114,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     oldConsumers.head.stop()
 
     TestUtils.waitUntilTrue(() => {
-      val (_, assignments) = consumerGroupService.describeGroup()
+      val (_, assignments) = service.collectGroupOffsets()
       assignments.isDefined &&
       assignments.get.count(_.group == group) == 1 &&
       assignments.get.filter(_.group == group).head.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) // the member should be gone
@@ -120,14 +123,14 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
 
   @Test
   @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
-  def testDescribeConsumersWithNoAssignedPartitions() {
+  def testDescribeConsumersWithNoAssignedPartitionsWithOldConsumer() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     createOldConsumer()
     createOldConsumer()
-    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
-    consumerGroupService = new ZkConsumerGroupService(opts)
+    val service = getConsumerGroupService(Array("--zookeeper", zkConnect, "--describe", "--group", group))
+
     TestUtils.waitUntilTrue(() => {
-      val (_, assignments) = consumerGroupService.describeGroup()
+      val (_, assignments) = service.collectGroupOffsets()
       assignments.isDefined &&
       assignments.get.count(_.group == group) == 2 &&
       assignments.get.count { x => x.group == group && x.partition.isDefined } == 1 &&
@@ -136,74 +139,242 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   }
 
   @Test
-  def testDescribeNonExistingGroupWithNewConsumer() {
+  def testDescribeNonExistingGroup() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
+    val missingGroup = "missing.group"
+
+    for (describeType <- describeTypes) {
+      // note the group to be queried is a different (non-existing) group
+      val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", missingGroup) ++ describeType
+      val service = getConsumerGroupService(cgcArgs)
+
+      val output = TestUtils.grabConsoleOutput(service.describeGroup())
+      assertTrue(s"Expected error was not detected for describe option '${describeType.mkString(" ")}'",
+          output.contains(s"Consumer group '$missingGroup' does not exist."))
+    }
+  }
+
+  @Test(expected = classOf[joptsimple.OptionException])
+  def testDescribeWithMultipleSubActions() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--members", "--state")
+    getConsumerGroupService(cgcArgs)
+    fail("Expected an error due to presence of mutually exclusive options")
+  }
+
+  @Test
+  def testDescribeOffsetsOfNonExistingGroup() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
     // run one consumer in the group consuming from a single-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, topic))
+    // note the group to be queried is a different (non-existing) group
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "missing.group")
+    val service = getConsumerGroupService(cgcArgs)
+
+    val (state, assignments) = service.collectGroupOffsets()
+    assertTrue(s"Expected the state to be 'Dead', with no members in the group '$group'.",
+        state == Some("Dead") && assignments == Some(List()))
+  }
+
+  @Test
+  def testDescribeMembersOfNonExistingGroup() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
 
+    // run one consumer in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, topic))
     // note the group to be queried is a different (non-existing) group
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "missing.group")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val service = getConsumerGroupService(cgcArgs)
 
-    val (state, assignments) = consumerGroupService.describeGroup()
-    assertTrue("Expected the state to be 'Dead' with no members in the group.", state == Some("Dead") && assignments == Some(List()))
+    val (state, assignments) = service.collectGroupMembers(false)
+    assertTrue(s"Expected the state to be 'Dead', with no members in the group '$group'.",
+        state == Some("Dead") && assignments == Some(List()))
+
+    val (state2, assignments2) = service.collectGroupMembers(true)
+    assertTrue(s"Expected the state to be 'Dead', with no members in the group '$group' (verbose option).",
+        state2 == Some("Dead") && assignments2 == Some(List()))
   }
 
   @Test
-  def testDescribeExistingGroupWithNewConsumer() {
+  def testDescribeStateOfNonExistingGroup() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
+
     // run one consumer in the group consuming from a single-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, topic))
+    // note the group to be queried is a different (non-existing) group
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "missing.group")
+    val service = getConsumerGroupService(cgcArgs)
+
+    val state = service.collectGroupState()
+    assertTrue(s"Expected the state to be 'Dead', with no members in the group '$group'.",
+        state.state == "Dead" && state.numMembers == 0 &&
+        state.coordinator != null && servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
+    )
+  }
+
+  @Test
+  def testDescribeExistingGroup() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    for (describeType <- describeTypes) {
+      val group = this.group + describeType.mkString("")
+      // run one consumer in the group consuming from a single-partition topic
+      addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, topic))
+      val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) ++ describeType
+      val service = getConsumerGroupService(cgcArgs)
+
+      TestUtils.waitUntilTrue(() => {
+        val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroup())
+        output.trim.split("\n").length == 2 && error.isEmpty
+      }, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.")
+    }
+  }
 
+  @Test
+  def testDescribeOffsetsOfExistingGroup() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    // run one consumer in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, topic))
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-        val (state, assignments) = consumerGroupService.describeGroup()
-        state == Some("Stable") &&
+      val (state, assignments) = service.collectGroupOffsets()
+      state == Some("Stable") &&
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 1 &&
         assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
         assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
         assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
-    }, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe group results.")
+    }, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for group $group.")
+  }
+
+  @Test
+  def testDescribeMembersOfExistingGroup() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    // run one consumer in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, topic))
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      val (state, assignments) = service.collectGroupMembers(false)
+      state == Some("Stable") &&
+        (assignments match {
+          case Some(memberAssignments) =>
+            memberAssignments.count(_.group == group) == 1 &&
+              memberAssignments.filter(_.group == group).head.consumerId != ConsumerGroupCommand.MISSING_COLUMN_VALUE &&
+              memberAssignments.filter(_.group == group).head.clientId != ConsumerGroupCommand.MISSING_COLUMN_VALUE &&
+              memberAssignments.filter(_.group == group).head.host != ConsumerGroupCommand.MISSING_COLUMN_VALUE
+          case None =>
+            false
+        })
+    }, s"Expected a 'Stable' group status, rows and valid member information for group $group.")
+
+    val (state, assignments) = service.collectGroupMembers(true)
+    assignments match {
+      case None =>
+        fail(s"Expected partition assignments for members of group $group")
+      case Some(memberAssignments) =>
+        assertTrue(s"Expected a topic partition assigned to the single group member for group $group",
+          memberAssignments.size == 1 &&
+          memberAssignments.head.assignment.size == 1)
+    }
   }
 
   @Test
-  def testDescribeExistingGroupWithNoMembersWithNewConsumer() {
+  def testDescribeStateOfExistingGroup() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
+
     // run one consumer in the group consuming from a single-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, topic))
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
 
+    TestUtils.waitUntilTrue(() => {
+      val state = service.collectGroupState()
+      state.state == "Stable" &&
+        state.numMembers == 1 &&
+        state.assignmentStrategy == "range" &&
+        state.coordinator != null &&
+        servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
+    }, s"Expected a 'Stable' group status, with one member and round robin assignment strategy for group $group.")
+  }
+
+  @Test
+  def testDescribeStateOfExistingGroupWithRoundRobinAssignor() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    // run one consumer in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, topic, "org.apache.kafka.clients.consumer.RoundRobinAssignor"))
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, _) = consumerGroupService.describeGroup()
-      state == Some("Stable")
-    }, "Expected the group to initially become stable.")
+      val state = service.collectGroupState()
+      state.state == "Stable" &&
+        state.numMembers == 1 &&
+        state.assignmentStrategy == "roundrobin" &&
+        state.coordinator != null &&
+        servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
+    }, s"Expected a 'Stable' group status, with one member and round robin assignment strategy for group $group.")
+  }
+
+  @Test
+  def testDescribeExistingGroupWithNoMembers() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    for (describeType <- describeTypes) {
+      val group = this.group + describeType.mkString("")
+      // run one consumer in the group consuming from a single-partition topic
+      val executor = addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, topic))
+      val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) ++ describeType
+      val service = getConsumerGroupService(cgcArgs)
+
+      TestUtils.waitUntilTrue(() => {
+        val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroup())
+        output.trim.split("\n").length == 2 && error.isEmpty
+      }, s"Expected describe group results with one data row for describe type '${describeType.mkString(" ")}'")
+
+      // stop the consumer so the group has no active member anymore
+      executor.shutdown()
+
+      TestUtils.waitUntilTrue(() => {
+        TestUtils.grabConsoleError(service.describeGroup()).contains(s"Consumer group '$group' has no active members.")
+      }, s"Expected no active member in describe group results with describe type ${describeType.mkString(" ")}")
+    }
+  }
+
+  @Test
+  def testDescribeOffsetsOfExistingGroupWithNoMembers() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    // run one consumer in the group consuming from a single-partition topic
+    val executor = addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, topic))
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
 
-    // Group assignments in describeGroup rely on finding committed consumer offsets.
-    // Wait for an offset commit before shutting down the group executor.
     TestUtils.waitUntilTrue(() => {
-      val (_, assignments) = consumerGroupService.describeGroup()
-      assignments.exists(_.exists(_.group == group))
-    }, "Expected to find group in assignments after initial offset commit")
+      val (state, assignments) = service.collectGroupOffsets()
+      state == Some("Stable") && assignments.exists(_.exists(_.group == group))
+    }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.")
 
     // stop the consumer so the group has no active member anymore
-    consumerGroupExecutor.shutdown()
-
-    val (result, succeeded) = TestUtils.computeUntilTrue(consumerGroupService.describeGroup()) { case (state, assignments) =>
-      val testGroupAssignments = assignments.toSeq.flatMap(_.filter(_.group == group))
-      def assignment = testGroupAssignments.head
-      state == Some("Empty") &&
-        testGroupAssignments.size == 1 &&
-        assignment.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone
-        assignment.clientId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
-        assignment.host.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE)
+    executor.shutdown()
+
+    val (result, succeeded) = TestUtils.computeUntilTrue(service.collectGroupOffsets()) {
+      case (state, assignments) =>
+        val testGroupAssignments = assignments.toSeq.flatMap(_.filter(_.group == group))
+        def assignment = testGroupAssignments.head
+        state == Some("Empty") &&
+          testGroupAssignments.size == 1 &&
+          assignment.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone
+          assignment.clientId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
+          assignment.host.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE)
     }
     val (state, assignments) = result
     assertTrue(s"Expected no active member in describe group results, state: $state, assignments: $assignments",
@@ -211,63 +382,303 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   }
 
   @Test
-  def testDescribeConsumersWithNoAssignedPartitionsWithNewConsumer() {
+  def testDescribeMembersOfExistingGroupWithNoMembers() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    // run one consumer in the group consuming from a single-partition topic
+    val executor = addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, topic))
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      val (state, assignments) = service.collectGroupMembers(false)
+      state == Some("Stable") && assignments.exists(_.exists(_.group == group))
+    }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.")
+
+    // stop the consumer so the group has no active member anymore
+    executor.shutdown()
+
+    TestUtils.waitUntilTrue(() => {
+      val (state, assignments) = service.collectGroupMembers(false)
+      state == Some("Empty") && assignments.isDefined && assignments.get.isEmpty
+    }, s"Expected no member in describe group members results for group '$group'")
+  }
+
+  @Test
+  def testDescribeStateOfExistingGroupWithNoMembers() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    // run one consumer in the group consuming from a single-partition topic
+    val executor = addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, topic))
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      val state = service.collectGroupState()
+      state.state == "Stable" &&
+        state.numMembers == 1 &&
+        state.coordinator != null &&
+        servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
+    }, s"Expected the group '$group' to initially become stable, and have a single member.")
+
+    // stop the consumer so the group has no active member anymore
+    executor.shutdown()
+
+    TestUtils.waitUntilTrue(() => {
+      val state = service.collectGroupState()
+      state.state == "Empty" && state.numMembers == 0 && state.assignmentStrategy == ""
+    }, s"Expected the group '$group' to become empty after the only member leaving.")
+  }
+
+  @Test
+  def testDescribeWithConsumersWithoutAssignedPartitions() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    for (describeType <- describeTypes) {
+      val group = this.group + describeType.mkString("")
+      // run one consumer in the group consuming from a single-partition topic
+      val executor = addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, topic))
+      val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) ++ describeType
+      val service = getConsumerGroupService(cgcArgs)
+
+      TestUtils.waitUntilTrue(() => {
+        val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroup())
+        val expectedNumRows = if (describeTypeMembers.contains(describeType)) 3 else 2
+        error.isEmpty && output.trim.split("\n").size == expectedNumRows
+      }, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'")
+    }
+  }
+
+  @Test
+  def testDescribeOffsetsWithConsumersWithoutAssignedPartitions() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
     // run two consumers in the group consuming from a single-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, topic)
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, topic))
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, assignments) = consumerGroupService.describeGroup()
+      val (state, assignments) = service.collectGroupOffsets()
+      state == Some("Stable") &&
+        assignments.isDefined &&
+        assignments.get.count(_.group == group) == 1 &&
+        assignments.get.count { x => x.group == group && x.partition.isDefined } == 1
+    }, "Expected rows for consumers with no assigned partitions in describe group results")
+  }
+
+  @Test
+  def testDescribeMembersWithConsumersWithoutAssignedPartitions() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    // run two consumers in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, topic))
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      val (state, assignments) = service.collectGroupMembers(false)
       state == Some("Stable") &&
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 2 &&
-        assignments.get.count { x => x.group == group && x.partition.isDefined } == 1 &&
-        assignments.get.count { x => x.group == group && x.partition.isEmpty } == 1
+        assignments.get.count { x => x.group == group && x.numPartitions == 1 } == 1 &&
+        assignments.get.count { x => x.group == group && x.numPartitions == 0 } == 1 &&
+        assignments.get.count(_.assignment.size > 0) == 0
     }, "Expected rows for consumers with no assigned partitions in describe group results")
+
+    val (state, assignments) = service.collectGroupMembers(true)
+    assertTrue("Expected additional columns in verbose vesion of describe members",
+        state == Some("Stable") && assignments.get.count(_.assignment.nonEmpty) > 0)
+  }
+
+  @Test
+  def testDescribeStateWithConsumersWithoutAssignedPartitions() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    // run two consumers in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, topic))
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      val state = service.collectGroupState()
+      state.state == "Stable" && state.numMembers == 2
+    }, "Expected two consumers in describe group results")
   }
 
   @Test
-  def testDescribeWithMultiPartitionTopicAndMultipleConsumersWithNewConsumer() {
+  def testDescribeWithMultiPartitionTopicAndMultipleConsumers() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     val topic2 = "foo2"
     adminZkClient.createTopic(topic2, 2, 1)
 
+    for (describeType <- describeTypes) {
+      val group = this.group + describeType.mkString("")
+      // run two consumers in the group consuming from a two-partition topic
+      addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, topic2))
+      val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) ++ describeType
+      val service = getConsumerGroupService(cgcArgs)
+
+      TestUtils.waitUntilTrue(() => {
+        val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroup())
+        val expectedNumRows = if (describeTypeState.contains(describeType)) 2 else 3
+        error.isEmpty && output.trim.split("\n").size == expectedNumRows
+      }, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'")
+    }
+  }
+
+  @Test
+  def testDescribeOffsetsWithMultiPartitionTopicAndMultipleConsumers() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+    val topic2 = "foo2"
+    AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+
     // run two consumers in the group consuming from a two-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, topic2)
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, topic2))
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, assignments) = consumerGroupService.describeGroup()
+      val (state, assignments) = service.collectGroupOffsets()
       state == Some("Stable") &&
-      assignments.isDefined &&
-      assignments.get.count(_.group == group) == 2 &&
-      assignments.get.count{ x => x.group == group && x.partition.isDefined} == 2 &&
-      assignments.get.count{ x => x.group == group && x.partition.isEmpty} == 0
+        assignments.isDefined &&
+        assignments.get.count(_.group == group) == 2 &&
+        assignments.get.count{ x => x.group == group && x.partition.isDefined} == 2 &&
+        assignments.get.count{ x => x.group == group && x.partition.isEmpty} == 0
     }, "Expected two rows (one row per consumer) in describe group results.")
   }
 
   @Test
-  def testDescribeGroupWithNewConsumerWithShortInitializationTimeout() {
-    // Let creation of the offsets topic happen during group initialisation to ensure that initialization doesn't
+  def testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+    val topic2 = "foo2"
+    AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+
+    // run two consumers in the group consuming from a two-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, topic2))
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      val (state, assignments) = service.collectGroupMembers(false)
+      state == Some("Stable") &&
+        assignments.isDefined &&
+        assignments.get.count(_.group == group) == 2 &&
+        assignments.get.count{ x => x.group == group && x.numPartitions == 1 } == 2 &&
+        assignments.get.count{ x => x.group == group && x.numPartitions == 0 } == 0
+    }, "Expected two rows (one row per consumer) in describe group members results.")
+
+    val (state, assignments) = service.collectGroupMembers(true)
+    assertTrue("Expected additional columns in verbose vesion of describe members",
+        state == Some("Stable") && assignments.get.count(_.assignment.isEmpty) == 0)
+  }
+
+  @Test
+  def testDescribeStateWithMultiPartitionTopicAndMultipleConsumers() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+    val topic2 = "foo2"
+    AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+
+    // run two consumers in the group consuming from a two-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, topic2))
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      val state = service.collectGroupState()
+      state.state == "Stable" && state.group == group && state.numMembers == 2
+    }, "Expected a stable group with two members in describe group state result.")
+  }
+
+  @Test
+  def testDescribeGroupWithShortInitializationTimeout() {
+    // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
+    // complete before the timeout expires
+
+    val describeType = describeTypes(Random.nextInt(describeTypes.length))
+    val group = this.group + describeType.mkString("")
+    // run one consumer in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, topic))
+    // set the group initialization timeout too low for the group to stabilize
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--timeout", "1", "--group", group) ++ describeType
+    val service = getConsumerGroupService(cgcArgs)
+
+    try {
+      TestUtils.grabConsoleOutputAndError(service.describeGroup())
+      fail(s"The consumer group command should have failed due to low initialization timeout (describe type: ${describeType.mkString(" ")})")
+    } catch {
+      case _: TimeoutException => // OK
+    }
+  }
+
+  @Test
+  def testDescribeGroupOffsetsWithShortInitializationTimeout() {
+    // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
+    // complete before the timeout expires
+
+    // run one consumer in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, topic))
+
+    // set the group initialization timeout too low for the group to stabilize
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--timeout", "1")
+    val service = getConsumerGroupService(cgcArgs)
+
+    try {
+      service.collectGroupOffsets()
+      fail("The consumer group command should fail due to low initialization timeout")
+    } catch {
+      case _: TimeoutException => // OK
+    }
+  }
+
+  @Test
+  def testDescribeGroupMembersWithShortInitializationTimeout() {
+    // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
     // complete before the timeout expires
 
     // run one consumer in the group consuming from a single-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, topic))
 
     // set the group initialization timeout too low for the group to stabilize
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "group", "--timeout", "1")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--timeout", "1")
+    val service = getConsumerGroupService(cgcArgs)
 
     try {
-      consumerGroupService.describeGroup()
+      service.collectGroupMembers(false)
+      fail("The consumer group command should fail due to low initialization timeout")
+    } catch {
+      case _: TimeoutException => // OK
+        try {
+          service.collectGroupMembers(true)
+          fail("The consumer group command should fail due to low initialization timeout (verbose)")
+        } catch {
+          case _: TimeoutException => // OK
+        }
+    }
+  }
+
+  @Test
+  def testDescribeGroupStateWithShortInitializationTimeout() {
+    // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
+    // complete before the timeout expires
+
+    // run one consumer in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, topic))
+
+    // set the group initialization timeout too low for the group to stabilize
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--timeout", "1")
+    val service = getConsumerGroupService(cgcArgs)
+
+    try {
+      service.collectGroupState()
       fail("The consumer group command should fail due to low initialization timeout")
     } catch {
       case _: TimeoutException => // OK
@@ -281,15 +692,29 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     consumerProps.setProperty("zookeeper.connect", zkConnect)
     oldConsumers += new OldConsumer(Whitelist(topic), consumerProps)
   }
+
+  def getConsumerGroupService(args: Array[String]): ConsumerGroupService = {
+    val opts = new ConsumerGroupCommandOptions(args)
+    val service = if (opts.useOldConsumer) new ZkConsumerGroupService(opts) else new KafkaConsumerGroupService(opts)
+    consumerGroupService = service :: consumerGroupService
+    service
+  }
+
+  def addConsumerGroupExecutor(executor: ConsumerGroupExecutor): ConsumerGroupExecutor = {
+    consumerGroupExecutor = executor :: consumerGroupExecutor
+    executor
+  }
 }
 
 
-class ConsumerThread(broker: String, id: Int, groupId: String, topic: String) extends Runnable {
+class ConsumerThread(broker: String, id: Int, groupId: String, topic: String, strategy: String)
+    extends Runnable {
   val props = new Properties
   props.put("bootstrap.servers", broker)
   props.put("group.id", groupId)
   props.put("key.deserializer", classOf[StringDeserializer].getName)
   props.put("value.deserializer", classOf[StringDeserializer].getName)
+  props.put("partition.assignment.strategy", strategy)
   val consumer = new KafkaConsumer(props)
 
   def run() {
@@ -310,11 +735,11 @@ class ConsumerThread(broker: String, id: Int, groupId: String, topic: String) ex
 }
 
 
-class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String) {
+class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String, strategy: String = "org.apache.kafka.clients.consumer.RangeAssignor") {
   val executor: ExecutorService = Executors.newFixedThreadPool(numConsumers)
   private val consumers = new ArrayBuffer[ConsumerThread]()
   for (i <- 1 to numConsumers) {
-    val consumer = new ConsumerThread(broker, i, groupId, topic)
+    val consumer = new ConsumerThread(broker, i, groupId, topic, strategy)
     consumers += consumer
     executor.submit(consumer)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/52978663/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index f26d3c4..725073e 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -119,7 +119,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val executor = createConsumerGroupExecutor(brokerList, 1, group, topic1)
 
     TestUtils.waitUntilTrue(() => {
-      val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
+      val (_, assignmentsOption) = consumerGroupCommand.collectGroupOffsets()
       assignmentsOption match {
         case Some(assignments) =>
           val sumOffset = assignments.filter(_.topic.exists(_ == topic1))
@@ -164,7 +164,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val executor = createConsumerGroupExecutor(brokerList, 1, group, topic1)
 
     TestUtils.waitUntilTrue(() => {
-      val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
+      val (_, assignmentsOption) = consumerGroupCommand.collectGroupOffsets()
       assignmentsOption match {
         case Some(assignments) =>
           val sumOffset = (assignments.filter(_.topic.exists(_ == topic1))
@@ -330,7 +330,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val executor = createConsumerGroupExecutor(brokerList, numConsumers, group, topic)
 
     TestUtils.waitUntilTrue(() => {
-      val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
+      val (_, assignmentsOption) = consumerGroupCommand.collectGroupOffsets()
       assignmentsOption match {
         case Some(assignments) =>
           val sumOffset = assignments.filter(_.topic.exists(_ == topic))

http://git-wip-us.apache.org/repos/asf/kafka/blob/52978663/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1da2d7b..bb15e7b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1503,10 +1503,34 @@ object TestUtils extends Logging {
   def grabConsoleOutput(f: => Unit) : String = {
     val out = new ByteArrayOutputStream
     try scala.Console.withOut(out)(f)
-    finally scala.Console.out.flush
+    finally scala.Console.out.flush()
     out.toString
   }
 
+  /**
+   * Capture the console error during the execution of the provided function.
+   */
+  def grabConsoleError(f: => Unit) : String = {
+    val err = new ByteArrayOutputStream
+    try scala.Console.withErr(err)(f)
+    finally scala.Console.err.flush()
+    err.toString
+  }
+
+  /**
+   * Capture both the console output and console error during the execution of the provided function.
+   */
+  def grabConsoleOutputAndError(f: => Unit) : (String, String) = {
+    val out = new ByteArrayOutputStream
+    val err = new ByteArrayOutputStream
+    try scala.Console.withOut(out)(scala.Console.withErr(err)(f))
+    finally {
+      scala.Console.out.flush()
+      scala.Console.err.flush()
+    }
+    (out.toString, err.toString)
+  }
+
 }
 
 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {


Mime
View raw message