hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbau...@apache.org
Subject svn commit: r1245287 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/thrift/ test/java/org/apache/hadoop/hbase/thrift/
Date Fri, 17 Feb 2012 01:56:09 GMT
Author: mbautin
Date: Fri Feb 17 01:56:08 2012
New Revision: 1245287

URL: http://svn.apache.org/viewvc?rev=1245287&view=rev
Log:
[HBASE-5186]Add metrics to ThriftServer

Summary:
Port from Apache JIRA
https://issues.apache.org/jira/browse/Hbase-5186

Test Plan: Past thrift unit tests

Reviewers: kannan, dhruba

Reviewed By: dhruba

CC: hbase@lists, dhruba, davejwatson

Differential Revision: https://phabricator.fb.com/D403352

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java?rev=1245287&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java Fri
Feb 17 01:56:08 2012
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A BlockingQueue reports waiting time in queue and queue length to
+ * ThriftMetrics.
+ */
+public class CallQueue implements BlockingQueue<Runnable> {
+
+  private final BlockingQueue<Call> underlyingQueue;
+  private final ThriftMetrics metrics;
+
+  public CallQueue(BlockingQueue<Call> underlyingQueue,
+                   ThriftMetrics metrics) {
+    this.underlyingQueue = underlyingQueue;
+    this.metrics = metrics;
+  }
+
+  private static long now() {
+    return System.nanoTime();
+  }
+
+  public static class Call implements Runnable {
+    final long startTime;
+    final Runnable underlyingRunnable;
+
+    Call(Runnable underlyingRunnable) {
+      this.underlyingRunnable = underlyingRunnable;
+      this.startTime = now();
+    }
+
+    @Override
+    public void run() {
+      underlyingRunnable.run();
+    }
+
+    public long timeInQueue() {
+      return now() - startTime;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof Call) {
+        Call otherCall = (Call)(other);
+        return this.underlyingRunnable.equals(otherCall.underlyingRunnable);
+      } else if (other instanceof Runnable) {
+        return this.underlyingRunnable.equals(other);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return this.underlyingRunnable.hashCode();
+    }
+  }
+
+  @Override
+  public Runnable poll() {
+    Call result = underlyingQueue.poll();
+    updateMetrics(result);
+    return result;
+  }
+
+  private void updateMetrics(Call result) {
+    if (result == null) {
+      return;
+    }
+    metrics.incTimeInQueue(result.timeInQueue());
+    metrics.setCallQueueLen(this.size());
+  }
+
+  @Override
+  public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
+    Call result = underlyingQueue.poll(timeout, unit);
+    updateMetrics(result);
+    return result;
+  }
+
+  @Override
+  public Runnable remove() {
+    Call result = underlyingQueue.remove();
+    updateMetrics(result);
+    return result;
+  }
+
+  @Override
+  public Runnable take() throws InterruptedException {
+    Call result = underlyingQueue.take();
+    updateMetrics(result);
+    return result;
+  }
+
+  @Override
+  public int drainTo(Collection<? super Runnable> destination) {
+    return drainTo(destination, Integer.MAX_VALUE);
+  }
+
+  @Override
+  public int drainTo(Collection<? super Runnable> destination,
+                     int maxElements) {
+    if (destination == this) {
+      throw new IllegalArgumentException(
+          "A BlockingQueue cannot drain to itself.");
+    }
+    List<Call> drained = new ArrayList<Call>();
+    underlyingQueue.drainTo(drained, maxElements);
+    for (Call r : drained) {
+      updateMetrics(r);
+    }
+    destination.addAll(drained);
+    return drained.size();
+  }
+
+
+  @Override
+  public boolean offer(Runnable element) {
+    return underlyingQueue.offer(new Call(element));
+  }
+
+  @Override
+  public boolean offer(Runnable element, long timeout, TimeUnit unit)
+      throws InterruptedException {
+    return underlyingQueue.offer(new Call(element), timeout, unit);
+  }
+  @Override
+  public void put(Runnable element) throws InterruptedException {
+    underlyingQueue.put(new Call(element));
+  }
+
+  @Override
+  public boolean add(Runnable element) {
+    return underlyingQueue.add(new Call(element));
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends Runnable> elements) {
+    int added = 0;
+    for (Runnable r : elements) {
+      added += underlyingQueue.add(new Call(r)) ? 1 : 0;
+    }
+    return added != 0;
+  }
+
+  @Override
+  public Runnable element() {
+    return underlyingQueue.element();
+  }
+
+  @Override
+  public Runnable peek() {
+    return underlyingQueue.peek();
+  }
+
+  @Override
+  public void clear() {
+    underlyingQueue.clear();
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> elements) {
+    return underlyingQueue.containsAll(elements);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return underlyingQueue.isEmpty();
+  }
+
+  @Override
+  public Iterator<Runnable> iterator() {
+    return new Iterator<Runnable>() {
+      final Iterator<Call> underlyingIterator = underlyingQueue.iterator();
+      @Override
+      public Runnable next() {
+        return underlyingIterator.next();
+      }
+
+      @Override
+      public boolean hasNext() {
+        return underlyingIterator.hasNext();
+      }
+
+      @Override
+      public void remove() {
+        underlyingIterator.remove();
+      }
+    };
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> elements) {
+    return underlyingQueue.removeAll(elements);
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> elements) {
+    return underlyingQueue.retainAll(elements);
+  }
+
+  @Override
+  public int size() {
+    return underlyingQueue.size();
+  }
+
+  @Override
+  public Object[] toArray() {
+    return underlyingQueue.toArray();
+  }
+
+  @Override
+  public <T> T[] toArray(T[] array) {
+    return underlyingQueue.toArray(array);
+  }
+
+  @Override
+  public boolean contains(Object element) {
+    return underlyingQueue.contains(element);
+  }
+
+  @Override
+  public int remainingCapacity() {
+    return underlyingQueue.remainingCapacity();
+  }
+
+  @Override
+  public boolean remove(Object element) {
+    return underlyingQueue.remove(element);
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java?rev=1245287&r1=1245286&r2=1245287&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java
Fri Feb 17 01:56:08 2012
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.thrift.CallQueue.Call;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
@@ -110,16 +111,18 @@ public class HBaseThreadPoolServer exten
       TServerTransport serverTransport,
       TTransportFactory transportFactory,
       TProtocolFactory protocolFactory,
-      Options options) {
+      Options options,
+      ThriftMetrics metrics) {
     super(new TProcessorFactory(processor), serverTransport, transportFactory,
         transportFactory, protocolFactory, protocolFactory);
 
     BlockingQueue<Runnable> executorQueue;
     if (options.maxQueuedRequests > 0) {
-      executorQueue = new LinkedBlockingQueue<Runnable>(
-          options.maxQueuedRequests);
+      executorQueue = new CallQueue(
+          new LinkedBlockingQueue<Call>(options.maxQueuedRequests), metrics);
     } else {
-      executorQueue = new SynchronousQueue<Runnable>();
+      executorQueue = new CallQueue(
+          new SynchronousQueue<Call>(), metrics);
     }
 
     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java?rev=1245287&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
(added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
Fri Feb 17 01:56:08 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
+
+
+/**
+ * Converts a Hbase.Iface using InvocationHandler so that it reports process
+ * time of each call to ThriftMetrics.
+ */
+public class HbaseHandlerMetricsProxy implements InvocationHandler {
+
+  public static final Log LOG = LogFactory.getLog(
+      HbaseHandlerMetricsProxy.class);
+
+  private final Hbase.Iface handler;
+  private final ThriftMetrics metrics;
+
+  public static Hbase.Iface newInstance(Hbase.Iface handler,
+                                        ThriftMetrics metrics,
+                                        Configuration conf) {
+    return (Hbase.Iface) Proxy.newProxyInstance(
+        handler.getClass().getClassLoader(),
+        handler.getClass().getInterfaces(),
+        new HbaseHandlerMetricsProxy(handler, metrics, conf));
+  }
+
+  private HbaseHandlerMetricsProxy(
+      Hbase.Iface handler, ThriftMetrics metrics, Configuration conf) {
+    this.handler = handler;
+    this.metrics = metrics;
+  }
+
+  @Override
+  public Object invoke(Object proxy, Method m, Object[] args)
+      throws Throwable {
+    Object result;
+    try {
+      long start = now();
+      result = m.invoke(handler, args);
+      int processTime = (int)(now() - start);
+      metrics.incMethodTime(m.getName(), processTime);
+    } catch (InvocationTargetException e) {
+      throw e.getTargetException();
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "unexpected invocation exception: " + e.getMessage());
+    }
+    return result;
+  }
+
+  private static long now() {
+    return System.nanoTime();
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java?rev=1245287&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
(added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
Fri Feb 17 01:56:08 2012
@@ -0,0 +1,131 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import java.lang.reflect.Method;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.util.MetricsBase;
+import org.apache.hadoop.metrics.util.MetricsIntValue;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+
+/**
+ * This class is for maintaining the various statistics of thrift server
+ * and publishing them through the metrics interfaces.
+ */
+public class ThriftMetrics implements Updater {
+  public final static Log LOG = LogFactory.getLog(ThriftMetrics.class);
+  public final static String CONTEXT_NAME = "thriftserver";
+
+  private final MetricsContext context;
+  private final MetricsRecord metricsRecord;
+  private final MetricsRegistry registry = new MetricsRegistry();
+  private final long slowResponseTime;
+  public static final String SLOW_RESPONSE_NANO_SEC =
+    "hbase.thrift.slow.response.nano.second";
+  public static final long DEFAULT_SLOW_RESPONSE_NANO_SEC = 10 * 1000 * 1000;
+
+  private final MetricsIntValue callQueueLen =
+      new MetricsIntValue("callQueueLen", registry);
+  private final MetricsTimeVaryingRate timeInQueue =
+      new MetricsTimeVaryingRate("timeInQueue", registry);
+  private MetricsTimeVaryingRate thriftCall =
+      new MetricsTimeVaryingRate("thriftCall", registry);
+  private MetricsTimeVaryingRate slowThriftCall =
+      new MetricsTimeVaryingRate("slowThriftCall", registry);
+
+  public ThriftMetrics(int port, Configuration conf) {
+    slowResponseTime = conf.getLong(
+        SLOW_RESPONSE_NANO_SEC, DEFAULT_SLOW_RESPONSE_NANO_SEC);
+    context = MetricsUtil.getContext(CONTEXT_NAME);
+    metricsRecord = MetricsUtil.createRecord(context, CONTEXT_NAME);
+
+    metricsRecord.setTag("port", port + "");
+
+    LOG.info("Initializing RPC Metrics with port=" + port);
+
+    context.registerUpdater(this);
+
+    createMetricsForMethods(Hbase.Iface.class);
+  }
+
+  public void incTimeInQueue(long time) {
+    timeInQueue.inc(time);
+  }
+
+  public void setCallQueueLen(int len) {
+    callQueueLen.set(len);
+  }
+
+  public void incMethodTime(String name, int time) {
+    MetricsTimeVaryingRate methodTimeMetrc = getMethodTimeMetrics(name);
+    if (methodTimeMetrc == null) {
+      LOG.warn(
+          "Got incMethodTime() request for method that doesnt exist: " + name);
+      return; // ignore methods that dont exist.
+    }
+
+    // inc method specific processTime
+    methodTimeMetrc.inc(time);
+
+    // inc general processTime
+    thriftCall.inc(time);
+    if (time > slowResponseTime) {
+      slowThriftCall.inc(time);
+    }
+  }
+
+  private void createMetricsForMethods(Class<?> iface) {
+    for (Method m : iface.getDeclaredMethods()) {
+      if (getMethodTimeMetrics(m.getName()) == null)
+        LOG.debug("Creating metrics for method:" + m.getName());
+        createMethodTimeMetrics(m.getName());
+    }
+  }
+
+  private MetricsTimeVaryingRate getMethodTimeMetrics(String key) {
+    return (MetricsTimeVaryingRate) registry.get(key);
+  }
+
+  private MetricsTimeVaryingRate createMethodTimeMetrics(String key) {
+    return new MetricsTimeVaryingRate(key, this.registry);
+  }
+
+  /**
+   * Push the metrics to the monitoring subsystem on doUpdate() call.
+   */
+  public void doUpdates(final MetricsContext context) {
+    // getMetricsList() and pushMetric() are thread safe methods
+    for (MetricsBase m : registry.getMetricsList()) {
+      m.pushMetric(metricsRecord);
+    }
+    metricsRecord.update();
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java?rev=1245287&r1=1245286&r2=1245287&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
Fri Feb 17 01:56:08 2012
@@ -1049,7 +1049,7 @@ public class ThriftServer {
       System.exit(exitCode);
   }
 
-  private static final String DEFAULT_LISTEN_PORT = "9090";
+  static final String DEFAULT_LISTEN_PORT = "9090";
 
   /*
    * Start up the Thrift server.
@@ -1134,7 +1134,9 @@ public class ThriftServer {
       protocolFactory = new TBinaryProtocol.Factory();
     }
 
-    HBaseHandler handler = new HBaseHandler(conf);
+    ThriftMetrics metrics = new ThriftMetrics(listenPort, conf);
+    Hbase.Iface handler = new HBaseHandler(conf);
+    handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
     Hbase.Processor processor = new Hbase.Processor(handler);
 
     TServer server;
@@ -1188,7 +1190,7 @@ public class ThriftServer {
           + serverOptions.maxQueuedRequests);
 
       server = new HBaseThreadPoolServer(processor, serverTransport,
-          transportFactory, protocolFactory, serverOptions);
+          transportFactory, protocolFactory, serverOptions, metrics);
 
       if (server.getClass() != THREAD_POOL_SERVER_CLASS) {
         // A sanity check that we instantiated the right thing.

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java?rev=1245287&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java
(added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java
Fri Feb 17 01:56:08 2012
@@ -0,0 +1,134 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.thrift;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.thrift.CallQueue.Call;
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.spi.NoEmitMetricsContext;
+import org.apache.hadoop.metrics.spi.OutputRecord;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Unit testing for CallQueue, a part of the
+ * org.apache.hadoop.hbase.thrift package.
+ */
+@RunWith(Parameterized.class)
+public class TestCallQueue {
+
+  public static final Log LOG = LogFactory.getLog(TestCallQueue.class);
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private int elementsAdded;
+  private int elementsRemoved;
+
+  @Parameters
+  public static Collection<Object[]> getParameters() {
+    Collection<Object[]> parameters = new ArrayList<Object[]>();
+    for (int elementsAdded : new int[] {100, 200, 300}) {
+      for (int elementsRemoved : new int[] {0, 20, 100}) {
+        parameters.add(new Object[]{new Integer(elementsAdded),
+                                    new Integer(elementsRemoved)});
+      }
+    }
+    return parameters;
+  }
+
+  public TestCallQueue(int elementsAdded, int elementsRemoved) {
+    this.elementsAdded = elementsAdded;
+    this.elementsRemoved = elementsRemoved;
+    LOG.debug("elementsAdded:" + elementsAdded +
+              " elementsRemoved:" + elementsRemoved);
+  }
+
+  @Test(timeout=3000)
+  public void testPutTake() throws Exception {
+    ThriftMetrics metrics = createMetrics();
+    CallQueue callQueue = new CallQueue(
+        new LinkedBlockingQueue<Call>(), metrics);
+    for (int i = 0; i < elementsAdded; ++i) {
+      callQueue.put(createDummyRunnable());
+    }
+    for (int i = 0; i < elementsRemoved; ++i) {
+      callQueue.take();
+    }
+    verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved);
+  }
+
+  @Test(timeout=3000)
+  public void testOfferPoll() throws Exception {
+    ThriftMetrics metrics = createMetrics();
+    CallQueue callQueue = new CallQueue(
+        new LinkedBlockingQueue<Call>(), metrics);
+    for (int i = 0; i < elementsAdded; ++i) {
+      callQueue.offer(createDummyRunnable());
+    }
+    for (int i = 0; i < elementsRemoved; ++i) {
+      callQueue.poll();
+    }
+    verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved);
+  }
+
+  private static ThriftMetrics createMetrics() throws Exception {
+    setupMetricsContext();
+    Configuration conf = UTIL.getConfiguration();
+    return new ThriftMetrics(
+        Integer.parseInt(ThriftServer.DEFAULT_LISTEN_PORT), conf);
+  }
+
+  private static void setupMetricsContext() throws Exception {
+    ContextFactory factory = ContextFactory.getFactory();
+    factory.setAttribute(ThriftMetrics.CONTEXT_NAME + ".class",
+        NoEmitMetricsContext.class.getName());
+    MetricsUtil.getContext(ThriftMetrics.CONTEXT_NAME)
+               .createRecord(ThriftMetrics.CONTEXT_NAME).remove();
+  }
+
+  private static void verifyMetrics(ThriftMetrics metrics, String name, int expectValue)
+      throws Exception {
+    MetricsContext context = MetricsUtil.getContext(
+        ThriftMetrics.CONTEXT_NAME);
+    metrics.doUpdates(context);
+    OutputRecord record = context.getAllRecords().get(
+        ThriftMetrics.CONTEXT_NAME).iterator().next();
+    assertEquals(expectValue, record.getMetric(name).intValue());
+  }
+
+  private static Runnable createDummyRunnable() {
+    return new Runnable() {
+      @Override
+      public void run() {
+      }
+    };
+  }
+}

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java?rev=1245287&r1=1245286&r2=1245287&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
Fri Feb 17 01:56:08 2012
@@ -19,16 +19,24 @@
  */
 package org.apache.hadoop.hbase.thrift;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClusterTestCase;
 import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
 import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
 import org.apache.hadoop.hbase.thrift.generated.Mutation;
 import org.apache.hadoop.hbase.thrift.generated.TCell;
 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.spi.NoEmitMetricsContext;
+import org.apache.hadoop.metrics.spi.OutputRecord;
 
 /**
  * Unit testing for ThriftServer.HBaseHandler, a part of the
@@ -59,6 +67,7 @@ public class TestThriftServer extends HB
   public void testAll() throws Exception {
     // Run all tests
     doTestTableCreateDrop();
+    doTestThriftMetrics();
     doTestTableMutations();
     doTestTableTimestampsAndColumns();
     doTestTableScanners();
@@ -97,6 +106,54 @@ public class TestThriftServer extends HB
   }
 
   /**
+   * Tests if the metrics for thrift handler work correctly
+   */
+  public void doTestThriftMetrics() throws Exception {
+    ThriftMetrics metrics = getMetrics(conf);
+    Hbase.Iface handler = getHandler(metrics, conf);
+    handler.createTable(tableAname, getColumnDescriptors());
+    handler.disableTable(tableAname);
+    handler.deleteTable(tableAname);
+    handler.createTable(tableBname, getColumnDescriptors());
+    handler.disableTable(tableBname);
+    handler.deleteTable(tableBname);
+    verifyMetrics(metrics, "createTable_num_ops", 2);
+    verifyMetrics(metrics, "deleteTable_num_ops", 2);
+    verifyMetrics(metrics, "disableTable_num_ops", 2);
+  }
+
+  private static Hbase.Iface getHandler(ThriftMetrics metrics, Configuration conf)
+      throws Exception {
+    Hbase.Iface handler =
+      new ThriftServer.HBaseHandler(conf);
+    return HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
+  }
+
+  private static ThriftMetrics getMetrics(Configuration conf) throws Exception {
+    setupMetricsContext();
+    return new ThriftMetrics(
+        Integer.parseInt(ThriftServer.DEFAULT_LISTEN_PORT), conf);
+  }
+
+  private static void setupMetricsContext() throws IOException {
+    ContextFactory factory = ContextFactory.getFactory();
+    factory.setAttribute(ThriftMetrics.CONTEXT_NAME + ".class",
+        NoEmitMetricsContext.class.getName());
+    MetricsUtil.getContext(ThriftMetrics.CONTEXT_NAME)
+               .createRecord(ThriftMetrics.CONTEXT_NAME).remove();
+  }
+
+  private static void verifyMetrics(ThriftMetrics metrics, String name, int expectValue)
+      throws Exception {
+    MetricsContext context = MetricsUtil.getContext(
+        ThriftMetrics.CONTEXT_NAME);
+    metrics.doUpdates(context);
+    OutputRecord record = context.getAllRecords().get(
+        ThriftMetrics.CONTEXT_NAME).iterator().next();
+    assertEquals(expectValue, record.getMetric(name).intValue());
+  }
+
+  /**
    * Tests adding a series of Mutations and BatchMutations, including a
    * delete mutation.  Also tests data retrieval, and getting back multiple
    * versions.



Mime
View raw message