kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.
Date Sat, 06 Jan 2018 05:27:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314383#comment-16314383 ]

ASF GitHub Bot commented on KAFKA-6252:
---------------------------------------

ewencp closed pull request #4397: KAFKA-6252: Close the metric group to clean up any existing
metrics
URL: https://github.com/apache/kafka/pull/4397
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 9e65cd2d80f..9b934f3428a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -247,6 +247,8 @@ public ConnectorMetricsGroup(ConnectMetrics connectMetrics, AbstractStatus.State
             ConnectMetricsRegistry registry = connectMetrics.registry();
             this.metricGroup = connectMetrics.group(registry.connectorGroupName(),
                     registry.connectorTagName(), connName);
+            // prevent collisions by removing any previously created metrics in this group.
+            metricGroup.close();
 
             metricGroup.addImmutableValueMetric(registry.connectorType, connectorType());
             metricGroup.addImmutableValueMetric(registry.connectorClass, connector.getClass().getName());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 234ce8adf14..587e4c68cf5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -652,6 +652,8 @@ public SinkTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics)
{
             metricGroup = connectMetrics
                                   .group(registry.sinkTaskGroupName(), registry.connectorTagName(),
id.connector(), registry.taskTagName(),
                                          Integer.toString(id.task()));
+            // prevent collisions by removing any previously created metrics in this group.
+            metricGroup.close();
 
             sinkRecordRead = metricGroup.metrics().sensor("sink-record-read");
             sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadRate), new Rate());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 9072cd47c81..a172cdb45f0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -494,6 +494,8 @@ public SourceTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics)
             metricGroup = connectMetrics.group(registry.sourceTaskGroupName(),
                     registry.connectorTagName(), id.connector(),
                     registry.taskTagName(), Integer.toString(id.task()));
+            // remove any previously created metrics in this group to prevent collisions.
+            metricGroup.close();
 
             sourceRecordPoll = metricGroup.sensor("source-record-poll");
             sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollRate), new
Rate());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index ec069245b3d..d563f9bdede 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -313,6 +313,8 @@ public TaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics,
TaskS
             metricGroup = connectMetrics.group(registry.taskGroupName(),
                     registry.connectorTagName(), id.connector(),
                     registry.taskTagName(), Integer.toString(id.task()));
+            // prevent collisions by removing any previously created metrics in this group.
+            metricGroup.close();
 
             metricGroup.addValueMetric(registry.taskStatus, new LiteralSupplier<String>()
{
                 @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index 2de7cb6e107..d247df83945 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -16,13 +16,19 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroupId;
 import org.apache.kafka.connect.util.MockTime;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -136,4 +142,40 @@ public void testMetricGroupIdWithoutTags() {
         assertNotNull(id1.tags());
         assertNotNull(id2.tags());
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testRecreateWithClose() {
+        int numMetrics = addToGroup(metrics, false);
+        int numMetricsInRecreatedGroup = addToGroup(metrics, true);
+        Assert.assertEquals(numMetrics, numMetricsInRecreatedGroup);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testRecreateWithoutClose() {
+        int numMetrics = addToGroup(metrics, false);
+        int numMetricsInRecreatedGroup = addToGroup(metrics, false);
+        // we should never get here
+        throw new RuntimeException("Created " + numMetricsInRecreatedGroup
+                + " metrics in recreated group. Original=" + numMetrics);
+    }
+
+    private int addToGroup(ConnectMetrics connectMetrics, boolean shouldClose) {
+        ConnectMetricsRegistry registry = connectMetrics.registry();
+        ConnectMetrics.MetricGroup metricGroup = connectMetrics.group(registry.taskGroupName(),
+                registry.connectorTagName(), "conn_name");
+
+        if (shouldClose) {
+            metricGroup.close();
+        }
+
+        Sensor sensor = metricGroup.sensor("my_sensor");
+        sensor.add(metricName("x1"), new Max());
+        sensor.add(metricName("y2"), new Avg());
+
+        return metricGroup.metrics().metrics().size();
+    }
+
+    static MetricName metricName(String name) {
+        return new MetricName(name, "test_group", "metrics for testing", Collections.<String,
String>emptyMap());
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> A metric named 'XX' already exists, can't register another one.
> ---------------------------------------------------------------
>
>                 Key: KAFKA-6252
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6252
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>         Environment: Linux
>            Reporter: Alexis Sellier
>            Assignee: Arjun Satish
>            Priority: Critical
>             Fix For: 1.1.0, 1.0.1
>
>
> When a connector crashes (or is implemented incorrectly and does not stop/interrupt {{poll()}}), it cannot be restarted, and an exception like this is thrown:
> {code:java}
> java.lang.IllegalArgumentException: A metric named 'MetricName [name=offset-commit-max-time-ms, group=connector-task-metrics, description=The maximum time in milliseconds taken by this task to commit offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already exists, can't register another one.
> 	at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532)
> 	at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256)
> 	at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241)
> 	at org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.<init>(WorkerTask.java:328)
> 	at org.apache.kafka.connect.runtime.WorkerTask.<init>(WorkerTask.java:69)
> 	at org.apache.kafka.connect.runtime.WorkerSinkTask.<init>(WorkerSinkTask.java:98)
> 	at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449)
> 	at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess it's because the function taskMetricsGroup.close is not called in all cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message