From: todd@apache.org
To: commits@kudu.incubator.apache.org
Reply-To: dev@kudu.incubator.apache.org
Date: Wed, 25 May 2016 05:36:18 -0000
Message-Id: <0e00a5a83f2545809b258b63ec4307da@git.apache.org>
Subject: [2/3] incubator-kudu git commit: Add a request tracker to track client rpc sequence numbers

Add a request tracker to track client rpc sequence numbers

This adds a new component to the rpc subsystem: the RequestTracker.
This is inspired by the RequestTracker in "Implementing Linearizability
at Large Scale and Low Latency" by Colin Lee et al. and is responsible
for generating new sequence numbers for rpcs and to keep track of the
incomplete ones so that we're able to implement exactly-once semantics
for certain rpcs.

Change-Id: I23201625ca02f244dc94205d88dabc01608de471
Reviewed-on: http://gerrit.cloudera.org:8080/3078
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo

Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/9d5ce002
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/9d5ce002
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/9d5ce002

Branch: refs/heads/master
Commit: 9d5ce0027eeede5ef906545d9abc258041309d7d
Parents: ab500fc
Author: David Alves
Authored: Tue May 3 13:23:08 2016 -0700
Committer: David Ribeiro Alves
Committed: Wed May 25 03:08:02 2016 +0000

----------------------------------------------------------------------
 src/kudu/rpc/CMakeLists.txt          |  2 +
 src/kudu/rpc/request_tracker-test.cc | 89 +++++++++++++++++++++++++++++++
 src/kudu/rpc/request_tracker.cc      | 49 +++++++++++++++++
 src/kudu/rpc/request_tracker.h       | 84 +++++++++++++++++++++++++++++
 4 files changed, 224 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/9d5ce002/src/kudu/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index 6dae57e..8d2037e 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -53,6 +53,7 @@ set(KRPC_SRCS
   proxy.cc
   reactor.cc
   remote_method.cc
+  request_tracker.cc
   rpc.cc
   rpc_context.cc
   rpc_controller.cc
@@ -113,6 +114,7 @@ target_link_libraries(rtest_krpc
 set(KUDU_TEST_LINK_LIBS rtest_krpc krpc ${KUDU_MIN_TEST_LIBS})
 ADD_KUDU_TEST(mt-rpc-test RUN_SERIAL true)
 ADD_KUDU_TEST(reactor-test)
+ADD_KUDU_TEST(request_tracker-test)
 ADD_KUDU_TEST(rpc-bench RUN_SERIAL true)
 ADD_KUDU_TEST(rpc-test)
 ADD_KUDU_TEST(rpc_stub-test)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/9d5ce002/src/kudu/rpc/request_tracker-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/request_tracker-test.cc b/src/kudu/rpc/request_tracker-test.cc
new file mode 100644
index 0000000..19efef6
--- /dev/null
+++ b/src/kudu/rpc/request_tracker-test.cc
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <vector>
+
+#include "kudu/rpc/request_tracker.h"
+#include "kudu/util/test_util.h"
+
+using std::vector;
+
+namespace kudu {
+namespace rpc {
+
+TEST(RequestTrackerTest, TestSequenceNumberGeneration) {
+  const int MAX = 10;
+
+  scoped_refptr<RequestTracker> tracker_(new RequestTracker("test_client"));
+
+  // A new tracker should have no incomplete RPCs
+  ASSERT_TRUE(tracker_->FirstIncomplete(nullptr).IsNotFound());
+
+  vector<RequestTracker::SequenceNumber> generated_seq_nos;
+
+  // Generate MAX in flight RPCs, making sure they are correctly returned.
+  for (int i = 0; i < MAX; i++) {
+    RequestTracker::SequenceNumber seq_no;
+    ASSERT_OK(tracker_->NewSeqNo(&seq_no));
+    generated_seq_nos.push_back(seq_no);
+  }
+
+  // Now we should get a first incomplete.
+  RequestTracker::SequenceNumber first_incomplete;
+  ASSERT_OK(tracker_->FirstIncomplete(&first_incomplete));
+  ASSERT_EQ(generated_seq_nos[0], first_incomplete);
+
+  // Marking 'first_incomplete' as done, should advance the first incomplete.
+  tracker_->RpcCompleted(first_incomplete);
+
+  ASSERT_OK(tracker_->FirstIncomplete(&first_incomplete));
+  ASSERT_EQ(generated_seq_nos[1], first_incomplete);
+
+  // Marking a 'middle' rpc, should not advance 'first_incomplete'.
+  tracker_->RpcCompleted(generated_seq_nos[5]);
+  ASSERT_OK(tracker_->FirstIncomplete(&first_incomplete));
+  ASSERT_EQ(generated_seq_nos[1], first_incomplete);
+
+  // Marking half the rpc as complete should advance FirstIncomplete.
+  // Note that this also tests that RequestTracker::RpcCompleted() is idempotent, i.e. that
+  // marking the same sequence number as complete twice is a no-op.
+  for (int i = 0; i < MAX / 2; i++) {
+    tracker_->RpcCompleted(generated_seq_nos[i]);
+  }
+
+  ASSERT_OK(tracker_->FirstIncomplete(&first_incomplete));
+  ASSERT_EQ(generated_seq_nos[6], first_incomplete);
+
+  for (int i = MAX / 2; i <= MAX; i++) {
+    RequestTracker::SequenceNumber seq_no;
+    ASSERT_OK(tracker_->NewSeqNo(&seq_no));
+    generated_seq_nos.push_back(seq_no);
+  }
+
+  // Marking them all as completed should cause RequestTracker::FirstIncomplete() to return
+  // Status::NotFound() again.
+  for (auto seq_no : generated_seq_nos) {
+    tracker_->RpcCompleted(seq_no);
+  }
+
+  ASSERT_TRUE(tracker_->FirstIncomplete(nullptr).IsNotFound());
+}
+
+} // namespace rpc
+} // namespace kudu
+

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/9d5ce002/src/kudu/rpc/request_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/request_tracker.cc b/src/kudu/rpc/request_tracker.cc
new file mode 100644
index 0000000..4774c04
--- /dev/null
+++ b/src/kudu/rpc/request_tracker.cc
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/request_tracker.h"
+#include "kudu/gutil/map-util.h"
+
+namespace kudu {
+namespace rpc {
+
+RequestTracker::RequestTracker(const string& client_id)
+    : client_id_(client_id),
+      next_(0) {}
+
+Status RequestTracker::NewSeqNo(SequenceNumber* seq_no) {
+  lock_guard<simple_spinlock> l(&lock_);
+  *seq_no = next_;
+  InsertOrDie(&incomplete_rpcs_, *seq_no);
+  next_++;
+  return Status::OK();
+}
+
+Status RequestTracker::FirstIncomplete(SequenceNumber* seq_no) {
+  lock_guard<simple_spinlock> l(&lock_);
+  if (incomplete_rpcs_.empty()) return Status::NotFound("There are no incomplete RPCs");
+  *seq_no = *incomplete_rpcs_.begin();
+  return Status::OK();
+}
+
+void RequestTracker::RpcCompleted(const SequenceNumber& seq_no) {
+  lock_guard<simple_spinlock> l(&lock_);
+  incomplete_rpcs_.erase(seq_no);
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/9d5ce002/src/kudu/rpc/request_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/request_tracker.h b/src/kudu/rpc/request_tracker.h
new file mode 100644
index 0000000..8147e21
--- /dev/null
+++ b/src/kudu/rpc/request_tracker.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <set>
+#include <string>
+
+#include "kudu/util/locks.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace rpc {
+
+// RequestTracker implementation, inspired by:
+// "Implementing Linearizability at Large Scale and Low Latency" by Colin Lee et al.
+//
+// This generates sequence numbers for retriable RPCs and tracks the ongoing ones.
+// The main point of this is to enable exactly-once semantics, i.e. making sure that
+// an RPC is only executed once, by uniquely identifying each RPC that is sent to
+// the server.
+//
+// Note that the sequence numbers here are different from RPC 'call ids'. A call id
+// uniquely identifies a call _to a server_. All calls have a call id that is
+// assigned incrementally. Sequence numbers, on the other hand, uniquely identify
+// the RPC operation itself. That is, if an RPC is retried on another server it will
+// have a different call id, but the same sequence number.
+//
+// By keeping track of the RPCs that are in-flight and which ones are completed
+// we can determine the first incomplete RPC. When this information is sent
+// to the server it can use it to garbage collect RPC results that it might be
+// saving for future retries, since it now knows there won't be any.
+//
+// This class is thread safe.
+class RequestTracker : public RefCountedThreadSafe<RequestTracker> {
+ public:
+  typedef int64_t SequenceNumber;
+  explicit RequestTracker(const std::string& client_id);
+
+  // Creates a new, unique, sequence number.
+  // Sequence numbers are assigned in increasing integer order.
+  // Returns Status::OK() and sets 'seq_no' if it was able to generate a sequence number
+  // or returns Status::ServiceUnavailable() if too many RPCs are in-flight, in which case
+  // the caller should try again later.
+  Status NewSeqNo(SequenceNumber* seq_no);
+
+  // Returns the sequence number of the first incomplete RPC.
+  // If there is no incomplete RPC returns Status::NotFound. 'seq_no' is not set.
+  Status FirstIncomplete(SequenceNumber* seq_no);
+
+  // Marks the rpc with 'seq_no' as completed.
+  void RpcCompleted(const SequenceNumber& seq_no);
+
+  // Returns the client id for this request tracker.
+  const std::string& client_id() { return client_id_; }
+ private:
+  // The client id for this request tracker.
+  const std::string client_id_;
+
+  // Lock that protects all non-const fields.
+  simple_spinlock lock_;
+
+  // The next sequence number.
+  SequenceNumber next_;
+
+  // The (ordered) set of incomplete RPCs.
+  std::set<SequenceNumber> incomplete_rpcs_;
+};
+
+} // namespace rpc
+} // namespace kudu
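
The header comment describes how a retried RPC keeps its sequence number and how the first incomplete sequence number lets a server discard saved results. The standalone sketch below illustrates that interaction under stated assumptions; it is not code from this commit. `SketchTracker` and `SketchResultCache` are hypothetical names, `std::mutex` stands in for Kudu's `simple_spinlock`, and the server-side cache is invented purely for illustration, since this commit only adds the client-side tracker.

```cpp
// Illustrative sketch only. SketchTracker and SketchResultCache are
// hypothetical names, not Kudu classes; std::mutex replaces simple_spinlock.
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <mutex>
#include <set>
#include <string>

using SequenceNumber = int64_t;

// Client side: hands out increasing sequence numbers and remembers which
// ones have not completed yet.
class SketchTracker {
 public:
  SequenceNumber NewSeqNo() {
    std::lock_guard<std::mutex> l(lock_);
    SequenceNumber seq_no = next_++;
    incomplete_.insert(seq_no);
    return seq_no;
  }

  // Sets 'seq_no' to the smallest incomplete sequence number and returns
  // true, or returns false (leaving 'seq_no' untouched) if none is in flight.
  bool FirstIncomplete(SequenceNumber* seq_no) {
    std::lock_guard<std::mutex> l(lock_);
    if (incomplete_.empty()) return false;
    *seq_no = *incomplete_.begin();
    return true;
  }

  // Idempotent: erasing a sequence number that is already gone is a no-op.
  void RpcCompleted(SequenceNumber seq_no) {
    std::lock_guard<std::mutex> l(lock_);
    incomplete_.erase(seq_no);
  }

 private:
  std::mutex lock_;
  SequenceNumber next_ = 0;
  std::set<SequenceNumber> incomplete_;
};

// Server side (assumed design): saves results keyed by sequence number so a
// retry is answered from the cache instead of being executed again.
class SketchResultCache {
 public:
  std::string Execute(SequenceNumber seq_no, const std::string& payload) {
    auto it = results_.find(seq_no);
    if (it != results_.end()) return it->second;  // Retry: replay the result.
    ++executions_;
    return results_.emplace(seq_no, "done:" + payload).first->second;
  }

  // Drops every saved result whose sequence number is below
  // 'first_incomplete'; the client will never retry those.
  void GarbageCollect(SequenceNumber first_incomplete) {
    results_.erase(results_.begin(), results_.lower_bound(first_incomplete));
  }

  int executions() const { return executions_; }
  std::size_t cached() const { return results_.size(); }

 private:
  std::map<SequenceNumber, std::string> results_;
  int executions_ = 0;
};
```

In this sketch, calling `Execute` twice with the same sequence number runs the operation once, and passing the tracker's first incomplete value to `GarbageCollect` removes only the results that can no longer be requested.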