hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject [3/4] hbase git commit: HBASE-14693 Add client-side metrics for received pushback signals
Date Tue, 10 Nov 2015 02:03:47 GMT
HBASE-14693 Add client-side metrics for received pushback signals

Signed-off-by: Andrew Purtell <apurtell@apache.org>

Conflicts:
	hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/70964f48
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/70964f48
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/70964f48

Branch: refs/heads/0.98
Commit: 70964f48a7fbc03bade66d6f075fd78e6dcd8627
Parents: 70b9015
Author: chenheng <chenheng@fenbi.com>
Authored: Thu Nov 5 16:12:41 2015 +0800
Committer: Andrew Purtell <apurtell@apache.org>
Committed: Mon Nov 9 16:10:43 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       |  18 ++++
 .../hadoop/hbase/client/MetricsConnection.java  | 106 +++++++++++++++++++
 .../hadoop/hbase/client/TestClientPushback.java |  18 +++-
 3 files changed, 141 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/70964f48/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 841b757..1143101 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -632,6 +632,9 @@ class AsyncProcess<CResult> {
       final Batch.Callback<CResult> batchCallback) {
     // no stats to manage, just do the standard action
     if (AsyncProcess.this.hConnection.getStatisticsTracker() == null) {
+      if (hConnection.getConnectionMetrics() != null) {
+        hConnection.getConnectionMetrics().incrNormalRunners();
+      }
       List<Runnable> toReturn = new ArrayList<Runnable>(1);
       toReturn.add(Trace.wrap("AsyncProcess.sendMultiAction", 
         getNewSingleServerRunnable(initialActions, loc, multiAction, numAttempt,
@@ -663,6 +666,14 @@ class AsyncProcess<CResult> {
           runner.setRunner(runnable);
           traceText = "AsyncProcess.clientBackoff.sendMultiAction";
           runnable = runner;
+          if (hConnection.getConnectionMetrics() != null) {
+            hConnection.getConnectionMetrics().incrDelayRunners();
+            hConnection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
+          }
+        } else {
+          if (hConnection.getConnectionMetrics() != null) {
+            hConnection.getConnectionMetrics().incrNormalRunners();
+          }
         }
         runnable = Trace.wrap(traceText, runnable);
         toReturn.add(runnable);
@@ -877,6 +888,13 @@ class AsyncProcess<CResult> {
             toReplay.add(correspondingAction);
           }
         } else { // success
+
+          if (AsyncProcess.this.hConnection.getConnectionMetrics() != null) {
+            AsyncProcess.this.hConnection.getConnectionMetrics().
+              updateServerStats(location.getServerName(),
+                location.getRegionInfo().getRegionName(), result);
+          }
+
           if (callback != null || batchCallback != null) {
             int index = regionResult.getFirst();
             Action<Row> correspondingAction = initialActions.get(index);

http://git-wip-us.apache.org/repos/asf/hbase/blob/70964f48/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index febf03f..fd647e1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -26,12 +26,16 @@ import com.yammer.metrics.core.MetricsRegistry;
 import com.yammer.metrics.core.Timer;
 import com.yammer.metrics.reporting.JmxReporter;
 import com.yammer.metrics.util.RatioGauge;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -54,6 +58,8 @@ public class MetricsConnection {
   private static final String DRTN_BASE = "rpcCallDurationMs_";
   private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
   private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
+  private static final String MEMLOAD_BASE = "memstoreLoad_";
+  private static final String HEAP_BASE = "heapOccupancy_";
   private static final String CLIENT_SVC = ClientService.getDescriptor().getName();
 
   /** A container class for collecting details about the RPC call as it percolates. */
@@ -130,6 +136,88 @@ public class MetricsConnection {
     }
   }
 
+  protected static class RegionStats {
+    final String name;
+    final Histogram memstoreLoadHist;
+    final Histogram heapOccupancyHist;
+
+    public RegionStats(MetricsRegistry registry, String name) {
+      this.name = name;
+      this.memstoreLoadHist = registry.newHistogram(MetricsConnection.class,
+          MEMLOAD_BASE + this.name);
+      this.heapOccupancyHist = registry.newHistogram(MetricsConnection.class,
+          HEAP_BASE + this.name);
+    }
+
+    public void update(ClientProtos.RegionLoadStats regionStatistics) {
+      this.memstoreLoadHist.update(regionStatistics.getMemstoreLoad());
+      this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
+    }
+  }
+
+  @VisibleForTesting
+  protected static class RunnerStats {
+    final Counter normalRunners;
+    final Counter delayRunners;
+    final Histogram delayIntevalHist;
+
+    public RunnerStats(MetricsRegistry registry) {
+      this.normalRunners = registry.newCounter(MetricsConnection.class, "normalRunnersCount");
+      this.delayRunners = registry.newCounter(MetricsConnection.class, "delayRunnersCount");
+      this.delayIntevalHist = registry.newHistogram(MetricsConnection.class, "delayIntervalHist");
+    }
+
+    public void incrNormalRunners() {
+      this.normalRunners.inc();
+    }
+
+    public void incrDelayRunners() {
+      this.delayRunners.inc();
+    }
+
+    public void updateDelayInterval(long interval) {
+      this.delayIntevalHist.update(interval);
+    }
+  }
+
+  @VisibleForTesting
+  protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats
+          = new ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>>();
+
+  public void updateServerStats(ServerName serverName, byte[] regionName,
+                                Object r) {
+    if (!(r instanceof Result)) {
+      return;
+    }
+    Result result = (Result) r;
+    ClientProtos.RegionLoadStats stats = result.getStats();
+    if(stats == null){
+      return;
+    }
+    String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
+    ConcurrentMap<byte[], RegionStats> rsStats = null;
+    if (serverStats.containsKey(serverName)) {
+      rsStats = serverStats.get(serverName);
+    } else {
+      rsStats = serverStats.putIfAbsent(serverName,
+          new ConcurrentSkipListMap<byte[], RegionStats>(Bytes.BYTES_COMPARATOR));
+      if (rsStats == null) {
+        rsStats = serverStats.get(serverName);
+      }
+    }
+    RegionStats regionStats = null;
+    if (rsStats.containsKey(regionName)) {
+      regionStats = rsStats.get(regionName);
+    } else {
+      regionStats = rsStats.putIfAbsent(regionName, new RegionStats(this.registry, name));
+      if (regionStats == null) {
+        regionStats = rsStats.get(regionName);
+      }
+    }
+    regionStats.update(stats);
+  }
+
+
   /** A lambda for dispatching to the appropriate metric factory method */
   private static interface NewMetric<T> {
     T newMetric(Class<?> clazz, String name, String scope);
@@ -172,6 +260,7 @@ public class MetricsConnection {
   @VisibleForTesting protected final CallTracker incrementTracker;
   @VisibleForTesting protected final CallTracker putTracker;
   @VisibleForTesting protected final CallTracker multiTracker;
+  @VisibleForTesting protected final RunnerStats runnerStats;
 
   // dynamic metrics
 
@@ -208,6 +297,8 @@ public class MetricsConnection {
     this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
     this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
     this.multiTracker = new CallTracker(this.registry, "Multi", scope);
+    this.runnerStats = new RunnerStats(this.registry);
+
     this.reporter = new JmxReporter(this.registry);
     this.reporter.start();
   }
@@ -233,6 +324,21 @@ public class MetricsConnection {
     metaCacheMisses.inc();
   }
 
+  /** Increment the number of normal runner counts. */
+  public void incrNormalRunners() {
+    this.runnerStats.incrNormalRunners();
+  }
+
+  /** Increment the number of delay runner counts. */
+  public void incrDelayRunners() {
+    this.runnerStats.incrDelayRunners();
+  }
+
+  /** Update delay interval of delay runner. */
+  public void updateDelayInterval(long interval) {
+    this.runnerStats.updateDelayInterval(interval);
+  }
+
   /**
    * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/70964f48/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
index 0bbe130..2d80fc3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
@@ -44,6 +44,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
@@ -76,7 +77,7 @@ public class TestClientPushback {
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes);
     // ensure we block the flushes when we are double that flushsize
     conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
-
+    conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true);
     UTIL.startMiniCluster(1);
     UTIL.createTable(tableName, family);
   }
@@ -153,6 +154,21 @@ public class TestClientPushback {
     // produces a backoffTime of 151 milliseconds. This is long enough so the
     // wait and related checks below are reasonable. Revisit if the backoff
     // time reported by above debug logging has significantly deviated.
+    String name = server.getServerName() + "," + Bytes.toStringBinary(regionName);
+    MetricsConnection.RegionStats rsStats = conn.getConnectionMetrics().
+            serverStats.get(server).get(regionName);
+    assertEquals(name, rsStats.name);
+    assertEquals(rsStats.heapOccupancyHist.mean(),
+        (double)regionStats.getHeapOccupancyPercent(), 0.1 );
+    assertEquals(rsStats.memstoreLoadHist.mean(),
+        (double)regionStats.getMemstoreLoadPercent(), 0.1);
+
+    MetricsConnection.RunnerStats runnerStats = conn.getConnectionMetrics().runnerStats;
+
+    assertEquals(runnerStats.delayRunners.count(), 1);
+    assertEquals(runnerStats.normalRunners.count(), 1);
+    assertEquals("", runnerStats.delayIntevalHist.mean(), (double)backoffTime, 0.1);
+
     latch.await(backoffTime * 2, TimeUnit.MILLISECONDS);
     assertNotEquals("AsyncProcess did not submit the work in time", endTime.get(), 0);
     assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime);


Mime
View raw message