kafka-dev mailing list archives

From "Ismael Juma (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4596) KIP-73 rebalance throttling breaks on plans for specific partitions
Date Mon, 16 Jan 2017 11:16:26 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15823797#comment-15823797 ]

Ismael Juma commented on KAFKA-4596:
------------------------------------

[~ewencp], I'm tentatively assigning this to Ben as he knows this code well and the fix is
trivial but important. Ben is back on Wednesday; if he can't do it, we'll find an alternative.

> KIP-73 rebalance throttling breaks on plans for specific partitions
> -------------------------------------------------------------------
>
>                 Key: KAFKA-4596
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4596
>             Project: Kafka
>          Issue Type: Bug
>         Environment: Kafka 0.10.1.1
>            Reporter: Tom Crayford
>            Assignee: Ben Stopford
>             Fix For: 0.10.2.0
>
>
> The reassign-partitions.sh command fails if you pass --throttle together with a reassignment plan that covers only specific partitions. For example, upon reassigning {code}__consumer_offsets{code} partition 19, you get the following error:
> {code}
> Save this to use as the --reassignment-json-file option during rollback
> Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value.
> The throttle limit was set to 1048576 B/s
> Partitions reassignment failed due to key not found: [__consumer_offsets,30]
> java.util.NoSuchElementException: key not found: [__consumer_offsets,30]
>         at scala.collection.MapLike$class.default(MapLike.scala:228)
>         at scala.collection.AbstractMap.default(Map.scala:59)
>         at scala.collection.MapLike$class.apply(MapLike.scala:141)
>         at scala.collection.AbstractMap.apply(Map.scala:59)
>         at kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:372)
>         at kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:371)
>         at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>         at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
>         at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
>         at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
>         at kafka.admin.ReassignPartitionsCommand.kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions(ReassignPartitionsCommand.scala:371)
>         at kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:347)
>         at kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:343)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at kafka.admin.ReassignPartitionsCommand.assignThrottledReplicas(ReassignPartitionsCommand.scala:343)
>         at kafka.admin.ReassignPartitionsCommand.maybeThrottle(ReassignPartitionsCommand.scala:317)
>         at kafka.admin.ReassignPartitionsCommand.reassignPartitions(ReassignPartitionsCommand.scala:387)
>         at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:165)
>         at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:149)
>         at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:46)
>         at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
> {code}
> This effectively breaks the throttling feature unless the plan rebalances many partitions at once.
> For reference the command that was run is:
> {code}
> kafka-reassign-partitions.sh --reassignment-json-file 9b137f13_7e91_4757_a7b9_554ff458e7df.3d4269d5-e829-4e86-9d05-91e9e2fcb7e7.reassignment_plan.json --throttle 1048576
> {code}
> and the contents of the plan file are:
> {code}
> {"version":1,"partitions":[{"topic":"__consumer_offsets","partition":19,"replicas":[2,1,0]}]}
> {code}
> This seems like a simple logic error to me: the code looks up a partition that was never proposed for reassignment, when it should not. It looks like the logic assumes that {code}Map.apply{code} doesn't throw if the key isn't found, when in fact it does.
> I checked that this cluster does indeed have the __consumer_offsets topic populated.
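
The failure mode described above can be reproduced in isolation. The following is a minimal sketch, not the actual Kafka code: the object name, the local TopicAndPartition case class, the helper names and the replica lists are all hypothetical, and the "is this partition moving" check is simplified to a plain inequality. It only assumes what the stack trace suggests, namely that the existing assignment is filtered with a closure that calls Map.apply on the proposed plan.

```scala
import scala.util.Try

// Hypothetical stand-in for the topic/partition key used by the real tool.
final case class TopicAndPartition(topic: String, partition: Int)

object MapLookupSketch {
  // Proposed plan: only partition 19 is listed, as in the plan file above.
  val proposed: Map[TopicAndPartition, Seq[Int]] =
    Map(TopicAndPartition("__consumer_offsets", 19) -> Seq(2, 1, 0))

  // Existing assignment (made-up replica lists): covers partitions that are
  // not part of the plan, such as partition 30.
  val existing: Map[TopicAndPartition, Seq[Int]] = Map(
    TopicAndPartition("__consumer_offsets", 19) -> Seq(0, 1, 2),
    TopicAndPartition("__consumer_offsets", 30) -> Seq(1, 2, 0)
  )

  // Unsafe: proposed(tp) is Map.apply, which throws NoSuchElementException
  // ("key not found") as soon as the filter reaches partition 30.
  def movingUnsafe: Map[TopicAndPartition, Seq[Int]] =
    existing.filter { case (tp, replicas) => proposed(tp) != replicas }

  // Safer: Map.get returns an Option, so partitions missing from the plan
  // are simply treated as not moving.
  def movingSafe: Map[TopicAndPartition, Seq[Int]] =
    existing.filter { case (tp, replicas) => proposed.get(tp).exists(_ != replicas) }

  def main(args: Array[String]): Unit = {
    println(Try(movingUnsafe).isFailure)             // prints "true"
    println(movingSafe.keys.map(_.partition).toList) // prints "List(19)"
  }
}
```

Guarding the lookup with Map.contains before calling Map.apply would work equally well; which form the eventual fix uses is not stated in this message.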



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
