kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [5/5] kudu git commit: Rename remote bootstrap files to 'tablet copy'
Date Sun, 07 Aug 2016 03:56:53 GMT
Rename remote bootstrap files to 'tablet copy'

Change-Id: I3f31fb72d11d16ee05bf90be3a7c77a82e5db6f7
Reviewed-on: http://gerrit.cloudera.org:8080/3853
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mpercy@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/647f904b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/647f904b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/647f904b

Branch: refs/heads/master
Commit: 647f904b5d59a1342bd777a07c04245c590b7201
Parents: 7ee61f8
Author: Todd Lipcon <todd@apache.org>
Authored: Fri Aug 5 14:59:05 2016 -0700
Committer: Todd Lipcon <todd@apache.org>
Committed: Sun Aug 7 03:56:31 2016 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/CMakeLists.txt       |   2 +-
 .../integration-tests/remote_bootstrap-itest.cc | 807 -------------------
 src/kudu/integration-tests/tablet_copy-itest.cc | 807 +++++++++++++++++++
 src/kudu/tserver/CMakeLists.txt                 |  22 +-
 src/kudu/tserver/remote_bootstrap-test-base.h   | 126 ---
 src/kudu/tserver/remote_bootstrap.proto         | 204 -----
 .../tserver/remote_bootstrap_client-test.cc     | 241 ------
 src/kudu/tserver/remote_bootstrap_client.cc     | 563 -------------
 src/kudu/tserver/remote_bootstrap_client.h      | 210 -----
 .../tserver/remote_bootstrap_service-test.cc    | 491 -----------
 src/kudu/tserver/remote_bootstrap_service.cc    | 357 --------
 src/kudu/tserver/remote_bootstrap_service.h     | 112 ---
 .../tserver/remote_bootstrap_session-test.cc    | 334 --------
 src/kudu/tserver/remote_bootstrap_session.cc    | 386 ---------
 src/kudu/tserver/remote_bootstrap_session.h     | 199 -----
 src/kudu/tserver/tablet_copy-test-base.h        | 126 +++
 src/kudu/tserver/tablet_copy.proto              | 204 +++++
 src/kudu/tserver/tablet_copy_client-test.cc     | 241 ++++++
 src/kudu/tserver/tablet_copy_client.cc          | 563 +++++++++++++
 src/kudu/tserver/tablet_copy_client.h           | 210 +++++
 src/kudu/tserver/tablet_copy_service-test.cc    | 491 +++++++++++
 src/kudu/tserver/tablet_copy_service.cc         | 357 ++++++++
 src/kudu/tserver/tablet_copy_service.h          | 112 +++
 src/kudu/tserver/tablet_copy_session-test.cc    | 334 ++++++++
 src/kudu/tserver/tablet_copy_session.cc         | 386 +++++++++
 src/kudu/tserver/tablet_copy_session.h          | 199 +++++
 src/kudu/tserver/tablet_server-test-base.h      |   2 +-
 src/kudu/tserver/tablet_server.cc               |   2 +-
 src/kudu/tserver/tablet_service.cc              |   2 +-
 src/kudu/tserver/ts_tablet_manager.cc           |   2 +-
 30 files changed, 4046 insertions(+), 4046 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 0c2d449..5b99b7f 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -59,8 +59,8 @@ ADD_KUDU_TEST(master_replication-itest RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(master-stress-test RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(raft_consensus-itest RUN_SERIAL true)
 ADD_KUDU_TEST(registration-test RESOURCE_LOCK "master-web-port")
-ADD_KUDU_TEST(remote_bootstrap-itest)
 ADD_KUDU_TEST(table_locations-itest)
+ADD_KUDU_TEST(tablet_copy-itest)
 ADD_KUDU_TEST(tablet_replacement-itest)
 ADD_KUDU_TEST(ts_recovery-itest)
 ADD_KUDU_TEST(ts_tablet_manager-itest)

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/integration-tests/remote_bootstrap-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/remote_bootstrap-itest.cc b/src/kudu/integration-tests/remote_bootstrap-itest.cc
deleted file mode 100644
index ec854bc..0000000
--- a/src/kudu/integration-tests/remote_bootstrap-itest.cc
+++ /dev/null
@@ -1,807 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <boost/optional.hpp>
-#include <gflags/gflags.h>
-#include <gtest/gtest.h>
-#include <string>
-#include <unordered_map>
-
-#include "kudu/client/client-test-util.h"
-#include "kudu/client/client.h"
-#include "kudu/common/wire_protocol-test-util.h"
-#include "kudu/fs/fs_manager.h"
-#include "kudu/gutil/stl_util.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/integration-tests/cluster_itest_util.h"
-#include "kudu/integration-tests/cluster_verifier.h"
-#include "kudu/integration-tests/external_mini_cluster.h"
-#include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
-#include "kudu/integration-tests/test_workload.h"
-#include "kudu/tablet/tablet_bootstrap.h"
-#include "kudu/tablet/tablet_metadata.h"
-#include "kudu/tserver/remote_bootstrap_client.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/pstack_watcher.h"
-#include "kudu/util/test_util.h"
-
-DEFINE_int32(test_delete_leader_num_iters, 3,
-             "Number of iterations to run in TestDeleteLeaderDuringTabletCopyStressTest.");
-DEFINE_int32(test_delete_leader_min_rows_per_iter, 20,
-             "Number of writer threads in TestDeleteLeaderDuringTabletCopyStressTest.");
-DEFINE_int32(test_delete_leader_payload_bytes, 16 * 1024,
-             "Payload byte size in TestDeleteLeaderDuringTabletCopyStressTest.");
-DEFINE_int32(test_delete_leader_num_writer_threads, 1,
-             "Number of writer threads in TestDeleteLeaderDuringTabletCopyStressTest.");
-
-using kudu::client::KuduClient;
-using kudu::client::KuduClientBuilder;
-using kudu::client::KuduSchema;
-using kudu::client::KuduSchemaFromSchema;
-using kudu::client::KuduTableCreator;
-using kudu::client::sp::shared_ptr;
-using kudu::consensus::CONSENSUS_CONFIG_COMMITTED;
-using kudu::itest::TServerDetails;
-using kudu::tablet::TABLET_DATA_DELETED;
-using kudu::tablet::TABLET_DATA_TOMBSTONED;
-using kudu::tserver::ListTabletsResponsePB;
-using kudu::tserver::TabletCopyClient;
-using std::string;
-using std::unordered_map;
-using std::vector;
-using strings::Substitute;
-
-METRIC_DECLARE_entity(server);
-METRIC_DECLARE_histogram(handler_latency_kudu_consensus_ConsensusService_UpdateConsensus);
-METRIC_DECLARE_counter(glog_info_messages);
-METRIC_DECLARE_counter(glog_warning_messages);
-METRIC_DECLARE_counter(glog_error_messages);
-
-namespace kudu {
-
-class TabletCopyITest : public KuduTest {
- public:
-  virtual void TearDown() OVERRIDE {
-    if (HasFatalFailure()) {
-      LOG(INFO) << "Found fatal failure";
-      for (int i = 0; i < 3; i++) {
-        if (!cluster_->tablet_server(i)->IsProcessAlive()) {
-          LOG(INFO) << "Tablet server " << i << " is not running. Cannot dump its stacks.";
-          continue;
-        }
-        LOG(INFO) << "Attempting to dump stacks of TS " << i
-                  << " with UUID " << cluster_->tablet_server(i)->uuid()
-                  << " and pid " << cluster_->tablet_server(i)->pid();
-        WARN_NOT_OK(PstackWatcher::DumpPidStacks(cluster_->tablet_server(i)->pid()),
-                    "Couldn't dump stacks");
-      }
-    }
-    if (cluster_) cluster_->Shutdown();
-    KuduTest::TearDown();
-    STLDeleteValues(&ts_map_);
-  }
-
- protected:
-  void StartCluster(const vector<string>& extra_tserver_flags = vector<string>(),
-                    const vector<string>& extra_master_flags = vector<string>(),
-                    int num_tablet_servers = 3);
-
-  gscoped_ptr<ExternalMiniCluster> cluster_;
-  gscoped_ptr<itest::ExternalMiniClusterFsInspector> inspect_;
-  shared_ptr<KuduClient> client_;
-  unordered_map<string, TServerDetails*> ts_map_;
-};
-
-void TabletCopyITest::StartCluster(const vector<string>& extra_tserver_flags,
-                                        const vector<string>& extra_master_flags,
-                                        int num_tablet_servers) {
-  ExternalMiniClusterOptions opts;
-  opts.num_tablet_servers = num_tablet_servers;
-  opts.extra_tserver_flags = extra_tserver_flags;
-  // Enable EO semantics for tests.
-  // TODO remove this once EO is the default.
-  opts.extra_tserver_flags.push_back("--enable_exactly_once");
-  opts.extra_tserver_flags.push_back("--never_fsync"); // fsync causes flakiness on EC2.
-  opts.extra_master_flags = extra_master_flags;
-  cluster_.reset(new ExternalMiniCluster(opts));
-  ASSERT_OK(cluster_->Start());
-  inspect_.reset(new itest::ExternalMiniClusterFsInspector(cluster_.get()));
-  ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy().get(),
-                                          cluster_->messenger(),
-                                          &ts_map_));
-  KuduClientBuilder builder;
-  ASSERT_OK(cluster_->CreateClient(builder, &client_));
-}
-
-// If a rogue (a.k.a. zombie) leader tries to replace a tombstoned
-// tablet via Tablet Copy, make sure its term isn't older than the latest term
-// we observed. If it is older, make sure we reject the request, to avoid allowing old
-// leaders to create a parallel universe. This is possible because config
-// change could cause nodes to move around. The term check is reasonable
-// because only one node can be elected leader for a given term.
-//
-// A leader can "go rogue" due to a VM pause, CTRL-z, partition, etc.
-TEST_F(TabletCopyITest, TestRejectRogueLeader) {
-  // This test pauses for at least 10 seconds. Only run in slow-test mode.
-  if (!AllowSlowTests()) {
-    LOG(INFO) << "Skipping test in fast-test mode.";
-    return;
-  }
-
-  vector<string> ts_flags, master_flags;
-  ts_flags.push_back("--enable_leader_failure_detection=false");
-  master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
-  NO_FATALS(StartCluster(ts_flags, master_flags));
-
-  const MonoDelta timeout = MonoDelta::FromSeconds(30);
-  const int kTsIndex = 0; // We'll test with the first TS.
-  TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
-
-  TestWorkload workload(cluster_.get());
-  workload.Setup();
-
-  // Figure out the tablet id of the created tablet.
-  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
-  ASSERT_OK(WaitForNumTabletsOnTS(ts, 1, timeout, &tablets));
-  string tablet_id = tablets[0].tablet_status().tablet_id();
-
-  // Wait until all replicas are up and running.
-  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
-                                            tablet_id, timeout));
-  }
-
-  // Elect a leader for term 1, then run some data through the cluster.
-  int zombie_leader_index = 1;
-  string zombie_leader_uuid = cluster_->tablet_server(zombie_leader_index)->uuid();
-  ASSERT_OK(itest::StartElection(ts_map_[zombie_leader_uuid], tablet_id, timeout));
-  workload.Start();
-  while (workload.rows_inserted() < 100) {
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-  workload.StopAndJoin();
-
-  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, workload.batches_completed()));
-
-  // Come out of the blue and try to initiate Tablet Copy from a running server while
-  // specifying an old term. That running server should reject the request.
-  // We are essentially masquerading as a rogue leader here.
-  Status s = itest::StartTabletCopy(ts, tablet_id, zombie_leader_uuid,
-                                         HostPort(cluster_->tablet_server(1)->bound_rpc_addr()),
-                                         0, // Say I'm from term 0.
-                                         timeout);
-  ASSERT_TRUE(s.IsInvalidArgument());
-  ASSERT_STR_CONTAINS(s.ToString(), "term 0 lower than last logged term 1");
-
-  // Now pause the actual leader so we can bring him back as a zombie later.
-  ASSERT_OK(cluster_->tablet_server(zombie_leader_index)->Pause());
-
-  // Trigger TS 2 to become leader of term 2.
-  int new_leader_index = 2;
-  string new_leader_uuid = cluster_->tablet_server(new_leader_index)->uuid();
-  ASSERT_OK(itest::StartElection(ts_map_[new_leader_uuid], tablet_id, timeout));
-  ASSERT_OK(itest::WaitUntilLeader(ts_map_[new_leader_uuid], tablet_id, timeout));
-
-  unordered_map<string, TServerDetails*> active_ts_map = ts_map_;
-  ASSERT_EQ(1, active_ts_map.erase(zombie_leader_uuid));
-
-  // Wait for the NO_OP entry from the term 2 election to propagate to the
-  // remaining nodes' logs so that we are guaranteed to reject the rogue
-  // leader's tablet copy request when we bring it back online.
-  int log_index = workload.batches_completed() + 2; // 2 terms == 2 additional NO_OP entries.
-  ASSERT_OK(WaitForServersToAgree(timeout, active_ts_map, tablet_id, log_index));
-  // TODO: Write more rows to the new leader once KUDU-1034 is fixed.
-
-  // Now kill the new leader and tombstone the replica on TS 0.
-  cluster_->tablet_server(new_leader_index)->Shutdown();
-  ASSERT_OK(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none, timeout));
-
-  // Zombies!!! Resume the rogue zombie leader.
-  // He should attempt to tablet copy TS 0 but fail.
-  ASSERT_OK(cluster_->tablet_server(zombie_leader_index)->Resume());
-
-  // Loop for a few seconds to ensure that the tablet doesn't transition to READY.
-  MonoTime deadline = MonoTime::Now(MonoTime::FINE);
-  deadline.AddDelta(MonoDelta::FromSeconds(5));
-  while (MonoTime::Now(MonoTime::FINE).ComesBefore(deadline)) {
-    ASSERT_OK(itest::ListTablets(ts, timeout, &tablets));
-    ASSERT_EQ(1, tablets.size());
-    ASSERT_EQ(TABLET_DATA_TOMBSTONED, tablets[0].tablet_status().tablet_data_state());
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-
-  // Force the rogue leader to step down.
-  // Then, send a tablet copy start request from a "fake" leader that
-  // sends an up-to-date term in the tablet copy request but the actual term stored
-  // in the copy source's consensus metadata would still be old.
-  LOG(INFO) << "Forcing rogue leader T " << tablet_id << " P " << zombie_leader_uuid
-            << " to step down...";
-  ASSERT_OK(itest::LeaderStepDown(ts_map_[zombie_leader_uuid], tablet_id, timeout));
-  ExternalTabletServer* zombie_ets = cluster_->tablet_server(zombie_leader_index);
-  // It's not necessarily part of the API but this could return failure due to
-  // rejecting the remote. We intend to make that part async though, so ignoring
-  // this return value in this test.
-  ignore_result(itest::StartTabletCopy(ts, tablet_id, zombie_leader_uuid,
-                                            HostPort(zombie_ets->bound_rpc_addr()),
-                                            2, // Say I'm from term 2.
-                                            timeout));
-
-  // Wait another few seconds to be sure the tablet copy is rejected.
-  deadline = MonoTime::Now(MonoTime::FINE);
-  deadline.AddDelta(MonoDelta::FromSeconds(5));
-  while (MonoTime::Now(MonoTime::FINE).ComesBefore(deadline)) {
-    ASSERT_OK(itest::ListTablets(ts, timeout, &tablets));
-    ASSERT_EQ(1, tablets.size());
-    ASSERT_EQ(TABLET_DATA_TOMBSTONED, tablets[0].tablet_status().tablet_data_state());
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-}
-
-// Start tablet copy session and delete the tablet in the middle.
-// It should actually be possible to complete copying in such a case, because
-// when a Tablet Copy session is started on the "source" server, all of
-// the relevant files are either read or opened, meaning that an in-progress
-// Tablet Copy can complete even after a tablet is officially "deleted" on
-// the source server. This is also a regression test for KUDU-1009.
-TEST_F(TabletCopyITest, TestDeleteTabletDuringTabletCopy) {
-  MonoDelta timeout = MonoDelta::FromSeconds(10);
-  const int kTsIndex = 0; // We'll test with the first TS.
-  NO_FATALS(StartCluster());
-
-  // Populate a tablet with some data.
-  TestWorkload workload(cluster_.get());
-  workload.Setup();
-  workload.Start();
-  while (workload.rows_inserted() < 1000) {
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-
-  // Figure out the tablet id of the created tablet.
-  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
-  TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
-  ASSERT_OK(WaitForNumTabletsOnTS(ts, 1, timeout, &tablets));
-  string tablet_id = tablets[0].tablet_status().tablet_id();
-
-  // Ensure all the servers agree before we proceed.
-  workload.StopAndJoin();
-  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, workload.batches_completed()));
-
-  // Set up an FsManager to use with the TabletCopyClient.
-  FsManagerOpts opts;
-  string testbase = GetTestPath("fake-ts");
-  ASSERT_OK(env_->CreateDir(testbase));
-  opts.wal_path = JoinPathSegments(testbase, "wals");
-  opts.data_paths.push_back(JoinPathSegments(testbase, "data-0"));
-  gscoped_ptr<FsManager> fs_manager(new FsManager(env_.get(), opts));
-  ASSERT_OK(fs_manager->CreateInitialFileSystemLayout());
-  ASSERT_OK(fs_manager->Open());
-
-  {
-    // Start up a TabletCopyClient and open a tablet copy session.
-    TabletCopyClient tc_client(tablet_id, fs_manager.get(),
-                                    cluster_->messenger());
-    scoped_refptr<tablet::TabletMetadata> meta;
-    ASSERT_OK(tc_client.Start(cluster_->tablet_server(kTsIndex)->bound_rpc_hostport(),
-                              &meta));
-
-    // Tombstone the tablet on the remote!
-    ASSERT_OK(itest::DeleteTablet(ts, tablet_id,
-                                  TABLET_DATA_TOMBSTONED, boost::none, timeout));
-
-    // Now finish copying!
-    tablet::TabletStatusListener listener(meta);
-    ASSERT_OK(tc_client.FetchAll(&listener));
-    ASSERT_OK(tc_client.Finish());
-
-    // Run destructor, which closes the remote session.
-  }
-
-  SleepFor(MonoDelta::FromMilliseconds(50));  // Give a little time for a crash (KUDU-1009).
-  ASSERT_TRUE(cluster_->tablet_server(kTsIndex)->IsProcessAlive());
-}
-
-// This test ensures that a leader can Tablet Copy on top of a tombstoned replica
-// that has a higher term recorded in the replica's consensus metadata if the
-// replica's last-logged opid has the same term (or less) as the leader serving
-// as the tablet copy source. When a tablet is tombstoned, its last-logged
-// opid is stored in a field in its on-disk superblock.
-TEST_F(TabletCopyITest, TestTabletCopyFollowerWithHigherTerm) {
-  vector<string> ts_flags, master_flags;
-  ts_flags.push_back("--enable_leader_failure_detection=false");
-  master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
-  const int kNumTabletServers = 2;
-  NO_FATALS(StartCluster(ts_flags, master_flags, kNumTabletServers));
-
-  const MonoDelta timeout = MonoDelta::FromSeconds(30);
-  const int kFollowerIndex = 0;
-  TServerDetails* follower_ts = ts_map_[cluster_->tablet_server(kFollowerIndex)->uuid()];
-
-  TestWorkload workload(cluster_.get());
-  workload.set_num_replicas(2);
-  workload.Setup();
-
-  // Figure out the tablet id of the created tablet.
-  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
-  ASSERT_OK(WaitForNumTabletsOnTS(follower_ts, 1, timeout, &tablets));
-  string tablet_id = tablets[0].tablet_status().tablet_id();
-
-  // Wait until all replicas are up and running.
-  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
-                                            tablet_id, timeout));
-  }
-
-  // Elect a leader for term 1, then run some data through the cluster.
-  const int kLeaderIndex = 1;
-  TServerDetails* leader_ts = ts_map_[cluster_->tablet_server(kLeaderIndex)->uuid()];
-  ASSERT_OK(itest::StartElection(leader_ts, tablet_id, timeout));
-  workload.Start();
-  while (workload.rows_inserted() < 100) {
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-  workload.StopAndJoin();
-
-  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, workload.batches_completed()));
-
-  // Pause the leader and increment the term on the follower by starting an
-  // election on the follower. The election will fail asynchronously but we
-  // just wait until we see that its term has incremented.
-  ASSERT_OK(cluster_->tablet_server(kLeaderIndex)->Pause());
-  ASSERT_OK(itest::StartElection(follower_ts, tablet_id, timeout));
-  int64_t term = 0;
-  for (int i = 0; i < 1000; i++) {
-    consensus::ConsensusStatePB cstate;
-    ASSERT_OK(itest::GetConsensusState(follower_ts, tablet_id, CONSENSUS_CONFIG_COMMITTED,
-                                       timeout, &cstate));
-    term = cstate.current_term();
-    if (term == 2) break;
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-  ASSERT_EQ(2, term);
-
-  // Now tombstone the follower.
-  ASSERT_OK(itest::DeleteTablet(follower_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
-                                timeout));
-
-  // Restart the follower's TS so that the leader's TS won't get its queued
-  // vote request messages. This is a hack but seems to work.
-  cluster_->tablet_server(kFollowerIndex)->Shutdown();
-  ASSERT_OK(cluster_->tablet_server(kFollowerIndex)->Restart());
-
-  // Now wake the leader. It should detect that the follower needs to be
-  // copied and proceed to bring it back up to date.
-  ASSERT_OK(cluster_->tablet_server(kLeaderIndex)->Resume());
-
-  // Wait for the follower to come back up.
-  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, workload.batches_completed()));
-}
-
-// Test that multiple concurrent tablet copies do not cause problems.
-// This is a regression test for KUDU-951, in which concurrent sessions on
-// multiple tablets between the same tablet copy client host and source host
-// could corrupt each other.
-TEST_F(TabletCopyITest, TestConcurrentTabletCopys) {
-  if (!AllowSlowTests()) {
-    LOG(INFO) << "Skipping test in fast-test mode.";
-    return;
-  }
-
-  vector<string> ts_flags, master_flags;
-  ts_flags.push_back("--enable_leader_failure_detection=false");
-  ts_flags.push_back("--log_cache_size_limit_mb=1");
-  ts_flags.push_back("--log_segment_size_mb=1");
-  ts_flags.push_back("--log_async_preallocate_segments=false");
-  ts_flags.push_back("--log_min_segments_to_retain=100");
-  ts_flags.push_back("--flush_threshold_mb=0"); // Constantly flush.
-  ts_flags.push_back("--maintenance_manager_polling_interval_ms=10");
-  master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
-  NO_FATALS(StartCluster(ts_flags, master_flags));
-
-  const MonoDelta timeout = MonoDelta::FromSeconds(60);
-
-  // Create a table with several tablets. These will all be simultaneously
-  // copied to a single target node from the same leader host.
-  const int kNumTablets = 10;
-  KuduSchema client_schema(KuduSchemaFromSchema(GetSimpleTestSchema()));
-  vector<const KuduPartialRow*> splits;
-  for (int i = 0; i < kNumTablets - 1; i++) {
-    KuduPartialRow* row = client_schema.NewRow();
-    ASSERT_OK(row->SetInt32(0, numeric_limits<int32_t>::max() / kNumTablets * (i + 1)));
-    splits.push_back(row);
-  }
-  gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
-  ASSERT_OK(table_creator->table_name(TestWorkload::kDefaultTableName)
-                          .split_rows(splits)
-                          .schema(&client_schema)
-                          .set_range_partition_columns({ "key" })
-                          .num_replicas(3)
-                          .Create());
-
-  const int kTsIndex = 0; // We'll test with the first TS.
-  TServerDetails* target_ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
-
-  // Figure out the tablet ids of the created tablets.
-  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
-  ASSERT_OK(WaitForNumTabletsOnTS(target_ts, kNumTablets, timeout, &tablets));
-
-  vector<string> tablet_ids;
-  for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
-    tablet_ids.push_back(t.tablet_status().tablet_id());
-  }
-
-  // Wait until all replicas are up and running.
-  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-    for (const string& tablet_id : tablet_ids) {
-      ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
-                                              tablet_id, timeout));
-    }
-  }
-
-  // Elect leaders on each tablet for term 1. All leaders will be on TS 1.
-  const int kLeaderIndex = 1;
-  const string kLeaderUuid = cluster_->tablet_server(kLeaderIndex)->uuid();
-  for (const string& tablet_id : tablet_ids) {
-    ASSERT_OK(itest::StartElection(ts_map_[kLeaderUuid], tablet_id, timeout));
-  }
-
-  TestWorkload workload(cluster_.get());
-  workload.set_write_timeout_millis(10000);
-  workload.set_timeout_allowed(true);
-  workload.set_write_batch_size(10);
-  workload.set_num_write_threads(10);
-  workload.Setup();
-  workload.Start();
-  while (workload.rows_inserted() < 20000) {
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-  workload.StopAndJoin();
-
-  for (const string& tablet_id : tablet_ids) {
-    ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, 1));
-  }
-
-  // Now pause the leader so we can tombstone the tablets.
-  ASSERT_OK(cluster_->tablet_server(kLeaderIndex)->Pause());
-
-  for (const string& tablet_id : tablet_ids) {
-    LOG(INFO) << "Tombstoning tablet " << tablet_id << " on TS " << target_ts->uuid();
-    ASSERT_OK(itest::DeleteTablet(target_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
-                                  MonoDelta::FromSeconds(10)));
-  }
-
-  // Unpause the leader TS and wait for it to initiate Tablet Copy and replace the tombstoned
-  // tablets, in parallel.
-  ASSERT_OK(cluster_->tablet_server(kLeaderIndex)->Resume());
-  for (const string& tablet_id : tablet_ids) {
-    ASSERT_OK(itest::WaitUntilTabletRunning(target_ts, tablet_id, timeout));
-  }
-
-  ClusterVerifier v(cluster_.get());
-  NO_FATALS(v.CheckCluster());
-  NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST,
-                            workload.rows_inserted()));
-}
-
-// Test that repeatedly runs a load, tombstones a follower, then tombstones the
-// leader while the follower is copying. Regression test for
-// KUDU-1047.
-TEST_F(TabletCopyITest, TestDeleteLeaderDuringTabletCopyStressTest) {
-  // This test takes a while due to failure detection.
-  if (!AllowSlowTests()) {
-    LOG(INFO) << "Skipping test in fast-test mode.";
-    return;
-  }
-
-  const MonoDelta timeout = MonoDelta::FromSeconds(60);
-  NO_FATALS(StartCluster(vector<string>(), vector<string>(), 5));
-
-  TestWorkload workload(cluster_.get());
-  workload.set_num_replicas(5);
-  workload.set_payload_bytes(FLAGS_test_delete_leader_payload_bytes);
-  workload.set_num_write_threads(FLAGS_test_delete_leader_num_writer_threads);
-  workload.set_write_batch_size(1);
-  workload.set_write_timeout_millis(10000);
-  workload.set_timeout_allowed(true);
-  workload.set_not_found_allowed(true);
-  workload.Setup();
-
-  // Figure out the tablet id.
-  const int kTsIndex = 0;
-  TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
-  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
-  ASSERT_OK(WaitForNumTabletsOnTS(ts, 1, timeout, &tablets));
-  string tablet_id = tablets[0].tablet_status().tablet_id();
-
-  // Wait until all replicas are up and running.
-  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
-                                            tablet_id, timeout));
-  }
-
-  int leader_index = -1;
-  int follower_index = -1;
-  TServerDetails* leader_ts = nullptr;
-  TServerDetails* follower_ts = nullptr;
-
-  for (int i = 0; i < FLAGS_test_delete_leader_num_iters; i++) {
-    LOG(INFO) << "Iteration " << (i + 1);
-    int rows_previously_inserted = workload.rows_inserted();
-
-    // Find out who's leader.
-    ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, timeout, &leader_ts));
-    leader_index = cluster_->tablet_server_index_by_uuid(leader_ts->uuid());
-
-    // Select an arbitrary follower.
-    follower_index = (leader_index + 1) % cluster_->num_tablet_servers();
-    follower_ts = ts_map_[cluster_->tablet_server(follower_index)->uuid()];
-
-    // Spin up the workload.
-    workload.Start();
-    while (workload.rows_inserted() < rows_previously_inserted +
-                                      FLAGS_test_delete_leader_min_rows_per_iter) {
-      SleepFor(MonoDelta::FromMilliseconds(10));
-    }
-
-    // Tombstone the follower.
-    LOG(INFO) << "Tombstoning follower tablet " << tablet_id << " on TS " << follower_ts->uuid();
-    ASSERT_OK(itest::DeleteTablet(follower_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
-                                  timeout));
-
-    // Wait for tablet copy to start.
-    ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(
-        follower_index, tablet_id,
-        { tablet::TABLET_DATA_COPYING, tablet::TABLET_DATA_READY },
-        timeout));
-
-    // Tombstone the leader.
-    LOG(INFO) << "Tombstoning leader tablet " << tablet_id << " on TS " << leader_ts->uuid();
-    ASSERT_OK(itest::DeleteTablet(leader_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
-                                  timeout));
-
-    // Quiesce and rebuild to full strength. This involves electing a new
-    // leader from the remaining three, which requires a unanimous vote, and
-    // that leader then copying the old leader.
-    workload.StopAndJoin();
-    ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, 1));
-  }
-
-  ClusterVerifier v(cluster_.get());
-  NO_FATALS(v.CheckCluster());
-  NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST,
-                            workload.rows_inserted()));
-}
-
-namespace {
-int64_t CountUpdateConsensusCalls(ExternalTabletServer* ets, const string& tablet_id) {
-  int64_t ret;
-  CHECK_OK(ets->GetInt64Metric(
-               &METRIC_ENTITY_server,
-               "kudu.tabletserver",
-               &METRIC_handler_latency_kudu_consensus_ConsensusService_UpdateConsensus,
-               "total_count",
-               &ret));
-  return ret;
-}
-int64_t CountLogMessages(ExternalTabletServer* ets) {
-  int64_t total = 0;
-
-  int64_t count;
-  CHECK_OK(ets->GetInt64Metric(
-               &METRIC_ENTITY_server,
-               "kudu.tabletserver",
-               &METRIC_glog_info_messages,
-               "value",
-               &count));
-  total += count;
-
-  CHECK_OK(ets->GetInt64Metric(
-               &METRIC_ENTITY_server,
-               "kudu.tabletserver",
-               &METRIC_glog_warning_messages,
-               "value",
-               &count));
-  total += count;
-
-  CHECK_OK(ets->GetInt64Metric(
-               &METRIC_ENTITY_server,
-               "kudu.tabletserver",
-               &METRIC_glog_error_messages,
-               "value",
-               &count));
-  total += count;
-
-  return total;
-}
-} // anonymous namespace
-
-// Test that if tablet copy is disabled by a flag, we don't get into
-// tight loops after a tablet is deleted. This is a regression test for a situation
-// similar to the bug described in KUDU-821: we were previously handling a missing
-// tablet within consensus in such a way that we'd immediately send another RPC.
-TEST_F(TabletCopyITest, TestDisableTabletCopy_NoTightLoopWhenTabletDeleted) {
-  MonoDelta timeout = MonoDelta::FromSeconds(10);
-  vector<string> ts_flags, master_flags;
-  ts_flags.push_back("--enable_leader_failure_detection=false");
-  ts_flags.push_back("--enable_tablet_copy=false");
-  master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
-  NO_FATALS(StartCluster(ts_flags, master_flags));
-
-  TestWorkload workload(cluster_.get());
-  workload.set_write_batch_size(1);
-  workload.Setup();
-
-  // Figure out the tablet id of the created tablet.
-  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
-  ExternalTabletServer* replica_ets = cluster_->tablet_server(1);
-  TServerDetails* replica_ts = ts_map_[replica_ets->uuid()];
-  ASSERT_OK(WaitForNumTabletsOnTS(replica_ts, 1, timeout, &tablets));
-  string tablet_id = tablets[0].tablet_status().tablet_id();
-
-  // Wait until all replicas are up and running.
-  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
-                                            tablet_id, timeout));
-  }
-
-  // Elect a leader (TS 0)
-  ExternalTabletServer* leader_ts = cluster_->tablet_server(0);
-  ASSERT_OK(itest::StartElection(ts_map_[leader_ts->uuid()], tablet_id, timeout));
-
-  // Start writing, wait for some rows to be inserted.
-  workload.Start();
-  while (workload.rows_inserted() < 100) {
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-
-  // Tombstone the tablet on one of the servers (TS 1)
-  ASSERT_OK(itest::DeleteTablet(replica_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
-                                timeout));
-
-  // Ensure that, if we sleep for a second while still doing writes to the leader:
-  // a) we don't spew logs on the leader side
-  // b) we don't get hit with a lot of UpdateConsensus calls on the replica.
-  int64_t num_update_rpcs_initial = CountUpdateConsensusCalls(replica_ets, tablet_id);
-  int64_t num_logs_initial = CountLogMessages(leader_ts);
-
-  SleepFor(MonoDelta::FromSeconds(1));
-  int64_t num_update_rpcs_after_sleep = CountUpdateConsensusCalls(replica_ets, tablet_id);
-  int64_t num_logs_after_sleep = CountLogMessages(leader_ts);
-
-  // Calculate rate per second of RPCs and log messages
-  int64_t update_rpcs_per_second = num_update_rpcs_after_sleep - num_update_rpcs_initial;
-  EXPECT_LT(update_rpcs_per_second, 20);
-  int64_t num_logs_per_second = num_logs_after_sleep - num_logs_initial;
-  EXPECT_LT(num_logs_per_second, 20);
-}
-
-// Test that if a Tablet Copy is taking a long time but the client peer is still responsive,
-// the leader won't mark it as failed.
-TEST_F(TabletCopyITest, TestSlowCopyDoesntFail) {
-  MonoDelta timeout = MonoDelta::FromSeconds(30);
-  vector<string> ts_flags, master_flags;
-  ts_flags.push_back("--enable_leader_failure_detection=false");
-  ts_flags.push_back("--tablet_copy_dowload_file_inject_latency_ms=5000");
-  ts_flags.push_back("--follower_unavailable_considered_failed_sec=2");
-  master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
-  NO_FATALS(StartCluster(ts_flags, master_flags));
-
-  TestWorkload workload(cluster_.get());
-  workload.set_write_batch_size(1);
-  workload.Setup();
-
-  // Figure out the tablet id of the created tablet.
-  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
-  ExternalTabletServer* replica_ets = cluster_->tablet_server(1);
-  TServerDetails* replica_ts = ts_map_[replica_ets->uuid()];
-  ASSERT_OK(WaitForNumTabletsOnTS(replica_ts, 1, timeout, &tablets));
-  string tablet_id = tablets[0].tablet_status().tablet_id();
-
-  // Wait until all replicas are up and running.
-  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
-                                            tablet_id, timeout));
-  }
-
-  // Elect a leader (TS 0)
-  ExternalTabletServer* leader_ts = cluster_->tablet_server(0);
-  ASSERT_OK(itest::StartElection(ts_map_[leader_ts->uuid()], tablet_id, timeout));
-
-  // Start writing, wait for some rows to be inserted.
-  workload.Start();
-  while (workload.rows_inserted() < 100) {
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-
-
-  // Tombstone the follower.
-  LOG(INFO) << "Tombstoning follower tablet " << tablet_id << " on TS " << replica_ts->uuid();
-  ASSERT_OK(itest::DeleteTablet(replica_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
-                                timeout));
-
-  // Wait for tablet copy to start.
-  ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(1, tablet_id,
-                                                 { tablet::TABLET_DATA_COPYING }, timeout));
-
-  workload.StopAndJoin();
-  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, 1));
-
-  ClusterVerifier v(cluster_.get());
-  NO_FATALS(v.CheckCluster());
-  NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST,
-                            workload.rows_inserted()));
-}
-
-// Attempting to start Tablet Copy on a tablet that was deleted with
-// TABLET_DATA_DELETED should fail. This behavior helps avoid thrashing when
-// a follower tablet is deleted and the leader notices before it has processed
-// its own DeleteTablet RPC, thinking that it needs to bring its follower back.
-TEST_F(TabletCopyITest, TestTabletCopyingDeletedTabletFails) {
-  // Delete the leader with TABLET_DATA_DELETED.
-  // Attempt to manually copy a replica to the leader from a follower.
-  // Should get an error saying it's illegal.
-
-  MonoDelta kTimeout = MonoDelta::FromSeconds(30);
-  NO_FATALS(StartCluster({"--enable_leader_failure_detection=false"},
-                         {"--catalog_manager_wait_for_new_tablets_to_elect_leader=false"}));
-
-  TestWorkload workload(cluster_.get());
-  workload.set_num_replicas(3);
-  workload.Setup();
-
-  TServerDetails* leader = ts_map_[cluster_->tablet_server(0)->uuid()];
-
-  // Figure out the tablet id of the created tablet.
-  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
-  ASSERT_OK(WaitForNumTabletsOnTS(leader, 1, kTimeout, &tablets));
-  string tablet_id = tablets[0].tablet_status().tablet_id();
-
-  // Wait until all replicas are up and running.
-  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
-                                            tablet_id, kTimeout));
-  }
-
-  // Elect a leader for term 1, then run some data through the cluster.
-  ASSERT_OK(itest::StartElection(leader, tablet_id, kTimeout));
-  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, 1));
-
-  // Now delete the leader with TABLET_DATA_DELETED. We should not be able to
-  // bring back the leader after that until restarting the process.
-  ASSERT_OK(itest::DeleteTablet(leader, tablet_id, TABLET_DATA_DELETED, boost::none, kTimeout));
-
-  Status s = itest::StartTabletCopy(leader, tablet_id,
-                                         cluster_->tablet_server(1)->uuid(),
-                                         HostPort(cluster_->tablet_server(1)->bound_rpc_addr()),
-                                         1, // We are in term 1.
-                                         kTimeout);
-  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
-  ASSERT_STR_CONTAINS(s.ToString(), "Cannot transition from state TABLET_DATA_DELETED");
-
-  // Restart the server so that it won't remember the tablet was permanently
-  // deleted and we can tablet copy the server again.
-  cluster_->tablet_server(0)->Shutdown();
-  ASSERT_OK(cluster_->tablet_server(0)->Restart());
-
-  ASSERT_OK(itest::StartTabletCopy(leader, tablet_id,
-                                        cluster_->tablet_server(1)->uuid(),
-                                        HostPort(cluster_->tablet_server(1)->bound_rpc_addr()),
-                                        1, // We are in term 1.
-                                        kTimeout));
-  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, 1));
-}
-
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/integration-tests/tablet_copy-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_copy-itest.cc b/src/kudu/integration-tests/tablet_copy-itest.cc
new file mode 100644
index 0000000..3106818
--- /dev/null
+++ b/src/kudu/integration-tests/tablet_copy-itest.cc
@@ -0,0 +1,807 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <boost/optional.hpp>
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+#include <string>
+#include <unordered_map>
+
+#include "kudu/client/client-test-util.h"
+#include "kudu/client/client.h"
+#include "kudu/common/wire_protocol-test-util.h"
+#include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/cluster_verifier.h"
+#include "kudu/integration-tests/external_mini_cluster.h"
+#include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/tablet/tablet_bootstrap.h"
+#include "kudu/tablet/tablet_metadata.h"
+#include "kudu/tserver/tablet_copy_client.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/pstack_watcher.h"
+#include "kudu/util/test_util.h"
+
+DEFINE_int32(test_delete_leader_num_iters, 3,
+             "Number of iterations to run in TestDeleteLeaderDuringTabletCopyStressTest.");
+DEFINE_int32(test_delete_leader_min_rows_per_iter, 20,
+             "Number of writer threads in TestDeleteLeaderDuringTabletCopyStressTest.");
+DEFINE_int32(test_delete_leader_payload_bytes, 16 * 1024,
+             "Payload byte size in TestDeleteLeaderDuringTabletCopyStressTest.");
+DEFINE_int32(test_delete_leader_num_writer_threads, 1,
+             "Number of writer threads in TestDeleteLeaderDuringTabletCopyStressTest.");
+
+using kudu::client::KuduClient;
+using kudu::client::KuduClientBuilder;
+using kudu::client::KuduSchema;
+using kudu::client::KuduSchemaFromSchema;
+using kudu::client::KuduTableCreator;
+using kudu::client::sp::shared_ptr;
+using kudu::consensus::CONSENSUS_CONFIG_COMMITTED;
+using kudu::itest::TServerDetails;
+using kudu::tablet::TABLET_DATA_DELETED;
+using kudu::tablet::TABLET_DATA_TOMBSTONED;
+using kudu::tserver::ListTabletsResponsePB;
+using kudu::tserver::TabletCopyClient;
+using std::string;
+using std::unordered_map;
+using std::vector;
+using strings::Substitute;
+
+METRIC_DECLARE_entity(server);
+METRIC_DECLARE_histogram(handler_latency_kudu_consensus_ConsensusService_UpdateConsensus);
+METRIC_DECLARE_counter(glog_info_messages);
+METRIC_DECLARE_counter(glog_warning_messages);
+METRIC_DECLARE_counter(glog_error_messages);
+
+namespace kudu {
+
+class TabletCopyITest : public KuduTest {
+ public:
+  virtual void TearDown() OVERRIDE {
+    if (HasFatalFailure()) {
+      LOG(INFO) << "Found fatal failure";
+      for (int i = 0; i < 3; i++) {
+        if (!cluster_->tablet_server(i)->IsProcessAlive()) {
+          LOG(INFO) << "Tablet server " << i << " is not running. Cannot dump its stacks.";
+          continue;
+        }
+        LOG(INFO) << "Attempting to dump stacks of TS " << i
+                  << " with UUID " << cluster_->tablet_server(i)->uuid()
+                  << " and pid " << cluster_->tablet_server(i)->pid();
+        WARN_NOT_OK(PstackWatcher::DumpPidStacks(cluster_->tablet_server(i)->pid()),
+                    "Couldn't dump stacks");
+      }
+    }
+    if (cluster_) cluster_->Shutdown();
+    KuduTest::TearDown();
+    STLDeleteValues(&ts_map_);
+  }
+
+ protected:
+  void StartCluster(const vector<string>& extra_tserver_flags = vector<string>(),
+                    const vector<string>& extra_master_flags = vector<string>(),
+                    int num_tablet_servers = 3);
+
+  gscoped_ptr<ExternalMiniCluster> cluster_;
+  gscoped_ptr<itest::ExternalMiniClusterFsInspector> inspect_;
+  shared_ptr<KuduClient> client_;
+  unordered_map<string, TServerDetails*> ts_map_;
+};
+
+void TabletCopyITest::StartCluster(const vector<string>& extra_tserver_flags,
+                                        const vector<string>& extra_master_flags,
+                                        int num_tablet_servers) {
+  ExternalMiniClusterOptions opts;
+  opts.num_tablet_servers = num_tablet_servers;
+  opts.extra_tserver_flags = extra_tserver_flags;
+  // Enable EO semantics for tests.
+  // TODO remove this once EO is the default.
+  opts.extra_tserver_flags.push_back("--enable_exactly_once");
+  opts.extra_tserver_flags.push_back("--never_fsync"); // fsync causes flakiness on EC2.
+  opts.extra_master_flags = extra_master_flags;
+  cluster_.reset(new ExternalMiniCluster(opts));
+  ASSERT_OK(cluster_->Start());
+  inspect_.reset(new itest::ExternalMiniClusterFsInspector(cluster_.get()));
+  ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy().get(),
+                                          cluster_->messenger(),
+                                          &ts_map_));
+  KuduClientBuilder builder;
+  ASSERT_OK(cluster_->CreateClient(builder, &client_));
+}
+
+// If a rogue (a.k.a. zombie) leader tries to replace a tombstoned
+// tablet via Tablet Copy, make sure its term isn't older than the latest term
+// we observed. If it is older, make sure we reject the request, to avoid allowing old
+// leaders to create a parallel universe. This is possible because config
+// change could cause nodes to move around. The term check is reasonable
+// because only one node can be elected leader for a given term.
+//
+// A leader can "go rogue" due to a VM pause, CTRL-z, partition, etc.
+TEST_F(TabletCopyITest, TestRejectRogueLeader) {
+  // This test pauses for at least 10 seconds. Only run in slow-test mode.
+  if (!AllowSlowTests()) {
+    LOG(INFO) << "Skipping test in fast-test mode.";
+    return;
+  }
+
+  vector<string> ts_flags, master_flags;
+  ts_flags.push_back("--enable_leader_failure_detection=false");
+  master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
+  NO_FATALS(StartCluster(ts_flags, master_flags));
+
+  const MonoDelta timeout = MonoDelta::FromSeconds(30);
+  const int kTsIndex = 0; // We'll test with the first TS.
+  TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
+
+  TestWorkload workload(cluster_.get());
+  workload.Setup();
+
+  // Figure out the tablet id of the created tablet.
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(WaitForNumTabletsOnTS(ts, 1, timeout, &tablets));
+  string tablet_id = tablets[0].tablet_status().tablet_id();
+
+  // Wait until all replicas are up and running.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+                                            tablet_id, timeout));
+  }
+
+  // Elect a leader for term 1, then run some data through the cluster.
+  int zombie_leader_index = 1;
+  string zombie_leader_uuid = cluster_->tablet_server(zombie_leader_index)->uuid();
+  ASSERT_OK(itest::StartElection(ts_map_[zombie_leader_uuid], tablet_id, timeout));
+  workload.Start();
+  while (workload.rows_inserted() < 100) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+
+  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, workload.batches_completed()));
+
+  // Come out of the blue and try to initiate Tablet Copy from a running server while
+  // specifying an old term. That running server should reject the request.
+  // We are essentially masquerading as a rogue leader here.
+  Status s = itest::StartTabletCopy(ts, tablet_id, zombie_leader_uuid,
+                                         HostPort(cluster_->tablet_server(1)->bound_rpc_addr()),
+                                         0, // Say I'm from term 0.
+                                         timeout);
+  ASSERT_TRUE(s.IsInvalidArgument());
+  ASSERT_STR_CONTAINS(s.ToString(), "term 0 lower than last logged term 1");
+
+  // Now pause the actual leader so we can bring him back as a zombie later.
+  ASSERT_OK(cluster_->tablet_server(zombie_leader_index)->Pause());
+
+  // Trigger TS 2 to become leader of term 2.
+  int new_leader_index = 2;
+  string new_leader_uuid = cluster_->tablet_server(new_leader_index)->uuid();
+  ASSERT_OK(itest::StartElection(ts_map_[new_leader_uuid], tablet_id, timeout));
+  ASSERT_OK(itest::WaitUntilLeader(ts_map_[new_leader_uuid], tablet_id, timeout));
+
+  unordered_map<string, TServerDetails*> active_ts_map = ts_map_;
+  ASSERT_EQ(1, active_ts_map.erase(zombie_leader_uuid));
+
+  // Wait for the NO_OP entry from the term 2 election to propagate to the
+  // remaining nodes' logs so that we are guaranteed to reject the rogue
+  // leader's tablet copy request when we bring it back online.
+  int log_index = workload.batches_completed() + 2; // 2 terms == 2 additional NO_OP entries.
+  ASSERT_OK(WaitForServersToAgree(timeout, active_ts_map, tablet_id, log_index));
+  // TODO: Write more rows to the new leader once KUDU-1034 is fixed.
+
+  // Now kill the new leader and tombstone the replica on TS 0.
+  cluster_->tablet_server(new_leader_index)->Shutdown();
+  ASSERT_OK(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none, timeout));
+
+  // Zombies!!! Resume the rogue zombie leader.
+  // He should attempt to tablet copy TS 0 but fail.
+  ASSERT_OK(cluster_->tablet_server(zombie_leader_index)->Resume());
+
+  // Loop for a few seconds to ensure that the tablet doesn't transition to READY.
+  MonoTime deadline = MonoTime::Now(MonoTime::FINE);
+  deadline.AddDelta(MonoDelta::FromSeconds(5));
+  while (MonoTime::Now(MonoTime::FINE).ComesBefore(deadline)) {
+    ASSERT_OK(itest::ListTablets(ts, timeout, &tablets));
+    ASSERT_EQ(1, tablets.size());
+    ASSERT_EQ(TABLET_DATA_TOMBSTONED, tablets[0].tablet_status().tablet_data_state());
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  // Force the rogue leader to step down.
+  // Then, send a tablet copy start request from a "fake" leader that
+  // sends an up-to-date term in the tablet copy request but the actual term stored
+  // in the copy source's consensus metadata would still be old.
+  LOG(INFO) << "Forcing rogue leader T " << tablet_id << " P " << zombie_leader_uuid
+            << " to step down...";
+  ASSERT_OK(itest::LeaderStepDown(ts_map_[zombie_leader_uuid], tablet_id, timeout));
+  ExternalTabletServer* zombie_ets = cluster_->tablet_server(zombie_leader_index);
+  // It's not necessarily part of the API but this could return failure due to
+  // rejecting the remote. We intend to make that part async though, so ignoring
+  // this return value in this test.
+  ignore_result(itest::StartTabletCopy(ts, tablet_id, zombie_leader_uuid,
+                                            HostPort(zombie_ets->bound_rpc_addr()),
+                                            2, // Say I'm from term 2.
+                                            timeout));
+
+  // Wait another few seconds to be sure the tablet copy is rejected.
+  deadline = MonoTime::Now(MonoTime::FINE);
+  deadline.AddDelta(MonoDelta::FromSeconds(5));
+  while (MonoTime::Now(MonoTime::FINE).ComesBefore(deadline)) {
+    ASSERT_OK(itest::ListTablets(ts, timeout, &tablets));
+    ASSERT_EQ(1, tablets.size());
+    ASSERT_EQ(TABLET_DATA_TOMBSTONED, tablets[0].tablet_status().tablet_data_state());
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+}
+
+// Start tablet copy session and delete the tablet in the middle.
+// It should actually be possible to complete copying in such a case, because
+// when a Tablet Copy session is started on the "source" server, all of
+// the relevant files are either read or opened, meaning that an in-progress
+// Tablet Copy can complete even after a tablet is officially "deleted" on
+// the source server. This is also a regression test for KUDU-1009.
+TEST_F(TabletCopyITest, TestDeleteTabletDuringTabletCopy) {
+  MonoDelta timeout = MonoDelta::FromSeconds(10);
+  const int kTsIndex = 0; // We'll test with the first TS.
+  NO_FATALS(StartCluster());
+
+  // Populate a tablet with some data.
+  TestWorkload workload(cluster_.get());
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 1000) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  // Figure out the tablet id of the created tablet.
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
+  ASSERT_OK(WaitForNumTabletsOnTS(ts, 1, timeout, &tablets));
+  string tablet_id = tablets[0].tablet_status().tablet_id();
+
+  // Ensure all the servers agree before we proceed.
+  workload.StopAndJoin();
+  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, workload.batches_completed()));
+
+  // Set up an FsManager to use with the TabletCopyClient.
+  FsManagerOpts opts;
+  string testbase = GetTestPath("fake-ts");
+  ASSERT_OK(env_->CreateDir(testbase));
+  opts.wal_path = JoinPathSegments(testbase, "wals");
+  opts.data_paths.push_back(JoinPathSegments(testbase, "data-0"));
+  gscoped_ptr<FsManager> fs_manager(new FsManager(env_.get(), opts));
+  ASSERT_OK(fs_manager->CreateInitialFileSystemLayout());
+  ASSERT_OK(fs_manager->Open());
+
+  {
+    // Start up a TabletCopyClient and open a tablet copy session.
+    TabletCopyClient tc_client(tablet_id, fs_manager.get(),
+                                    cluster_->messenger());
+    scoped_refptr<tablet::TabletMetadata> meta;
+    ASSERT_OK(tc_client.Start(cluster_->tablet_server(kTsIndex)->bound_rpc_hostport(),
+                              &meta));
+
+    // Tombstone the tablet on the remote!
+    ASSERT_OK(itest::DeleteTablet(ts, tablet_id,
+                                  TABLET_DATA_TOMBSTONED, boost::none, timeout));
+
+    // Now finish copying!
+    tablet::TabletStatusListener listener(meta);
+    ASSERT_OK(tc_client.FetchAll(&listener));
+    ASSERT_OK(tc_client.Finish());
+
+    // Run destructor, which closes the remote session.
+  }
+
+  SleepFor(MonoDelta::FromMilliseconds(50));  // Give a little time for a crash (KUDU-1009).
+  ASSERT_TRUE(cluster_->tablet_server(kTsIndex)->IsProcessAlive());
+}
+
+// This test ensures that a leader can Tablet Copy on top of a tombstoned replica
+// that has a higher term recorded in the replica's consensus metadata if the
+// replica's last-logged opid has the same term (or less) as the leader serving
+// as the tablet copy source. When a tablet is tombstoned, its last-logged
+// opid is stored in a field in its on-disk superblock.
+TEST_F(TabletCopyITest, TestTabletCopyFollowerWithHigherTerm) {
+  vector<string> ts_flags, master_flags;
+  ts_flags.push_back("--enable_leader_failure_detection=false");
+  master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
+  const int kNumTabletServers = 2;
+  NO_FATALS(StartCluster(ts_flags, master_flags, kNumTabletServers));
+
+  const MonoDelta timeout = MonoDelta::FromSeconds(30);
+  const int kFollowerIndex = 0;
+  TServerDetails* follower_ts = ts_map_[cluster_->tablet_server(kFollowerIndex)->uuid()];
+
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(2);
+  workload.Setup();
+
+  // Figure out the tablet id of the created tablet.
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(WaitForNumTabletsOnTS(follower_ts, 1, timeout, &tablets));
+  string tablet_id = tablets[0].tablet_status().tablet_id();
+
+  // Wait until all replicas are up and running.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+                                            tablet_id, timeout));
+  }
+
+  // Elect a leader for term 1, then run some data through the cluster.
+  const int kLeaderIndex = 1;
+  TServerDetails* leader_ts = ts_map_[cluster_->tablet_server(kLeaderIndex)->uuid()];
+  ASSERT_OK(itest::StartElection(leader_ts, tablet_id, timeout));
+  workload.Start();
+  while (workload.rows_inserted() < 100) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+
+  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, workload.batches_completed()));
+
+  // Pause the leader and increment the term on the follower by starting an
+  // election on the follower. The election will fail asynchronously but we
+  // just wait until we see that its term has incremented.
+  ASSERT_OK(cluster_->tablet_server(kLeaderIndex)->Pause());
+  ASSERT_OK(itest::StartElection(follower_ts, tablet_id, timeout));
+  int64_t term = 0;
+  for (int i = 0; i < 1000; i++) {
+    consensus::ConsensusStatePB cstate;
+    ASSERT_OK(itest::GetConsensusState(follower_ts, tablet_id, CONSENSUS_CONFIG_COMMITTED,
+                                       timeout, &cstate));
+    term = cstate.current_term();
+    if (term == 2) break;
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  ASSERT_EQ(2, term);
+
+  // Now tombstone the follower.
+  ASSERT_OK(itest::DeleteTablet(follower_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
+                                timeout));
+
+  // Restart the follower's TS so that the leader's TS won't get its queued
+  // vote request messages. This is a hack but seems to work.
+  cluster_->tablet_server(kFollowerIndex)->Shutdown();
+  ASSERT_OK(cluster_->tablet_server(kFollowerIndex)->Restart());
+
+  // Now wake the leader. It should detect that the follower needs to be
+  // copied and proceed to bring it back up to date.
+  ASSERT_OK(cluster_->tablet_server(kLeaderIndex)->Resume());
+
+  // Wait for the follower to come back up.
+  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, workload.batches_completed()));
+}
+
+// Test that multiple concurrent tablet copies do not cause problems.
+// This is a regression test for KUDU-951, in which concurrent sessions on
+// multiple tablets between the same tablet copy client host and source host
+// could corrupt each other.
+TEST_F(TabletCopyITest, TestConcurrentTabletCopys) {
+  if (!AllowSlowTests()) {
+    LOG(INFO) << "Skipping test in fast-test mode.";
+    return;
+  }
+
+  vector<string> ts_flags, master_flags;
+  ts_flags.push_back("--enable_leader_failure_detection=false");
+  ts_flags.push_back("--log_cache_size_limit_mb=1");
+  ts_flags.push_back("--log_segment_size_mb=1");
+  ts_flags.push_back("--log_async_preallocate_segments=false");
+  ts_flags.push_back("--log_min_segments_to_retain=100");
+  ts_flags.push_back("--flush_threshold_mb=0"); // Constantly flush.
+  ts_flags.push_back("--maintenance_manager_polling_interval_ms=10");
+  master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
+  NO_FATALS(StartCluster(ts_flags, master_flags));
+
+  const MonoDelta timeout = MonoDelta::FromSeconds(60);
+
+  // Create a table with several tablets. These will all be simultaneously
+  // copied to a single target node from the same leader host.
+  const int kNumTablets = 10;
+  KuduSchema client_schema(KuduSchemaFromSchema(GetSimpleTestSchema()));
+  vector<const KuduPartialRow*> splits;
+  for (int i = 0; i < kNumTablets - 1; i++) {
+    KuduPartialRow* row = client_schema.NewRow();
+    ASSERT_OK(row->SetInt32(0, numeric_limits<int32_t>::max() / kNumTablets * (i + 1)));
+    splits.push_back(row);
+  }
+  gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  ASSERT_OK(table_creator->table_name(TestWorkload::kDefaultTableName)
+                          .split_rows(splits)
+                          .schema(&client_schema)
+                          .set_range_partition_columns({ "key" })
+                          .num_replicas(3)
+                          .Create());
+
+  const int kTsIndex = 0; // We'll test with the first TS.
+  TServerDetails* target_ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
+
+  // Figure out the tablet ids of the created tablets.
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(WaitForNumTabletsOnTS(target_ts, kNumTablets, timeout, &tablets));
+
+  vector<string> tablet_ids;
+  for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
+    tablet_ids.push_back(t.tablet_status().tablet_id());
+  }
+
+  // Wait until all replicas are up and running.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    for (const string& tablet_id : tablet_ids) {
+      ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+                                              tablet_id, timeout));
+    }
+  }
+
+  // Elect leaders on each tablet for term 1. All leaders will be on TS 1.
+  const int kLeaderIndex = 1;
+  const string kLeaderUuid = cluster_->tablet_server(kLeaderIndex)->uuid();
+  for (const string& tablet_id : tablet_ids) {
+    ASSERT_OK(itest::StartElection(ts_map_[kLeaderUuid], tablet_id, timeout));
+  }
+
+  TestWorkload workload(cluster_.get());
+  workload.set_write_timeout_millis(10000);
+  workload.set_timeout_allowed(true);
+  workload.set_write_batch_size(10);
+  workload.set_num_write_threads(10);
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 20000) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+
+  for (const string& tablet_id : tablet_ids) {
+    ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, 1));
+  }
+
+  // Now pause the leader so we can tombstone the tablets.
+  ASSERT_OK(cluster_->tablet_server(kLeaderIndex)->Pause());
+
+  for (const string& tablet_id : tablet_ids) {
+    LOG(INFO) << "Tombstoning tablet " << tablet_id << " on TS " << target_ts->uuid();
+    ASSERT_OK(itest::DeleteTablet(target_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
+                                  MonoDelta::FromSeconds(10)));
+  }
+
+  // Unpause the leader TS and wait for it to initiate Tablet Copy and replace the tombstoned
+  // tablets, in parallel.
+  ASSERT_OK(cluster_->tablet_server(kLeaderIndex)->Resume());
+  for (const string& tablet_id : tablet_ids) {
+    ASSERT_OK(itest::WaitUntilTabletRunning(target_ts, tablet_id, timeout));
+  }
+
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+  NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST,
+                            workload.rows_inserted()));
+}
+
+// Test that repeatedly runs a load, tombstones a follower, then tombstones the
+// leader while the follower is copying. Regression test for
+// KUDU-1047.
+TEST_F(TabletCopyITest, TestDeleteLeaderDuringTabletCopyStressTest) {
+  // This test takes a while due to failure detection.
+  if (!AllowSlowTests()) {
+    LOG(INFO) << "Skipping test in fast-test mode.";
+    return;
+  }
+
+  const MonoDelta timeout = MonoDelta::FromSeconds(60);
+  NO_FATALS(StartCluster(vector<string>(), vector<string>(), 5));
+
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(5);
+  workload.set_payload_bytes(FLAGS_test_delete_leader_payload_bytes);
+  workload.set_num_write_threads(FLAGS_test_delete_leader_num_writer_threads);
+  workload.set_write_batch_size(1);
+  workload.set_write_timeout_millis(10000);
+  workload.set_timeout_allowed(true);
+  workload.set_not_found_allowed(true);
+  workload.Setup();
+
+  // Figure out the tablet id.
+  const int kTsIndex = 0;
+  TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(WaitForNumTabletsOnTS(ts, 1, timeout, &tablets));
+  string tablet_id = tablets[0].tablet_status().tablet_id();
+
+  // Wait until all replicas are up and running.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+                                            tablet_id, timeout));
+  }
+
+  int leader_index = -1;
+  int follower_index = -1;
+  TServerDetails* leader_ts = nullptr;
+  TServerDetails* follower_ts = nullptr;
+
+  for (int i = 0; i < FLAGS_test_delete_leader_num_iters; i++) {
+    LOG(INFO) << "Iteration " << (i + 1);
+    int rows_previously_inserted = workload.rows_inserted();
+
+    // Find out who's leader.
+    ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, timeout, &leader_ts));
+    leader_index = cluster_->tablet_server_index_by_uuid(leader_ts->uuid());
+
+    // Select an arbitrary follower.
+    follower_index = (leader_index + 1) % cluster_->num_tablet_servers();
+    follower_ts = ts_map_[cluster_->tablet_server(follower_index)->uuid()];
+
+    // Spin up the workload.
+    workload.Start();
+    while (workload.rows_inserted() < rows_previously_inserted +
+                                      FLAGS_test_delete_leader_min_rows_per_iter) {
+      SleepFor(MonoDelta::FromMilliseconds(10));
+    }
+
+    // Tombstone the follower.
+    LOG(INFO) << "Tombstoning follower tablet " << tablet_id << " on TS " << follower_ts->uuid();
+    ASSERT_OK(itest::DeleteTablet(follower_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
+                                  timeout));
+
+    // Wait for tablet copy to start.
+    ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(
+        follower_index, tablet_id,
+        { tablet::TABLET_DATA_COPYING, tablet::TABLET_DATA_READY },
+        timeout));
+
+    // Tombstone the leader.
+    LOG(INFO) << "Tombstoning leader tablet " << tablet_id << " on TS " << leader_ts->uuid();
+    ASSERT_OK(itest::DeleteTablet(leader_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
+                                  timeout));
+
+    // Quiesce and rebuild to full strength. This involves electing a new
+    // leader from the remaining three, which requires a unanimous vote, and
+    // that leader then restoring the old leader's replica via tablet copy.
+    workload.StopAndJoin();
+    ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, 1));
+  }
+
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+  NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST,
+                            workload.rows_inserted()));
+}
+
+namespace {
+int64_t CountUpdateConsensusCalls(ExternalTabletServer* ets, const string& tablet_id) {
+  int64_t ret;
+  CHECK_OK(ets->GetInt64Metric(
+               &METRIC_ENTITY_server,
+               "kudu.tabletserver",
+               &METRIC_handler_latency_kudu_consensus_ConsensusService_UpdateConsensus,
+               "total_count",
+               &ret));
+  return ret;
+}
+int64_t CountLogMessages(ExternalTabletServer* ets) {
+  int64_t total = 0;
+
+  int64_t count;
+  CHECK_OK(ets->GetInt64Metric(
+               &METRIC_ENTITY_server,
+               "kudu.tabletserver",
+               &METRIC_glog_info_messages,
+               "value",
+               &count));
+  total += count;
+
+  CHECK_OK(ets->GetInt64Metric(
+               &METRIC_ENTITY_server,
+               "kudu.tabletserver",
+               &METRIC_glog_warning_messages,
+               "value",
+               &count));
+  total += count;
+
+  CHECK_OK(ets->GetInt64Metric(
+               &METRIC_ENTITY_server,
+               "kudu.tabletserver",
+               &METRIC_glog_error_messages,
+               "value",
+               &count));
+  total += count;
+
+  return total;
+}
+} // anonymous namespace
+
+// Test that if tablet copy is disabled by a flag, we don't get into
+// tight loops after a tablet is deleted. This is a regression test for a situation
+// similar to the bug described in KUDU-821: we were previously handling a missing
+// tablet within consensus in such a way that we'd immediately send another RPC.
+TEST_F(TabletCopyITest, TestDisableTabletCopy_NoTightLoopWhenTabletDeleted) {
+  MonoDelta timeout = MonoDelta::FromSeconds(10);
+  vector<string> ts_flags, master_flags;
+  ts_flags.push_back("--enable_leader_failure_detection=false");
+  ts_flags.push_back("--enable_tablet_copy=false");
+  master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
+  NO_FATALS(StartCluster(ts_flags, master_flags));
+
+  TestWorkload workload(cluster_.get());
+  workload.set_write_batch_size(1);
+  workload.Setup();
+
+  // Figure out the tablet id of the created tablet.
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ExternalTabletServer* replica_ets = cluster_->tablet_server(1);
+  TServerDetails* replica_ts = ts_map_[replica_ets->uuid()];
+  ASSERT_OK(WaitForNumTabletsOnTS(replica_ts, 1, timeout, &tablets));
+  string tablet_id = tablets[0].tablet_status().tablet_id();
+
+  // Wait until all replicas are up and running.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+                                            tablet_id, timeout));
+  }
+
+  // Elect a leader (TS 0)
+  ExternalTabletServer* leader_ts = cluster_->tablet_server(0);
+  ASSERT_OK(itest::StartElection(ts_map_[leader_ts->uuid()], tablet_id, timeout));
+
+  // Start writing, wait for some rows to be inserted.
+  workload.Start();
+  while (workload.rows_inserted() < 100) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  // Tombstone the tablet on one of the servers (TS 1)
+  ASSERT_OK(itest::DeleteTablet(replica_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
+                                timeout));
+
+  // Ensure that, if we sleep for a second while still doing writes to the leader:
+  // a) we don't spew logs on the leader side
+  // b) we don't get hit with a lot of UpdateConsensus calls on the replica.
+  int64_t num_update_rpcs_initial = CountUpdateConsensusCalls(replica_ets, tablet_id);
+  int64_t num_logs_initial = CountLogMessages(leader_ts);
+
+  SleepFor(MonoDelta::FromSeconds(1));
+  int64_t num_update_rpcs_after_sleep = CountUpdateConsensusCalls(replica_ets, tablet_id);
+  int64_t num_logs_after_sleep = CountLogMessages(leader_ts);
+
+  // Calculate rate per second of RPCs and log messages
+  int64_t update_rpcs_per_second = num_update_rpcs_after_sleep - num_update_rpcs_initial;
+  EXPECT_LT(update_rpcs_per_second, 20);
+  int64_t num_logs_per_second = num_logs_after_sleep - num_logs_initial;
+  EXPECT_LT(num_logs_per_second, 20);
+}
+
+// Test that if a Tablet Copy is taking a long time but the client peer is still responsive,
+// the leader won't mark it as failed.
+TEST_F(TabletCopyITest, TestSlowCopyDoesntFail) {
+  MonoDelta timeout = MonoDelta::FromSeconds(30);
+  vector<string> ts_flags, master_flags;
+  ts_flags.push_back("--enable_leader_failure_detection=false");
+  ts_flags.push_back("--tablet_copy_dowload_file_inject_latency_ms=5000");
+  ts_flags.push_back("--follower_unavailable_considered_failed_sec=2");
+  master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
+  NO_FATALS(StartCluster(ts_flags, master_flags));
+
+  TestWorkload workload(cluster_.get());
+  workload.set_write_batch_size(1);
+  workload.Setup();
+
+  // Figure out the tablet id of the created tablet.
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ExternalTabletServer* replica_ets = cluster_->tablet_server(1);
+  TServerDetails* replica_ts = ts_map_[replica_ets->uuid()];
+  ASSERT_OK(WaitForNumTabletsOnTS(replica_ts, 1, timeout, &tablets));
+  string tablet_id = tablets[0].tablet_status().tablet_id();
+
+  // Wait until all replicas are up and running.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+                                            tablet_id, timeout));
+  }
+
+  // Elect a leader (TS 0)
+  ExternalTabletServer* leader_ts = cluster_->tablet_server(0);
+  ASSERT_OK(itest::StartElection(ts_map_[leader_ts->uuid()], tablet_id, timeout));
+
+  // Start writing, wait for some rows to be inserted.
+  workload.Start();
+  while (workload.rows_inserted() < 100) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+
+  // Tombstone the follower.
+  LOG(INFO) << "Tombstoning follower tablet " << tablet_id << " on TS " << replica_ts->uuid();
+  ASSERT_OK(itest::DeleteTablet(replica_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
+                                timeout));
+
+  // Wait for tablet copy to start.
+  ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(1, tablet_id,
+                                                 { tablet::TABLET_DATA_COPYING }, timeout));
+
+  workload.StopAndJoin();
+  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, 1));
+
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+  NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST,
+                            workload.rows_inserted()));
+}
+
+// Attempting to start Tablet Copy on a tablet that was deleted with
+// TABLET_DATA_DELETED should fail. This behavior helps avoid thrashing when
+// a follower tablet is deleted and the leader notices before it has processed
+// its own DeleteTablet RPC, thinking that it needs to bring its follower back.
+TEST_F(TabletCopyITest, TestTabletCopyingDeletedTabletFails) {
+  // Delete the leader with TABLET_DATA_DELETED.
+  // Attempt to manually copy a replica to the leader from a follower.
+  // Should get an error saying it's illegal.
+
+  MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  NO_FATALS(StartCluster({"--enable_leader_failure_detection=false"},
+                         {"--catalog_manager_wait_for_new_tablets_to_elect_leader=false"}));
+
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(3);
+  workload.Setup();
+
+  TServerDetails* leader = ts_map_[cluster_->tablet_server(0)->uuid()];
+
+  // Figure out the tablet id of the created tablet.
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(WaitForNumTabletsOnTS(leader, 1, kTimeout, &tablets));
+  string tablet_id = tablets[0].tablet_status().tablet_id();
+
+  // Wait until all replicas are up and running.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+                                            tablet_id, kTimeout));
+  }
+
+  // Elect a leader for term 1, then run some data through the cluster.
+  ASSERT_OK(itest::StartElection(leader, tablet_id, kTimeout));
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, 1));
+
+  // Now delete the leader with TABLET_DATA_DELETED. We should not be able to
+  // bring back the leader after that until restarting the process.
+  ASSERT_OK(itest::DeleteTablet(leader, tablet_id, TABLET_DATA_DELETED, boost::none, kTimeout));
+
+  Status s = itest::StartTabletCopy(leader, tablet_id,
+                                         cluster_->tablet_server(1)->uuid(),
+                                         HostPort(cluster_->tablet_server(1)->bound_rpc_addr()),
+                                         1, // We are in term 1.
+                                         kTimeout);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Cannot transition from state TABLET_DATA_DELETED");
+
+  // Restart the server so that it won't remember the tablet was permanently
+  // deleted and we can tablet copy the server again.
+  cluster_->tablet_server(0)->Shutdown();
+  ASSERT_OK(cluster_->tablet_server(0)->Restart());
+
+  ASSERT_OK(itest::StartTabletCopy(leader, tablet_id,
+                                        cluster_->tablet_server(1)->uuid(),
+                                        HostPort(cluster_->tablet_server(1)->bound_rpc_addr()),
+                                        1, // We are in term 1.
+                                        kTimeout));
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, 1));
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/CMakeLists.txt b/src/kudu/tserver/CMakeLists.txt
index a2e34e1..77b890d 100644
--- a/src/kudu/tserver/CMakeLists.txt
+++ b/src/kudu/tserver/CMakeLists.txt
@@ -16,14 +16,14 @@
 # under the License.
 
 #########################################
-# remote_bootstrap_proto
+# tablet_copy_proto
 #########################################
 
 KRPC_GENERATE(
   TABLET_COPY_KRPC_SRCS TABLET_COPY_KRPC_HDRS TABLET_COPY_KRPC_TGTS
   SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
   BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
-  PROTO_FILES remote_bootstrap.proto)
+  PROTO_FILES tablet_copy.proto)
 set(TABLET_COPY_KRPC_LIBS
   consensus_proto
   krpc
@@ -31,7 +31,7 @@ set(TABLET_COPY_KRPC_LIBS
   rpc_header_proto
   tablet_proto
   wire_protocol_proto)
-ADD_EXPORTABLE_LIBRARY(remote_bootstrap_proto
+ADD_EXPORTABLE_LIBRARY(tablet_copy_proto
   SRCS ${TABLET_COPY_KRPC_SRCS}
   DEPS ${TABLET_COPY_KRPC_LIBS}
   NONLINK_DEPS ${TABLET_COPY_KRPC_TGTS})
@@ -89,8 +89,8 @@ set(TSERVER_KRPC_LIBS
   krpc
   kudu_common_proto
   protobuf
-  remote_bootstrap_proto
   rpc_header_proto
+  tablet_copy_proto
   tserver_proto
   wire_protocol_proto)
 ADD_EXPORTABLE_LIBRARY(tserver_service_proto
@@ -105,11 +105,11 @@ ADD_EXPORTABLE_LIBRARY(tserver_service_proto
 set(TSERVER_SRCS
   heartbeater.cc
   mini_tablet_server.cc
-  remote_bootstrap_client.cc
-  remote_bootstrap_service.cc
-  remote_bootstrap_session.cc
   scanner_metrics.cc
   scanners.cc
+  tablet_copy_client.cc
+  tablet_copy_service.cc
+  tablet_copy_session.cc
   tablet_server.cc
   tablet_server_options.cc
   tablet_service.cc
@@ -120,10 +120,10 @@ set(TSERVER_SRCS
 add_library(tserver ${TSERVER_SRCS})
 target_link_libraries(tserver
   protobuf
+  tablet_copy_proto
   tserver_proto
   tserver_admin_proto
   tserver_service_proto
-  remote_bootstrap_proto
   master_rpc
   master_proto
   consensus_proto
@@ -164,9 +164,9 @@ set(KUDU_TEST_LINK_LIBS
   tserver
   tserver_test_util
   ${KUDU_MIN_TEST_LIBS})
-ADD_KUDU_TEST(remote_bootstrap_client-test)
-ADD_KUDU_TEST(remote_bootstrap_session-test)
-ADD_KUDU_TEST(remote_bootstrap_service-test)
+ADD_KUDU_TEST(tablet_copy_client-test)
+ADD_KUDU_TEST(tablet_copy_session-test)
+ADD_KUDU_TEST(tablet_copy_service-test)
 ADD_KUDU_TEST(tablet_server-test)
 ADD_KUDU_TEST(tablet_server-stress-test RUN_SERIAL true)
 ADD_KUDU_TEST(scanners-test)

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap-test-base.h b/src/kudu/tserver/remote_bootstrap-test-base.h
deleted file mode 100644
index 0f7faa1..0000000
--- a/src/kudu/tserver/remote_bootstrap-test-base.h
+++ /dev/null
@@ -1,126 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#ifndef KUDU_TSERVER_TABLET_COPY_TEST_BASE_H_
-#define KUDU_TSERVER_TABLET_COPY_TEST_BASE_H_
-
-#include "kudu/tserver/tablet_server-test-base.h"
-
-#include <string>
-
-#include "kudu/consensus/log_anchor_registry.h"
-#include "kudu/consensus/opid_util.h"
-#include "kudu/fs/block_manager.h"
-#include "kudu/gutil/strings/fastmem.h"
-#include "kudu/tablet/metadata.pb.h"
-#include "kudu/tserver/remote_bootstrap.pb.h"
-#include "kudu/util/crc.h"
-#include "kudu/util/stopwatch.h"
-#include "kudu/util/test_util.h"
-
-namespace kudu {
-namespace tserver {
-
-using consensus::MinimumOpId;
-
-// Number of times to roll the log.
-static const int kNumLogRolls = 2;
-
-class TabletCopyTest : public TabletServerTestBase {
- public:
-  virtual void SetUp() OVERRIDE {
-    NO_FATALS(TabletServerTestBase::SetUp());
-    NO_FATALS(StartTabletServer());
-    // Prevent logs from being deleted out from under us until / unless we want
-    // to test that we are anchoring correctly. Since GenerateTestData() does a
-    // Flush(), Log GC is allowed to eat the logs before we get around to
-    // starting a tablet copy session.
-    tablet_peer_->log_anchor_registry()->Register(
-      MinimumOpId().index(), CURRENT_TEST_NAME(), &anchor_);
-    NO_FATALS(GenerateTestData());
-  }
-
-  virtual void TearDown() OVERRIDE {
-    ASSERT_OK(tablet_peer_->log_anchor_registry()->Unregister(&anchor_));
-    NO_FATALS(TabletServerTestBase::TearDown());
-  }
-
- protected:
-  // Grab the first column block we find in the SuperBlock.
-  static BlockId FirstColumnBlockId(const tablet::TabletSuperBlockPB& superblock) {
-    const tablet::RowSetDataPB& rowset = superblock.rowsets(0);
-    const tablet::ColumnDataPB& column = rowset.columns(0);
-    const BlockIdPB& block_id_pb = column.block();
-    return BlockId::FromPB(block_id_pb);
-  }
-
-  // Check that the contents and CRC32C of a DataChunkPB are equal to a local buffer.
-  static void AssertDataEqual(const uint8_t* local, int64_t size, const DataChunkPB& remote) {
-    ASSERT_EQ(size, remote.data().size());
-    ASSERT_TRUE(strings::memeq(local, remote.data().data(), size));
-    uint32_t crc32 = crc::Crc32c(local, size);
-    ASSERT_EQ(crc32, remote.crc32());
-  }
-
-  // Generate the test data for the tablet and do the flushing we assume will be
-  // done in the unit tests for tablet copy.
-  void GenerateTestData() {
-    const int kIncr = 50;
-    LOG_TIMING(INFO, "Loading test data") {
-      for (int row_id = 0; row_id < kNumLogRolls * kIncr; row_id += kIncr) {
-        InsertTestRowsRemote(0, row_id, kIncr);
-        ASSERT_OK(tablet_peer_->tablet()->Flush());
-        ASSERT_OK(tablet_peer_->log()->AllocateSegmentAndRollOver());
-      }
-    }
-  }
-
-  // Return the permananent_uuid of the local service.
-  const std::string GetLocalUUID() const {
-    return tablet_peer_->permanent_uuid();
-  }
-
-  const std::string& GetTabletId() const {
-    return tablet_peer_->tablet()->tablet_id();
-  }
-
-  // Read a block file from the file system fully into memory and return a
-  // Slice pointing to it.
-  Status ReadLocalBlockFile(FsManager* fs_manager, const BlockId& block_id,
-                            faststring* scratch, Slice* slice) {
-    gscoped_ptr<fs::ReadableBlock> block;
-    RETURN_NOT_OK(fs_manager->OpenBlock(block_id, &block));
-
-    uint64_t size = 0;
-    RETURN_NOT_OK(block->Size(&size));
-    scratch->resize(size);
-    RETURN_NOT_OK(block->Read(0, size, slice, scratch->data()));
-
-    // Since the mmap will go away on return, copy the data into scratch.
-    if (slice->data() != scratch->data()) {
-      memcpy(scratch->data(), slice->data(), slice->size());
-      *slice = Slice(scratch->data(), slice->size());
-    }
-    return Status::OK();
-  }
-
-  log::LogAnchor anchor_;
-};
-
-} // namespace tserver
-} // namespace kudu
-
-#endif // KUDU_TSERVER_TABLET_COPY_TEST_BASE_H_


Mime
View raw message