cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xe...@apache.org
Subject [1/2] git commit: merge from 1.1
Date Fri, 05 Oct 2012 21:59:43 GMT
Updated Branches:
  refs/heads/trunk 074f4befa -> 6b83663ca


merge from 1.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6b83663c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6b83663c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6b83663c

Branch: refs/heads/trunk
Commit: 6b83663ca494123d87477f69205633497e4bef1d
Parents: 074f4be 8264eb2
Author: Pavel Yaskevich <xedin@apache.org>
Authored: Fri Oct 5 14:59:13 2012 -0700
Committer: Pavel Yaskevich <xedin@apache.org>
Committed: Fri Oct 5 14:59:13 2012 -0700

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 conf/cassandra.yaml                                |    5 +-
 .../cassandra/config/DatabaseDescriptor.java       |    2 -
 .../apache/cassandra/thrift/CustomTHsHaServer.java |   39 ++++
 .../cassandra/thrift/CustomTNonBlockingServer.java |   31 +++
 .../cassandra/thrift/CustomTThreadPoolServer.java  |   63 ++++++
 .../cassandra/thrift/TServerCustomFactory.java     |   75 +++++++
 .../apache/cassandra/thrift/TServerFactory.java    |   43 ++++
 .../org/apache/cassandra/thrift/ThriftServer.java  |  150 ++-------------
 9 files changed, 274 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b83663c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index cab1425,4d2fd27..4fffbf7
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -113,7 -19,7 +113,8 @@@
   * Pluggable Thrift transport factories for CLI (CASSANDRA-4609)
   * Backport adding AlterKeyspace statement (CASSANDRA-4611)
   * (CQL3) Correctly accept upper-case data types (CASSANDRA-4770)
 + * Add binary protocol events for schema changes (CASSANDRA-4684)
+  * Add ability to use custom TServerFactory implementations (CASSANDRA-4608)
  Merged from 1.0:
   * Switch from NBHM to CHM in MessagingService's callback map, which
     prevents OOM in long-running instances (CASSANDRA-4708)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b83663c/conf/cassandra.yaml
----------------------------------------------------------------------
diff --cc conf/cassandra.yaml
index f98f9f0,5e0be98..84c0ae5
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -333,31 -287,39 +333,34 @@@ rpc_port: 916
  # enable or disable keepalive on rpc connections
  rpc_keepalive: true
  
- # Cassandra provides three options for the RPC Server:
+ # Cassandra provides three out-of-the-box options for the RPC Server:
  #
 -# sync  -> One connection per thread in the rpc pool (see below).
 -#          For a very large number of clients, memory will be your limiting
 -#          factor; on a 64 bit JVM, 128KB is the minimum stack size per thread.
 -#          Connection pooling is very, very strongly recommended.
 -#
 -# async -> Nonblocking server implementation with one thread to serve 
 -#          rpc connections.  This is not recommended for high throughput use
 -#          cases. Async has been tested to be about 50% slower than sync
 -#          or hsha and is deprecated: it will be removed in the next major release.
 +# sync  -> One thread per thrift connection. For a very large number of clients, memory
 +#          will be your limiting factor. On a 64 bit JVM, 128KB is the minimum stack size
 +#          per thread, and that will correspond to your use of virtual memory (but physical memory
 +#          may be limited depending on use of stack space).
  #
 -# hsha  -> Stands for "half synchronous, half asynchronous." The rpc thread pool 
 -#          (see below) is used to manage requests, but the threads are multiplexed
 -#          across the different clients.
 +# hsha  -> Stands for "half synchronous, half asynchronous." All thrift clients are handled
 +#          asynchronously using a small number of threads that does not vary with the amount
 +#          of thrift clients (and thus scales well to many clients). The rpc requests are still
 +#          synchronous (one thread per active request).
  #
  # The default is sync because on Windows hsha is about 30% slower.  On Linux,
  # sync/hsha performance is about the same, with hsha of course using less memory.
+ #
+ # Alternatively, you can provide your own RPC server by providing the fully-qualified class name
+ # of an o.a.c.t.TServerFactory that can create an instance of it.
  rpc_server_type: sync
  
 -# Uncomment rpc_min|max|thread to set request pool size.
 -# You would primarily set max for the sync server to safeguard against
 -# misbehaved clients; if you do hit the max, Cassandra will block until one
 -# disconnects before accepting more.  The defaults for sync are min of 16 and max
 -# unlimited.
 -# 
 -# For the Hsha server, the min and max both default to quadruple the number of
 -# CPU cores.
 +# Uncomment rpc_min|max_thread to set request pool size limits.
 +#
 +# Regardless of your choice of RPC server (see above), the number of maximum requests in the
 +# RPC thread pool dictates how many concurrent requests are possible (but if you are using the sync
 +# RPC server, it also dictates the number of clients that can be connected at all).
  #
 -# This configuration is ignored by the async server.
 +# The default is unlimited and thus provides no protection against clients overwhelming the server. You are
 +# encouraged to set a maximum that makes sense for you in production, but do keep in mind that
 +# rpc_max_threads represents the maximum number of client requests this server may execute concurrently.
  #
  # rpc_min_threads: 16
  # rpc_max_threads: 2048

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b83663c/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index cc8f07f,7ed6170..02a91e8
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -390,13 -381,14 +390,11 @@@ public class DatabaseDescripto
              if (conf.stream_throughput_outbound_megabits_per_sec == null)
                  conf.stream_throughput_outbound_megabits_per_sec = 400;
  
-             if (!ThriftServer.rpc_server_types.contains(conf.rpc_server_type.toLowerCase()))
-                 throw new ConfigurationException("Unknown rpc_server_type: " + conf.rpc_server_type);
              if (conf.rpc_min_threads == null)
 -                conf.rpc_min_threads = conf.rpc_server_type.toLowerCase().equals("hsha")
 -                                     ? Runtime.getRuntime().availableProcessors() * 4
 -                                     : 16;
 +                conf.rpc_min_threads = 16;
 +
              if (conf.rpc_max_threads == null)
 -                conf.rpc_max_threads = conf.rpc_server_type.toLowerCase().equals("hsha")
 -                                     ? Runtime.getRuntime().availableProcessors() * 4
 -                                     : Integer.MAX_VALUE;
 +                conf.rpc_max_threads = Integer.MAX_VALUE;
  
              /* data file and commit log directories. they get created later, when they're
needed. */
              if (conf.commitlog_directory != null && conf.data_file_directories !=
null && conf.saved_caches_directory != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b83663c/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
index c6517a2,6ade5ca..436dbb3
--- a/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
@@@ -7,17 -9,20 +7,18 @@@
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
 - *   http://www.apache.org/licenses/LICENSE-2.0
 - *
 - * Unless required by applicable law or agreed to in writing,
 - * software distributed under the License is distributed on an
 - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 - * KIND, either express or implied.  See the License for the
 - * specific language governing permissions and limitations
 - * under the License.
 + *     http://www.apache.org/licenses/LICENSE-2.0
   *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
   */
 -
 +package org.apache.cassandra.thrift;
  
  import java.io.IOException;
+ import java.net.InetSocketAddress;
  import java.nio.channels.SelectionKey;
  import java.nio.channels.Selector;
  import java.nio.channels.spi.SelectorProvider;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b83663c/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
index 52e9f66,479fba8..b6f76a7
--- a/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
@@@ -7,19 -9,26 +7,24 @@@
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
 - *   http://www.apache.org/licenses/LICENSE-2.0
 - *
 - * Unless required by applicable law or agreed to in writing,
 - * software distributed under the License is distributed on an
 - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 - * KIND, either express or implied.  See the License for the
 - * specific language governing permissions and limitations
 - * under the License.
 + *     http://www.apache.org/licenses/LICENSE-2.0
   *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
   */
 -
 +package org.apache.cassandra.thrift;
  
+ import java.net.InetSocketAddress;
+ 
  import org.apache.cassandra.service.SocketSessionManagementService;
  import org.apache.thrift.server.TNonblockingServer;
+ import org.apache.thrift.server.TServer;
+ import org.apache.thrift.transport.TNonblockingServerTransport;
  import org.apache.thrift.transport.TNonblockingSocket;
+ import org.apache.thrift.transport.TTransportException;
  
  public class CustomTNonBlockingServer extends TNonblockingServer
  {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b83663c/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
index ebffa6d,fc07c60..00f0444
--- a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
@@@ -1,29 -1,34 +1,37 @@@
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
 - * or more contributor license agreements. See the NOTICE file
 + * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
 - * regarding copyright ownership. The ASF licenses this file
 + * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
 - * with the License. You may obtain a copy of the License at
 + * with the License.  You may obtain a copy of the License at
   *
 - *   http://www.apache.org/licenses/LICENSE-2.0
 + *     http://www.apache.org/licenses/LICENSE-2.0
   *
 - * Unless required by applicable law or agreed to in writing,
 - * software distributed under the License is distributed on an
 - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 - * KIND, either express or implied. See the License for the
 - * specific language governing permissions and limitations
 - * under the License.
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
   */
 -
  package org.apache.cassandra.thrift;
  
+ import java.net.InetSocketAddress;
  import java.net.SocketTimeoutException;
  import java.util.concurrent.ExecutorService;
++import java.util.concurrent.SynchronousQueue;
++import java.util.concurrent.ThreadPoolExecutor;
++import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicInteger;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
++import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
++import org.apache.cassandra.concurrent.NamedThreadFactory;
+ import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.service.AbstractCassandraDaemon;
++import org.apache.cassandra.service.ClientState;
  import org.apache.thrift.TException;
  import org.apache.thrift.TProcessor;
  import org.apache.thrift.protocol.TProtocol;
@@@ -217,4 -223,32 +226,58 @@@ public class CustomTThreadPoolServer ex
              }
          }
      }
+ 
+     public static class Factory implements TServerFactory
+     {
+         public TServer buildTServer(Args args)
+         {
+             final InetSocketAddress addr = args.addr;
+             TServerTransport serverTransport;
+             try
+             {
+                 serverTransport = new TCustomServerSocket(addr, args.keepAlive, args.sendBufferSize,
args.recvBufferSize);
+             }
+             catch (TTransportException e)
+             {
+                 throw new RuntimeException(String.format("Unable to create thrift socket
to %s:%s", addr.getAddress(), addr.getPort()), e);
+             }
+             // ThreadPool Server and will be invocation per connection basis...
+             TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
+                                                                      .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
+                                                                      .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
+                                                                      .inputTransportFactory(args.inTransportFactory)
+                                                                      .outputTransportFactory(args.outTransportFactory)
+                                                                      .inputProtocolFactory(args.tProtocolFactory)
+                                                                      .outputProtocolFactory(args.tProtocolFactory)
+                                                                      .processor(args.processor);
 -            ExecutorService executorService = new AbstractCassandraDaemon.CleaningThreadPool(args.cassandraServer.clientState,
serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads);
++            ExecutorService executorService = new CleaningThreadPool(args.cassandraServer.clientState,
serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads);
+             return new CustomTThreadPoolServer(serverArgs, executorService);
+         }
+     }
++
++    /**
++     * A subclass of Java's ThreadPoolExecutor which implements Jetty's ThreadPool
++     * interface (for integration with Avro), and performs ClientState cleanup.
++     *
++     * (Note that the tasks being executed perform their own while-command-process
++     * loop until the client disconnects.)
++     */
++    private static class CleaningThreadPool extends ThreadPoolExecutor
++    {
++        private final ThreadLocal<ClientState> state;
++
++        public CleaningThreadPool(ThreadLocal<ClientState> state, int minWorkerThread,
int maxWorkerThreads)
++        {
++            super(minWorkerThread, maxWorkerThreads, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new NamedThreadFactory("Thrift"));
++            this.state = state;
++        }
++
++        @Override
++        protected void afterExecute(Runnable r, Throwable t)
++        {
++            super.afterExecute(r, t);
++            DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
++            state.get().logout();
++        }
++    }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b83663c/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/thrift/TServerCustomFactory.java
index 0000000,50e4fac..208f664
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java
+++ b/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java
@@@ -1,0 -1,75 +1,75 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements. See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership. The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License. You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.cassandra.thrift;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.thrift.server.TServer;
+ 
+ /**
+  * Helper implementation to create a thrift TServer based on one of the common types we support (sync, async, hsha),
+  * or a custom type by setting the fully qualified java class name in the rpc_server_type setting.
+  */
+ public class TServerCustomFactory implements TServerFactory
+ {
+     private static Logger logger = LoggerFactory.getLogger(TServerCustomFactory.class);
+     private final String serverType;
+ 
+     public TServerCustomFactory(String serverType)
+     {
+         assert serverType != null;
+         this.serverType = serverType;
+     }
+ 
+     public TServer buildTServer(TServerFactory.Args args)
+     {
+         TServer server;
 -        if (CassandraDaemon.SYNC.equalsIgnoreCase(serverType))
++        if (ThriftServer.SYNC.equalsIgnoreCase(serverType))
+         {
+             server = new CustomTThreadPoolServer.Factory().buildTServer(args);
+             logger.info(String.format("Using synchronous/threadpool thrift server on %s
: %s", args.addr.getHostName(), args.addr.getPort()));
+         }
 -        else if(CassandraDaemon.ASYNC.equalsIgnoreCase(serverType))
++        else if(ThriftServer.ASYNC.equalsIgnoreCase(serverType))
+         {
+             server = new CustomTNonBlockingServer.Factory().buildTServer(args);
+             logger.info(String.format("Using non-blocking/asynchronous thrift server on
%s : %s", args.addr.getHostName(), args.addr.getPort()));
+         }
 -        else if(CassandraDaemon.HSHA.equalsIgnoreCase(serverType))
++        else if(ThriftServer.HSHA.equalsIgnoreCase(serverType))
+         {
+             server = new CustomTHsHaServer.Factory().buildTServer(args);
+             logger.info(String.format("Using custom half-sync/half-async thrift server on
%s : %s", args.addr.getHostName(), args.addr.getPort()));
+         }
+         else
+         {
+             TServerFactory serverFactory;
+             try
+             {
+                 serverFactory = (TServerFactory) Class.forName(serverType).newInstance();
+             }
+             catch (Exception e)
+             {
+                 throw new RuntimeException("Failed to instantiate server factory:" + serverType,
e);
+             }
+             server = serverFactory.buildTServer(args);
+             logger.info(String.format("Using custom thrift server %s on %s : %s", server.getClass().getName(),
args.addr.getHostName(), args.addr.getPort()));
+         }
+         return server;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b83663c/src/java/org/apache/cassandra/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/thrift/ThriftServer.java
index b104cfd,0000000..3ff28e5
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/thrift/ThriftServer.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftServer.java
@@@ -1,235 -1,0 +1,121 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.thrift;
 +
 +import java.net.InetAddress;
 +import java.net.InetSocketAddress;
- import java.util.Arrays;
- import java.util.List;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.SynchronousQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
 +
- import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
- import org.apache.cassandra.concurrent.NamedThreadFactory;
- import org.apache.cassandra.service.CassandraDaemon;
- import org.apache.thrift.server.TNonblockingServer;
- import org.apache.thrift.server.TThreadPoolServer;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
++import org.apache.cassandra.service.CassandraDaemon;
 +import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.service.ClientState;
- import org.apache.thrift.protocol.TProtocolFactory;
 +import org.apache.thrift.server.TServer;
 +import org.apache.thrift.transport.TFramedTransport;
- import org.apache.thrift.transport.TNonblockingServerTransport;
- import org.apache.thrift.transport.TServerTransport;
- import org.apache.thrift.transport.TTransportException;
- import org.apache.thrift.transport.TTransportFactory;
 +
 +public class ThriftServer implements CassandraDaemon.Server
 +{
-     private static final Logger logger = LoggerFactory.getLogger(ThriftServer.class);
-     private final static String SYNC = "sync";
-     private final static String ASYNC = "async";
-     private final static String HSHA = "hsha";
-     public final static List<String> rpc_server_types = Arrays.asList(SYNC, ASYNC,
HSHA);
++    protected static CassandraDaemon instance;
++    private static Logger logger = LoggerFactory.getLogger(CassandraDaemon.class);
++    final static String SYNC = "sync";
++    final static String ASYNC = "async";
++    final static String HSHA = "hsha";
 +
 +    private final InetAddress address;
 +    private final int port;
 +    private volatile ThriftServerThread server;
 +
 +    public ThriftServer(InetAddress address, int port)
 +    {
 +        this.address = address;
 +        this.port = port;
 +    }
 +
 +    public void start()
 +    {
 +        if (server == null)
 +        {
 +            server = new ThriftServerThread(address, port);
 +            server.start();
 +        }
 +    }
 +
 +    public void stop()
 +    {
 +        if (server != null)
 +        {
 +            server.stopServer();
 +            try
 +            {
 +                server.join();
 +            }
 +            catch (InterruptedException e)
 +            {
 +                logger.error("Interrupted while waiting thrift server to stop", e);
 +            }
 +            server = null;
 +        }
 +    }
 +
 +    public boolean isRunning()
 +    {
 +        return server != null;
 +    }
 +
 +    /**
 +     * Simple class to run the thrift connection accepting code in separate
 +     * thread of control.
 +     */
 +    private static class ThriftServerThread extends Thread
 +    {
 +        private TServer serverEngine;
 +
 +        public ThriftServerThread(InetAddress listenAddr, int listenPort)
 +        {
 +            // now we start listening for clients
-             final CassandraServer cassandraServer = new CassandraServer();
-             Cassandra.Processor processor = new Cassandra.Processor(cassandraServer);
- 
-             // Transport
 +            logger.info(String.format("Binding thrift service to %s:%s", listenAddr, listenPort));
 +
-             // Protocol factory
-             TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory(true, true,
DatabaseDescriptor.getThriftMaxMessageLength());
- 
-             // Transport factory
++            TServerFactory.Args args = new TServerFactory.Args();
++            args.tProtocolFactory = new TBinaryProtocol.Factory(true, true, DatabaseDescriptor.getThriftMaxMessageLength());
++            args.addr = new InetSocketAddress(listenAddr, listenPort);
++            args.cassandraServer = new CassandraServer();
++            args.processor = new Cassandra.Processor(args.cassandraServer);
++            args.keepAlive = DatabaseDescriptor.getRpcKeepAlive();
++            args.sendBufferSize = DatabaseDescriptor.getRpcSendBufferSize();
++            args.recvBufferSize = DatabaseDescriptor.getRpcRecvBufferSize();
 +            int tFramedTransportSize = DatabaseDescriptor.getThriftFramedTransportSize();
-             TTransportFactory inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
-             TTransportFactory outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
-             logger.info("Using TFramedTransport with a max frame size of {} bytes.", tFramedTransportSize);
 +
-             if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(SYNC))
-             {
-                 TServerTransport serverTransport;
-                 try
-                 {
-                     serverTransport = new TCustomServerSocket(new InetSocketAddress(listenAddr,
listenPort),
-                                                               DatabaseDescriptor.getRpcKeepAlive(),
-                                                               DatabaseDescriptor.getRpcSendBufferSize(),
-                                                               DatabaseDescriptor.getRpcRecvBufferSize());
-                 }
-                 catch (TTransportException e)
-                 {
-                     throw new RuntimeException(String.format("Unable to create thrift socket
to %s:%s", listenAddr, listenPort), e);
-                 }
-                 // ThreadPool Server and will be invocation per connection basis...
-                 TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
-                                                                          .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
-                                                                          .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
-                                                                          .inputTransportFactory(inTransportFactory)
-                                                                          .outputTransportFactory(outTransportFactory)
-                                                                          .inputProtocolFactory(tProtocolFactory)
-                                                                          .outputProtocolFactory(tProtocolFactory)
-                                                                          .processor(processor);
-                 ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState,
serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads);
-                 serverEngine = new CustomTThreadPoolServer(serverArgs, executorService);
-                 logger.info(String.format("Using synchronous/threadpool thrift server on
%s : %s", listenAddr, listenPort));
-             }
-             else
-             {
-                 TNonblockingServerTransport serverTransport;
-                 try
-                 {
-                     serverTransport = new TCustomNonblockingServerSocket(new InetSocketAddress(listenAddr,
listenPort),
-                                                                              DatabaseDescriptor.getRpcKeepAlive(),
-                                                                              DatabaseDescriptor.getRpcSendBufferSize(),
-                                                                              DatabaseDescriptor.getRpcRecvBufferSize());
-                 }
-                 catch (TTransportException e)
-                 {
-                     throw new RuntimeException(String.format("Unable to create thrift socket
to %s:%s", listenAddr, listenPort), e);
-                 }
- 
-                 if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ASYNC))
-                 {
-                     // This is single threaded hence the invocation will be all
-                     // in one thread.
-                     TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
-                                                                                        
             .outputTransportFactory(outTransportFactory)
-                                                                                        
             .inputProtocolFactory(tProtocolFactory)
-                                                                                        
             .outputProtocolFactory(tProtocolFactory)
-                                                                                        
             .processor(processor);
-                     logger.info(String.format("Using non-blocking/asynchronous thrift server
on %s : %s", listenAddr, listenPort));
-                     serverEngine = new CustomTNonBlockingServer(serverArgs);
-                 }
-                 else if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(HSHA))
-                 {
-                     // This is NIO selector service but the invocation will be Multi-Threaded
with the Executor service.
-                     ExecutorService executorService = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(),
-                                                                                        DatabaseDescriptor.getRpcMaxThreads(),
-                                                                                        60L,
-                                                                                        TimeUnit.SECONDS,
-                                                                                        new
SynchronousQueue<Runnable>(),
-                                                                                        new
NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");
-                     TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
-                                                                                        .outputTransportFactory(outTransportFactory)
-                                                                                        .inputProtocolFactory(tProtocolFactory)
-                                                                                        .outputProtocolFactory(tProtocolFactory)
-                                                                                        .processor(processor);
-                     logger.info(String.format("Using custom half-sync/half-async thrift
server on %s : %s", listenAddr, listenPort));
-                     // Check for available processors in the system which will be equal
to the IO Threads.
-                     serverEngine = new CustomTHsHaServer(serverArgs, executorService, Runtime.getRuntime().availableProcessors());
-                 }
-             }
++            logger.info("Using TFramedTransport with a max frame size of {} bytes.", tFramedTransportSize);
++            args.inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
++            args.outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
++            serverEngine = new TServerCustomFactory(DatabaseDescriptor.getRpcServerType()).buildTServer(args);
 +        }
 +
 +        public void run()
 +        {
 +            logger.info("Listening for thrift clients...");
 +            serverEngine.serve();
 +        }
 +
 +        public void stopServer()
 +        {
 +            logger.info("Stop listening to thrift clients");
 +            serverEngine.stop();
 +        }
 +    }
- 
-     /**
-      * A subclass of Java's ThreadPoolExecutor which implements Jetty's ThreadPool
-      * interface (for integration with Avro), and performs ClientState cleanup.
-      *
-      * (Note that the tasks being executed perform their own while-command-process
-      * loop until the client disconnects.)
-      */
-     private static class CleaningThreadPool extends ThreadPoolExecutor
-     {
-         private final ThreadLocal<ClientState> state;
-         public CleaningThreadPool(ThreadLocal<ClientState> state, int minWorkerThread,
int maxWorkerThreads)
-         {
-             super(minWorkerThread, maxWorkerThreads, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new NamedThreadFactory("Thrift"));
-             this.state = state;
-         }
- 
-         @Override
-         protected void afterExecute(Runnable r, Throwable t)
-         {
-             super.afterExecute(r, t);
-             DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
-             state.get().logout();
-         }
-     }
 +}


Mime
View raw message