kafka-dev mailing list archives

From "Jason Gustafson (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (KAFKA-8066) ReplicaFetcherThread fails to startup because of failing to register the metric.
Date Wed, 01 May 2019 20:01:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-8066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-8066.
------------------------------------
       Resolution: Fixed
    Fix Version/s: 2.2.1
                   2.1.2
                   2.0.2

> ReplicaFetcherThread fails to startup because of failing to register the metric.
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-8066
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8066
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.1.0, 1.1.1, 1.1.2, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2, 2.1.2
>            Reporter: Zhanxiang (Patrick) Huang
>            Assignee: Zhanxiang (Patrick) Huang
>            Priority: Major
>             Fix For: 2.0.2, 2.1.2, 2.2.1
>
>
> After KAFKA-6051, we close leaderEndPoint in the replica fetcher thread's initiateShutdown to try to preempt in-progress fetch requests and accelerate replica fetcher thread shutdown. However, the selector may fail to close the channel and throw an exception while the replica fetcher thread is still actively fetching. In this case, the sensors will not be cleaned up: if `close(id)` throws an exception in `Selector.close()`, then `sensors.close()` will never be called and thus the sensors will not get unregistered (see the code below).
> {code:java}
>     public void close() {
>         List<String> connections = new ArrayList<>(channels.keySet());
>         for (String id : connections)
>             close(id);
>         try {
>             this.nioSelector.close();
>         } catch (IOException | SecurityException e) {
>             log.error("Exception closing nioSelector:", e);
>         }
>         sensors.close();
>         channelBuilder.close();
>     }
> {code}
> If this happens, then when the broker later wants to start up a ReplicaFetcherThread with the same fetcher id to the same destination broker (e.g. due to leadership changes or newly created partitions), the ReplicaFetcherThread will fail to start up because the selector will throw an IllegalArgumentException when a metric with the same name already exists:
> {noformat}
> 2019/02/27 10:24:26.938 ERROR [KafkaApis] [kafka-request-handler-6] [kafka-server] [] [KafkaApi-38031] Error when handling request {}
> java.lang.IllegalArgumentException: A metric named 'MetricName [name=connection-count, group=replica-fetcher-metrics, description=The current number of active connections., tags={broker-id=29712, fetcher-id=3}]' already exists, can't register another one.
>         at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:559) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:502) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:485) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:470) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:963) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.network.Selector.<init>(Selector.java:170) ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.network.Selector.<init>(Selector.java:188) ~[kafka-clients-2.0.0.66.jar:?]
>         at kafka.server.ReplicaFetcherBlockingSend.<init>(ReplicaFetcherBlockingSend.scala:61) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.ReplicaFetcherThread$$anonfun$1.apply(ReplicaFetcherThread.scala:68) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.ReplicaFetcherThread$$anonfun$1.apply(ReplicaFetcherThread.scala:68) ~[kafka_2.11-2.0.0.66.jar:?]
>         at scala.Option.getOrElse(Option.scala:121) ~[scala-library-2.11.12.jar:?]
>         at kafka.server.ReplicaFetcherThread.<init>(ReplicaFetcherThread.scala:67) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.ReplicaFetcherManager.createFetcherThread(ReplicaFetcherManager.scala:32) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.AbstractFetcherManager.kafka$server$AbstractFetcherManager$$addAndStartFetcherThread$1(AbstractFetcherManager.scala:132) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:146) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:137) ~[kafka_2.11-2.0.0.66.jar:?]
>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) ~[scala-library-2.11.12.jar:?]
>         at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:137) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:1333) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1107) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:194) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:110) ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) ~[kafka_2.11-2.0.0.66.jar:?]
>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> {noformat}
> The fix should be to wrap the channel-close loop in `Selector.close()` in a try-finally block, so that `sensors.close()` is called even if an exception is thrown.
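> As a minimal, self-contained sketch of that pattern (not the committed patch; `closeChannel` and `closeSensors` below are hypothetical stand-ins for `Selector.close(id)` and `sensors.close()`):

```java
import java.util.ArrayList;
import java.util.List;

public class CloseSketch {
    static final List<String> events = new ArrayList<>();

    // Stand-in for Selector.close(id): closing an active channel may throw.
    static void closeChannel(String id) {
        throw new RuntimeException("failed to close channel " + id);
    }

    // Stand-in for sensors.close(): unregisters the per-fetcher metrics.
    static void closeSensors() {
        events.add("sensors.close");
    }

    // The finally block guarantees sensor cleanup even when a channel close
    // throws, so a new fetcher thread with the same fetcher id can
    // re-register its metrics later.
    static void close(List<String> connections) {
        try {
            for (String id : connections)
                closeChannel(id);
        } finally {
            closeSensors();
        }
    }

    public static void main(String[] args) {
        try {
            close(List.of("conn-0"));
        } catch (RuntimeException e) {
            System.out.println("channel close failed: " + e.getMessage());
        }
        System.out.println("sensors closed: " + events.contains("sensors.close"));
        // prints "channel close failed: failed to close channel conn-0"
        // then "sensors closed: true"
    }
}
```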



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
