Return-Path: Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: (qmail 51039 invoked from network); 24 Feb 2011 01:41:52 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 24 Feb 2011 01:41:52 -0000 Received: (qmail 85489 invoked by uid 500); 24 Feb 2011 01:41:51 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 85452 invoked by uid 500); 24 Feb 2011 01:41:51 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 85444 invoked by uid 99); 24 Feb 2011 01:41:51 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Feb 2011 01:41:51 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Feb 2011 01:41:50 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 0C2D723889E5; Thu, 24 Feb 2011 01:41:30 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1074010 - /cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Date: Thu, 24 Feb 2011 01:41:30 -0000 To: commits@cassandra.apache.org From: jbellis@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110224014130.0C2D723889E5@eris.apache.org> Author: jbellis Date: Thu Feb 24 01:41:29 2011 New Revision: 1074010 URL: http://svn.apache.org/viewvc?rev=1074010&view=rev Log: reformat Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=1074010&r1=1074009&r2=1074010&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Thu Feb 24 01:41:29 2011 @@ -39,151 +39,180 @@ import org.apache.thrift.transport.TTran /** * Slightly modified version of the Apache Thrift TThreadPoolServer. - * + *

* This allows passing an executor so you have more control over the actual * behaviour of the tasks being run. - * + *

* Newer version of Thrift should make this obsolete. */ -public class CustomTThreadPoolServer extends TServer { +public class CustomTThreadPoolServer extends TServer +{ -private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName()); + private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName()); -// Executor service for handling client connections -private ExecutorService executorService_; + // Executor service for handling client connections + private ExecutorService executorService_; -// Flag for stopping the server -private volatile boolean stopped_; - -// Server options -private Options options_; - -// Customizable server options -public static class Options { - public int minWorkerThreads = 5; - public int maxWorkerThreads = Integer.MAX_VALUE; - public int stopTimeoutVal = 60; - public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; -} - - -public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory, - TServerSocket tServerSocket, - TTransportFactory inTransportFactory, - TTransportFactory outTransportFactory, - TProtocolFactory tProtocolFactory, - TProtocolFactory tProtocolFactory2, - Options options, - ExecutorService executorService) { - - super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory, - tProtocolFactory, tProtocolFactory2); - options_ = options; - executorService_ = executorService; -} - - -public void serve() { - try { - serverTransport_.listen(); - } catch (TTransportException ttx) { - LOGGER.error("Error occurred during listening.", ttx); - return; - } - - stopped_ = false; - while (!stopped_) { - int failureCount = 0; - try { - TTransport client = serverTransport_.accept(); - WorkerProcess wp = new WorkerProcess(client); - executorService_.execute(wp); - } catch (TTransportException ttx) { - if (!stopped_) { - ++failureCount; - LOGGER.warn("Transport error occurred during acceptance of message.", ttx); - } - } - } - - executorService_.shutdown(); - - // Loop until awaitTermination finally does return without a interrupted - // exception. If we don't do this, then we'll shut down prematurely. We want - // to let the executorService clear it's task queue, closing client sockets - // appropriately. - long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal); - long now = System.currentTimeMillis(); - while (timeoutMS >= 0) { - try { - executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); - break; - } catch (InterruptedException ix) { - long newnow = System.currentTimeMillis(); - timeoutMS -= (newnow - now); - now = newnow; - } - } -} - -public void stop() { - stopped_ = true; - serverTransport_.interrupt(); -} - -private class WorkerProcess implements Runnable { - - /** - * Client that this services. - */ - private TTransport client_; - - /** - * Default constructor. - * - * @param client Transport to process - */ - private WorkerProcess(TTransport client) { - client_ = client; - } - - /** - * Loops on processing a client forever - */ - public void run() { - TProcessor processor = null; - TTransport inputTransport = null; - TTransport outputTransport = null; - TProtocol inputProtocol = null; - TProtocol outputProtocol = null; - try { - processor = processorFactory_.getProcessor(client_); - inputTransport = inputTransportFactory_.getTransport(client_); - outputTransport = outputTransportFactory_.getTransport(client_); - inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); - outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); - // we check stopped_ first to make sure we're not supposed to be shutting - // down. this is necessary for graceful shutdown. - while (!stopped_ && processor.process(inputProtocol, outputProtocol)) - { - inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); - outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); - } - } catch (TTransportException ttx) { - // Assume the client died and continue silently - } catch (TException tx) { - LOGGER.error("Thrift error occurred during processing of message.", tx); - } catch (Exception x) { - LOGGER.error("Error occurred during processing of message.", x); - } - - if (inputTransport != null) { - inputTransport.close(); - } - - if (outputTransport != null) { - outputTransport.close(); - } - } -} + // Flag for stopping the server + private volatile boolean stopped_; + + // Server options + private Options options_; + + // Customizable server options + public static class Options + { + public int minWorkerThreads = 5; + public int maxWorkerThreads = Integer.MAX_VALUE; + public int stopTimeoutVal = 60; + public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; + } + + + public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory, + TServerSocket tServerSocket, + TTransportFactory inTransportFactory, + TTransportFactory outTransportFactory, + TProtocolFactory tProtocolFactory, + TProtocolFactory tProtocolFactory2, + Options options, + ExecutorService executorService) + { + + super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory, + tProtocolFactory, tProtocolFactory2); + options_ = options; + executorService_ = executorService; + } + + + public void serve() + { + try + { + serverTransport_.listen(); + } + catch (TTransportException ttx) + { + LOGGER.error("Error occurred during listening.", ttx); + return; + } + + stopped_ = false; + while (!stopped_) + { + int failureCount = 0; + try + { + TTransport client = serverTransport_.accept(); + WorkerProcess wp = new WorkerProcess(client); + executorService_.execute(wp); + } + catch (TTransportException ttx) + { + if (!stopped_) + { + ++failureCount; + LOGGER.warn("Transport error occurred during acceptance of message.", ttx); + } + } + } + + executorService_.shutdown(); + + // Loop until awaitTermination finally does return without a interrupted + // exception. If we don't do this, then we'll shut down prematurely. We want + // to let the executorService clear it's task queue, closing client sockets + // appropriately. + long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal); + long now = System.currentTimeMillis(); + while (timeoutMS >= 0) + { + try + { + executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); + break; + } + catch (InterruptedException ix) + { + long newnow = System.currentTimeMillis(); + timeoutMS -= (newnow - now); + now = newnow; + } + } + } + + public void stop() + { + stopped_ = true; + serverTransport_.interrupt(); + } + + private class WorkerProcess implements Runnable + { + + /** + * Client that this services. + */ + private TTransport client_; + + /** + * Default constructor. + * + * @param client Transport to process + */ + private WorkerProcess(TTransport client) + { + client_ = client; + } + + /** + * Loops on processing a client forever + */ + public void run() + { + TProcessor processor = null; + TTransport inputTransport = null; + TTransport outputTransport = null; + TProtocol inputProtocol = null; + TProtocol outputProtocol = null; + try + { + processor = processorFactory_.getProcessor(client_); + inputTransport = inputTransportFactory_.getTransport(client_); + outputTransport = outputTransportFactory_.getTransport(client_); + inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); + outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); + // we check stopped_ first to make sure we're not supposed to be shutting + // down. this is necessary for graceful shutdown. + while (!stopped_ && processor.process(inputProtocol, outputProtocol)) + { + inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); + outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); + } + } + catch (TTransportException ttx) + { + // Assume the client died and continue silently + } + catch (TException tx) + { + LOGGER.error("Thrift error occurred during processing of message.", tx); + } + catch (Exception x) + { + LOGGER.error("Error occurred during processing of message.", x); + } + + if (inputTransport != null) + { + inputTransport.close(); + } + + if (outputTransport != null) + { + outputTransport.close(); + } + } + } }