kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [4/6] incubator-kudu git commit: Add an all-virtual ServerPicker class and a meta cache backed implementation
Date Sun, 15 May 2016 20:38:45 GMT
Add an all-virtual ServerPicker class and a meta cache backed implementation

This moves the replica-picking code out of WriteRpc and into an implementation
of the new all-virtual ServerPicker class.

The new server picker implementation, which is backed by the meta cache, fully
readies the replica before handing it over to WriteRpc. This makes the write
logic simpler (fewer callbacks) and makes the replica-picking code potentially reusable.

Again, this is just a refactor of existing code, so no new tests were added.

Change-Id: Id43db316606cb807ec0019c79b3bdf76fa509fe5
Reviewed-on: http://gerrit.cloudera.org:8080/3017
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/6ea07fb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/6ea07fb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/6ea07fb4

Branch: refs/heads/master
Commit: 6ea07fb42b2a2b09fd73fc3562b88bd4e3b5cefb
Parents: 205e666
Author: David Alves <david.alves@cloudera.com>
Authored: Thu May 5 17:09:14 2016 -0700
Committer: David Ribeiro Alves <david.alves@cloudera.com>
Committed: Sat May 14 21:56:16 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/batcher.cc | 392 +++++++++++++++++++++++++---------------
 src/kudu/rpc/rpc.h         |  29 +++
 2 files changed, 272 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6ea07fb4/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index bdebe37..5121cc6 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -62,6 +62,7 @@ using rpc::Messenger;
 using rpc::RetriableRpcStatus;
 using rpc::Rpc;
 using rpc::RpcController;
+using rpc::ServerPicker;
 using tserver::WriteRequestPB;
 using tserver::WriteResponsePB;
 using tserver::WriteResponsePB_PerRowErrorPB;
@@ -174,6 +175,186 @@ struct InFlightOp {
   }
 };
 
+// A ServerPicker for tablet servers, backed by the MetaCache.
+// Replicas are returned fully initialized (their proxy has been set up) and ready
+// to be used.
+//
+// 'client', 'table' and 'tablet' are kept as raw, non-owning pointers, while the
+// meta cache is held through a scoped_refptr. NOTE(review): the caller presumably
+// keeps the raw pointees alive for as long as the picker is in use — confirm.
+class MetaCacheServerPicker : public ServerPicker<RemoteTabletServer> {
+ public:
+  MetaCacheServerPicker(KuduClient* client,
+                        const scoped_refptr<MetaCache>& meta_cache,
+                        const KuduTable* table,
+                        RemoteTablet* const tablet)
+   : client_(client),
+     meta_cache_(meta_cache),
+     table_(table),
+     tablet_(tablet) {}
+
+  virtual ~MetaCacheServerPicker() {}
+
+  // Picks a replica of 'tablet_' to act as leader, initializes its proxy and then
+  // runs 'callback'. 'callback' is not run directly by this method: it is run from
+  // InitProxyCb() once the proxy is ready, or from LookUpTabletCb() if a master
+  // lookup had to be issued first and it failed (a successful lookup re-enters
+  // PickLeader()).
+  //
+  // TODO push more of this logic into RemoteTablet or at the very least make it
+  // so that we don't run the whole algorithm if no problems occurred.
+  virtual void PickLeader(const ServerPickedCallback& callback, const MonoTime& deadline)
{
+    // Choose a destination TS according to the following algorithm:
+    // 1. If the tablet metadata is stale, refresh it (goto step 5).
+    // 2. Select the leader, provided:
+    //    a. The current leader is known,
+    //    b. It hasn't failed, and
+    //    c. It isn't currently marked as a follower.
+    // 3. If there's no good leader select another replica, provided:
+    //    a. It hasn't failed, and
+    //    b. It hasn't rejected our write due to being a follower.
+    // 4. Preemptively mark the replica we selected in step 3 as "leader" in the
+    //    meta cache, so that our selection remains sticky until the next Master
+    //    metadata refresh.
+    // 5. If we're out of appropriate replicas, force a lookup to the master
+    //    to fetch new consensus configuration information.
+    // 6. When the lookup finishes, forget which replicas were followers and
+    //    retry the write (i.e. goto 2).
+    // 7. If we issue the write and it fails because the destination was a
+    //    follower, remember that fact and retry the write (i.e. goto 2).
+    // 8. Repeat steps 1-7 until the write succeeds, fails for other reasons,
+    //    or the write's deadline expires.
+    RemoteTabletServer* leader = nullptr;
+    if (!tablet_->stale()) {
+      leader = tablet_->LeaderTServer();
+      bool marked_as_follower = false;
+      {
+        lock_guard<simple_spinlock> lock(&lock_);
+        marked_as_follower = ContainsKey(followers_, leader);
+
+      }
+      if (leader && marked_as_follower) {
+        VLOG(2) << "Tablet " << tablet_->tablet_id() << ": We have a
follower for a leader: "
+            << leader->ToString();
+
+        // Mark the node as a follower in the cache so that on the next go-round,
+        // LeaderTServer() will not return it as a leader unless a full metadata
+        // refresh has occurred. This also avoids LookupTabletByKey() going into
+        // "fast path" mode and not actually performing a metadata refresh from the
+        // Master when it needs to.
+        tablet_->MarkTServerAsFollower(leader);
+        leader = nullptr;
+      }
+      if (!leader) {
+        // Try to "guess" the next leader.
+        vector<RemoteTabletServer*> replicas;
+        tablet_->GetRemoteTabletServers(&replicas);
+        // Snapshot 'followers_' under the lock so the scan below runs unlocked.
+        set<RemoteTabletServer*> followers_copy;
+        {
+          lock_guard<simple_spinlock> lock(&lock_);
+          followers_copy = followers_;
+
+        }
+        for (RemoteTabletServer* ts : replicas) {
+          if (!ContainsKey(followers_copy, ts)) {
+            leader = ts;
+            break;
+          }
+        }
+        if (leader) {
+          // Mark this next replica "preemptively" as the leader in the meta cache,
+          // so we go to it first on the next write if writing was successful.
+          VLOG(1) << "Tablet " << tablet_->tablet_id() << ": Previous
leader failed. "
+              << "Preemptively marking tserver " << leader->ToString()
+              << " as leader in the meta cache.";
+          tablet_->MarkTServerAsLeader(leader);
+        }
+      }
+    }
+
+    // If we've tried all replicas, force a lookup to the master to find the
+    // new leader. This relies on some properties of LookupTabletByKey():
+    // 1. The fast path only works when there's a non-failed leader (which we
+    //    know is untrue here).
+    // 2. The slow path always fetches consensus configuration information and updates the
+    //    looked-up tablet.
+    // Put another way, we don't care about the lookup results at all; we're
+    // just using it to fetch the latest consensus configuration information.
+    //
+    // TODO: When we support tablet splits, we should let the lookup shift
+    // the write to another tablet (i.e. if it's since been split).
+    //
+    // NOTE(review): Unretained(this) here and in the InitProxy() call below relies on
+    // the picker outliving the pending operation; WriteRpc holds a scoped_refptr to
+    // it, so confirm nothing drops the last reference while a callback is pending.
+    if (!leader) {
+      meta_cache_->LookupTabletByKey(
+          table_,
+          tablet_->partition().partition_key_start(),
+          deadline,
+          NULL,
+          Bind(&MetaCacheServerPicker::LookUpTabletCb, Unretained(this), callback, deadline));
+      return;
+    }
+
+    // If we have a current TS, initialize its proxy.
+    // Make sure we have a working proxy before sending out the RPC.
+    // NOTE(review): 'deadline' is not passed to InitProxy(), so this step is not
+    // bounded by it.
+    leader->InitProxy(client_,
+                      Bind(&MetaCacheServerPicker::InitProxyCb,
+                           Unretained(this),
+                           callback,
+                           leader));
+  }
+
+  // Records 'status' as a failure of 'server' via RemoteTablet::MarkReplicaFailed().
+  // NOTE(review): MarkReplicaFailed()'s bool result is ignored here; before this
+  // refactor WriteRpc DCHECKed that the replica was found.
+  virtual void MarkServerFailed(RemoteTabletServer* server, const Status& status) {
+    tablet_->MarkReplicaFailed(server, status);
+  }
+
+  // Remembers 'server' as a follower so that PickLeader() skips it until the next
+  // tablet lookup clears the set.
+  virtual void MarkReplicaNotLeader(RemoteTabletServer* server) {
+    {
+      lock_guard<simple_spinlock> lock(&lock_);
+      followers_.insert(server);
+    }
+  }
+
+  // Marks the whole tablet as stale ('server' is unused). PickLeader() skips replica
+  // selection for a stale tablet and goes straight to a master lookup.
+  virtual void MarkResourceNotFound(RemoteTabletServer* server) {
+    tablet_->MarkStale();
+  }
+ private:
+
+  // Called whenever a tablet lookup in the metacache completes.
+  void LookUpTabletCb(const ServerPickedCallback& callback,
+                      const MonoTime& deadline,
+                      const Status& status) {
+    // Whenever we look up the tablet, clear the set of followers, whether or not
+    // the lookup succeeded.
+    {
+      lock_guard<simple_spinlock> lock(&lock_);
+      followers_.clear();
+    }
+
+    // If we couldn't look up the tablet, call the user callback immediately with a
+    // null server.
+    if (!status.ok()) {
+      callback.Run(status, nullptr);
+      return;
+    }
+
+    // If the lookup succeeded, run the picking method again.
+    //
+    // TODO if we add new Pick* methods the method to (re-)call needs to be passed as
+    // a callback, for now we just have PickLeader so we can call it directly.
+    PickLeader(callback, deadline);
+  }
+
+  // Called when InitProxy() completes for 'replica'. 'replica' is passed through
+  // even when 'status' is not OK, so the callback knows which server the failure
+  // applies to.
+  void InitProxyCb(const ServerPickedCallback& callback,
+                   RemoteTabletServer* replica,
+                   const Status& status) {
+    callback.Run(status, replica);
+  }
+
+  // Lock protecting accesses/updates to 'followers_'.
+  mutable simple_spinlock lock_;
+
+  // Reference to the client so that we can initialize a replica proxy once we find it.
+  KuduClient* client_;
+
+  // A ref to the meta cache.
+  scoped_refptr<MetaCache> meta_cache_;
+
+  // The table we're writing to.
+  const KuduTable* table_;
+
+  // The tablet we're picking replicas for.
+  RemoteTablet* const tablet_;
+
+  // TSs that refused writes and were therefore marked as followers. Cleared after
+  // every tablet lookup (see LookUpTabletCb()).
+  set<RemoteTabletServer*> followers_;
+};
+
 // A Write RPC which is in-flight to a tablet. Initially, the RPC is sent
 // to the leader replica, but it may be retried with another replica if the
 // leader fails.
@@ -182,10 +363,11 @@ struct InFlightOp {
 class WriteRpc : public Rpc {
  public:
   WriteRpc(const scoped_refptr<Batcher>& batcher,
-           RemoteTablet* const tablet,
+           const scoped_refptr<MetaCacheServerPicker>& server_picker,
            vector<InFlightOp*> ops,
            const MonoTime& deadline,
-           const shared_ptr<Messenger>& messenger);
+           const shared_ptr<Messenger>& messenger,
+           const string& tablet_id);
   virtual ~WriteRpc();
   virtual void SendRpc() OVERRIDE;
   virtual string ToString() const OVERRIDE;
@@ -195,26 +377,22 @@ class WriteRpc : public Rpc {
     // so we'll just grab the table from the first.
     return ops_[0]->write_op->table();
   }
-  const RemoteTablet* tablet() const { return tablet_; }
   const vector<InFlightOp*>& ops() const { return ops_; }
   const WriteResponsePB& resp() const { return resp_; }
+  const string& tablet_id() const { return tablet_id_; }
 
  private:
-  // Called when we finish a lookup (to find the new consensus leader). Retries
-  // the rpc after a short delay.
-  void LookupTabletCb(const Status& status);
-
-  // Called when we finish initializing a TS proxy.
-  // Sends the RPC, provided there was no error.
-  void InitTSProxyCb(const Status& status);
-
   // Analyzes the response/current status and returns a RetriableRpcStatus containing
   // an enum that can be used to choose the action to take.
   RetriableRpcStatus AnalyzeResponse(const Status& rpc_cb_status);
 
   // Retries the rpc, if possible. Returns true if a retry occurred or
   // false otherwise.
-  bool RetryIfNeeded(const RetriableRpcStatus& status);
+  bool RetryIfNeeded(const RetriableRpcStatus& status,  RemoteTabletServer* replica);
+
+  // Called with an OK status when the leader has been found and the proxy initialized.
+  // Called with any other status if something failed with 'replica' set to 'nullptr'.
+  void ReplicaFoundCb(const Status& status, RemoteTabletServer* replica);
 
   virtual void SendRpcCb(const Status& status) OVERRIDE;
 
@@ -223,14 +401,7 @@ class WriteRpc : public Rpc {
   scoped_refptr<Batcher> batcher_;
 
   // The tablet that should receive this write.
-  RemoteTablet* const tablet_;
-
-  // The TS receiving the write. May change if the write is retried.
-  RemoteTabletServer* current_ts_;
-
-  // TSes that refused the write because they were followers at the time.
-  // Cleared when new consensus configuration information arrives from the master.
-  set<RemoteTabletServer*> followers_;
+  scoped_refptr<MetaCacheServerPicker> server_picker_;
 
   // Request body.
   WriteRequestPB req_;
@@ -238,24 +409,35 @@ class WriteRpc : public Rpc {
   // Response body.
   WriteResponsePB resp_;
 
+  // Keeps track of the tablet server the write was sent to.
+  // TODO Remove this and pass the used replica around. For now we need to keep this as
+  // the retrier calls the SendRpcCb directly and doesn't know the replica that was
+  // being written to.
+  RemoteTabletServer* current_ts_;
+
   // Operations which were batched into this RPC.
   // These operations are in kRequestSent state.
   vector<InFlightOp*> ops_;
+
+  // The id of the tablet being written to.
+  string tablet_id_;
 };
 
 WriteRpc::WriteRpc(const scoped_refptr<Batcher>& batcher,
-                   RemoteTablet* const tablet,
+                   const scoped_refptr<MetaCacheServerPicker>& server_picker,
                    vector<InFlightOp*> ops,
                    const MonoTime& deadline,
-                   const shared_ptr<Messenger>& messenger)
+                   const shared_ptr<Messenger>& messenger,
+                   const string& tablet_id)
     : Rpc(deadline, messenger),
       batcher_(batcher),
-      tablet_(tablet),
-      current_ts_(NULL),
-      ops_(std::move(ops)) {
+      server_picker_(server_picker),
+      current_ts_(nullptr),
+      ops_(std::move(ops)),
+      tablet_id_(tablet_id) {
   const Schema* schema = table()->schema().schema_;
 
-  req_.set_tablet_id(tablet->tablet_id());
+  req_.set_tablet_id(tablet_id_);
   switch (batcher->external_consistency_mode()) {
     case kudu::client::KuduSession::CLIENT_PROPAGATED:
       req_.set_external_consistency_mode(kudu::CLIENT_PROPAGATED);
@@ -301,8 +483,7 @@ WriteRpc::WriteRpc(const scoped_refptr<Batcher>& batcher,
   }
 
   if (VLOG_IS_ON(3)) {
-    VLOG(3) << "Created batch for " << tablet->tablet_id() << ":\n"
-        << req_.ShortDebugString();
+    VLOG(3) << "Created batch for " << tablet_id << ":\n" << req_.ShortDebugString();
   }
 }
 
@@ -311,126 +492,34 @@ WriteRpc::~WriteRpc() {
 }
 
 void WriteRpc::SendRpc() {
-  // Choose a destination TS according to the following algorithm:
-  // 1. If the tablet metadata is stale, refresh it (goto step 5).
-  // 2. Select the leader, provided:
-  //    a. The current leader is known,
-  //    b. It hasn't failed, and
-  //    c. It isn't currently marked as a follower.
-  // 3. If there's no good leader select another replica, provided:
-  //    a. It hasn't failed, and
-  //    b. It hasn't rejected our write due to being a follower.
-  // 4. Preemptively mark the replica we selected in step 3 as "leader" in the
-  //    meta cache, so that our selection remains sticky until the next Master
-  //    metadata refresh.
-  // 5. If we're out of appropriate replicas, force a lookup to the master
-  //    to fetch new consensus configuration information.
-  // 6. When the lookup finishes, forget which replicas were followers and
-  //    retry the write (i.e. goto 2).
-  // 7. If we issue the write and it fails because the destination was a
-  //    follower, remember that fact and retry the write (i.e. goto 2).
-  // 8. Repeat steps 1-7 until the write succeeds, fails for other reasons,
-  //    or the write's deadline expires.
-  current_ts_ = nullptr;
-  if (!tablet_->stale()) {
-    current_ts_ = tablet_->LeaderTServer();
-    if (current_ts_ && ContainsKey(followers_, current_ts_)) {
-      VLOG(2) << "Tablet " << tablet_->tablet_id() << ": We have a follower
for a leader: "
-              << current_ts_->ToString();
-
-      // Mark the node as a follower in the cache so that on the next go-round,
-      // LeaderTServer() will not return it as a leader unless a full metadata
-      // refresh has occurred. This also avoids LookupTabletByKey() going into
-      // "fast path" mode and not actually performing a metadata refresh from the
-      // Master when it needs to.
-      tablet_->MarkTServerAsFollower(current_ts_);
-      current_ts_ = NULL;
-    }
-    if (!current_ts_) {
-      // Try to "guess" the next leader.
-      vector<RemoteTabletServer*> replicas;
-      tablet_->GetRemoteTabletServers(&replicas);
-      for (RemoteTabletServer* ts : replicas) {
-        if (!ContainsKey(followers_, ts)) {
-          current_ts_ = ts;
-          break;
-        }
-      }
-      if (current_ts_) {
-        // Mark this next replica "preemptively" as the leader in the meta cache,
-        // so we go to it first on the next write if writing was successful.
-        VLOG(1) << "Tablet " << tablet_->tablet_id() << ": Previous
leader failed. "
-                << "Preemptively marking tserver " << current_ts_->ToString()
-                << " as leader in the meta cache.";
-        tablet_->MarkTServerAsLeader(current_ts_);
-      }
-    }
-  }
-
-  // If we've tried all replicas, force a lookup to the master to find the
-  // new leader. This relies on some properties of LookupTabletByKey():
-  // 1. The fast path only works when there's a non-failed leader (which we
-  //    know is untrue here).
-  // 2. The slow path always fetches consensus configuration information and updates the
-  //    looked-up tablet.
-  // Put another way, we don't care about the lookup results at all; we're
-  // just using it to fetch the latest consensus configuration information.
-  //
-  // TODO: When we support tablet splits, we should let the lookup shift
-  // the write to another tablet (i.e. if it's since been split).
-  if (!current_ts_) {
-    batcher_->client_->data_->meta_cache_->LookupTabletByKey(table(),
-                                                             tablet_->partition()
-                                                                     .partition_key_start(),
-                                                             retrier().deadline(),
-                                                             NULL,
-                                                             Bind(&WriteRpc::LookupTabletCb,
-                                                                  Unretained(this)));
-    return;
-  }
-
-  // Make sure we have a working proxy before sending out the RPC.
-  current_ts_->InitProxy(batcher_->client_,
-                         Bind(&WriteRpc::InitTSProxyCb, Unretained(this)));
+  // Replica selection (leader choice, master lookups, proxy setup) is delegated to
+  // the server picker, which runs ReplicaFoundCb() with the outcome.
+  server_picker_->PickLeader(Bind(&WriteRpc::ReplicaFoundCb,
+                                  Unretained(this)),
+                              retrier().deadline());
 }
 
 string WriteRpc::ToString() const {
+  // Identifies the tablet by its stored id; WriteRpc no longer keeps a RemoteTablet.
   return Substitute("Write(tablet: $0, num_ops: $1, num_attempts: $2)",
-                    tablet_->tablet_id(), ops_.size(), num_attempts());
+                    tablet_id_, ops_.size(), num_attempts());
 }
 
-void WriteRpc::LookupTabletCb(const Status& status) {
-  // If the table was deleted, the master will return TABLE_DELETED and the
-  // meta cache will return Status::NotFound.
-  if (status.IsNotFound()) {
+// Continuation of SendRpc(), run by the server picker once it has either readied a
+// replica or given up. When the tablet lookup failed, 'replica' is nullptr; when
+// only proxy initialization failed, 'replica' is the server that was picked.
+// NOTE(review): RetryIfNeeded() forwards 'replica' to the picker's Mark* methods,
+// so a failed lookup can reach them with nullptr. Also, AnalyzeResponse() now runs
+// on the OK path too, before this attempt's write is sent — confirm it does not
+// act on resp_ state left over from an earlier attempt.
+void WriteRpc::ReplicaFoundCb(const Status& status, RemoteTabletServer* replica) {
+  RetriableRpcStatus result = AnalyzeResponse(status);
+  if (RetryIfNeeded(result, replica)) return;
+
+  if (result.result == RetriableRpcStatus::NON_RETRIABLE_ERROR) {
     batcher_->ProcessWriteResponse(*this, status);
     delete this;
     return;
   }
 
-  // We should retry the RPC regardless of the outcome of the lookup, as
-  // leader election doesn't depend on the existence of a master at all.
-  //
-  // Retry() imposes a slight delay, which is desirable in a lookup loop,
-  // but unnecessary the first time through. Seeing as leader failures are
-  // rare, perhaps this doesn't matter.
-  followers_.clear();
-  mutable_retrier()->DelayedRetry(this, status);
-}
+  DCHECK_EQ(result.result, RetriableRpcStatus::OK);
 
-void WriteRpc::InitTSProxyCb(const Status& status) {
-  // Fail to a replica in the event of a DNS resolution failure.
-  if (!status.ok()) {
-    RetriableRpcStatus result = AnalyzeResponse(status);
-    CHECK(RetryIfNeeded(result)) << "Should have retried on a DNS resolution failure.";
-    return;
-  }
-
-  VLOG(2) << "Tablet " << tablet_->tablet_id() << ": Writing batch to
replica "
-          << current_ts_->ToString();
-  current_ts_->proxy()->WriteAsync(req_, &resp_,
-                                   mutable_retrier()->mutable_controller(),
-                                   boost::bind(&WriteRpc::SendRpcCb, this, Status::OK()));
+  VLOG(2) << "Tablet " << tablet_id_ << ": Writing batch to replica "
+          << replica->ToString();
+  // Remember the target so SendRpcCb() can pass it to RetryIfNeeded() and log it.
+  current_ts_ = replica;
+  replica->proxy()->WriteAsync(req_, &resp_,
+                               mutable_retrier()->mutable_controller(),
+                               boost::bind(&WriteRpc::SendRpcCb, this, Status::OK()));
 }
 
 RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
@@ -495,7 +584,7 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status)
{
   return result;
 }
 
-bool WriteRpc::RetryIfNeeded(const RetriableRpcStatus& result) {
+bool WriteRpc::RetryIfNeeded(const RetriableRpcStatus& result, RemoteTabletServer* replica)
{
   // Handle the cases where we retry.
   switch (result.result) {
     // For writes, always retry a SERVER_BUSY error on the same server.
@@ -504,22 +593,19 @@ bool WriteRpc::RetryIfNeeded(const RetriableRpcStatus& result) {
     }
     case RetriableRpcStatus::SERVER_NOT_ACCESSIBLE: {
       VLOG(1) << "Failing " << ToString() << " to a new replica: " <<
result.status.ToString();
-      bool found = tablet_->MarkReplicaFailed(current_ts_, result.status);
-      DCHECK(found) << Substitute("Tablet $0: Unable to mark replica $1 as failed.
Replicas: ",
-                                  tablet_->tablet_id(), current_ts_->ToString(),
-                                  tablet_->ReplicasAsString());
+      server_picker_->MarkServerFailed(replica, result.status);
       break;
     }
     // The TabletServer was not part of the config serving the tablet.
     // We mark our tablet cache as stale, forcing a master lookup on the next attempt.
     // TODO: Don't backoff the first time we hit this error (see KUDU-1314).
     case RetriableRpcStatus::RESOURCE_NOT_FOUND: {
-      tablet_->MarkStale();
+      server_picker_->MarkResourceNotFound(replica);
       break;
     }
     // The TabletServer was not the leader of the quorum.
     case RetriableRpcStatus::REPLICA_NOT_LEADER: {
-      followers_.insert(current_ts_);
+      server_picker_->MarkReplicaNotLeader(replica);
       break;
     }
     // For the OK and NON_RETRIABLE_ERROR cases we can't/won't retry.
@@ -533,7 +619,7 @@ bool WriteRpc::RetryIfNeeded(const RetriableRpcStatus& result) {
 
 void WriteRpc::SendRpcCb(const Status& status) {
   RetriableRpcStatus result = AnalyzeResponse(status);
-  if (RetryIfNeeded(result)) return;
+  if (RetryIfNeeded(result, current_ts_)) return;
 
   // From here on out the rpc has either succeeded of suffered a non-retriable
   // failure.
@@ -547,7 +633,7 @@ void WriteRpc::SendRpcCb(const Status& status) {
     }
     final_status = final_status.CloneAndPrepend(
         Substitute("Failed to write batch of $0 ops to tablet $1 $2 after $3 attempt(s)",
-                   ops_.size(), tablet_->tablet_id(), current_ts_string, num_attempts()));
+                   ops_.size(), tablet_id_, current_ts_string, num_attempts()));
     LOG(WARNING) << final_status.ToString();
   }
   batcher_->ProcessWriteResponse(*this, final_status);
@@ -874,11 +960,20 @@ void Batcher::FlushBuffer(RemoteTablet* tablet, const vector<InFlightOp*>&
ops)
   // its callback completes.
   //
   // The RPC object takes ownership of the ops.
+
+  // TODO Keep a replica picker per tablet and share it across writes
+  // to the same tablet.
+  scoped_refptr<MetaCacheServerPicker> server_picker(
+      new MetaCacheServerPicker(client_,
+                                client_->data_->meta_cache_,
+                                ops[0]->write_op->table(),
+                                tablet));
   WriteRpc* rpc = new WriteRpc(this,
-                               tablet,
+                               server_picker,
                                ops,
                                deadline_,
-                               client_->data_->messenger_);
+                               client_->data_->messenger_,
+                               tablet->tablet_id());
   rpc->SendRpc();
 }
 
@@ -924,8 +1019,7 @@ void Batcher::ProcessWriteResponse(const WriteRpc& rpc,
       LOG(ERROR) << "Received a per_row_error for an out-of-bound op index "
                  << err_pb.row_index() << " (sent only "
                  << rpc.ops().size() << " ops)";
-      LOG(ERROR) << "Response from tablet " << rpc.tablet()->tablet_id() <<
":\n"
-                 << rpc.resp().DebugString();
+      LOG(ERROR) << "Response from tablet " << rpc.tablet_id() << ":\n"
<< rpc.resp().DebugString();
       continue;
     }
     gscoped_ptr<KuduWriteOperation> op = std::move(rpc.ops()[err_pb.row_index()]->write_op);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6ea07fb4/src/kudu/rpc/rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc.h b/src/kudu/rpc/rpc.h
index c27abe6..fc49720 100644
--- a/src/kudu/rpc/rpc.h
+++ b/src/kudu/rpc/rpc.h
@@ -63,6 +63,35 @@ struct RetriableRpcStatus {
   Status status;
 };
 
+// This class picks a server among a possible set of servers serving a given resource.
+// Instances are reference counted via RefCountedThreadSafe.
+//
+// TODO Currently this only picks the leader, though it would be feasible to add an
+// enum so that it can pick any server.
+template <class Server>
+class ServerPicker : public RefCountedThreadSafe<ServerPicker<Server>> {
+ public:
+  virtual ~ServerPicker() {}
+
+  // Callback run with the outcome of a Pick* call. On failure 'server' may be
+  // nullptr, so it must be checked before use.
+  typedef Callback<void(const Status& status, Server* server)> ServerPickedCallback;
+
+  // Picks the leader among the replicas serving a resource.
+  // If the leader was found, it calls the callback with Status::OK() and
+  // with 'server' set to the current leader, otherwise calls the callback
+  // with 'status' set to the failure reason.
+  // If picking a leader takes longer than 'deadline' the callback is called with
+  // Status::TimedOut().
+  virtual void PickLeader(const ServerPickedCallback& callback, const MonoTime& deadline)
= 0;
+
+  // Marks a server as failed/inaccessible; 'status' describes the failure.
+  virtual void MarkServerFailed(Server *server, const Status &status) = 0;
+
+  // Marks a server as not being the leader of the config serving the resource we're
+  // trying to interact with.
+  virtual void MarkReplicaNotLeader(Server* replica) = 0;
+
+  // Marks a server as not serving the resource we want.
+  virtual void MarkResourceNotFound(Server *replica) = 0;
+};
+
 // Provides utilities for retrying failed RPCs.
 //
 // All RPCs should use HandleResponse() to retry certain generic errors.


Mime
View raw message