hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1206267 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/thrift/ main/java/org/apache/hadoop/hbase/util/ main/resources/ test/java/org/apache/hadoop/hbase/ test/java/org/apache/hadoop/hbase/thrift/ test/java/org/apache/hadoop/hbase...
Date Fri, 25 Nov 2011 17:26:41 GMT
Author: tedyu
Date: Fri Nov 25 17:26:40 2011
New Revision: 1206267

URL: http://svn.apache.org/viewvc?rev=1206267&view=rev
Log:
HBASE-4863 Phabricator D531 Make Thrift server thread pool bounded and add a command-line UI test

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java
Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Threads.java
    hbase/trunk/src/main/resources/hbase-default.xml
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java?rev=1206267&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java Fri Nov 25 17:26:40 2011
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A bounded thread pool server customized for HBase.
+ */
+public class TBoundedThreadPoolServer extends TServer {
+
+  private static final String QUEUE_FULL_MSG =
+      "Queue is full, closing connection";
+
+  /**
+   * The "core size" of the thread pool. New threads are created on every
+   * connection until this many threads are created.
+   */
+  public static final String MIN_WORKER_THREADS_CONF_KEY =
+      "hbase.thrift.minWorkerThreads";
+
+  /**
+   * This default core pool size should be enough for many test scenarios. We
+   * want to override this with a much larger number (e.g. at least 200) for a
+   * large-scale production setup.
+   */
+  public static final int DEFAULT_MIN_WORKER_THREADS = 16;
+
+  /**
+   * The maximum size of the thread pool. When the pending request queue
+   * overflows, new threads are created until their number reaches this number.
+   * After that, the server starts dropping connections.
+   */
+  public static final String MAX_WORKER_THREADS_CONF_KEY =
+      "hbase.thrift.maxWorkerThreads";
+
+  public static final int DEFAULT_MAX_WORKER_THREADS = 1000;
+
+  /**
+   * The maximum number of pending connections waiting in the queue. If there
+   * are no idle threads in the pool, the server queues requests. Only when
+   * the queue overflows, new threads are added, up to
+   * hbase.thrift.maxQueuedRequests threads.
+   */
+  public static final String MAX_QUEUED_REQUESTS_CONF_KEY =
+      "hbase.thrift.maxQueuedRequests";
+
+  public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;
+
+  /**
+   * Default amount of time in seconds to keep a thread alive. Worker threads
+   * are stopped after being idle for this long.
+   */
+  public static final String THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY =
+      "hbase.thrift.threadKeepAliveTimeSec";
+
+  private static final int DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC = 60;
+
+  /**
+   * Time to wait after interrupting all worker threads. This is after a clean
+   * shutdown has been attempted.
+   */
+  public static final int TIME_TO_WAIT_AFTER_SHUTDOWN_MS = 5000;
+
+  private static final Log LOG = LogFactory.getLog(
+      TBoundedThreadPoolServer.class.getName());
+
+  public static class Args extends TThreadPoolServer.Args {
+    int maxQueuedRequests;
+    int threadKeepAliveTimeSec;
+
+    public Args(TServerTransport transport, Configuration conf) {
+      super(transport);
+      minWorkerThreads = conf.getInt(MIN_WORKER_THREADS_CONF_KEY,
+          DEFAULT_MIN_WORKER_THREADS);
+      maxWorkerThreads = conf.getInt(MAX_WORKER_THREADS_CONF_KEY,
+          DEFAULT_MAX_WORKER_THREADS);
+      maxQueuedRequests = conf.getInt(MAX_QUEUED_REQUESTS_CONF_KEY,
+          DEFAULT_MAX_QUEUED_REQUESTS);
+      threadKeepAliveTimeSec = conf.getInt(THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY,
+          DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC);
+    }
+
+    @Override
+    public String toString() {
+      return "min worker threads=" + minWorkerThreads
+          + ", max worker threads=" + maxWorkerThreads 
+          + ", max queued requests=" + maxQueuedRequests;
+    }
+  }
+
+  /** Executor service for handling client connections */
+  private ExecutorService executorService;
+
+  /** Flag for stopping the server */
+  private volatile boolean stopped;
+
+  private Args serverOptions;
+
+  public TBoundedThreadPoolServer(Args options) {
+    super(options);
+
+    BlockingQueue<Runnable> executorQueue;
+    if (options.maxQueuedRequests > 0) {
+      executorQueue = new LinkedBlockingQueue<Runnable>(
+          options.maxQueuedRequests);
+    } else {
+      executorQueue = new SynchronousQueue<Runnable>();
+    }
+
+    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+    tfb.setDaemon(true);
+    tfb.setNameFormat("thrift-worker-%d");
+    executorService =
+        new ThreadPoolExecutor(options.minWorkerThreads,
+            options.maxWorkerThreads, DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC, TimeUnit.SECONDS,
+            executorQueue, tfb.build());
+    serverOptions = options;
+  }
+
+  public void serve() {
+    try {
+      serverTransport_.listen();
+    } catch (TTransportException ttx) {
+      LOG.error("Error occurred during listening.", ttx);
+      return;
+    }
+
+    Runtime.getRuntime().addShutdownHook(
+        new Thread(getClass().getSimpleName() + "-shutdown-hook") {
+          @Override
+          public void run() {
+            TBoundedThreadPoolServer.this.stop();
+          }
+        });
+
+    stopped = false;
+    while (!stopped && !Thread.interrupted()) {
+      TTransport client = null;
+      try {
+        client = serverTransport_.accept();
+      } catch (TTransportException ttx) {
+        if (!stopped) {
+          LOG.warn("Transport error when accepting message", ttx);
+          continue;
+        } else {
+          // The server has been stopped
+          break;
+        }
+      }
+
+      ClientConnnection command = new ClientConnnection(client);
+      try {
+        executorService.execute(command);
+      } catch (RejectedExecutionException rex) {
+        if (client.getClass() == TSocket.class) {
+          // We expect the client to be TSocket.
+          LOG.warn(QUEUE_FULL_MSG + " from " +
+              ((TSocket) client).getSocket().getRemoteSocketAddress());
+        } else {
+          LOG.warn(QUEUE_FULL_MSG, rex);
+        }
+        client.close();
+      }
+    }
+
+    shutdownServer();
+  }
+
+  /**
+   * Loop until {@link ExecutorService#awaitTermination} finally does return
+   * without an interrupted exception. If we don't do this, then we'll shut
+   * down prematurely. We want to let the executor service clear its task
+   * queue, closing client sockets appropriately.
+   */
+  private void shutdownServer() {
+    executorService.shutdown();
+
+    long msLeftToWait =
+        serverOptions.stopTimeoutUnit.toMillis(serverOptions.stopTimeoutVal);
+    long timeMillis = System.currentTimeMillis();
+
+    LOG.info("Waiting for up to " + msLeftToWait + " ms to finish processing" +
+        " pending requests");
+    boolean interrupted = false;
+    while (msLeftToWait >= 0) {
+      try {
+        executorService.awaitTermination(msLeftToWait, TimeUnit.MILLISECONDS);
+        break;
+      } catch (InterruptedException ix) {
+        long timePassed = System.currentTimeMillis() - timeMillis;
+        msLeftToWait -= timePassed;
+        timeMillis += timePassed;
+        interrupted = true;
+      }
+    }
+
+    LOG.info("Interrupting all worker threads and waiting for "
+        + TIME_TO_WAIT_AFTER_SHUTDOWN_MS + " ms longer");
+
+    // This will interrupt all the threads, even those running a task.
+    executorService.shutdownNow();
+    Threads.sleepWithoutInterrupt(TIME_TO_WAIT_AFTER_SHUTDOWN_MS);
+
+    // Preserve the interrupted status.
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+    LOG.info("Thrift server shutdown complete");
+  }
+
+  @Override
+  public void stop() {
+    stopped = true;
+    serverTransport_.interrupt();
+  }
+
+  private class ClientConnnection implements Runnable {
+
+    private TTransport client;
+
+    /**
+     * Default constructor.
+     *
+     * @param client Transport to process
+     */
+    private ClientConnnection(TTransport client) {
+      this.client = client;
+    }
+
+    /**
+     * Loops on processing a client forever
+     */
+    public void run() {
+      TProcessor processor = null;
+      TTransport inputTransport = null;
+      TTransport outputTransport = null;
+      TProtocol inputProtocol = null;
+      TProtocol outputProtocol = null;
+      try {
+        processor = processorFactory_.getProcessor(client);
+        inputTransport = inputTransportFactory_.getTransport(client);
+        outputTransport = outputTransportFactory_.getTransport(client);
+        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+        // we check stopped_ first to make sure we're not supposed to be shutting
+        // down. this is necessary for graceful shutdown.
+        while (!stopped && processor.process(inputProtocol, outputProtocol)) {}
+      } catch (TTransportException ttx) {
+        // Assume the client died and continue silently
+      } catch (TException tx) {
+        LOG.error("Thrift error occurred during processing of message.", tx);
+      } catch (Exception x) {
+        LOG.error("Error occurred during processing of message.", x);
+      }
+
+      if (inputTransport != null) {
+        inputTransport.close();
+      }
+
+      if (outputTransport != null) {
+        outputTransport.close();
+      }
+    }
+  }
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java?rev=1206267&r1=1206266&r2=1206267&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java Fri Nov 25 17:26:40 2011
@@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -65,6 +64,8 @@ import org.apache.hadoop.hbase.thrift.ge
 import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
 import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
 import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.hbase.thrift.generated.Hbase.Iface;
+import org.apache.hadoop.hbase.thrift.generated.Hbase.Processor;
 import org.apache.hadoop.hbase.thrift.generated.IOError;
 import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
 import org.apache.hadoop.hbase.thrift.generated.Mutation;
@@ -76,6 +77,7 @@ import org.apache.hadoop.hbase.util.Addr
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -83,7 +85,7 @@ import org.apache.thrift.protocol.TProto
 import org.apache.thrift.server.THsHaServer;
 import org.apache.thrift.server.TNonblockingServer;
 import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.server.TServer.AbstractServerArgs;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TNonblockingServerSocket;
 import org.apache.thrift.transport.TNonblockingServerTransport;
@@ -91,12 +93,113 @@ import org.apache.thrift.transport.TServ
 import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransportFactory;
 
+import com.google.common.base.Joiner;
+
 /**
  * ThriftServer - this class starts up a Thrift server which implements the
  * Hbase API specified in the Hbase.thrift IDL file.
  */
 public class ThriftServer {
 
+  private static final Log LOG = LogFactory.getLog(ThriftServer.class);
+
+  private static final String MIN_WORKERS_OPTION = "minWorkers";
+  private static final String MAX_WORKERS_OPTION = "workers";
+  private static final String MAX_QUEUE_SIZE_OPTION = "queue";
+  private static final String KEEP_ALIVE_SEC_OPTION = "keepAliveSec";
+  static final String BIND_OPTION = "bind";
+  static final String COMPACT_OPTION = "compact";
+  static final String FRAMED_OPTION = "framed";
+  static final String PORT_OPTION = "port";
+
+  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
+  private static final int DEFAULT_LISTEN_PORT = 9090;
+
+  private Configuration conf;
+  TServer server;
+
+  /** An enum of server implementation selections */
+  enum ImplType {
+    HS_HA("hsha", true, THsHaServer.class, false),
+    NONBLOCKING("nonblocking", true, TNonblockingServer.class, false),
+    THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true);
+
+    public static final ImplType DEFAULT = THREAD_POOL;
+
+    final String option;
+    final boolean isAlwaysFramed;
+    final Class<? extends TServer> serverClass;
+    final boolean canSpecifyBindIP;
+
+    ImplType(String option, boolean isAlwaysFramed,
+        Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
+      this.option = option;
+      this.isAlwaysFramed = isAlwaysFramed;
+      this.serverClass = serverClass;
+      this.canSpecifyBindIP = canSpecifyBindIP;
+    }
+
+    /**
+     * @return <code>-option</code> so we can get the list of options from
+     *         {@link #values()}
+     */
+    @Override
+    public String toString() {
+      return "-" + option;
+    }
+
+    String getDescription() {
+      StringBuilder sb = new StringBuilder("Use the " +
+          serverClass.getSimpleName());
+      if (isAlwaysFramed) {
+        sb.append(" This implies the framed transport.");
+      }
+      if (this == DEFAULT) {
+        sb.append("This is the default.");
+      }
+      return sb.toString();
+    }
+
+    static OptionGroup createOptionGroup() {
+      OptionGroup group = new OptionGroup();
+      for (ImplType t : values()) {
+        group.addOption(new Option(t.option, t.getDescription()));
+      }
+      return group;
+    }
+
+    static ImplType getServerImpl(CommandLine cmd) {
+      ImplType chosenType = null;
+      int numChosen = 0;
+      for (ImplType t : values()) {
+        if (cmd.hasOption(t.option)) {
+          chosenType = t;
+          ++numChosen;
+        }
+      }
+      if (numChosen != 1) {
+        throw new AssertionError("Exactly one option out of " +
+            Arrays.toString(values()) + " has to be specified");
+      }
+      return chosenType;
+    }
+
+    public String simpleClassName() {
+      return serverClass.getSimpleName();
+    }
+
+    public static List<String> serversThatCannotSpecifyBindIP() {
+      List<String> l = new ArrayList<String>();
+      for (ImplType t : values()) {
+        if (!t.canSpecifyBindIP) {
+          l.add(t.simpleClassName());
+        }
+      }
+      return l;
+    }
+
+  }
+
   /**
    * The HBaseHandler is a glue object that connects Thrift RPC calls to the
    * HBase client API primarily defined in the HBaseAdmin and HTable objects.
@@ -713,84 +816,85 @@ public class ThriftServer {
 
     @Override
     public List<TRowResult> scannerGetList(int id,int nbRows) throws IllegalArgument, IOError {
-        LOG.debug("scannerGetList: id=" + id);
-        ResultScanner scanner = getScanner(id);
-        if (null == scanner) {
-            throw new IllegalArgument("scanner ID is invalid");
-        }
+      LOG.debug("scannerGetList: id=" + id);
+      ResultScanner scanner = getScanner(id);
+      if (null == scanner) {
+        throw new IllegalArgument("scanner ID is invalid");
+      }
 
-        Result [] results = null;
-        try {
-            results = scanner.next(nbRows);
-            if (null == results) {
-                return new ArrayList<TRowResult>();
-            }
-        } catch (IOException e) {
-            throw new IOError(e.getMessage());
+      Result [] results = null;
+      try {
+        results = scanner.next(nbRows);
+        if (null == results) {
+          return new ArrayList<TRowResult>();
         }
-        return ThriftUtilities.rowResultFromHBase(results);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+      return ThriftUtilities.rowResultFromHBase(results);
     }
+
     @Override
     public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
-        return scannerGetList(id,1);
+      return scannerGetList(id,1);
     }
 
     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan) throws IOError {
-        try {
-          HTable table = getTable(tableName);
-          Scan scan = new Scan();
-          if (tScan.isSetStartRow()) {
-              scan.setStartRow(tScan.getStartRow());
-          }
-          if (tScan.isSetStopRow()) {
-              scan.setStopRow(tScan.getStopRow());
-          }
-          if (tScan.isSetTimestamp()) {
-              scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp());              
-          }
-          if (tScan.isSetCaching()) {
-              scan.setCaching(tScan.getCaching());
-          }
-          if(tScan.isSetColumns() && tScan.getColumns().size() != 0) {
-            for(ByteBuffer column : tScan.getColumns()) {
-              byte [][] famQf = KeyValue.parseColumn(getBytes(column));
-              if(famQf.length == 1) {
-                scan.addFamily(famQf[0]);
-              } else {
-                scan.addColumn(famQf[0], famQf[1]);
-              }
+      try {
+        HTable table = getTable(tableName);
+        Scan scan = new Scan();
+        if (tScan.isSetStartRow()) {
+          scan.setStartRow(tScan.getStartRow());
+        }
+        if (tScan.isSetStopRow()) {
+          scan.setStopRow(tScan.getStopRow());
+        }
+        if (tScan.isSetTimestamp()) {
+          scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp());
+        }
+        if (tScan.isSetCaching()) {
+          scan.setCaching(tScan.getCaching());
+        }
+        if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
+          for(ByteBuffer column : tScan.getColumns()) {
+            byte [][] famQf = KeyValue.parseColumn(getBytes(column));
+            if(famQf.length == 1) {
+              scan.addFamily(famQf[0]);
+            } else {
+              scan.addColumn(famQf[0], famQf[1]);
             }
           }
-          if (tScan.isSetFilterString()) {
-            ParseFilter parseFilter = new ParseFilter();
-            scan.setFilter(parseFilter.parseFilterString(tScan.getFilterString()));
-          }
-          return addScanner(table.getScanner(scan));
-        } catch (IOException e) {
-          throw new IOError(e.getMessage());
         }
+        if (tScan.isSetFilterString()) {
+          ParseFilter parseFilter = new ParseFilter();
+          scan.setFilter(parseFilter.parseFilterString(tScan.getFilterString()));
+        }
+        return addScanner(table.getScanner(scan));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
     }
 
     @Override
     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
-            List<ByteBuffer> columns) throws IOError {
-        try {
-          HTable table = getTable(tableName);
-          Scan scan = new Scan(getBytes(startRow));
-          if(columns != null && columns.size() != 0) {
-            for(ByteBuffer column : columns) {
-              byte [][] famQf = KeyValue.parseColumn(getBytes(column));
-              if(famQf.length == 1) {
-                scan.addFamily(famQf[0]);
-              } else {
-                scan.addColumn(famQf[0], famQf[1]);
-              }
+        List<ByteBuffer> columns) throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        Scan scan = new Scan(getBytes(startRow));
+        if(columns != null && columns.size() != 0) {
+          for(ByteBuffer column : columns) {
+            byte [][] famQf = KeyValue.parseColumn(getBytes(column));
+            if(famQf.length == 1) {
+              scan.addFamily(famQf[0]);
+            } else {
+              scan.addColumn(famQf[0], famQf[1]);
             }
           }
-          return addScanner(table.getScanner(scan));
-        } catch (IOException e) {
-          throw new IOError(e.getMessage());
         }
+        return addScanner(table.getScanner(scan));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
     }
 
     @Override
@@ -826,7 +930,7 @@ public class ThriftServer {
         Filter f = new WhileMatchFilter(
             new PrefixFilter(getBytes(startAndPrefix)));
         scan.setFilter(f);
-        if(columns != null && columns.size() != 0) {
+        if (columns != null && columns.size() != 0) {
           for(ByteBuffer column : columns) {
             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
             if(famQf.length == 1) {
@@ -849,8 +953,8 @@ public class ThriftServer {
         HTable table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow));
         scan.setTimeRange(Long.MIN_VALUE, timestamp);
-        if(columns != null && columns.size() != 0) {
-          for(ByteBuffer column : columns) {
+        if (columns != null && columns.size() != 0) {
+          for (ByteBuffer column : columns) {
             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
             if(famQf.length == 1) {
               scan.addFamily(famQf[0]);
@@ -873,8 +977,8 @@ public class ThriftServer {
         HTable table = getTable(tableName);
         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
         scan.setTimeRange(Long.MIN_VALUE, timestamp);
-        if(columns != null && columns.size() != 0) {
-          for(ByteBuffer column : columns) {
+        if (columns != null && columns.size() != 0) {
+          for (ByteBuffer column : columns) {
             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
             if(famQf.length == 1) {
               scan.addFamily(famQf[0]);
@@ -911,7 +1015,7 @@ public class ThriftServer {
     }
 
     @Override
-    public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row, 
+    public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
         ByteBuffer family) throws IOError {
       try {
         HTable table = getTable(getBytes(tableName));
@@ -966,69 +1070,98 @@ public class ThriftServer {
     }
   }
 
+  public ThriftServer(Configuration conf) {
+    this.conf = HBaseConfiguration.create(conf);
+  }
+
   //
   // Main program and support routines
   //
 
-  private static void printUsageAndExit(Options options, int exitCode) {
+  private static void printUsageAndExit(Options options, int exitCode)
+      throws ExitCodeException {
     HelpFormatter formatter = new HelpFormatter();
     formatter.printHelp("Thrift", null, options,
-            "To start the Thrift server run 'bin/hbase-daemon.sh start thrift'\n" +
-            "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift' or" +
-            " send a kill signal to the thrift server pid",
-            true);
-      System.exit(exitCode);
+        "To start the Thrift server run 'bin/hbase-daemon.sh start thrift'\n" +
+        "To shutdown the thrift server run 'bin/hbase-daemon.sh stop " +
+        "thrift' or send a kill signal to the thrift server pid",
+        true);
+    throw new ExitCodeException(exitCode, "");
   }
 
-  private static final String DEFAULT_LISTEN_PORT = "9090";
-
   /*
-   * Start up the Thrift server.
+   * Start up or shuts down the Thrift server, depending on the arguments.
    * @param args
    */
-  static private void doMain(final String[] args) throws Exception {
-    Log LOG = LogFactory.getLog("ThriftServer");
-
+   void doMain(final String[] args) throws Exception {
     Options options = new Options();
-    options.addOption("b", "bind", true, "Address to bind the Thrift server to. Not supported by the Nonblocking and HsHa server [default: 0.0.0.0]");
-    options.addOption("p", "port", true, "Port to bind to [default: 9090]");
-    options.addOption("f", "framed", false, "Use framed transport");
-    options.addOption("c", "compact", false, "Use the compact protocol");
+    options.addOption("b", BIND_OPTION, true, "Address to bind " +
+        "the Thrift server to. Not supported by the Nonblocking and " +
+        "HsHa server [default: " + DEFAULT_BIND_ADDR + "]");
+    options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " +
+        DEFAULT_LISTEN_PORT + "]");
+    options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
+    options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
     options.addOption("h", "help", false, "Print help information");
 
-    OptionGroup servers = new OptionGroup();
-    servers.addOption(new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
-    servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
-    servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
-    options.addOptionGroup(servers);
+    options.addOption("m", MIN_WORKERS_OPTION, true,
+        "The minimum number of worker threads for " +
+        ImplType.THREAD_POOL.simpleClassName());
+
+    options.addOption("w", MAX_WORKERS_OPTION, true,
+        "The maximum number of worker threads for " +
+        ImplType.THREAD_POOL.simpleClassName());
+
+    options.addOption("q", MAX_QUEUE_SIZE_OPTION, true,
+        "The maximum number of queued requests in " +
+        ImplType.THREAD_POOL.simpleClassName());
+
+    options.addOption("k", KEEP_ALIVE_SEC_OPTION, true,
+        "The amount of time in secods to keep a thread alive when idle in " +
+        ImplType.THREAD_POOL.simpleClassName());
+
+    options.addOptionGroup(ImplType.createOptionGroup());
 
     CommandLineParser parser = new PosixParser();
     CommandLine cmd = parser.parse(options, args);
 
-    /**
-     * This is so complicated to please both bin/hbase and bin/hbase-daemon.
-     * hbase-daemon provides "start" and "stop" arguments
-     * hbase should print the help if no argument is provided
-     */
+    // This is so complicated to please both bin/hbase and bin/hbase-daemon.
+    // hbase-daemon provides "start" and "stop" arguments
+    // hbase should print the help if no argument is provided
     List<String> commandLine = Arrays.asList(args);
     boolean stop = commandLine.contains("stop");
     boolean start = commandLine.contains("start");
-    if (cmd.hasOption("help") || !start || stop) {
+    boolean invalidStartStop = (start && stop) || (!start && !stop);
+    if (cmd.hasOption("help") || invalidStartStop) {
+      if (invalidStartStop) {
+        LOG.error("Exactly one of 'start' and 'stop' has to be specified");
+      }
       printUsageAndExit(options, 1);
     }
 
     // Get port to bind to
     int listenPort = 0;
     try {
-      listenPort = Integer.parseInt(cmd.getOptionValue("port", DEFAULT_LISTEN_PORT));
+      listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION,
+          String.valueOf(DEFAULT_LISTEN_PORT)));
     } catch (NumberFormatException e) {
       LOG.error("Could not parse the value provided for the port option", e);
       printUsageAndExit(options, -1);
     }
 
+    // Make optional changes to the configuration based on command-line options
+    optionToConf(cmd, MIN_WORKERS_OPTION,
+        conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY);
+    optionToConf(cmd, MAX_WORKERS_OPTION,
+        conf, TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY);
+    optionToConf(cmd, MAX_QUEUE_SIZE_OPTION,
+        conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY);
+    optionToConf(cmd, KEEP_ALIVE_SEC_OPTION,
+        conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY);
+
     // Construct correct ProtocolFactory
     TProtocolFactory protocolFactory;
-    if (cmd.hasOption("compact")) {
+    if (cmd.hasOption(COMPACT_OPTION)) {
       LOG.debug("Using compact protocol");
       protocolFactory = new TCompactProtocol.Factory();
     } else {
@@ -1036,78 +1169,125 @@ public class ThriftServer {
       protocolFactory = new TBinaryProtocol.Factory();
     }
 
-    HBaseHandler handler = new HBaseHandler();
-    Hbase.Processor processor = new Hbase.Processor(handler);
-
-    TServer server;
-    if (cmd.hasOption("nonblocking") || cmd.hasOption("hsha")) {
-      if (cmd.hasOption("bind")) {
-        LOG.error("The Nonblocking and HsHa servers don't support IP address binding at the moment." +
-                " See https://issues.apache.org/jira/browse/HBASE-2155 for details.");
-        printUsageAndExit(options, -1);
-      }
+    HBaseHandler handler = new HBaseHandler(conf);
+    Hbase.Processor<Hbase.Iface> processor =
+        new Hbase.Processor<Hbase.Iface>(handler);
+    ImplType implType = ImplType.getServerImpl(cmd);
+
+    // Construct correct TransportFactory
+    TTransportFactory transportFactory;
+    if (cmd.hasOption(FRAMED_OPTION) || implType.isAlwaysFramed) {
+      transportFactory = new TFramedTransport.Factory();
+      LOG.debug("Using framed transport");
+    } else {
+      transportFactory = new TTransportFactory();
+    }
 
-      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(listenPort);
-      TFramedTransport.Factory transportFactory = new TFramedTransport.Factory();
+    if (cmd.hasOption(BIND_OPTION) && !implType.canSpecifyBindIP) {
+      LOG.error("Server types " + Joiner.on(", ").join(
+          ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
+          "address binding at the moment. See " +
+          "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
+      printUsageAndExit(options, -1);
+    }
 
-     if (cmd.hasOption("nonblocking")) {
-        TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
-        serverArgs.processor(processor);
-        serverArgs.transportFactory(transportFactory);
-        serverArgs.protocolFactory(protocolFactory);
+    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING) {
+      if (cmd.hasOption(BIND_OPTION)) {
+        throw new RuntimeException("-" + BIND_OPTION + " not supported with " +
+            implType);
+      }
 
-        LOG.info("starting HBase Nonblocking Thrift server on " + Integer.toString(listenPort));
+      TNonblockingServerTransport serverTransport =
+          new TNonblockingServerSocket(listenPort);
+
+      if (implType == ImplType.NONBLOCKING) {
+        TNonblockingServer.Args serverArgs =
+            new TNonblockingServer.Args(serverTransport);
+        setServerArgs(serverArgs, processor, transportFactory,
+            protocolFactory);
         server = new TNonblockingServer(serverArgs);
       } else {
         THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
         serverArgs.processor(processor);
         serverArgs.transportFactory(transportFactory);
         serverArgs.protocolFactory(protocolFactory);
-
-        LOG.info("starting HBase HsHA Thrift server on " + Integer.toString(listenPort));
         server = new THsHaServer(serverArgs);
       }
+      LOG.info("starting HBase " + implType.simpleClassName() +
+          " server on " + Integer.toString(listenPort));
+    } else if (implType == ImplType.THREAD_POOL) {
+      // Thread pool server. Get the IP address to bind to.
+      InetAddress listenAddress = getBindAddress(options, cmd);
+
+      TServerTransport serverTransport = new TServerSocket(
+          new InetSocketAddress(listenAddress, listenPort));
+
+      TBoundedThreadPoolServer.Args serverArgs = new TBoundedThreadPoolServer.Args(
+          serverTransport, conf);
+      setServerArgs(serverArgs, processor, transportFactory, protocolFactory);
+      LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
+          + listenAddress + ":" + Integer.toString(listenPort)
+          + "; " + serverArgs);
+      server = new TBoundedThreadPoolServer(serverArgs);
     } else {
-      // Get IP address to bind to
-      InetAddress listenAddress = null;
-      if (cmd.hasOption("bind")) {
-        try {
-          listenAddress = InetAddress.getByName(cmd.getOptionValue("bind"));
-        } catch (UnknownHostException e) {
-          LOG.error("Could not bind to provided ip address", e);
-          printUsageAndExit(options, -1);
-        }
-      } else {
-        listenAddress = InetAddress.getByName("0.0.0.0");
-      }
-      TServerTransport serverTransport = new TServerSocket(new InetSocketAddress(listenAddress, listenPort));
-
-      // Construct correct TransportFactory
-      TTransportFactory transportFactory;
-      if (cmd.hasOption("framed")) {
-        transportFactory = new TFramedTransport.Factory();
-        LOG.debug("Using framed transport");
-      } else {
-        transportFactory = new TTransportFactory();
-      }
+      throw new AssertionError("Unsupported Thrift server implementation: " +
+          implType.simpleClassName());
+    }
 
-      TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
-      serverArgs.processor(processor);
-      serverArgs.protocolFactory(protocolFactory);
-      serverArgs.transportFactory(transportFactory);
-      LOG.info("starting HBase ThreadPool Thrift server on " + listenAddress + ":" + Integer.toString(listenPort));
-      server = new TThreadPoolServer(serverArgs);
+    // A sanity check that we instantiated the right type of server.
+    if (server.getClass() != implType.serverClass) {
+      throw new AssertionError("Expected to create Thrift server class " +
+          implType.serverClass.getName() + " but got " +
+          server.getClass().getName());
     }
 
     server.serve();
   }
 
+  public void stop() {
+    server.stop();
+  }
+
+  private InetAddress getBindAddress(Options options, CommandLine cmd)
+      throws ExitCodeException {
+    InetAddress listenAddress = null;
+    String bindAddressStr = cmd.getOptionValue(BIND_OPTION, DEFAULT_BIND_ADDR);
+    try {
+      listenAddress = InetAddress.getByName(bindAddressStr);
+    } catch (UnknownHostException e) {
+      LOG.error("Could not resolve the bind address specified: " +
+          bindAddressStr, e);
+      printUsageAndExit(options, -1);
+    }
+    return listenAddress;
+  }
+
+  private static void setServerArgs(AbstractServerArgs<?> serverArgs,
+      Processor<Iface> processor, TTransportFactory transportFactory,
+      TProtocolFactory protocolFactory) {
+    serverArgs.processor(processor);
+    serverArgs.transportFactory(transportFactory);
+    serverArgs.protocolFactory(protocolFactory);
+  }
+
+  private static void optionToConf(CommandLine cmd, String option,
+      Configuration conf, String destConfKey) {
+    if (cmd.hasOption(option)) {
+      conf.set(destConfKey, cmd.getOptionValue(option));
+    }
+  }
+
   /**
    * @param args
    * @throws Exception
    */
   public static void main(String [] args) throws Exception {
-	VersionInfo.logVersion();
-    doMain(args);
+    VersionInfo.logVersion();
+    try {
+      new ThriftServer(HBaseConfiguration.create()).doMain(args);
+    } catch (ExitCodeException ex) {
+      System.exit(ex.getExitCode());
+    }
   }
+
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1206267&r1=1206266&r2=1206267&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Threads.java Fri Nov 25 17:26:40 2011
@@ -25,6 +25,7 @@ import java.io.PrintWriter;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Thread Utility
@@ -127,4 +128,28 @@ public class Threads {
       e.printStackTrace();
     }
   }
+
+  /**
+   * Sleeps for the given amount of time even if interrupted. Preserves
+   * the interrupt status.
+   * @param msToWait the amount of time to sleep in milliseconds
+   */
+  public static void sleepWithoutInterrupt(final long msToWait) {
+    long timeMillis = System.currentTimeMillis();
+    long endTime = timeMillis + msToWait;
+    boolean interrupted = false;
+    while (timeMillis < endTime) {
+      try {
+        Thread.sleep(endTime - timeMillis);
+      } catch (InterruptedException ex) {
+        interrupted = true;
+      }
+      timeMillis = System.currentTimeMillis();
+    }
+
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
 }

Modified: hbase/trunk/src/main/resources/hbase-default.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/resources/hbase-default.xml?rev=1206267&r1=1206266&r2=1206267&view=diff
==============================================================================
--- hbase/trunk/src/main/resources/hbase-default.xml (original)
+++ hbase/trunk/src/main/resources/hbase-default.xml Fri Nov 25 17:26:40 2011
@@ -824,4 +824,31 @@
     (You will have to restart your cluster after setting it).
     </description>
   </property>
+  <property>
+    <name>hbase.thrift.minWorkerThreads</name>
+    <value>16</value>
+    <description>
+    The "core size" of the thread pool. New threads are created on every
+    connection until this many threads are created.
+    </description>
+  </property>
+  <property>
+    <name>hbase.thrift.maxWorkerThreads</name>
+    <value>1000</value>
+    <description>
+    The maximum size of the thread pool. When the pending request queue
+    overflows, new threads are created until their number reaches this number.
+    After that, the server starts dropping connections.
+    </description>
+  </property>
+  <property>
+     <name>hbase.thrift.maxQueuedRequests</name>
+     <value>1000</value>
+     <description>
+     The maximum number of pending Thrift connections waiting in the queue. If
+     there are no idle threads in the pool, the server queues requests. Only
+     when the queue overflows, new threads are added, up to
+     hbase.thrift.maxQueuedRequests threads.
+     </description>
+   </property>
 </configuration>

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1206267&r1=1206266&r2=1206267&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Fri Nov 25 17:26:40 2011
@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.Field;
+import java.net.ServerSocket;
 import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1714,4 +1715,31 @@ public class HBaseTestingUtility {
 
     return table;
   }
+
+  private static final int MIN_RANDOM_PORT = 0xc000;
+  private static final int MAX_RANDOM_PORT = 0xfffe;
+
+  /**
+   * Returns a random port. These ports cannot be registered with IANA and are
+   * intended for dynamic allocation (see http://bit.ly/dynports).
+   */
+  public static int randomPort() {
+    return MIN_RANDOM_PORT
+        + new Random().nextInt(MAX_RANDOM_PORT - MIN_RANDOM_PORT);
+  }
+  
+  public static int randomFreePort() {
+    int port = 0;
+    do {
+      port = randomPort();
+      try {
+        ServerSocket sock = new ServerSocket(port);
+        sock.close();
+      } catch (IOException ex) {
+        port = 0;
+      }
+    } while (port == 0);
+    return port;
+  }
+
 }

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java?rev=1206267&r1=1206266&r2=1206267&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java Fri Nov 25 17:26:40 2011
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.thrift;
 
 import static org.junit.Assert.*;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -30,6 +29,7 @@ import org.apache.hadoop.hbase.HBaseTest
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
 import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
 import org.apache.hadoop.hbase.thrift.generated.Mutation;
 import org.apache.hadoop.hbase.thrift.generated.TCell;
 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
@@ -47,20 +47,22 @@ import org.junit.experimental.categories
 public class TestThriftServer {
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
   protected static final int MAXVERSIONS = 3;
-  private static ByteBuffer $bb(String i) {
+
+  private static ByteBuffer asByteBuffer(String i) {
     return ByteBuffer.wrap(Bytes.toBytes(i));
   }
+
   // Static names for tables, columns, rows, and values
-  private static ByteBuffer tableAname = $bb("tableA");
-  private static ByteBuffer tableBname = $bb("tableB");
-  private static ByteBuffer columnAname = $bb("columnA:");
-  private static ByteBuffer columnBname = $bb("columnB:");
-  private static ByteBuffer rowAname = $bb("rowA");
-  private static ByteBuffer rowBname = $bb("rowB");
-  private static ByteBuffer valueAname = $bb("valueA");
-  private static ByteBuffer valueBname = $bb("valueB");
-  private static ByteBuffer valueCname = $bb("valueC");
-  private static ByteBuffer valueDname = $bb("valueD");
+  private static ByteBuffer tableAname = asByteBuffer("tableA");
+  private static ByteBuffer tableBname = asByteBuffer("tableB");
+  private static ByteBuffer columnAname = asByteBuffer("columnA:");
+  private static ByteBuffer columnBname = asByteBuffer("columnB:");
+  private static ByteBuffer rowAname = asByteBuffer("rowA");
+  private static ByteBuffer rowBname = asByteBuffer("rowB");
+  private static ByteBuffer valueAname = asByteBuffer("valueA");
+  private static ByteBuffer valueBname = asByteBuffer("valueB");
+  private static ByteBuffer valueCname = asByteBuffer("valueC");
+  private static ByteBuffer valueDname = asByteBuffer("valueD");
 
   @BeforeClass
   public static void beforeClass() throws Exception {
@@ -100,7 +102,11 @@ public class TestThriftServer {
   public void doTestTableCreateDrop() throws Exception {
     ThriftServer.HBaseHandler handler =
       new ThriftServer.HBaseHandler(UTIL.getConfiguration());
+    createTestTables(handler);
+    dropTestTables(handler);
+  }
 
+  public static void createTestTables(Hbase.Iface handler) throws Exception {
     // Create/enable/disable/delete tables, ensure methods act correctly
     assertEquals(handler.getTableNames().size(), 0);
     handler.createTable(tableAname, getColumnDescriptors());
@@ -109,6 +115,9 @@ public class TestThriftServer {
     assertTrue(handler.isTableEnabled(tableAname));
     handler.createTable(tableBname, new ArrayList<ColumnDescriptor>());
     assertEquals(handler.getTableNames().size(), 2);
+  }
+
+  public static void dropTestTables(Hbase.Iface handler) throws Exception {
     handler.disableTable(tableBname);
     assertFalse(handler.isTableEnabled(tableBname));
     handler.deleteTable(tableBname);
@@ -121,7 +130,7 @@ public class TestThriftServer {
     handler.disableTable(tableAname);*/
     handler.deleteTable(tableAname);
   }
-  
+
   /**
    * Tests adding a series of Mutations and BatchMutations, including a
    * delete mutation.  Also tests data retrieval, and getting back multiple
@@ -343,7 +352,7 @@ public class TestThriftServer {
     handler.disableTable(tableAname);
     handler.deleteTable(tableAname);
   }
-  
+
   /**
    * For HBASE-2556
    * Tests for GetTableRegions
@@ -357,19 +366,19 @@ public class TestThriftServer {
     int regionCount = handler.getTableRegions(tableAname).size();
     assertEquals("empty table should have only 1 region, " +
             "but found " + regionCount, regionCount, 1);
-    handler.disableTable(tableAname);    
+    handler.disableTable(tableAname);
     handler.deleteTable(tableAname);
     regionCount = handler.getTableRegions(tableAname).size();
     assertEquals("non-existing table should have 0 region, " +
-            "but found " + regionCount, regionCount, 0);    
-  } 
-  
+            "but found " + regionCount, regionCount, 0);
+  }
+
   /**
    *
    * @return a List of ColumnDescriptors for use in creating a table.  Has one
    * default ColumnDescriptor and one ColumnDescriptor with fewer versions
    */
-  private List<ColumnDescriptor> getColumnDescriptors() {
+  private static List<ColumnDescriptor> getColumnDescriptors() {
     ArrayList<ColumnDescriptor> cDescriptors = new ArrayList<ColumnDescriptor>();
 
     // A default ColumnDescriptor

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java?rev=1206267&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java Fri Nov 25 17:26:40 2011
@@ -0,0 +1,230 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.thrift;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.thrift.ThriftServer.ImplType;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.base.Joiner;
+
+/**
+ * Start the HBase Thrift server on a random port through the command-line
+ * interface and talk to it from client side.
+ */
+@Category(LargeTests.class)
+@RunWith(Parameterized.class)
+public class TestThriftServerCmdLine {
+
+  public static final Log LOG = 
+      LogFactory.getLog(TestThriftServerCmdLine.class);
+
+  private final ImplType implType;
+  private boolean specifyFramed;
+  private boolean specifyBindIP;
+  private boolean specifyCompact;
+
+  private static final HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+
+  private Thread cmdLineThread;
+  private volatile Exception cmdLineException;
+  
+  private Exception clientSideException;
+
+  private ThriftServer thriftServer;
+  private int port;
+
+  @Parameters
+  public static Collection<Object[]> getParameters() {
+    Collection<Object[]> parameters = new ArrayList<Object[]>();
+    for (ThriftServer.ImplType implType : ThriftServer.ImplType.values()) {
+      for (boolean specifyFramed : new boolean[] {false, true}) {
+        for (boolean specifyBindIP : new boolean[] {false, true}) {
+          if (specifyBindIP && !implType.canSpecifyBindIP) {
+            continue;
+          }
+          for (boolean specifyCompact : new boolean[] {false, true}) {
+            parameters.add(new Object[]{implType, new Boolean(specifyFramed),
+                new Boolean(specifyBindIP), new Boolean(specifyCompact)});
+          }
+        }
+      }
+    }
+    return parameters;
+  }
+
+  public TestThriftServerCmdLine(ImplType implType, boolean specifyFramed,
+      boolean specifyBindIP, boolean specifyCompact) {
+    this.implType = implType;
+    this.specifyFramed = specifyFramed;
+    this.specifyBindIP = specifyBindIP;
+    this.specifyCompact = specifyCompact;
+    LOG.debug("implType=" + implType + ", " +
+        "specifyFramed=" + specifyFramed + ", " +
+        "specifyBindIP=" + specifyBindIP + ", " +
+        "specifyCompact=" + specifyCompact);
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private void startCmdLineThread(final String[] args) {
+    LOG.info("Starting HBase Thrift server with command line: " +
+        Joiner.on(" ").join(args));
+
+    cmdLineException = null;
+    cmdLineThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          thriftServer.doMain(args);
+        } catch (Exception e) {
+          cmdLineException = e;
+        }
+      }
+    });
+    cmdLineThread.setName(ThriftServer.class.getSimpleName() +
+        "-cmdline");
+    cmdLineThread.start();
+  }
+
+  @Test(timeout=30 * 1000)
+  public void testRunThriftServer() throws Exception {
+    List<String> args = new ArrayList<String>();
+    if (implType != null) {
+      String serverTypeOption = implType.toString();
+      assertTrue(serverTypeOption.startsWith("-"));
+      args.add(serverTypeOption);
+    }
+    port = HBaseTestingUtility.randomFreePort();
+    args.add("-" + ThriftServer.PORT_OPTION);
+    args.add(String.valueOf(port));
+    if (specifyFramed) {
+      args.add("-" + ThriftServer.FRAMED_OPTION);
+    }
+    if (specifyBindIP) {
+      args.add("-" + ThriftServer.BIND_OPTION);
+      args.add(InetAddress.getLocalHost().getHostName());
+    }
+    if (specifyCompact) {
+      args.add("-" + ThriftServer.COMPACT_OPTION);
+    }
+    args.add("start");
+
+    thriftServer = new ThriftServer(TEST_UTIL.getConfiguration());
+    startCmdLineThread(args.toArray(new String[0]));
+    Threads.sleepWithoutInterrupt(2000);
+
+    try {
+      talkToThriftServer();
+    } catch (Exception ex) {
+      clientSideException = ex;
+    } finally {
+      stopCmdLineThread();
+    }
+
+    Class<? extends TServer> expectedClass;
+    if (implType != null) {
+      expectedClass = implType.serverClass;
+    } else {
+      expectedClass = TBoundedThreadPoolServer.class;
+    }
+    assertEquals(expectedClass, thriftServer.server.getClass());
+
+    if (clientSideException != null) {
+      LOG.error("Thrift client threw an exception", clientSideException);
+      throw new Exception(clientSideException);
+    }
+  }
+
+  private void talkToThriftServer() throws Exception {
+    TSocket sock = new TSocket(InetAddress.getLocalHost().getHostName(),
+        port);
+    TTransport transport = sock;
+    if (specifyFramed || implType.isAlwaysFramed) {
+      transport = new TFramedTransport(transport);
+    }
+
+    sock.open();
+    TProtocol prot;
+    if (specifyCompact) {
+      prot = new TCompactProtocol(transport);
+    } else {
+      prot = new TBinaryProtocol(transport);
+    }
+    Hbase.Client client = new Hbase.Client(prot);
+    List<ByteBuffer> tableNames = client.getTableNames();
+    if (tableNames.isEmpty()) {
+      TestThriftServer.createTestTables(client);
+      assertEquals(2, client.getTableNames().size());
+    } else {
+      assertEquals(2, tableNames.size());
+      assertEquals(2, client.getColumnDescriptors(tableNames.get(0)).size());
+    }
+    sock.close();
+  }
+
+  private void stopCmdLineThread() throws Exception {
+    LOG.debug("Stopping " + implType.simpleClassName() + " Thrift server");
+    thriftServer.stop();
+    cmdLineThread.join();
+    if (cmdLineException != null) {
+      LOG.error("Command-line invocation of HBase Thrift server threw an " +
+          "exception", cmdLineException);
+      throw new Exception(cmdLineException);
+    }
+  }
+
+}

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java?rev=1206267&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java Fri Nov 25 17:26:40 2011
@@ -0,0 +1,75 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+public class TestThreads {
+  private static final Log LOG = LogFactory.getLog(TestThreads.class);
+
+  private static final int SLEEP_TIME_MS = 5000;
+  private static final int TOLERANCE_MS = (int) (0.05 * SLEEP_TIME_MS);
+
+  private volatile boolean wasInterrupted;
+
+  @Test(timeout=6000)
+  public void testSleepWithoutInterrupt() throws InterruptedException {
+    Thread sleeper = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        LOG.debug("Sleeper thread: sleeping for " + SLEEP_TIME_MS);
+        Threads.sleepWithoutInterrupt(SLEEP_TIME_MS);
+        LOG.debug("Sleeper thread: finished sleeping");
+        wasInterrupted = Thread.currentThread().isInterrupted();
+      }
+    });
+    LOG.debug("Starting sleeper thread (" + SLEEP_TIME_MS + " ms)");
+    sleeper.start();
+    long startTime = System.currentTimeMillis();
+    LOG.debug("Main thread: sleeping for 500 ms");
+    Threads.sleep(500);
+
+    LOG.debug("Interrupting the sleeper thread and sleeping for 2000 ms");
+    sleeper.interrupt();
+    Threads.sleep(2000);
+
+    LOG.debug("Interrupting the sleeper thread and sleeping for 1000 ms");
+    sleeper.interrupt();
+    Threads.sleep(1000);
+
+    LOG.debug("Interrupting the sleeper thread again");
+    sleeper.interrupt();
+    sleeper.join();
+
+    assertTrue("sleepWithoutInterrupt did not preserve the thread's " +
+        "interrupted status", wasInterrupted);
+
+    long timeElapsed = System.currentTimeMillis() - startTime;
+    assertTrue("Elapsed time " + timeElapsed + " ms is out of the expected " +
+        "range of the sleep time " + SLEEP_TIME_MS,
+        Math.abs(timeElapsed - SLEEP_TIME_MS) < TOLERANCE_MS);
+    LOG.debug("Target sleep time: " + SLEEP_TIME_MS + ", time elapsed: " +
+        timeElapsed);
+  }
+
+}



Mime
View raw message