kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: KAFKA-2419; Garbage collect unused sensors
Date Wed, 07 Oct 2015 20:08:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2047a9afe -> 118912e76


KAFKA-2419; Garbage collect unused sensors

As discussed in KAFKA-2419 - I've added a time based sensor retention config to Sensor. Sensors that have not been "recorded" for 'n' seconds are eligible for expiration.

In addition to the time based retention, I've also altered several tests to close the Metrics and scheduler objects since they can cause leaks while running tests. This causes TestUtils.verifyNonDaemonThreadStatus to fail.

Author: Aditya Auradkar <aauradka@aauradka-mn1.linkedin.biz>
Author: Aditya Auradkar <aauradka@aauradka-mn1.(none)>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Joel Koshy <jjkoshy.w@gmail.com>

Closes #233 from auradkar/K-2419


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/118912e7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/118912e7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/118912e7

Branch: refs/heads/trunk
Commit: 118912e76e5c867a8727f90d40bb969e0e6b65c5
Parents: 2047a9a
Author: Aditya Auradkar <aauradka@aauradka-mn1.linkedin.biz>
Authored: Wed Oct 7 13:08:07 2015 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Wed Oct 7 13:08:07 2015 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/metrics/Metrics.java    |  63 +++++++-
 .../org/apache/kafka/common/metrics/Sensor.java |  20 ++-
 .../consumer/internals/CoordinatorTest.java     |   6 +
 .../clients/consumer/internals/FetcherTest.java |  10 +-
 .../producer/internals/BufferPoolTest.java      |   6 +
 .../internals/RecordAccumulatorTest.java        |   6 +
 .../clients/producer/internals/SenderTest.java  |  77 ++++++----
 .../kafka/common/metrics/JmxReporterTest.java   |  18 ++-
 .../kafka/common/metrics/MetricsTest.java       |  64 +++++++-
 .../kafka/common/network/SSLSelectorTest.java   |   6 +-
 .../kafka/common/network/SelectorTest.java      |   5 +-
 .../org/apache/kafka/test/MetricsBench.java     |  38 ++---
 .../scala/kafka/server/ClientQuotaManager.scala |  13 +-
 .../controller/ControllerFailoverTest.scala     |   8 +-
 .../unit/kafka/network/SocketServerTest.scala   |  80 +++++-----
 .../server/HighwatermarkPersistenceTest.scala   | 149 ++++++++++---------
 .../unit/kafka/server/ISRExpirationTest.scala   |   1 +
 .../unit/kafka/server/LeaderElectionTest.scala  |  38 ++---
 .../unit/kafka/server/ReplicaManagerTest.scala  |  56 ++++---
 .../unit/kafka/server/SimpleFetchTest.scala     |   1 +
 20 files changed, 450 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 42936a1..be744ab 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -18,12 +18,17 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A registry of sensors and metrics.
@@ -56,6 +61,8 @@ public class Metrics implements Closeable {
     private final ConcurrentMap<Sensor, List<Sensor>> childrenSensors;
     private final List<MetricsReporter> reporters;
     private final Time time;
+    private final ScheduledThreadPoolExecutor metricsScheduler;
+    private static final Logger log = LoggerFactory.getLogger(Metrics.class);
 
     /**
      * Create a metrics repository with no metric reporters and default configuration.
@@ -95,6 +102,13 @@ public class Metrics implements Closeable {
         this.time = time;
         for (MetricsReporter reporter : reporters)
             reporter.init(new ArrayList<KafkaMetric>());
+        this.metricsScheduler = new ScheduledThreadPoolExecutor(1);
+        // Creating a daemon thread to not block shutdown
+        this.metricsScheduler.setThreadFactory(new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                return Utils.newThread("SensorExpiryThread", runnable, true);
+            }
+        });
     }
 
     /**
@@ -135,9 +149,23 @@ public class Metrics implements Closeable {
      * @return The sensor that is created
      */
     public synchronized Sensor sensor(String name, MetricConfig config, Sensor... parents) {
+        return sensor(name, config, Long.MAX_VALUE, parents);
+    }
+
+    /**
+     * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
+     * receive every value recorded with this sensor.
+     * @param name The name of the sensor
+     * @param config A default configuration to use for this sensor for metrics that don't have their own config
+     * @param inactiveSensorExpirationTimeSeconds If no value is recorded on the Sensor for this duration of time,
+     *                                        it is eligible for removal
+     * @param parents The parent sensors
+     * @return The sensor that is created
+     */
+    public synchronized Sensor sensor(String name, MetricConfig config, long inactiveSensorExpirationTimeSeconds, Sensor... parents) {
         Sensor s = getSensor(name);
         if (s == null) {
-            s = new Sensor(this, name, parents, config == null ? this.config : config, time);
+            s = new Sensor(this, name, parents, config == null ? this.config : config, time, inactiveSensorExpirationTimeSeconds);
             this.sensors.put(name, s);
             if (parents != null) {
                 for (Sensor parent : parents) {
@@ -149,6 +177,7 @@ public class Metrics implements Closeable {
                     children.add(s);
                 }
             }
+            log.debug("Added sensor with name {}", name);
         }
         return s;
     }
@@ -167,6 +196,7 @@ public class Metrics implements Closeable {
                     if (sensors.remove(name, sensor)) {
                         for (KafkaMetric metric : sensor.metrics())
                             removeMetric(metric.metricName());
+                        log.debug("Removed sensor with name {}", name);
                         childSensors = childrenSensors.remove(sensor);
                     }
                 }
@@ -244,6 +274,30 @@ public class Metrics implements Closeable {
         return this.metrics;
     }
 
+    /**
+     * This iterates over every Sensor and triggers a removeSensor if it has expired
+     * Package private for testing
+     */
+    class ExpireSensorTask implements Runnable {
+        public void run() {
+            for (Map.Entry<String, Sensor> sensorEntry : sensors.entrySet()) {
+                // removeSensor also locks the sensor object. This is fine because synchronized is reentrant
+                // There is however a minor race condition here. Assume we have a parent sensor P and child sensor C.
+                // Calling record on C would cause a record on P as well.
+                // So expiration time for P == expiration time for C. If the record on P happens via C just after P is removed,
+                // that will cause C to also get removed.
+                // Since the expiration time is typically high it is not expected to be a significant concern
+                // and thus not necessary to optimize
+                synchronized (sensorEntry.getValue()) {
+                    if (sensorEntry.getValue().hasExpired()) {
+                        log.debug("Removing expired sensor {}", sensorEntry.getKey());
+                        removeSensor(sensorEntry.getKey());
+                    }
+                }
+            }
+        }
+    }
+
     /* For testing use only. */
     Map<Sensor, List<Sensor>> childrenSensors() {
         return Collections.unmodifiableMap(childrenSensors);
@@ -254,6 +308,13 @@ public class Metrics implements Closeable {
      */
     @Override
     public void close() {
+        this.metricsScheduler.shutdown();
+        try {
+            this.metricsScheduler.awaitTermination(30, TimeUnit.SECONDS);
+        } catch (InterruptedException ex) {
+            // ignore and continue shutdown
+        }
+
         for (MetricsReporter reporter : this.reporters)
             reporter.close();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 4d55771..0c5bcb7 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -17,6 +17,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable;
@@ -37,16 +38,20 @@ public final class Sensor {
     private final List<KafkaMetric> metrics;
     private final MetricConfig config;
     private final Time time;
+    private volatile long lastRecordTime;
+    private final long inactiveSensorExpirationTimeMs;
 
-    Sensor(Metrics registry, String name, Sensor[] parents, MetricConfig config, Time time) {
+    Sensor(Metrics registry, String name, Sensor[] parents, MetricConfig config, Time time, long inactiveSensorExpirationTimeSeconds) {
         super();
         this.registry = registry;
         this.name = Utils.notNull(name);
         this.parents = parents == null ? new Sensor[0] : parents;
-        this.metrics = new ArrayList<KafkaMetric>();
-        this.stats = new ArrayList<Stat>();
+        this.metrics = new ArrayList<>();
+        this.stats = new ArrayList<>();
         this.config = config;
         this.time = time;
+        this.inactiveSensorExpirationTimeMs = TimeUnit.MILLISECONDS.convert(inactiveSensorExpirationTimeSeconds, TimeUnit.SECONDS);
+        this.lastRecordTime = time.milliseconds();
         checkForest(new HashSet<Sensor>());
     }
 
@@ -91,6 +96,7 @@ public final class Sensor {
      *         bound
      */
     public void record(double value, long timeMs) {
+        this.lastRecordTime = time.milliseconds();
         synchronized (this) {
             // increment all the stats
             for (int i = 0; i < this.stats.size(); i++)
@@ -173,6 +179,14 @@ public final class Sensor {
         this.stats.add(stat);
     }
 
+    /**
+     * Return true if the Sensor is eligible for removal due to inactivity.
+     *        false otherwise
+     */
+    public boolean hasExpired() {
+        return (time.milliseconds() - this.lastRecordTime) > this.inactiveSensorExpirationTimeMs;
+    }
+
     synchronized List<KafkaMetric> metrics() {
         return Collections.unmodifiableList(this.metrics);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index 12aee11..8e3c98e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -52,6 +52,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -111,6 +112,11 @@ public class CoordinatorTest {
                 autoCommitIntervalMs);
     }
 
+    @After
+    public void teardown() {
+        this.metrics.close();
+    }
+
     @Test
     public void testNormalHeartbeat() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index b3169d8..f5f9ef1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -83,7 +84,8 @@ public class FetcherTest {
 
     private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
     private Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, metrics);
-    private Fetcher<byte[], byte[]> fetcherNoAutoReset = createFetcher(subscriptionsNoAutoReset, new Metrics(time));
+    private Metrics fetcherMetrics = new Metrics(time);
+    private Fetcher<byte[], byte[]> fetcherNoAutoReset = createFetcher(subscriptionsNoAutoReset, fetcherMetrics);
 
     @Before
     public void setup() throws Exception {
@@ -97,6 +99,12 @@ public class FetcherTest {
         records.flip();
     }
 
+    @After
+    public void teardown() {
+        this.metrics.close();
+        this.fetcherMetrics.close();
+    }
+
     @Test
     public void testFetchNormal() {
         List<ConsumerRecord<byte[], byte[]>> records;

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index ded5d3e..f8567e9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -39,6 +40,11 @@ public class BufferPoolTest {
     String metricGroup = "TestMetrics";
     Map<String, String> metricTags = new LinkedHashMap<String, String>();
 
+    @After
+    public void teardown() {
+        this.metrics.close();
+    }
+
     /**
      * Test the simple non-blocking allocation paths
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index dcc52b6..887499d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
+import org.junit.After;
 import org.junit.Test;
 
 public class RecordAccumulatorTest {
@@ -67,6 +68,11 @@ public class RecordAccumulatorTest {
     Map<String, String> metricTags = new LinkedHashMap<String, String>();
     private final long maxBlockTimeMs = 1000;
 
+    @After
+    public void teardown() {
+        this.metrics.close();
+    }
+
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index bcf6a3a..bcc618a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -76,6 +77,11 @@ public class SenderTest {
         metricTags.put("client-id", CLIENT_ID);
     }
 
+    @After
+    public void tearDown() {
+        this.metrics.close();
+    }
+
     @Test
     public void testSimple() throws Exception {
         long offset = 0;
@@ -114,44 +120,49 @@ public class SenderTest {
     public void testRetries() throws Exception {
         // create a sender with retries = 1
         int maxRetries = 1;
-        Sender sender = new Sender(client,
-                                   metadata,
-                                   this.accumulator,
-                                   MAX_REQUEST_SIZE,
-                                   ACKS_ALL,
-                                   maxRetries,
-                                   new Metrics(),
-                                   time,
-                                   "clientId",
-                                   REQUEST_TIMEOUT);
-        // do a successful retry
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds()); // connect
-        sender.run(time.milliseconds()); // send produce request
-        assertEquals(1, client.inFlightRequestCount());
-        client.disconnect(client.requests().peek().request().destination());
-        assertEquals(0, client.inFlightRequestCount());
-        sender.run(time.milliseconds()); // receive error
-        sender.run(time.milliseconds()); // reconnect
-        sender.run(time.milliseconds()); // resend
-        assertEquals(1, client.inFlightRequestCount());
-        long offset = 0;
-        client.respond(produceResponse(tp, offset, Errors.NONE.code(), 0));
-        sender.run(time.milliseconds());
-        assertTrue("Request should have retried and completed", future.isDone());
-        assertEquals(offset, future.get().offset());
-
-        // do an unsuccessful retry
-        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
-        sender.run(time.milliseconds()); // send produce request
-        for (int i = 0; i < maxRetries + 1; i++) {
+        Metrics m = new Metrics();
+        try {
+            Sender sender = new Sender(client,
+                                       metadata,
+                                       this.accumulator,
+                                       MAX_REQUEST_SIZE,
+                                       ACKS_ALL,
+                                       maxRetries,
+                                       m,
+                                       time,
+                                       "clientId",
+                                       REQUEST_TIMEOUT);
+            // do a successful retry
+            Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+            sender.run(time.milliseconds()); // connect
+            sender.run(time.milliseconds()); // send produce request
+            assertEquals(1, client.inFlightRequestCount());
             client.disconnect(client.requests().peek().request().destination());
+            assertEquals(0, client.inFlightRequestCount());
             sender.run(time.milliseconds()); // receive error
             sender.run(time.milliseconds()); // reconnect
             sender.run(time.milliseconds()); // resend
+            assertEquals(1, client.inFlightRequestCount());
+            long offset = 0;
+            client.respond(produceResponse(tp, offset, Errors.NONE.code(), 0));
+            sender.run(time.milliseconds());
+            assertTrue("Request should have retried and completed", future.isDone());
+            assertEquals(offset, future.get().offset());
+
+            // do an unsuccessful retry
+            future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+            sender.run(time.milliseconds()); // send produce request
+            for (int i = 0; i < maxRetries + 1; i++) {
+                client.disconnect(client.requests().peek().request().destination());
+                sender.run(time.milliseconds()); // receive error
+                sender.run(time.milliseconds()); // reconnect
+                sender.run(time.milliseconds()); // resend
+            }
+            sender.run(time.milliseconds());
+            completedWithError(future, Errors.NETWORK_EXCEPTION);
+        } finally {
+            m.close();
         }
-        sender.run(time.milliseconds());
-        completedWithError(future, Errors.NETWORK_EXCEPTION);
     }
 
     private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
index 07b1b60..90cd76f 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
@@ -26,12 +26,16 @@ public class JmxReporterTest {
     @Test
     public void testJmxRegistration() throws Exception {
         Metrics metrics = new Metrics();
-        metrics.addReporter(new JmxReporter());
-        Sensor sensor = metrics.sensor("kafka.requests");
-        sensor.add(new MetricName("pack.bean1.avg", "grp1"), new Avg());
-        sensor.add(new MetricName("pack.bean2.total", "grp2"), new Total());
-        Sensor sensor2 = metrics.sensor("kafka.blah");
-        sensor2.add(new MetricName("pack.bean1.some", "grp1"), new Total());
-        sensor2.add(new MetricName("pack.bean2.some", "grp1"), new Total());
+        try {
+            metrics.addReporter(new JmxReporter());
+            Sensor sensor = metrics.sensor("kafka.requests");
+            sensor.add(new MetricName("pack.bean1.avg", "grp1"), new Avg());
+            sensor.add(new MetricName("pack.bean2.total", "grp2"), new Total());
+            Sensor sensor2 = metrics.sensor("kafka.blah");
+            sensor2.add(new MetricName("pack.bean1.some", "grp1"), new Total());
+            sensor2.add(new MetricName("pack.bean2.some", "grp1"), new Total());
+        } finally {
+            metrics.close();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 175a036..8d3e33d 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -36,6 +36,8 @@ import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.utils.MockTime;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 public class MetricsTest {
@@ -43,7 +45,17 @@ public class MetricsTest {
     private static final double EPS = 0.000001;
     private MockTime time = new MockTime();
     private MetricConfig config = new MetricConfig();
-    private Metrics metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time);
+    private Metrics metrics;
+
+    @Before
+    public void setup() {
+        this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time);
+    }
+
+    @After
+    public void tearDown() {
+        this.metrics.close();
+    }
 
     @Test
     public void testMetricName() {
@@ -200,6 +212,56 @@ public class MetricsTest {
     }
 
     @Test
+    public void testRemoveInactiveMetrics() {
+        Sensor s1 = metrics.sensor("test.s1", null, 1);
+        s1.add(new MetricName("test.s1.count", "grp1"), new Count());
+
+        Sensor s2 = metrics.sensor("test.s2", null, 3);
+        s2.add(new MetricName("test.s2.count", "grp1"), new Count());
+
+        Metrics.ExpireSensorTask purger = metrics.new ExpireSensorTask();
+        purger.run();
+        assertNotNull("Sensor test.s1 must be present", metrics.getSensor("test.s1"));
+        assertNotNull("MetricName test.s1.count must be present",
+                metrics.metrics().get(new MetricName("test.s1.count", "grp1")));
+        assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2"));
+        assertNotNull("MetricName test.s2.count must be present",
+                metrics.metrics().get(new MetricName("test.s2.count", "grp1")));
+
+        time.sleep(1001);
+        purger.run();
+        assertNull("Sensor test.s1 should have been purged", metrics.getSensor("test.s1"));
+        assertNull("MetricName test.s1.count should have been purged",
+                metrics.metrics().get(new MetricName("test.s1.count", "grp1")));
+        assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2"));
+        assertNotNull("MetricName test.s2.count must be present",
+                metrics.metrics().get(new MetricName("test.s2.count", "grp1")));
+
+        // record a value in sensor s2. This should reset the clock for that sensor.
+        // It should not get purged at the 3 second mark after creation
+        s2.record();
+        time.sleep(2000);
+        purger.run();
+        assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2"));
+        assertNotNull("MetricName test.s2.count must be present",
+                metrics.metrics().get(new MetricName("test.s2.count", "grp1")));
+
+        // After another 1 second sleep, the metric should be purged
+        time.sleep(1000);
+        purger.run();
+        assertNull("Sensor test.s2 should have been purged", metrics.getSensor("test.s1"));
+        assertNull("MetricName test.s2.count should have been purged",
+                metrics.metrics().get(new MetricName("test.s1.count", "grp1")));
+
+        // After purging, it should be possible to recreate a metric
+        s1 = metrics.sensor("test.s1", null, 1);
+        s1.add(new MetricName("test.s1.count", "grp1"), new Count());
+        assertNotNull("Sensor test.s1 must be present", metrics.getSensor("test.s1"));
+        assertNotNull("MetricName test.s1.count must be present",
+                metrics.metrics().get(new MetricName("test.s1.count", "grp1")));
+    }
+
+    @Test
     public void testRemoveMetric() {
         metrics.addMetric(new MetricName("test1", "grp1"), new Count());
         metrics.addMetric(new MetricName("test2", "grp1"), new Count());

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
index c28d427..c60053f 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
@@ -34,6 +34,8 @@ import org.junit.Test;
  */
 public class SSLSelectorTest extends SelectorTest {
 
+    private Metrics metrics;
+
     @Before
     public void setup() throws Exception {
         File trustStoreFile = File.createTempFile("truststore", ".jks");
@@ -48,13 +50,15 @@ public class SSLSelectorTest extends SelectorTest {
 
         this.channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT);
         this.channelBuilder.configure(sslClientConfigs);
-        this.selector = new Selector(5000, new Metrics(), time, "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
+        this.metrics = new Metrics();
+        this.selector = new Selector(5000, metrics, time, "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
     }
 
     @After
     public void teardown() throws Exception {
         this.selector.close();
         this.server.close();
+        this.metrics.close();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index bfc4be5..6aa60ce 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -44,6 +44,7 @@ public class SelectorTest {
     protected Time time;
     protected Selectable selector;
     protected ChannelBuilder channelBuilder;
+    private Metrics metrics;
 
     @Before
     public void setup() throws Exception {
@@ -54,13 +55,15 @@ public class SelectorTest {
         this.time = new MockTime();
         this.channelBuilder = new PlaintextChannelBuilder();
         this.channelBuilder.configure(configs);
-        this.selector = new Selector(5000, new Metrics(), time, "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
+        this.metrics = new Metrics();
+        this.selector = new Selector(5000, this.metrics, time, "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
     }
 
     @After
     public void teardown() throws Exception {
         this.selector.close();
         this.server.close();
+        this.metrics.close();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
index 633d4bb..5222cd0 100644
--- a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
+++ b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
@@ -29,23 +29,27 @@ public class MetricsBench {
     public static void main(String[] args) {
         long iters = Long.parseLong(args[0]);
         Metrics metrics = new Metrics();
-        Sensor parent = metrics.sensor("parent");
-        Sensor child = metrics.sensor("child", parent);
-        for (Sensor sensor : Arrays.asList(parent, child)) {
-            sensor.add(new MetricName(sensor.name() + ".avg", "grp1"), new Avg());
-            sensor.add(new MetricName(sensor.name() + ".count", "grp1"), new Count());
-            sensor.add(new MetricName(sensor.name() + ".max", "grp1"), new Max());
-            sensor.add(new Percentiles(1024,
-                                       0.0,
-                                       iters,
-                                       BucketSizing.CONSTANT,
-                                       new Percentile(new MetricName(sensor.name() + ".median", "grp1"), 50.0),
-                                       new Percentile(new MetricName(sensor.name() +  ".p_99", "grp1"), 99.0)));
+        try {
+            Sensor parent = metrics.sensor("parent");
+            Sensor child = metrics.sensor("child", parent);
+            for (Sensor sensor : Arrays.asList(parent, child)) {
+                sensor.add(new MetricName(sensor.name() + ".avg", "grp1"), new Avg());
+                sensor.add(new MetricName(sensor.name() + ".count", "grp1"), new Count());
+                sensor.add(new MetricName(sensor.name() + ".max", "grp1"), new Max());
+                sensor.add(new Percentiles(1024,
+                        0.0,
+                        iters,
+                        BucketSizing.CONSTANT,
+                        new Percentile(new MetricName(sensor.name() + ".median", "grp1"), 50.0),
+                        new Percentile(new MetricName(sensor.name() +  ".p_99", "grp1"), 99.0)));
+            }
+            long start = System.nanoTime();
+            for (int i = 0; i < iters; i++)
+                parent.record(i);
+            double ellapsed = (System.nanoTime() - start) / (double) iters;
+            System.out.println(String.format("%.2f ns per metric recording.", ellapsed));
+        } finally {
+            metrics.close();
         }
-        long start = System.nanoTime();
-        for (int i = 0; i < iters; i++)
-            parent.record(i);
-        double ellapsed = (System.nanoTime() - start) / (double) iters;
-        System.out.println(String.format("%.2f ns per metric recording.", ellapsed));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 39dd65a..b21785f 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -57,6 +57,8 @@ object ClientQuotaManagerConfig {
   val DefaultNumQuotaSamples = 11
   val DefaultQuotaWindowSizeSeconds = 1
   val MaxThrottleTimeSeconds = 30
+  // Purge sensors after 1 hour of inactivity
+  val InactiveSensorExpirationTimeSeconds  = 3600
 }
 
 /**
@@ -195,14 +197,19 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
       try {
         quotaSensor = metrics.getSensor(quotaSensorName)
         if (quotaSensor == null) {
-          // create the throttle time sensor also
-          throttleTimeSensor = metrics.sensor(throttleTimeSensorName)
+          // create the throttle time sensor also. Use default metric config
+          throttleTimeSensor = metrics.sensor(throttleTimeSensorName,
+                                              null,
+                                              ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds)
           throttleTimeSensor.add(new MetricName("throttle-time",
                                                 apiKey,
                                                 "Tracking average throttle-time per client",
                                                 "client-id",
                                                 clientId), new Avg())
-          quotaSensor = metrics.sensor(quotaSensorName, getQuotaMetricConfig(quota(clientId)))
+
+          quotaSensor = metrics.sensor(quotaSensorName,
+                                       getQuotaMetricConfig(quota(clientId)),
+                                       ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds)
           quotaSensor.add(clientRateMetricName(clientId), new Rate())
         }
       } finally {

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 985c64f..c93eca5 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -41,6 +41,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
   val msgQueueSize = 1
   val topic = "topic1"
   val overridingProps = new Properties()
+  val metrics = new Metrics()
   overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
 
   override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect)
@@ -54,6 +55,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
   @After
   override def tearDown() {
     super.tearDown()
+    this.metrics.close()
   }
 
   /**
@@ -83,7 +85,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
     // Replace channel manager with our mock manager
     controller.kafkaController.controllerContext.controllerChannelManager.shutdown()
     val channelManager = new MockChannelManager(controller.kafkaController.controllerContext, 
-                                                  controller.kafkaController.config)
+                                                  controller.kafkaController.config, metrics)
     channelManager.startup()
     controller.kafkaController.controllerContext.controllerChannelManager = channelManager
     channelManager.shrinkBlockingQueue(0)
@@ -149,8 +151,8 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
   }
 }
 
-class MockChannelManager(private val controllerContext: ControllerContext, config: KafkaConfig)
-  extends ControllerChannelManager(controllerContext, config, new SystemTime, new Metrics) {
+class MockChannelManager(private val controllerContext: ControllerContext, config: KafkaConfig, metrics: Metrics)
+  extends ControllerChannelManager(controllerContext, config, new SystemTime, metrics) {
 
   def stopSendThread(brokerId: Int) {
     val requestThread = brokerStateInfo(brokerId).requestSendThread

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 1585e71..533538d 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -52,7 +52,8 @@ class SocketServerTest extends JUnitSuite {
   props.put("max.connections.per.ip", "5")
   props.put("connections.max.idle.ms", "60000")
   val config: KafkaConfig = KafkaConfig.fromProps(props)
-  val server: SocketServer = new SocketServer(config, new Metrics(), new SystemTime())
+  val metrics = new Metrics()
+  val server: SocketServer = new SocketServer(config, metrics, new SystemTime())
   server.startup()
 
   def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {
@@ -87,6 +88,7 @@ class SocketServerTest extends JUnitSuite {
 
   @After
   def cleanup() {
+    metrics.close()
     server.shutdown()
   }
 
@@ -180,15 +182,20 @@ class SocketServerTest extends JUnitSuite {
     val overrideNum = 6
     val overrides: Map[String, Int] = Map("localhost" -> overrideNum)
     val overrideprops = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
-    val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideprops), new Metrics(), new SystemTime())
-    overrideServer.startup()
-    // make the maximum allowable number of connections and then leak them
-    val conns = ((0 until overrideNum).map(i => connect(overrideServer)))
-    // now try one more (should fail)
-    val conn = connect(overrideServer)
-    conn.setSoTimeout(3000)
-    assertEquals(-1, conn.getInputStream.read())
-    overrideServer.shutdown()
+    val serverMetrics = new Metrics()
+    val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideprops), serverMetrics, new SystemTime())
+    try {
+      overrideServer.startup()
+      // make the maximum allowable number of connections and then leak them
+      val conns = ((0 until overrideNum).map(i => connect(overrideServer)))
+      // now try one more (should fail)
+      val conn = connect(overrideServer)
+      conn.setSoTimeout(3000)
+      assertEquals(-1, conn.getInputStream.read())
+    } finally {
+      overrideServer.shutdown()
+      serverMetrics.close()
+    }
   }
 
   @Test
@@ -197,31 +204,36 @@ class SocketServerTest extends JUnitSuite {
     val overrideprops = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0, enableSSL = true, trustStoreFile = Some(trustStoreFile))
     overrideprops.put("listeners", "SSL://localhost:0")
 
-    val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideprops), new Metrics(), new SystemTime())
+    val serverMetrics = new Metrics()
+    val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideprops), serverMetrics, new SystemTime())
     overrideServer.startup()
-    val sslContext = SSLContext.getInstance("TLSv1.2")
-    sslContext.init(null, Array(TestUtils.trustAllCerts), new java.security.SecureRandom())
-    val socketFactory = sslContext.getSocketFactory
-    val sslSocket = socketFactory.createSocket("localhost", overrideServer.boundPort(SecurityProtocol.SSL)).asInstanceOf[SSLSocket]
-    sslSocket.setNeedClientAuth(false)
-
-    val correlationId = -1
-    val clientId = SyncProducerConfig.DefaultClientId
-    val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
-    val ack = SyncProducerConfig.DefaultRequiredAcks
-    val emptyRequest =
-      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
-
-    val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
-    emptyRequest.writeTo(byteBuffer)
-    byteBuffer.rewind()
-    val serializedBytes = new Array[Byte](byteBuffer.remaining)
-    byteBuffer.get(serializedBytes)
-
-    sendRequest(sslSocket, 0, serializedBytes)
-    processRequest(overrideServer.requestChannel)
-    assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq)
-    overrideServer.shutdown()
+    try {
+      val sslContext = SSLContext.getInstance("TLSv1.2")
+      sslContext.init(null, Array(TestUtils.trustAllCerts), new java.security.SecureRandom())
+      val socketFactory = sslContext.getSocketFactory
+      val sslSocket = socketFactory.createSocket("localhost", overrideServer.boundPort(SecurityProtocol.SSL)).asInstanceOf[SSLSocket]
+      sslSocket.setNeedClientAuth(false)
+
+      val correlationId = -1
+      val clientId = SyncProducerConfig.DefaultClientId
+      val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
+      val ack = SyncProducerConfig.DefaultRequiredAcks
+      val emptyRequest =
+        new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
+
+      val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
+      emptyRequest.writeTo(byteBuffer)
+      byteBuffer.rewind()
+      val serializedBytes = new Array[Byte](byteBuffer.remaining)
+      byteBuffer.get(serializedBytes)
+
+      sendRequest(sslSocket, 0, serializedBytes)
+      processRequest(overrideServer.requestChannel)
+      assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq)
+    } finally {
+      overrideServer.shutdown()
+      serverMetrics.close()
+    }
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index bab81df..c288e56 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -54,32 +54,37 @@ class HighwatermarkPersistenceTest {
     // create kafka scheduler
     val scheduler = new KafkaScheduler(2)
     scheduler.startup
+    val metrics = new Metrics
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, new Metrics, new MockTime, new JMockTime, zkClient, scheduler,
+    val replicaManager = new ReplicaManager(configs.head, metrics, new MockTime, new JMockTime, zkClient, scheduler,
       logManagers(0), new AtomicBoolean(false))
     replicaManager.startup()
-    replicaManager.checkpointHighWatermarks()
-    var fooPartition0Hw = hwmFor(replicaManager, topic, 0)
-    assertEquals(0L, fooPartition0Hw)
-    val partition0 = replicaManager.getOrCreatePartition(topic, 0)
-    // create leader and follower replicas
-    val log0 = logManagers(0).createLog(TopicAndPartition(topic, 0), LogConfig())
-    val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0))
-    partition0.addReplicaIfNotExists(leaderReplicaPartition0)
-    val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime)
-    partition0.addReplicaIfNotExists(followerReplicaPartition0)
-    replicaManager.checkpointHighWatermarks()
-    fooPartition0Hw = hwmFor(replicaManager, topic, 0)
-    assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw)
-    // set the high watermark for local replica
-    partition0.getReplica().get.highWatermark = new LogOffsetMetadata(5L)
-    replicaManager.checkpointHighWatermarks()
-    fooPartition0Hw = hwmFor(replicaManager, topic, 0)
-    assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw)
-    EasyMock.verify(zkClient)
-
-    // shutdown the replica manager upon test completion
-    replicaManager.shutdown(false)
+    try {
+      replicaManager.checkpointHighWatermarks()
+      var fooPartition0Hw = hwmFor(replicaManager, topic, 0)
+      assertEquals(0L, fooPartition0Hw)
+      val partition0 = replicaManager.getOrCreatePartition(topic, 0)
+      // create leader and follower replicas
+      val log0 = logManagers(0).createLog(TopicAndPartition(topic, 0), LogConfig())
+      val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0))
+      partition0.addReplicaIfNotExists(leaderReplicaPartition0)
+      val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime)
+      partition0.addReplicaIfNotExists(followerReplicaPartition0)
+      replicaManager.checkpointHighWatermarks()
+      fooPartition0Hw = hwmFor(replicaManager, topic, 0)
+      assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw)
+      // set the high watermark for local replica
+      partition0.getReplica().get.highWatermark = new LogOffsetMetadata(5L)
+      replicaManager.checkpointHighWatermarks()
+      fooPartition0Hw = hwmFor(replicaManager, topic, 0)
+      assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw)
+      EasyMock.verify(zkClient)
+    } finally {
+      // shutdown the replica manager upon test completion
+      replicaManager.shutdown(false)
+      metrics.close()
+      scheduler.shutdown()
+    }
   }
 
   @Test
@@ -92,56 +97,60 @@ class HighwatermarkPersistenceTest {
     // create kafka scheduler
     val scheduler = new KafkaScheduler(2)
     scheduler.startup
+    val metrics = new Metrics
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, new Metrics, new MockTime(), new JMockTime, zkClient,
+    val replicaManager = new ReplicaManager(configs.head, metrics, new MockTime(), new JMockTime, zkClient,
       scheduler, logManagers(0), new AtomicBoolean(false))
     replicaManager.startup()
-    replicaManager.checkpointHighWatermarks()
-    var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
-    assertEquals(0L, topic1Partition0Hw)
-    val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0)
-    // create leader log
-    val topic1Log0 = logManagers(0).createLog(TopicAndPartition(topic1, 0), LogConfig())
-    // create a local replica for topic1
-    val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0))
-    topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
-    replicaManager.checkpointHighWatermarks()
-    topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
-    assertEquals(leaderReplicaTopic1Partition0.highWatermark.messageOffset, topic1Partition0Hw)
-    // set the high watermark for local replica
-    topic1Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(5L)
-    replicaManager.checkpointHighWatermarks()
-    topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
-    assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark.messageOffset)
-    assertEquals(5L, topic1Partition0Hw)
-    // add another partition and set highwatermark
-    val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0)
-    // create leader log
-    val topic2Log0 = logManagers(0).createLog(TopicAndPartition(topic2, 0), LogConfig())
-    // create a local replica for topic2
-    val leaderReplicaTopic2Partition0 =  new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0))
-    topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)
-    replicaManager.checkpointHighWatermarks()
-    var topic2Partition0Hw = hwmFor(replicaManager, topic2, 0)
-    assertEquals(leaderReplicaTopic2Partition0.highWatermark.messageOffset, topic2Partition0Hw)
-    // set the highwatermark for local replica
-    topic2Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(15L)
-    assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark.messageOffset)
-    // change the highwatermark for topic1
-    topic1Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(10L)
-    assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark.messageOffset)
-    replicaManager.checkpointHighWatermarks()
-    // verify checkpointed hw for topic 2
-    topic2Partition0Hw = hwmFor(replicaManager, topic2, 0)
-    assertEquals(15L, topic2Partition0Hw)
-    // verify checkpointed hw for topic 1
-    topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
-    assertEquals(10L, topic1Partition0Hw)
-    EasyMock.verify(zkClient)
-
-    // shutdown the replica manager upon test completion
-    replicaManager.shutdown(false)
-
+    try {
+      replicaManager.checkpointHighWatermarks()
+      var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
+      assertEquals(0L, topic1Partition0Hw)
+      val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0)
+      // create leader log
+      val topic1Log0 = logManagers(0).createLog(TopicAndPartition(topic1, 0), LogConfig())
+      // create a local replica for topic1
+      val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0))
+      topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
+      replicaManager.checkpointHighWatermarks()
+      topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
+      assertEquals(leaderReplicaTopic1Partition0.highWatermark.messageOffset, topic1Partition0Hw)
+      // set the high watermark for local replica
+      topic1Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(5L)
+      replicaManager.checkpointHighWatermarks()
+      topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
+      assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark.messageOffset)
+      assertEquals(5L, topic1Partition0Hw)
+      // add another partition and set highwatermark
+      val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0)
+      // create leader log
+      val topic2Log0 = logManagers(0).createLog(TopicAndPartition(topic2, 0), LogConfig())
+      // create a local replica for topic2
+      val leaderReplicaTopic2Partition0 =  new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0))
+      topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)
+      replicaManager.checkpointHighWatermarks()
+      var topic2Partition0Hw = hwmFor(replicaManager, topic2, 0)
+      assertEquals(leaderReplicaTopic2Partition0.highWatermark.messageOffset, topic2Partition0Hw)
+      // set the highwatermark for local replica
+      topic2Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(15L)
+      assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark.messageOffset)
+      // change the highwatermark for topic1
+      topic1Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(10L)
+      assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark.messageOffset)
+      replicaManager.checkpointHighWatermarks()
+      // verify checkpointed hw for topic 2
+      topic2Partition0Hw = hwmFor(replicaManager, topic2, 0)
+      assertEquals(15L, topic2Partition0Hw)
+      // verify checkpointed hw for topic 1
+      topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
+      assertEquals(10L, topic1Partition0Hw)
+      EasyMock.verify(zkClient)
+    } finally {
+      // shutdown the replica manager upon test completion
+      replicaManager.shutdown(false)
+      metrics.close()
+      scheduler.shutdown()
+    }
   }
 
   def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 26910a8..89a8fd9 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -58,6 +58,7 @@ class IsrExpirationTest {
   @After
   def tearDown() {
     replicaManager.shutdown(false)
+    metrics.close()
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index ac347ef..0efaa6a 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -138,24 +138,28 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
 
     val controllerContext = new ControllerContext(zkClient, zkConnection, 6000)
     controllerContext.liveBrokers = brokers.toSet
-    val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, new SystemTime, new Metrics)
+    val metrics = new Metrics
+    val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, new SystemTime, metrics)
     controllerChannelManager.startup()
-    val staleControllerEpoch = 0
-    val partitionStates = Map(
-      new TopicPartition(topic, partitionId) -> new PartitionState(2, brokerId2, LeaderAndIsr.initialLeaderEpoch,
-        Seq(brokerId1, brokerId2).map(Integer.valueOf).asJava, LeaderAndIsr.initialZKVersion,
-        Set(0, 1).map(Integer.valueOf).asJava)
-    )
-    val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, staleControllerEpoch, partitionStates.asJava,
-      brokerEndPoints.toSet.asJava)
-
-    controllerChannelManager.sendRequest(brokerId2, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest,
-      staleControllerEpochCallback)
-    TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true,
-                            "Controller epoch should be stale")
-    assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected)
-
-    controllerChannelManager.shutdown()
+    try {
+      val staleControllerEpoch = 0
+      val partitionStates = Map(
+        new TopicPartition(topic, partitionId) -> new PartitionState(2, brokerId2, LeaderAndIsr.initialLeaderEpoch,
+          Seq(brokerId1, brokerId2).map(Integer.valueOf).asJava, LeaderAndIsr.initialZKVersion,
+          Set(0, 1).map(Integer.valueOf).asJava)
+      )
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, staleControllerEpoch, partitionStates.asJava,
+        brokerEndPoints.toSet.asJava)
+
+      controllerChannelManager.sendRequest(brokerId2, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest,
+        staleControllerEpochCallback)
+      TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true,
+        "Controller epoch should be stale")
+      assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected)
+    } finally {
+      controllerChannelManager.shutdown()
+      metrics.close()
+    }
   }
 
   private def staleControllerEpochCallback(response: AbstractRequestResponse): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 301e268..1813349 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -45,14 +45,18 @@ class ReplicaManagerTest {
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()
     val jTime = new JMockTime
-    val rm = new ReplicaManager(config, new Metrics, time, jTime, zkClient, new MockScheduler(time), mockLogMgr,
+    val metrics = new Metrics
+    val rm = new ReplicaManager(config, metrics, time, jTime, zkClient, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false))
-    val partition = rm.getOrCreatePartition(topic, 1)
-    partition.getOrCreateReplica(1)
-    rm.checkpointHighWatermarks()
-
-    // shutdown the replica manager upon test completion
-    rm.shutdown(false)
+    try {
+      val partition = rm.getOrCreatePartition(topic, 1)
+      partition.getOrCreateReplica(1)
+      rm.checkpointHighWatermarks()
+    } finally {
+      // shutdown the replica manager upon test completion
+      rm.shutdown(false)
+      metrics.close()
+    }
   }
 
   @Test
@@ -64,14 +68,18 @@ class ReplicaManagerTest {
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()
     val jTime = new JMockTime
-    val rm = new ReplicaManager(config, new Metrics, time, jTime, zkClient, new MockScheduler(time), mockLogMgr,
+    val metrics = new Metrics
+    val rm = new ReplicaManager(config, metrics, time, jTime, zkClient, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false))
-    val partition = rm.getOrCreatePartition(topic, 1)
-    partition.getOrCreateReplica(1)
-    rm.checkpointHighWatermarks()
-
-    // shutdown the replica manager upon test completion
-    rm.shutdown(false)
+    try {
+      val partition = rm.getOrCreatePartition(topic, 1)
+      partition.getOrCreateReplica(1)
+      rm.checkpointHighWatermarks()
+    } finally {
+      // shutdown the replica manager upon test completion
+      rm.shutdown(false)
+      metrics.close()
+    }
   }
 
   @Test
@@ -82,18 +90,20 @@ class ReplicaManagerTest {
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()
     val jTime = new JMockTime
-    val rm = new ReplicaManager(config, new Metrics, time, jTime, zkClient, new MockScheduler(time), mockLogMgr,
+    val metrics = new Metrics
+    val rm = new ReplicaManager(config, metrics, time, jTime, zkClient, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), Option(this.getClass.getName))
-    val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest)
-    def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) = {
-      assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code)
+    try {
+      val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest)
+      def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) = {
+        assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code)
+      }
+      rm.appendMessages(timeout = 0, requiredAcks = 3, internalTopicsAllowed = false, messagesPerPartition = produceRequest.data, responseCallback = callback)
+    } finally {
+      rm.shutdown(false)
+      metrics.close()
     }
 
-    rm.appendMessages(timeout = 0, requiredAcks = 3, internalTopicsAllowed = false, messagesPerPartition = produceRequest.data, responseCallback = callback)
-
-    rm.shutdown(false)
-
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
-
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/118912e7/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 884ec06..0485f7b 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -123,6 +123,7 @@ class SimpleFetchTest {
   @After
   def tearDown() {
     replicaManager.shutdown(false)
+    metrics.close()
   }
 
   /**


Mime
View raw message