From commits-return-6510-archive-asf-public=cust-asf.ponee.io@kudu.apache.org Fri Oct 12 08:08:52 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id C0B9A18072F for ; Fri, 12 Oct 2018 08:08:50 +0200 (CEST) Received: (qmail 2501 invoked by uid 500); 12 Oct 2018 06:08:48 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 2491 invoked by uid 99); 12 Oct 2018 06:08:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Oct 2018 06:08:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4FA70E005A; Fri, 12 Oct 2018 06:08:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: awong@apache.org To: commits@kudu.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: kudu git commit: KUDU-2463 pt 3: don't scan if MVCC hasn't moved Date: Fri, 12 Oct 2018 06:08:33 +0000 (UTC) Repository: kudu Updated Branches: refs/heads/master bc817a448 -> 5894af6ff KUDU-2463 pt 3: don't scan if MVCC hasn't moved In cases when a tablet bootstrap yields an MvccManager whose safe time has not been advanced (e.g. if there are no write ops in the WAL), and the tablet has otherwise not bumped its MVCC timestamps (e.g. if it has not yet elected a leader), a scan (whose correctness depends on the MvccManager to determine what transactions have been applied) will return incorrect results. 
In the same way that we prevent compactions from occurring if MVCC's timestamps have not been moved, this patch prevents incorrect results from being returned from a scan by returning an error that can be retried elsewhere. New tests are added to attempt to scan in this state, verifying that we get an error. A couple of tests that use the mock clock are also updated so the initial timestamp assigned to a no-op is a more organic, non-zero timestamp. Change-Id: Idc0f77673e1f04a34ab1f5c1930bbaa2498b39bf Reviewed-on: http://gerrit.cloudera.org:8080/11428 Reviewed-by: Mike Percy Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5894af6f Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5894af6f Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5894af6f Branch: refs/heads/master Commit: 5894af6ff12291ffab2d3304b34f77fe6d112771 Parents: bc817a4 Author: Andrew Wong Authored: Tue Sep 4 00:55:58 2018 -0700 Committer: Andrew Wong Committed: Fri Oct 12 06:08:01 2018 +0000 ---------------------------------------------------------------------- src/kudu/integration-tests/consistency-itest.cc | 9 + .../tablet_history_gc-itest.cc | 27 +-- .../timestamp_advancement-itest.cc | 176 ++++++++++++++----- src/kudu/mini-cluster/external_mini_cluster.cc | 6 + src/kudu/mini-cluster/external_mini_cluster.h | 5 + src/kudu/mini-cluster/internal_mini_cluster.cc | 7 + src/kudu/mini-cluster/internal_mini_cluster.h | 10 +- src/kudu/mini-cluster/mini_cluster.h | 8 + src/kudu/tablet/mvcc.cc | 9 + src/kudu/tablet/mvcc.h | 5 + src/kudu/tools/kudu-ts-cli-test.cc | 20 ++- src/kudu/tserver/tablet_service.cc | 11 ++ 12 files changed, 231 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/5894af6f/src/kudu/integration-tests/consistency-itest.cc 
---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/consistency-itest.cc b/src/kudu/integration-tests/consistency-itest.cc index a0a2dc9..81231cd 100644 --- a/src/kudu/integration-tests/consistency-itest.cc +++ b/src/kudu/integration-tests/consistency-itest.cc @@ -113,6 +113,15 @@ class ConsistencyITest : public MiniClusterITestBase { virtual void SetUp() override { MiniClusterITestBase::SetUp(); StartCluster(num_tablet_servers_); + + // Since we're using mock NTP rather than the hybrid clock, it's possible + // that the first timestamp assigned to a tablet message is the initial + // timestamp (0). For correctness of scans, it is illegal to scan in this + // state. As such, we bump the clock up front so when we create tablets, + // they start out with more natural, non-0 values. + for (int i = 0; i < num_tablet_servers_; i++) { + cluster_->mini_tablet_server(i)->server()->clock()->Now(); + } } void ScannerThread(KuduClient::ReplicaSelection selection, http://git-wip-us.apache.org/repos/asf/kudu/blob/5894af6f/src/kudu/integration-tests/tablet_history_gc-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/tablet_history_gc-itest.cc b/src/kudu/integration-tests/tablet_history_gc-itest.cc index e2ec09b..40b8ad5 100644 --- a/src/kudu/integration-tests/tablet_history_gc-itest.cc +++ b/src/kudu/integration-tests/tablet_history_gc-itest.cc @@ -523,6 +523,23 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) { FLAGS_scanner_ttl_ms = 1000 * 60 * 60 * 24; StartCluster(1); // Start InternalMiniCluster with a single tablet server. + // Since we're using mock NTP rather than the hybrid clock, it's possible + // that if we created a tablet now, the first timestamp assigned to a tablet + // message would be the initial timestamp (0). For correctness of scans, it + // is illegal to scan in this state. 
As such, we bump the clock up front so + // when we create tablets, they start out with more natural, non-0 values. + MiniTabletServer* mts = cluster_->mini_tablet_server(0); + + // Directly access the tserver so we can control compaction and the clock. + TabletServer* ts = mts->server(); + clock_ = down_cast(ts->clock()); + + // Set initial clock time to 1000 seconds past 0, which is enough so that the + // AHM will not be negative. + const uint64_t kInitialMicroTime = 1L * 1000 * 1000 * 1000; + auto* ntp = down_cast(clock_->time_service()); + ntp->SetMockClockWallTimeForTests(kInitialMicroTime); + TestWorkload workload(cluster_.get()); workload.set_num_replicas(1); workload.Setup(); // Convenient way to create a table. @@ -530,21 +547,11 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) { client::sp::shared_ptr table; ASSERT_OK(client_->OpenTable(workload.table_name(), &table)); - // Directly access the Tablet so we can control compaction and the clock. - MiniTabletServer* mts = cluster_->mini_tablet_server(0); - TabletServer* ts = mts->server(); - clock_ = down_cast(ts->clock()); std::vector> replicas; ts->tablet_manager()->GetTabletReplicas(&replicas); ASSERT_EQ(1, replicas.size()); Tablet* tablet = replicas[0]->tablet(); - // Set initial clock time to 1000 seconds past 0, which is enough so that the - // AHM will not be negative. 
- const uint64_t kInitialMicroTime = 1L * 1000 * 1000 * 1000; - auto* ntp = down_cast(clock_->time_service()); - ntp->SetMockClockWallTimeForTests(kInitialMicroTime); - LOG(INFO) << "Seeding random number generator"; Random random(SeedRandom()); http://git-wip-us.apache.org/repos/asf/kudu/blob/5894af6f/src/kudu/integration-tests/timestamp_advancement-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/timestamp_advancement-itest.cc b/src/kudu/integration-tests/timestamp_advancement-itest.cc index be30bdf..b0b9d51 100644 --- a/src/kudu/integration-tests/timestamp_advancement-itest.cc +++ b/src/kudu/integration-tests/timestamp_advancement-itest.cc @@ -27,13 +27,19 @@ #include #include +#include "kudu/common/common.pb.h" +#include "kudu/common/schema.h" #include "kudu/common/timestamp.h" +#include "kudu/common/wire_protocol-test-util.h" +#include "kudu/common/wire_protocol.h" +#include "kudu/common/wire_protocol.pb.h" #include "kudu/consensus/consensus.pb.h" #include "kudu/consensus/log.h" #include "kudu/consensus/log.pb.h" #include "kudu/consensus/log_index.h" #include "kudu/consensus/log_reader.h" #include "kudu/consensus/log_util.h" +#include "kudu/consensus/metadata.pb.h" #include "kudu/consensus/raft_consensus.h" #include "kudu/fs/fs_manager.h" #include "kudu/gutil/map-util.h" @@ -44,14 +50,18 @@ #include "kudu/integration-tests/test_workload.h" #include "kudu/mini-cluster/internal_mini_cluster.h" #include "kudu/mini-cluster/mini_cluster.h" +#include "kudu/rpc/rpc_controller.h" #include "kudu/tablet/mvcc.h" #include "kudu/tablet/tablet.h" #include "kudu/tablet/tablet_replica.h" #include "kudu/tserver/mini_tablet_server.h" #include "kudu/tserver/tablet_server.h" #include "kudu/tserver/ts_tablet_manager.h" +#include "kudu/tserver/tserver.pb.h" +#include "kudu/tserver/tserver_service.proxy.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" +#include "kudu/util/pb_util.h" #include 
"kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" @@ -62,27 +72,49 @@ DECLARE_bool(log_async_preallocate_segments); DECLARE_bool(raft_enable_pre_election); DECLARE_double(leader_failure_max_missed_heartbeat_periods); DECLARE_int32(consensus_inject_latency_ms_in_notifications); +DECLARE_int32(heartbeat_interval_ms); DECLARE_int32(raft_heartbeat_interval_ms); namespace kudu { -using cluster::InternalMiniCluster; -using cluster::InternalMiniClusterOptions; +using consensus::RaftPeerPB; using log::LogReader; +using pb_util::SecureShortDebugString; +using rpc::RpcController; using std::shared_ptr; using std::string; using std::unique_ptr; using std::vector; using tablet::TabletReplica; using tserver::MiniTabletServer; +using tserver::NewScanRequestPB; +using tserver::ScanResponsePB; +using tserver::ScanRequestPB; +using tserver::TabletServerErrorPB; +using tserver::TabletServerServiceProxy; namespace itest { class TimestampAdvancementITest : public MiniClusterITestBase { public: + const MonoDelta kTimeout = MonoDelta::FromSeconds(30); + + // Many tests will operate on a single tserver. The first one is chosen + // arbitrarily. + const int kTserver = 0; + // Sets up a cluster and returns the tablet replica on 'ts' that has written // to its WAL. 'replica' will write further messages to a new WAL segment. void SetupClusterWithWritesInWAL(int ts, scoped_refptr* replica) { + // We're going to manually trigger maintenance ops, so disable maintenance op + // scheduling. + FLAGS_enable_maintenance_manager = false; + + // Prevent preallocation of WAL segments in order to prevent races between + // the WAL allocation thread and our manual rolling over of the WAL. + FLAGS_log_preallocate_segments = false; + FLAGS_log_async_preallocate_segments = false; + NO_FATALS(StartCluster(3)); // Write some rows to the cluster. 
@@ -128,6 +160,21 @@ class TimestampAdvancementITest : public MiniClusterITestBase { return replicas[0]; } + // Returns a scan response from the tablet on the given tablet server. + ScanResponsePB ScanResponseForTablet(int ts, const string& tablet_id) const { + ScanResponsePB resp; + RpcController rpc; + ScanRequestPB req; + NewScanRequestPB* scan = req.mutable_new_scan_request(); + scan->set_tablet_id(tablet_id); + scan->set_read_mode(READ_LATEST); + const Schema schema = GetSimpleTestSchema(); + CHECK_OK(SchemaToColumnPBs(schema, scan->mutable_projected_columns())); + shared_ptr tserver_proxy = cluster_->tserver_proxy(ts); + CHECK_OK(tserver_proxy->Scan(req, &resp, &rpc)); + return resp; + } + // Returns true if there are any write replicate messages in the WALs of // 'tablet_id' on 'ts'. Status CheckForWriteReplicatesInLog(MiniTabletServer* ts, const string& tablet_id, @@ -156,27 +203,52 @@ class TimestampAdvancementITest : public MiniClusterITestBase { *has_write_replicates = false; return Status::OK(); } + + // Repeatedly GCs the replica's WALs until there are no more write replicates + // in the WAL. + void GCUntilNoWritesInWAL(MiniTabletServer* tserver, + scoped_refptr replica) { + ASSERT_EVENTUALLY([&] { + LOG(INFO) << "GCing logs..."; + int64_t gcable_size; + ASSERT_OK(replica->GetGCableDataSize(&gcable_size)); + ASSERT_GT(gcable_size, 0); + ASSERT_OK(replica->RunLogGC()); + + // Ensure that we have no writes in our WALs. + bool has_write_replicates; + ASSERT_OK(CheckForWriteReplicatesInLog(tserver, replica->tablet_id(), + &has_write_replicates)); + ASSERT_FALSE(has_write_replicates); + }); + } + + // Shuts down all the nodes in a cluster and restarts the given tserver. + // Waits for the given replica on the tserver to start. 
+ Status ShutdownAllNodesAndRestartTserver(MiniTabletServer* tserver, + const string& tablet_id) { + LOG(INFO) << "Shutting down cluster..."; + cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY); + // Note: We shut down tservers individually rather than using + // ClusterNodes::TS, since the latter would invalidate our reference to + // 'tserver'. + for (int i = 0; i < cluster_->num_tablet_servers(); i++) { + cluster_->mini_tablet_server(i)->Shutdown(); + } + LOG(INFO) << "Restarting single tablet server..."; + RETURN_NOT_OK(tserver->Restart()); + TServerDetails* ts_details = FindOrDie(ts_map_, tserver->uuid()); + return WaitUntilTabletRunning(ts_details, tablet_id, kTimeout); + } }; // Test that bootstrapping a Raft no-op from the WAL will advance the replica's // MVCC safe time timestamps. TEST_F(TimestampAdvancementITest, TestNoOpAdvancesMvccSafeTimeOnBootstrap) { - // Set a low Raft heartbeat and encourage frequent elections so that we can - // fill up the WAL with no-op entries naturally. + // Set a low Raft heartbeat interval so we can inject churn elections. FLAGS_raft_heartbeat_interval_ms = 100; - FLAGS_raft_enable_pre_election = false; - - // Prevent preallocation of WAL segments in order to prevent races between - // the WAL allocation thread and our manual rolling over of the WAL. - FLAGS_log_preallocate_segments = false; - FLAGS_log_async_preallocate_segments = false; - - // We're going to manually trigger maintenance ops, so disable maintenance op - // scheduling. - FLAGS_enable_maintenance_manager = false; // Setup a cluster with some writes and a new WAL segment. - const int kTserver = 0; scoped_refptr replica; NO_FATALS(SetupClusterWithWritesInWAL(kTserver, &replica)); MiniTabletServer* ts = tserver(kTserver); @@ -184,6 +256,7 @@ TEST_F(TimestampAdvancementITest, TestNoOpAdvancesMvccSafeTimeOnBootstrap) { // Now that we're on a new WAL segment, inject latency to consensus so we // trigger elections, and wait for some terms to pass. 
+ FLAGS_raft_enable_pre_election = false; FLAGS_leader_failure_max_missed_heartbeat_periods = 1.0; FLAGS_consensus_inject_latency_ms_in_notifications = 100; const int64_t kNumExtraTerms = 10; @@ -198,39 +271,60 @@ TEST_F(TimestampAdvancementITest, TestNoOpAdvancesMvccSafeTimeOnBootstrap) { // where we can GC our logs. Note: we won't GC if there are replicas that // need to be caught up. FLAGS_consensus_inject_latency_ms_in_notifications = 0; - ASSERT_EVENTUALLY([&] { - LOG(INFO) << "GCing logs..."; - int64_t gcable_size; - ASSERT_OK(replica->GetGCableDataSize(&gcable_size)); - ASSERT_GT(gcable_size, 0); - ASSERT_OK(replica->RunLogGC()); - - // Ensure that we have no writes in our WALs. - bool has_write_replicates; - ASSERT_OK(CheckForWriteReplicatesInLog(ts, tablet_id, &has_write_replicates)); - ASSERT_FALSE(has_write_replicates); - }); - - // Note: We shut down tservers individually rather than using - // ClusterNodes::TS, since the latter would invalidate our reference to 'ts'. + NO_FATALS(GCUntilNoWritesInWAL(ts, replica)); replica.reset(); - LOG(INFO) << "Shutting down the cluster..."; - cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY); - for (int i = 0; i < cluster_->num_tablet_servers(); i++) { - cluster_->mini_tablet_server(i)->Shutdown(); - } - - // Now prevent elections to reduce consensus logging on the server. - LOG(INFO) << "Restarting single tablet server..."; - ASSERT_OK(ts->Restart()); - TServerDetails* ts_details = FindOrDie(ts_map_, ts->uuid()); + ASSERT_OK(ShutdownAllNodesAndRestartTserver(ts, tablet_id)); // Despite there being no writes, there are no-ops, with which we can advance // MVCC's timestamps. - ASSERT_OK(WaitUntilTabletRunning(ts_details, tablet_id, MonoDelta::FromSeconds(30))); replica = tablet_replica_on_ts(kTserver); Timestamp cleantime = replica->tablet()->mvcc_manager()->GetCleanTimestamp(); ASSERT_NE(cleantime, Timestamp::kInitialTimestamp); + + // Verify that we can scan the replica with its MVCC timestamp raised. 
+ ScanResponsePB resp = ScanResponseForTablet(kTserver, replica->tablet_id()); + ASSERT_FALSE(resp.has_error()) << SecureShortDebugString(resp); +} + +// Regression test for KUDU-2463, wherein scans would return incorrect results +// if a tablet's MVCC snapshot hasn't advanced. Currently, the only way to +// achieve this is if the cluster is restarted, the WAL only has change +// configs, and the tablet cannot join a quorum. +TEST_F(TimestampAdvancementITest, Kudu2463Test) { + scoped_refptr replica; + NO_FATALS(SetupClusterWithWritesInWAL(kTserver, &replica)); + MiniTabletServer* ts = tserver(kTserver); + + const string tablet_id = replica->tablet_id(); + + // Update one of the followers repeatedly to generate a bunch of config + // changes in all the replicas' WALs. + TServerDetails* leader; + ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader)); + vector followers; + ASSERT_OK(FindTabletFollowers(ts_map_, tablet_id, kTimeout, &followers)); + ASSERT_FALSE(followers.empty()); + for (int i = 0; i < 20; i++) { + RaftPeerPB::MemberType type = i % 2 == 0 ? RaftPeerPB::NON_VOTER : RaftPeerPB::VOTER; + WARN_NOT_OK(ChangeReplicaType(leader, tablet_id, followers[0], type, kTimeout), + "Couldn't send a change config!"); + } + NO_FATALS(GCUntilNoWritesInWAL(ts, replica)); + + // Note: we need to reset the replica reference before restarting the server. + replica.reset(); + ASSERT_OK(ShutdownAllNodesAndRestartTserver(ts, tablet_id)); + + // Now open a scanner for the server. + ScanResponsePB resp = ScanResponseForTablet(kTserver, tablet_id); + + // Scanning the tablet should yield an error. 
+ LOG(INFO) << "Got response: " << SecureShortDebugString(resp); + ASSERT_TRUE(resp.has_error()); + const TabletServerErrorPB& error = resp.error(); + ASSERT_EQ(error.code(), TabletServerErrorPB::TABLET_NOT_RUNNING); + ASSERT_STR_CONTAINS(resp.error().status().message(), "safe time has not yet been initialized"); + ASSERT_EQ(error.status().code(), AppStatusPB::UNINITIALIZED); } // Test to ensure that MVCC's current snapshot gets updated via Raft no-ops, in http://git-wip-us.apache.org/repos/asf/kudu/blob/5894af6f/src/kudu/mini-cluster/external_mini_cluster.cc ---------------------------------------------------------------------- diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc index f86294c..5bf98df 100644 --- a/src/kudu/mini-cluster/external_mini_cluster.cc +++ b/src/kudu/mini-cluster/external_mini_cluster.cc @@ -685,6 +685,12 @@ std::shared_ptr ExternalMiniCluster::master_proxy(int idx) c return std::make_shared(messenger_, addr, addr.host()); } +std::shared_ptr ExternalMiniCluster::tserver_proxy(int idx) const { + CHECK_LT(idx, tablet_servers_.size()); + const auto& addr = CHECK_NOTNULL(tablet_server(idx))->bound_rpc_addr(); + return std::make_shared(messenger_, addr, addr.host()); +} + Status ExternalMiniCluster::CreateClient(client::KuduClientBuilder* builder, client::sp::shared_ptr* client) const { client::KuduClientBuilder defaults; http://git-wip-us.apache.org/repos/asf/kudu/blob/5894af6f/src/kudu/mini-cluster/external_mini_cluster.h ---------------------------------------------------------------------- diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h index 6244887..9dc2f87 100644 --- a/src/kudu/mini-cluster/external_mini_cluster.h +++ b/src/kudu/mini-cluster/external_mini_cluster.h @@ -70,6 +70,10 @@ namespace server { class ServerStatusPB; } // namespace server +namespace tserver { +class TabletServerServiceProxy; +} // namespace tserver + 
namespace cluster { class ExternalDaemon; @@ -296,6 +300,7 @@ class ExternalMiniCluster : public MiniCluster { std::shared_ptr messenger() const override; std::shared_ptr master_proxy() const override; std::shared_ptr master_proxy(int idx) const override; + std::shared_ptr tserver_proxy(int idx) const override; std::string block_manager_type() const { return opts_.block_manager_type; http://git-wip-us.apache.org/repos/asf/kudu/blob/5894af6f/src/kudu/mini-cluster/internal_mini_cluster.cc ---------------------------------------------------------------------- diff --git a/src/kudu/mini-cluster/internal_mini_cluster.cc b/src/kudu/mini-cluster/internal_mini_cluster.cc index e272102..7e18346 100644 --- a/src/kudu/mini-cluster/internal_mini_cluster.cc +++ b/src/kudu/mini-cluster/internal_mini_cluster.cc @@ -38,6 +38,7 @@ #include "kudu/tserver/mini_tablet_server.h" #include "kudu/tserver/tablet_server.h" #include "kudu/tserver/tablet_server_options.h" +#include "kudu/tserver/tserver_service.proxy.h" #include "kudu/util/env.h" #include "kudu/util/monotime.h" #include "kudu/util/net/net_util.h" @@ -65,6 +66,7 @@ using master::TSDescriptor; using std::shared_ptr; using tserver::MiniTabletServer; using tserver::TabletServer; +using tserver::TabletServerServiceProxy; InternalMiniClusterOptions::InternalMiniClusterOptions() : num_masters(1), @@ -395,6 +397,11 @@ std::shared_ptr InternalMiniCluster::master_proxy(int idx) c return std::make_shared(messenger_, addr, addr.host()); } +std::shared_ptr InternalMiniCluster::tserver_proxy(int idx) const { + const auto& addr = CHECK_NOTNULL(mini_tablet_server(idx))->bound_rpc_addr(); + return std::make_shared(messenger_, addr, addr.host()); +} + string InternalMiniCluster::WalRootForTS(int ts_idx) const { return mini_tablet_server(ts_idx)->options()->fs_opts.wal_root; } http://git-wip-us.apache.org/repos/asf/kudu/blob/5894af6f/src/kudu/mini-cluster/internal_mini_cluster.h 
---------------------------------------------------------------------- diff --git a/src/kudu/mini-cluster/internal_mini_cluster.h b/src/kudu/mini-cluster/internal_mini_cluster.h index 64b4e64..d69de98 100644 --- a/src/kudu/mini-cluster/internal_mini_cluster.h +++ b/src/kudu/mini-cluster/internal_mini_cluster.h @@ -37,21 +37,22 @@ class Status; namespace client { class KuduClient; class KuduClientBuilder; -} +} // namespace client namespace master { class MasterServiceProxy; class MiniMaster; class TSDescriptor; -} +} // namespace master namespace rpc { class Messenger; -} +} // namespace rpc namespace tserver { class MiniTabletServer; -} +class TabletServerServiceProxy; +} // namespace tserver namespace cluster { @@ -197,6 +198,7 @@ class InternalMiniCluster : public MiniCluster { std::shared_ptr messenger() const override; std::shared_ptr master_proxy() const override; std::shared_ptr master_proxy(int idx) const override; + std::shared_ptr tserver_proxy(int idx) const override; private: http://git-wip-us.apache.org/repos/asf/kudu/blob/5894af6f/src/kudu/mini-cluster/mini_cluster.h ---------------------------------------------------------------------- diff --git a/src/kudu/mini-cluster/mini_cluster.h b/src/kudu/mini-cluster/mini_cluster.h index 304fd81..627a683 100644 --- a/src/kudu/mini-cluster/mini_cluster.h +++ b/src/kudu/mini-cluster/mini_cluster.h @@ -42,6 +42,10 @@ namespace rpc { class Messenger; } // namespace rpc +namespace tserver { +class TabletServerServiceProxy; +} // namespace tserver + namespace cluster { // Mode to which node types a certain action (like Shutdown()) should apply. @@ -153,6 +157,10 @@ class MiniCluster { // master at 'idx' is running. virtual std::shared_ptr master_proxy(int idx) const = 0; + // Returns an RPC proxy to the tserver at 'idx'. Requires that the tserver at + // 'idx' is running. 
+ virtual std::shared_ptr tserver_proxy(int idx) const = 0; + // Returns the UUID for the tablet server 'ts_idx' virtual std::string UuidForTS(int ts_idx) const = 0; http://git-wip-us.apache.org/repos/asf/kudu/blob/5894af6f/src/kudu/tablet/mvcc.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/mvcc.cc b/src/kudu/tablet/mvcc.cc index 21e97e0..ce65068 100644 --- a/src/kudu/tablet/mvcc.cc +++ b/src/kudu/tablet/mvcc.cc @@ -55,6 +55,15 @@ MvccManager::MvccManager() cur_snap_.none_committed_at_or_after_ = Timestamp::kInitialTimestamp; } +Status MvccManager::CheckIsSafeTimeInitialized() const { + // We initialize the MVCC safe time and clean time at the same time, so if + // clean time has not been updated, neither has safe time. + if (GetCleanTimestamp() == Timestamp::kInitialTimestamp) { + return Status::Uninitialized("safe time has not yet been initialized"); + } + return Status::OK(); +} + void MvccManager::StartTransaction(Timestamp timestamp) { MAYBE_INJECT_RANDOM_LATENCY(FLAGS_inject_latency_ms_before_starting_txn); std::lock_guard l(lock_); http://git-wip-us.apache.org/repos/asf/kudu/blob/5894af6f/src/kudu/tablet/mvcc.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/mvcc.h b/src/kudu/tablet/mvcc.h index 5ce1f95..347376e 100644 --- a/src/kudu/tablet/mvcc.h +++ b/src/kudu/tablet/mvcc.h @@ -185,6 +185,11 @@ class MvccManager { public: MvccManager(); + // Returns an error if the current snapshot has not been adjusted past its + // initial state. While in this state, it is unsafe for the MvccManager to + // serve information about already-applied transactions. + Status CheckIsSafeTimeInitialized() const; + // Begins a new transaction, which is assigned the provided timestamp. // // Requires that 'timestamp' is not committed. 
http://git-wip-us.apache.org/repos/asf/kudu/blob/5894af6f/src/kudu/tools/kudu-ts-cli-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/kudu-ts-cli-test.cc b/src/kudu/tools/kudu-ts-cli-test.cc index 736afc8..f1d9e7a 100644 --- a/src/kudu/tools/kudu-ts-cli-test.cc +++ b/src/kudu/tools/kudu-ts-cli-test.cc @@ -37,6 +37,7 @@ #include "kudu/util/net/sockaddr.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" using kudu::itest::TabletServerMap; using kudu::itest::TServerDetails; @@ -111,13 +112,18 @@ TEST_F(KuduTsCliTest, TestDumpTablet) { string out; // Test for dump_tablet when there is no data in tablet. - ASSERT_OK(RunKuduTool({ - "remote_replica", - "dump", - cluster_->tablet_server(0)->bound_rpc_addr().ToString(), - tablet_id - }, &out)); - ASSERT_EQ("", out); + // Because it's unsafe to scan a tablet replica that hasn't advanced its + // clean time, i.e. one that hasn't had any writes or elections, we assert + // that we are able to eventually scan. + ASSERT_EVENTUALLY([&] { + ASSERT_OK(RunKuduTool({ + "remote_replica", + "dump", + cluster_->tablet_server(0)->bound_rpc_addr().ToString(), + tablet_id + }, &out)); + ASSERT_EQ("", out); + }); // Insert very little data and dump_tablet again. workload.Start(); http://git-wip-us.apache.org/repos/asf/kudu/blob/5894af6f/src/kudu/tserver/tablet_service.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index d470cf3..8957ce5 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -1868,6 +1868,17 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica, // tablet replica's shutdown is run concurrently with the code below. shared_ptr tablet; RETURN_NOT_OK(GetTabletRef(replica, &tablet, error_code)); + + // Ensure the tablet has a valid clean time. 
+ s = tablet->mvcc_manager()->CheckIsSafeTimeInitialized(); + if (!s.ok()) { + LOG(WARNING) << Substitute("Rejecting scan request for tablet $0: $1", + tablet->tablet_id(), s.ToString()); + // Return TABLET_NOT_RUNNING so the scan can be handled appropriately (fail + // over to another tablet server if fault-tolerant). + *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING; + return s; + } { TRACE("Creating iterator"); TRACE_EVENT0("tserver", "Create iterator");