hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From li...@apache.org
Subject svn commit: r1410117 - /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
Date Fri, 16 Nov 2012 00:02:07 GMT
Author: liyin
Date: Fri Nov 16 00:02:06 2012
New Revision: 1410117

URL: http://svn.apache.org/viewvc?rev=1410117&view=rev
Log:
[HBASE-7169] Adding average/max latency counters to HTableMultiplexer

Author: vinodv

Summary:
My first HBase diff! Didn't really open a JIRA or anything, just want to
verify that the approach looks ok. I may have reinvented the wheel a
little with my simple moving window average counter: I looked around
in hadoop.metrics and hbase.metrics but didn't find anything that was
easy to expose to clients. It was simpler for me to reuse the
boilerplate that Liyin wrote for exporting stats to clients.

Test Plan:
Tested on ODSS in my devserver. Checked that the latency counters look
reasonable both in fb303 and by adding some logging. Also ensured the
unittest ran correctly.

Reviewers: liyintang, cgthayer

Reviewed By: liyintang

Differential Revision: https://phabricator.fb.com/D525688

Task ID: 1183034

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java?rev=1410117&r1=1410116&r2=1410117&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java Fri Nov 16 00:02:06 2012
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
+import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -235,14 +236,22 @@ public class HTableMultiplexer {
   public static class HTableMultiplexerStatus {
     private long totalFailedPutCounter;
     private long totalBufferedPutCounter;
+    private long maxLatency;
+    private long overallAverageLatency;
     private Map<String, Long> serverToFailedCounterMap;
     private Map<String, Long> serverToBufferedCounterMap;
+    private Map<String, Long> serverToAverageLatencyMap;
+    private Map<String, Long> serverToMaxLatencyMap;
 
     public HTableMultiplexerStatus(Map<HServerAddress, HTableFlushWorker> serverToFlushWorkerMap) {
       this.totalBufferedPutCounter = 0;
       this.totalFailedPutCounter = 0;
+      this.maxLatency = 0;
+      this.overallAverageLatency = 0;
       this.serverToBufferedCounterMap = new HashMap<String, Long>();
       this.serverToFailedCounterMap = new HashMap<String, Long>();
+      this.serverToAverageLatencyMap = new HashMap<String, Long>();
+      this.serverToMaxLatencyMap = new HashMap<String, Long>();
       this.initialize(serverToFlushWorkerMap);
     }
 
@@ -251,6 +260,8 @@ public class HTableMultiplexer {
         return;
       }
 
+      long averageCalcSum = 0;
+      int averageCalcCount = 0;
       for (Map.Entry<HServerAddress, HTableFlushWorker> entry : serverToFlushWorkerMap
           .entrySet()) {
         HServerAddress addr = entry.getKey();
@@ -258,15 +269,32 @@ public class HTableMultiplexer {
 
         long bufferedCounter = worker.getTotalBufferedCount();
         long failedCounter = worker.getTotalFailedCount();
+        long serverMaxLatency = worker.getMaxLatency();
+        AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
+        // Get sum and count pieces separately to compute overall average
+        SimpleEntry<Long, Integer> averageComponents =
+          averageCounter.getComponents();
+        long serverAvgLatency = averageCounter.getAndReset();
 
         this.totalBufferedPutCounter += bufferedCounter;
         this.totalFailedPutCounter += failedCounter;
+        if (serverMaxLatency > this.maxLatency) {
+          this.maxLatency = serverMaxLatency;
+        }
+        averageCalcSum += averageComponents.getKey();
+        averageCalcCount += averageComponents.getValue();
 
         this.serverToBufferedCounterMap.put(addr.getHostNameWithPort(),
             bufferedCounter);
         this.serverToFailedCounterMap.put(addr.getHostNameWithPort(),
             failedCounter);
+        this.serverToAverageLatencyMap.put(addr.getHostNameWithPort(),
+            serverAvgLatency);
+        this.serverToMaxLatencyMap.put(addr.getHostNameWithPort(),
+            serverMaxLatency);
       }
+      this.overallAverageLatency = averageCalcCount != 0 ?
+        averageCalcSum / averageCalcCount : 0;
     }
 
     public long getTotalBufferedCounter() {
@@ -277,6 +305,14 @@ public class HTableMultiplexer {
       return this.totalFailedPutCounter;
     }
 
+    public long getMaxLatency() {
+      return this.maxLatency;
+    }
+
+    public long getOverallAverageLatency() {
+      return this.overallAverageLatency;
+    }
+
     public Map<String, Long> getBufferedCounterForEachRegionServer() {
       return this.serverToBufferedCounterMap;
     }
@@ -284,6 +320,14 @@ public class HTableMultiplexer {
     public Map<String, Long> getFailedCounterForEachRegionServer() {
       return this.serverToFailedCounterMap;
     }
+
+    public Map<String, Long> getMaxLatencyForEachRegionServer() {
+      return this.serverToMaxLatencyMap;
+    }
+
+    public Map<String, Long> getAverageLatencyForEachRegionServer() {
+      return this.serverToAverageLatencyMap;
+    }
   }
   
   private static class PutStatus {
@@ -313,6 +357,46 @@ public class HTableMultiplexer {
     }
   }
 
+  /**
+   * Helper to count the average over an interval until reset.
+   */
+  public static class AtomicAverageCounter {
+    private long sum;
+    private int count;
+
+    public AtomicAverageCounter() {
+      this.sum = 0L;
+      this.count = 0;
+    }
+
+    public synchronized long getAndReset() {
+      long result = this.get();
+      this.reset();
+      return result;
+    }
+
+    public synchronized long get() {
+      if (this.count == 0) {
+        return 0;
+      }
+      return this.sum / this.count;
+    }
+
+    public synchronized SimpleEntry<Long, Integer> getComponents() {
+      return new SimpleEntry<Long, Integer>(sum, count);
+    }
+
+    public synchronized void reset() {
+      this.sum = 0l;
+      this.count = 0;
+    }
+
+    public synchronized void add(long value) {
+      this.sum += value;
+      this.count++;
+    }
+  }
+
   private static class HTableFlushWorker implements Runnable {
     private HServerAddress addr;
     private Configuration conf;
@@ -321,7 +405,9 @@ public class HTableMultiplexer {
     private HTableMultiplexer htableMultiplexer;
     private AtomicLong totalFailedPutCount;
     private AtomicInteger currentProcessingPutCount;
-    
+    private AtomicAverageCounter averageLatency;
+    private AtomicLong maxLatency;
+
     public HTableFlushWorker(Configuration conf, HServerAddress addr,
         HConnection connection, HTableMultiplexer htableMultiplexer,
         LinkedBlockingQueue<PutStatus> queue) {
@@ -332,6 +418,8 @@ public class HTableMultiplexer {
       this.queue = queue;
       this.totalFailedPutCount = new AtomicLong(0);
       this.currentProcessingPutCount = new AtomicInteger(0);
+      this.averageLatency = new AtomicAverageCounter();
+      this.maxLatency = new AtomicLong(0);
     }
 
     public long getTotalFailedCount() {
@@ -342,6 +430,14 @@ public class HTableMultiplexer {
       return queue.size() + currentProcessingPutCount.get();
     }
 
+    public AtomicAverageCounter getAverageLatencyCounter() {
+      return this.averageLatency;
+    }
+
+    public long getMaxLatency() {
+      return this.maxLatency.getAndSet(0);
+    }
+
     private boolean resubmitFailedPut(PutStatus failedPutStatus, HServerAddress oldLoc) throws IOException{
       Put failedPut = failedPutStatus.getPut();
       // The currentPut is failed. So get the table name for the currentPut.
@@ -378,7 +474,7 @@ public class HTableMultiplexer {
       int failedCount = 0;
       while (true) {
         try {
-          start = System.currentTimeMillis();
+          start = elapsed = System.currentTimeMillis();
 
           // Clear the processingList, putToStatusMap and failedCount
           processingList.clear();
@@ -439,15 +535,27 @@ public class HTableMultiplexer {
             
             // Reset the current processing put count
             currentProcessingPutCount.set(0);
-            
+
+            elapsed = System.currentTimeMillis() - start;
+            // Update latency counters
+            averageLatency.add(elapsed);
+            if (elapsed > maxLatency.get()) {
+              maxLatency.set(elapsed);
+            }
+
             // Log some basic info
-            LOG.debug("Processed " + currentProcessingPutCount
-                + " put requests for " + addr.getHostNameWithPort()
-                + " and " + failedCount + " failed");
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Processed " + currentProcessingPutCount
+                  + " put requests for " + addr.getHostNameWithPort()
+                  + " and " + failedCount + " failed"
+                  + ", latency for this send: " + elapsed);
+            }
           }
 
           // Sleep for a while
-          elapsed = System.currentTimeMillis() - start;
+          if (elapsed == start) {
+            elapsed = System.currentTimeMillis() - start;
+          }
           if (elapsed < frequency) {
             Thread.sleep(frequency - elapsed);
           }



Mime
View raw message