incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Adding more metrics to the shared merge scheduler to monitor the queue depth.
Date Tue, 04 Nov 2014 15:12:40 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master badd38f47 -> 9fc5afbe0


Adding more metrics to the shared merge scheduler to monitor the queue depth.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/9fc5afbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/9fc5afbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/9fc5afbe

Branch: refs/heads/master
Commit: 9fc5afbe0712e377606c25a59576f982f15d3fc7
Parents: badd38f
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Nov 4 10:12:36 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Nov 4 10:12:36 2014 -0500

----------------------------------------------------------------------
 .../manager/writer/SharedMergeScheduler.java    | 47 ++++++++++++++++++++
 1 file changed, 47 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9fc5afbe/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java b/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
index 8e33b0a..02728e8 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
@@ -39,11 +39,16 @@ import org.apache.lucene.index.MergePolicy.OneMerge;
 import org.apache.lucene.index.MergeScheduler;
 
 import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
 import com.yammer.metrics.core.Meter;
 import com.yammer.metrics.core.MetricName;
 
 public class SharedMergeScheduler implements Closeable {
 
+  private static final String LARGE_QUEUE_DEPTH_IN_BYTES = "Large Queue Depth In Bytes";
+  private static final String LARGE_QUEUE_DEPTH = "Large Queue Depth";
+  private static final String SMALL_QUEUE_DEPTH_IN_BYTES = "Small Queue Depth In Bytes";
+  private static final String SMALL_QUEUE_DEPTH = "Small Queue Depth";
   private static final Log LOG = LogFactory.getLog(SharedMergeScheduler.class);
   private static final Meter _throughputBytes;
 
@@ -93,6 +98,10 @@ public class SharedMergeScheduler implements Closeable {
     public String getId() {
       return _id;
     }
+
+    public long getSize() {
+      return _size;
+    }
   }
 
   public SharedMergeScheduler(int threads) {
@@ -100,6 +109,11 @@ public class SharedMergeScheduler implements Closeable {
   }
 
   public SharedMergeScheduler(int threads, long smallMergeThreshold) {
+    MetricName mergeSmallQueueDepth = new MetricName(ORG_APACHE_BLUR, LUCENE, SMALL_QUEUE_DEPTH);
+    MetricName mergeSmallQueueDepthInBytes = new MetricName(ORG_APACHE_BLUR, LUCENE, SMALL_QUEUE_DEPTH_IN_BYTES);
+    MetricName mergeLargeQueueDepth = new MetricName(ORG_APACHE_BLUR, LUCENE, LARGE_QUEUE_DEPTH);
+    MetricName mergeLargeQueueDepthInBytes = new MetricName(ORG_APACHE_BLUR, LUCENE, LARGE_QUEUE_DEPTH_IN_BYTES);
+
     _smallMergeThreshold = smallMergeThreshold;
     _smallMergeService = Executors.newThreadPool(SHARED_MERGE_SCHEDULER_PREFIX + "-small", threads, false);
     _largeMergeService = Executors.newThreadPool(SHARED_MERGE_SCHEDULER_PREFIX + "-large", threads, false);
@@ -107,6 +121,39 @@ public class SharedMergeScheduler implements Closeable {
       _smallMergeService.submit(getMergerRunnable(_smallMergeQueue));
       _largeMergeService.submit(getMergerRunnable(_largeMergeQueue));
     }
+
+    Metrics.newGauge(mergeSmallQueueDepth, new Gauge<Long>() {
+      @Override
+      public Long value() {
+        return (long) _smallMergeQueue.size();
+      }
+    });
+    Metrics.newGauge(mergeSmallQueueDepthInBytes, new Gauge<Long>() {
+      @Override
+      public Long value() {
+        return getSizeInBytes(_smallMergeQueue);
+      }
+    });
+    Metrics.newGauge(mergeLargeQueueDepth, new Gauge<Long>() {
+      @Override
+      public Long value() {
+        return (long) _largeMergeQueue.size();
+      }
+    });
+    Metrics.newGauge(mergeLargeQueueDepthInBytes, new Gauge<Long>() {
+      @Override
+      public Long value() {
+        return getSizeInBytes(_largeMergeQueue);
+      }
+    });
+  }
+
+  protected long getSizeInBytes(PriorityBlockingQueue<MergeWork> queue) {
+    long total = 0;
+    for (MergeWork mergeWork : queue) {
+      total += mergeWork.getSize();
+    }
+    return total;
   }
 
   private Runnable getMergerRunnable(final PriorityBlockingQueue<MergeWork> queue) {


Mime
View raw message