cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xe...@apache.org
Subject git commit: Add ability to use custom TServerFactory implementations patch by Jason Brown; reviewed by Pavel Yaskevich for CASSANDRA-4608
Date Fri, 05 Oct 2012 21:46:04 GMT
Updated Branches:
  refs/heads/cassandra-1.1 6eafeb2b0 -> 8264eb21c


Add ability to use custom TServerFactory implementations
patch by Jason Brown; reviewed by Pavel Yaskevich for CASSANDRA-4608


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8264eb21
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8264eb21
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8264eb21

Branch: refs/heads/cassandra-1.1
Commit: 8264eb21ccb20423ff7bdae0fbef6d88fe2b2529
Parents: 6eafeb2
Author: Pavel Yaskevich <xedin@apache.org>
Authored: Fri Oct 5 14:11:13 2012 -0700
Committer: Pavel Yaskevich <xedin@apache.org>
Committed: Fri Oct 5 14:12:35 2012 -0700

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 conf/cassandra.yaml                                |    5 +-
 .../cassandra/config/DatabaseDescriptor.java       |    2 -
 .../apache/cassandra/thrift/CassandraDaemon.java   |  120 ++-------------
 .../apache/cassandra/thrift/CustomTHsHaServer.java |   39 +++++
 .../cassandra/thrift/CustomTNonBlockingServer.java |   31 ++++
 .../cassandra/thrift/CustomTThreadPoolServer.java  |   32 ++++
 .../cassandra/thrift/TServerCustomFactory.java     |   75 +++++++++
 .../apache/cassandra/thrift/TServerFactory.java    |   43 +++++
 9 files changed, 241 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c680f03..4d2fd27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@
  * Pluggable Thrift transport factories for CLI (CASSANDRA-4609)
  * Backport adding AlterKeyspace statement (CASSANDRA-4611)
  * (CQL3) Correcty accept upper-case data types (CASSANDRA-4770)
+ * Add ability to use custom TServerFactory implementations (CASSANDRA-4608)
 Merged from 1.0:
  * Switch from NBHM to CHM in MessagingService's callback map, which
    prevents OOM in long-running instances (CASSANDRA-4708)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index c4732db..5e0be98 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -287,7 +287,7 @@ rpc_port: 9160
 # enable or disable keepalive on rpc connections
 rpc_keepalive: true
 
-# Cassandra provides three options for the RPC Server:
+# Cassandra provides three out-of-the-box options for the RPC Server:
 #
 # sync  -> One connection per thread in the rpc pool (see below).
 #          For a very large number of clients, memory will be your limiting
@@ -305,6 +305,9 @@ rpc_keepalive: true
 #
 # The default is sync because on Windows hsha is about 30% slower.  On Linux,
 # sync/hsha performance is about the same, with hsha of course using less memory.
+#
+# Alternatively,  can provide your own RPC server by providing the fully-qualified class
name
+# of an o.a.c.t.TServerFactory that can create an instance of it.
 rpc_server_type: sync
 
 # Uncomment rpc_min|max|thread to set request pool size.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 20fa981..7ed6170 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -381,8 +381,6 @@ public class DatabaseDescriptor
             if (conf.stream_throughput_outbound_megabits_per_sec == null)
                 conf.stream_throughput_outbound_megabits_per_sec = 400;
 
-            if (!CassandraDaemon.rpc_server_types.contains(conf.rpc_server_type.toLowerCase()))
-                throw new ConfigurationException("Unknown rpc_server_type: " + conf.rpc_server_type);
             if (conf.rpc_min_threads == null)
                 conf.rpc_min_threads = conf.rpc_server_type.toLowerCase().equals("hsha")
                                      ? Runtime.getRuntime().availableProcessors() * 4

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraDaemon.java b/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
index 7153c08..2decb8e 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
@@ -20,28 +20,14 @@ package org.apache.cassandra.thrift;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.service.AbstractCassandraDaemon;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.server.TThreadPoolServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.cassandra.service.AbstractCassandraDaemon;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
 
 /**
  * This class supports two methods for creating a Cassandra node daemon,
@@ -62,10 +48,9 @@ public class CassandraDaemon extends org.apache.cassandra.service.AbstractCassan
     }
 
     private static Logger logger = LoggerFactory.getLogger(CassandraDaemon.class);
-    private final static String SYNC = "sync";
-    private final static String ASYNC = "async";
-    private final static String HSHA = "hsha";
-    public final static List<String> rpc_server_types = Arrays.asList(SYNC, ASYNC,
HSHA);
+    final static String SYNC = "sync";
+    final static String ASYNC = "async";
+    final static String HSHA = "hsha";
     private ThriftServer server;
 
     protected void startServer()
@@ -117,94 +102,21 @@ public class CassandraDaemon extends org.apache.cassandra.service.AbstractCassan
         public ThriftServer(InetAddress listenAddr, int listenPort)
         {
             // now we start listening for clients
-            final CassandraServer cassandraServer = new CassandraServer();
-            Cassandra.Processor processor = new Cassandra.Processor(cassandraServer);
-
-            // Transport
             logger.info(String.format("Binding thrift service to %s:%s", listenAddr, listenPort));
 
-            // Protocol factory
-            TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory(true, true, DatabaseDescriptor.getThriftMaxMessageLength());
-
-            // Transport factory
+            TServerFactory.Args args = new TServerFactory.Args();
+            args.tProtocolFactory = new TBinaryProtocol.Factory(true, true, DatabaseDescriptor.getThriftMaxMessageLength());
+            args.addr = new InetSocketAddress(listenAddr, listenPort);
+            args.cassandraServer = new CassandraServer();
+            args.processor = new Cassandra.Processor(args.cassandraServer);
+            args.keepAlive = DatabaseDescriptor.getRpcKeepAlive();
+            args.sendBufferSize = DatabaseDescriptor.getRpcSendBufferSize();
+            args.recvBufferSize = DatabaseDescriptor.getRpcRecvBufferSize();
             int tFramedTransportSize = DatabaseDescriptor.getThriftFramedTransportSize();
-            TTransportFactory inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
-            TTransportFactory outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
-            logger.info("Using TFastFramedTransport with a max frame size of {} bytes.",
tFramedTransportSize);
-
-            if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(SYNC))
-            {
-                TServerTransport serverTransport;
-                try
-                {
-                    serverTransport = new TCustomServerSocket(new InetSocketAddress(listenAddr,
listenPort),
-                                                              DatabaseDescriptor.getRpcKeepAlive(),
-                                                              DatabaseDescriptor.getRpcSendBufferSize(),
-                                                              DatabaseDescriptor.getRpcRecvBufferSize());
-                }
-                catch (TTransportException e)
-                {
-                    throw new RuntimeException(String.format("Unable to create thrift socket
to %s:%s", listenAddr, listenPort), e);
-                }
-                // ThreadPool Server and will be invocation per connection basis...
-                TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
-                                                                         .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
-                                                                         .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
-                                                                         .inputTransportFactory(inTransportFactory)
-                                                                         .outputTransportFactory(outTransportFactory)
-                                                                         .inputProtocolFactory(tProtocolFactory)
-                                                                         .outputProtocolFactory(tProtocolFactory)
-                                                                         .processor(processor);
-                ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState,
serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads);
-                serverEngine = new CustomTThreadPoolServer(serverArgs, executorService);
-                logger.info(String.format("Using synchronous/threadpool thrift server on
%s : %s", listenAddr, listenPort));
-            }
-            else
-            {
-                TNonblockingServerTransport serverTransport;
-                try
-                {
-                    serverTransport = new TCustomNonblockingServerSocket(new InetSocketAddress(listenAddr,
listenPort),
-                                                                             DatabaseDescriptor.getRpcKeepAlive(),
-                                                                             DatabaseDescriptor.getRpcSendBufferSize(),
-                                                                             DatabaseDescriptor.getRpcRecvBufferSize());
-                }
-                catch (TTransportException e)
-                {
-                    throw new RuntimeException(String.format("Unable to create thrift socket
to %s:%s", listenAddr, listenPort), e);
-                }
-
-                if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ASYNC))
-                {
-                    // This is single threaded hence the invocation will be all
-                    // in one thread.
-                    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
-                                                                                        
            .outputTransportFactory(outTransportFactory)
-                                                                                        
            .inputProtocolFactory(tProtocolFactory)
-                                                                                        
            .outputProtocolFactory(tProtocolFactory)
-                                                                                        
            .processor(processor);
-                    logger.info(String.format("Using non-blocking/asynchronous thrift server
on %s : %s", listenAddr, listenPort));
-                    serverEngine = new CustomTNonBlockingServer(serverArgs);
-                }
-                else if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(HSHA))
-                {
-                    // This is NIO selector service but the invocation will be Multi-Threaded
with the Executor service.
-                    ExecutorService executorService = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(),
-                                                                                       DatabaseDescriptor.getRpcMaxThreads(),
-                                                                                       60L,
-                                                                                       TimeUnit.SECONDS,
-                                                                                       new
SynchronousQueue<Runnable>(),
-                                                                                       new
NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");
-                    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
-                                                                                       .outputTransportFactory(outTransportFactory)
-                                                                                       .inputProtocolFactory(tProtocolFactory)
-                                                                                       .outputProtocolFactory(tProtocolFactory)
-                                                                                       .processor(processor);
-                    logger.info(String.format("Using custom half-sync/half-async thrift server
on %s : %s", listenAddr, listenPort));
-                    // Check for available processors in the system which will be equal to
the IO Threads.
-                    serverEngine = new CustomTHsHaServer(serverArgs, executorService, Runtime.getRuntime().availableProcessors());
-                }
-            }
+            logger.info("Using TFramedTransport with a max frame size of {} bytes.", tFramedTransportSize);
+            args.inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
+            args.outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
+            serverEngine = new TServerCustomFactory(DatabaseDescriptor.getRpcServerType()).buildTServer(args);
         }
 
         public void run()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java b/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
index 350a13d..6ade5ca 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.thrift;
 
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.spi.SelectorProvider;
@@ -30,9 +31,15 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.service.SocketSessionManagementService;
 import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TNonblockingServerTransport;
 import org.apache.thrift.transport.TNonblockingSocket;
 import org.apache.thrift.transport.TNonblockingTransport;
@@ -343,4 +350,36 @@ public class CustomTHsHaServer extends TNonblockingServer
         // thread because the method is not synchronized with the rest of the
         // selectors threads.
     }
+
+    public static class Factory implements TServerFactory
+    {
+        public TServer buildTServer(Args args)
+        {
+            final InetSocketAddress addr = args.addr;
+            TNonblockingServerTransport serverTransport;
+            try
+            {
+                serverTransport = new TCustomNonblockingServerSocket(addr, args.keepAlive,
args.sendBufferSize, args.recvBufferSize);
+            }
+            catch (TTransportException e)
+            {
+                throw new RuntimeException(String.format("Unable to create thrift socket
to %s:%s", addr.getAddress(), addr.getPort()), e);
+            }
+
+            // This is NIO selector service but the invocation will be Multi-Threaded with
the Executor service.
+            ExecutorService executorService = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(),
+                                                                               DatabaseDescriptor.getRpcMaxThreads(),
+                                                                               60L,
+                                                                               TimeUnit.SECONDS,
+                                                                               new SynchronousQueue<Runnable>(),
+                                                                               new NamedThreadFactory("RPC-Thread"),
"RPC-THREAD-POOL");
+            TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(args.inTransportFactory)
+                                                                               .outputTransportFactory(args.outTransportFactory)
+                                                                               .inputProtocolFactory(args.tProtocolFactory)
+                                                                               .outputProtocolFactory(args.tProtocolFactory)
+                                                                               .processor(args.processor);
+            // Check for available processors in the system which will be equal to the IO
Threads.
+            return new CustomTHsHaServer(serverArgs, executorService, Runtime.getRuntime().availableProcessors());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java b/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
index 0b6c90b..479fba8 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
@@ -21,9 +21,14 @@ package org.apache.cassandra.thrift;
  */
 
 
+import java.net.InetSocketAddress;
+
 import org.apache.cassandra.service.SocketSessionManagementService;
 import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
 import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TTransportException;
 
 public class CustomTNonBlockingServer extends TNonblockingServer
 {
@@ -40,4 +45,30 @@ public class CustomTNonBlockingServer extends TNonblockingServer
         frameBuffer.invoke();
         return true;
     }
+
+    public static class Factory implements TServerFactory
+    {
+        public TServer buildTServer(Args args)
+        {
+            final InetSocketAddress addr = args.addr;
+            TNonblockingServerTransport serverTransport;
+            try
+            {
+                serverTransport = new TCustomNonblockingServerSocket(addr, args.keepAlive,
args.sendBufferSize, args.recvBufferSize);
+            }
+            catch (TTransportException e)
+            {
+                throw new RuntimeException(String.format("Unable to create thrift socket
to %s:%s", addr.getAddress(), addr.getPort()), e);
+            }
+
+            // This is single threaded hence the invocation will be all
+            // in one thread.
+            TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(args.inTransportFactory)
+                                                                                        
    .outputTransportFactory(args.outTransportFactory)
+                                                                                        
    .inputProtocolFactory(args.tProtocolFactory)
+                                                                                        
    .outputProtocolFactory(args.tProtocolFactory)
+                                                                                        
    .processor(args.processor);
+            return new CustomTNonBlockingServer(serverArgs);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
index d6ba012..fc07c60 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
@@ -19,6 +19,7 @@
 
 package org.apache.cassandra.thrift;
 
+import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -26,11 +27,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.AbstractCassandraDaemon;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
@@ -219,4 +223,32 @@ public class CustomTThreadPoolServer extends TServer
             }
         }
     }
+
+    public static class Factory implements TServerFactory
+    {
+        public TServer buildTServer(Args args)
+        {
+            final InetSocketAddress addr = args.addr;
+            TServerTransport serverTransport;
+            try
+            {
+                serverTransport = new TCustomServerSocket(addr, args.keepAlive, args.sendBufferSize,
args.recvBufferSize);
+            }
+            catch (TTransportException e)
+            {
+                throw new RuntimeException(String.format("Unable to create thrift socket
to %s:%s", addr.getAddress(), addr.getPort()), e);
+            }
+            // ThreadPool Server and will be invocation per connection basis...
+            TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
+                                                                     .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
+                                                                     .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
+                                                                     .inputTransportFactory(args.inTransportFactory)
+                                                                     .outputTransportFactory(args.outTransportFactory)
+                                                                     .inputProtocolFactory(args.tProtocolFactory)
+                                                                     .outputProtocolFactory(args.tProtocolFactory)
+                                                                     .processor(args.processor);
+            ExecutorService executorService = new AbstractCassandraDaemon.CleaningThreadPool(args.cassandraServer.clientState,
serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads);
+            return new CustomTThreadPoolServer(serverArgs, executorService);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java b/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java
new file mode 100644
index 0000000..50e4fac
--- /dev/null
+++ b/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.thrift;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.thrift.server.TServer;
+
+/**
+ * Helper implementation to create a thrift TServer based on one of the common types we support
(sync, async, hsha),
+ * or a custom type by setting the fully qualified java class name in the rpc_server_type
setting.
+ */
+public class TServerCustomFactory implements TServerFactory
+{
+    private static Logger logger = LoggerFactory.getLogger(TServerCustomFactory.class);
+    private final String serverType;
+
+    public TServerCustomFactory(String serverType)
+    {
+        assert serverType != null;
+        this.serverType = serverType;
+    }
+
+    public TServer buildTServer(TServerFactory.Args args)
+    {
+        TServer server;
+        if (CassandraDaemon.SYNC.equalsIgnoreCase(serverType))
+        {
+            server = new CustomTThreadPoolServer.Factory().buildTServer(args);
+            logger.info(String.format("Using synchronous/threadpool thrift server on %s :
%s", args.addr.getHostName(), args.addr.getPort()));
+        }
+        else if(CassandraDaemon.ASYNC.equalsIgnoreCase(serverType))
+        {
+            server = new CustomTNonBlockingServer.Factory().buildTServer(args);
+            logger.info(String.format("Using non-blocking/asynchronous thrift server on %s
: %s", args.addr.getHostName(), args.addr.getPort()));
+        }
+        else if(CassandraDaemon.HSHA.equalsIgnoreCase(serverType))
+        {
+            server = new CustomTHsHaServer.Factory().buildTServer(args);
+            logger.info(String.format("Using custom half-sync/half-async thrift server on
%s : %s", args.addr.getHostName(), args.addr.getPort()));
+        }
+        else
+        {
+            TServerFactory serverFactory;
+            try
+            {
+                serverFactory = (TServerFactory) Class.forName(serverType).newInstance();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException("Failed to instantiate server factory:" + serverType,
e);
+            }
+            server = serverFactory.buildTServer(args);
+            logger.info(String.format("Using custom thrift server %s on %s : %s", server.getClass().getName(),
args.addr.getHostName(), args.addr.getPort()));
+        }
+        return server;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/src/java/org/apache/cassandra/thrift/TServerFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TServerFactory.java b/src/java/org/apache/cassandra/thrift/TServerFactory.java
new file mode 100644
index 0000000..0c93867
--- /dev/null
+++ b/src/java/org/apache/cassandra/thrift/TServerFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.thrift;
+
+import java.net.InetSocketAddress;
+
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TTransportFactory;
+
+public interface TServerFactory
+{
+    TServer buildTServer(Args args);
+
+    public static class Args
+    {
+        public InetSocketAddress addr;
+        public CassandraServer cassandraServer;
+        public Cassandra.Processor processor;
+        public TProtocolFactory tProtocolFactory;
+        public TTransportFactory inTransportFactory;
+        public TTransportFactory outTransportFactory;
+        public Integer sendBufferSize;
+        public Integer recvBufferSize;
+        public boolean keepAlive;
+    }
+}


Mime
View raw message