kafka-jira mailing list archives

From "Randall Hauch (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.
Date Wed, 03 Jan 2018 15:45:01 GMT

https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309815#comment-16309815

Randall Hauch commented on KAFKA-6252:

As mentioned above, connectors run into this because the Task doesn't properly implement
{{stop()}}. For example, a source task has a {{poll()}} method that is expected to block while
there are no new records to return. However, that blocking must be interrupted when {{stop()}}
is called. If the source connector uses a thread, then {{stop()}} should interrupt that thread
and set the state such that {{poll()}} returns an empty list.
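A minimal sketch of that pattern, using plain {{java.util.concurrent}} rather than the Connect API so it runs standalone (the class and method names here are hypothetical, not from Kafka): a volatile flag plus a latch let {{stop()}} unblock a waiting {{poll()}}, which then returns an empty list.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical task mirroring the SourceTask contract: poll() may block,
// but stop() must unblock it so the worker can shut the task down.
public class StoppableTask {
    private volatile boolean running = true;
    private final CountDownLatch stopLatch = new CountDownLatch(1);

    // Blocks briefly waiting for data (simulated here by the latch);
    // once stop() has been called, returns an empty list immediately.
    public List<String> poll() throws InterruptedException {
        stopLatch.await(1, TimeUnit.SECONDS);
        if (!running) {
            return Collections.emptyList();
        }
        return Collections.singletonList("record");
    }

    public void stop() {
        running = false;
        stopLatch.countDown(); // unblock any thread waiting in poll()
    }

    public static void main(String[] args) throws Exception {
        StoppableTask task = new StoppableTask();
        task.stop();
        System.out.println(task.poll().isEmpty()); // prints "true"
    }
}
```

A real source task would block on its data source instead of a latch, but the shape is the same: {{stop()}} flips the state and interrupts or signals whatever {{poll()}} is blocked on.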

Other source connector implementations may use a {{BlockingQueue}} and call the queue's {{take()}}
method to grab items out of the queue. In this case, {{take()}} blocks indefinitely until there
is an item in the queue, but if the task's {{stop()}} method is called while {{poll()}} is
blocked on the queue's {{take()}} method, the task will likely never add another item to the
queue, and {{take()}} -- and thus {{poll()}} -- will never return.

The proper way to use a blocking queue is to use the {{BlockingQueue.poll(timeout, unit)}} method
to block for a maximum amount of time, and then handle the case where no item is retrieved from
the queue. The task's {{poll()}} can simply return an empty list.
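The bounded-wait pattern can be sketched like this (a standalone example, not Kafka's own code; the {{poll}} helper name is an assumption):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedPollExample {
    // Blocks at most one second; returns an empty list when nothing arrived,
    // giving the framework a chance to call stop() between polls.
    static List<String> poll(BlockingQueue<String> queue) throws InterruptedException {
        List<String> records = new ArrayList<>();
        String first = queue.poll(1, TimeUnit.SECONDS); // bounded wait, not take()
        if (first != null) {
            records.add(first);
        }
        return records; // may be empty; the caller must handle that case
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println(poll(queue).isEmpty()); // empty queue: prints "true"
        queue.put("r1");
        System.out.println(poll(queue));           // prints "[r1]"
    }
}
```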

An even better approach is to use {{BlockingQueue.drainTo(...)}} to drain any existing items
in the queue. This works really well when using {{BlockingQueue<SourceRecord>}}, since
the queue can be drained directly into the {{List<SourceRecord>}} instance that you
then return. If the queue contains something other than {{SourceRecord}}, then simply
drain to a new list, process those items to create the {{SourceRecord}} objects, and
add them to the list that you'll return.
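A sketch of the drain approach, again standalone with {{String}} standing in for {{SourceRecord}} (names are illustrative). Note that {{drainTo}} itself never blocks, so a real task would typically pair it with a bounded wait as above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DrainExample {
    // Drains everything currently queued in one call; an empty result is fine.
    static List<String> poll(BlockingQueue<String> queue) {
        List<String> records = new ArrayList<>();
        queue.drainTo(records); // moves all currently-queued items, no blocking
        return records;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("r1");
        queue.put("r2");
        System.out.println(poll(queue)); // prints "[r1, r2]"
        System.out.println(poll(queue)); // queue now empty: prints "[]"
    }
}
```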

> A metric named 'XX' already exists, can't register another one.
> ---------------------------------------------------------------
>                 Key: KAFKA-6252
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6252
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>         Environment: Linux
>            Reporter: Alexis Sellier
>            Priority: Critical
> When a connector crashes (or is not implemented correctly by not stopping/interrupting
> {{poll()}}), it cannot be restarted, and an exception like this is thrown
> {code:java}
> java.lang.IllegalArgumentException: A metric named 'MetricName [name=offset-commit-max-time-ms,
group=connector-task-metrics, description=The maximum time in milliseconds taken by this task
to commit offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already exists,
can't register another one.
> 	at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532)
> 	at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256)
> 	at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241)
> 	at org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.<init>(WorkerTask.java:328)
> 	at org.apache.kafka.connect.runtime.WorkerTask.<init>(WorkerTask.java:69)
> 	at org.apache.kafka.connect.runtime.WorkerSinkTask.<init>(WorkerSinkTask.java:98)
> 	at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449)
> 	at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess it's because the function {{taskMetricsGroup.close}} is not called in all cases

This message was sent by Atlassian JIRA
