kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [2/3] incubator-kudu git commit: Add a request tracker to track client rpc sequence numbers
Date Wed, 25 May 2016 05:36:18 GMT
Add a request tracker to track client rpc sequence numbers

This adds a new component to the rpc subsystem: the RequestTracker. This is inspired
by the RequestTracker in "Implementing Linearizability at Large Scale and Low Latency"
by Colin Lee et al. and is responsible for generating new sequence numbers for rpcs
and for keeping track of the incomplete ones so that we're able to implement exactly-once
semantics for certain rpcs.

Change-Id: I23201625ca02f244dc94205d88dabc01608de471
Reviewed-on: http://gerrit.cloudera.org:8080/3078
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/9d5ce002
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/9d5ce002
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/9d5ce002

Branch: refs/heads/master
Commit: 9d5ce0027eeede5ef906545d9abc258041309d7d
Parents: ab500fc
Author: David Alves <david.alves@cloudera.com>
Authored: Tue May 3 13:23:08 2016 -0700
Committer: David Ribeiro Alves <david.alves@cloudera.com>
Committed: Wed May 25 03:08:02 2016 +0000

----------------------------------------------------------------------
 src/kudu/rpc/CMakeLists.txt          |  2 +
 src/kudu/rpc/request_tracker-test.cc | 89 +++++++++++++++++++++++++++++++
 src/kudu/rpc/request_tracker.cc      | 49 +++++++++++++++++
 src/kudu/rpc/request_tracker.h       | 84 +++++++++++++++++++++++++++++
 4 files changed, 224 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/9d5ce002/src/kudu/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index 6dae57e..8d2037e 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -53,6 +53,7 @@ set(KRPC_SRCS
     proxy.cc
     reactor.cc
     remote_method.cc
+    request_tracker.cc
     rpc.cc
     rpc_context.cc
     rpc_controller.cc
@@ -113,6 +114,7 @@ target_link_libraries(rtest_krpc
 set(KUDU_TEST_LINK_LIBS rtest_krpc krpc ${KUDU_MIN_TEST_LIBS})
 ADD_KUDU_TEST(mt-rpc-test RUN_SERIAL true)
 ADD_KUDU_TEST(reactor-test)
+ADD_KUDU_TEST(request_tracker-test)
 ADD_KUDU_TEST(rpc-bench RUN_SERIAL true)
 ADD_KUDU_TEST(rpc-test)
 ADD_KUDU_TEST(rpc_stub-test)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/9d5ce002/src/kudu/rpc/request_tracker-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/request_tracker-test.cc b/src/kudu/rpc/request_tracker-test.cc
new file mode 100644
index 0000000..19efef6
--- /dev/null
+++ b/src/kudu/rpc/request_tracker-test.cc
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <vector>
+
+#include "kudu/rpc/request_tracker.h"
+#include "kudu/util/test_util.h"
+
+using std::vector;
+
+namespace kudu {
+namespace rpc {
+
+TEST(RequestTrackerTest, TestSequenceNumberGeneration) {
+  const int MAX = 10;
+
+  scoped_refptr<RequestTracker> tracker_(new RequestTracker("test_client"));
+
+  // A new tracker has no incomplete RPCs; nullptr is ok as 'seq_no' is not written on NotFound.
+  ASSERT_TRUE(tracker_->FirstIncomplete(nullptr).IsNotFound());
+
+  vector<RequestTracker::SequenceNumber> generated_seq_nos;
+
+  // Generate MAX in-flight RPCs, recording the sequence number assigned to each.
+  for (int i = 0; i < MAX; i++) {
+    RequestTracker::SequenceNumber seq_no;
+    ASSERT_OK(tracker_->NewSeqNo(&seq_no));
+    generated_seq_nos.push_back(seq_no);
+  }
+
+  // Now the first incomplete RPC should be the first sequence number generated.
+  RequestTracker::SequenceNumber first_incomplete;
+  ASSERT_OK(tracker_->FirstIncomplete(&first_incomplete));
+  ASSERT_EQ(generated_seq_nos[0], first_incomplete);
+
+  // Marking 'first_incomplete' as done should advance the first incomplete.
+  tracker_->RpcCompleted(first_incomplete);
+
+  ASSERT_OK(tracker_->FirstIncomplete(&first_incomplete));
+  ASSERT_EQ(generated_seq_nos[1], first_incomplete);
+
+  // Marking a 'middle' rpc as done should not advance 'first_incomplete'.
+  tracker_->RpcCompleted(generated_seq_nos[5]);
+  ASSERT_OK(tracker_->FirstIncomplete(&first_incomplete));
+  ASSERT_EQ(generated_seq_nos[1], first_incomplete);
+
+  // Marking the first half of the rpcs as complete should advance FirstIncomplete() to
+  // index 6, since index 5 was completed above. This also tests that RpcCompleted() is
+  // idempotent, i.e. that marking the same sequence number as complete twice is a no-op.
+  for (int i = 0; i < MAX / 2; i++) {
+    tracker_->RpcCompleted(generated_seq_nos[i]);
+  }
+
+  ASSERT_OK(tracker_->FirstIncomplete(&first_incomplete));
+  ASSERT_EQ(generated_seq_nos[6], first_incomplete);
+
+  for (int i = MAX / 2; i <= MAX; i++) {  // Generates MAX / 2 + 1 more sequence numbers.
+    RequestTracker::SequenceNumber seq_no;
+    ASSERT_OK(tracker_->NewSeqNo(&seq_no));
+    generated_seq_nos.push_back(seq_no);
+  }
+
+  // Marking them all as completed should cause RequestTracker::FirstIncomplete() to return
+  // Status::NotFound() again.
+  for (auto seq_no : generated_seq_nos) {
+    tracker_->RpcCompleted(seq_no);
+  }
+
+  ASSERT_TRUE(tracker_->FirstIncomplete(nullptr).IsNotFound());
+}
+
+} // namespace rpc
+} // namespace kudu
+

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/9d5ce002/src/kudu/rpc/request_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/request_tracker.cc b/src/kudu/rpc/request_tracker.cc
new file mode 100644
index 0000000..4774c04
--- /dev/null
+++ b/src/kudu/rpc/request_tracker.cc
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/request_tracker.h"
+#include "kudu/gutil/map-util.h"
+
+namespace kudu {
+namespace rpc {
+
+RequestTracker::RequestTracker(const string& client_id)
+    : client_id_(client_id),
+      next_(0) {}  // The first sequence number handed out by NewSeqNo() is 0.
+
+Status RequestTracker::NewSeqNo(SequenceNumber* seq_no) {
+  lock_guard<simple_spinlock> l(&lock_);  // Guards next_ and incomplete_rpcs_.
+  *seq_no = next_;
+  InsertOrDie(&incomplete_rpcs_, *seq_no);  // NOTE(review): presumably aborts on a duplicate.
+  next_++;
+  return Status::OK();  // There is no failure path in this implementation.
+}
+
+Status RequestTracker::FirstIncomplete(SequenceNumber* seq_no) {
+  lock_guard<simple_spinlock> l(&lock_);  // On NotFound below, 'seq_no' is never written.
+  if (incomplete_rpcs_.empty()) return Status::NotFound("There are no incomplete RPCs");
+  *seq_no = *incomplete_rpcs_.begin();  // Ordered std::set: begin() is the lowest incomplete one.
+  return Status::OK();
+}
+
+void RequestTracker::RpcCompleted(const SequenceNumber& seq_no) {
+  lock_guard<simple_spinlock> l(&lock_);
+  incomplete_rpcs_.erase(seq_no);  // Erase by key: a no-op if 'seq_no' is not in the set.
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/9d5ce002/src/kudu/rpc/request_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/request_tracker.h b/src/kudu/rpc/request_tracker.h
new file mode 100644
index 0000000..8147e21
--- /dev/null
+++ b/src/kudu/rpc/request_tracker.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <set>
+#include <string>
+
+#include "kudu/util/locks.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace rpc {
+
+// RequestTracker implementation, inspired by:
+// "Implementing Linearizability at Large Scale and Low Latency" by Colin Lee et al.
+//
+// This generates sequence numbers for retriable RPCs and tracks the ongoing ones.
+// The main point of this is to enable exactly-once semantics, i.e. making sure that
+// an RPC is only executed once, by uniquely identifying each RPC that is sent to
+// the server.
+//
+// Note that the sequence numbers here are different from RPC 'call ids'. A call id
+// uniquely identifies a call _to a server_. All calls have a call id that is
+// assigned incrementally. Sequence numbers, on the other hand, uniquely identify
+// the RPC operation itself. That is, if an RPC is retried on another server it will
+// have a different call id, but the same sequence number.
+//
+// By keeping track of the RPCs that are in-flight and which ones are completed
+// we can determine the first incomplete RPC. When this information is sent
+// to the server it can use it to garbage collect RPC results that it might be
+// saving for future retries, since it now knows there won't be any.
+//
+// This class is thread safe.
+class RequestTracker : public RefCountedThreadSafe<RequestTracker> {
+ public:
+  typedef int64_t SequenceNumber;
+  explicit RequestTracker(const std::string& client_id);
+
+  // Creates a new, unique, sequence number.
+  // Sequence numbers are assigned in increasing integer order.
+  // Returns Status::OK() and sets 'seq_no' if it was able to generate a sequence number.
+  // Intended to return Status::ServiceUnavailable() (caller should retry later) if too many
+  // RPCs are in-flight. NOTE(review): this change's NewSeqNo() always returns OK -- confirm.
+  Status NewSeqNo(SequenceNumber* seq_no);
+
+  // Returns Status::OK() and sets 'seq_no' to the sequence number of the first incomplete RPC.
+  // If there is no incomplete RPC, returns Status::NotFound and 'seq_no' is not set.
+  Status FirstIncomplete(SequenceNumber* seq_no);
+
+  // Marks the rpc with 'seq_no' as completed. Untracked or already-completed ones are ignored.
+  void RpcCompleted(const SequenceNumber& seq_no);
+
+  // Returns the client id for this request tracker.
+  const std::string& client_id() { return client_id_; }
+ private:
+  // The client id for this request tracker.
+  const std::string client_id_;
+
+  // Lock that protects all non-const fields.
+  simple_spinlock lock_;
+
+  // The next sequence number.
+  SequenceNumber next_;
+
+  // The (ordered) set of incomplete RPCs.
+  std::set<SequenceNumber> incomplete_rpcs_;
+};
+
+} // namespace rpc
+} // namespace kudu


Mime
View raw message