hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject [3/3] hbase git commit: HBASE-16448 Custom metrics for custom replication endpoints
Date Wed, 24 Aug 2016 01:02:09 GMT
HBASE-16448 Custom metrics for custom replication endpoints

Signed-off-by: Andrew Purtell <apurtell@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/aac4e095
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/aac4e095
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/aac4e095

Branch: refs/heads/0.98
Commit: aac4e09514b8b7dcf9d253d5722d0fe813973b99
Parents: ce58f58
Author: Geoffrey <gjacoby@salesforce.com>
Authored: Tue Aug 23 16:49:24 2016 -0700
Committer: Andrew Purtell <apurtell@apache.org>
Committed: Tue Aug 23 17:18:00 2016 -0700

----------------------------------------------------------------------
 .../MetricsReplicationSourceSource.java         |  4 +-
 .../MetricsReplicationGlobalSourceSource.java   | 64 ++++++++++++++-
 .../MetricsReplicationSourceSourceImpl.java     | 81 ++++++++++++++++---
 .../replication/regionserver/MetricsSource.java | 85 +++++++++++++++++++-
 .../replication/TestReplicationEndpoint.java    | 76 ++++++++++++++++-
 5 files changed, 293 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/aac4e095/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 8611e15..ea0ae20 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
-public interface MetricsReplicationSourceSource {
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+public interface MetricsReplicationSourceSource extends BaseSource {
 
   public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
   public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";

http://git-wip-us.apache.org/repos/asf/hbase/blob/aac4e095/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index b3e1766..da1bcf4 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 
 public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource {
-
+  private final MetricsReplicationSource rms;
   private final MutableGaugeLong ageOfLastShippedOpGauge;
   private final MutableGaugeLong sizeOfLogQueueGauge;
   private final MutableCounterLong logReadInEditsCounter;
@@ -35,7 +35,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   private final MutableCounterLong logReadInBytesCounter;
 
   public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
-
+    this.rms = rms;
     ageOfLastShippedOpGauge = rms.getMetricsRegistry().getLongGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, 0L);
 
     sizeOfLogQueueGauge = rms.getMetricsRegistry().getLongGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L);
@@ -118,4 +118,64 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   public long getLastShippedAge() {
     return ageOfLastShippedOpGauge.value();
   }
+
+  @Override
+  public void init() {
+    rms.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    rms.setGauge(gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    rms.incGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    rms.decGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    rms.removeMetric(key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    rms.incCounters(counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    rms.updateHistogram(name, value);
+  }
+
+  @Override
+  public void updateQuantile(String name, long value) {
+    rms.updateQuantile(name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return rms.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return rms.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return rms.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return rms.getMetricsName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/aac4e095/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index d6a9128..55c9b05 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -31,6 +31,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final String logEditsFilteredKey;
   private final String shippedBatchesKey;
   private final String shippedOpsKey;
+  private String keyPrefix;
+
   @Deprecated
   private final String shippedKBsKey;
   private final String shippedBytesKey;
@@ -49,32 +51,33 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
     this.rms = rms;
     this.id = id;
+    this.keyPrefix = "source." + this.id + ".";
 
-    ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
+    ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
     ageOfLastShippedOpGauge = rms.getMetricsRegistry().getLongGauge(ageOfLastShippedOpKey, 0L);
 
-    sizeOfLogQueueKey = "source." + id + ".sizeOfLogQueue";
+    sizeOfLogQueueKey = this.keyPrefix + "sizeOfLogQueue";
     sizeOfLogQueueGauge = rms.getMetricsRegistry().getLongGauge(sizeOfLogQueueKey, 0L);
 
-    shippedBatchesKey = "source." + this.id + ".shippedBatches";
+    shippedBatchesKey = this.keyPrefix + "shippedBatches";
     shippedBatchesCounter = rms.getMetricsRegistry().getLongCounter(shippedBatchesKey, 0L);
 
-    shippedOpsKey = "source." + this.id + ".shippedOps";
+    shippedOpsKey = this.keyPrefix + "shippedOps";
     shippedOpsCounter = rms.getMetricsRegistry().getLongCounter(shippedOpsKey, 0L);
 
-    shippedKBsKey = "source." + this.id + ".shippedKBs";
+    shippedKBsKey = this.keyPrefix + "shippedKBs";
     shippedKBsCounter = rms.getMetricsRegistry().getLongCounter(shippedKBsKey, 0L);
 
-    shippedBytesKey = "source." + this.id + ".shippedBytes";
+    shippedBytesKey = this.keyPrefix + "shippedBytes";
     shippedBytesCounter = rms.getMetricsRegistry().getLongCounter(shippedBytesKey, 0L);
 
-    logReadInBytesKey = "source." + this.id + ".logReadInBytes";
+    logReadInBytesKey = this.keyPrefix + "logReadInBytes";
     logReadInBytesCounter = rms.getMetricsRegistry().getLongCounter(logReadInBytesKey, 0L);
 
-    logReadInEditsKey = "source." + id + ".logEditsRead";
+    logReadInEditsKey = this.keyPrefix + "logEditsRead";
     logReadInEditsCounter = rms.getMetricsRegistry().getLongCounter(logReadInEditsKey, 0L);
 
-    logEditsFilteredKey = "source." + id + ".logEditsFiltered";
+    logEditsFilteredKey = this.keyPrefix + "logEditsFiltered";
     logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(logEditsFilteredKey, 0L);
   }
 
@@ -139,4 +142,64 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   public long getLastShippedAge() {
     return ageOfLastShippedOpGauge.value();
   }
+
+  @Override
+  public void init() {
+    rms.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    rms.setGauge(this.keyPrefix + gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    rms.incGauge(this.keyPrefix + gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    rms.decGauge(this.keyPrefix + gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    rms.removeMetric(this.keyPrefix + key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    rms.incCounters(this.keyPrefix + counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    rms.updateHistogram(this.keyPrefix + name, value);
+  }
+
+  @Override
+  public void updateQuantile(String name, long value) {
+    rms.updateQuantile(this.keyPrefix + name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return rms.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return rms.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return rms.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return rms.getMetricsName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/aac4e095/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 113bc5b..5824d83 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.metrics.BaseSource;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
@@ -30,7 +31,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  * through the metrics interfaces.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-public class MetricsSource {
+public class MetricsSource implements BaseSource {
 
   public static final Log LOG = LogFactory.getLog(MetricsSource.class);
 
@@ -74,6 +75,19 @@ public class MetricsSource {
   }
 
   /**
+   * Constructor for injecting custom (or test) MetricsReplicationSourceSources
+   * @param id Name of the source this class is monitoring
+   * @param singleSourceSource Class to monitor id-scoped metrics
+   * @param globalSourceSource Class to monitor global-scoped metrics
+   */
+  public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
+                       MetricsReplicationSourceSource globalSourceSource) {
+    this.id = id;
+    this.singleSourceSource = singleSourceSource;
+    this.globalSourceSource = globalSourceSource;
+  }
+
+  /**
    * Set the age of the last edit that was shipped
    *
    * @param timestamp write time of the edit
@@ -196,4 +210,73 @@ public class MetricsSource {
   public String getPeerID() {
     return id;
   }
+  
+  @Override
+  public void init() {
+    singleSourceSource.init();
+    globalSourceSource.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    singleSourceSource.setGauge(gaugeName, value);
+    globalSourceSource.setGauge(gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    singleSourceSource.incGauge(gaugeName, delta);
+    globalSourceSource.incGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    singleSourceSource.decGauge(gaugeName, delta);
+    globalSourceSource.decGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    singleSourceSource.removeMetric(key);
+    globalSourceSource.removeMetric(key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    singleSourceSource.incCounters(counterName, delta);
+    globalSourceSource.incCounters(counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    singleSourceSource.updateHistogram(name, value);
+    globalSourceSource.updateHistogram(name, value);
+  }
+
+  @Override
+  public void updateQuantile(String name, long value) {
+    singleSourceSource.updateQuantile(name, value);
+    globalSourceSource.updateQuantile(name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return globalSourceSource.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return globalSourceSource.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return globalSourceSource.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return globalSourceSource.getMetricsName();
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/aac4e095/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index ae4c8e3..7d7df5c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -37,6 +37,9 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.replication.regionserver.*;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -44,6 +47,10 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 /**
  * Tests ReplicationSource and ReplicationEndpoint interactions
  */
@@ -82,8 +89,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   public void testCustomReplicationEndpoint() throws Exception {
     // test installing a custom replication endpoint other than the default one.
     admin.addPeer("testCustomReplicationEndpoint",
-      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
-        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+            .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
 
     // check whether the class has been constructed and started
     Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
@@ -123,8 +130,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
     final String id = "testReplicationEndpointReturnsFalseOnReplicate";
     admin.addPeer(id,
-      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
-        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
+        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+            .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
     // now replicate some data.
     doPut(row);
 
@@ -141,6 +148,66 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
   }
 
+  @Test
+  public void testMetricsSourceBaseSourcePassthrough(){
+    /*
+    The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
+    and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
+    Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
+    allows for custom JMX metrics.
+    This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through
+    the two layers of wrapping to the actual BaseSource.
+    */
+    String id = "id";
+    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
+    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
+    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
+    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
+    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
+
+    MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
+    MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
+    MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource);
+    String gaugeName = "gauge";
+    String singleGaugeName = "source.id." + gaugeName;
+    long delta = 1;
+    String counterName = "counter";
+    String singleCounterName = "source.id." + counterName;
+    long count = 2;
+    source.decGauge(gaugeName, delta);
+    source.getMetricsContext();
+    source.getMetricsDescription();
+    source.getMetricsJmxContext();
+    source.getMetricsName();
+    source.incCounters(counterName, count);
+    source.incGauge(gaugeName, delta);
+    source.init();
+    source.removeMetric(gaugeName);
+    source.setGauge(gaugeName, delta);
+    source.updateHistogram(counterName, count);
+    source.updateQuantile(counterName, count);
+
+    verify(singleRms).decGauge(singleGaugeName, delta);
+    verify(globalRms).decGauge(gaugeName, delta);
+    verify(globalRms).getMetricsContext();
+    verify(globalRms).getMetricsJmxContext();
+    verify(globalRms).getMetricsName();
+    verify(singleRms).incCounters(singleCounterName, count);
+    verify(globalRms).incCounters(counterName, count);
+    verify(singleRms).incGauge(singleGaugeName, delta);
+    verify(globalRms).incGauge(gaugeName, delta);
+    verify(globalRms).init();
+    verify(singleRms).removeMetric(singleGaugeName);
+    verify(globalRms).removeMetric(gaugeName);
+    verify(singleRms).setGauge(singleGaugeName, delta);
+    verify(globalRms).setGauge(gaugeName, delta);
+    verify(singleRms).updateHistogram(singleCounterName, count);
+    verify(globalRms).updateHistogram(counterName, count);
+    verify(singleRms).updateQuantile(singleCounterName, count);
+    verify(globalRms).updateQuantile(counterName, count);
+
+  }
+
   @Test (timeout=120000)
   public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
     admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
@@ -253,6 +320,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
       return true;
     }
 
+
     @Override
     public WALEntryFilter getWALEntryfilter() {
       return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {


Mime
View raw message