kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdcry...@apache.org
Subject [4/6] kudu git commit: KUDU-798 (part 5) Correct safe time advancement
Date Thu, 08 Dec 2016 16:14:36 GMT
http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/tablet_peer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer.cc b/src/kudu/tablet/tablet_peer.cc
index 0eaaefe..5e901a0 100644
--- a/src/kudu/tablet/tablet_peer.cc
+++ b/src/kudu/tablet/tablet_peer.cc
@@ -92,6 +92,7 @@ using consensus::OpId;
 using consensus::RaftConfigPB;
 using consensus::RaftPeerPB;
 using consensus::RaftConsensus;
+using consensus::TimeManager;
 using consensus::ALTER_SCHEMA_OP;
 using consensus::WRITE_OP;
 using log::Log;
@@ -162,11 +163,14 @@ Status TabletPeer::Init(const shared_ptr<Tablet>& tablet,
     RETURN_NOT_OK(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id_,
                                           meta_->fs_manager()->uuid(), &cmeta));
 
+    scoped_refptr<TimeManager> time_manager(new TimeManager(
+        clock, tablet_->mvcc_manager()->GetCleanTimestamp()));
+
     consensus_ = RaftConsensus::Create(options,
                                        std::move(cmeta),
                                        local_peer_pb_,
                                        metric_entity,
-                                       clock_,
+                                       time_manager,
                                        this,
                                        messenger_,
                                        log_.get(),
@@ -532,9 +536,6 @@ Status TabletPeer::StartReplicaTransaction(const scoped_refptr<ConsensusRound>&
   // TODO(todd) Look at wiring the stuff below on the driver
   TransactionState* state = transaction->state();
   state->set_consensus_round(round);
-  Timestamp ts(replicate_msg->timestamp());
-  state->set_timestamp(ts);
-  clock_->Update(ts);
 
   scoped_refptr<TransactionDriver> driver;
   RETURN_NOT_OK(NewReplicaTransactionDriver(std::move(transaction), &driver));
@@ -555,8 +556,7 @@ Status TabletPeer::NewLeaderTransactionDriver(gscoped_ptr<Transaction> transacti
     log_.get(),
     prepare_pool_.get(),
     apply_pool_,
-    &txn_order_verifier_,
-    clock_);
+    &txn_order_verifier_);
   RETURN_NOT_OK(tx_driver->Init(std::move(transaction), consensus::LEADER));
   driver->swap(tx_driver);
 
@@ -571,8 +571,7 @@ Status TabletPeer::NewReplicaTransactionDriver(gscoped_ptr<Transaction> transact
     log_.get(),
     prepare_pool_.get(),
     apply_pool_,
-    &txn_order_verifier_,
-    clock_);
+    &txn_order_verifier_);
   RETURN_NOT_OK(tx_driver->Init(std::move(transaction), consensus::REPLICA));
   driver->swap(tx_driver);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/tablet_peer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer.h b/src/kudu/tablet/tablet_peer.h
index 762925b..d622626 100644
--- a/src/kudu/tablet/tablet_peer.h
+++ b/src/kudu/tablet/tablet_peer.h
@@ -26,6 +26,7 @@
 
 #include "kudu/consensus/consensus.h"
 #include "kudu/consensus/log.h"
+#include "kudu/consensus/time_manager.h"
 #include "kudu/gutil/callback.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/tablet/tablet.h"
@@ -148,6 +149,10 @@ class TabletPeer : public RefCountedThreadSafe<TabletPeer>,
     return tablet_.get();
   }
 
+  // Returns consensus_->time_manager(), i.e. presumably the TimeManager that Init()
+  // creates and hands to RaftConsensus::Create().
+  // NOTE(review): unlike shared_tablet(), this reads consensus_ without taking lock_ and
+  // does not null-check it; presumably it must only be called after Init() -- confirm.
+  scoped_refptr<consensus::TimeManager> time_manager() const {
+    return consensus_->time_manager();
+  }
+
   std::shared_ptr<Tablet> shared_tablet() const {
     std::lock_guard<simple_spinlock> lock(lock_);
     return tablet_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/transactions/alter_schema_transaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/alter_schema_transaction.cc b/src/kudu/tablet/transactions/alter_schema_transaction.cc
index 4ee5f0f..bb262fd 100644
--- a/src/kudu/tablet/transactions/alter_schema_transaction.cc
+++ b/src/kudu/tablet/transactions/alter_schema_transaction.cc
@@ -62,7 +62,6 @@ void AlterSchemaTransactionState::ReleaseSchemaLock() {
   TRACE("Released schema lock");
 }
 
-
 AlterSchemaTransaction::AlterSchemaTransaction(unique_ptr<AlterSchemaTransactionState>
state,
                                                DriverType type)
     : Transaction(state.get(), type, Transaction::ALTER_SCHEMA_TXN),
@@ -96,9 +95,9 @@ Status AlterSchemaTransaction::Prepare() {
 }
 
 Status AlterSchemaTransaction::Start() {
-  if (!state_->has_timestamp()) {
-    state_->set_timestamp(state_->tablet_peer()->clock()->Now());
-  }
+  // The timestamp is no longer read from the clock here. It must already be present on
+  // the replicate message (on the NOT_REPLICATING path, TransactionDriver::Prepare()
+  // assigns it through the TimeManager just before calling Start()); it is copied into
+  // the transaction state, which must not have a timestamp yet.
+  DCHECK(!state_->has_timestamp());
+  DCHECK(state_->consensus_round()->replicate_msg()->has_timestamp());
+  state_->set_timestamp(Timestamp(state_->consensus_round()->replicate_msg()->timestamp()));
   TRACE("START. Timestamp: $0", server::HybridClock::GetPhysicalValueMicros(state_->timestamp()));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/transactions/transaction_driver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc
index ab0606d..8c5b4b9 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -20,6 +20,7 @@
 #include <mutex>
 
 #include "kudu/consensus/consensus.h"
+#include "kudu/consensus/time_manager.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/rpc/result_tracker.h"
 #include "kudu/tablet/tablet_peer.h"
@@ -76,7 +77,6 @@ class FollowerTransactionCompletionCallback : public TransactionCompletionCallba
   scoped_refptr<ResultTracker> result_tracker_;
 };
 
-
 ////////////////////////////////////////////////////////////
 // TransactionDriver
 ////////////////////////////////////////////////////////////
@@ -86,15 +86,13 @@ TransactionDriver::TransactionDriver(TransactionTracker *txn_tracker,
                                      Log* log,
                                      ThreadPool* prepare_pool,
                                      ThreadPool* apply_pool,
-                                     TransactionOrderVerifier* order_verifier,
-                                     scoped_refptr<server::Clock> clock)
+                                     TransactionOrderVerifier* order_verifier)
     : txn_tracker_(txn_tracker),
       consensus_(consensus),
       log_(log),
       prepare_pool_(prepare_pool),
       apply_pool_(apply_pool),
       order_verifier_(order_verifier),
-      clock_(std::move(clock)),
       trace_(new Trace()),
       start_time_(MonoTime::Now()),
       replication_state_(NOT_REPLICATING),
@@ -285,21 +283,9 @@ Status TransactionDriver::Prepare() {
   switch (repl_state_copy) {
     case NOT_REPLICATING:
     {
-      // Assign the timestamp just before submitting the transaction to consensus, if
-      // it doesn't have one.
-      // This is a placeholder since in the near future the timestamp will be assigned.
-      // within consensus.
-      // TODO(dralves) Remove this when the new TimeManager class gets in (part of KUDU-798)
-      DCHECK(!transaction_->state()->has_timestamp());
-      if (transaction_->state()->external_consistency_mode() == COMMIT_WAIT) {
-        transaction_->state()->set_timestamp(clock_->NowLatest());
-      } else {
-        transaction_->state()->set_timestamp(clock_->Now());
-      }
-
-      transaction_->state()->consensus_round()->replicate_msg()->set_timestamp(
-          transaction_->state()->timestamp().ToUint64());
-
+      // Assign a timestamp to the transaction before we Start() it.
+      RETURN_NOT_OK(consensus_->time_manager()->AssignTimestamp(
+                        mutable_state()->consensus_round()->replicate_msg()));
       RETURN_NOT_OK(transaction_->Start());
       VLOG_WITH_PREFIX(4) << "Triggering consensus replication.";
       // Trigger consensus replication.

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/transactions/transaction_driver.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.h b/src/kudu/tablet/transactions/transaction_driver.h
index df62a20..044edda 100644
--- a/src/kudu/tablet/transactions/transaction_driver.h
+++ b/src/kudu/tablet/transactions/transaction_driver.h
@@ -224,8 +224,7 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
                     log::Log* log,
                     ThreadPool* prepare_pool,
                     ThreadPool* apply_pool,
-                    TransactionOrderVerifier* order_verifier,
-                    scoped_refptr<server::Clock> clock);
+                    TransactionOrderVerifier* order_verifier);
 
   // Perform any non-constructor initialization. Sets the transaction
   // that will be executed.
@@ -355,11 +354,6 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
   // Lock that synchronizes access to the transaction's state.
   mutable simple_spinlock lock_;
 
-  // Temporarily have the clock on the driver so that we can assign timestamps to
-  // transactions.
-  // TODO(dralves) Remove this when the new TimeManager class gets in (part of KUDU-798).
-  scoped_refptr<server::Clock> clock_;
-
   // A copy of the transaction's OpId, set when the transaction first
   // receives one from Consensus and uninitialized until then.
   // TODO(todd): we have three separate copies of this now -- in TransactionState,

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/transactions/transaction_tracker-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_tracker-test.cc b/src/kudu/tablet/transactions/transaction_tracker-test.cc
index 54ef6fe..695dd6a 100644
--- a/src/kudu/tablet/transactions/transaction_tracker-test.cc
+++ b/src/kudu/tablet/transactions/transaction_tracker-test.cc
@@ -94,8 +94,7 @@ class TransactionTrackerTest : public KuduTest {
                                 nullptr,
                                 nullptr,
                                 nullptr,
-                                nullptr,
-                                scoped_refptr<server::Clock>()));
+                                nullptr));
       gscoped_ptr<NoOpTransaction> tx(new NoOpTransaction(new NoOpTransactionState));
       RETURN_NOT_OK(driver->Init(tx.PassAs<Transaction>(), consensus::LEADER));
       local_drivers.push_back(driver);

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/transactions/transaction_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_tracker.cc b/src/kudu/tablet/transactions/transaction_tracker.cc
index 95f9f1e..3661747 100644
--- a/src/kudu/tablet/transactions/transaction_tracker.cc
+++ b/src/kudu/tablet/transactions/transaction_tracker.cc
@@ -21,7 +21,6 @@
 #include <limits>
 #include <vector>
 
-
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/tablet_peer.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/transactions/write_transaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/write_transaction.cc b/src/kudu/tablet/transactions/write_transaction.cc
index d7dce19..d9c7347 100644
--- a/src/kudu/tablet/transactions/write_transaction.cc
+++ b/src/kudu/tablet/transactions/write_transaction.cc
@@ -20,8 +20,8 @@
 #include <algorithm>
 #include <vector>
 
-#include "kudu/common/wire_protocol.h"
 #include "kudu/common/row_operations.h"
+#include "kudu/common/wire_protocol.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/walltime.h"
@@ -108,6 +108,9 @@ void WriteTransaction::AbortPrepare() {
 Status WriteTransaction::Start() {
   TRACE_EVENT0("txn", "WriteTransaction::Start");
   TRACE("Start()");
+  DCHECK(!state_->has_timestamp());
+  DCHECK(state_->consensus_round()->replicate_msg()->has_timestamp());
+  state_->set_timestamp(Timestamp(state_->consensus_round()->replicate_msg()->timestamp()));
   state_->tablet_peer()->tablet()->StartTransaction(state_.get());
   TRACE("Timestamp: $0", state_->tablet_peer()->clock()->Stringify(state_->timestamp()));
   return Status::OK();
@@ -224,13 +227,8 @@ WriteTransactionState::WriteTransactionState(TabletPeer* tablet_peer,
   }
 }
 
-void WriteTransactionState::SetMvccTxAndTimestamp(gscoped_ptr<ScopedTransaction> mvcc_tx) {
+void WriteTransactionState::SetMvccTx(gscoped_ptr<ScopedTransaction> mvcc_tx) {
   DCHECK(!mvcc_tx_) << "Mvcc transaction already started/set.";
-  if (has_timestamp()) {
-    DCHECK_EQ(timestamp(), mvcc_tx->timestamp());
-  } else {
-    set_timestamp(mvcc_tx->timestamp());
-  }
+  // The timestamp is no longer copied from 'mvcc_tx' here; WriteTransaction::Start()
+  // sets it from the replicate message.
   mvcc_tx_ = std::move(mvcc_tx);
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/transactions/write_transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/write_transaction.h b/src/kudu/tablet/transactions/write_transaction.h
index 5cb8f23..2d35d27 100644
--- a/src/kudu/tablet/transactions/write_transaction.h
+++ b/src/kudu/tablet/transactions/write_transaction.h
@@ -100,11 +100,10 @@ class WriteTransactionState : public TransactionState {
   }
 
   // Set the MVCC transaction associated with this Write operation.
-  // This must be called exactly once, during the PREPARE phase just
-  // after the MvccManager has assigned a timestamp.
+  // This must be called exactly once, after the timestamp was acquired.
-  // This also copies the timestamp from the MVCC transaction into the
-  // WriteTransactionState object.
+  // This does not copy or check the timestamp: it must already have been set on this
+  // WriteTransactionState (WriteTransaction::Start() sets it).
-  void SetMvccTxAndTimestamp(gscoped_ptr<ScopedTransaction> mvcc_tx);
+  void SetMvccTx(gscoped_ptr<ScopedTransaction> mvcc_tx);
 
   // Set the Tablet components that this transaction will write into.
   // Called exactly once at the beginning of Apply, before applying its

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tools/tool_action_remote_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_remote_replica.cc b/src/kudu/tools/tool_action_remote_replica.cc
index ec8c6ea..57fa19d 100644
--- a/src/kudu/tools/tool_action_remote_replica.cc
+++ b/src/kudu/tools/tool_action_remote_replica.cc
@@ -95,14 +95,17 @@ class ReplicaDumper {
     ScanRequestPB req;
     ScanResponsePB resp;
 
+    // Scan and dump the tablet.
+    // Note that we do a READ_LATEST scan because we might be scanning a tablet that has lost
+    // its majority and thus cannot serve snapshot scans.
+    // TODO(dalves): Once KUDU-1704 is in, change this to perform stale snapshot reads, which
+    // can be ordered.
     NewScanRequestPB* new_req = req.mutable_new_scan_request();
     RETURN_NOT_OK(SchemaToColumnPBs(
         schema, new_req->mutable_projected_columns(),
         SCHEMA_PB_WITHOUT_IDS | SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES));
     new_req->set_tablet_id(tablet_id);
     new_req->set_cache_blocks(false);
-    new_req->set_order_mode(ORDERED);
-    new_req->set_read_mode(READ_AT_SNAPSHOT);
 
     do {
       RpcController rpc;

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 84312f3..684209e 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -28,6 +28,7 @@
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/consensus.h"
+#include "kudu/consensus/time_manager.h"
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/stl_util.h"
@@ -79,6 +80,12 @@ DEFINE_bool(scanner_allow_snapshot_scans_with_logical_timestamps, false,
             "If set, the server will support snapshot scans with logical timestamps.");
 TAG_FLAG(scanner_allow_snapshot_scans_with_logical_timestamps, unsafe);
 
+DEFINE_int32(scanner_max_wait_ms, 1000,
+             "The maximum amount of time (in milliseconds) we'll hang a scanner thread waiting for "
+             "safe time to advance or transactions to commit, even if its deadline allows waiting "
+             "longer.");
+TAG_FLAG(scanner_max_wait_ms, advanced);
+
 // Fault injection flags.
 DEFINE_int32(scanner_inject_latency_on_each_batch_ms, 0,
              "If set, the scanner will pause the specified number of milliesconds "
@@ -1374,6 +1381,26 @@ static Status SetupScanSpec(const NewScanRequestPB& scan_pb,
   return Status::OK();
 }
 
+namespace {
+// Verifies that a scan does not read from before the tablet's ancient history mark (AHM).
+// Only READ_AT_SNAPSHOT scans are checked: for any other 'read_mode' this returns
+// Status::OK() without consulting the tablet's history GC options. For a snapshot scan it
+// returns Status::InvalidArgument() if 'timestamp' is ancient history for 'tablet', and
+// Status::OK() otherwise.
+Status VerifyNotAncientHistory(Tablet* tablet, ReadMode read_mode, Timestamp timestamp) {
+  if (read_mode != READ_AT_SNAPSHOT) {
+    return Status::OK();
+  }
+  tablet::HistoryGcOpts history_gc_opts = tablet->GetHistoryGcOpts();
+  if (history_gc_opts.IsAncientHistory(timestamp)) {
+    return Status::InvalidArgument(
+        Substitute("Snapshot timestamp is earlier than the ancient history mark. Consider "
+                   "increasing the value of the configuration parameter "
+                   "--tablet_history_max_age_sec. Snapshot timestamp: $0 "
+                   "Ancient History Mark: $1 Physical time difference: $2",
+                   tablet->clock()->Stringify(timestamp),
+                   tablet->clock()->Stringify(history_gc_opts.ancient_history_mark()),
+                   tablet->clock()->GetPhysicalComponentDifference(
+                       timestamp, history_gc_opts.ancient_history_mark()).ToString()));
+  }
+  return Status::OK();
+}
+} // anonymous namespace
+
 // Start a new scan.
 Status TabletServiceImpl::HandleNewScanRequest(TabletPeer* tablet_peer,
                                                const ScanRequestPB* req,
@@ -1483,13 +1510,21 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletPeer* tablet_peer,
         break;
       }
       case READ_AT_SNAPSHOT: {
-        s = HandleScanAtSnapshot(scan_pb, rpc_context, projection, tablet, &iter, snap_timestamp);
+        s = HandleScanAtSnapshot(scan_pb, rpc_context, projection, tablet_peer,
+                                 &iter, snap_timestamp);
+        // If we got a Status::ServiceUnavailable() from HandleScanAtSnapshot() it might
+        // mean we're just behind so let the client try again.
+        if (s.IsServiceUnavailable()) {
+          *error_code = TabletServerErrorPB::THROTTLED;
+          return s;
+        }
+
         if (!s.ok()) {
           tmp_error_code = TabletServerErrorPB::INVALID_SNAPSHOT;
         }
         break;
       }
-      TRACE("Iterator created");
+        TRACE("Iterator created");
     }
   }
 
@@ -1535,23 +1570,15 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletPeer* tablet_peer,
   // end up with a valid snapshot in that case. It would be more correct to
   // initialize the row iterator and then select the latest timestamp
   // represented by those open files in that case.
-  tablet::HistoryGcOpts history_gc_opts = tablet->GetHistoryGcOpts();
-  if (scan_pb.read_mode() == READ_AT_SNAPSHOT &&
-      history_gc_opts.IsAncientHistory(*snap_timestamp)) {
-    // Now that we have initialized our row iterator at a snapshot, return an
-    // error if the snapshot timestamp was prior to the ancient history mark.
-    // We have to check after we open the iterator in order to avoid a TOCTOU
-    // error.
+  //
+  // Now that we have initialized our row iterator at a snapshot, return an
+  // error if the snapshot timestamp was prior to the ancient history mark.
+  // We have to check after we open the iterator in order to avoid a TOCTOU
+  // error.
+  s = VerifyNotAncientHistory(tablet.get(), scan_pb.read_mode(), *snap_timestamp);
+  if (!s.ok()) {
     *error_code = TabletServerErrorPB::INVALID_SNAPSHOT;
-    return Status::InvalidArgument(
-        Substitute("Snapshot timestamp is earlier than the ancient history mark. Consider
"
-                   "increasing the value of the configuration parameter "
-                   "--tablet_history_max_age_sec. Snapshot timestamp: $0 "
-                   "Ancient History Mark: $1 Physical time difference: $2",
-                   server_->clock()->Stringify(*snap_timestamp),
-                   server_->clock()->Stringify(history_gc_opts.ancient_history_mark()),
-                   server_->clock()->GetPhysicalComponentDifference(
-                       *snap_timestamp, history_gc_opts.ancient_history_mark()).ToString()));
+    return s;
   }
 
   *has_more_results = iter->HasNext();
@@ -1726,16 +1753,25 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
   return Status::OK();
 }
 
+namespace {
+// Bounds how long a scanner thread may block while waiting for a consistent snapshot.
+// Returns 'deadline' unchanged if it lies at most --scanner_max_wait_ms in the future
+// (or is already in the past); otherwise returns now + --scanner_max_wait_ms.
+// '*was_clamped' is set to true exactly when the second value is returned.
+MonoTime ClampScanDeadlineForWait(const MonoTime& deadline, bool* was_clamped) {
+  MonoTime current_time = MonoTime::Now();
+  int64_t requested_wait_ms = deadline.GetDeltaSince(current_time).ToMilliseconds();
+  *was_clamped = requested_wait_ms > FLAGS_scanner_max_wait_ms;
+  if (!*was_clamped) {
+    return deadline;
+  }
+  return current_time + MonoDelta::FromMilliseconds(FLAGS_scanner_max_wait_ms);
+}
+} // anonymous namespace
+
 Status TabletServiceImpl::HandleScanAtSnapshot(const NewScanRequestPB& scan_pb,
                                                const RpcContext* rpc_context,
                                                const Schema& projection,
-                                               const shared_ptr<Tablet>& tablet,
+                                               TabletPeer* tablet_peer,
                                                gscoped_ptr<RowwiseIterator>* iter,
                                                Timestamp* snap_timestamp) {
-
-  // TODO check against the earliest boundary (i.e. how early can we go) right
-  // now we're keeping all undos/redos forever!
-
   // If the client sent a timestamp update our clock with it.
   if (scan_pb.has_propagated_timestamp()) {
     Timestamp propagated_timestamp(scan_pb.propagated_timestamp());
@@ -1776,32 +1812,51 @@ Status TabletServiceImpl::HandleScanAtSnapshot(const NewScanRequestPB& scan_pb,
     }
   }
 
-  tablet::MvccSnapshot snap;
+  // Before we wait on anything check that the timestamp is after the AHM.
+  // This is not the final check. We'll check this again after the iterators are open but
+  // there is no point in waiting if we can't actually scan afterwards.
+  RETURN_NOT_OK(VerifyNotAncientHistory(tablet_peer->tablet(),
+                                        ReadMode::READ_AT_SNAPSHOT,
+                                        tmp_snap_timestamp));
 
-  // Wait for the in-flights in the snapshot to be finished.
-  // We'll use the client-provided deadline, but not if it's more than 5 seconds from
-  // now -- it's better to make the client retry than hold RPC threads busy.
-  //
-  // TODO(KUDU-1127): even this may not be sufficient -- perhaps we should check how long it
-  // has been since the MVCC manager was able to advance its safe time. If it has been
-  // a long time, it's likely that the majority of voters for this tablet are down
-  // and some writes are "stuck" and therefore won't be committed.
-  // Subtract a little bit from the client deadline so that it's more likely we actually
-  // have time to send our response sent back before it times out.
-  MonoTime client_deadline =
-      rpc_context->GetClientDeadline() - MonoDelta::FromMilliseconds(10);
+  tablet::MvccSnapshot snap;
+  Tablet* tablet = tablet_peer->tablet();
+  scoped_refptr<consensus::TimeManager> time_manager = tablet_peer->time_manager();
+  tablet::MvccManager* mvcc_manager = tablet->mvcc_manager();
+
+  // Reduce the client's deadline by a few msecs to allow for overhead.
+  MonoTime client_deadline = rpc_context->GetClientDeadline() - MonoDelta::FromMilliseconds(10);
+
+  // It's not good for the tablet server or for the client if we hang here forever. The tablet
+  // server will have one less available thread and the client might be stuck spending all
+  // of the allotted time for the scan on a partitioned server that will never have a consistent
+  // snapshot at 'snap_timestamp'.
+  // Because of this, we clamp the client's deadline to the configured maximum
+  // (--scanner_max_wait_ms). If the client set a long timeout, it can spend the remaining
+  // time retrying on other servers.
+  bool was_clamped = false;
+  MonoTime final_deadline = ClampScanDeadlineForWait(client_deadline, &was_clamped);
+
+  // Wait for the tablet to know that 'snap_timestamp' is safe. I.e. that all operations
+  // that came before it are, at least, started. This, together with waiting for the mvcc
+  // snapshot to be clean below, allows us to always return the same data when scanning at
+  // the same timestamp (repeatable reads).
+  TRACE("Waiting safe time to advance");
+  MonoTime before = MonoTime::Now();
+  Status s = time_manager->WaitUntilSafe(tmp_snap_timestamp, final_deadline);
 
-  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
-  if (client_deadline < deadline) {
-    deadline = client_deadline;
+  if (s.ok()) {
+    // Wait for the in-flights in the snapshot to be finished.
+    TRACE("Waiting for operations to commit");
+    s = mvcc_manager->WaitForSnapshotWithAllCommitted(tmp_snap_timestamp, &snap, client_deadline);
   }
 
-  TRACE("Waiting for operations in snapshot to commit");
-  MonoTime before = MonoTime::Now();
-  RETURN_NOT_OK_PREPEND(
-      tablet->mvcc_manager()->WaitForCleanSnapshotAtTimestamp(
-          tmp_snap_timestamp, &snap, deadline),
-      "could not wait for desired snapshot timestamp to be consistent");
+  // If we timed out but had clamped the deadline, return ServiceUnavailable instead
+  // so that the client retries.
+  if (s.IsTimedOut() && was_clamped) {
+    return Status::ServiceUnavailable(s.CloneAndPrepend(
+        "could not wait for desired snapshot timestamp to be consistent").ToString());
+  }
+  RETURN_NOT_OK(s);
 
   uint64_t duration_usec = (MonoTime::Now() - before).ToMicroseconds();
   tablet->metrics()->snapshot_read_inflight_wait_duration->Increment(duration_usec);

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tserver/tablet_service.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.h b/src/kudu/tserver/tablet_service.h
index 430fb13..3cdc95e 100644
--- a/src/kudu/tserver/tablet_service.h
+++ b/src/kudu/tserver/tablet_service.h
@@ -93,7 +93,7 @@ class TabletServiceImpl : public TabletServerServiceIf {
+  // Sets up a READ_AT_SNAPSHOT scan on 'tablet_peer' (a non-owning pointer; presumably the
+  // caller keeps the peer alive for the duration of the call -- confirm). Waits for the
+  // requested snapshot timestamp to become safe and for the operations before it to commit.
+  // Returns ServiceUnavailable when a wait whose deadline was clamped to
+  // --scanner_max_wait_ms timed out, so the caller can ask the client to retry; other
+  // errors are returned as-is.
+  // NOTE(review): presumably opens '*iter' and sets '*snap_timestamp' on success -- confirm.
   Status HandleScanAtSnapshot(const NewScanRequestPB& scan_pb,
                               const rpc::RpcContext* rpc_context,
                               const Schema& projection,
-                              const std::shared_ptr<tablet::Tablet>& tablet,
+                              tablet::TabletPeer* tablet_peer,
                               gscoped_ptr<RowwiseIterator>* iter,
                               Timestamp* snap_timestamp);
 


Mime
View raw message