kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [2/2] kudu git commit: Move 'master_rpc.{h, cc}' into the client/ module
Date Thu, 02 Feb 2017 22:12:42 GMT
Move 'master_rpc.{h,cc}' into the client/ module

This file has a couple of classes which are in fact only used by the
client and by the ExternalMiniCluster. Previously they'd been used by
the tablet server as well, so we ended up implementing them not in
'client/' but in 'master/'.

Now that the tablet server doesn't use these classes (it instead
heartbeats to all masters in parallel), it doesn't make much sense
anymore for them to be in the master/ directory.

This patch moves it into client/ and puts it in the
kudu::client::internal namespace. Additionally, one of the two classes
here is only used internally by the other, so I moved that into an
anonymous namespace in the .cc file.

This is in preparation for some surgery on this file to convert it into
more of a "connect to cluster" type RPC which will also be responsible
for grabbing an authentication token, the master CA list, etc.

Change-Id: Ic3a1db6aeb5326e6dfb9986b08949fa349b5f74d
Reviewed-on: http://gerrit.cloudera.org:8080/5864
Reviewed-by: Adar Dembo <adar@cloudera.com>
Tested-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e4a649c2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e4a649c2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e4a649c2

Branch: refs/heads/master
Commit: e4a649c205d46a3f2b06fba475f1325127ade555
Parents: d3e3786
Author: Todd Lipcon <todd@apache.org>
Authored: Wed Feb 1 17:07:56 2017 -0800
Committer: Todd Lipcon <todd@apache.org>
Committed: Thu Feb 2 21:49:07 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/CMakeLists.txt                  |   2 +-
 src/kudu/client/client-internal.cc              |   6 +-
 src/kudu/client/client-internal.h               |  10 +-
 src/kudu/client/master_rpc.cc                   | 283 +++++++++++++++++++
 src/kudu/client/master_rpc.h                    | 119 ++++++++
 .../integration-tests/external_mini_cluster.cc  |   4 +-
 src/kudu/master/CMakeLists.txt                  |  13 -
 src/kudu/master/master_rpc.cc                   | 243 ----------------
 src/kudu/master/master_rpc.h                    | 152 ----------
 src/kudu/tserver/CMakeLists.txt                 |   1 -
 src/kudu/tserver/heartbeater.cc                 |  16 +-
 11 files changed, 419 insertions(+), 430 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e4a649c2/src/kudu/client/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt
index d0ebf12..e7dd8ae 100644
--- a/src/kudu/client/CMakeLists.txt
+++ b/src/kudu/client/CMakeLists.txt
@@ -34,6 +34,7 @@ set(CLIENT_SRCS
   client-internal.cc
   error_collector.cc
   error-internal.cc
+  master_rpc.cc
   meta_cache.cc
   scan_batch.cc
   scan_configuration.cc
@@ -60,7 +61,6 @@ set(CLIENT_LIBS
   kudu_common
   kudu_util
   master_proto
-  master_rpc
   tserver_proto
   tserver_service_proto)
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/e4a649c2/src/kudu/client/client-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index 54ac7c5..f53f540 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -24,6 +24,7 @@
 #include <string>
 #include <vector>
 
+#include "kudu/client/master_rpc.h"
 #include "kudu/client/meta_cache.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
@@ -33,7 +34,6 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/sysinfo.h"
 #include "kudu/master/master.h"
-#include "kudu/master/master_rpc.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master.proxy.h"
 #include "kudu/rpc/request_tracker.h"
@@ -61,7 +61,6 @@ using master::CreateTableRequestPB;
 using master::CreateTableResponsePB;
 using master::DeleteTableRequestPB;
 using master::DeleteTableResponsePB;
-using master::GetLeaderMasterRpc;
 using master::GetTableSchemaRequestPB;
 using master::GetTableSchemaResponsePB;
 using master::GetTabletLocationsRequestPB;
@@ -84,6 +83,7 @@ using strings::Substitute;
 
 namespace client {
 
+using internal::GetLeaderMasterRpc;
 using internal::GetTableSchemaRpc;
 using internal::RemoteTablet;
 using internal::RemoteTabletServer;
@@ -669,7 +669,7 @@ void KuduClient::Data::SetMasterServerProxyAsync(KuduClient* client,
   leader_master_callbacks_.push_back(cb);
   if (!leader_master_rpc_) {
     // No one is sending a request yet - we need to be the one to do it.
-    leader_master_rpc_.reset(new GetLeaderMasterRpc(
+    leader_master_rpc_.reset(new internal::GetLeaderMasterRpc(
                                Bind(&KuduClient::Data::LeaderMasterDetermined,
                                     Unretained(this)),
                                std::move(master_sockaddrs),

http://git-wip-us.apache.org/repos/asf/kudu/blob/e4a649c2/src/kudu/client/client-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.h b/src/kudu/client/client-internal.h
index 561cc67..1fbf15e 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -40,7 +40,6 @@ class HostPort;
 namespace master {
 class AlterTableRequestPB;
 class CreateTableRequestPB;
-class GetLeaderMasterRpc;
 class MasterServiceProxy;
 } // namespace master
 
@@ -52,6 +51,10 @@ class RpcController;
 
 namespace client {
 
+namespace internal {
+class GetLeaderMasterRpc;
+} // namespace internal
+
 class KuduClient::Data {
  public:
   Data();
@@ -154,8 +157,7 @@ class KuduClient::Data {
   // NOTE: since this uses a Synchronizer, this may not be invoked by
   // a method that's on a reactor thread.
   //
-  // TODO (KUDU-492): Get rid of this method and re-factor the client
-  // to lazily initialize 'master_proxy_'.
+  // TODO(todd): rename to ReconnectToMasters or something
   Status SetMasterServerProxy(KuduClient* client,
                               const MonoTime& deadline);
 
@@ -228,7 +230,7 @@ class KuduClient::Data {
   // Ref-counted RPC instance: since 'SetMasterServerProxyAsync' call
   // is asynchronous, we need to hold a reference in this class
   // itself, as to avoid a "use-after-free" scenario.
-  scoped_refptr<master::GetLeaderMasterRpc> leader_master_rpc_;
+  scoped_refptr<internal::GetLeaderMasterRpc> leader_master_rpc_;
   std::vector<StatusCallback> leader_master_callbacks_;
 
   // Protects 'leader_master_rpc_', 'leader_master_hostport_',

http://git-wip-us.apache.org/repos/asf/kudu/blob/e4a649c2/src/kudu/client/master_rpc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/master_rpc.cc b/src/kudu/client/master_rpc.cc
new file mode 100644
index 0000000..5878c58
--- /dev/null
+++ b/src/kudu/client/master_rpc.cc
@@ -0,0 +1,283 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// This module is internal to the client and not a public API.
+
+#include "kudu/client/master_rpc.h"
+
+#include <boost/bind.hpp>
+#include <mutex>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/scoped_cleanup.h"
+
+
+using std::shared_ptr;
+using std::string;
+using std::vector;
+
+using kudu::consensus::RaftPeerPB;
+using kudu::master::GetMasterRegistrationRequestPB;
+using kudu::master::GetMasterRegistrationResponsePB;
+using kudu::master::MasterErrorPB;
+using kudu::master::MasterServiceProxy;
+using kudu::rpc::Messenger;
+using kudu::rpc::Rpc;
+
+namespace kudu {
+namespace client {
+namespace internal {
+
+////////////////////////////////////////////////////////////
+// GetMasterRegistrationRpc
+////////////////////////////////////////////////////////////
+namespace {
+
+// An RPC for getting a Master server's registration.
+class GetMasterRegistrationRpc : public rpc::Rpc {
+ public:
+
+  // Create a wrapper object for a retriable GetMasterRegistration RPC
+  // to 'addr'. The result is stored in 'out', which must be a valid
+  // pointer for the lifetime of this object.
+  //
+  // Invokes 'user_cb' upon failure or success of the RPC call.
+  GetMasterRegistrationRpc(StatusCallback user_cb, Sockaddr addr,
+                           const MonoTime& deadline,
+                           const std::shared_ptr<rpc::Messenger>& messenger,
+                           ServerEntryPB* out);
+
+  ~GetMasterRegistrationRpc();
+
+  virtual void SendRpc() OVERRIDE;
+
+  virtual std::string ToString() const OVERRIDE;
+
+ private:
+  virtual void SendRpcCb(const Status& status) OVERRIDE;
+
+  StatusCallback user_cb_;
+  Sockaddr addr_;
+
+  ServerEntryPB* out_;
+
+  GetMasterRegistrationResponsePB resp_;
+};
+
+
+GetMasterRegistrationRpc::GetMasterRegistrationRpc(
+    StatusCallback user_cb, Sockaddr addr, const MonoTime& deadline,
+    const shared_ptr<Messenger>& messenger, ServerEntryPB* out)
+    : Rpc(deadline, messenger),
+      user_cb_(std::move(user_cb)),
+      addr_(std::move(addr)),
+      out_(DCHECK_NOTNULL(out)) {}
+
+GetMasterRegistrationRpc::~GetMasterRegistrationRpc() {
+}
+
+void GetMasterRegistrationRpc::SendRpc() {
+  MasterServiceProxy proxy(retrier().messenger(),
+                           addr_);
+  GetMasterRegistrationRequestPB req;
+  proxy.GetMasterRegistrationAsync(req, &resp_,
+                                   mutable_retrier()->mutable_controller(),
+                                   boost::bind(&GetMasterRegistrationRpc::SendRpcCb,
+                                               this,
+                                               Status::OK()));
+}
+
+string GetMasterRegistrationRpc::ToString() const {
+  return strings::Substitute("GetMasterRegistrationRpc(address: $0, num_attempts: $1)",
+                             addr_.ToString(), num_attempts());
+}
+
+void GetMasterRegistrationRpc::SendRpcCb(const Status& status) {
+  gscoped_ptr<GetMasterRegistrationRpc> deleter(this);
+  Status new_status = status;
+  if (new_status.ok() && mutable_retrier()->HandleResponse(this, &new_status)) {
+    ignore_result(deleter.release());
+    return;
+  }
+  if (new_status.ok() && resp_.has_error()) {
+    if (resp_.error().code() == MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
+      // If CatalogManager is not initialized, treat the node as a
+      // FOLLOWER for the time being, as currently this RPC is only
+      // used for the purposes of finding the leader master.
+      resp_.set_role(RaftPeerPB::FOLLOWER);
+      new_status = Status::OK();
+    } else {
+      out_->mutable_error()->CopyFrom(resp_.error().status());
+      new_status = StatusFromPB(resp_.error().status());
+    }
+  }
+  if (new_status.ok()) {
+    out_->mutable_instance_id()->CopyFrom(resp_.instance_id());
+    out_->mutable_registration()->CopyFrom(resp_.registration());
+    out_->set_role(resp_.role());
+  }
+  user_cb_.Run(new_status);
+}
+
+} // anonymous namespace
+
+////////////////////////////////////////////////////////////
+// GetLeaderMasterRpc
+////////////////////////////////////////////////////////////
+
+GetLeaderMasterRpc::GetLeaderMasterRpc(LeaderCallback user_cb,
+                                       vector<Sockaddr> addrs,
+                                       MonoTime deadline,
+                                       MonoDelta rpc_timeout,
+                                       shared_ptr<Messenger> messenger)
+    : Rpc(std::move(deadline), std::move(messenger)),
+      user_cb_(std::move(user_cb)),
+      addrs_(std::move(addrs)),
+      rpc_timeout_(std::move(rpc_timeout)),
+      pending_responses_(0),
+      completed_(false) {
+  DCHECK(deadline.Initialized());
+
+  // Using resize instead of reserve to explicitly initialized the values.
+  responses_.resize(addrs_.size());
+}
+
+GetLeaderMasterRpc::~GetLeaderMasterRpc() {
+}
+
+string GetLeaderMasterRpc::ToString() const {
+  vector<string> sockaddr_str;
+  for (const Sockaddr& addr : addrs_) {
+    sockaddr_str.push_back(addr.ToString());
+  }
+  return strings::Substitute("GetLeaderMasterRpc(addrs: $0, num_attempts: $1)",
+                             JoinStrings(sockaddr_str, ","),
+                             num_attempts());
+}
+
+void GetLeaderMasterRpc::SendRpc() {
+  // Compute the actual deadline to use for each RPC.
+  MonoTime rpc_deadline = MonoTime::Now() + rpc_timeout_;
+  MonoTime actual_deadline = MonoTime::Earliest(retrier().deadline(),
+                                                rpc_deadline);
+
+  std::lock_guard<simple_spinlock> l(lock_);
+  for (int i = 0; i < addrs_.size(); i++) {
+    GetMasterRegistrationRpc* rpc = new GetMasterRegistrationRpc(
+        Bind(&GetLeaderMasterRpc::GetMasterRegistrationRpcCbForNode,
+             this, ConstRef(addrs_[i]), ConstRef(responses_[i])),
+        addrs_[i],
+        actual_deadline,
+        retrier().messenger(),
+        &responses_[i]);
+    rpc->SendRpc();
+    ++pending_responses_;
+  }
+}
+
+void GetLeaderMasterRpc::SendRpcCb(const Status& status) {
+  // To safely retry, we must reset completed_ so that it can be reused in the
+  // next round of RPCs.
+  //
+  // The SendRpcCb invariant (see GetMasterRegistrationRpcCbForNode comments)
+  // implies that if we're to retry, we must be the last response. Thus, it is
+  // safe to reset completed_ in this case; there's no danger of a late
+  // response reading it and entering SendRpcCb inadvertently.
+  auto undo_completed = MakeScopedCleanup([&]() {
+    std::lock_guard<simple_spinlock> l(lock_);
+    completed_ = false;
+  });
+
+  // If we've received replies from all of the nodes without finding
+  // the leader, or if there were network errors talking to all of the
+  // nodes the error is retriable and we can perform a delayed retry.
+  if (status.IsNetworkError() || status.IsNotFound()) {
+    mutable_retrier()->DelayedRetry(this, status);
+    return;
+  }
+
+  // If our replies timed out but the deadline hasn't passed, retry.
+  if (status.IsTimedOut() && MonoTime::Now() < retrier().deadline()) {
+    mutable_retrier()->DelayedRetry(this, status);
+    return;
+  }
+
+  // We are not retrying.
+  undo_completed.cancel();
+  user_cb_.Run(status, leader_master_);
+}
+
+void GetLeaderMasterRpc::GetMasterRegistrationRpcCbForNode(const Sockaddr& node_addr,
+                                                           const ServerEntryPB& resp,
+                                                           const Status& status) {
+  // TODO: handle the situation where one Master is partitioned from
+  // the rest of the Master consensus configuration, all are reachable by the client,
+  // and the partitioned node "thinks" it's the leader.
+  //
+  // The proper way to do so is to add term/index to the responses
+  // from the Master, wait for majority of the Masters to respond, and
+  // pick the one with the highest term/index as the leader.
+  Status new_status = status;
+  {
+    std::lock_guard<simple_spinlock> lock(lock_);
+    if (completed_) {
+      // If 'user_cb_' has been invoked (see SendRpcCb above), we can
+      // stop.
+      return;
+    }
+    if (new_status.ok()) {
+      if (resp.role() != RaftPeerPB::LEADER) {
+        // Use a Status::NotFound() to indicate that the node is not
+        // the leader: this way, we can handle the case where we've
+        // received a reply from all of the nodes in the cluster (no
+        // network or other errors encountered), but haven't found a
+        // leader (which means that SendRpcCb() above can perform a
+        // delayed retry).
+        new_status = Status::NotFound("no leader found: " + ToString());
+      } else {
+        // We've found a leader.
+        leader_master_ = HostPort(node_addr);
+        completed_ = true;
+      }
+    }
+    --pending_responses_;
+    if (!new_status.ok()) {
+      if (pending_responses_ > 0) {
+        // Don't call SendRpcCb() on error unless we're the last
+        // outstanding response: calling SendRpcCb() will trigger
+        // a delayed re-try, which don't need to do unless we've
+        // been unable to find a leader so far.
+        return;
+      } else {
+        completed_ = true;
+      }
+    }
+  }
+  // Called if the leader has been determined, or if we've received
+  // all of the responses.
+  SendRpcCb(new_status);
+}
+
+} // namespace internal
+} // namespace client
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e4a649c2/src/kudu/client/master_rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/master_rpc.h b/src/kudu/client/master_rpc.h
new file mode 100644
index 0000000..9159a7f
--- /dev/null
+++ b/src/kudu/client/master_rpc.h
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// This module is internal to the client and not a public API.
+#pragma once
+
+#include <vector>
+#include <string>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/rpc/rpc.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+
+namespace kudu {
+
+class ServerEntryPB;
+class HostPort;
+
+namespace client {
+namespace internal {
+
+// In parallel, send requests to the specified Master servers until a
+// response comes back from the leader of the Master consensus configuration.
+//
+// If queries have been made to all of the specified servers, but no
+// leader has been found, we re-try again (with an increasing delay,
+// see: RpcRetrier in kudu/rpc/rpc.{cc,h}) until a specified deadline
+// passes or we find a leader.
+//
+// The RPCs are sent in parallel in order to avoid prolonged delays on
+// the client-side that would happen with a serial approach when one
+// of the Master servers is slow or stopped (that is, when we have to
+// wait for an RPC request to server N to timeout before we can make
+// an RPC request to server N+1). This allows for true fault tolerance
+// for the Kudu client.
+//
+// The class is reference counted to avoid a "use-after-free"
+// scenario, when responses to the RPC return to the caller _after_ a
+// leader has already been found.
+class GetLeaderMasterRpc : public rpc::Rpc,
+                           public RefCountedThreadSafe<GetLeaderMasterRpc> {
+ public:
+  typedef Callback<void(const Status&, const HostPort&)> LeaderCallback;
+  // The host and port of the leader master server is stored in
+  // 'leader_master', which must remain valid for the lifetime of this
+  // object.
+  //
+  // Calls 'user_cb' when the leader is found, or if no leader can be found
+  // until 'deadline' passes. Each RPC has 'rpc_timeout' time to complete
+  // before it times out and may be retried if 'deadline' has not yet passed.
+  GetLeaderMasterRpc(LeaderCallback user_cb,
+                     std::vector<Sockaddr> addrs,
+                     MonoTime deadline,
+                     MonoDelta rpc_timeout,
+                     std::shared_ptr<rpc::Messenger> messenger);
+
+  virtual void SendRpc() OVERRIDE;
+
+  virtual std::string ToString() const OVERRIDE;
+ private:
+  friend class RefCountedThreadSafe<GetLeaderMasterRpc>;
+  ~GetLeaderMasterRpc();
+
+  virtual void SendRpcCb(const Status& status) OVERRIDE;
+
+  // Invoked when a response comes back from a Master with address
+  // 'node_addr'.
+  //
+  // Invokes SendRpcCb if the response indicates that the specified
+  // master is a leader, or if responses have been received from all
+  // of the Masters.
+  void GetMasterRegistrationRpcCbForNode(const Sockaddr& node_addr,
+                                         const ServerEntryPB& resp,
+                                         const Status& status);
+
+  LeaderCallback user_cb_;
+  std::vector<Sockaddr> addrs_;
+
+  HostPort leader_master_;
+
+  // The amount of time alloted to each GetMasterRegistration RPC.
+  MonoDelta rpc_timeout_;
+
+  // The received responses.
+  //
+  // See also: GetMasterRegistrationRpc above.
+  std::vector<ServerEntryPB> responses_;
+
+  // Number of pending responses.
+  int pending_responses_;
+
+  // If true, then we've already executed the user callback and the
+  // RPC can be deallocated.
+  bool completed_;
+
+  // Protects 'pending_responses_' and 'completed_'.
+  mutable simple_spinlock lock_;
+};
+
+} // namespace internal
+} // namespace client
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e4a649c2/src/kudu/integration-tests/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.cc b/src/kudu/integration-tests/external_mini_cluster.cc
index 71e3fee..a9ee499 100644
--- a/src/kudu/integration-tests/external_mini_cluster.cc
+++ b/src/kudu/integration-tests/external_mini_cluster.cc
@@ -25,13 +25,13 @@
 #include <unordered_set>
 
 #include "kudu/client/client.h"
+#include "kudu/client/master_rpc.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/gutil/mathlimits.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
 #include "kudu/master/master.proxy.h"
-#include "kudu/master/master_rpc.h"
 #include "kudu/security/test/mini_kdc.h"
 #include "kudu/server/server_base.pb.h"
 #include "kudu/server/server_base.proxy.h"
@@ -51,7 +51,7 @@
 #include "kudu/util/subprocess.h"
 #include "kudu/util/test_util.h"
 
-using kudu::master::GetLeaderMasterRpc;
+using kudu::client::internal::GetLeaderMasterRpc;
 using kudu::master::ListTablesRequestPB;
 using kudu::master::ListTablesResponsePB;
 using kudu::master::MasterServiceProxy;

http://git-wip-us.apache.org/repos/asf/kudu/blob/e4a649c2/src/kudu/master/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index 3d3e18d..3afc7e6 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -60,19 +60,6 @@ target_link_libraries(master
   tserver
   tserver_service_proto)
 
-set(MASTER_RPC_SRCS
-  master_rpc.cc)
-set(MASTER_RPC_LIBS
-  kudu_common
-  krpc
-  gutil
-  kudu_util
-  master_proto
-  rpc_header_proto)
-ADD_EXPORTABLE_LIBRARY(master_rpc
-  SRCS ${MASTER_RPC_SRCS}
-  DEPS ${MASTER_RPC_LIBS})
-
 # Tests
 set(KUDU_TEST_LINK_LIBS master master_proto kudu_client ${KUDU_MIN_TEST_LIBS})
 ADD_KUDU_TEST(catalog_manager-test)

http://git-wip-us.apache.org/repos/asf/kudu/blob/e4a649c2/src/kudu/master/master_rpc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_rpc.cc b/src/kudu/master/master_rpc.cc
deleted file mode 100644
index b978b1a..0000000
--- a/src/kudu/master/master_rpc.cc
+++ /dev/null
@@ -1,243 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-// This module is internal to the client and not a public API.
-
-#include "kudu/master/master_rpc.h"
-
-#include <boost/bind.hpp>
-#include <mutex>
-
-#include "kudu/common/wire_protocol.h"
-#include "kudu/common/wire_protocol.pb.h"
-#include "kudu/gutil/bind.h"
-#include "kudu/gutil/strings/join.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/master/master.proxy.h"
-#include "kudu/util/net/net_util.h"
-#include "kudu/util/scoped_cleanup.h"
-
-
-using std::shared_ptr;
-using std::string;
-using std::vector;
-
-using kudu::consensus::RaftPeerPB;
-using kudu::rpc::Messenger;
-using kudu::rpc::Rpc;
-
-namespace kudu {
-namespace master {
-
-////////////////////////////////////////////////////////////
-// GetMasterRegistrationRpc
-////////////////////////////////////////////////////////////
-
-GetMasterRegistrationRpc::GetMasterRegistrationRpc(
-    StatusCallback user_cb, Sockaddr addr, const MonoTime& deadline,
-    const shared_ptr<Messenger>& messenger, ServerEntryPB* out)
-    : Rpc(deadline, messenger),
-      user_cb_(std::move(user_cb)),
-      addr_(std::move(addr)),
-      out_(DCHECK_NOTNULL(out)) {}
-
-GetMasterRegistrationRpc::~GetMasterRegistrationRpc() {
-}
-
-void GetMasterRegistrationRpc::SendRpc() {
-  MasterServiceProxy proxy(retrier().messenger(),
-                           addr_);
-  GetMasterRegistrationRequestPB req;
-  proxy.GetMasterRegistrationAsync(req, &resp_,
-                                   mutable_retrier()->mutable_controller(),
-                                   boost::bind(&GetMasterRegistrationRpc::SendRpcCb,
-                                               this,
-                                               Status::OK()));
-}
-
-string GetMasterRegistrationRpc::ToString() const {
-  return strings::Substitute("GetMasterRegistrationRpc(address: $0, num_attempts: $1)",
-                             addr_.ToString(), num_attempts());
-}
-
-void GetMasterRegistrationRpc::SendRpcCb(const Status& status) {
-  gscoped_ptr<GetMasterRegistrationRpc> deleter(this);
-  Status new_status = status;
-  if (new_status.ok() && mutable_retrier()->HandleResponse(this, &new_status)) {
-    ignore_result(deleter.release());
-    return;
-  }
-  if (new_status.ok() && resp_.has_error()) {
-    if (resp_.error().code() == MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
-      // If CatalogManager is not initialized, treat the node as a
-      // FOLLOWER for the time being, as currently this RPC is only
-      // used for the purposes of finding the leader master.
-      resp_.set_role(RaftPeerPB::FOLLOWER);
-      new_status = Status::OK();
-    } else {
-      out_->mutable_error()->CopyFrom(resp_.error().status());
-      new_status = StatusFromPB(resp_.error().status());
-    }
-  }
-  if (new_status.ok()) {
-    out_->mutable_instance_id()->CopyFrom(resp_.instance_id());
-    out_->mutable_registration()->CopyFrom(resp_.registration());
-    out_->set_role(resp_.role());
-  }
-  user_cb_.Run(new_status);
-}
-
-////////////////////////////////////////////////////////////
-// GetLeaderMasterRpc
-////////////////////////////////////////////////////////////
-
-GetLeaderMasterRpc::GetLeaderMasterRpc(LeaderCallback user_cb,
-                                       vector<Sockaddr> addrs,
-                                       MonoTime deadline,
-                                       MonoDelta rpc_timeout,
-                                       shared_ptr<Messenger> messenger)
-    : Rpc(std::move(deadline), std::move(messenger)),
-      user_cb_(std::move(user_cb)),
-      addrs_(std::move(addrs)),
-      rpc_timeout_(std::move(rpc_timeout)),
-      pending_responses_(0),
-      completed_(false) {
-  DCHECK(deadline.Initialized());
-
-  // Using resize instead of reserve to explicitly initialized the values.
-  responses_.resize(addrs_.size());
-}
-
-GetLeaderMasterRpc::~GetLeaderMasterRpc() {
-}
-
-string GetLeaderMasterRpc::ToString() const {
-  vector<string> sockaddr_str;
-  for (const Sockaddr& addr : addrs_) {
-    sockaddr_str.push_back(addr.ToString());
-  }
-  return strings::Substitute("GetLeaderMasterRpc(addrs: $0, num_attempts: $1)",
-                             JoinStrings(sockaddr_str, ","),
-                             num_attempts());
-}
-
-void GetLeaderMasterRpc::SendRpc() {
-  // Compute the actual deadline to use for each RPC.
-  MonoTime rpc_deadline = MonoTime::Now() + rpc_timeout_;
-  MonoTime actual_deadline = MonoTime::Earliest(retrier().deadline(),
-                                                rpc_deadline);
-
-  std::lock_guard<simple_spinlock> l(lock_);
-  for (int i = 0; i < addrs_.size(); i++) {
-    GetMasterRegistrationRpc* rpc = new GetMasterRegistrationRpc(
-        Bind(&GetLeaderMasterRpc::GetMasterRegistrationRpcCbForNode,
-             this, ConstRef(addrs_[i]), ConstRef(responses_[i])),
-        addrs_[i],
-        actual_deadline,
-        retrier().messenger(),
-        &responses_[i]);
-    rpc->SendRpc();
-    ++pending_responses_;
-  }
-}
-
-void GetLeaderMasterRpc::SendRpcCb(const Status& status) {
-  // To safely retry, we must reset completed_ so that it can be reused in the
-  // next round of RPCs.
-  //
-  // The SendRpcCb invariant (see GetMasterRegistrationRpcCbForNode comments)
-  // implies that if we're to retry, we must be the last response. Thus, it is
-  // safe to reset completed_ in this case; there's no danger of a late
-  // response reading it and entering SendRpcCb inadvertently.
-  auto undo_completed = MakeScopedCleanup([&]() {
-    std::lock_guard<simple_spinlock> l(lock_);
-    completed_ = false;
-  });
-
-  // If we've received replies from all of the nodes without finding
-  // the leader, or if there were network errors talking to all of the
-  // nodes the error is retriable and we can perform a delayed retry.
-  if (status.IsNetworkError() || status.IsNotFound()) {
-    mutable_retrier()->DelayedRetry(this, status);
-    return;
-  }
-
-  // If our replies timed out but the deadline hasn't passed, retry.
-  if (status.IsTimedOut() && MonoTime::Now() < retrier().deadline()) {
-    mutable_retrier()->DelayedRetry(this, status);
-    return;
-  }
-
-  // We are not retrying.
-  undo_completed.cancel();
-  user_cb_.Run(status, leader_master_);
-}
-
-void GetLeaderMasterRpc::GetMasterRegistrationRpcCbForNode(const Sockaddr& node_addr,
-                                                           const ServerEntryPB& resp,
-                                                           const Status& status) {
-  // TODO: handle the situation where one Master is partitioned from
-  // the rest of the Master consensus configuration, all are reachable by the client,
-  // and the partitioned node "thinks" it's the leader.
-  //
-  // The proper way to do so is to add term/index to the responses
-  // from the Master, wait for majority of the Masters to respond, and
-  // pick the one with the highest term/index as the leader.
-  Status new_status = status;
-  {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    if (completed_) {
-      // If 'user_cb_' has been invoked (see SendRpcCb above), we can
-      // stop.
-      return;
-    }
-    if (new_status.ok()) {
-      if (resp.role() != RaftPeerPB::LEADER) {
-        // Use a Status::NotFound() to indicate that the node is not
-        // the leader: this way, we can handle the case where we've
-        // received a reply from all of the nodes in the cluster (no
-        // network or other errors encountered), but haven't found a
-        // leader (which means that SendRpcCb() above can perform a
-        // delayed retry).
-        new_status = Status::NotFound("no leader found: " + ToString());
-      } else {
-        // We've found a leader.
-        leader_master_ = HostPort(node_addr);
-        completed_ = true;
-      }
-    }
-    --pending_responses_;
-    if (!new_status.ok()) {
-      if (pending_responses_ > 0) {
-        // Don't call SendRpcCb() on error unless we're the last
-        // outstanding response: calling SendRpcCb() will trigger
-        // a delayed re-try, which don't need to do unless we've
-        // been unable to find a leader so far.
-        return;
-      } else {
-        completed_ = true;
-      }
-    }
-  }
-  // Called if the leader has been determined, or if we've received
-  // all of the responses.
-  SendRpcCb(new_status);
-}
-
-
-} // namespace master
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e4a649c2/src/kudu/master/master_rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_rpc.h b/src/kudu/master/master_rpc.h
deleted file mode 100644
index cb300b0..0000000
--- a/src/kudu/master/master_rpc.h
+++ /dev/null
@@ -1,152 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-// This module is internal to the client and not a public API.
-#ifndef KUDU_MASTER_MASTER_RPC_H
-#define KUDU_MASTER_MASTER_RPC_H
-
-#include <vector>
-#include <string>
-
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/master/master.pb.h"
-#include "kudu/rpc/rpc.h"
-#include "kudu/util/locks.h"
-#include "kudu/util/net/net_util.h"
-#include "kudu/util/net/sockaddr.h"
-
-
-namespace kudu {
-
-class ServerEntryPB;
-class HostPort;
-
-namespace master {
-
-// An RPC for getting a Master server's registration.
-class GetMasterRegistrationRpc : public rpc::Rpc {
- public:
-
-  // Create a wrapper object for a retriable GetMasterRegistration RPC
-  // to 'addr'. The result is stored in 'out', which must be a valid
-  // pointer for the lifetime of this object.
-  //
-  // Invokes 'user_cb' upon failure or success of the RPC call.
-  GetMasterRegistrationRpc(StatusCallback user_cb, Sockaddr addr,
-                           const MonoTime& deadline,
-                           const std::shared_ptr<rpc::Messenger>& messenger,
-                           ServerEntryPB* out);
-
-  ~GetMasterRegistrationRpc();
-
-  virtual void SendRpc() OVERRIDE;
-
-  virtual std::string ToString() const OVERRIDE;
-
- private:
-  virtual void SendRpcCb(const Status& status) OVERRIDE;
-
-  StatusCallback user_cb_;
-  Sockaddr addr_;
-
-  ServerEntryPB* out_;
-
-  GetMasterRegistrationResponsePB resp_;
-};
-
-// In parallel, send requests to the specified Master servers until a
-// response comes back from the leader of the Master consensus configuration.
-//
-// If queries have been made to all of the specified servers, but no
-// leader has been found, we re-try again (with an increasing delay,
-// see: RpcRetrier in kudu/rpc/rpc.{cc,h}) until a specified deadline
-// passes or we find a leader.
-//
-// The RPCs are sent in parallel in order to avoid prolonged delays on
-// the client-side that would happen with a serial approach when one
-// of the Master servers is slow or stopped (that is, when we have to
-// wait for an RPC request to server N to timeout before we can make
-// an RPC request to server N+1). This allows for true fault tolerance
-// for the Kudu client.
-//
-// The class is reference counted to avoid a "use-after-free"
-// scenario, when responses to the RPC return to the caller _after_ a
-// leader has already been found.
-class GetLeaderMasterRpc : public rpc::Rpc,
-                           public RefCountedThreadSafe<GetLeaderMasterRpc> {
- public:
-  typedef Callback<void(const Status&, const HostPort&)> LeaderCallback;
-  // The host and port of the leader master server is stored in
-  // 'leader_master', which must remain valid for the lifetime of this
-  // object.
-  //
-  // Calls 'user_cb' when the leader is found, or if no leader can be found
-  // before 'deadline' passes. Each RPC has 'rpc_timeout' time to complete
-  // before it times out and may be retried if 'deadline' has not yet passed.
-  GetLeaderMasterRpc(LeaderCallback user_cb,
-                     std::vector<Sockaddr> addrs,
-                     MonoTime deadline,
-                     MonoDelta rpc_timeout,
-                     std::shared_ptr<rpc::Messenger> messenger);
-
-  virtual void SendRpc() OVERRIDE;
-
-  virtual std::string ToString() const OVERRIDE;
- private:
-  friend class RefCountedThreadSafe<GetLeaderMasterRpc>;
-  ~GetLeaderMasterRpc();
-
-  virtual void SendRpcCb(const Status& status) OVERRIDE;
-
-  // Invoked when a response comes back from a Master with address
-  // 'node_addr'.
-  //
-  // Invokes SendRpcCb if the response indicates that the specified
-  // master is a leader, or if responses have been received from all
-  // of the Masters.
-  void GetMasterRegistrationRpcCbForNode(const Sockaddr& node_addr,
-                                         const ServerEntryPB& resp,
-                                         const Status& status);
-
-  LeaderCallback user_cb_;
-  std::vector<Sockaddr> addrs_;
-
-  HostPort leader_master_;
-
-  // The amount of time allotted to each GetMasterRegistration RPC.
-  MonoDelta rpc_timeout_;
-
-  // The received responses.
-  //
-  // See also: GetMasterRegistrationRpc above.
-  std::vector<ServerEntryPB> responses_;
-
-  // Number of pending responses.
-  int pending_responses_;
-
-  // If true, then we've already executed the user callback and the
-  // RPC can be deallocated.
-  bool completed_;
-
-  // Protects 'pending_responses_' and 'completed_'.
-  mutable simple_spinlock lock_;
-};
-
-} // namespace master
-} // namespace kudu
-
-#endif /* KUDU_MASTER_MASTER_RPC_H */

http://git-wip-us.apache.org/repos/asf/kudu/blob/e4a649c2/src/kudu/tserver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/CMakeLists.txt b/src/kudu/tserver/CMakeLists.txt
index 10b88ce..70fa9c8 100644
--- a/src/kudu/tserver/CMakeLists.txt
+++ b/src/kudu/tserver/CMakeLists.txt
@@ -124,7 +124,6 @@ target_link_libraries(tserver
   tserver_proto
   tserver_admin_proto
   tserver_service_proto
-  master_rpc
   master_proto
   consensus_proto
   log_proto

http://git-wip-us.apache.org/repos/asf/kudu/blob/e4a649c2/src/kudu/tserver/heartbeater.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index 5edd83d..fbae7e8 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -17,18 +17,18 @@
 
 #include "kudu/tserver/heartbeater.h"
 
-#include <boost/optional.hpp>
-#include <gflags/gflags.h>
-#include <glog/logging.h>
+#include <atomic>
 #include <memory>
 #include <string>
 #include <vector>
 
+#include <boost/optional.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
 #include "kudu/common/wire_protocol.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/master/master.h"
-#include "kudu/master/master_rpc.h"
 #include "kudu/master/master.proxy.h"
 #include "kudu/security/server_cert_manager.h"
 #include "kudu/server/webserver.h"
@@ -57,12 +57,6 @@ DEFINE_int32(heartbeat_max_failures_before_backoff, 3,
              "rather than retrying.");
 TAG_FLAG(heartbeat_max_failures_before_backoff, advanced);
 
-using google::protobuf::RepeatedPtrField;
-using kudu::HostPortPB;
-using kudu::consensus::RaftPeerPB;
-using kudu::master::GetLeaderMasterRpc;
-using kudu::master::ListMastersResponsePB;
-using kudu::master::Master;
 using kudu::master::MasterServiceProxy;
 using kudu::master::TabletReportPB;
 using kudu::rpc::RpcController;


Mime
View raw message