hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1241974 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/thrift/ main/java/org/apache/hadoop/hbase/thrift2/ test/java/org/apache/hadoop/hbase/thrift/ test/java/org/apache/hadoop/hbase/thrift2/
Date Wed, 08 Feb 2012 17:03:11 GMT
Author: tedyu
Date: Wed Feb  8 17:03:11 2012
New Revision: 1241974

URL: http://svn.apache.org/viewvc?rev=1241974&view=rev
Log:
HBASE-5298 Add thrift metrics to thrift2

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java?rev=1241974&r1=1241973&r2=1241974&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java Wed Feb  8 17:03:11 2012
@@ -157,13 +157,6 @@ public class TBoundedThreadPoolServer ex
     serverOptions = options;
   }
 
-  /**
-   * Return the server working queue
-   */
-  public CallQueue getCallQueue() {
-    return this.callQueue;
-  }
-
   public void serve() {
     try {
       serverTransport_.listen();

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java?rev=1241974&r1=1241973&r2=1241974&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java Wed Feb  8 17:03:11 2012
@@ -61,7 +61,7 @@ public class ThriftMetrics implements Up
   private MetricsTimeVaryingRate slowThriftCall =
       new MetricsTimeVaryingRate("slowThriftCall", registry);
 
-  public ThriftMetrics(int port, Configuration conf) {
+  public ThriftMetrics(int port, Configuration conf, Class<?> iface) {
     slowResponseTime = conf.getLong(
         SLOW_RESPONSE_NANO_SEC, DEFAULT_SLOW_RESPONSE_NANO_SEC);
     context = MetricsUtil.getContext(CONTEXT_NAME);
@@ -73,7 +73,7 @@ public class ThriftMetrics implements Up
 
     context.registerUpdater(this);
 
-    createMetricsForMethods(Hbase.Iface.class);
+    createMetricsForMethods(iface);
   }
 
   public void incTimeInQueue(long time) {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java?rev=1241974&r1=1241973&r2=1241974&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java Wed Feb  8 17:03:11 2012
@@ -123,7 +123,6 @@ public class ThriftServerRunner implemen
   volatile TServer tserver;
   private final Hbase.Iface handler;
   private final ThriftMetrics metrics;
-  private CallQueue callQueue;
 
   /** An enum of server implementation selections */
   enum ImplType {
@@ -227,7 +226,7 @@ public class ThriftServerRunner implemen
   public ThriftServerRunner(Configuration conf, HBaseHandler handler) {
     this.conf = HBaseConfiguration.create(conf);
     this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
-    this.metrics = new ThriftMetrics(listenPort, conf);
+    this.metrics = new ThriftMetrics(listenPort, conf, Hbase.Iface.class);
     this.handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
 
   }
@@ -305,9 +304,10 @@ public class ThriftServerRunner implemen
         tserver = new TNonblockingServer(serverArgs);
       } else if (implType == ImplType.HS_HA) {
         THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
-        this.callQueue = new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
+        CallQueue callQueue =
+            new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
         ExecutorService executorService = createExecutor(
-            this.callQueue, serverArgs.getWorkerThreads());
+            callQueue, serverArgs.getWorkerThreads());
         serverArgs.executorService(executorService)
                   .processor(processor)
                   .transportFactory(transportFactory)
@@ -316,9 +316,10 @@ public class ThriftServerRunner implemen
       } else { // THREADED_SELECTOR
         TThreadedSelectorServer.Args serverArgs =
             new HThreadedSelectorServerArgs(serverTransport, conf);
-        this.callQueue = new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
+        CallQueue callQueue =
+            new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
         ExecutorService executorService = createExecutor(
-            this.callQueue, serverArgs.getWorkerThreads());
+            callQueue, serverArgs.getWorkerThreads());
         serverArgs.executorService(executorService)
                   .processor(processor)
                   .transportFactory(transportFactory)
@@ -344,7 +345,6 @@ public class ThriftServerRunner implemen
           + "; " + serverArgs);
       TBoundedThreadPoolServer tserver =
           new TBoundedThreadPoolServer(serverArgs, metrics);
-      this.callQueue = tserver.getCallQueue();
       this.tserver = tserver;
     } else {
       throw new AssertionError("Unsupported Thrift server implementation: " +

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java?rev=1241974&r1=1241973&r2=1241974&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java Wed Feb  8 17:03:11 2012
@@ -19,7 +19,28 @@
  */
 package org.apache.hadoop.hbase.thrift2;
 
-import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.*;
+import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
+import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromHBase;
+import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromThrift;
+import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift;
+import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getsFromThrift;
+import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift;
+import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift;
+import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putsFromThrift;
+import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultFromHBase;
+import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultsFromHBase;
+import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,6 +49,7 @@ import org.apache.hadoop.hbase.client.De
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.thrift.ThriftMetrics;
 import org.apache.hadoop.hbase.thrift2.generated.TDelete;
 import org.apache.hadoop.hbase.thrift2.generated.TGet;
 import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
@@ -39,13 +61,6 @@ import org.apache.hadoop.hbase.thrift2.g
 import org.apache.hadoop.hbase.thrift2.generated.TScan;
 import org.apache.thrift.TException;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * This class is a glue object that connects Thrift RPC calls to the HBase client API primarily defined in the
  * HTableInterface.
@@ -61,7 +76,49 @@ public class ThriftHBaseServiceHandler i
   private final AtomicInteger nextScannerId = new AtomicInteger(0);
   private final Map<Integer, ResultScanner> scannerMap = new ConcurrentHashMap<Integer, ResultScanner>();
 
-  public ThriftHBaseServiceHandler(Configuration conf) {
+  public static THBaseService.Iface newInstance(
+      Configuration conf, ThriftMetrics metrics) {
+    THBaseService.Iface handler = new ThriftHBaseServiceHandler(conf);
+    return (THBaseService.Iface) Proxy.newProxyInstance(
+        handler.getClass().getClassLoader(),
+        handler.getClass().getInterfaces(),
+        new THBaseServiceMetricsProxy(handler, metrics));
+  }
+
+  private static class THBaseServiceMetricsProxy implements InvocationHandler {
+    private final THBaseService.Iface handler;
+    private final ThriftMetrics metrics;
+
+    private THBaseServiceMetricsProxy(
+        THBaseService.Iface handler, ThriftMetrics metrics) {
+      this.handler = handler;
+      this.metrics = metrics;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method m, Object[] args)
+        throws Throwable {
+      Object result;
+      try {
+        long start = now();
+        result = m.invoke(handler, args);
+        int processTime = (int)(now() - start);
+        metrics.incMethodTime(m.getName(), processTime);
+      } catch (InvocationTargetException e) {
+        throw e.getTargetException();
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "unexpected invocation exception: " + e.getMessage());
+      }
+      return result;
+    }
+  }
+    
+  private static long now() {
+    return System.nanoTime();
+  }
+
+  ThriftHBaseServiceHandler(Configuration conf) {
     htablePool = new HTablePool(conf, Integer.MAX_VALUE);
   }
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java?rev=1241974&r1=1241973&r2=1241974&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java Wed Feb  8 17:03:11 2012
@@ -19,6 +19,15 @@
  */
 package org.apache.hadoop.hbase.thrift2;
 
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -29,7 +38,11 @@ import org.apache.commons.cli.ParseExcep
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.thrift.CallQueue;
+import org.apache.hadoop.hbase.thrift.CallQueue.Call;
+import org.apache.hadoop.hbase.thrift.ThriftMetrics;
 import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -46,19 +59,16 @@ import org.apache.thrift.transport.TServ
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.List;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the
  * HbaseClient.thrift IDL file.
  */
 public class ThriftServer {
-  private static final Log log = LogFactory.getLog("ThriftServer");
+  private static final Log log = LogFactory.getLog(ThriftServer.class);
 
-  private static final String DEFAULT_LISTEN_PORT = "9090";
+  public static final String DEFAULT_LISTEN_PORT = "9090";
 
   public ThriftServer() {
   }
@@ -141,17 +151,33 @@ public class ThriftServer {
     return new TNonblockingServer(serverArgs);
   }
 
-  private static TServer getTHsHaServer(TProtocolFactory protocolFactory, THBaseService.Processor processor,
-      TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
+  private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
+      THBaseService.Processor processor, TTransportFactory transportFactory,
+      InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
+      throws TTransportException {
     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
     log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
     THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
+    ExecutorService executorService = createExecutor(
+        serverArgs.getWorkerThreads(), metrics);
+    serverArgs.executorService(executorService);
     serverArgs.processor(processor);
     serverArgs.transportFactory(transportFactory);
     serverArgs.protocolFactory(protocolFactory);
     return new THsHaServer(serverArgs);
   }
 
+  private static ExecutorService createExecutor(
+      int workerThreads, ThriftMetrics metrics) {
+    CallQueue callQueue = new CallQueue(
+        new LinkedBlockingQueue<Call>(), metrics);
+    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+    tfb.setDaemon(true);
+    tfb.setNameFormat("thrift2-worker-%d");
+    return new ThreadPoolExecutor(workerThreads, workerThreads,
+            Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
+  }
+
   private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, THBaseService.Processor processor,
       TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
     TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
@@ -195,10 +221,14 @@ public class ThriftServer {
       boolean nonblocking = cmd.hasOption("nonblocking");
       boolean hsha = cmd.hasOption("hsha");
 
+      Configuration conf = HBaseConfiguration.create();
+      ThriftMetrics metrics = new ThriftMetrics(
+          listenPort, conf, THBaseService.Iface.class);
+
       // Construct correct ProtocolFactory
       TProtocolFactory protocolFactory = getTProtocolFactory(cmd.hasOption("compact"));
-      THBaseService.Iface handler = new ThriftHBaseServiceHandler(
-          HBaseConfiguration.create());
+      THBaseService.Iface handler =
+          ThriftHBaseServiceHandler.newInstance(conf, metrics);
       THBaseService.Processor processor = new THBaseService.Processor(handler);
 
       boolean framed = cmd.hasOption("framed") || nonblocking || hsha;
@@ -217,7 +247,7 @@ public class ThriftServer {
       if (nonblocking) {
         server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress);
       } else if (hsha) {
-        server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress);
+        server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress, metrics);
       } else {
         server = getTThreadPoolServer(protocolFactory, processor, transportFactory, inetSocketAddress);
       }

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java?rev=1241974&r1=1241973&r2=1241974&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java Wed Feb  8 17:03:11 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.thrift.CallQueue.Call;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
 import org.apache.hadoop.metrics.ContextFactory;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -106,7 +107,8 @@ public class TestCallQueue {
   private static ThriftMetrics createMetrics() throws Exception {
     setupMetricsContext();
     Configuration conf = UTIL.getConfiguration();
-    return new ThriftMetrics(ThriftServerRunner.DEFAULT_LISTEN_PORT, conf);
+    return new ThriftMetrics(
+        ThriftServerRunner.DEFAULT_LISTEN_PORT, conf, Hbase.Iface.class);
   }
 
   private static void setupMetricsContext() throws Exception {

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java?rev=1241974&r1=1241973&r2=1241974&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java Wed Feb  8 17:03:11 2012
@@ -141,7 +141,7 @@ public class TestThriftServer {
 
   private static ThriftMetrics getMetrics(Configuration conf) throws Exception {
     setupMetricsContext();
-    return new ThriftMetrics(ThriftServerRunner.DEFAULT_LISTEN_PORT, conf);
+    return new ThriftMetrics(ThriftServerRunner.DEFAULT_LISTEN_PORT, conf, Hbase.Iface.class);
   }
 
   private static void setupMetricsContext() throws IOException {

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java?rev=1241974&r1=1241973&r2=1241974&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java Wed Feb  8 17:03:11 2012
@@ -19,6 +19,13 @@
  */
 package org.apache.hadoop.hbase.thrift2;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -26,15 +33,22 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.thrift.ThriftMetrics;
 import org.apache.hadoop.hbase.thrift2.generated.TColumn;
 import org.apache.hadoop.hbase.thrift2.generated.TColumnIncrement;
 import org.apache.hadoop.hbase.thrift2.generated.TColumnValue;
 import org.apache.hadoop.hbase.thrift2.generated.TDelete;
 import org.apache.hadoop.hbase.thrift2.generated.TDeleteType;
 import org.apache.hadoop.hbase.thrift2.generated.TGet;
+import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
 import org.apache.hadoop.hbase.thrift2.generated.TIOError;
 import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument;
 import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
@@ -42,6 +56,11 @@ import org.apache.hadoop.hbase.thrift2.g
 import org.apache.hadoop.hbase.thrift2.generated.TResult;
 import org.apache.hadoop.hbase.thrift2.generated.TScan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.spi.NoEmitMetricsContext;
+import org.apache.hadoop.metrics.spi.OutputRecord;
 import org.apache.thrift.TException;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -49,13 +68,13 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import static org.junit.Assert.*;
-
 /**
  * Unit testing for ThriftServer.HBaseHandler, a part of the org.apache.hadoop.hbase.thrift2 package.
  */
 @Category(MediumTests.class)
 public class TestThriftHBaseServiceHandler {
+
+  public static final Log LOG = LogFactory.getLog(TestThriftHBaseServiceHandler.class);
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
   // Static names for tables, columns, rows, and values
@@ -513,6 +532,77 @@ public class TestThriftHBaseServiceHandl
     }
   }
 
+  @Test
+  public void testMetrics() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    ThriftMetrics metrics = getMetrics(conf);
+    THBaseService.Iface handler =
+        ThriftHBaseServiceHandler.newInstance(conf, metrics);
+    byte[] rowName = "testMetrics".getBytes();
+    ByteBuffer table = ByteBuffer.wrap(tableAname);
+
+    TGet get = new TGet(ByteBuffer.wrap(rowName));
+    assertFalse(handler.exists(table, get));
+
+    List<TColumnValue> columnValues = new ArrayList<TColumnValue>();
+    columnValues.add(new TColumnValue(ByteBuffer.wrap(familyAname),
+                                      ByteBuffer.wrap(qualifierAname),
+                                      ByteBuffer.wrap(valueAname)));
+    columnValues.add(new TColumnValue(ByteBuffer.wrap(familyBname),
+                                      ByteBuffer.wrap(qualifierBname),
+                                      ByteBuffer.wrap(valueBname)));
+    TPut put = new TPut(ByteBuffer.wrap(rowName), columnValues);
+    put.setColumnValues(columnValues);
+
+    handler.put(table, put);
+
+    assertTrue(handler.exists(table, get));
+    logMetrics(metrics);
+    verifyMetrics(metrics, "put_num_ops", 1);
+    verifyMetrics(metrics, "exists_num_ops", 2);
+  }
+ 
+  private static ThriftMetrics getMetrics(Configuration conf) throws Exception {
+    setupMetricsContext();
+    return new ThriftMetrics(Integer.parseInt(ThriftServer.DEFAULT_LISTEN_PORT),
+        conf, THBaseService.Iface.class);
+  }
+ 
+  private static void setupMetricsContext() throws IOException {
+    ContextFactory factory = ContextFactory.getFactory();
+    factory.setAttribute(ThriftMetrics.CONTEXT_NAME + ".class",
+        NoEmitMetricsContext.class.getName());
+    MetricsUtil.getContext(ThriftMetrics.CONTEXT_NAME)
+               .createRecord(ThriftMetrics.CONTEXT_NAME).remove();
+  }
+ 
+  private static void logMetrics(ThriftMetrics metrics) throws Exception {
+    if (LOG.isDebugEnabled()) {
+      return;
+    }
+    MetricsContext context = MetricsUtil.getContext( 
+        ThriftMetrics.CONTEXT_NAME); 
+    metrics.doUpdates(context); 
+    for (String key : context.getAllRecords().keySet()) {
+      for (OutputRecord record : context.getAllRecords().get(key)) {
+        for (String name : record.getMetricNames()) {
+          LOG.debug("metrics:" + name + " value:" +
+              record.getMetric(name).intValue());
+        }
+      }
+    }
+  }
+
+  private static void verifyMetrics(ThriftMetrics metrics, String name, int expectValue)
+      throws Exception { 
+    MetricsContext context = MetricsUtil.getContext( 
+        ThriftMetrics.CONTEXT_NAME); 
+    metrics.doUpdates(context); 
+    OutputRecord record = context.getAllRecords().get( 
+        ThriftMetrics.CONTEXT_NAME).iterator().next(); 
+    assertEquals(expectValue, record.getMetric(name).intValue()); 
+  }
+
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
     new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();



Mime
View raw message