kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5624) Unsafe use of expired sensors
Date Tue, 20 Feb 2018 16:58:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16370258#comment-16370258 ]

ASF GitHub Bot commented on KAFKA-5624:
---------------------------------------

hachikuji closed pull request #4404: KAFKA-5624: Add expiry check to sensor.add() methods
URL: https://github.com/apache/kafka/pull/4404
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 837ac2e4b43..06c8c7f362b 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -207,9 +207,11 @@ public void checkQuotas(long timeMs) {
 
     /**
      * Register a compound statistic with this sensor with no config override
+     * @param stat The stat to register
+     * @return true if stat is added to sensor, false if sensor is expired
      */
-    public void add(CompoundStat stat) {
-        add(stat, null);
+    public boolean add(CompoundStat stat) {
+        return add(stat, null);
     }
 
     /**
@@ -217,8 +219,12 @@ public void add(CompoundStat stat) {
      * @param stat The stat to register
      * @param config The configuration for this stat. If null then the stat will use the default configuration for this
      *        sensor.
+     * @return true if stat is added to sensor, false if sensor is expired
      */
-    public synchronized void add(CompoundStat stat, MetricConfig config) {
+    public synchronized boolean add(CompoundStat stat, MetricConfig config) {
+        if (hasExpired())
+            return false;
+
         this.stats.add(Utils.notNull(stat));
         Object lock = new Object();
         for (NamedMeasurable m : stat.stats()) {
@@ -226,15 +232,17 @@ public synchronized void add(CompoundStat stat, MetricConfig config) {
             this.registry.registerMetric(metric);
             this.metrics.add(metric);
         }
+        return true;
     }
 
     /**
      * Register a metric with this sensor
      * @param metricName The name of the metric
      * @param stat The statistic to keep
+     * @return true if metric is added to sensor, false if sensor is expired
      */
-    public void add(MetricName metricName, MeasurableStat stat) {
-        add(metricName, stat, null);
+    public boolean add(MetricName metricName, MeasurableStat stat) {
+        return add(metricName, stat, null);
     }
 
     /**
@@ -242,8 +250,12 @@ public void add(MetricName metricName, MeasurableStat stat) {
      * @param metricName The name of the metric
      * @param stat The statistic to keep
      * @param config A special configuration for this metric. If null use the sensor default configuration.
+     * @return true if metric is added to sensor, false if sensor is expired
      */
-    public synchronized void add(MetricName metricName, MeasurableStat stat, MetricConfig config) {
+    public synchronized boolean add(MetricName metricName, MeasurableStat stat, MetricConfig config) {
+        if (hasExpired())
+            return false;
+
         KafkaMetric metric = new KafkaMetric(new Object(),
                                              Utils.notNull(metricName),
                                              Utils.notNull(stat),
@@ -252,6 +264,7 @@ public synchronized void add(MetricName metricName, MeasurableStat stat, MetricC
         this.registry.registerMetric(metric);
         this.metrics.add(metric);
         this.stats.add(stat);
+        return true;
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 8bcc775df56..23fc5411b3c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -256,7 +256,7 @@ public void testCleanupMemoryAvailabilityOnMetricsException() throws Exception {
         mockedSensor.record(anyDouble(), anyLong());
         expectLastCall().andThrow(new OutOfMemoryError());
         expect(mockedMetrics.metricName(anyString(), eq(metricGroup), anyString())).andReturn(metricName);
-        mockedSensor.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName));
+        expect(mockedSensor.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName))).andReturn(true);
 
         replay(mockedMetrics, mockedSensor, metricName);
 
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index d22111e128e..3f7551ef985 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -20,7 +20,17 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.junit.Test;
 
 public class SensorTest {
@@ -59,4 +69,29 @@ public void testShouldRecord() {
             0, Sensor.RecordingLevel.DEBUG);
         assertFalse(debugSensor.shouldRecord());
     }
+
+    @Test
+    public void testExpiredSensor() {
+        MetricConfig config = new MetricConfig();
+        Time mockTime = new MockTime();
+        Metrics metrics =  new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), mockTime, true);
+
+        long inactiveSensorExpirationTimeSeconds = 60L;
+        Sensor sensor = new Sensor(metrics, "sensor", null, config, mockTime,
+            inactiveSensorExpirationTimeSeconds, Sensor.RecordingLevel.INFO);
+
+        assertTrue(sensor.add(metrics.metricName("test1", "grp1"), new Avg()));
+
+        Map<String, String> emptyTags = Collections.emptyMap();
+        MetricName rateMetricName = new MetricName("rate", "test", "", emptyTags);
+        MetricName totalMetricName = new MetricName("total", "test", "", emptyTags);
+        Meter meter = new Meter(rateMetricName, totalMetricName);
+        assertTrue(sensor.add(meter));
+
+        mockTime.sleep(TimeUnit.SECONDS.toMillis(inactiveSensorExpirationTimeSeconds + 1));
+        assertFalse(sensor.add(metrics.metricName("test3", "grp1"), new Avg()));
+        assertFalse(sensor.add(meter));
+
+        metrics.close();
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Unsafe use of expired sensors
> -----------------------------
>
>                 Key: KAFKA-5624
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5624
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Manikumar
>            Priority: Major
>             Fix For: 2.0.0
>
>
> There seem to be a couple of unhandled cases following sensor expiration:
> 1. Static sensors (such as {{ClientQuotaManager.delayQueueSensor}}) can be expired due
> to inactivity, but the references will remain valid and usable. It is probably a good idea to either
> ensure we use a "get or create" pattern when accessing the sensor or add a new static registration
> option which makes the sensor ineligible for expiration.
> 2. It is possible to register metrics through the sensor even after it has expired. We
> should probably raise an exception instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message