kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [1/2] kudu git commit: tablet copy: Ensure no data loss on tablet copy failure
Date Thu, 04 May 2017 20:47:23 GMT
Repository: kudu
Updated Branches:
  refs/heads/master dcd029eca -> 5ae27d733


tablet copy: Ensure no data loss on tablet copy failure

This patch adds an integration test to ensure that we don't see data
loss on tablet copy failure. Tablet copy failure is triggered in a couple
of different ways on the source server side.

This also implements the ThreadSafeRandom::NextDoubleFraction() method,
which existed in the Random class but was missing from the thread-safe
version.

Change-Id: Ie57590e2473e09a6836bb02d5fccb7689614f8e7
Reviewed-on: http://gerrit.cloudera.org:8080/6732
Reviewed-by: Adar Dembo <adar@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/89806c6f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/89806c6f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/89806c6f

Branch: refs/heads/master
Commit: 89806c6f69e28343beef99f31c1c81118e4019a1
Parents: dcd029e
Author: Mike Percy <mpercy@apache.org>
Authored: Mon Apr 24 10:49:47 2017 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Thu May 4 20:44:41 2017 +0000

----------------------------------------------------------------------
 .../integration-tests/cluster_itest_util.cc     |  52 ++++-
 src/kudu/integration-tests/cluster_itest_util.h |  14 +-
 .../create-table-stress-test.cc                 |   5 +-
 .../integration-tests/delete_tablet-itest.cc    |   2 +-
 .../external_mini_cluster-itest-base.cc         |   2 +-
 src/kudu/integration-tests/tablet_copy-itest.cc | 207 ++++++++++++++++++-
 src/kudu/integration-tests/ts_itest-base.h      |   2 +-
 src/kudu/integration-tests/ts_recovery-itest.cc |   4 +-
 .../ts_tablet_manager-itest.cc                  |   5 +-
 src/kudu/tools/kudu-tool-test.cc                |   2 +-
 src/kudu/tserver/tablet_copy_service.cc         |  23 ++-
 src/kudu/tserver/tablet_copy_service.h          |   2 +
 src/kudu/util/random.h                          |   5 +
 13 files changed, 299 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/89806c6f/src/kudu/integration-tests/cluster_itest_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc b/src/kudu/integration-tests/cluster_itest_util.cc
index e7b44b3..31c6bbe 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -68,6 +68,7 @@ using consensus::RunLeaderElectionResponsePB;
 using consensus::RunLeaderElectionRequestPB;
 using consensus::kInvalidOpIdIndex;
 using master::ListTabletServersResponsePB;
+using master::ListTabletServersResponsePB_Entry;
 using master::MasterServiceProxy;
 using master::TabletLocationsPB;
 using rpc::Messenger;
@@ -229,21 +230,15 @@ Status WaitUntilAllReplicasHaveOp(const int64_t log_index,
                                      log_index, passed.ToString(), replicas_str));
 }
 
-Status CreateTabletServerMap(MasterServiceProxy* master_proxy,
+Status CreateTabletServerMap(const shared_ptr<MasterServiceProxy>& master_proxy,
                              const shared_ptr<Messenger>& messenger,
                              unordered_map<string, TServerDetails*>* ts_map) {
-  master::ListTabletServersRequestPB req;
-  master::ListTabletServersResponsePB resp;
-  rpc::RpcController controller;
-
-  RETURN_NOT_OK(master_proxy->ListTabletServers(req, &resp, &controller));
-  RETURN_NOT_OK(controller.status());
-  if (resp.has_error()) {
-    return Status::RemoteError("Response had an error", SecureShortDebugString(resp.error()));
-  }
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  vector<ListTabletServersResponsePB_Entry> tservers;
+  RETURN_NOT_OK(ListTabletServers(master_proxy, kTimeout, &tservers));
 
   ts_map->clear();
-  for (const ListTabletServersResponsePB::Entry& entry : resp.servers()) {
+  for (const auto& entry : tservers) {
     HostPort host_port;
     RETURN_NOT_OK(HostPortFromPB(entry.registration().rpc_addresses(0), &host_port));
     vector<Sockaddr> addresses;
@@ -347,6 +342,41 @@ Status WaitUntilCommittedConfigOpIdIndexIs(int64_t opid_index,
                                      SecureShortDebugString(cstate), s.ToString()));
 }
 
+// Issues a single ListTabletServers RPC to the master, bounded by 'timeout'.
+// On success '*tservers' is replaced with the entries from the response; on
+// any failure it is left untouched.
+Status ListTabletServers(
+    const shared_ptr<MasterServiceProxy>& master_proxy,
+    const MonoDelta& timeout,
+    vector<ListTabletServersResponsePB_Entry>* tservers) {
+  rpc::RpcController ctl;
+  ctl.set_timeout(timeout);
+  master::ListTabletServersRequestPB request;
+  master::ListTabletServersResponsePB response;
+
+  RETURN_NOT_OK(master_proxy->ListTabletServers(request, &response, &ctl));
+  RETURN_NOT_OK(ctl.status());
+  if (!response.has_error()) {
+    const auto& entries = response.servers();
+    tservers->assign(entries.begin(), entries.end());
+    return Status::OK();
+  }
+  return Status::RemoteError("Response had an error",
+                             SecureShortDebugString(response.error()));
+}
+
+// Polls the master every 50 ms until it reports at least 'num_servers'
+// registered tablet servers, returning TimedOut once 'timeout' elapses.
+// Any RPC error is returned immediately.
+Status WaitForNumTabletServers(
+    const shared_ptr<MasterServiceProxy>& master_proxy,
+    int num_servers, const MonoDelta& timeout) {
+  const MonoTime deadline = MonoTime::Now() + timeout;
+  vector<ListTabletServersResponsePB_Entry> tservers;
+  for (MonoTime now = MonoTime::Now(); now < deadline; now = MonoTime::Now()) {
+    // Derive the RPC timeout from the same clock reading used by the loop
+    // condition so the remaining time handed to the RPC is always positive.
+    RETURN_NOT_OK(ListTabletServers(master_proxy, deadline - now, &tservers));
+    // Compare as signed values: avoids a signed/unsigned comparison, under
+    // which a non-positive 'num_servers' could never be satisfied.
+    if (static_cast<int64_t>(tservers.size()) >= num_servers) return Status::OK();
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+
+  return Status::TimedOut(Substitute(
+      "Timed out waiting for $0 tablet servers to be registered with the master. Found $1",
+      num_servers, tservers.size()));
+}
+
 Status WaitForReplicasReportedToMaster(
     const shared_ptr<master::MasterServiceProxy>& master_proxy,
     int num_replicas, const string& tablet_id,

http://git-wip-us.apache.org/repos/asf/kudu/blob/89806c6f/src/kudu/integration-tests/cluster_itest_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.h b/src/kudu/integration-tests/cluster_itest_util.h
index 01be8c8..883ea8d 100644
--- a/src/kudu/integration-tests/cluster_itest_util.h
+++ b/src/kudu/integration-tests/cluster_itest_util.h
@@ -98,7 +98,7 @@ client::KuduSchema SimpleIntKeyKuduSchema();
 // Create a populated TabletServerMap by interrogating the master.
 // Note: The bare-pointer TServerDetails values must be deleted by the caller!
 // Consider using ValueDeleter (in gutil/stl_util.h) for that.
-Status CreateTabletServerMap(master::MasterServiceProxy* master_proxy,
+Status CreateTabletServerMap(const std::shared_ptr<master::MasterServiceProxy>& master_proxy,
                              const std::shared_ptr<rpc::Messenger>& messenger,
                              std::unordered_map<std::string, TServerDetails*>* ts_map);
 
@@ -156,6 +156,18 @@ Status WaitUntilCommittedConfigOpIdIndexIs(int64_t opid_index,
                                            const std::string& tablet_id,
                                            const MonoDelta& timeout);
 
+// Fetches the tablet servers currently registered with the master reachable
+// through 'master_proxy', using a single RPC bounded by 'timeout'. On success,
+// '*tservers' is overwritten with the entries from the master's response.
+Status ListTabletServers(const std::shared_ptr<master::MasterServiceProxy>& master_proxy,
+                         const MonoDelta& timeout,
+                         std::vector<master::ListTabletServersResponsePB_Entry>* tservers);
+
+// Polls the master until *at least* 'num_servers' tablet servers are registered
+// with it. Returns TimedOut if that does not happen within 'timeout'; any RPC
+// error is returned immediately.
+Status WaitForNumTabletServers(const std::shared_ptr<master::MasterServiceProxy>& master_proxy,
+                               int num_servers,
+                               const MonoDelta& timeout);
+
 enum WaitForLeader {
   DONT_WAIT_FOR_LEADER = 0,
   WAIT_FOR_LEADER = 1

http://git-wip-us.apache.org/repos/asf/kudu/blob/89806c6f/src/kudu/integration-tests/create-table-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/create-table-stress-test.cc b/src/kudu/integration-tests/create-table-stress-test.cc
index 9e1abed..4926753 100644
--- a/src/kudu/integration-tests/create-table-stress-test.cc
+++ b/src/kudu/integration-tests/create-table-stress-test.cc
@@ -54,6 +54,7 @@ DECLARE_int32(heartbeat_interval_ms);
 DECLARE_bool(log_preallocate_segments);
 DEFINE_int32(num_test_tablets, 60, "Number of tablets for stress test");
 
+using std::shared_ptr;
 using std::thread;
 using std::unique_ptr;
 using strings::Substitute;
@@ -100,7 +101,7 @@ class CreateTableStressTest : public KuduTest {
               .Build(&messenger_));
     master_proxy_.reset(new MasterServiceProxy(messenger_,
                                                cluster_->mini_master()->bound_rpc_addr()));
-    ASSERT_OK(CreateTabletServerMap(master_proxy_.get(), messenger_, &ts_map_));
+    ASSERT_OK(CreateTabletServerMap(master_proxy_, messenger_, &ts_map_));
   }
 
   virtual void TearDown() OVERRIDE {
@@ -115,7 +116,7 @@ class CreateTableStressTest : public KuduTest {
   unique_ptr<MiniCluster> cluster_;
   KuduSchema schema_;
   std::shared_ptr<Messenger> messenger_;
-  unique_ptr<MasterServiceProxy> master_proxy_;
+  shared_ptr<MasterServiceProxy> master_proxy_;
   TabletServerMap ts_map_;
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/89806c6f/src/kudu/integration-tests/delete_tablet-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/delete_tablet-itest.cc b/src/kudu/integration-tests/delete_tablet-itest.cc
index 5cc292c..af95003 100644
--- a/src/kudu/integration-tests/delete_tablet-itest.cc
+++ b/src/kudu/integration-tests/delete_tablet-itest.cc
@@ -47,7 +47,7 @@ TEST_F(DeleteTabletITest, TestDeleteFailedReplica) {
 
   std::unordered_map<std::string, itest::TServerDetails*> ts_map;
   ValueDeleter del(&ts_map);
-  ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy().get(),
+  ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy(),
                                          cluster_->messenger(),
                                          &ts_map));
   auto* mts = cluster_->mini_tablet_server(0);

http://git-wip-us.apache.org/repos/asf/kudu/blob/89806c6f/src/kudu/integration-tests/external_mini_cluster-itest-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster-itest-base.cc b/src/kudu/integration-tests/external_mini_cluster-itest-base.cc
index 62a18a1..f0a030c 100644
--- a/src/kudu/integration-tests/external_mini_cluster-itest-base.cc
+++ b/src/kudu/integration-tests/external_mini_cluster-itest-base.cc
@@ -48,7 +48,7 @@ void ExternalMiniClusterITestBase::StartCluster(
   cluster_.reset(new ExternalMiniCluster(opts));
   ASSERT_OK(cluster_->Start());
   inspect_.reset(new itest::ExternalMiniClusterFsInspector(cluster_.get()));
-  ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy().get(),
+  ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy(),
                                          cluster_->messenger(),
                                          &ts_map_));
   ASSERT_OK(cluster_->CreateClient(nullptr, &client_));

http://git-wip-us.apache.org/repos/asf/kudu/blob/89806c6f/src/kudu/integration-tests/tablet_copy-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_copy-itest.cc b/src/kudu/integration-tests/tablet_copy-itest.cc
index 9a6f51d..9b16c4a 100644
--- a/src/kudu/integration-tests/tablet_copy-itest.cc
+++ b/src/kudu/integration-tests/tablet_copy-itest.cc
@@ -55,19 +55,23 @@ using kudu::client::KuduSchemaFromSchema;
 using kudu::client::KuduTableCreator;
 using kudu::consensus::CONSENSUS_CONFIG_COMMITTED;
 using kudu::itest::TServerDetails;
+using kudu::itest::WaitForNumTabletServers;
 using kudu::tablet::TABLET_DATA_DELETED;
 using kudu::tablet::TABLET_DATA_TOMBSTONED;
 using kudu::tserver::ListTabletsResponsePB;
+using kudu::tserver::ListTabletsResponsePB_StatusAndSchemaPB;
 using kudu::tserver::TabletCopyClient;
 using std::string;
 using std::unordered_map;
 using std::vector;
+using strings::Substitute;
 
 METRIC_DECLARE_entity(server);
 METRIC_DECLARE_histogram(handler_latency_kudu_consensus_ConsensusService_UpdateConsensus);
 METRIC_DECLARE_counter(glog_info_messages);
 METRIC_DECLARE_counter(glog_warning_messages);
 METRIC_DECLARE_counter(glog_error_messages);
+METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management);
 
 namespace kudu {
 
@@ -623,7 +627,17 @@ TEST_F(TabletCopyITest, TestDeleteLeaderDuringTabletCopyStressTest) {
 }
 
 namespace {
-int64_t CountUpdateConsensusCalls(ExternalTabletServer* ets, const string& tablet_id) {
+// Returns the value of the server-level
+// log_block_manager_blocks_under_management gauge on 'ets'.
+// CHECK-fails if the metric cannot be fetched.
+int64_t CountBlocksUnderManagement(ExternalTabletServer* ets) {
+  int64_t num_blocks = 0;
+  CHECK_OK(ets->GetInt64Metric(&METRIC_ENTITY_server,
+                               "kudu.tabletserver",
+                               &METRIC_log_block_manager_blocks_under_management,
+                               "value",
+                               &num_blocks));
+  return num_blocks;
+}
+
+int64_t CountUpdateConsensusCalls(ExternalTabletServer* ets) {
   int64_t ret;
   CHECK_OK(ets->GetInt64Metric(
                &METRIC_ENTITY_server,
@@ -712,11 +726,11 @@ TEST_F(TabletCopyITest, TestDisableTabletCopy_NoTightLoopWhenTabletDeleted) {
   // a) we don't spew logs on the leader side
   // b) we don't get hit with a lot of UpdateConsensus calls on the replica.
   MonoTime start = MonoTime::Now();
-  int64_t num_update_rpcs_initial = CountUpdateConsensusCalls(replica_ets, tablet_id);
+  int64_t num_update_rpcs_initial = CountUpdateConsensusCalls(replica_ets);
   int64_t num_logs_initial = CountLogMessages(leader_ts);
 
   SleepFor(MonoDelta::FromSeconds(1));
-  int64_t num_update_rpcs_after_sleep = CountUpdateConsensusCalls(replica_ets, tablet_id);
+  int64_t num_update_rpcs_after_sleep = CountUpdateConsensusCalls(replica_ets);
   int64_t num_logs_after_sleep = CountLogMessages(leader_ts);
   MonoTime end = MonoTime::Now();
   MonoDelta elapsed = end - start;
@@ -915,4 +929,191 @@ TEST_F(TabletCopyITest, TestTabletCopyThrottling) {
   LOG(INFO) << "Number of Service unavailable responses: " << num_service_unavailable;
 }
 
+// This test uses CountBlocksUnderManagement() which only works with the
+// LogBlockManager.
+#ifndef __APPLE__
+
+// Parameterized fixture for tablet copy failure scenarios. The test parameter
+// is the name of the fault-injection gflag that is enabled on the tablet
+// servers to make tablet copy fail.
+class BadTabletCopyITest : public TabletCopyITest,
+                           public ::testing::WithParamInterface<const char*> {
+ protected:
+  // Helper function to load a table with the specified amount of data and
+  // ensure the specified number of blocks are persisted on one of the replicas.
+  void LoadTable(TestWorkload* workload, int min_rows, int min_blocks);
+};
+
+// Tablet copy should either trigger a crash or a session timeout on the source
+// server. The pointers are const so these constants get internal linkage and
+// cannot be reassigned.
+const char* const kFlagFaultOnFetch = "fault_crash_on_handle_tc_fetch_data";
+const char* const kFlagEarlyTimeout = "tablet_copy_early_session_timeout_prob";
+const char* const kTabletCopyFailureFlags[] = { kFlagFaultOnFetch, kFlagEarlyTimeout };
+INSTANTIATE_TEST_CASE_P(FaultFlags, BadTabletCopyITest,
+                        ::testing::ValuesIn(kTabletCopyFailureFlags));
+
+void BadTabletCopyITest::LoadTable(TestWorkload* workload, int min_rows, int min_blocks) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  // The callers arrange for TS 1 to host a replica of every table (asserted
+  // below), so its block count decides when enough data has been flushed.
+  const int kReplicaIdx = 1;
+
+  // Find the replicas for the table. Servers that fail to respond are skipped.
+  string tablet_id;
+  itest::TabletServerMap replicas;
+  for (const auto& entry : ts_map_) {
+    TServerDetails* ts = entry.second;
+    vector<ListTabletsResponsePB_StatusAndSchemaPB> tablets;
+    Status s = ListTablets(ts, kTimeout, &tablets);
+    if (!s.ok()) {
+      continue;
+    }
+    for (const auto& tablet : tablets) {
+      if (tablet.tablet_status().table_name() == workload->table_name()) {
+        replicas.insert(entry);
+        tablet_id = tablet.tablet_status().tablet_id();
+      }
+    }
+  }
+  ASSERT_EQ(3, replicas.size());
+  ASSERT_TRUE(ContainsKey(replicas, cluster_->tablet_server(kReplicaIdx)->uuid()));
+
+  // Keep writing until both the row threshold and the new-block threshold
+  // (measured on TS 1) have been reached.
+  int64_t before_blocks = CountBlocksUnderManagement(cluster_->tablet_server(kReplicaIdx));
+  int64_t after_blocks;
+  int64_t blocks_diff;
+  workload->Start();
+  do {
+    after_blocks = CountBlocksUnderManagement(cluster_->tablet_server(kReplicaIdx));
+    blocks_diff = after_blocks - before_blocks;
+    KLOG_EVERY_N_SECS(INFO, 1) << "Blocks diff: " << blocks_diff;
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  } while (workload->rows_inserted() < min_rows || blocks_diff < min_blocks);
+  workload->StopAndJoin();
+
+  // Ensure all 3 replicas have replicated the same data.
+  ASSERT_OK(WaitForServersToAgree(kTimeout, replicas, tablet_id, 0));
+}
+
+// Ensure that a tablet copy failure does not cause data loss: after the failed
+// copy is tombstoned and the cluster is restarted without fault injection, ksck
+// must pass and every inserted row must still be present.
+TEST_P(BadTabletCopyITest, TestBadCopy) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "Not running " << CURRENT_TEST_NAME() << " because it is a slow test.";
+    return;
+  }
+
+  // Load 2 tablets with 3 replicas each across 4 tablet servers s.t. we end up
+  // with a replication distribution like: ([A], [A,B], [A,B], [B]).
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  // Upper bound on the wait for the injected crash on the tablet copy source,
+  // so that a crash that never happens fails the test instead of hanging it.
+  const MonoDelta kCrashTimeout = MonoDelta::FromSeconds(60);
+  const int kNumTabletServers = 4;
+  const int kMinRows = 10000;
+  const int kMinBlocks = 10;
+  // Hold the parameter by value rather than binding a reference to a temporary.
+  const string failure_flag = GetParam();
+  StartCluster(
+      {
+        // Flush aggressively to create many blocks.
+        "--flush_threshold_mb=0",
+        "--maintenance_manager_polling_interval_ms=10",
+        // Wait only 10 seconds before evicting a failed replica.
+        "--follower_unavailable_considered_failed_sec=10",
+        Substitute("--$0=1.0", failure_flag),
+      }, {
+        // Wait only 5 seconds before master decides TS not eligible for a replica.
+        "--tserver_unresponsive_timeout_ms=5000",
+      }, kNumTabletServers);
+
+  // Don't allow TS 3 to get a copy of Table A.
+  cluster_->tablet_server(3)->Shutdown();
+  cluster_->master()->Shutdown();
+  ASSERT_OK(cluster_->master()->Restart());
+  ASSERT_OK(WaitForNumTabletServers(cluster_->master_proxy(), 3, kTimeout));
+
+  // Create Table A.
+  TestWorkload workload1(cluster_.get());
+  const char* kTableA = "table_a";
+  workload1.set_table_name(kTableA);
+  workload1.Setup();
+  ASSERT_OK(cluster_->tablet_server(3)->Restart());
+
+  // Load Table A.
+  LoadTable(&workload1, kMinRows, kMinBlocks);
+
+  // Don't allow TS 0 to get a copy of Table B.
+  cluster_->tablet_server(0)->Shutdown();
+  cluster_->master()->Shutdown();
+  ASSERT_OK(cluster_->master()->Restart());
+  ASSERT_OK(WaitForNumTabletServers(cluster_->master_proxy(), 3, kTimeout));
+
+  // Create Table B.
+  TestWorkload workload2(cluster_.get());
+  const char* kTableB = "table_b";
+  workload2.set_table_name(kTableB);
+  workload2.Setup();
+  ASSERT_OK(cluster_->tablet_server(0)->Restart());
+
+  // Load Table B.
+  LoadTable(&workload2, kMinRows, kMinBlocks);
+
+  // Shut down replica with only [A].
+  cluster_->tablet_server(0)->Shutdown();
+
+  // The leader of A will evict TS 0 and the master will trigger re-replication to TS 3.
+
+  if (failure_flag == kFlagFaultOnFetch) {
+    // Tablet copy will cause the leader tablet server for A (either TS 1 or TS 2)
+    // to crash because of --fault_crash_on_handle_tc_fetch_data=1.0
+    // This will also cause the tablet copy session to fail and the new replica
+    // on TS 3 to be tombstoned.
+    const MonoTime crash_deadline = MonoTime::Now() + kCrashTimeout;
+    bool crashed = false;
+    while (!crashed && MonoTime::Now() < crash_deadline) {
+      for (int i : {1, 2}) {
+        Status s = cluster_->tablet_server(i)
+            ->WaitForInjectedCrash(MonoDelta::FromMilliseconds(10));
+        if (s.ok()) {
+          crashed = true;
+          break;
+        }
+      }
+    }
+    ASSERT_TRUE(crashed) << "Timed out waiting for TS 1 or TS 2 to crash due to --"
+                         << failure_flag;
+  } else {
+    // The early timeout flag will also cause the tablet copy to fail, but
+    // without crashing the source TS.
+    ASSERT_EQ(kFlagEarlyTimeout, failure_flag);
+  }
+
+  // Wait for TS 3 to tombstone the replica that failed to copy.
+  // ASSERT_EVENTUALLY (unlike a bare AssertEventually) returns from the test
+  // if the condition never becomes true.
+  TServerDetails* ts = FindOrDie(ts_map_, cluster_->tablet_server(3)->uuid());
+  ASSERT_EVENTUALLY([&] {
+    vector<ListTabletsResponsePB_StatusAndSchemaPB> tablets;
+    ASSERT_OK(ListTablets(ts, MonoDelta::FromSeconds(10), &tablets));
+    ASSERT_EQ(2, tablets.size());
+    int num_tombstoned = 0;
+    for (const auto& t : tablets) {
+      if (t.tablet_status().tablet_data_state() == TABLET_DATA_TOMBSTONED) {
+        num_tombstoned++;
+      }
+    }
+    ASSERT_EQ(1, num_tombstoned);
+  });
+
+  // Restart the whole cluster without failure injection so that it can finish
+  // re-replicating.
+  cluster_->Shutdown();
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    cluster_->tablet_server(i)->mutable_flags()
+        ->push_back(Substitute("--$0=0.0", failure_flag));
+  }
+  ASSERT_OK(cluster_->Restart());
+
+  // Run ksck, ensure no data loss.
+  LOG(INFO) << "Running ksck...";
+  ClusterVerifier v(cluster_.get());
+  v.SetVerificationTimeout(kTimeout);
+  NO_FATALS(v.CheckCluster());
+
+  LOG(INFO) << "Checking row count...";
+  for (TestWorkload* workload : {&workload1, &workload2}) {
+    NO_FATALS(v.CheckRowCount(workload->table_name(), ClusterVerifier::AT_LEAST,
+                              workload->rows_inserted()));
+  }
+}
+
+#endif // __APPLE__
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/89806c6f/src/kudu/integration-tests/ts_itest-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_itest-base.h b/src/kudu/integration-tests/ts_itest-base.h
index 3a88ed1..898a349 100644
--- a/src/kudu/integration-tests/ts_itest-base.h
+++ b/src/kudu/integration-tests/ts_itest-base.h
@@ -128,7 +128,7 @@ class TabletServerIntegrationTestBase : public TabletServerTestBase {
   // in 'tablet_servers_'.
   void CreateTSProxies() {
     CHECK(tablet_servers_.empty());
-    CHECK_OK(itest::CreateTabletServerMap(cluster_->master_proxy().get(),
+    CHECK_OK(itest::CreateTabletServerMap(cluster_->master_proxy(),
                                           client_messenger_,
                                           &tablet_servers_));
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/89806c6f/src/kudu/integration-tests/ts_recovery-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc b/src/kudu/integration-tests/ts_recovery-itest.cc
index bbc3c11..dd85464 100644
--- a/src/kudu/integration-tests/ts_recovery-itest.cc
+++ b/src/kudu/integration-tests/ts_recovery-itest.cc
@@ -275,7 +275,7 @@ TEST_F(TsRecoveryITest, TestChangeMaxCellSize) {
   std::unordered_map<std::string, itest::TServerDetails*> ts_map;
   ValueDeleter del(&ts_map);
 
-  ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy().get(),
+  ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy(),
                                          cluster_->messenger(),
                                          &ts_map));
   ASSERT_EVENTUALLY([&]() {
@@ -320,7 +320,7 @@ TEST_F(TsRecoveryITestDeathTest, TestRecoverFromOpIdOverflow) {
 
   std::unordered_map<std::string, itest::TServerDetails*> ts_map;
   ValueDeleter del(&ts_map);
-  ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy().get(),
+  ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy(),
                                          cluster_->messenger(),
                                          &ts_map));
   vector<tserver::ListTabletsResponsePB::StatusAndSchemaPB> tablets;

http://git-wip-us.apache.org/repos/asf/kudu/blob/89806c6f/src/kudu/integration-tests/ts_tablet_manager-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index 2a3f0d3..f841313 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -65,6 +65,7 @@ using master::ReportedTabletPB;
 using master::TabletReportPB;
 using rpc::Messenger;
 using rpc::MessengerBuilder;
+using std::shared_ptr;
 using strings::Substitute;
 using tablet::TabletPeer;
 using tserver::MiniTabletServer;
@@ -126,11 +127,11 @@ TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) {
   ASSERT_OK(client_->OpenTable(kTableName, &table));
 
   // Build a TServerDetails map so we can check for convergence.
-  gscoped_ptr<MasterServiceProxy> master_proxy(
+  shared_ptr<MasterServiceProxy> master_proxy(
       new MasterServiceProxy(client_messenger_, cluster_->mini_master()->bound_rpc_addr()));
 
   itest::TabletServerMap ts_map;
-  ASSERT_OK(CreateTabletServerMap(master_proxy.get(), client_messenger_, &ts_map));
+  ASSERT_OK(CreateTabletServerMap(master_proxy, client_messenger_, &ts_map));
   ValueDeleter deleter(&ts_map);
 
   // Collect the tablet peers so we get direct access to consensus.

http://git-wip-us.apache.org/repos/asf/kudu/blob/89806c6f/src/kudu/tools/kudu-tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index c2c45ba..d996e1e 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -277,7 +277,7 @@ void ToolTest::StartExternalMiniCluster(const vector<string>& extra_master_flags
   cluster_.reset(new ExternalMiniCluster(cluster_opts_));
   ASSERT_OK(cluster_->Start());
   inspect_.reset(new ExternalMiniClusterFsInspector(cluster_.get()));
-  ASSERT_OK(CreateTabletServerMap(cluster_->master_proxy().get(),
+  ASSERT_OK(CreateTabletServerMap(cluster_->master_proxy(),
                                   cluster_->messenger(), &ts_map_));
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/89806c6f/src/kudu/tserver/tablet_copy_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_service.cc b/src/kudu/tserver/tablet_copy_service.cc
index 37e686e..74a34c5 100644
--- a/src/kudu/tserver/tablet_copy_service.cc
+++ b/src/kudu/tserver/tablet_copy_service.cc
@@ -36,6 +36,7 @@
 #include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/random_util.h"
 
 #define RPC_RETURN_NOT_OK(expr, app_err, message, context) \
   do { \
@@ -62,6 +63,11 @@ DEFINE_double(fault_crash_on_handle_tc_fetch_data, 0.0,
               "(For testing only!)");
 TAG_FLAG(fault_crash_on_handle_tc_fetch_data, unsafe);
 
+// Consumed by BeginTabletCopySession(): when a random draw is at or below this
+// probability, the freshly created session is ended before a success response
+// is sent, so the client's subsequent fetches fail.
+DEFINE_double(tablet_copy_early_session_timeout_prob, 0.0,
+              "The probability that a tablet copy session will time out early, "
+              "resulting in tablet copy failure. (For testing only!)");
+TAG_FLAG(tablet_copy_early_session_timeout_prob, unsafe);
+
 using strings::Substitute;
 
 namespace kudu {
@@ -83,6 +89,7 @@ TabletCopyServiceImpl::TabletCopyServiceImpl(
       server_(server),
       fs_manager_(CHECK_NOTNULL(server->fs_manager())),
       tablet_peer_lookup_(CHECK_NOTNULL(tablet_peer_lookup)),
+      rand_(GetRandomSeed32()),
       shutdown_latch_(1) {
   CHECK_OK(Thread::Create("tablet-copy", "tc-session-exp",
                           &TabletCopyServiceImpl::EndExpiredSessions, this,
@@ -152,6 +159,19 @@ void TabletCopyServiceImpl::BeginTabletCopySession(
     resp->add_wal_segment_seqnos(segment->header().sequence_number());
   }
 
+  // For testing: Close the session prematurely if unsafe gflag is set but
+  // still respond as if it was opened.
+  if (PREDICT_FALSE(FLAGS_tablet_copy_early_session_timeout_prob > 0 &&
+      rand_.NextDoubleFraction() <= FLAGS_tablet_copy_early_session_timeout_prob)) {
+    LOG_WITH_PREFIX(WARNING) << "Timing out tablet copy session due to flag "
+                             << "--tablet_copy_early_session_timeout_prob "
+                             << "being set to " << FLAGS_tablet_copy_early_session_timeout_prob;
+    MutexLock l(sessions_lock_);
+    TabletCopyErrorPB::Code app_error;
+    WARN_NOT_OK(TabletCopyServiceImpl::DoEndTabletCopySessionUnlocked(session_id, &app_error),
+                Substitute("Unable to forcibly end tablet copy session $0", session_id));
+  }
+
   context->RespondSuccess();
 }
 
@@ -266,7 +286,8 @@ void TabletCopyServiceImpl::Shutdown() {
     LOG_WITH_PREFIX(INFO) << "Destroying tablet copy session " << session_id
                           << " due to service shutdown";
     TabletCopyErrorPB::Code app_error;
-    CHECK_OK(DoEndTabletCopySessionUnlocked(session_id, &app_error));
+    WARN_NOT_OK(DoEndTabletCopySessionUnlocked(session_id, &app_error),
+                "Unable to end tablet copy session during service shutdown");
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/89806c6f/src/kudu/tserver/tablet_copy_service.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_service.h b/src/kudu/tserver/tablet_copy_service.h
index a49772d..81b1be2 100644
--- a/src/kudu/tserver/tablet_copy_service.h
+++ b/src/kudu/tserver/tablet_copy_service.h
@@ -27,6 +27,7 @@
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
 #include "kudu/util/status.h"
 #include "kudu/util/thread.h"
 
@@ -116,6 +117,7 @@ class TabletCopyServiceImpl : public TabletCopyServiceIf {
   // Protects sessions_ map.
   mutable Mutex sessions_lock_;
   SessionMap sessions_;
+  ThreadSafeRandom rand_;
 
   // Session expiration thread.
   // TODO(mpercy): This is a hack, replace some kind of timer. See KUDU-286.

http://git-wip-us.apache.org/repos/asf/kudu/blob/89806c6f/src/kudu/util/random.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/random.h b/src/kudu/util/random.h
index ae89da6..242c42d 100644
--- a/src/kudu/util/random.h
+++ b/src/kudu/util/random.h
@@ -205,6 +205,11 @@ class ThreadSafeRandom {
     return random_.Normal(mean, std_dev);
   }
 
+  // Thread-safe counterpart of Random::NextDoubleFraction(): the call on the
+  // wrapped generator is serialized by 'lock_'.
+  double NextDoubleFraction() {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.NextDoubleFraction();
+  }
+
   template<class Collection, class Set, class T>
   void ReservoirSample(const Collection& c, int k, const Set& avoid,
                        std::vector<T>* result) {


Mime
View raw message