impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sail...@apache.org
Subject [1/3] incubator-impala git commit: IMPALA-5394: Change ThriftServer() to always use TAcceptQueueServer
Date Thu, 05 Oct 2017 05:38:56 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master 72072f6e8 -> c9740b43d


IMPALA-5394: Change ThriftServer() to always use TAcceptQueueServer

- Previously TThreadPoolServer called getTransport() on a client from
  the Server thread (the thread that did the accepts).
  - TSaslServerTransport->getTransport() called TSaslTransport->open()
  - TSaslServerTransport->open() tried to negotiate SASL which calls
    read/write
    - If read/write blocks indefinitely, the TThreadPoolServer could
      not accept connections until tcp_keepalive kicked in.
- Set the underlying TSocket's recvTimeout and sendTimeout before the
  TSaslServerTransport->open() and reset them to 0 after open()
  completes.
- Added sasl_connect_tcp_timeout_ms flag that defaults to 300000
  milliseconds (5 minutes)
- Add the ability for TAcceptQueueServer to limit the maximum
  number of concurrent tasks
- Added a test case to thrift-server-test to test
  max_concurrent_connections enforcement
- Changed the remaining Thrift servers to use TAcceptQueueServer.
  (hs2/beeswax/network-perf-benchmark)
  - The timeout is still needed in TAcceptQueueServer since
    SetupConnection follows a similar pattern that TThreadPoolServer
    does.
- Removed support for TThreadPoolServer from ThriftServer() since it is
  no longer used anywhere. ThriftServer() now always uses
  TAcceptQueueServer.
- Deprecated enable_accept_queue_server flag and removed supporting
  code.

Change-Id: I56a5f3d9cf931cff14eae7f236fea018236a6255
Reviewed-on: http://gerrit.cloudera.org:8080/7061
Reviewed-by: Sailesh Mukil <sailesh@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/4dd0f1b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/4dd0f1b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/4dd0f1b3

Branch: refs/heads/master
Commit: 4dd0f1b3d84f67eb40bf671160b057be9bbdb921
Parents: 72072f6
Author: John Sherman <jfs@arcadiadata.com>
Authored: Thu Jun 1 18:49:53 2017 +0000
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Thu Oct 5 02:26:01 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/network-perf-benchmark.cc |  5 ++-
 be/src/common/global-flags.cc               |  6 +--
 be/src/rpc/TAcceptQueueServer.cpp           |  7 ++--
 be/src/rpc/TAcceptQueueServer.h             | 28 ++++++++++---
 be/src/rpc/thrift-server-test.cc            | 48 ++++++++++++++++++++++
 be/src/rpc/thrift-server.cc                 | 46 ++++-----------------
 be/src/rpc/thrift-server.h                  | 52 ++++++++----------------
 be/src/service/impala-server.cc             |  4 +-
 be/src/transport/TSaslServerTransport.cpp   | 51 +++++++++++++++--------
 common/thrift/metrics.json                  | 20 +++++++++
 10 files changed, 157 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/benchmarks/network-perf-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/network-perf-benchmark.cc b/be/src/benchmarks/network-perf-benchmark.cc
index 1a0de24..251399e 100644
--- a/be/src/benchmarks/network-perf-benchmark.cc
+++ b/be/src/benchmarks/network-perf-benchmark.cc
@@ -27,6 +27,7 @@
 #include "gen-cpp/NetworkTest_types.h"
 #include "gen-cpp/NetworkTestService.h"
 
+#include "common/init.h"
 #include "common/logging.h"
 #include "util/cpu-info.h"
 #include "util/stopwatch.h"
@@ -203,7 +204,7 @@ bool ProcessCommand(const vector<string>& tokens) {
 
 int main(int argc, char** argv) {
   google::ParseCommandLineFlags(&argc, &argv, true);
-  CpuInfo::Init();
+  impala::InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST);
 
   if (argc != 1) {
     // Just run client from command line args
@@ -223,7 +224,7 @@ int main(int argc, char** argv) {
   boost::shared_ptr<TProcessor> processor(new NetworkTestServiceProcessor(handler));
   ThriftServer* server;
   ABORT_IF_ERROR(ThriftServerBuilder("Network Test Server", processor, FLAGS_port)
-                     .thread_pool(100)
+                     .max_concurrent_connections(100)
                      .Build(&server));
   thread* server_thread = new thread(&TestServer::Server, handler.get(), server);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index e0a3384..1a8b027 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -153,11 +153,7 @@ DEFINE_int32(kudu_operation_timeout_ms, 3 * 60 * 1000, "Timeout (milliseconds) s
     "all Kudu operations. This must be a positive value, and there is no way to disable "
     "timeouts.");
 
-DEFINE_bool(enable_accept_queue_server, true,
-    "If true, uses a modified version of "
-    "TThreadedServer that accepts connections as quickly as possible and hands them off "
-    "to a thread pool to finish setup, reducing the chances that connections time out "
-    "waiting to be accepted.");
+DEFINE_bool_hidden(enable_accept_queue_server, true, "Deprecated");
 
 DEFINE_int64(inc_stats_size_limit_bytes, 200 * (1LL<<20), "Maximum size of "
     "incremental stats the catalog is allowed to serialize per table. "

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/rpc/TAcceptQueueServer.cpp
----------------------------------------------------------------------
diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
index 030d714..8a398a2 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -111,9 +111,7 @@ class TAcceptQueueServer::Task : public Runnable {
     {
       Synchronized s(server_.tasksMonitor_);
       server_.tasks_.erase(this);
-      if (server_.tasks_.empty()) {
-        server_.tasksMonitor_.notify();
-      }
+      server_.tasksMonitor_.notify();
     }
   }
 
@@ -167,6 +165,9 @@ void TAcceptQueueServer::SetupConnection(boost::shared_ptr<TTransport> client) {
     // Insert thread into the set of threads
     {
       Synchronized s(tasksMonitor_);
+      while (maxTasks_ > 0 && tasks_.size() >= maxTasks_) {
+        tasksMonitor_.wait();
+      }
       tasks_.insert(task);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/rpc/TAcceptQueueServer.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/TAcceptQueueServer.h b/be/src/rpc/TAcceptQueueServer.h
index 3f5530a..61335f9 100644
--- a/be/src/rpc/TAcceptQueueServer.h
+++ b/be/src/rpc/TAcceptQueueServer.h
@@ -53,11 +53,13 @@ class TAcceptQueueServer : public TServer {
  public:
   class Task;
 
+  // TODO: Determine which c'tors are used and remove unused ones.
   template <typename ProcessorFactory>
   TAcceptQueueServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
       const boost::shared_ptr<TServerTransport>& serverTransport,
       const boost::shared_ptr<TTransportFactory>& transportFactory,
       const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+      int32_t maxTasks = 0,
       THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory));
 
   template <typename ProcessorFactory>
@@ -66,6 +68,7 @@ class TAcceptQueueServer : public TServer {
       const boost::shared_ptr<TTransportFactory>& transportFactory,
       const boost::shared_ptr<TProtocolFactory>& protocolFactory,
       const boost::shared_ptr<ThreadFactory>& threadFactory,
+      int32_t maxTasks = 0,
       THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory));
 
   template <typename Processor>
@@ -73,6 +76,7 @@ class TAcceptQueueServer : public TServer {
       const boost::shared_ptr<TServerTransport>& serverTransport,
       const boost::shared_ptr<TTransportFactory>& transportFactory,
       const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+      int32_t maxTasks = 0,
       THRIFT_OVERLOAD_IF(Processor, TProcessor));
 
   template <typename Processor>
@@ -81,6 +85,7 @@ class TAcceptQueueServer : public TServer {
       const boost::shared_ptr<TTransportFactory>& transportFactory,
       const boost::shared_ptr<TProtocolFactory>& protocolFactory,
       const boost::shared_ptr<ThreadFactory>& threadFactory,
+      int32_t maxTasks = 0,
       THRIFT_OVERLOAD_IF(Processor, TProcessor));
 
   virtual ~TAcceptQueueServer();
@@ -99,16 +104,21 @@ class TAcceptQueueServer : public TServer {
  protected:
   void init();
 
-  // New - this is the work function for the thread pool, which does the work of setting
-  // up the connection and starting a thread to handle it.
+  // This is the work function for the thread pool, which does the work of setting up the
+  // connection and starting a thread to handle it. Will block if there are currently
+  // maxTasks_ connections and maxTasks_ is non-zero.
   void SetupConnection(boost::shared_ptr<TTransport> client);
 
   boost::shared_ptr<ThreadFactory> threadFactory_;
   volatile bool stop_;
 
+  // Monitor protecting tasks_, notified on removal.
   Monitor tasksMonitor_;
   std::set<Task*> tasks_;
 
+  // The maximum number of running tasks allowed at a time.
+  const int32_t maxTasks_;
+
   /// New - True if metrics are enabled
   bool metrics_enabled_;
 
@@ -122,8 +132,10 @@ TAcceptQueueServer::TAcceptQueueServer(
     const boost::shared_ptr<TServerTransport>& serverTransport,
     const boost::shared_ptr<TTransportFactory>& transportFactory,
     const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+    int32_t maxTasks,
     THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory))
-  : TServer(processorFactory, serverTransport, transportFactory, protocolFactory) {
+  : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
+    maxTasks_(maxTasks) {
   init();
 }
 
@@ -134,9 +146,10 @@ TAcceptQueueServer::TAcceptQueueServer(
     const boost::shared_ptr<TTransportFactory>& transportFactory,
     const boost::shared_ptr<TProtocolFactory>& protocolFactory,
     const boost::shared_ptr<ThreadFactory>& threadFactory,
+    int32_t maxTasks,
     THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory))
   : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
-    threadFactory_(threadFactory) {
+    threadFactory_(threadFactory), maxTasks_(maxTasks) {
   init();
 }
 
@@ -145,8 +158,10 @@ TAcceptQueueServer::TAcceptQueueServer(const boost::shared_ptr<Processor>& proce
     const boost::shared_ptr<TServerTransport>& serverTransport,
     const boost::shared_ptr<TTransportFactory>& transportFactory,
     const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+    int32_t maxTasks,
     THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor))
-  : TServer(processor, serverTransport, transportFactory, protocolFactory) {
+  : TServer(processor, serverTransport, transportFactory, protocolFactory),
+    maxTasks_(maxTasks) {
   init();
 }
 
@@ -156,9 +171,10 @@ TAcceptQueueServer::TAcceptQueueServer(const boost::shared_ptr<Processor>& proce
     const boost::shared_ptr<TTransportFactory>& transportFactory,
     const boost::shared_ptr<TProtocolFactory>& protocolFactory,
     const boost::shared_ptr<ThreadFactory>& threadFactory,
+    int32_t maxTasks,
     THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor))
   : TServer(processor, serverTransport, transportFactory, protocolFactory),
-    threadFactory_(threadFactory) {
+    threadFactory_(threadFactory), maxTasks_(maxTasks) {
   init();
 }
 } // namespace server

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/rpc/thrift-server-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc
index ef50160..fbb00ef 100644
--- a/be/src/rpc/thrift-server-test.cc
+++ b/be/src/rpc/thrift-server-test.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <atomic>
 #include <string>
 
 #include "gen-cpp/StatestoreService.h"
@@ -395,6 +396,53 @@ TEST(SslTest, OverlappingMatchedCiphers) {
       });
 }
 
+TEST(ConcurrencyTest, MaxConcurrentConnections) {
+  // Tests if max concurrent connections is being enforced by the ThriftServer
+  // implementation. It creates a ThriftServer with max_concurrent_connections set to 2
+  // and a ThreadPool of clients that attempt to connect concurrently and sleep for a
+  // small amount of time. The test fails if the number of concurrently connected clients
+  // exceeds the requested max_concurrent_connections limit. The test will also fail if
+  // the number of concurrently connected clients never reaches the limit of
+  // max_concurrent_connections.
+  int port = GetServerPort();
+  int max_connections = 2;
+  ThriftServer* server;
+  std::atomic<int> num_concurrent_connections{0};
+  std::atomic<bool> did_reach_max{false};
+  EXPECT_OK(ThriftServerBuilder("DummyStatestore", MakeProcessor(), port)
+      .max_concurrent_connections(max_connections)
+      .Build(&server));
+  EXPECT_OK(server->Start());
+
+  ThreadPool<int> pool("ConcurrentTest", "MaxConcurrentConnections", 10, 10,
+      [&num_concurrent_connections, &did_reach_max, max_connections, port](int tid,
+            const int& item) {
+        ThriftClient<StatestoreServiceClientWrapper> client("localhost", port, "",
+            nullptr, false);
+        EXPECT_OK(client.Open());
+        bool send_done = false;
+        TRegisterSubscriberResponse resp;
+        EXPECT_NO_THROW({
+            client.iface()->RegisterSubscriber(resp, TRegisterSubscriberRequest(),
+                &send_done);
+          });
+        int connection_count = ++num_concurrent_connections;
+        // Check that we have not exceeded the expected limit
+        EXPECT_TRUE(connection_count <= max_connections);
+        if (connection_count == max_connections) did_reach_max = true;
+        SleepForMs(100);
+        --num_concurrent_connections;
+  });
+  ASSERT_OK(pool.Init());
+
+  for (int i = 0; i < 10; ++i) pool.Offer(i);
+  pool.DrainAndShutdown();
+
+  // If we did not reach the maximum number of concurrent connections, the test was not
+  // effective.
+  EXPECT_TRUE(did_reach_max);
+}
+
 /// Test disabled because requires a high ulimit -n on build machines. Since the test does
 /// not always fail, we don't lose much coverage by disabling it until we fix the build
 /// infra issue.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index c385a66..5bf47b2 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -24,12 +24,9 @@
 #include <thrift/concurrency/Thread.h>
 #include <thrift/concurrency/ThreadManager.h>
 #include <thrift/protocol/TBinaryProtocol.h>
-#include <thrift/server/TThreadPoolServer.h>
-#include <thrift/server/TThreadedServer.h>
 #include <thrift/transport/TSocket.h>
 #include <thrift/transport/TSSLServerSocket.h>
 #include <thrift/transport/TSSLSocket.h>
-#include <thrift/server/TThreadPoolServer.h>
 #include <thrift/transport/TServerSocket.h>
 #include <gflags/gflags.h>
 
@@ -63,7 +60,6 @@ using namespace apache::thrift;
 DEFINE_int32_hidden(rpc_cnxn_attempts, 10, "Deprecated");
 DEFINE_int32_hidden(rpc_cnxn_retry_interval_ms, 2000, "Deprecated");
 
-DECLARE_bool(enable_accept_queue_server);
 DECLARE_string(principal);
 DECLARE_string(keytab_file);
 DECLARE_string(ssl_client_ca_certificate);
@@ -328,12 +324,11 @@ void ThriftServer::ThriftServerEventProcessor::deleteContext(void* serverContext
 
 ThriftServer::ThriftServer(const string& name,
     const boost::shared_ptr<TProcessor>& processor, int port, AuthProvider* auth_provider,
-    MetricGroup* metrics, int num_worker_threads, ServerType server_type)
+    MetricGroup* metrics, int max_concurrent_connections)
   : started_(false),
     port_(port),
     ssl_enabled_(false),
-    num_worker_threads_(num_worker_threads),
-    server_type_(server_type),
+    max_concurrent_connections_(max_concurrent_connections),
     name_(name),
     server_(NULL),
     processor_(processor),
@@ -451,37 +446,11 @@ Status ThriftServer::Start() {
   boost::shared_ptr<TTransportFactory> transport_factory;
   RETURN_IF_ERROR(CreateSocket(&server_socket));
   RETURN_IF_ERROR(auth_provider_->GetServerTransportFactory(&transport_factory));
-  switch (server_type_) {
-    case ThreadPool:
-      {
-        boost::shared_ptr<ThreadManager> thread_mgr(
-            ThreadManager::newSimpleThreadManager(num_worker_threads_));
-        thread_mgr->threadFactory(thread_factory);
-        thread_mgr->start();
-        server_.reset(new TThreadPoolServer(processor_, server_socket,
-                transport_factory, protocol_factory, thread_mgr));
-      }
-      break;
-    case Threaded:
-      if (FLAGS_enable_accept_queue_server) {
-        server_.reset(new TAcceptQueueServer(processor_, server_socket, transport_factory,
-            protocol_factory, thread_factory));
-        if (metrics_ != NULL) {
-          stringstream key_prefix_ss;
-          key_prefix_ss << "impala.thrift-server." << name_;
-          (static_cast<TAcceptQueueServer*>(server_.get()))
-              ->InitMetrics(metrics_, key_prefix_ss.str());
-        }
-      } else {
-        server_.reset(new TThreadedServer(processor_, server_socket, transport_factory,
-            protocol_factory, thread_factory));
-      }
-      break;
-    default:
-      stringstream error_msg;
-      error_msg << "Unsupported server type: " << server_type_;
-      LOG(ERROR) << error_msg.str();
-      return Status(error_msg.str());
+  server_.reset(new TAcceptQueueServer(processor_, server_socket, transport_factory,
+        protocol_factory, thread_factory, max_concurrent_connections_));
+  if (metrics_ != NULL) {
+    (static_cast<TAcceptQueueServer*>(server_.get()))->InitMetrics(metrics_,
+        Substitute("impala.thrift-server.$0", name_));
   }
   boost::shared_ptr<ThriftServer::ThriftServerEventProcessor> event_processor(
       new ThriftServer::ThriftServerEventProcessor(this));
@@ -504,7 +473,6 @@ void ThriftServer::Join() {
 void ThriftServer::StopForTesting() {
   DCHECK(server_thread_ != NULL);
   DCHECK(server_);
-  DCHECK_EQ(server_type_, Threaded);
   server_->stop();
   if (started_) Join();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/rpc/thrift-server.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index f889a4e..588904f 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -35,14 +35,12 @@ namespace impala {
 
 class AuthProvider;
 
-/// Utility class for all Thrift servers. Runs a threaded server by default, or a
-/// TThreadPoolServer with, by default, 2 worker threads, that exposes the interface
+/// Utility class for all Thrift servers. Runs a TAcceptQueueServer server with, by
+/// default, no enforced concurrent connection limit, that exposes the interface
 /// described by a user-supplied TProcessor object.
 ///
 /// Use a ThriftServerBuilder to construct a ThriftServer. ThriftServer's c'tors are
 /// private.
-///
-/// If TThreadPoolServer is used, client must use TSocket as transport.
 /// TODO: shutdown is buggy (which only harms tests)
 class ThriftServer {
  public:
@@ -91,14 +89,6 @@ class ThriftServer {
     virtual ~ConnectionHandlerIf() = default;
   };
 
-  static const int DEFAULT_WORKER_THREADS = 2;
-
-  /// There are 2 servers supported by Thrift with different threading models.
-  /// ThreadPool  -- Allocates a fixed number of threads. A thread is used by a
-  ///                connection until it closes.
-  /// Threaded    -- Allocates 1 thread per connection, as needed.
-  enum ServerType { ThreadPool = 0, Threaded };
-
   int port() const { return port_; }
 
   bool ssl_enabled() const { return ssl_enabled_; }
@@ -106,8 +96,7 @@ class ThriftServer {
   /// Blocks until the server stops and exits its main thread.
   void Join();
 
-  /// FOR TESTING ONLY; stop the server and block until the server is stopped; use it
-  /// only if it is a Threaded server.
+  /// FOR TESTING ONLY; stop the server and block until the server is stopped
   void StopForTesting();
 
   /// Starts the main server thread. Once this call returns, clients
@@ -151,12 +140,12 @@ class ThriftServer {
   ///  - auth_provider: Authentication scheme to use. If nullptr, use the global default
   ///    demon<->demon provider.
   ///  - metrics: if not nullptr, the server will register metrics on this object
-  ///  - num_worker_threads: the number of worker threads to use in any thread pool
-  ///  - server_type: the type of IO strategy this server should employ
+  ///  - max_concurrent_connections: The maximum number of concurrent connections allowed.
+  ///    If 0, there will be no enforced limit on the number of concurrent connections.
   ThriftServer(const std::string& name,
       const boost::shared_ptr<apache::thrift::TProcessor>& processor, int port,
       AuthProvider* auth_provider = nullptr, MetricGroup* metrics = nullptr,
-      int num_worker_threads = DEFAULT_WORKER_THREADS, ServerType server_type = Threaded);
+      int max_concurrent_connections = 0);
 
   /// Enables secure access over SSL. Must be called before Start(). The first three
   /// arguments are the minimum SSL/TLS version, and paths to certificate and private key
@@ -198,12 +187,10 @@ class ThriftServer {
   /// The SSL/TLS protocol client versions that this server will allow to connect.
   apache::thrift::transport::SSLProtocol version_;
 
-  /// How many worker threads to use to serve incoming requests
-  /// (requests are queued if no thread is immediately available)
-  int num_worker_threads_;
-
-  /// ThreadPool or Threaded server
-  ServerType server_type_;
+  /// Maximum number of concurrent connections (connections will block until fewer than
+  /// max_concurrent_connections_ are concurrently active). If 0, there is no enforced
+  /// limit.
+  int max_concurrent_connections_;
 
   /// User-specified identifier that shows up in logs
   const std::string name_;
@@ -271,16 +258,10 @@ class ThriftServerBuilder {
     return *this;
   }
 
-  /// Make this server a thread-pool server with 'num_worker_threads' threads.
-  ThriftServerBuilder& thread_pool(int num_worker_threads) {
-    server_type_ = ThriftServer::ServerType::ThreadPool;
-    num_worker_threads_ = num_worker_threads;
-    return *this;
-  }
-
-  /// Make this server a threaded server (i.e. one thread per connection).
-  ThriftServerBuilder& threaded() {
-    server_type_ = ThriftServer::ServerType::Threaded;
+  /// Sets the maximum concurrent thread count for this server. Default is 0, which means
+  /// there is no enforced limit.
+  ThriftServerBuilder& max_concurrent_connections(int max_concurrent_connections) {
+    max_concurrent_connections_ = max_concurrent_connections;
     return *this;
   }
 
@@ -319,7 +300,7 @@ class ThriftServerBuilder {
   /// '*server'.
   Status Build(ThriftServer** server) {
     std::unique_ptr<ThriftServer> ptr(new ThriftServer(name_, processor_, port_,
-        auth_provider_, metrics_, num_worker_threads_, server_type_));
+        auth_provider_, metrics_, max_concurrent_connections_));
     if (enable_ssl_) {
       RETURN_IF_ERROR(ptr->EnableSsl(
           version_, certificate_, private_key_, pem_password_cmd_, ciphers_));
@@ -329,8 +310,7 @@ class ThriftServerBuilder {
   }
 
  private:
-  ThriftServer::ServerType server_type_ = ThriftServer::ServerType::Threaded;
-  int num_worker_threads_ = ThriftServer::DEFAULT_WORKER_THREADS;
+  int max_concurrent_connections_ = 0;
   std::string name_;
   boost::shared_ptr<apache::thrift::TProcessor> processor_;
   int port_ = 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index ac5b3a9..6ce20e9 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1993,7 +1993,7 @@ Status ImpalaServer::Init(int32_t thrift_be_port, int32_t beeswax_port, int32_t
     RETURN_IF_ERROR(
         builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
             .metrics(exec_env_->metrics())
-            .thread_pool(FLAGS_fe_service_threads)
+            .max_concurrent_connections(FLAGS_fe_service_threads)
             .Build(&server));
     beeswax_server_.reset(server);
     beeswax_server_->SetConnectionHandler(this);
@@ -2020,7 +2020,7 @@ Status ImpalaServer::Init(int32_t thrift_be_port, int32_t beeswax_port, int32_t
     RETURN_IF_ERROR(
         builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
             .metrics(exec_env_->metrics())
-            .thread_pool(FLAGS_fe_service_threads)
+            .max_concurrent_connections(FLAGS_fe_service_threads)
             .Build(&server));
     hs2_server_.reset(server);
     hs2_server_->SetConnectionHandler(this);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/transport/TSaslServerTransport.cpp
----------------------------------------------------------------------
diff --git a/be/src/transport/TSaslServerTransport.cpp b/be/src/transport/TSaslServerTransport.cpp
index a8000b1..15d548e 100644
--- a/be/src/transport/TSaslServerTransport.cpp
+++ b/be/src/transport/TSaslServerTransport.cpp
@@ -28,6 +28,7 @@
 #include <boost/thread/thread.hpp>
 
 #include <thrift/transport/TBufferTransports.h>
+#include <thrift/transport/TSocket.h>
 #include "rpc/thrift-server.h"
 #include "transport/TSaslTransport.h"
 #include "transport/TSaslServerTransport.h"
@@ -36,6 +37,9 @@
 
 #include "common/names.h"
 
+DEFINE_int32(sasl_connect_tcp_timeout_ms, 300000, "(Advanced) The underlying TSocket "
+    "send/recv timeout in milliseconds for the initial SASL handeshake.");
+
 using namespace sasl;
 
 namespace apache { namespace thrift { namespace transport {
@@ -126,7 +130,6 @@ void TSaslServerTransport::handleSaslStartMessage() {
 
 boost::shared_ptr<TTransport> TSaslServerTransport::Factory::getTransport(
     boost::shared_ptr<TTransport> trans) {
-  lock_guard<mutex> l(transportMap_mutex_);
   // Thrift servers use both an input and an output transport to communicate with
   // clients. In principal, these can be different, but for SASL clients we require them
   // to be the same so that the authentication state is identical for communication in
@@ -138,29 +141,43 @@ boost::shared_ptr<TTransport> TSaslServerTransport::Factory::getTransport(
   // However, the cache map would retain references to all the transports it ever
   // created. Instead, we remove an entry in the map after it has been found for the first
   // time, that is, after the second call to getTransport() with the same argument. That
-  // matches the calling pattern in TThreadedServer and TThreadPoolServer, which both call
-  // getTransport() twice in succession when a connection is established, and then never
-  // again. This is obviously brittle (what if for some reason getTransport() is called a
-  // third time?) but for our usage of Thrift it's a tolerable band-aid.
+  // matches the calling pattern in TAcceptQueueServer which calls getTransport() twice in
+  // succession when a connection is established, and then never again. This is obviously
+  // brittle (what if for some reason getTransport() is called a third time?) but for our
+  // usage of Thrift it's a tolerable band-aid.
   //
   // An alternative approach is to use the 'custom deleter' feature of shared_ptr to
   // ensure that when ret_transport is eventually deleted, its corresponding map entry is
   // removed. That is likely to be error prone given the locking involved; for now we go
   // with the simple solution.
-  TransportMap::iterator trans_map = transportMap_.find(trans);
-  VLOG_EVERY_N(2, 100) << "getTransport(): transportMap_ size is: "
-                       << transportMap_.size();
   boost::shared_ptr<TBufferedTransport> ret_transport;
-  if (trans_map == transportMap_.end()) {
-    boost::shared_ptr<TTransport> wrapped(
-        new TSaslServerTransport(serverDefinitionMap_, trans));
-    ret_transport.reset(new TBufferedTransport(wrapped,
-            impala::ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES));
-    ret_transport.get()->open();
+  {
+    lock_guard<mutex> l(transportMap_mutex_);
+    TransportMap::iterator trans_map = transportMap_.find(trans);
+    if (trans_map != transportMap_.end()) {
+      ret_transport = trans_map->second;
+      transportMap_.erase(trans_map);
+      return ret_transport;
+    }
+    // This method should never be called concurrently with the same 'trans' object.
+    // Therefore, it is safe to drop the transportMap_mutex_ here.
+  }
+  boost::shared_ptr<TTransport> wrapped(
+      new TSaslServerTransport(serverDefinitionMap_, trans));
+  // Set socket timeouts to prevent TSaslServerTransport->open from blocking the server
+  // from accepting new connections if a read/write blocks during the handshake
+  TSocket* socket = static_cast<TSocket*>(trans.get());
+  socket->setRecvTimeout(FLAGS_sasl_connect_tcp_timeout_ms);
+  socket->setSendTimeout(FLAGS_sasl_connect_tcp_timeout_ms);
+  ret_transport.reset(new TBufferedTransport(wrapped,
+        impala::ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES));
+  ret_transport.get()->open();
+  // Reset socket timeout back to zero, so idle clients do not timeout
+  socket->setRecvTimeout(0);
+  socket->setSendTimeout(0);
+  {
+    lock_guard<mutex> l(transportMap_mutex_);
     transportMap_[trans] = ret_transport;
-  } else {
-    ret_transport = trans_map->second;
-    transportMap_.erase(trans_map);
   }
   return ret_transport;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 4ba94be..4e67eae 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -702,6 +702,16 @@
     "key": "impala.thrift-server.beeswax-frontend.total-connections"
   },
   {
+    "description": "The number of Beeswax API connections to this Impala Daemon that have been accepted and are waiting to be setup.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Beeswax API Connections Queued for Setup",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.beeswax-frontend.connection-setup-queue-size"
+  },
+  {
     "description": "The number of active HiveServer2 API connections to this Impala Daemon.",
     "contexts": [
       "IMPALAD"
@@ -722,6 +732,16 @@
     "key": "impala.thrift-server.hiveserver2-frontend.total-connections"
   },
   {
+    "description": "The number of HiveServer2 API connections to this Impala Daemon that have been accepted and are waiting to be setup.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 API Connections Queued for Setup",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.hiveserver2-frontend.connection-setup-queue-size"
+  },
+  {
     "description": "The amount of memory freed by the last memory tracker garbage collection.",
     "contexts": [
       "IMPALAD"


Mime
View raw message