kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [2/2] incubator-kudu git commit: rpc: use earliest-deadline-first RPC scheduling and rejection
Date Fri, 01 Apr 2016 04:52:18 GMT
rpc: use earliest-deadline-first RPC scheduling and rejection

This changes the behavior of the RPC service queue to use earliest-deadline-first
scheduling. This is seeking to address an issue I noticed when testing
Impala on a box with 32 cores:

- Impala is spinning up 96 clients which all operate in a loop scanning
  local tablets. The "think time" in between Scan RPCs is very small,
  since the scanner threads are just pushing the requests onto an Impala-side
  queue and not doing any processing.

- With the default settings, we have 20 RPC handlers and a queue length of
  50. This causes the remaining 26 threads to get rejected with TOO_BUSY
  errors on their first Scan() RPCs.

- The unlucky threads back off by going to sleep for a little bit. Meanwhile,
  every time one of the lucky threads gets a response, it sends a new RPC and
  occupies the space in the queue that was just freed up. Because we have
  exactly 70 "lucky" threads, and 70 slots on the server side, and no "think
  time", the queue is full almost all the time.

- When one of the "unlucky" threads wakes up from its backoff sleep, it is
  extremely likely that it will not find an empty queue slot, and will thus
  just get rejected again.

The result of this behavior is extreme unfairness -- those threads that got
lucky at the beginning are successfully processing lots of scan requests, but
the ones that got unlucky at the beginning get rejected over and over again
until they eventually time out.

The approach taken by this patch is to do earliest-deadline-first (EDF)
scheduling for the queue. Because the scan RPC retries retain the deadline
of the original attempt after they back off, they'll have an earlier
deadline than a newly-arrived scan request, thus taking priority.

The patch includes a simple functional test which spawns a bunch of threads
which act somewhat like the above Impala scenario, and measure the number
of successful RPCs they are able to send in a 5-second period. Without the
patch, I got:

I0328 20:09:16.566520  1461 rpc_stub-test.cc:399] 1 1 0 1 10 17 6 1 12 12 17 10 8 7 12 9 16
15

In other words, some threads were able to complete tens of RPCs whereas other
threads were unable to complete any in the same time period. With the patch,
the distribution was very even:

I0328 20:08:21.608039  1250 rpc_stub-test.cc:399] 9 9 9 8 9 9 9 9 9 9 9 9 9 9 9 9 9

In testing on a cluster, this solved the frequent Impala query failures when
running 5 concurrent TPCH Q6 queries.

Change-Id: I423ce5d8c54f61aeab4909393bbcac3516fe94c6
Reviewed-on: http://gerrit.cloudera.org:8080/2641
Reviewed-by: Adar Dembo <adar@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/d79c1cf8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/d79c1cf8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/d79c1cf8

Branch: refs/heads/master
Commit: d79c1cf8020b1e577daecd2880ca7713af20c4b7
Parents: 7ffbbf9
Author: Todd Lipcon <todd@apache.org>
Authored: Fri Mar 25 18:58:39 2016 -0700
Committer: Todd Lipcon <todd@apache.org>
Committed: Fri Apr 1 04:51:34 2016 +0000

----------------------------------------------------------------------
 src/kudu/rpc/inbound_call.cc  |   4 +
 src/kudu/rpc/inbound_call.h   |   3 +
 src/kudu/rpc/rpc-test-base.h  |   4 +-
 src/kudu/rpc/rpc_stub-test.cc |  96 ++++++++++++++++++++
 src/kudu/rpc/service_pool.cc  |  51 +++++++----
 src/kudu/rpc/service_pool.h   |  10 ++-
 src/kudu/rpc/service_queue.h  | 178 +++++++++++++++++++++++++++++++++++++
 7 files changed, 325 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d79c1cf8/src/kudu/rpc/inbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index 9fdaadf..4c5f75b 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -285,6 +285,10 @@ MonoTime InboundCall::GetClientDeadline() const {
   return deadline;
 }
 
+MonoTime InboundCall::GetTimeReceived() const {
+  return timing_.time_received;
+}
+
 vector<uint32_t> InboundCall::GetRequiredFeatures() const {
   vector<uint32_t> features;
   for (uint32_t feature : header_.required_feature_flags()) {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d79c1cf8/src/kudu/rpc/inbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index e15d323..08c87ea 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -159,6 +159,9 @@ class InboundCall {
   // If the client did not specify a deadline, returns MonoTime::Max().
   MonoTime GetClientDeadline() const;
 
+  // Return the time when this call was received.
+  MonoTime GetTimeReceived() const;
+
   // Returns the set of application-specific feature flags required to service
   // the RPC.
   std::vector<uint32_t> GetRequiredFeatures() const;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d79c1cf8/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index c629720..fa3857d 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -256,6 +256,7 @@ class RpcTestBase : public KuduTest {
  public:
   RpcTestBase()
     : n_worker_threads_(3),
+      service_queue_length_(100),
       n_server_reactor_threads_(3),
       keepalive_time_ms_(1000),
       metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_, "test.rpc_test"))
{
@@ -399,7 +400,7 @@ class RpcTestBase : public KuduTest {
     gscoped_ptr<ServiceIf> service(new ServiceClass(metric_entity_));
     service_name_ = service->service_name();
     scoped_refptr<MetricEntity> metric_entity = server_messenger_->metric_entity();
-    service_pool_ = new ServicePool(std::move(service), metric_entity, 50);
+    service_pool_ = new ServicePool(std::move(service), metric_entity, service_queue_length_);
     server_messenger_->RegisterService(service_name_, service_pool_);
     ASSERT_OK(service_pool_->Init(n_worker_threads_));
   }
@@ -409,6 +410,7 @@ class RpcTestBase : public KuduTest {
   std::shared_ptr<Messenger> server_messenger_;
   scoped_refptr<ServicePool> service_pool_;
   int n_worker_threads_;
+  int service_queue_length_;
   int n_server_reactor_threads_;
   int keepalive_time_ms_;
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d79c1cf8/src/kudu/rpc/rpc_stub-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_stub-test.cc b/src/kudu/rpc/rpc_stub-test.cc
index 48292be..8ac3dba 100644
--- a/src/kudu/rpc/rpc_stub-test.cc
+++ b/src/kudu/rpc/rpc_stub-test.cc
@@ -15,8 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <atomic>
+#include <thread>
 #include <vector>
 
+#include <glog/logging.h>
+#include <glog/stl_logging.h>
 #include <gtest/gtest.h>
 #include <boost/bind.hpp>
 #include <boost/ptr_container/ptr_vector.hpp>
@@ -46,6 +50,9 @@ class RpcStubTest : public RpcTestBase {
  public:
   virtual void SetUp() OVERRIDE {
     RpcTestBase::SetUp();
+    // Use a shorter queue length since some tests below need to start enough
+    // threads to saturate the queue.
+    service_queue_length_ = 10;
     StartTestServerWithGeneratedCode(&server_addr_);
     client_messenger_ = CreateMessenger("Client");
   }
@@ -328,6 +335,16 @@ TEST_F(RpcStubTest, TestDontHandleTimedOutCalls) {
     sleeps.push_back(sleep.release());
   }
 
+  // We asynchronously sent the RPCs above, but the RPCs might still
+  // be in the queue. Because the RPC we send next has a lower timeout,
+  // it would take priority over the long-timeout RPCs. So, we have to
+  // wait until the above RPCs are being processed before we continue
+  // the test.
+  const Histogram* queue_time_metric = service_pool_->IncomingQueueTimeMetricForTests();
+  while (queue_time_metric->TotalCount() < n_worker_threads_) {
+    SleepFor(MonoDelta::FromMilliseconds(1));
+  }
+
   // Send another call with a short timeout. This shouldn't get processed, because
   // it'll get stuck in the queue for longer than its timeout.
   RpcController rpc;
@@ -347,6 +364,85 @@ TEST_F(RpcStubTest, TestDontHandleTimedOutCalls) {
   ASSERT_EQ(1, timed_out_in_queue->value());
 }
 
+// Test which ensures that the RPC queue accepts requests with the earliest
+// deadline first (EDF), and upon overflow rejects requests with the latest deadlines.
+//
+// In particular, this simulates a workload experienced with Impala where the local
+// impalad would spawn more scanner threads than the total number of handlers plus queue
+// slots, guaranteeing that some of those clients would see SERVER_TOO_BUSY rejections on
+// scan requests and be forced to back off and retry.  Without EDF scheduling, we saw that
+// the "unlucky" threads that got rejected would likely continue to get rejected upon
+// retries, and some would be starved continually until they missed their overall deadline
+// and failed the query.
+//
+// With EDF scheduling, the retries take priority over the original requests (because
+// they retain their original deadlines). This prevents starvation of unlucky threads.
+TEST_F(RpcStubTest, TestEarliestDeadlineFirstQueue) {
+  const int num_client_threads = service_queue_length_ + n_worker_threads_ + 5;
+  vector<std::thread> threads;
+  vector<int> successes(num_client_threads);
+  std::atomic<bool> done(false);
+  for (int thread_id = 0; thread_id < num_client_threads; thread_id++) {
+    threads.emplace_back([&, thread_id] {
+        Random rng(thread_id);
+        CalculatorServiceProxy p(client_messenger_, server_addr_);
+        while (!done.load()) {
+          // Set a deadline in the future. We'll keep using this same deadline
+          // on each of our retries.
+          MonoTime deadline = MonoTime::Now(MonoTime::FINE);
+          deadline.AddDelta(MonoDelta::FromSeconds(8));
+
+          for (int attempt = 1; !done.load(); attempt++) {
+            RpcController controller;
+            SleepRequestPB req;
+            SleepResponsePB resp;
+            controller.set_deadline(deadline);
+            req.set_sleep_micros(100000);
+            Status s = p.Sleep(req, &resp, &controller);
+            if (s.ok()) {
+              successes[thread_id]++;
+              break;
+            }
+            // We expect to get SERVER_TOO_BUSY errors because we have more clients than
the
+            // server has handlers and queue slots. No other errors are expected.
+            CHECK(s.IsRemoteError() &&
+                  controller.error_response()->code() == rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY)
+                << "Unexpected RPC failure: " << s.ToString();
+            // Randomized exponential backoff (similar to that done by the scanners in the
Kudu
+            // client.).
+            int backoff = (0.5 + rng.NextDoubleFraction() * 0.5) * (std::min(1 << attempt,
1000));
+            VLOG(1) << "backoff " << backoff << "ms";
+            SleepFor(MonoDelta::FromMilliseconds(backoff));
+          }
+        }
+      });
+  }
+  // Let the threads run for 5 seconds before stopping them.
+  SleepFor(MonoDelta::FromSeconds(5));
+  done.store(true);
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  // Before switching to earliest-deadline-first scheduling, the results
+  // here would typically look something like:
+  //  1 1 0 1 10 17 6 1 12 12 17 10 8 7 12 9 16 15
+  // With the fix, we see something like:
+  //  9 9 9 8 9 9 9 9 9 9 9 9 9 9 9 9 9
+  LOG(INFO) << "thread RPC success counts: " << successes;
+
+  int sum = 0;
+  int min = std::numeric_limits<int>::max();
+  for (int x : successes) {
+    sum += x;
+    min = std::min(min, x);
+  }
+  int avg = sum / successes.size();
+  ASSERT_GT(min, avg / 2)
+      << "expected the least lucky thread to have at least half as many successes "
+      << "as the average thread: min=" << min << " avg=" << avg;
+}
+
 TEST_F(RpcStubTest, TestDumpCallsInFlight) {
   CalculatorServiceProxy p(client_messenger_, server_addr_);
   AsyncSleep sleep;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d79c1cf8/src/kudu/rpc/service_pool.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_pool.cc b/src/kudu/rpc/service_pool.cc
index eb7d46a..a10d6c4 100644
--- a/src/kudu/rpc/service_pool.cc
+++ b/src/kudu/rpc/service_pool.cc
@@ -27,6 +27,7 @@
 #include "kudu/rpc/inbound_call.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/service_if.h"
+#include "kudu/rpc/service_queue.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
@@ -95,7 +96,7 @@ void ServicePool::Shutdown() {
 
   // Now we must drain the service queue.
   Status status = Status::ServiceUnavailable("Service is shutting down");
-  gscoped_ptr<InboundCall> incoming;
+  std::unique_ptr<InboundCall> incoming;
   while (service_queue_.BlockingGet(&incoming)) {
     incoming.release()->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status);
   }
@@ -103,6 +104,22 @@ void ServicePool::Shutdown() {
   service_->Shutdown();
 }
 
+void ServicePool::RejectTooBusy(InboundCall* c) {
+  string err_msg =
+      Substitute("$0 request on $1 from $2 dropped due to backpressure. "
+                 "The service queue is full; it has $3 items.",
+                 c->remote_method().method_name(),
+                 service_->service_name(),
+                 c->remote_address().ToString(),
+                 service_queue_.max_size());
+  rpcs_queue_overflow_->Increment();
+  LOG(WARNING) << err_msg;
+  c->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
+                    Status::ServiceUnavailable(err_msg));
+  DLOG(INFO) << err_msg << " Contents of service queue:\n"
+             << service_queue_.ToString();
+}
+
 Status ServicePool::QueueInboundCall(gscoped_ptr<InboundCall> call) {
   InboundCall* c = call.release();
 
@@ -119,9 +136,20 @@ Status ServicePool::QueueInboundCall(gscoped_ptr<InboundCall> call)
{
   }
 
   TRACE_TO(c->trace(), "Inserting onto call queue");
+
   // Queue message on service queue
-  QueueStatus queue_status = service_queue_.Put(c);
-  if (PREDICT_TRUE(queue_status == QUEUE_SUCCESS)) {
+  boost::optional<InboundCall*> evicted;
+  auto queue_status = service_queue_.Put(c, &evicted);
+  if (queue_status == ServiceQueue::QUEUE_FULL) {
+    RejectTooBusy(c);
+    return Status::OK();
+  }
+
+  if (PREDICT_FALSE(evicted != boost::none)) {
+    RejectTooBusy(*evicted);
+  }
+
+  if (PREDICT_TRUE(queue_status == ServiceQueue::QUEUE_SUCCESS)) {
     // NB: do not do anything with 'c' after it is successfully queued --
     // a service thread may have already dequeued it, processed it, and
     // responded by this point, in which case the pointer would be invalid.
@@ -129,20 +157,7 @@ Status ServicePool::QueueInboundCall(gscoped_ptr<InboundCall> call)
{
   }
 
   Status status = Status::OK();
-  if (queue_status == QUEUE_FULL) {
-    string err_msg =
-        Substitute("$0 request on $1 from $2 dropped due to backpressure. "
-        "The service queue is full; it has $3 items.",
-        c->remote_method().method_name(),
-        service_->service_name(),
-        c->remote_address().ToString(),
-        service_queue_.max_size());
-    status = Status::ServiceUnavailable(err_msg);
-    rpcs_queue_overflow_->Increment();
-    c->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, status);
-    DLOG(INFO) << err_msg << " Contents of service queue:\n"
-               << service_queue_.ToString();
-  } else if (queue_status == QUEUE_SHUTDOWN) {
+  if (queue_status == ServiceQueue::QUEUE_SHUTDOWN) {
     status = Status::ServiceUnavailable("Service is shutting down");
     c->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status);
   } else {
@@ -154,7 +169,7 @@ Status ServicePool::QueueInboundCall(gscoped_ptr<InboundCall> call)
{
 
 void ServicePool::RunThread() {
   while (true) {
-    gscoped_ptr<InboundCall> incoming;
+    std::unique_ptr<InboundCall> incoming;
     if (!service_queue_.BlockingGet(&incoming)) {
       VLOG(1) << "ServicePool: messenger shutting down.";
       return;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d79c1cf8/src/kudu/rpc/service_pool.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_pool.h b/src/kudu/rpc/service_pool.h
index 54e76e0..1dfc0ea 100644
--- a/src/kudu/rpc/service_pool.h
+++ b/src/kudu/rpc/service_pool.h
@@ -25,7 +25,7 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/rpc_service.h"
-#include "kudu/util/blocking_queue.h"
+#include "kudu/rpc/service_queue.h"
 #include "kudu/util/mutex.h"
 #include "kudu/util/thread.h"
 #include "kudu/util/status.h"
@@ -63,6 +63,10 @@ class ServicePool : public RpcService {
     return rpcs_timed_out_in_queue_.get();
   }
 
+  const Histogram* IncomingQueueTimeMetricForTests() const {
+    return incoming_queue_time_.get();
+  }
+
   const Counter* RpcsQueueOverflowMetric() const {
     return rpcs_queue_overflow_.get();
   }
@@ -71,9 +75,11 @@ class ServicePool : public RpcService {
 
  private:
   void RunThread();
+  void RejectTooBusy(InboundCall* c);
+
   gscoped_ptr<ServiceIf> service_;
   std::vector<scoped_refptr<kudu::Thread> > threads_;
-  BlockingQueue<InboundCall*> service_queue_;
+  ServiceQueue service_queue_;
   scoped_refptr<Histogram> incoming_queue_time_;
   scoped_refptr<Counter> rpcs_timed_out_in_queue_;
   scoped_refptr<Counter> rpcs_queue_overflow_;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d79c1cf8/src/kudu/rpc/service_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_queue.h b/src/kudu/rpc/service_queue.h
new file mode 100644
index 0000000..7e20aba
--- /dev/null
+++ b/src/kudu/rpc/service_queue.h
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_SERVICE_QUEUE_H
+#define KUDU_UTIL_SERVICE_QUEUE_H
+
+#include <boost/optional.hpp>
+#include <memory>
+#include <string>
+#include <set>
+
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/mutex.h"
+
+namespace kudu {
+namespace rpc {
+
+// Blocking queue used for passing inbound RPC calls to the service handler pool.
+// Calls are dequeued in 'earliest-deadline first' order. The queue also maintains a
+// bounded number of calls. If the queue overflows, then calls with deadlines farthest
+// in the future are evicted.
+//
+// When calls do not provide deadlines, the RPC layer considers their deadline to
+// be infinitely in the future. This means that any call that does have a deadline
+// can evict any call that does not have a deadline. This incentivizes clients to
+// provide accurate deadlines for their calls.
+class ServiceQueue {
+ public:
+  // Return values for ServiceQueue::Put()
+  enum QueueStatus {
+    QUEUE_SUCCESS = 0,
+    QUEUE_SHUTDOWN = 1,
+    QUEUE_FULL = 2
+  };
+
+  explicit ServiceQueue(int max_size)
+      : shutdown_(false),
+        max_queue_size_(max_size),
+        not_empty_(&lock_) {
+  }
+
+  ~ServiceQueue() {
+    DCHECK(queue_.empty())
+        << "ServiceQueue holds bare pointers at destruction time";
+  }
+
+
+  // Get an element from the queue.  Returns false if we were shut down prior to
+  // getting the element.
+  bool BlockingGet(std::unique_ptr<InboundCall> *out) {
+    MutexLock l(lock_);
+    while (true) {
+      if (!queue_.empty()) {
+        auto it = queue_.begin();
+        out->reset(*it);
+        queue_.erase(it);
+        return true;
+      }
+      if (shutdown_) {
+        return false;
+      }
+      not_empty_.Wait();
+    }
+  }
+
+  // Add a new call to the queue.
+  // Returns:
+  // - QUEUE_SHUTDOWN if Shutdown() has already been called.
+  // - QUEUE_FULL if the queue is full and 'call' has a later deadline than any
+  //   RPC already in the queue.
+  // - QUEUE_SUCCESS if 'call' was enqueued.
+  //
+  // In the case of a 'QUEUE_SUCCESS' response, the new element may have bumped
+  // another call out of the queue. In that case, *evicted will be set to the
+  // call that was bumped.
+  QueueStatus Put(InboundCall* call, boost::optional<InboundCall*>* evicted) {
+    MutexLock l(lock_);
+    if (shutdown_) {
+      return QUEUE_SHUTDOWN;
+    }
+
+    if (queue_.size() >= max_queue_size_) {
+      DCHECK_EQ(queue_.size(), max_queue_size_);
+      auto it = queue_.end();
+      --it;
+      if (DeadlineLess(*it, call)) {
+        return QUEUE_FULL;
+      }
+
+      *evicted = *it;
+      queue_.erase(it);
+    }
+
+    queue_.insert(call);
+
+    l.Unlock();
+    not_empty_.Signal();
+    return QUEUE_SUCCESS;
+  }
+
+  // Shut down the queue.
+  // When a blocking queue is shut down, no more elements can be added to it,
+  // and Put() will return QUEUE_SHUTDOWN.
+  // Existing elements will drain out of it, and then BlockingGet will start
+  // returning false.
+  void Shutdown() {
+    MutexLock l(lock_);
+    shutdown_ = true;
+    not_empty_.Broadcast();
+  }
+
+  bool empty() const {
+    MutexLock l(lock_);
+    return queue_.empty();
+  }
+
+  int max_size() const {
+    return max_queue_size_;
+  }
+
+  std::string ToString() const {
+    std::string ret;
+
+    MutexLock l(lock_);
+    for (const auto* t : queue_) {
+      ret.append(t->ToString());
+      ret.append("\n");
+    }
+    return ret;
+  }
+
+ private:
+  // Comparison function which orders calls by their deadlines.
+  static bool DeadlineLess(const InboundCall* a,
+                           const InboundCall* b) {
+    auto time_a = a->GetClientDeadline();
+    auto time_b = b->GetClientDeadline();
+    if (time_a.Equals(time_b)) {
+      // If two calls have the same deadline (most likely because neither one specified
+      // one) then we should order them by arrival order.
+      time_a = a->GetTimeReceived();
+      time_b = b->GetTimeReceived();
+    }
+    return time_a.ComesBefore(time_b);
+  }
+
+  // Struct functor wrapper for DeadlineLess.
+  struct DeadlineLessStruct {
+    bool operator()(const InboundCall* a, const InboundCall* b) const {
+      return DeadlineLess(a, b);
+    }
+  };
+
+  bool shutdown_;
+  int max_queue_size_;
+  mutable Mutex lock_;
+  ConditionVariable not_empty_;
+  std::multiset<InboundCall*, DeadlineLessStruct> queue_;
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif


Mime
View raw message