kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6321) ConsumerGroupCommand should use the new consumer to query the log end offsets.
Date Fri, 26 Jan 2018 04:42:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340567#comment-16340567 ]

ASF GitHub Bot commented on KAFKA-6321:
---------------------------------------

becketqin closed pull request #4344: KAFKA-6321: Consolidate calls to KafkaConsumer's `beginningOffsets()`
and `endOffsets()` in ConsumerGroupCommand
URL: https://github.com/apache/kafka/pull/4344
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not otherwise display it once the pull request is merged:

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index c437a1e736c..420eb56ab22 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -287,7 +287,10 @@ object ConsumerGroupCommand extends Logging {
 
     protected def opts: ConsumerGroupCommandOptions
 
-    protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult
+    protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult =
+      getLogEndOffsets(Seq(topicPartition)).get(topicPartition).getOrElse(LogOffsetResult.Ignore)
+
+    protected def getLogEndOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition,
LogOffsetResult]
 
     def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]])
 
@@ -302,43 +305,40 @@ object ConsumerGroupCommand extends Logging {
                                             consumerIdOpt: Option[String],
                                             hostOpt: Option[String],
                                             clientIdOpt: Option[String]): Array[PartitionAssignmentState]
= {
-      if (topicPartitions.isEmpty)
+      if (topicPartitions.isEmpty) {
         Array[PartitionAssignmentState](
           PartitionAssignmentState(group, coordinator, None, None, None, getLag(None, None),
consumerIdOpt, hostOpt, clientIdOpt, None)
         )
-      else {
-        var assignmentRows: Array[PartitionAssignmentState] = Array()
-        topicPartitions
-          .sortBy(_.partition)
-          .foreach { topicPartition =>
-            assignmentRows = assignmentRows :+ describePartition(group, coordinator, topicPartition.topic,
topicPartition.partition, getPartitionOffset(topicPartition),
-              consumerIdOpt, hostOpt, clientIdOpt)
-          }
-        assignmentRows
       }
+      else
+        describePartitions(group, coordinator, topicPartitions.sortBy(_.partition), getPartitionOffset,
consumerIdOpt, hostOpt, clientIdOpt)
     }
 
     private def getLag(offset: Option[Long], logEndOffset: Option[Long]): Option[Long] =
       offset.filter(_ != -1).flatMap(offset => logEndOffset.map(_ - offset))
 
-    private def describePartition(group: String,
-                                  coordinator: Option[Node],
-                                  topic: String,
-                                  partition: Int,
-                                  offsetOpt: Option[Long],
-                                  consumerIdOpt: Option[String],
-                                  hostOpt: Option[String],
-                                  clientIdOpt: Option[String]): PartitionAssignmentState
= {
-      def getDescribePartitionResult(logEndOffsetOpt: Option[Long]): PartitionAssignmentState
=
-        PartitionAssignmentState(group, coordinator, Option(topic), Option(partition), offsetOpt,
-                                 getLag(offsetOpt, logEndOffsetOpt), consumerIdOpt, hostOpt,
-                                 clientIdOpt, logEndOffsetOpt)
-
-      getLogEndOffset(new TopicPartition(topic, partition)) match {
-        case LogOffsetResult.LogOffset(logEndOffset) => getDescribePartitionResult(Some(logEndOffset))
-        case LogOffsetResult.Unknown => getDescribePartitionResult(None)
-        case LogOffsetResult.Ignore => null
+    private def describePartitions(group: String,
+                                   coordinator: Option[Node],
+                                   topicPartitions: Seq[TopicPartition],
+                                   getPartitionOffset: TopicPartition => Option[Long],
+                                   consumerIdOpt: Option[String],
+                                   hostOpt: Option[String],
+                                   clientIdOpt: Option[String]): Array[PartitionAssignmentState]
= {
+
+      def getDescribePartitionResult(topicPartition: TopicPartition, logEndOffsetOpt: Option[Long]):
PartitionAssignmentState = {
+        val offset = getPartitionOffset(topicPartition)
+        PartitionAssignmentState(group, coordinator, Option(topicPartition.topic), Option(topicPartition.partition),
offset,
+          getLag(offset, logEndOffsetOpt), consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt)
       }
+
+      getLogEndOffsets(topicPartitions).map {
+        logEndOffsetResult =>
+          logEndOffsetResult._2 match {
+            case LogOffsetResult.LogOffset(logEndOffset) => getDescribePartitionResult(logEndOffsetResult._1,
Some(logEndOffset))
+            case LogOffsetResult.Unknown => getDescribePartitionResult(logEndOffsetResult._1,
None)
+            case LogOffsetResult.Ignore => null
+          }
+      }.toArray
     }
 
     def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = throw new UnsupportedOperationException
@@ -423,21 +423,23 @@ object ConsumerGroupCommand extends Logging {
       }
     }
 
-    protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult = {
-      zkUtils.getLeaderForPartition(topicPartition.topic, topicPartition.partition) match
{
-        case Some(-1) => LogOffsetResult.Unknown
-        case Some(brokerId) =>
-          getZkConsumer(brokerId).map { consumer =>
-            val topicAndPartition = new TopicAndPartition(topicPartition)
-            val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime,
1)))
-            val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-            consumer.close()
-            LogOffsetResult.LogOffset(logEndOffset)
-          }.getOrElse(LogOffsetResult.Ignore)
-        case None =>
-          printError(s"No broker for partition '$topicPartition'")
-          LogOffsetResult.Ignore
-      }
+    protected def getLogEndOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition,
LogOffsetResult] = {
+      topicPartitions.map { topicPartition => (topicPartition,
+        zkUtils.getLeaderForPartition(topicPartition.topic, topicPartition.partition) match
{
+          case Some(-1) => LogOffsetResult.Unknown
+          case Some(brokerId) =>
+            getZkConsumer(brokerId).map { consumer =>
+              val topicAndPartition = new TopicAndPartition(topicPartition)
+              val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime,
1)))
+              val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+              consumer.close()
+              LogOffsetResult.LogOffset(logEndOffset)
+            }.getOrElse(LogOffsetResult.Ignore)
+          case None =>
+            printError(s"No broker for partition '$topicPartition'")
+            LogOffsetResult.Ignore
+        }
+      )}.toMap
     }
 
     private def getPartitionOffsets(group: String,
@@ -596,27 +598,34 @@ object ConsumerGroupCommand extends Logging {
         consumerGroupSummary.state, consumerGroupSummary.consumers.get.size)
     }
 
-    protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult = {
-      val offsets = getConsumer.endOffsets(List(topicPartition).asJava)
-      val logStartOffset = offsets.get(topicPartition)
-      LogOffsetResult.LogOffset(logStartOffset)
+    protected def getLogEndOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition,
LogOffsetResult] = {
+      val offsets = getConsumer.endOffsets(topicPartitions.asJava)
+      topicPartitions.map { topicPartition =>
+        val logEndOffset = offsets.get(topicPartition)
+        topicPartition -> LogOffsetResult.LogOffset(logEndOffset)
+      }.toMap
     }
 
-    protected def getLogStartOffset(topicPartition: TopicPartition): LogOffsetResult = {
-      val offsets = getConsumer.beginningOffsets(List(topicPartition).asJava)
-      val logStartOffset = offsets.get(topicPartition)
-      LogOffsetResult.LogOffset(logStartOffset)
+    protected def getLogStartOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition,
LogOffsetResult] = {
+      val offsets = getConsumer.beginningOffsets(topicPartitions.asJava)
+      topicPartitions.map { topicPartition =>
+        val logStartOffset = offsets.get(topicPartition)
+        topicPartition -> LogOffsetResult.LogOffset(logStartOffset)
+      }.toMap
     }
 
-    protected def getLogTimestampOffset(topicPartition: TopicPartition, timestamp: java.lang.Long):
LogOffsetResult = {
+    protected def getLogTimestampOffsets(topicPartitions: Seq[TopicPartition], timestamp:
java.lang.Long): Map[TopicPartition, LogOffsetResult] = {
       val consumer = getConsumer
-      consumer.assign(List(topicPartition).asJava)
-      val offsetsForTimes = consumer.offsetsForTimes(Map(topicPartition -> timestamp).asJava)
-      if (offsetsForTimes != null && !offsetsForTimes.isEmpty && offsetsForTimes.get(topicPartition)
!= null)
-        LogOffsetResult.LogOffset(offsetsForTimes.get(topicPartition).offset)
-      else {
-        getLogEndOffset(topicPartition)
-      }
+      consumer.assign(topicPartitions.asJava)
+
+      val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) =
+        consumer.offsetsForTimes(topicPartitions.map(_ -> timestamp).toMap.asJava).asScala.partition(_._2
!= null)
+
+      val successfulLogTimestampOffsets = successfulOffsetsForTimes.map {
+        case (topicPartition, offsetAndTimestamp) => topicPartition -> LogOffsetResult.LogOffset(offsetAndTimestamp.offset)
+      }.toMap
+
+      successfulLogTimestampOffsets ++ getLogEndOffsets(unsuccessfulOffsetsForTimes.keySet.toSeq)
     }
 
     def close() {
@@ -703,57 +712,60 @@ object ConsumerGroupCommand extends Logging {
         }.toMap
     }
 
-    private def prepareOffsetsToReset(groupId: String, partitionsToReset: Iterable[TopicPartition]):
Map[TopicPartition, OffsetAndMetadata] = {
+    private def prepareOffsetsToReset(groupId: String, partitionsToReset: Seq[TopicPartition]):
Map[TopicPartition, OffsetAndMetadata] = {
       if (opts.options.has(opts.resetToOffsetOpt)) {
         val offset = opts.options.valueOf(opts.resetToOffsetOpt)
-        partitionsToReset.map {
-          topicPartition =>
-            val newOffset: Long = checkOffsetRange(topicPartition, offset)
-            (topicPartition, new OffsetAndMetadata(newOffset))
-        }.toMap
+        checkOffsetsRange(partitionsToReset.map((_, offset)).toMap).map {
+          case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
+        }
       } else if (opts.options.has(opts.resetToEarliestOpt)) {
+        val logStartOffsets = getLogStartOffsets(partitionsToReset)
         partitionsToReset.map { topicPartition =>
-          getLogStartOffset(topicPartition) match {
-            case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
+          logStartOffsets.get(topicPartition) match {
+            case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
             case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting starting
offset of topic partition: $topicPartition")
           }
         }.toMap
       } else if (opts.options.has(opts.resetToLatestOpt)) {
+        val logEndOffsets = getLogEndOffsets(partitionsToReset)
         partitionsToReset.map { topicPartition =>
-          getLogEndOffset(topicPartition) match {
-            case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
+          logEndOffsets.get(topicPartition) match {
+            case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
             case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending
offset of topic partition: $topicPartition")
           }
         }.toMap
       } else if (opts.options.has(opts.resetShiftByOpt)) {
         val currentCommittedOffsets = adminClient.listGroupOffsets(groupId)
-        partitionsToReset.map { topicPartition =>
+        val requestedOffsets = partitionsToReset.map { topicPartition =>
           val shiftBy = opts.options.valueOf(opts.resetShiftByOpt)
           val currentOffset = currentCommittedOffsets.getOrElse(topicPartition,
             throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition
since there is no current committed offset"))
-          val shiftedOffset = currentOffset + shiftBy
-          val newOffset: Long = checkOffsetRange(topicPartition, shiftedOffset)
-          (topicPartition, new OffsetAndMetadata(newOffset))
+          (topicPartition, currentOffset + shiftBy)
         }.toMap
+        checkOffsetsRange(requestedOffsets).map {
+          case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
+        }
       } else if (opts.options.has(opts.resetToDatetimeOpt)) {
         val timestamp = convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
+        val logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp)
         partitionsToReset.map { topicPartition =>
-          val logTimestampOffset = getLogTimestampOffset(topicPartition, timestamp)
+          val logTimestampOffset = logTimestampOffsets.get(topicPartition)
           logTimestampOffset match {
-            case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
+            case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
             case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset
by timestamp of topic partition: $topicPartition")
           }
         }.toMap
       } else if (opts.options.has(opts.resetByDurationOpt)) {
+        val duration = opts.options.valueOf(opts.resetByDurationOpt)
+        val durationParsed = DatatypeFactory.newInstance().newDuration(duration)
+        val now = new Date()
+        durationParsed.negate().addTo(now)
+        val timestamp = now.getTime
+        val logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp)
         partitionsToReset.map { topicPartition =>
-          val duration = opts.options.valueOf(opts.resetByDurationOpt)
-          val now = new Date()
-          val durationParsed = DatatypeFactory.newInstance().newDuration(duration)
-          durationParsed.negate().addTo(now)
-          val timestamp = now.getTime
-          val logTimestampOffset = getLogTimestampOffset(topicPartition, timestamp)
+          val logTimestampOffset = logTimestampOffsets.get(topicPartition)
           logTimestampOffset match {
-            case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
+            case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
             case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset
by timestamp of topic partition: $topicPartition")
           }
         }.toMap
@@ -761,40 +773,55 @@ object ConsumerGroupCommand extends Logging {
         val resetPlanPath = opts.options.valueOf(opts.resetFromFileOpt)
         val resetPlanCsv = Utils.readFileAsString(resetPlanPath)
         val resetPlan = parseResetPlan(resetPlanCsv)
-        resetPlan.keySet.map { topicPartition =>
-          val newOffset: Long = checkOffsetRange(topicPartition, resetPlan(topicPartition).offset())
-          (topicPartition, new OffsetAndMetadata(newOffset))
+        val requestedOffsets = resetPlan.keySet.map { topicPartition =>
+          (topicPartition, resetPlan(topicPartition).offset())
         }.toMap
+        checkOffsetsRange(requestedOffsets).map {
+          case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
+        }
       } else if (opts.options.has(opts.resetToCurrentOpt)) {
         val currentCommittedOffsets = adminClient.listGroupOffsets(groupId)
-        partitionsToReset.map { topicPartition =>
-          currentCommittedOffsets.get(topicPartition).map { offset =>
-            (topicPartition, new OffsetAndMetadata(offset))
-          }.getOrElse(
-            getLogEndOffset(topicPartition) match {
-              case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
-              case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting
ending offset of topic partition: $topicPartition")
-            }
-          )
+        val (partitionsToResetWithCommittedOffset, partitionsToResetWithoutCommittedOffset)
=
+          partitionsToReset.partition(currentCommittedOffsets.keySet.contains(_))
+
+        val preparedOffsetsForParititionsWithCommittedOffset = partitionsToResetWithCommittedOffset.map
{ topicPartition =>
+          (topicPartition, new OffsetAndMetadata(currentCommittedOffsets.get(topicPartition)
match {
+            case Some(offset) => offset
+            case _ => throw new IllegalStateException(s"Expected a valid current offset
for topic partition: $topicPartition")
+          }))
         }.toMap
+
+        val preparedOffsetsForPartitionsWithoutCommittedOffset = getLogEndOffsets(partitionsToResetWithoutCommittedOffset).map
{
+          case (topicPartition, LogOffsetResult.LogOffset(offset)) => (topicPartition,
new OffsetAndMetadata(offset))
+          case (topicPartition, _) => CommandLineUtils.printUsageAndDie(opts.parser, s"Error
getting ending offset of topic partition: $topicPartition")
+        }
+
+        preparedOffsetsForParititionsWithCommittedOffset ++ preparedOffsetsForPartitionsWithoutCommittedOffset
       } else {
         CommandLineUtils.printUsageAndDie(opts.parser, "Option '%s' requires one of the following
scenarios: %s".format(opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts) )
       }
     }
 
-    private def checkOffsetRange(topicPartition: TopicPartition, offset: Long) = {
-      getLogEndOffset(topicPartition) match {
-        case LogOffsetResult.LogOffset(endOffset) if offset > endOffset =>
-          warn(s"New offset ($offset) is higher than latest offset. Value will be set to
$endOffset")
-          endOffset
+    private def checkOffsetsRange(requestedOffsets: Map[TopicPartition, Long]) = {
+      val logStartOffsets = getLogStartOffsets(requestedOffsets.keySet.toSeq)
+      val logEndOffsets = getLogEndOffsets(requestedOffsets.keySet.toSeq)
+      requestedOffsets.map { case (topicPartition, offset) => (topicPartition,
+        logEndOffsets.get(topicPartition) match {
+          case Some(LogOffsetResult.LogOffset(endOffset)) if offset > endOffset =>
+            warn(s"New offset ($offset) is higher than latest offset for topic partition
$topicPartition. Value will be set to $endOffset")
+            endOffset
 
-        case _ => getLogStartOffset(topicPartition) match {
-          case LogOffsetResult.LogOffset(startOffset) if offset < startOffset =>
-            warn(s"New offset ($offset) is lower than earliest offset. Value will be set
to $startOffset")
-            startOffset
+          case Some(_) => logStartOffsets.get(topicPartition) match {
+            case Some(LogOffsetResult.LogOffset(startOffset)) if offset < startOffset
=>
+              warn(s"New offset ($offset) is lower than earliest offset for topic partition
$topicPartition. Value will be set to $startOffset")
+              startOffset
 
-          case _ => offset
-        }
+            case _ => offset
+          }
+
+          case None => // the control should not reach here
+            throw new IllegalStateException(s"Unexpected non-existing offset value for topic
partition $topicPartition")
+        })
       }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> ConsumerGroupCommand should use the new consumer to query the log end offsets.
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-6321
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6321
>             Project: Kafka
>          Issue Type: Improvement
>          Components: tools
>    Affects Versions: 1.0.0
>            Reporter: Jiangjie Qin
>            Assignee: Vahid Hashemian
>            Priority: Major
>             Fix For: 1.1.0
>
>
> Currently the ConsumerGroupCommand queries the log end offsets one partition at a time.
> It should switch to using Consumer.endOffsets().



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message