kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6765) Intermittent test failure in CustomQuotaCallbackTest
Date Tue, 17 Apr 2018 13:47:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440871#comment-16440871 ]

ASF GitHub Bot commented on KAFKA-6765:
---------------------------------------

rajinisivaram closed pull request #4869: KAFKA-6765: Handle exception while reading throttle metric value in test
URL: https://github.com/apache/kafka/pull/4869
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index ea18cd3fef8..dee69f5a345 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -487,7 +487,8 @@ public void addMetric(MetricName metricName, MetricConfig config, Measurable
mea
 
     /**
      * Add a metric to monitor an object that implements MetricValueProvider. This metric
won't be associated with any
-     * sensor. This is a way to expose existing values as metrics.
+     * sensor. This is a way to expose existing values as metrics. User is expected to add
any additional
+     * synchronization to update and access metric values, if required.
      *
      * @param metricName The name of the metric
      * @param metricValueProvider The metric value provider associated with this metric
@@ -503,7 +504,8 @@ public void addMetric(MetricName metricName, MetricConfig config, MetricValuePro
 
     /**
      * Add a metric to monitor an object that implements MetricValueProvider. This metric
won't be associated with any
-     * sensor. This is a way to expose existing values as metrics.
+     * sensor. This is a way to expose existing values as metrics. User is expected to add
any additional
+     * synchronization to update and access metric values, if required.
      *
      * @param metricName The name of the metric
      * @param metricValueProvider The metric value provider associated with this metric
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 22f273d9d93..e4bf1aeee69 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -229,7 +229,7 @@ public synchronized boolean add(CompoundStat stat, MetricConfig config)
{
             return false;
 
         this.stats.add(Utils.notNull(stat));
-        Object lock = new Object();
+        Object lock = metricLock(stat);
         for (NamedMeasurable m : stat.stats()) {
             final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), config ==
null ? this.config : config, time);
             if (!metrics.containsKey(metric.metricName())) {
@@ -265,7 +265,7 @@ public synchronized boolean add(final MetricName metricName, final MeasurableSta
             return true;
         } else {
             final KafkaMetric metric = new KafkaMetric(
-                new Object(),
+                metricLock(stat),
                 Utils.notNull(metricName),
                 Utils.notNull(stat),
                 config == null ? this.config : config,
@@ -289,4 +289,12 @@ public boolean hasExpired() {
     synchronized List<KafkaMetric> metrics() {
         return Collections.unmodifiableList(new LinkedList<>(this.metrics.values()));
     }
+
+    /**
+     * KafkaMetrics of sensors which use SampledStat should be synchronized on the Sensor
object
+     * to allow concurrent reads and updates. For simplicity, all sensors are synchronized
on Sensor.
+     */
+    private Object metricLock(Stat stat) {
+        return this;
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 55f8e2349bf..6acc39d35a6 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -24,9 +24,16 @@
 import static org.junit.Assert.fail;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -39,8 +46,10 @@
 import org.apache.kafka.common.metrics.stats.Percentiles;
 import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
 import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.Sum;
+import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.MockTime;
 import org.junit.After;
 import org.junit.Before;
@@ -53,6 +62,7 @@
     private MockTime time = new MockTime();
     private MetricConfig config = new MetricConfig();
     private Metrics metrics;
+    private ExecutorService executorService;
 
     @Before
     public void setup() {
@@ -60,7 +70,11 @@ public void setup() {
     }
 
     @After
-    public void tearDown() {
+    public void tearDown() throws Exception {
+        if (executorService != null) {
+            executorService.shutdownNow();
+            executorService.awaitTermination(5, TimeUnit.SECONDS);
+        }
         this.metrics.close();
     }
 
@@ -588,9 +602,124 @@ public void testMetricInstances() {
                 // this is expected
             }
         }
+    }
+
+    @Test
+    public void testConcurrentAccess() throws Exception {
+        final Random random = new Random();
+        final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>();
+        metrics = new Metrics(new MockTime(10));
+        SensorCreator sensorCreator = new SensorCreator(metrics);
+
+        final AtomicBoolean alive = new AtomicBoolean(true);
+        executorService = Executors.newSingleThreadExecutor();
+        executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                while (alive.get()) {
+                    for (Sensor sensor : sensors) {
+                        sensor.record(random.nextInt(10000));
+                    }
+                }
+            }
+        });
 
+        for (int i = 0; i < 10000; i++) {
+            if (sensors.size() > 5) {
+                Sensor sensor = random.nextBoolean() ? sensors.removeFirst() : sensors.removeLast();
+                metrics.removeSensor(sensor.name());
+            }
+            StatType statType = StatType.forId(random.nextInt(StatType.values().length));
+            sensors.add(sensorCreator.createSensor(statType, i));
+            for (Sensor sensor : sensors) {
+                for (KafkaMetric metric : sensor.metrics()) {
+                    assertNotNull("Invalid metric value", metric.metricValue());
+                }
+            }
+        }
+        alive.set(false);
     }
 
-    
+    enum StatType {
+        AVG(0),
+        TOTAL(1),
+        COUNT(2),
+        MAX(3),
+        MIN(4),
+        RATE(5),
+        SIMPLE_RATE(6),
+        SUM(7),
+        VALUE(8),
+        PERCENTILES(9),
+        METER(10);
+
+        int id;
+        StatType(int id) {
+            this.id = id;
+        }
 
+        static StatType forId(int id) {
+            for (StatType statType : StatType.values()) {
+                if (statType.id == id)
+                    return statType;
+            }
+            return null;
+        }
+    }
+
+    private static class SensorCreator {
+
+        private final Metrics metrics;
+
+        SensorCreator(Metrics metrics) {
+            this.metrics = metrics;
+        }
+
+        private Sensor createSensor(StatType statType, int index) {
+            Sensor sensor = metrics.sensor("kafka.requests");
+            Map<String, String> tags = Collections.singletonMap("tag", "tag" + index);
+            switch (statType) {
+                case AVG:
+                    sensor.add(metrics.metricName("test.metric.avg", "avg", tags), new Avg());
+                    break;
+                case TOTAL:
+                    sensor.add(metrics.metricName("test.metric.total", "total", tags), new
Total());
+                    break;
+                case COUNT:
+                    sensor.add(metrics.metricName("test.metric.count", "count", tags), new
Count());
+                    break;
+                case MAX:
+                    sensor.add(metrics.metricName("test.metric.max", "max", tags), new Max());
+                    break;
+                case MIN:
+                    sensor.add(metrics.metricName("test.metric.min", "min", tags), new Min());
+                    break;
+                case RATE:
+                    sensor.add(metrics.metricName("test.metric.rate", "rate", tags), new
Rate());
+                    break;
+                case SIMPLE_RATE:
+                    sensor.add(metrics.metricName("test.metric.simpleRate", "simpleRate",
tags), new SimpleRate());
+                    break;
+                case SUM:
+                    sensor.add(metrics.metricName("test.metric.sum", "sum", tags), new Sum());
+                    break;
+                case VALUE:
+                    sensor.add(metrics.metricName("test.metric.value", "value", tags), new
Value());
+                    break;
+                case PERCENTILES:
+                    sensor.add(metrics.metricName("test.metric.percentiles", "percentiles",
tags),
+                               new Percentiles(100, -100, 100, Percentiles.BucketSizing.CONSTANT,
+                                               new Percentile(metrics.metricName("test.median",
"percentiles"), 50.0),
+                                               new Percentile(metrics.metricName("test.perc99_9",
"percentiles"), 99.9)));
+                    break;
+                case METER:
+                    sensor.add(new Meter(metrics.metricName("test.metric.meter.rate", "meter",
tags),
+                               metrics.metricName("test.metric.meter.total", "meter", tags)));
+                    break;
+                default:
+                    throw new IllegalStateException("Invalid stat type " + statType);
+            }
+            return sensor;
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Intermittent test failure in CustomQuotaCallbackTest
> ----------------------------------------------------
>
>                 Key: KAFKA-6765
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6765
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.2.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 1.2.0
>
>
> Exception stack trace:
> {quote}
> java.lang.NullPointerException
>     at org.apache.kafka.common.metrics.stats.SampledStat.purgeObsoleteSamples(SampledStat.java:104)
>     at org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:74)
>     at org.apache.kafka.common.metrics.KafkaMetric.metricValue(KafkaMetric.java:68)
>     at kafka.api.QuotaTestClients$.metricValue(BaseQuotaTest.scala:163)
>     at kafka.api.QuotaTestClients.produceUntilThrottled(BaseQuotaTest.scala:193)
>     at kafka.api.CustomQuotaCallbackTest$GroupedUser.produceConsume(CustomQuotaCallbackTest.scala:272)
>     at kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback(CustomQuotaCallbackTest.scala:146)
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message