From: adar@apache.org
To: commits@kudu.apache.org
Reply-To: dev@kudu.apache.org
Date: Tue, 30 Oct 2018 04:20:46 -0000
Message-Id: <36a0d7501e164e18bba42e13ce44a58c@git.apache.org>
Subject: [2/2] kudu git commit: rpc-test: fix TestClientConnectionMetrics

rpc-test: fix TestClientConnectionMetrics

Every now and then, this test would fail with:

  rpc-test.cc:542: Failure
  Expected: (dump_resp.outbound_connections(0).outbound_queue_size()) > (0), actual: 0 vs 0

Unfortunately, the test would then go on to crash (and trigger a TSAN
warning) due to the lack of proper cleanup in the event of an ASSERT
failure. I've fixed that in this patch.

I also tried to address the root of the test's flakiness (the assertion
that the outbound transfer queue contains at least one element), but I
couldn't find a good way to do it. Blocking the server reactor thread has
no effect on client-side queueing, and we can't block the client reactor
thread outright because DumpRunningRpcs runs on it. Some of this is touched
on in the original code review[1] that committed the test.

Having given up, I wrapped the whole thing in an ASSERT_EVENTUALLY. It's
ham-fisted for sure, but it seems to work: without it, the test fails every
100-200 runs on my laptop; with it, I can't get it to fail at all. I also
looped it 1000 times in TSAN mode with 8 stress threads and didn't see any
failures.

I don't understand the krpc subsystem very well, so if there's a better
way, I'm all ears.

1. https://gerrit.cloudera.org/c/9343/
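For readers who haven't met the macro: ASSERT_EVENTUALLY retries a block of
gtest assertions until they all pass or a deadline expires, swallowing
intermediate failures along the way. The sketch below illustrates that
retry idea in standalone form; the function name, the 10ms back-off, and
the 30-second deadline are assumptions here, not Kudu's actual
implementation.

// Hypothetical sketch of the retry-until-deadline idea behind
// ASSERT_EVENTUALLY. Not Kudu's implementation; names, back-off, and
// deadline are invented for illustration.
#include <chrono>
#include <functional>
#include <thread>

#include <gtest/gtest-spi.h>
#include <gtest/gtest.h>

void AssertEventuallySketch(const std::function<void()>& f,
                            std::chrono::seconds deadline = std::chrono::seconds(30)) {
  const auto start = std::chrono::steady_clock::now();
  while (true) {
    // Run 'f', capturing any gtest failures it raises instead of letting
    // them fail the enclosing test immediately.
    testing::TestPartResultArray results;
    {
      testing::ScopedFakeTestPartResultReporter reporter(
          testing::ScopedFakeTestPartResultReporter::INTERCEPT_ONLY_CURRENT_THREAD,
          &results);
      f();
    }
    if (results.size() == 0) {
      return;  // Every assertion passed; we're done.
    }
    if (std::chrono::steady_clock::now() - start >= deadline) {
      // Out of time: replay the captured failures for real.
      for (int i = 0; i < results.size(); i++) {
        ADD_FAILURE() << results.GetTestPartResult(i).message();
      }
      return;
    }
    // Back off briefly and try the whole block again.
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
}

In the patched test, the retried block is the entire queue-and-assert
exercise, so a run in which the client reactor drains the outbound queue
before DumpRunningRpcs samples it simply triggers another attempt instead
of a failure.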
Change-Id: I9c565b80bdca435d18787c7df0ec992728363980
Reviewed-on: http://gerrit.cloudera.org:8080/11819
Reviewed-by: Alexey Serbin
Tested-by: Adar Dembo

Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3a77ba13
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3a77ba13
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3a77ba13

Branch: refs/heads/master
Commit: 3a77ba131b68d0ef5affc043c29c81f4d07a2659
Parents: d0205b9
Author: Adar Dembo
Authored: Mon Oct 29 13:05:21 2018 -0700
Committer: Adar Dembo
Committed: Tue Oct 30 04:20:21 2018 +0000

----------------------------------------------------------------------
 src/kudu/rpc/rpc-test.cc | 94 ++++++++++++++++++++++---------------------
 1 file changed, 48 insertions(+), 46 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a77ba13/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 077b5a3..1cffdfd 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "kudu/rpc/rpc-test-base.h"
-
 #include <...>
 #include <...>
 #include <...>
@@ -26,13 +24,11 @@
 #include <...>
 #include <...>
 #include <...>
-#include <...>
 #include <...>
 #include <...>
 #include <...>
 #include <...>
-#include <...>
 #include <...>
 #include <...>
 #include <...>
@@ -49,6 +45,7 @@
 #include "kudu/rpc/outbound_call.h"
 #include "kudu/rpc/proxy.h"
 #include "kudu/rpc/reactor.h"
+#include "kudu/rpc/rpc-test-base.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
 #include "kudu/rpc/rpc_sidecar.h"
@@ -495,59 +492,64 @@ TEST_P(TestRpc, TestConnectionAlwaysKeepalive) {
 
 // Test that the metrics on a per connection level work accurately.
 TEST_P(TestRpc, TestClientConnectionMetrics) {
-  // Only run one reactor per messenger, so we can grab the metrics from that
-  // one without having to check all.
-  n_server_reactor_threads_ = 1;
-  keepalive_time_ms_ = -1;
-
   // Set up server.
   Sockaddr server_addr;
   bool enable_ssl = GetParam();
   ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
 
-  // Set up client.
+  // Set up client with one reactor so that we can grab the metrics from just
+  // that reactor.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger;
   ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
   Proxy p(client_messenger, server_addr, server_addr.host(),
           GenericCalculatorService::static_service_name());
 
-  // Cause the reactor thread to be blocked for 2 seconds.
-  server_messenger_->ScheduleOnReactor(boost::bind(sleep, 2), MonoDelta::FromSeconds(0));
-
-  RpcController controller;
-  DumpRunningRpcsRequestPB dump_req;
-  DumpRunningRpcsResponsePB dump_resp;
-  dump_req.set_include_traces(false);
-
-  // We'll send several calls asynchronously to force RPC queueing on the sender side.
-  int n_calls = 1000;
-  AddRequestPB add_req;
-  add_req.set_x(rand());
-  add_req.set_y(rand());
-  AddResponsePB add_resp;
-
-  vector<unique_ptr<RpcController>> controllers;
-  CountDownLatch latch(n_calls);
-  for (int i = 0; i < n_calls; i++) {
-    controllers.emplace_back(new RpcController());
-    p.AsyncRequest(GenericCalculatorService::kAddMethodName, add_req, &add_resp,
-        controllers.back().get(), boost::bind(&CountDownLatch::CountDown, boost::ref(latch)));
-  }
-
-  // Since we blocked the only reactor thread for sometime, we should see RPCs queued on the
-  // OutboundTransfer queue, unless the main thread is very slow.
-  ASSERT_OK(client_messenger->DumpRunningRpcs(dump_req, &dump_resp));
-  ASSERT_EQ(1, dump_resp.outbound_connections_size());
-  ASSERT_GT(dump_resp.outbound_connections(0).outbound_queue_size(), 0);
-
-  // Wait for the calls to be marked finished.
-  latch.Wait();
+  // Here we queue a bunch of calls to the server and test that the sender's
+  // OutboundTransfer queue is indeed populated with those calls. Unfortunately,
+  // we have no surefire way of controlling the queue directly; a fast client
+  // reactor thread or a slow main thread could cause all of the outbound calls
+  // to be sent before we test the queue size, even though the server can't yet process them.
+  //
+  // So we repeat the entire exercise until we get a non-zero queue size.
+  ASSERT_EVENTUALLY([&]{
+    // We'll send several calls asynchronously to force RPC queueing on the sender side.
+    constexpr int n_calls = 1000;
+    AddRequestPB add_req;
+    add_req.set_x(rand());
+    add_req.set_y(rand());
+    AddResponsePB add_resp;
+
+    // Send the calls.
+    vector<unique_ptr<RpcController>> controllers;
+    CountDownLatch latch(n_calls);
+    for (int i = 0; i < n_calls; i++) {
+      controllers.emplace_back(new RpcController());
+      p.AsyncRequest(GenericCalculatorService::kAddMethodName, add_req, &add_resp,
+                     controllers.back().get(), boost::bind(
+                         &CountDownLatch::CountDown, boost::ref(latch)));
+    }
+    auto cleanup = MakeScopedCleanup([&](){
+      latch.Wait();
+    });
+
+    // Test the OutboundTransfer queue.
+    DumpRunningRpcsRequestPB dump_req;
+    DumpRunningRpcsResponsePB dump_resp;
+    dump_req.set_include_traces(false);
+    ASSERT_OK(client_messenger->DumpRunningRpcs(dump_req, &dump_resp));
+    ASSERT_EQ(1, dump_resp.outbound_connections_size());
+    ASSERT_GT(dump_resp.outbound_connections(0).outbound_queue_size(), 0);
+
+    // Unblock all of the calls and wait for them to finish.
+    latch.Wait();
+    cleanup.cancel();
 
-  // Verify that all the RPCs have finished.
-  for (const auto& controller : controllers) {
-    ASSERT_TRUE(controller->finished());
-  }
+    // Verify that all the RPCs have finished.
+    for (const auto& controller : controllers) {
+      ASSERT_TRUE(controller->finished());
+    }
+  });
 }
 
 // Test that outbound connections to the same server are reopen upon every RPC
@@ -1134,7 +1136,7 @@ TEST_P(TestRpc, TestApplicationFeatureFlag) {
 
 TEST_P(TestRpc, TestApplicationFeatureFlagUnsupportedServer) {
   auto savedFlags = kSupportedServerRpcFeatureFlags;
-  auto cleanup = MakeScopedCleanup([&] () { kSupportedServerRpcFeatureFlags = savedFlags; });
+  SCOPED_CLEANUP({ kSupportedServerRpcFeatureFlags = savedFlags; });
   kSupportedServerRpcFeatureFlags = {};
 
   // Set up server.
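The crash fix, meanwhile, rests on the scoped-cleanup idiom used in the
diff above: a guard object runs a closure when it leaves scope, which
covers the early returns that gtest's fatal ASSERT_* macros perform on
failure. Below is a minimal sketch of such a guard with hypothetical names;
Kudu's real helpers are MakeScopedCleanup() and SCOPED_CLEANUP.

// Minimal sketch of a scope-exit guard; a hypothetical stand-in for Kudu's
// MakeScopedCleanup()/SCOPED_CLEANUP, not their actual implementation.
#include <utility>

template <typename F>
class ScopedCleanupSketch {
 public:
  explicit ScopedCleanupSketch(F f) : f_(std::move(f)) {}

  // Transferring ownership disarms the source so the closure runs once.
  ScopedCleanupSketch(ScopedCleanupSketch&& other)
      : f_(std::move(other.f_)), cancelled_(other.cancelled_) {
    other.cancelled_ = true;
  }

  // Runs the cleanup on scope exit -- including the early returns that
  // gtest's fatal ASSERT_* macros perform when an expectation fails.
  ~ScopedCleanupSketch() {
    if (!cancelled_) f_();
  }

  // Disarms the guard once the cleanup has been performed manually.
  void cancel() { cancelled_ = true; }

 private:
  F f_;
  bool cancelled_ = false;
};

template <typename F>
ScopedCleanupSketch<F> MakeScopedCleanupSketch(F f) {
  return ScopedCleanupSketch<F>(std::move(f));
}

In TestClientConnectionMetrics, such a guard ensures latch.Wait() runs even
when ASSERT_EQ or ASSERT_GT bails out of the ASSERT_EVENTUALLY lambda
early, so outstanding RPC callbacks can no longer race against the
destruction of the test's stack state; on the success path, the test waits
explicitly and then cancels the guard.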