kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From abu...@apache.org
Subject [2/3] kudu git commit: KUDU-2463 pt 3: don't scan if MVCC hasn't moved
Date Tue, 16 Oct 2018 08:45:00 GMT
KUDU-2463 pt 3: don't scan if MVCC hasn't moved

In cases when a tablet bootstrap yields an MvccManager whose safe time
has not been advanced (e.g. if there are no write ops in the WAL), and
the tablet has otherwise not bumped its MVCC timestamps (e.g. if it has
not yet elected a leader), a scan (whose correctness depends on the
MvccManager to determine what transactions have been applied) will
return incorrect results.

In the same way that we prevent compactions from occurring if MVCC's
timestamps have not been moved, this patch prevents incorrect results
from being returned from a scan by returning an error that can be
retried elsewhere.

New tests are added to attempt to scan in this state, verifying that we
get an error. A couple of tests that use the mock clock are also updated
so the initial timestamp assigned to a no-op is a more organic, non-zero
timestamp.

Change-Id: Idc0f77673e1f04a34ab1f5c1930bbaa2498b39bf
Reviewed-on: http://gerrit.cloudera.org:8080/11428
Reviewed-by: Mike Percy <mpercy@apache.org>
Tested-by: Kudu Jenkins
Reviewed-on: http://gerrit.cloudera.org:8080/11690
Reviewed-by: Grant Henke <granthenke@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fae09bdd
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fae09bdd
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fae09bdd

Branch: refs/heads/branch-1.8.x
Commit: fae09bdd659fad59c4659d0860af6449ce40a990
Parents: b3bb51e
Author: Andrew Wong <awong@cloudera.com>
Authored: Tue Sep 4 00:55:58 2018 -0700
Committer: Attila Bukor <abukor@apache.org>
Committed: Tue Oct 16 08:40:22 2018 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/consistency-itest.cc |   9 +
 .../tablet_history_gc-itest.cc                  |  27 +--
 .../timestamp_advancement-itest.cc              | 176 ++++++++++++++-----
 src/kudu/mini-cluster/external_mini_cluster.cc  |   6 +
 src/kudu/mini-cluster/external_mini_cluster.h   |   5 +
 src/kudu/mini-cluster/internal_mini_cluster.cc  |   7 +
 src/kudu/mini-cluster/internal_mini_cluster.h   |  10 +-
 src/kudu/mini-cluster/mini_cluster.h            |   8 +
 src/kudu/tablet/mvcc.cc                         |   9 +
 src/kudu/tablet/mvcc.h                          |   5 +
 src/kudu/tools/kudu-ts-cli-test.cc              |  20 ++-
 src/kudu/tserver/tablet_service.cc              |  11 ++
 12 files changed, 231 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/integration-tests/consistency-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/consistency-itest.cc b/src/kudu/integration-tests/consistency-itest.cc
index a0a2dc9..81231cd 100644
--- a/src/kudu/integration-tests/consistency-itest.cc
+++ b/src/kudu/integration-tests/consistency-itest.cc
@@ -113,6 +113,15 @@ class ConsistencyITest : public MiniClusterITestBase {
   virtual void SetUp() override {
     MiniClusterITestBase::SetUp();
     StartCluster(num_tablet_servers_);
+
+    // Since we're using mock NTP rather than the hybrid clock, it's possible
+    // that the first timestamp assigned to a tablet message is the initial
+    // timestamp (0). For correctness of scans, it is illegal to scan in this
+    // state. As such, we bump the clock up front so when we create tablets,
+    // they start out with more natural, non-0 values.
+    for (int i = 0; i < num_tablet_servers_; i++) {
+      cluster_->mini_tablet_server(i)->server()->clock()->Now();
+    }
   }
 
   void ScannerThread(KuduClient::ReplicaSelection selection,

http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/integration-tests/tablet_history_gc-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_history_gc-itest.cc b/src/kudu/integration-tests/tablet_history_gc-itest.cc
index e2ec09b..40b8ad5 100644
--- a/src/kudu/integration-tests/tablet_history_gc-itest.cc
+++ b/src/kudu/integration-tests/tablet_history_gc-itest.cc
@@ -523,6 +523,23 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) {
   FLAGS_scanner_ttl_ms = 1000 * 60 * 60 * 24;
 
   StartCluster(1); // Start InternalMiniCluster with a single tablet server.
+  // Since we're using mock NTP rather than the hybrid clock, it's possible
+  // that if we created a tablet now, the first timestamp assigned to a tablet
+  // message would be the initial timestamp (0). For correctness of scans, it
+  // is illegal to scan in this state. As such, we bump the clock up front so
+  // when we create tablets, they start out with more natural, non-0 values.
+  MiniTabletServer* mts = cluster_->mini_tablet_server(0);
+
+  // Directly access the tserver so we can control compaction and the clock.
+  TabletServer* ts = mts->server();
+  clock_ = down_cast<HybridClock*>(ts->clock());
+
+  // Set initial clock time to 1000 seconds past 0, which is enough so that the
+  // AHM will not be negative.
+  const uint64_t kInitialMicroTime = 1L * 1000 * 1000 * 1000;
+  auto* ntp = down_cast<clock::MockNtp*>(clock_->time_service());
+  ntp->SetMockClockWallTimeForTests(kInitialMicroTime);
+
   TestWorkload workload(cluster_.get());
   workload.set_num_replicas(1);
   workload.Setup(); // Convenient way to create a table.
@@ -530,21 +547,11 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) {
   client::sp::shared_ptr<KuduTable> table;
   ASSERT_OK(client_->OpenTable(workload.table_name(), &table));
 
-  // Directly access the Tablet so we can control compaction and the clock.
-  MiniTabletServer* mts = cluster_->mini_tablet_server(0);
-  TabletServer* ts = mts->server();
-  clock_ = down_cast<HybridClock*>(ts->clock());
   std::vector<scoped_refptr<TabletReplica>> replicas;
   ts->tablet_manager()->GetTabletReplicas(&replicas);
   ASSERT_EQ(1, replicas.size());
   Tablet* tablet = replicas[0]->tablet();
 
-  // Set initial clock time to 1000 seconds past 0, which is enough so that the
-  // AHM will not be negative.
-  const uint64_t kInitialMicroTime = 1L * 1000 * 1000 * 1000;
-  auto* ntp = down_cast<clock::MockNtp*>(clock_->time_service());
-  ntp->SetMockClockWallTimeForTests(kInitialMicroTime);
-
   LOG(INFO) << "Seeding random number generator";
   Random random(SeedRandom());
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/integration-tests/timestamp_advancement-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/timestamp_advancement-itest.cc b/src/kudu/integration-tests/timestamp_advancement-itest.cc
index be30bdf..b0b9d51 100644
--- a/src/kudu/integration-tests/timestamp_advancement-itest.cc
+++ b/src/kudu/integration-tests/timestamp_advancement-itest.cc
@@ -27,13 +27,19 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/common/common.pb.h"
+#include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
+#include "kudu/common/wire_protocol-test-util.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log.pb.h"
 #include "kudu/consensus/log_index.h"
 #include "kudu/consensus/log_reader.h"
 #include "kudu/consensus/log_util.h"
+#include "kudu/consensus/metadata.pb.h"
 #include "kudu/consensus/raft_consensus.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/map-util.h"
@@ -44,14 +50,18 @@
 #include "kudu/integration-tests/test_workload.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/mini-cluster/mini_cluster.h"
+#include "kudu/rpc/rpc_controller.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -62,27 +72,49 @@ DECLARE_bool(log_async_preallocate_segments);
 DECLARE_bool(raft_enable_pre_election);
 DECLARE_double(leader_failure_max_missed_heartbeat_periods);
 DECLARE_int32(consensus_inject_latency_ms_in_notifications);
+DECLARE_int32(heartbeat_interval_ms);
 DECLARE_int32(raft_heartbeat_interval_ms);
 
 namespace kudu {
 
-using cluster::InternalMiniCluster;
-using cluster::InternalMiniClusterOptions;
+using consensus::RaftPeerPB;
 using log::LogReader;
+using pb_util::SecureShortDebugString;
+using rpc::RpcController;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::vector;
 using tablet::TabletReplica;
 using tserver::MiniTabletServer;
+using tserver::NewScanRequestPB;
+using tserver::ScanResponsePB;
+using tserver::ScanRequestPB;
+using tserver::TabletServerErrorPB;
+using tserver::TabletServerServiceProxy;
 
 namespace itest {
 
 class TimestampAdvancementITest : public MiniClusterITestBase {
  public:
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  // Many tests will operate on a single tserver. The first one is chosen
+  // arbitrarily.
+  const int kTserver = 0;
+
   // Sets up a cluster and returns the tablet replica on 'ts' that has written
   // to its WAL. 'replica' will write further messages to a new WAL segment.
   void SetupClusterWithWritesInWAL(int ts, scoped_refptr<TabletReplica>* replica) {
+    // We're going to manually trigger maintenance ops, so disable maintenance op
+    // scheduling.
+    FLAGS_enable_maintenance_manager = false;
+
+    // Prevent preallocation of WAL segments in order to prevent races between
+    // the WAL allocation thread and our manual rolling over of the WAL.
+    FLAGS_log_preallocate_segments = false;
+    FLAGS_log_async_preallocate_segments = false;
+
     NO_FATALS(StartCluster(3));
 
     // Write some rows to the cluster.
@@ -128,6 +160,21 @@ class TimestampAdvancementITest : public MiniClusterITestBase {
     return replicas[0];
   }
 
+  // Returns a scan response from the tablet on the given tablet server.
+  ScanResponsePB ScanResponseForTablet(int ts, const string& tablet_id) const {
+    ScanResponsePB resp;
+    RpcController rpc;
+    ScanRequestPB req;
+    NewScanRequestPB* scan = req.mutable_new_scan_request();
+    scan->set_tablet_id(tablet_id);
+    scan->set_read_mode(READ_LATEST);
+    const Schema schema = GetSimpleTestSchema();
+    CHECK_OK(SchemaToColumnPBs(schema, scan->mutable_projected_columns()));
+    shared_ptr<TabletServerServiceProxy> tserver_proxy = cluster_->tserver_proxy(ts);
+    CHECK_OK(tserver_proxy->Scan(req, &resp, &rpc));
+    return resp;
+  }
+
   // Returns true if there are any write replicate messages in the WALs of
   // 'tablet_id' on 'ts'.
   Status CheckForWriteReplicatesInLog(MiniTabletServer* ts, const string& tablet_id,
@@ -156,27 +203,52 @@ class TimestampAdvancementITest : public MiniClusterITestBase {
     *has_write_replicates = false;
     return Status::OK();
   }
+
+  // Repeatedly GCs the replica's WALs until there are no more write replicates
+  // in the WAL.
+  void GCUntilNoWritesInWAL(MiniTabletServer* tserver,
+                            scoped_refptr<TabletReplica> replica) {
+    ASSERT_EVENTUALLY([&] {
+      LOG(INFO) << "GCing logs...";
+      int64_t gcable_size;
+      ASSERT_OK(replica->GetGCableDataSize(&gcable_size));
+      ASSERT_GT(gcable_size, 0);
+      ASSERT_OK(replica->RunLogGC());
+
+      // Ensure that we have no writes in our WALs.
+      bool has_write_replicates;
+      ASSERT_OK(CheckForWriteReplicatesInLog(tserver, replica->tablet_id(),
+                                             &has_write_replicates));
+      ASSERT_FALSE(has_write_replicates);
+    });
+  }
+
+  // Shuts down all the nodes in a cluster and restarts the given tserver.
+  // Waits for the given replica on the tserver to start.
+  Status ShutdownAllNodesAndRestartTserver(MiniTabletServer* tserver,
+                                          const string& tablet_id) {
+    LOG(INFO) << "Shutting down cluster...";
+    cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY);
+    // Note: We shut down tservers individually rather than using
+    // ClusterNodes::TS, since the latter would invalidate our reference to
+    // 'tserver'.
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      cluster_->mini_tablet_server(i)->Shutdown();
+    }
+    LOG(INFO) << "Restarting single tablet server...";
+    RETURN_NOT_OK(tserver->Restart());
+    TServerDetails* ts_details = FindOrDie(ts_map_, tserver->uuid());
+    return WaitUntilTabletRunning(ts_details, tablet_id, kTimeout);
+  }
 };
 
 // Test that bootstrapping a Raft no-op from the WAL will advance the replica's
 // MVCC safe time timestamps.
 TEST_F(TimestampAdvancementITest, TestNoOpAdvancesMvccSafeTimeOnBootstrap) {
-  // Set a low Raft heartbeat and encourage frequent elections so that we can
-  // fill up the WAL with no-op entries naturally.
+  // Set a low Raft heartbeat interval so we can inject churn elections.
   FLAGS_raft_heartbeat_interval_ms = 100;
-  FLAGS_raft_enable_pre_election = false;
-
-  // Prevent preallocation of WAL segments in order to prevent races between
-  // the WAL allocation thread and our manual rolling over of the WAL.
-  FLAGS_log_preallocate_segments = false;
-  FLAGS_log_async_preallocate_segments = false;
-
-  // We're going to manually trigger maintenance ops, so disable maintenance op
-  // scheduling.
-  FLAGS_enable_maintenance_manager = false;
 
   // Setup a cluster with some writes and a new WAL segment.
-  const int kTserver = 0;
   scoped_refptr<TabletReplica> replica;
   NO_FATALS(SetupClusterWithWritesInWAL(kTserver, &replica));
   MiniTabletServer* ts = tserver(kTserver);
@@ -184,6 +256,7 @@ TEST_F(TimestampAdvancementITest, TestNoOpAdvancesMvccSafeTimeOnBootstrap) {
 
   // Now that we're on a new WAL segment, inject latency to consensus so we
   // trigger elections, and wait for some terms to pass.
+  FLAGS_raft_enable_pre_election = false;
   FLAGS_leader_failure_max_missed_heartbeat_periods = 1.0;
   FLAGS_consensus_inject_latency_ms_in_notifications = 100;
   const int64_t kNumExtraTerms = 10;
@@ -198,39 +271,60 @@ TEST_F(TimestampAdvancementITest, TestNoOpAdvancesMvccSafeTimeOnBootstrap) {
   // where we can GC our logs. Note: we won't GC if there are replicas that
   // need to be caught up.
   FLAGS_consensus_inject_latency_ms_in_notifications = 0;
-  ASSERT_EVENTUALLY([&] {
-    LOG(INFO) << "GCing logs...";
-    int64_t gcable_size;
-    ASSERT_OK(replica->GetGCableDataSize(&gcable_size));
-    ASSERT_GT(gcable_size, 0);
-    ASSERT_OK(replica->RunLogGC());
-
-    // Ensure that we have no writes in our WALs.
-    bool has_write_replicates;
-    ASSERT_OK(CheckForWriteReplicatesInLog(ts, tablet_id, &has_write_replicates));
-    ASSERT_FALSE(has_write_replicates);
-  });
-
-  // Note: We shut down tservers individually rather than using
-  // ClusterNodes::TS, since the latter would invalidate our reference to 'ts'.
+  NO_FATALS(GCUntilNoWritesInWAL(ts, replica));
   replica.reset();
-  LOG(INFO) << "Shutting down the cluster...";
-  cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY);
-  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-    cluster_->mini_tablet_server(i)->Shutdown();
-  }
-
-  // Now prevent elections to reduce consensus logging on the server.
-  LOG(INFO) << "Restarting single tablet server...";
-  ASSERT_OK(ts->Restart());
-  TServerDetails* ts_details = FindOrDie(ts_map_, ts->uuid());
+  ASSERT_OK(ShutdownAllNodesAndRestartTserver(ts, tablet_id));
 
   // Despite there being no writes, there are no-ops, with which we can advance
   // MVCC's timestamps.
-  ASSERT_OK(WaitUntilTabletRunning(ts_details, tablet_id, MonoDelta::FromSeconds(30)));
   replica = tablet_replica_on_ts(kTserver);
   Timestamp cleantime = replica->tablet()->mvcc_manager()->GetCleanTimestamp();
   ASSERT_NE(cleantime, Timestamp::kInitialTimestamp);
+
+  // Verify that we can scan the replica with its MVCC timestamp raised.
+  ScanResponsePB resp = ScanResponseForTablet(kTserver, replica->tablet_id());
+  ASSERT_FALSE(resp.has_error()) << SecureShortDebugString(resp);
+}
+
+// Regression test for KUDU-2463, wherein scans would return incorrect results
+// if a tablet's MVCC snapshot hasn't advanced. Currently, the only way to
+// achieve this is if the cluster is restarted, the WAL only has change
+// configs, and the tablet cannot join a quorum.
+TEST_F(TimestampAdvancementITest, Kudu2463Test) {
+  scoped_refptr<TabletReplica> replica;
+  NO_FATALS(SetupClusterWithWritesInWAL(kTserver, &replica));
+  MiniTabletServer* ts = tserver(kTserver);
+
+  const string tablet_id = replica->tablet_id();
+
+  // Update one of the followers repeatedly to generate a bunch of config
+  // changes in all the replicas' WALs.
+  TServerDetails* leader;
+  ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader));
+  vector<TServerDetails*> followers;
+  ASSERT_OK(FindTabletFollowers(ts_map_, tablet_id, kTimeout, &followers));
+  ASSERT_FALSE(followers.empty());
+  for (int i = 0; i < 20; i++) {
+    RaftPeerPB::MemberType type = i % 2 == 0 ? RaftPeerPB::NON_VOTER : RaftPeerPB::VOTER;
+    WARN_NOT_OK(ChangeReplicaType(leader, tablet_id, followers[0], type, kTimeout),
+                "Couldn't send a change config!");
+  }
+  NO_FATALS(GCUntilNoWritesInWAL(ts, replica));
+
+  // Note: we need to reset the replica reference before restarting the server.
+  replica.reset();
+  ASSERT_OK(ShutdownAllNodesAndRestartTserver(ts, tablet_id));
+
+  // Now open a scanner for the server.
+  ScanResponsePB resp = ScanResponseForTablet(kTserver, tablet_id);
+
+  // Scanning the tablet should yield an error.
+  LOG(INFO) << "Got response: " << SecureShortDebugString(resp);
+  ASSERT_TRUE(resp.has_error());
+  const TabletServerErrorPB& error = resp.error();
+  ASSERT_EQ(error.code(), TabletServerErrorPB::TABLET_NOT_RUNNING);
+  ASSERT_STR_CONTAINS(resp.error().status().message(), "safe time has not yet been initialized");
+  ASSERT_EQ(error.status().code(), AppStatusPB::UNINITIALIZED);
 }
 
 // Test to ensure that MVCC's current snapshot gets updated via Raft no-ops, in

http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/mini-cluster/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc
index f86294c..5bf98df 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -685,6 +685,12 @@ std::shared_ptr<MasterServiceProxy> ExternalMiniCluster::master_proxy(int idx) c
   return std::make_shared<MasterServiceProxy>(messenger_, addr, addr.host());
 }
 
+std::shared_ptr<TabletServerServiceProxy> ExternalMiniCluster::tserver_proxy(int idx) const {
+  CHECK_LT(idx, tablet_servers_.size());
+  const auto& addr = CHECK_NOTNULL(tablet_server(idx))->bound_rpc_addr();
+  return std::make_shared<TabletServerServiceProxy>(messenger_, addr, addr.host());
+}
+
 Status ExternalMiniCluster::CreateClient(client::KuduClientBuilder* builder,
                                          client::sp::shared_ptr<client::KuduClient>* client) const {
   client::KuduClientBuilder defaults;

http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/mini-cluster/external_mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h
index 6244887..9dc2f87 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -70,6 +70,10 @@ namespace server {
 class ServerStatusPB;
 } // namespace server
 
+namespace tserver {
+class TabletServerServiceProxy;
+} // namespace tserver
+
 namespace cluster {
 
 class ExternalDaemon;
@@ -296,6 +300,7 @@ class ExternalMiniCluster : public MiniCluster {
   std::shared_ptr<rpc::Messenger> messenger() const override;
   std::shared_ptr<master::MasterServiceProxy> master_proxy() const override;
   std::shared_ptr<master::MasterServiceProxy> master_proxy(int idx) const override;
+  std::shared_ptr<tserver::TabletServerServiceProxy> tserver_proxy(int idx) const override;
 
   std::string block_manager_type() const {
     return opts_.block_manager_type;

http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/mini-cluster/internal_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/internal_mini_cluster.cc b/src/kudu/mini-cluster/internal_mini_cluster.cc
index e272102..7e18346 100644
--- a/src/kudu/mini-cluster/internal_mini_cluster.cc
+++ b/src/kudu/mini-cluster/internal_mini_cluster.cc
@@ -38,6 +38,7 @@
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/tablet_server_options.h"
+#include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/util/env.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
@@ -65,6 +66,7 @@ using master::TSDescriptor;
 using std::shared_ptr;
 using tserver::MiniTabletServer;
 using tserver::TabletServer;
+using tserver::TabletServerServiceProxy;
 
 InternalMiniClusterOptions::InternalMiniClusterOptions()
   : num_masters(1),
@@ -395,6 +397,11 @@ std::shared_ptr<MasterServiceProxy> InternalMiniCluster::master_proxy(int idx) c
   return std::make_shared<MasterServiceProxy>(messenger_, addr, addr.host());
 }
 
+std::shared_ptr<TabletServerServiceProxy> InternalMiniCluster::tserver_proxy(int idx) const {
+  const auto& addr = CHECK_NOTNULL(mini_tablet_server(idx))->bound_rpc_addr();
+  return std::make_shared<TabletServerServiceProxy>(messenger_, addr, addr.host());
+}
+
 string InternalMiniCluster::WalRootForTS(int ts_idx) const {
   return mini_tablet_server(ts_idx)->options()->fs_opts.wal_root;
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/mini-cluster/internal_mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/internal_mini_cluster.h b/src/kudu/mini-cluster/internal_mini_cluster.h
index 64b4e64..d69de98 100644
--- a/src/kudu/mini-cluster/internal_mini_cluster.h
+++ b/src/kudu/mini-cluster/internal_mini_cluster.h
@@ -37,21 +37,22 @@ class Status;
 namespace client {
 class KuduClient;
 class KuduClientBuilder;
-}
+} // namespace client
 
 namespace master {
 class MasterServiceProxy;
 class MiniMaster;
 class TSDescriptor;
-}
+} // namespace master
 
 namespace rpc {
 class Messenger;
-}
+} // namespace rpc
 
 namespace tserver {
 class MiniTabletServer;
-}
+class TabletServerServiceProxy;
+} // namespace tserver
 
 namespace cluster {
 
@@ -197,6 +198,7 @@ class InternalMiniCluster : public MiniCluster {
   std::shared_ptr<rpc::Messenger> messenger() const override;
   std::shared_ptr<master::MasterServiceProxy> master_proxy() const override;
   std::shared_ptr<master::MasterServiceProxy> master_proxy(int idx) const override;
+  std::shared_ptr<tserver::TabletServerServiceProxy> tserver_proxy(int idx) const override;
 
  private:
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/mini-cluster/mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/mini_cluster.h b/src/kudu/mini-cluster/mini_cluster.h
index 304fd81..627a683 100644
--- a/src/kudu/mini-cluster/mini_cluster.h
+++ b/src/kudu/mini-cluster/mini_cluster.h
@@ -42,6 +42,10 @@ namespace rpc {
 class Messenger;
 } // namespace rpc
 
+namespace tserver {
+class TabletServerServiceProxy;
+} // namespace tserver
+
 namespace cluster {
 
 // Mode to which node types a certain action (like Shutdown()) should apply.
@@ -153,6 +157,10 @@ class MiniCluster {
   // master at 'idx' is running.
   virtual std::shared_ptr<master::MasterServiceProxy> master_proxy(int idx) const = 0;
 
+  // Returns an RPC proxy to the tserver at 'idx'. Requires that the tserver at
+  // 'idx' is running.
+  virtual std::shared_ptr<tserver::TabletServerServiceProxy> tserver_proxy(int idx) const = 0;
+
   // Returns the UUID for the tablet server 'ts_idx'
   virtual std::string UuidForTS(int ts_idx) const = 0;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/tablet/mvcc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc.cc b/src/kudu/tablet/mvcc.cc
index 21e97e0..ce65068 100644
--- a/src/kudu/tablet/mvcc.cc
+++ b/src/kudu/tablet/mvcc.cc
@@ -55,6 +55,15 @@ MvccManager::MvccManager()
   cur_snap_.none_committed_at_or_after_ = Timestamp::kInitialTimestamp;
 }
 
+Status MvccManager::CheckIsSafeTimeInitialized() const {
+  // We initialize the MVCC safe time and clean time at the same time, so if
+  // clean time has not been updated, neither has safe time.
+  if (GetCleanTimestamp() == Timestamp::kInitialTimestamp) {
+    return Status::Uninitialized("safe time has not yet been initialized");
+  }
+  return Status::OK();
+}
+
 void MvccManager::StartTransaction(Timestamp timestamp) {
   MAYBE_INJECT_RANDOM_LATENCY(FLAGS_inject_latency_ms_before_starting_txn);
   std::lock_guard<LockType> l(lock_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/tablet/mvcc.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc.h b/src/kudu/tablet/mvcc.h
index 5ce1f95..347376e 100644
--- a/src/kudu/tablet/mvcc.h
+++ b/src/kudu/tablet/mvcc.h
@@ -185,6 +185,11 @@ class MvccManager {
  public:
   MvccManager();
 
+  // Returns an error if the current snapshot has not been adjusted past its
+  // initial state. While in this state, it is unsafe for the MvccManager to
+  // serve information about already-applied transactions.
+  Status CheckIsSafeTimeInitialized() const;
+
   // Begins a new transaction, which is assigned the provided timestamp.
   //
   // Requires that 'timestamp' is not committed.

http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/tools/kudu-ts-cli-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-ts-cli-test.cc b/src/kudu/tools/kudu-ts-cli-test.cc
index 736afc8..f1d9e7a 100644
--- a/src/kudu/tools/kudu-ts-cli-test.cc
+++ b/src/kudu/tools/kudu-ts-cli-test.cc
@@ -37,6 +37,7 @@
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
 
 using kudu::itest::TabletServerMap;
 using kudu::itest::TServerDetails;
@@ -111,13 +112,18 @@ TEST_F(KuduTsCliTest, TestDumpTablet) {
 
   string out;
   // Test for dump_tablet when there is no data in tablet.
-  ASSERT_OK(RunKuduTool({
-    "remote_replica",
-    "dump",
-    cluster_->tablet_server(0)->bound_rpc_addr().ToString(),
-    tablet_id
-  }, &out));
-  ASSERT_EQ("", out);
+  // Because it's unsafe to scan a tablet replica that hasn't advanced its
+  // clean time, i.e. one that hasn't had any writes or elections, we assert
+  // that we are able to eventually scan.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(RunKuduTool({
+      "remote_replica",
+      "dump",
+      cluster_->tablet_server(0)->bound_rpc_addr().ToString(),
+      tablet_id
+    }, &out));
+    ASSERT_EQ("", out);
+  });
 
   // Insert very little data and dump_tablet again.
   workload.Start();

http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index d470cf3..8957ce5 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -1868,6 +1868,17 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   // tablet replica's shutdown is run concurrently with the code below.
   shared_ptr<Tablet> tablet;
   RETURN_NOT_OK(GetTabletRef(replica, &tablet, error_code));
+
+  // Ensure the tablet has a valid clean time.
+  s = tablet->mvcc_manager()->CheckIsSafeTimeInitialized();
+  if (!s.ok()) {
+    LOG(WARNING) << Substitute("Rejecting scan request for tablet $0: $1",
+                               tablet->tablet_id(), s.ToString());
+    // Return TABLET_NOT_RUNNING so the scan can be handled appropriately (fail
+    // over to another tablet server if fault-tolerant).
+    *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
+    return s;
+  }
   {
     TRACE("Creating iterator");
     TRACE_EVENT0("tserver", "Create iterator");


Mime
View raw message