distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [14/20] incubator-distributedlog git commit: DL-107: Added unregistering gauges for distributedlog-core and distributedlog-benchmark
Date Wed, 28 Dec 2016 01:05:22 GMT
DL-107: Added unregistering gauges for distributedlog-core and distributedlog-benchmark


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/c6ea1755
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/c6ea1755
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/c6ea1755

Branch: refs/heads/master
Commit: c6ea17555adfc325c0ec995d59c8d14a9dac2ad1
Parents: ab0868c
Author: Phillip Su <psu@twitter.com>
Authored: Mon Aug 22 17:47:28 2016 -0700
Committer: Sijie Guo <sijieg@twitter.com>
Committed: Tue Dec 27 16:49:28 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/benchmark/ReaderWorker.java  | 11 ++-
 .../BKDistributedLogNamespace.java              |  4 +
 .../twitter/distributedlog/ZooKeeperClient.java |  2 +
 .../readahead/ReadAheadTracker.java             | 32 ++++++--
 .../readahead/ReadAheadWorker.java              |  2 +
 .../util/LimitedPermitManager.java              | 14 +++-
 .../MonitoredScheduledThreadPoolExecutor.java   | 42 +++++++----
 .../distributedlog/util/OrderedScheduler.java   |  2 +
 .../util/SimplePermitLimiter.java               | 16 +++-
 .../distributedlog/zk/ZKWatcherManager.java     | 21 +++++-
 .../TestDistributedLogConfiguration.java        |  5 +-
 .../distributedlog/zk/TestZKWatcherManager.java |  3 +-
 .../service/DistributedLogServiceImpl.java      | 77 +++++++++++++-------
 .../distributedlog/service/MonitorService.java  | 35 ++++++---
 .../service/stream/StreamImpl.java              | 32 +++++---
 15 files changed, 220 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
index 3e82e30..9817d94 100644
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
+++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
@@ -106,12 +106,13 @@ public class ReaderWorker implements Worker {
         final String streamName;
         DLSN prevDLSN = null;
         long prevSequenceId = Long.MIN_VALUE;
+        private static final String gaugeLabel = "sequence_id";
 
         StreamReader(int idx, StatsLogger statsLogger) {
             this.streamIdx = idx;
             int streamId = startStreamId + streamIdx;
             streamName = String.format("%s_%d", streamPrefix, streamId);
-            statsLogger.scope(streamName).registerGauge("sequence_id", this);
+            statsLogger.scope(streamName).registerGauge(gaugeLabel, this);
         }
 
         @Override
@@ -218,6 +219,10 @@ public class ReaderWorker implements Worker {
         public synchronized Number getSample() {
             return prevSequenceId;
         }
+
+        void unregisterGauge() {
+            statsLogger.scope(streamName).unregisterGauge(gaugeLabel, this);
+        }
     }
 
     public ReaderWorker(DistributedLogConfiguration conf,
@@ -446,6 +451,10 @@ public class ReaderWorker implements Worker {
         for (DLZkServerSet serverSet: serverSets) {
             serverSet.close();
         }
+        // Unregister gauges to prevent GC spirals
+        for(StreamReader sr : streamReaders) {
+            sr.unregisterGauge();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index cae6f6a..0df2f1c 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -1079,6 +1079,10 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace
{
             LOG.info("Ledger Allocator stopped.");
         }
 
+        // Unregister gauge to avoid GC spiral
+        ((LimitedPermitManager)this.logSegmentRollingPermitManager).unregisterGauge();
+        ((SimplePermitLimiter)this.writeLimiter).unregisterGauge();
+
         // Shutdown log segment metadata stores
         Utils.close(writerSegmentMetadataStore);
         Utils.close(readerSegmentMetadataStore);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
index 9ea9e37..74cd6cf 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
@@ -395,6 +395,8 @@ public class ZooKeeperClient {
         }
         LOG.info("Close zookeeper client {}.", name);
         closeInternal();
+        // unregister gauges to prevent GC spiral
+        this.watcherManager.unregisterGauges();
         closed = true;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java
index 5c0fd4b..39a627f 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java
@@ -32,13 +32,22 @@ public class ReadAheadTracker {
     final AtomicLong ticks = new AtomicLong(0);
     // which phase that the worker is in.
     ReadAheadPhase phase;
+    private final StatsLogger statsLogger;
+    // Gauges and their labels
+    private static final String phaseGaugeLabel = "phase";
+    private final Gauge<Number> phaseGauge;
+    private static final String ticksGaugeLabel = "ticks";
+    private final Gauge<Number> ticksGauge;
+    private static final String cachEntriesGaugeLabel = "cache_entries";
+    private final Gauge<Number> cacheEntriesGauge;
 
     ReadAheadTracker(String streamName,
                      final ReadAheadCache cache,
                      ReadAheadPhase initialPhase,
                      StatsLogger statsLogger) {
+        this.statsLogger = statsLogger;
         this.phase = initialPhase;
-        statsLogger.registerGauge("phase", new Gauge<Number>() {
+        phaseGauge = new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
                 return ReadAheadPhase.SCHEDULE_READAHEAD.getCode();
@@ -48,8 +57,10 @@ public class ReadAheadTracker {
             public Number getSample() {
                 return phase.getCode();
             }
-        });
-        statsLogger.registerGauge("ticks", new Gauge<Number>() {
+        };
+        this.statsLogger.registerGauge(phaseGaugeLabel, phaseGauge);
+
+        ticksGauge = new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
                 return 0;
@@ -59,8 +70,10 @@ public class ReadAheadTracker {
             public Number getSample() {
                 return ticks.get();
             }
-        });
-        statsLogger.registerGauge("cache_entries", new Gauge<Number>() {
+        };
+        this.statsLogger.registerGauge(ticksGaugeLabel, ticksGauge);
+
+        cacheEntriesGauge = new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
                 return 0;
@@ -70,7 +83,8 @@ public class ReadAheadTracker {
             public Number getSample() {
                 return cache.getNumCachedRecords();
             }
-        });
+        };
+        this.statsLogger.registerGauge(cachEntriesGaugeLabel, cacheEntriesGauge);
     }
 
     ReadAheadPhase getPhase() {
@@ -81,4 +95,10 @@ public class ReadAheadTracker {
         this.ticks.incrementAndGet();
         this.phase = readAheadPhase;
     }
+
+    public void unregisterGauge() {
+        this.statsLogger.unregisterGauge(phaseGaugeLabel, phaseGauge);
+        this.statsLogger.unregisterGauge(ticksGaugeLabel, ticksGauge);
+        this.statsLogger.unregisterGauge(cachEntriesGaugeLabel, cacheEntriesGauge);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
index 9ba8ca4..83a34a3 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
@@ -356,6 +356,8 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea
     public Future<Void> asyncClose() {
         LOG.info("Stopping Readahead worker for {}", fullyQualifiedName);
         running = false;
+        // Unregister associated gauages to prevent GC spiral
+        this.tracker.unregisterGauge();
 
         // Aside from unfortunate naming of variables, this allows
         // the currently active long poll to be interrupted and completed

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
index 7357410..4b917b2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
@@ -68,6 +68,8 @@ public class LimitedPermitManager implements PermitManager, Runnable, Watcher
{
     final TimeUnit timeUnit;
     final ScheduledExecutorService executorService;
     final AtomicInteger epoch = new AtomicInteger(0);
+    private StatsLogger statsLogger = null;
+    private Gauge<Number> outstandingGauge = null;
 
     public LimitedPermitManager(int concurrency, int period, TimeUnit timeUnit,
                                 ScheduledExecutorService executorService) {
@@ -84,7 +86,8 @@ public class LimitedPermitManager implements PermitManager, Runnable, Watcher
{
         this.period = period;
         this.timeUnit = timeUnit;
         this.executorService = executorService;
-        statsLogger.scope("permits").registerGauge("outstanding", new Gauge<Number>()
{
+        this.statsLogger = statsLogger;
+        this.outstandingGauge = new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
                 return 0;
@@ -94,7 +97,8 @@ public class LimitedPermitManager implements PermitManager, Runnable, Watcher
{
             public Number getSample() {
                 return null == semaphore ? 0 : concurrency - semaphore.availablePermits();
             }
-        });
+        };
+        this.statsLogger.scope("permits").registerGauge("outstanding", this.outstandingGauge);
     }
 
     @Override
@@ -176,4 +180,10 @@ public class LimitedPermitManager implements PermitManager, Runnable,
Watcher {
             }
         }
     }
+
+    public void unregisterGauge() {
+        if(this.statsLogger != null && this.outstandingGauge != null) {
+            this.statsLogger.scope("permits").unregisterGauge("outstanding", this.outstandingGauge);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
index 2bc7f82..512a456 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
@@ -110,6 +110,14 @@ public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExe
     protected final boolean traceTaskExecution;
     protected final OpStatsLogger taskExecutionStats;
     protected final OpStatsLogger taskPendingStats;
+    protected final StatsLogger statsLogger;
+    // Gauges and their labels
+    private static final String pendingTasksGaugeLabel = "pending_tasks";
+    private final Gauge<Number> pendingTasksGauge;
+    private static final String completedTasksGaugeLabel = "completed_tasks";
+    protected final Gauge<Number> completedTasksGauge;
+    private static final String totalTasksGaugeLabel = "total_tasks";
+    protected final Gauge<Number> totalTasksGauge;
 
     public MonitoredScheduledThreadPoolExecutor(int corePoolSize,
                                                 ThreadFactory threadFactory,
@@ -117,11 +125,10 @@ public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExe
                                                 boolean traceTaskExecution) {
         super(corePoolSize, threadFactory);
         this.traceTaskExecution = traceTaskExecution;
-
-        this.taskPendingStats = statsLogger.getOpStatsLogger("task_pending_time");
-        this.taskExecutionStats = statsLogger.getOpStatsLogger("task_execution_time");
-        // outstanding tasks
-        statsLogger.registerGauge("pending_tasks", new Gauge<Number>() {
+        this.statsLogger = statsLogger;
+        this.taskPendingStats = this.statsLogger.getOpStatsLogger("task_pending_time");
+        this.taskExecutionStats = this.statsLogger.getOpStatsLogger("task_execution_time");
+        this.pendingTasksGauge = new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
                 return 0;
@@ -131,9 +138,8 @@ public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExe
             public Number getSample() {
                 return getQueue().size();
             }
-        });
-        // completed tasks
-        statsLogger.registerGauge("completed_tasks", new Gauge<Number>() {
+        };
+        this.completedTasksGauge = new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
                 return 0;
@@ -143,9 +149,8 @@ public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExe
             public Number getSample() {
                 return getCompletedTaskCount();
             }
-        });
-        // total tasks
-        statsLogger.registerGauge("total_tasks", new Gauge<Number>() {
+        };
+        this.totalTasksGauge = new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
                 return 0;
@@ -155,7 +160,14 @@ public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExe
             public Number getSample() {
                 return getTaskCount();
             }
-        });
+        };
+
+        // outstanding tasks
+        this.statsLogger.registerGauge(pendingTasksGaugeLabel, pendingTasksGauge);
+        // completed tasks
+        this.statsLogger.registerGauge(completedTasksGaugeLabel, completedTasksGauge);
+        // total tasks
+        this.statsLogger.registerGauge(totalTasksGaugeLabel, pendingTasksGauge);
     }
 
     private Runnable timedRunnable(Runnable r) {
@@ -236,6 +248,10 @@ public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExe
         return null;
     }
 
-
+    void unregisterGauges() {
+        this.statsLogger.unregisterGauge(pendingTasksGaugeLabel, pendingTasksGauge);
+        this.statsLogger.unregisterGauge(completedTasksGaugeLabel, completedTasksGauge);
+        this.statsLogger.unregisterGauge(totalTasksGaugeLabel, totalTasksGauge);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java
index d3385a6..9f34902 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java
@@ -310,6 +310,8 @@ public class OrderedScheduler implements ScheduledExecutorService {
     @Override
     public void shutdown() {
         for (MonitoredScheduledThreadPoolExecutor executor : executors) {
+            // Unregister gauges
+            executor.unregisterGauges();
             executor.shutdown();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
index 298a5ff..2482ece 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
@@ -47,6 +47,9 @@ public class SimplePermitLimiter implements PermitLimiter {
     final int permitsMax;
     final boolean darkmode;
     final Feature disableWriteLimitFeature;
+    private StatsLogger statsLogger = null;
+    private Gauge<Number> permitsGauge = null;
+    private String permitsGaugeLabel = "";
 
     public SimplePermitLimiter(boolean darkmode, int permitsMax, StatsLogger statsLogger,
                                boolean singleton, Feature disableWriteLimitFeature) {
@@ -57,7 +60,8 @@ public class SimplePermitLimiter implements PermitLimiter {
 
         // stats
         if (singleton) {
-            statsLogger.registerGauge("num_permits", new Gauge<Number>() {
+            this.statsLogger = statsLogger;
+            this.permitsGauge = new Gauge<Number>() {
                 @Override
                 public Number getDefaultValue() {
                     return 0;
@@ -66,7 +70,9 @@ public class SimplePermitLimiter implements PermitLimiter {
                 public Number getSample() {
                     return permits.get();
                 }
-            });
+            };
+            this.permitsGaugeLabel = "permits";
+            statsLogger.registerGauge(permitsGaugeLabel, permitsGauge);
         }
         acquireFailureCounter = statsLogger.getCounter("acquireFailure");
         permitsMetric = statsLogger.getOpStatsLogger("permits");
@@ -97,4 +103,10 @@ public class SimplePermitLimiter implements PermitLimiter {
     public int getPermits() {
         return permits.get();
     }
+
+    public void unregisterGauge() {
+        if (this.statsLogger != null && this.permitsGauge != null) {
+            this.statsLogger.unregisterGauge(permitsGaugeLabel, permitsGauge);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
index 03b2841..8ef33ea 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
@@ -78,6 +78,11 @@ public class ZKWatcherManager implements Watcher {
     private final String name;
     private final ZooKeeperClient zkc;
     private final StatsLogger statsLogger;
+    // Gauges and their labels
+    private final Gauge<Number> totalWatchesGauge;
+    private static final String totalWatchesGauageLabel = "total_watches";
+    private final Gauge<Number> numChildWatchesGauge;
+    private static final String numChildWatchesGauageLabel = "num_child_watches";
 
     protected final ConcurrentMap<String, Set<Watcher>> childWatches;
     protected final AtomicInteger allWatchesGauge;
@@ -94,7 +99,7 @@ public class ZKWatcherManager implements Watcher {
         this.allWatchesGauge = new AtomicInteger(0);
 
         // stats
-        this.statsLogger.registerGauge("total_watches", new Gauge<Number>() {
+        totalWatchesGauge = new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
                 return 0;
@@ -104,9 +109,10 @@ public class ZKWatcherManager implements Watcher {
             public Number getSample() {
                 return allWatchesGauge.get();
             }
-        });
+        };
+        this.statsLogger.registerGauge(totalWatchesGauageLabel, totalWatchesGauge);
 
-        this.statsLogger.registerGauge("num_child_watches", new Gauge<Number>() {
+        numChildWatchesGauge = new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
                 return 0;
@@ -116,7 +122,9 @@ public class ZKWatcherManager implements Watcher {
             public Number getSample() {
                 return childWatches.size();
             }
-        });
+        };
+
+        this.statsLogger.registerGauge(numChildWatchesGauageLabel, numChildWatchesGauge);
     }
 
     public Watcher registerChildWatcher(String path, Watcher watcher) {
@@ -178,6 +186,11 @@ public class ZKWatcherManager implements Watcher {
         }
     }
 
+    public void unregisterGauges() {
+        this.statsLogger.unregisterGauge(totalWatchesGauageLabel, totalWatchesGauge);
+        this.statsLogger.unregisterGauge(numChildWatchesGauageLabel, numChildWatchesGauge);
+    }
+
     @Override
     public void process(WatchedEvent event) {
         switch (event.getType()) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogConfiguration.java
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogConfiguration.java
index 8dcb053..19b9863 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogConfiguration.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogConfiguration.java
@@ -109,14 +109,15 @@ public class TestDistributedLogConfiguration {
         DistributedLogConfiguration conf = new DistributedLogConfiguration();
         // validate default configuration
         conf.validate();
-        // test invalid timeout, should throw exception
+        // test equal, should not throw exception
         conf.setReadLACLongPollTimeout(conf.getBKClientReadTimeout() * 1000);
         try {
             conf.validate();
         } catch (IllegalArgumentException e){
             exceptionThrown=true;
         }
-        assertTrue(exceptionThrown);
+        assertFalse(exceptionThrown);
+        // test invalid case, should throw exception
         exceptionThrown=false;
         conf.setReadLACLongPollTimeout(conf.getBKClientReadTimeout() * 1000 * 2);
         try {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
b/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
index 3ad181d..b702d4c 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
@@ -73,7 +73,8 @@ public class TestZKWatcherManager {
 
         // unregister watcher
         watcherManager.unregisterChildWatcher(path, watcher, true);
-
+        // unregister gauges
+        watcherManager.unregisterGauges();
         assertEquals(0, watcherManager.childWatches.size());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index 74da34f..677ade5 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -150,6 +150,11 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
     private final ConcurrentHashMap<StatusCode, Counter> statusCodeCounters =
             new ConcurrentHashMap<StatusCode, Counter>();
     private final Counter statusCodeTotal;
+    private final Gauge<Number> proxyStatusGauge;
+    private final Gauge<Number> movingAvgRpsGauge;
+    private final Gauge<Number> movingAvgBpsGauge;
+    private final Gauge<Number> streamAcquiredGauge;
+    private final Gauge<Number> streamCachedGauge;
 
     DistributedLogServiceImpl(ServerConfiguration serverConf,
                               DistributedLogConfiguration dlConf,
@@ -260,9 +265,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
         // Stats
         this.statsLogger = statsLogger;
 
-        // Stats on server
-        // Gauge for server status/health
-        statsLogger.registerGauge("proxy_status", new Gauge<Number>() {
+        // Gauges for server status/health
+        this.proxyStatusGauge = new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
                 return 0;
@@ -271,11 +275,10 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
             @Override
             public Number getSample() {
                 return ServerStatus.DOWN == serverStatus ? -1 : (featureRegionStopAcceptNewStream.isAvailable()
?
-                        3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 1 : 2));
+                    3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 1 : 2));
             }
-        });
-        // Global moving average rps
-        statsLogger.registerGauge("moving_avg_rps", new Gauge<Number>() {
+        };
+        this.movingAvgRpsGauge = new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
                 return 0;
@@ -285,9 +288,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
             public Number getSample() {
                 return windowedRps.get();
             }
-        });
-        // Global moving average bps
-        statsLogger.registerGauge("moving_avg_bps", new Gauge<Number>() {
+        };
+        this.movingAvgBpsGauge = new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
                 return 0;
@@ -297,19 +299,9 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
             public Number getSample() {
                 return windowedBps.get();
             }
-        });
-
-        // Stats on requests
-        this.bulkWritePendingStat = streamOpStats.requestPendingCounter("bulkWritePending");
-        this.writePendingStat = streamOpStats.requestPendingCounter("writePending");
-        this.redirects = streamOpStats.requestCounter("redirect");
-        this.statusCodeStatLogger = streamOpStats.requestScope("statuscode");
-        this.statusCodeTotal = streamOpStats.requestCounter("statuscode_count");
-        this.receivedRecordCounter = streamOpStats.recordsCounter("received");
-
-        // Stats on streams
-        StatsLogger streamsStatsLogger = statsLogger.scope("streams");
-        streamsStatsLogger.registerGauge("acquired", new Gauge<Number>() {
+        };
+        // Gauges for streams
+        this.streamAcquiredGauge = new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
                 return 0;
@@ -319,8 +311,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
             public Number getSample() {
                 return streamManager.numAcquired();
             }
-        });
-        streamsStatsLogger.registerGauge("cached", new Gauge<Number>() {
+        };
+        this.streamCachedGauge = new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
                 return 0;
@@ -330,7 +322,26 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
             public Number getSample() {
                 return streamManager.numCached();
             }
-        });
+        };
+
+        // Stats on server
+        statsLogger.registerGauge("proxy_status", proxyStatusGauge);
+        // Global moving average rps
+        statsLogger.registerGauge("moving_avg_rps", movingAvgRpsGauge);
+        // Global moving average bps
+        statsLogger.registerGauge("moving_avg_bps", movingAvgBpsGauge);
+        // Stats on requests
+        this.bulkWritePendingStat = streamOpStats.requestPendingCounter("bulkWritePending");
+        this.writePendingStat = streamOpStats.requestPendingCounter("writePending");
+        this.redirects = streamOpStats.requestCounter("redirect");
+        this.statusCodeStatLogger = streamOpStats.requestScope("statuscode");
+        this.statusCodeTotal = streamOpStats.requestCounter("statuscode_count");
+        this.receivedRecordCounter = streamOpStats.recordsCounter("received");
+
+        // Stats for streams
+        StatsLogger streamsStatsLogger = statsLogger.scope("streams");
+        streamsStatsLogger.registerGauge("acquired", this.streamAcquiredGauge);
+        streamsStatsLogger.registerGauge("cached", this.streamCachedGauge);
 
         // Setup complete
         logger.info("Running distributedlog server : client id {}, allocator pool {}, perstream
stat {}, dlsn version {}.",
@@ -669,6 +680,9 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
             // Stop the timer.
             timer.stop();
 
+            // clean up gauge
+            unregisterGauge();
+
             // shutdown the executor after requesting closing streams.
             SchedulerUtils.shutdownScheduler(scheduler, 60, TimeUnit.SECONDS);
         } catch (Exception ex) {
@@ -704,6 +718,17 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
         }
     }
 
+    /**
+     * clean up the gauge before we close to help GC
+     */
+    private void unregisterGauge(){
+        this.statsLogger.unregisterGauge("proxy_status",this.proxyStatusGauge);
+        this.statsLogger.unregisterGauge("moving_avg_rps",this.movingAvgRpsGauge);
+        this.statsLogger.unregisterGauge("moving_avg_bps",this.movingAvgBpsGauge);
+        this.statsLogger.unregisterGauge("acquired",this.streamAcquiredGauge);
+        this.statsLogger.unregisterGauge("cached",this.streamCachedGauge);
+    }
+
     @VisibleForTesting
     Stream newStream(String name) throws IOException {
         return streamManager.getOrCreateStream(name, false);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
index 2c6cccc..7edb778 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
@@ -41,6 +41,8 @@ import com.twitter.finagle.stats.StatsReceiver;
 import com.twitter.finagle.thrift.ClientId$;
 import com.twitter.util.Duration;
 import com.twitter.util.FutureEventListener;
+
+import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.lang.StringUtils;
@@ -107,6 +109,7 @@ public class MonitorService implements NamespaceListener {
     private final StatsReceiver monitorReceiver;
     private final Stat successStat;
     private final Stat failureStat;
+    private final Gauge<Number> numOfStreamsGauge;
     // Hash Function
     private final HashFunction hashFunction = Hashing.md5();
 
@@ -254,6 +257,17 @@ public class MonitorService implements NamespaceListener {
         this.successStat = monitorReceiver.stat0("success");
         this.failureStat = monitorReceiver.stat0("failure");
         this.statsProvider = statsProvider;
+        this.numOfStreamsGauge = new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return knownStreams.size();
+            }
+        };
     }
 
     public void runServer() throws IllegalArgumentException, IOException {
@@ -394,17 +408,7 @@ public class MonitorService implements NamespaceListener {
 
     void runMonitor(DistributedLogConfiguration conf, URI dlUri) throws IOException {
         // stats
-        statsProvider.getStatsLogger("monitor").registerGauge("num_streams", new org.apache.bookkeeper.stats.Gauge<Number>()
{
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return knownStreams.size();
-            }
-        });
+        statsProvider.getStatsLogger("monitor").registerGauge("num_streams", numOfStreamsGauge);
         logger.info("Construct dl namespace @ {}", dlUri);
         dlNamespace = DistributedLogNamespaceBuilder.newBuilder()
                 .conf(conf)
@@ -442,6 +446,8 @@ public class MonitorService implements NamespaceListener {
             logger.error("Interrupted on waiting shutting down monitor executor service :
", e);
         }
         if (null != statsProvider) {
+            // clean up the gauges
+            unregisterGauge();
             statsProvider.stop();
         }
         keepAliveLatch.countDown();
@@ -452,4 +458,11 @@ public class MonitorService implements NamespaceListener {
         keepAliveLatch.await();
     }
 
+    /**
+     * clean up the gauge before we close to help GC
+     */
+    private void unregisterGauge(){
+        statsProvider.getStatsLogger("monitor").unregisterGauge("num_streams", numOfStreamsGauge);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c6ea1755/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
index e74ebbe..9f049c8 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
@@ -150,6 +150,7 @@ public class StreamImpl implements Stream {
     private final StatsLogger exceptionStatLogger;
     private final ConcurrentHashMap<String, Counter> exceptionCounters =
         new ConcurrentHashMap<String, Counter>();
+    private final Gauge<Number> streamStatusGauge;
 
     // Since we may create and discard streams at initialization if there's a race,
     // must not do any expensive initialization here (particularly any locking or
@@ -206,6 +207,17 @@ public class StreamImpl implements Stream {
         this.exceptionStatLogger = streamOpStats.requestScope("exceptions");
         this.writerCloseStatLogger = streamsStatsLogger.getOpStatsLogger("writer_close");
         this.writerCloseTimeoutCounter = streamsStatsLogger.getCounter("writer_close_timeouts");
+        // Gauges
+        this.streamStatusGauge = new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return StreamStatus.UNINITIALIZED.getCode();
+            }
+            @Override
+            public Number getSample() {
+                return status.getCode();
+            }
+        };
     }
 
     @Override
@@ -242,16 +254,7 @@ public class StreamImpl implements Stream {
 
         // Better to avoid registering the gauge multiple times, so do this in init
         // which only gets called once.
-        streamLogger.registerGauge("stream_status", new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return StreamStatus.UNINITIALIZED.getCode();
-            }
-            @Override
-            public Number getSample() {
-                return status.getCode();
-            }
-        });
+        streamLogger.registerGauge("stream_status", this.streamStatusGauge);
 
         // Signal initialization is complete, should be last in this method.
         status = StreamStatus.INITIALIZING;
@@ -761,6 +764,7 @@ public class StreamImpl implements Stream {
         // after the async writer is closed. so we could clear up the lock before redirect
         // them.
         close(abort);
+        unregisterGauge();
         if (uncache) {
             final long probationTimeoutMs;
             if (null != owner) {
@@ -827,6 +831,7 @@ public class StreamImpl implements Stream {
         } else {
             closeWaitDuration = Duration.fromMilliseconds(writerCloseTimeoutMs);
         }
+
         FutureUtils.stats(
                 closeWriterFuture,
                 writerCloseStatLogger,
@@ -869,6 +874,13 @@ public class StreamImpl implements Stream {
         logger.info("Closed stream {}.", name);
     }
 
+    /**
+     * clean up the gauge to help GC
+     */
+    private void unregisterGauge(){
+        streamLogger.unregisterGauge("stream_status", this.streamStatusGauge);
+    }
+
     // Test-only apis
 
     @VisibleForTesting



Mime
View raw message