kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [3/5] kudu git commit: Rename remote bootstrap files to 'tablet copy'
Date Sun, 07 Aug 2016 03:56:51 GMT
http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap_session-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap_session-test.cc b/src/kudu/tserver/remote_bootstrap_session-test.cc
deleted file mode 100644
index 0f0236a..0000000
--- a/src/kudu/tserver/remote_bootstrap_session-test.cc
+++ /dev/null
@@ -1,334 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "kudu/tablet/tablet-test-util.h"
-
-#include <glog/logging.h>
-#include <gtest/gtest.h>
-#include <memory>
-
-#include "kudu/common/partial_row.h"
-#include "kudu/common/row_operations.h"
-#include "kudu/common/schema.h"
-#include "kudu/consensus/consensus_meta.h"
-#include "kudu/consensus/log.h"
-#include "kudu/consensus/metadata.pb.h"
-#include "kudu/consensus/opid_util.h"
-#include "kudu/fs/block_id.h"
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/strings/fastmem.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/rpc/messenger.h"
-#include "kudu/tserver/remote_bootstrap_session.h"
-#include "kudu/tablet/tablet_peer.h"
-#include "kudu/util/crc.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/test_util.h"
-#include "kudu/util/threadpool.h"
-
-METRIC_DECLARE_entity(tablet);
-
-using std::shared_ptr;
-using std::string;
-using std::unique_ptr;
-
-namespace kudu {
-namespace tserver {
-
-using consensus::ConsensusMetadata;
-using consensus::OpId;
-using consensus::RaftConfigPB;
-using consensus::RaftPeerPB;
-using fs::ReadableBlock;
-using log::Log;
-using log::LogOptions;
-using log::LogAnchorRegistry;
-using rpc::Messenger;
-using rpc::MessengerBuilder;
-using strings::Substitute;
-using tablet::ColumnDataPB;
-using tablet::DeltaDataPB;
-using tablet::KuduTabletTest;
-using tablet::RowSetDataPB;
-using tablet::TabletPeer;
-using tablet::TabletSuperBlockPB;
-using tablet::WriteTransactionState;
-
-class TabletCopyTest : public KuduTabletTest {
- public:
-  TabletCopyTest()
-    : KuduTabletTest(Schema({ ColumnSchema("key", STRING),
-                              ColumnSchema("val", INT32) }, 1)) {
-    CHECK_OK(ThreadPoolBuilder("test-exec").Build(&apply_pool_));
-  }
-
-  virtual void SetUp() OVERRIDE {
-    NO_FATALS(KuduTabletTest::SetUp());
-    NO_FATALS(SetUpTabletPeer());
-    NO_FATALS(PopulateTablet());
-    NO_FATALS(InitSession());
-  }
-
-  virtual void TearDown() OVERRIDE {
-    session_.reset();
-    tablet_peer_->Shutdown();
-    KuduTabletTest::TearDown();
-  }
-
- protected:
-  void SetUpTabletPeer() {
-    scoped_refptr<Log> log;
-    ASSERT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(),
-                        *tablet()->schema(),
-                        0, // schema_version
-                        NULL, &log));
-
-    scoped_refptr<MetricEntity> metric_entity =
-      METRIC_ENTITY_tablet.Instantiate(&metric_registry_, CURRENT_TEST_NAME());
-
-    RaftPeerPB config_peer;
-    config_peer.set_permanent_uuid(fs_manager()->uuid());
-    config_peer.mutable_last_known_addr()->set_host("0.0.0.0");
-    config_peer.mutable_last_known_addr()->set_port(0);
-    config_peer.set_member_type(RaftPeerPB::VOTER);
-
-    tablet_peer_.reset(
-        new TabletPeer(tablet()->metadata(),
-                       config_peer,
-                       apply_pool_.get(),
-                       Bind(&TabletCopyTest::TabletPeerStateChangedCallback,
-                            Unretained(this),
-                            tablet()->tablet_id())));
-
-    // TODO similar to code in tablet_peer-test, consider refactor.
-    RaftConfigPB config;
-    config.add_peers()->CopyFrom(config_peer);
-    config.set_opid_index(consensus::kInvalidOpIdIndex);
-
-    gscoped_ptr<ConsensusMetadata> cmeta;
-    ASSERT_OK(ConsensusMetadata::Create(tablet()->metadata()->fs_manager(),
-                                        tablet()->tablet_id(), fs_manager()->uuid(),
-                                        config, consensus::kMinimumTerm, &cmeta));
-
-    shared_ptr<Messenger> messenger;
-    MessengerBuilder mbuilder(CURRENT_TEST_NAME());
-    mbuilder.Build(&messenger);
-
-    log_anchor_registry_.reset(new LogAnchorRegistry());
-    tablet_peer_->SetBootstrapping();
-    ASSERT_OK(tablet_peer_->Init(tablet(),
-                                 clock(),
-                                 messenger,
-                                 scoped_refptr<rpc::ResultTracker>(),
-                                 log,
-                                 metric_entity));
-    consensus::ConsensusBootstrapInfo boot_info;
-    ASSERT_OK(tablet_peer_->Start(boot_info));
-    ASSERT_OK(tablet_peer_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10)));
-    ASSERT_OK(tablet_peer_->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
-  }
-
-  void TabletPeerStateChangedCallback(const string& tablet_id, const string& reason) {
-    LOG(INFO) << "Tablet peer state changed for tablet " << tablet_id << ". Reason: " << reason;
-  }
-
-  void PopulateTablet() {
-    for (int32_t i = 0; i < 1000; i++) {
-      WriteRequestPB req;
-      req.set_tablet_id(tablet_peer_->tablet_id());
-      ASSERT_OK(SchemaToPB(client_schema_, req.mutable_schema()));
-      RowOperationsPB* data = req.mutable_row_operations();
-      RowOperationsPBEncoder enc(data);
-      KuduPartialRow row(&client_schema_);
-
-      string key = Substitute("key$0", i);
-      ASSERT_OK(row.SetString(0, key));
-      ASSERT_OK(row.SetInt32(1, i));
-      enc.Add(RowOperationsPB::INSERT, row);
-
-      WriteResponsePB resp;
-      CountDownLatch latch(1);
-
-      unique_ptr<tablet::WriteTransactionState> state(
-          new tablet::WriteTransactionState(tablet_peer_.get(),
-                                            &req,
-                                            nullptr, // No RequestIdPB
-                                            &resp));
-      state->set_completion_callback(gscoped_ptr<tablet::TransactionCompletionCallback>(
-          new tablet::LatchTransactionCompletionCallback<WriteResponsePB>(&latch, &resp)));
-      ASSERT_OK(tablet_peer_->SubmitWrite(std::move(state)));
-      latch.Wait();
-      ASSERT_FALSE(resp.has_error()) << "Request failed: " << resp.error().ShortDebugString();
-      ASSERT_EQ(0, resp.per_row_errors_size()) << "Insert error: " << resp.ShortDebugString();
-    }
-    ASSERT_OK(tablet()->Flush());
-  }
-
-  void InitSession() {
-    session_.reset(new TabletCopySession(tablet_peer_.get(), "TestSession", "FakeUUID",
-                   fs_manager()));
-    ASSERT_OK(session_->Init());
-  }
-
-  // Read the specified BlockId, via the TabletCopySession, into a file.
-  // 'path' will be populated with the name of the file used.
-  // 'file' will be set to point to the SequentialFile containing the data.
-  void FetchBlockToFile(const BlockId& block_id,
-                        string* path,
-                        gscoped_ptr<SequentialFile>* file) {
-    string data;
-    int64_t block_file_size = 0;
-    TabletCopyErrorPB::Code error_code;
-    CHECK_OK(session_->GetBlockPiece(block_id, 0, 0, &data, &block_file_size, &error_code));
-    if (block_file_size > 0) {
-      CHECK_GT(data.size(), 0);
-    }
-
-    // Write the file to a temporary location.
-    WritableFileOptions opts;
-    string path_template = GetTestPath(Substitute("test_block_$0.tmp.XXXXXX", block_id.ToString()));
-    gscoped_ptr<WritableFile> writable_file;
-    CHECK_OK(Env::Default()->NewTempWritableFile(opts, path_template, path, &writable_file));
-    CHECK_OK(writable_file->Append(Slice(data.data(), data.size())));
-    CHECK_OK(writable_file->Close());
-
-    CHECK_OK(Env::Default()->NewSequentialFile(*path, file));
-  }
-
-  MetricRegistry metric_registry_;
-  scoped_refptr<LogAnchorRegistry> log_anchor_registry_;
-  gscoped_ptr<ThreadPool> apply_pool_;
-  scoped_refptr<TabletPeer> tablet_peer_;
-  scoped_refptr<TabletCopySession> session_;
-};
-
-// Ensure that the serialized SuperBlock included in the TabletCopySession is
-// equal to the serialized live superblock (on a quiesced tablet).
-TEST_F(TabletCopyTest, TestSuperBlocksEqual) {
-  // Compare content of superblocks.
-  faststring session_buf;
-  faststring tablet_buf;
-
-  {
-    const TabletSuperBlockPB& session_superblock = session_->tablet_superblock();
-    int size = session_superblock.ByteSize();
-    session_buf.resize(size);
-    uint8_t* session_dst = session_buf.data();
-    session_dst = session_superblock.SerializeWithCachedSizesToArray(session_dst);
-  }
-
-  {
-    TabletSuperBlockPB tablet_superblock;
-    ASSERT_OK(tablet()->metadata()->ToSuperBlock(&tablet_superblock));
-    int size = tablet_superblock.ByteSize();
-    tablet_buf.resize(size);
-    uint8_t* tablet_dst = tablet_buf.data();
-    tablet_dst = tablet_superblock.SerializeWithCachedSizesToArray(tablet_dst);
-  }
-
-  ASSERT_EQ(session_buf.size(), tablet_buf.size());
-  int size = tablet_buf.size();
-  ASSERT_EQ(0, strings::fastmemcmp_inlined(session_buf.data(), tablet_buf.data(), size));
-}
-
-// Test fetching all files from tablet server, ensure the checksums for each
-// chunk and the total file sizes match.
-TEST_F(TabletCopyTest, TestBlocksEqual) {
-  TabletSuperBlockPB tablet_superblock;
-  ASSERT_OK(tablet()->metadata()->ToSuperBlock(&tablet_superblock));
-  for (int i = 0; i < tablet_superblock.rowsets_size(); i++) {
-    const RowSetDataPB& rowset = tablet_superblock.rowsets(i);
-    for (int j = 0; j < rowset.columns_size(); j++) {
-      const ColumnDataPB& column = rowset.columns(j);
-      const BlockIdPB& block_id_pb = column.block();
-      BlockId block_id = BlockId::FromPB(block_id_pb);
-
-      string path;
-      gscoped_ptr<SequentialFile> file;
-      FetchBlockToFile(block_id, &path, &file);
-      uint64_t session_block_size = 0;
-      ASSERT_OK(Env::Default()->GetFileSize(path, &session_block_size));
-      faststring buf;
-      buf.resize(session_block_size);
-      Slice data;
-      ASSERT_OK(file->Read(session_block_size, &data, buf.data()));
-      uint32_t session_crc = crc::Crc32c(data.data(), data.size());
-      LOG(INFO) << "session block file has size of " << session_block_size
-                << " and CRC32C of " << session_crc << ": " << path;
-
-      gscoped_ptr<ReadableBlock> tablet_block;
-      ASSERT_OK(fs_manager()->OpenBlock(block_id, &tablet_block));
-      uint64_t tablet_block_size = 0;
-      ASSERT_OK(tablet_block->Size(&tablet_block_size));
-      buf.resize(tablet_block_size);
-      ASSERT_OK(tablet_block->Read(0, tablet_block_size, &data, buf.data()));
-      uint32_t tablet_crc = crc::Crc32c(data.data(), data.size());
-      LOG(INFO) << "tablet block file has size of " << tablet_block_size
-                << " and CRC32C of " << tablet_crc
-                << ": " << block_id;
-
-      // Compare the blocks.
-      ASSERT_EQ(tablet_block_size, session_block_size);
-      ASSERT_EQ(tablet_crc, session_crc);
-    }
-  }
-}
-
-// Ensure that blocks are still readable through the open session even
-// after they've been deleted.
-TEST_F(TabletCopyTest, TestBlocksAreFetchableAfterBeingDeleted) {
-  TabletSuperBlockPB tablet_superblock;
-  ASSERT_OK(tablet()->metadata()->ToSuperBlock(&tablet_superblock));
-
-  // Gather all the blocks.
-  vector<BlockId> data_blocks;
-  for (const RowSetDataPB& rowset : tablet_superblock.rowsets()) {
-    for (const DeltaDataPB& redo : rowset.redo_deltas()) {
-      data_blocks.push_back(BlockId::FromPB(redo.block()));
-    }
-    for (const DeltaDataPB& undo : rowset.undo_deltas()) {
-      data_blocks.push_back(BlockId::FromPB(undo.block()));
-    }
-    for (const ColumnDataPB& column : rowset.columns()) {
-      data_blocks.push_back(BlockId::FromPB(column.block()));
-    }
-    if (rowset.has_bloom_block()) {
-      data_blocks.push_back(BlockId::FromPB(rowset.bloom_block()));
-    }
-    if (rowset.has_adhoc_index_block()) {
-      data_blocks.push_back(BlockId::FromPB(rowset.adhoc_index_block()));
-    }
-  }
-
-  // Delete them.
-  for (const BlockId& block_id : data_blocks) {
-    ASSERT_OK(fs_manager()->DeleteBlock(block_id));
-  }
-
-  // Read them back.
-  for (const BlockId& block_id : data_blocks) {
-    ASSERT_TRUE(session_->IsBlockOpenForTests(block_id));
-    string data;
-    TabletCopyErrorPB::Code error_code;
-    int64_t piece_size;
-    ASSERT_OK(session_->GetBlockPiece(block_id, 0, 0,
-                                      &data, &piece_size, &error_code));
-  }
-}
-
-}  // namespace tserver
-}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap_session.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap_session.cc b/src/kudu/tserver/remote_bootstrap_session.cc
deleted file mode 100644
index 61d79df..0000000
--- a/src/kudu/tserver/remote_bootstrap_session.cc
+++ /dev/null
@@ -1,386 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "kudu/tserver/remote_bootstrap_session.h"
-
-#include <algorithm>
-#include <mutex>
-
-#include "kudu/consensus/log.h"
-#include "kudu/consensus/log_reader.h"
-#include "kudu/consensus/metadata.pb.h"
-#include "kudu/fs/block_manager.h"
-#include "kudu/gutil/map-util.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/type_traits.h"
-#include "kudu/rpc/transfer.h"
-#include "kudu/tablet/tablet_peer.h"
-#include "kudu/util/flag_tags.h"
-#include "kudu/util/mutex.h"
-#include "kudu/util/stopwatch.h"
-#include "kudu/util/trace.h"
-
-DEFINE_int32(tablet_copy_transfer_chunk_size_bytes, 4 * 1024 * 1024,
-             "Size of chunks to transfer when copying tablets between "
-             "tablet servers.");
-TAG_FLAG(tablet_copy_transfer_chunk_size_bytes, hidden);
-
-namespace kudu {
-namespace tserver {
-
-using consensus::MinimumOpId;
-using consensus::OpId;
-using fs::ReadableBlock;
-using log::LogAnchorRegistry;
-using log::ReadableLogSegment;
-using std::shared_ptr;
-using strings::Substitute;
-using tablet::ColumnDataPB;
-using tablet::DeltaDataPB;
-using tablet::RowSetDataPB;
-using tablet::TabletMetadata;
-using tablet::TabletPeer;
-using tablet::TabletSuperBlockPB;
-
-TabletCopySession::TabletCopySession(
-    const scoped_refptr<TabletPeer>& tablet_peer, std::string session_id,
-    std::string requestor_uuid, FsManager* fs_manager)
-    : tablet_peer_(tablet_peer),
-      session_id_(std::move(session_id)),
-      requestor_uuid_(std::move(requestor_uuid)),
-      fs_manager_(fs_manager),
-      blocks_deleter_(&blocks_),
-      logs_deleter_(&logs_) {}
-
-TabletCopySession::~TabletCopySession() {
-  // No lock taken in the destructor, should only be 1 thread with access now.
-  CHECK_OK(UnregisterAnchorIfNeededUnlocked());
-}
-
-Status TabletCopySession::Init() {
-  MutexLock l(session_lock_);
-  CHECK(!initted_);
-
-  const string& tablet_id = tablet_peer_->tablet_id();
-
-  // Prevent log GC while we grab log segments and Tablet metadata.
-  string anchor_owner_token = Substitute("TabletCopy-$0", session_id_);
-  tablet_peer_->log_anchor_registry()->Register(
-      MinimumOpId().index(), anchor_owner_token, &log_anchor_);
-
-  // Read the SuperBlock from disk.
-  const scoped_refptr<TabletMetadata>& metadata = tablet_peer_->tablet_metadata();
-  RETURN_NOT_OK_PREPEND(metadata->ReadSuperBlockFromDisk(&tablet_superblock_),
-                        Substitute("Unable to access superblock for tablet $0",
-                                   tablet_id));
-
-  // Anchor the data blocks by opening them and adding them to the cache.
-  //
-  // All subsequent requests should reuse the opened blocks.
-  vector<BlockIdPB> data_blocks;
-  TabletMetadata::CollectBlockIdPBs(tablet_superblock_, &data_blocks);
-  for (const BlockIdPB& block_id : data_blocks) {
-    VLOG(1) << "Opening block " << block_id.DebugString();
-    RETURN_NOT_OK(OpenBlockUnlocked(BlockId::FromPB(block_id)));
-  }
-
-  // Get the latest opid in the log at this point in time so we can re-anchor.
-  OpId last_logged_opid;
-  tablet_peer_->log()->GetLatestEntryOpId(&last_logged_opid);
-
-  // Get the current segments from the log, including the active segment.
-  // The Log doesn't add the active segment to the log reader's list until
-  // a header has been written to it (but it will not have a footer).
-  shared_ptr<log::LogReader> reader = tablet_peer_->log()->reader();
-  if (!reader) {
-    tablet::TabletStatePB tablet_state = tablet_peer_->state();
-    return Status::IllegalState(Substitute(
-        "Unable to initialize tablet copy session for tablet $0. "
-        "Log reader is not available. Tablet state: $1 ($2)",
-        tablet_id, tablet::TabletStatePB_Name(tablet_state), tablet_state));
-  }
-  reader->GetSegmentsSnapshot(&log_segments_);
-  for (const scoped_refptr<ReadableLogSegment>& segment : log_segments_) {
-    RETURN_NOT_OK(OpenLogSegmentUnlocked(segment->header().sequence_number()));
-  }
-
-  // Look up the committed consensus state.
-  // We do this after snapshotting the log to avoid a scenario where the latest
-  // entry in the log has a term higher than the term stored in the consensus
-  // metadata, which will results in a CHECK failure on RaftConsensus init.
-  scoped_refptr<consensus::Consensus> consensus = tablet_peer_->shared_consensus();
-  if (!consensus) {
-    tablet::TabletStatePB tablet_state = tablet_peer_->state();
-    return Status::IllegalState(Substitute(
-        "Unable to initialize tablet copy session for tablet $0. "
-        "Consensus is not available. Tablet state: $1 ($2)",
-        tablet_id, tablet::TabletStatePB_Name(tablet_state), tablet_state));
-  }
-  initial_committed_cstate_ = consensus->ConsensusState(consensus::CONSENSUS_CONFIG_COMMITTED);
-
-  // Re-anchor on the highest OpId that was in the log right before we
-  // snapshotted the log segments. This helps ensure that we don't end up in a
-  // tablet copy loop due to a follower falling too far behind the
-  // leader's log when tablet copy is slow. The remote controls when
-  // this anchor is released by ending the tablet copy session.
-  RETURN_NOT_OK(tablet_peer_->log_anchor_registry()->UpdateRegistration(
-      last_logged_opid.index(), anchor_owner_token, &log_anchor_));
-
-  LOG(INFO) << Substitute(
-      "T $0 P $1: Tablet Copy: opened $2 blocks and $3 log segments",
-      tablet_id, consensus->peer_uuid(), data_blocks.size(), log_segments_.size());
-  initted_ = true;
-  return Status::OK();
-}
-
-const std::string& TabletCopySession::tablet_id() const {
-  DCHECK(initted_);
-  return tablet_peer_->tablet_id();
-}
-
-const std::string& TabletCopySession::requestor_uuid() const {
-  DCHECK(initted_);
-  return requestor_uuid_;
-}
-
-// Determine the length of the data chunk to return to the client.
-static int64_t DetermineReadLength(int64_t bytes_remaining, int64_t requested_len) {
-  // Overhead in the RPC for things like headers, protobuf data, etc.
-  static const int kOverhead = 4096;
-
-  if (requested_len <= 0) {
-    requested_len = FLAGS_tablet_copy_transfer_chunk_size_bytes;
-  } else {
-    requested_len = std::min<int64_t>(requested_len,
-                                      FLAGS_tablet_copy_transfer_chunk_size_bytes);
-  }
-  requested_len = std::min<int64_t>(requested_len, FLAGS_rpc_max_message_size - kOverhead);
-  CHECK_GT(requested_len, 0) << "rpc_max_message_size is too low to transfer data: "
-                                     << FLAGS_rpc_max_message_size;
-  return std::min(bytes_remaining, requested_len);
-}
-
-// Calculate the size of the data to return given a maximum client message
-// length, the file itself, and the offset into the file to be read from.
-static Status GetResponseDataSize(int64_t total_size,
-                                  uint64_t offset, int64_t client_maxlen,
-                                  TabletCopyErrorPB::Code* error_code, int64_t* data_size) {
-  // If requested offset is off the end of the data, bail.
-  if (offset >= total_size) {
-    *error_code = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
-    return Status::InvalidArgument(
-        Substitute("Requested offset ($0) is beyond the data size ($1)",
-                   offset, total_size));
-  }
-
-  int64_t bytes_remaining = total_size - offset;
-
-  *data_size = DetermineReadLength(bytes_remaining, client_maxlen);
-  DCHECK_GT(*data_size, 0);
-  if (client_maxlen > 0) {
-    DCHECK_LE(*data_size, client_maxlen);
-  }
-
-  return Status::OK();
-}
-
-// Read a chunk of a file into a buffer.
-// data_name provides a string for the block/log to be used in error messages.
-template <class Info>
-static Status ReadFileChunkToBuf(const Info* info,
-                                 uint64_t offset, int64_t client_maxlen,
-                                 const string& data_name,
-                                 string* data, int64_t* file_size,
-                                 TabletCopyErrorPB::Code* error_code) {
-  int64_t response_data_size = 0;
-  RETURN_NOT_OK_PREPEND(GetResponseDataSize(info->size, offset, client_maxlen, error_code,
-                                            &response_data_size),
-                        Substitute("Error reading $0", data_name));
-
-  Stopwatch chunk_timer(Stopwatch::THIS_THREAD);
-  chunk_timer.start();
-
-  // Writing into a std::string buffer is basically guaranteed to work on C++11,
-  // however any modern compiler should be compatible with it.
-  // Violates the API contract, but avoids excessive copies.
-  data->resize(response_data_size);
-  uint8_t* buf = reinterpret_cast<uint8_t*>(const_cast<char*>(data->data()));
-  Slice slice;
-  Status s = info->ReadFully(offset, response_data_size, &slice, buf);
-  if (PREDICT_FALSE(!s.ok())) {
-    s = s.CloneAndPrepend(
-        Substitute("Unable to read existing file for $0", data_name));
-    LOG(WARNING) << s.ToString();
-    *error_code = TabletCopyErrorPB::IO_ERROR;
-    return s;
-  }
-  // Figure out if Slice points to buf or if Slice points to the mmap.
-  // If it points to the mmap then copy into buf.
-  if (slice.data() != buf) {
-    memcpy(buf, slice.data(), slice.size());
-  }
-  chunk_timer.stop();
-  TRACE("Tablet Copy: $0: $1 total bytes read. Total time elapsed: $2",
-        data_name, response_data_size, chunk_timer.elapsed().ToString());
-
-  *file_size = info->size;
-  return Status::OK();
-}
-
-Status TabletCopySession::GetBlockPiece(const BlockId& block_id,
-                                             uint64_t offset, int64_t client_maxlen,
-                                             string* data, int64_t* block_file_size,
-                                             TabletCopyErrorPB::Code* error_code) {
-  ImmutableReadableBlockInfo* block_info;
-  RETURN_NOT_OK(FindBlock(block_id, &block_info, error_code));
-
-  RETURN_NOT_OK(ReadFileChunkToBuf(block_info, offset, client_maxlen,
-                                   Substitute("block $0", block_id.ToString()),
-                                   data, block_file_size, error_code));
-
-  // Note: We do not eagerly close the block, as doing so may delete the
-  // underlying data if this was its last reader and it had been previously
-  // marked for deletion. This would be a problem for parallel readers in
-  // the same session; they would not be able to find the block.
-
-  return Status::OK();
-}
-
-Status TabletCopySession::GetLogSegmentPiece(uint64_t segment_seqno,
-                                                  uint64_t offset, int64_t client_maxlen,
-                                                  std::string* data, int64_t* block_file_size,
-                                                  TabletCopyErrorPB::Code* error_code) {
-  ImmutableRandomAccessFileInfo* file_info;
-  RETURN_NOT_OK(FindLogSegment(segment_seqno, &file_info, error_code));
-  RETURN_NOT_OK(ReadFileChunkToBuf(file_info, offset, client_maxlen,
-                                   Substitute("log segment $0", segment_seqno),
-                                   data, block_file_size, error_code));
-
-  // Note: We do not eagerly close log segment files, since we share ownership
-  // of the LogSegment objects with the Log itself.
-
-  return Status::OK();
-}
-
-bool TabletCopySession::IsBlockOpenForTests(const BlockId& block_id) const {
-  MutexLock l(session_lock_);
-  return ContainsKey(blocks_, block_id);
-}
-
-// Add a file to the cache and populate the given ImmutableRandomAcccessFileInfo
-// object with the file ref and size.
-template <class Collection, class Key, class Readable>
-static Status AddImmutableFileToMap(Collection* const cache,
-                                    const Key& key,
-                                    const Readable& readable,
-                                    uint64_t size) {
-  // Sanity check for 0-length files.
-  if (size == 0) {
-    return Status::Corruption("Found 0-length object");
-  }
-
-  // Looks good, add it to the cache.
-  typedef typename Collection::mapped_type InfoPtr;
-  typedef typename base::remove_pointer<InfoPtr>::type Info;
-  InsertOrDie(cache, key, new Info(readable, size));
-
-  return Status::OK();
-}
-
-Status TabletCopySession::OpenBlockUnlocked(const BlockId& block_id) {
-  session_lock_.AssertAcquired();
-
-  gscoped_ptr<ReadableBlock> block;
-  Status s = fs_manager_->OpenBlock(block_id, &block);
-  if (PREDICT_FALSE(!s.ok())) {
-    LOG(WARNING) << "Unable to open requested (existing) block file: "
-                 << block_id.ToString() << ": " << s.ToString();
-    return s.CloneAndPrepend(Substitute("Unable to open block file for block $0",
-                                        block_id.ToString()));
-  }
-
-  uint64_t size;
-  s = block->Size(&size);
-  if (PREDICT_FALSE(!s.ok())) {
-    return s.CloneAndPrepend("Unable to get size of block");
-  }
-
-  s = AddImmutableFileToMap(&blocks_, block_id, block.get(), size);
-  if (!s.ok()) {
-    s = s.CloneAndPrepend(Substitute("Error accessing data for block $0", block_id.ToString()));
-    LOG(DFATAL) << "Data block disappeared: " << s.ToString();
-  } else {
-    ignore_result(block.release());
-  }
-  return s;
-}
-
-Status TabletCopySession::FindBlock(const BlockId& block_id,
-                                         ImmutableReadableBlockInfo** block_info,
-                                         TabletCopyErrorPB::Code* error_code) {
-  Status s;
-  MutexLock l(session_lock_);
-  if (!FindCopy(blocks_, block_id, block_info)) {
-    *error_code = TabletCopyErrorPB::BLOCK_NOT_FOUND;
-    s = Status::NotFound("Block not found", block_id.ToString());
-  }
-  return s;
-}
-
-Status TabletCopySession::OpenLogSegmentUnlocked(uint64_t segment_seqno) {
-  session_lock_.AssertAcquired();
-
-  scoped_refptr<log::ReadableLogSegment> log_segment;
-  int position = -1;
-  if (!log_segments_.empty()) {
-    position = segment_seqno - log_segments_[0]->header().sequence_number();
-  }
-  if (position < 0 || position >= log_segments_.size()) {
-    return Status::NotFound(Substitute("Segment with sequence number $0 not found",
-                                       segment_seqno));
-  }
-  log_segment = log_segments_[position];
-  CHECK_EQ(log_segment->header().sequence_number(), segment_seqno);
-
-  uint64_t size = log_segment->readable_up_to();
-  Status s = AddImmutableFileToMap(&logs_, segment_seqno, log_segment->readable_file(), size);
-  if (!s.ok()) {
-    s = s.CloneAndPrepend(
-            Substitute("Error accessing data for log segment with seqno $0",
-                       segment_seqno));
-    LOG(INFO) << s.ToString();
-  }
-  return s;
-}
-
-Status TabletCopySession::FindLogSegment(uint64_t segment_seqno,
-                                              ImmutableRandomAccessFileInfo** file_info,
-                                              TabletCopyErrorPB::Code* error_code) {
-  MutexLock l(session_lock_);
-  if (!FindCopy(logs_, segment_seqno, file_info)) {
-    *error_code = TabletCopyErrorPB::WAL_SEGMENT_NOT_FOUND;
-    return Status::NotFound(Substitute("Segment with sequence number $0 not found",
-                                       segment_seqno));
-  }
-  return Status::OK();
-}
-
-Status TabletCopySession::UnregisterAnchorIfNeededUnlocked() {
-  return tablet_peer_->log_anchor_registry()->UnregisterIfAnchored(&log_anchor_);
-}
-
-} // namespace tserver
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap_session.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap_session.h b/src/kudu/tserver/remote_bootstrap_session.h
deleted file mode 100644
index 7eb2383..0000000
--- a/src/kudu/tserver/remote_bootstrap_session.h
+++ /dev/null
@@ -1,199 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#ifndef KUDU_TSERVER_TABLET_COPY_SESSION_H_
-#define KUDU_TSERVER_TABLET_COPY_SESSION_H_
-
-#include <memory>
-#include <string>
-#include <unordered_map>
-#include <vector>
-
-#include "kudu/consensus/log_anchor_registry.h"
-#include "kudu/consensus/log_util.h"
-#include "kudu/consensus/metadata.pb.h"
-#include "kudu/consensus/opid_util.h"
-#include "kudu/fs/block_id.h"
-#include "kudu/fs/block_manager.h"
-#include "kudu/gutil/macros.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/stl_util.h"
-#include "kudu/tserver/remote_bootstrap.pb.h"
-#include "kudu/util/env_util.h"
-#include "kudu/util/locks.h"
-#include "kudu/util/status.h"
-
-namespace kudu {
-
-class FsManager;
-
-namespace tablet {
-class TabletPeer;
-} // namespace tablet
-
-namespace tserver {
-
-class TabletPeerLookupIf;
-
-// Caches file size and holds a shared_ptr reference to a RandomAccessFile.
-// Assumes that the file underlying the RandomAccessFile is immutable.
-struct ImmutableRandomAccessFileInfo {
-  std::shared_ptr<RandomAccessFile> readable;
-  int64_t size;
-
-  ImmutableRandomAccessFileInfo(std::shared_ptr<RandomAccessFile> readable,
-                                int64_t size)
-      : readable(std::move(readable)), size(size) {}
-
-  Status ReadFully(uint64_t offset, int64_t size, Slice* data, uint8_t* scratch) const {
-    return env_util::ReadFully(readable.get(), offset, size, data, scratch);
-  }
-};
-
-// Caches block size and holds an exclusive reference to a ReadableBlock.
-// Assumes that the block underlying the ReadableBlock is immutable.
-struct ImmutableReadableBlockInfo {
-  gscoped_ptr<fs::ReadableBlock> readable;
-  int64_t size;
-
-  ImmutableReadableBlockInfo(fs::ReadableBlock* readable,
-                             int64_t size)
-  : readable(readable),
-    size(size) {
-  }
-
-  Status ReadFully(uint64_t offset, int64_t size, Slice* data, uint8_t* scratch) const {
-    return readable->Read(offset, size, data, scratch);
-  }
-};
-
-// A potential Learner must establish a TabletCopySession with the leader in order
-// to fetch the needed superblock, blocks, and log segments.
-// This class is refcounted to make it easy to remove it from the session map
-// on expiration while it is in use by another thread.
-class TabletCopySession : public RefCountedThreadSafe<TabletCopySession> {
- public:
-  TabletCopySession(const scoped_refptr<tablet::TabletPeer>& tablet_peer,
-                         std::string session_id, std::string requestor_uuid,
-                         FsManager* fs_manager);
-
-  // Initialize the session, including anchoring files (TODO) and fetching the
-  // tablet superblock and list of WAL segments.
-  //
-  // Must not be called more than once.
-  Status Init();
-
-  // Return ID of tablet corresponding to this session.
-  const std::string& tablet_id() const;
-
-  // Return UUID of the requestor that initiated this session.
-  const std::string& requestor_uuid() const;
-
-  // Open block for reading, if it's not already open, and read some of it.
-  // If maxlen is 0, we use a system-selected length for the data piece.
-  // *data is set to a std::string containing the data. Ownership of this object
-  // is passed to the caller. A string is used because the RPC interface is
-  // sending data serialized as protobuf and we want to minimize copying.
-  // On error, Status is set to a non-OK value and error_code is filled in.
-  //
-  // This method is thread-safe.
-  Status GetBlockPiece(const BlockId& block_id,
-                       uint64_t offset, int64_t client_maxlen,
-                       std::string* data, int64_t* block_file_size,
-                       TabletCopyErrorPB::Code* error_code);
-
-  // Get a piece of a log segment.
-  // The behavior and params are very similar to GetBlockPiece(), but this one
-  // is only for sending WAL segment files.
-  Status GetLogSegmentPiece(uint64_t segment_seqno,
-                            uint64_t offset, int64_t client_maxlen,
-                            std::string* data, int64_t* log_file_size,
-                            TabletCopyErrorPB::Code* error_code);
-
-  const tablet::TabletSuperBlockPB& tablet_superblock() const {
-    DCHECK(initted_);
-    return tablet_superblock_;
-  }
-
-  const consensus::ConsensusStatePB& initial_committed_cstate() const {
-    DCHECK(initted_);
-    return initial_committed_cstate_;
-  }
-
-  const log::SegmentSequence& log_segments() const {
-    DCHECK(initted_);
-    return log_segments_;
-  }
-
-  // Check if a block is currently open.
-  bool IsBlockOpenForTests(const BlockId& block_id) const;
-
- private:
-  friend class RefCountedThreadSafe<TabletCopySession>;
-
-  typedef std::unordered_map<BlockId, ImmutableReadableBlockInfo*, BlockIdHash> BlockMap;
-  typedef std::unordered_map<uint64_t, ImmutableRandomAccessFileInfo*> LogMap;
-
-  ~TabletCopySession();
-
-  // Open the block and add it to the block map.
-  Status OpenBlockUnlocked(const BlockId& block_id);
-
-  // Look up cached block information.
-  Status FindBlock(const BlockId& block_id,
-                   ImmutableReadableBlockInfo** block_info,
-                   TabletCopyErrorPB::Code* error_code);
-
-  // Snapshot the log segment's length and put it into segment map.
-  Status OpenLogSegmentUnlocked(uint64_t segment_seqno);
-
-  // Look up log segment in cache or log segment map.
-  Status FindLogSegment(uint64_t segment_seqno,
-                        ImmutableRandomAccessFileInfo** file_info,
-                        TabletCopyErrorPB::Code* error_code);
-
-  // Unregister log anchor, if it's registered.
-  Status UnregisterAnchorIfNeededUnlocked();
-
-  scoped_refptr<tablet::TabletPeer> tablet_peer_;
-  const std::string session_id_;
-  const std::string requestor_uuid_;
-  FsManager* const fs_manager_;
-
-  mutable Mutex session_lock_;
-  bool initted_ = false;
-  BlockMap blocks_; // Protected by session_lock_.
-  LogMap logs_;     // Protected by session_lock_.
-  ValueDeleter blocks_deleter_;
-  ValueDeleter logs_deleter_;
-
-  tablet::TabletSuperBlockPB tablet_superblock_;
-
-  consensus::ConsensusStatePB initial_committed_cstate_;
-
-  // The sequence of log segments that will be sent in the course of this
-  // session.
-  log::SegmentSequence log_segments_;
-
-  log::LogAnchor log_anchor_;
-
-  DISALLOW_COPY_AND_ASSIGN(TabletCopySession);
-};
-
-} // namespace tserver
-} // namespace kudu
-
-#endif // KUDU_TSERVER_TABLET_COPY_SESSION_H_

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/tablet_copy-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy-test-base.h b/src/kudu/tserver/tablet_copy-test-base.h
new file mode 100644
index 0000000..bdd4a42
--- /dev/null
+++ b/src/kudu/tserver/tablet_copy-test-base.h
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_TSERVER_TABLET_COPY_TEST_BASE_H_
+#define KUDU_TSERVER_TABLET_COPY_TEST_BASE_H_
+
+#include "kudu/tserver/tablet_server-test-base.h"
+
+#include <string>
+
+#include "kudu/consensus/log_anchor_registry.h"
+#include "kudu/consensus/opid_util.h"
+#include "kudu/fs/block_manager.h"
+#include "kudu/gutil/strings/fastmem.h"
+#include "kudu/tablet/metadata.pb.h"
+#include "kudu/tserver/tablet_copy.pb.h"
+#include "kudu/util/crc.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+namespace tserver {
+
+using consensus::MinimumOpId;
+
+// Number of times to roll the log.
+static const int kNumLogRolls = 2;
+
+class TabletCopyTest : public TabletServerTestBase {
+ public:
+  virtual void SetUp() OVERRIDE {
+    NO_FATALS(TabletServerTestBase::SetUp());
+    NO_FATALS(StartTabletServer());
+    // Prevent logs from being deleted out from under us until / unless we want
+    // to test that we are anchoring correctly. Since GenerateTestData() does a
+    // Flush(), Log GC is allowed to eat the logs before we get around to
+    // starting a tablet copy session.
+    tablet_peer_->log_anchor_registry()->Register(
+      MinimumOpId().index(), CURRENT_TEST_NAME(), &anchor_);
+    NO_FATALS(GenerateTestData());
+  }
+
+  virtual void TearDown() OVERRIDE {
+    ASSERT_OK(tablet_peer_->log_anchor_registry()->Unregister(&anchor_));
+    NO_FATALS(TabletServerTestBase::TearDown());
+  }
+
+ protected:
+  // Grab the first column block we find in the SuperBlock.
+  static BlockId FirstColumnBlockId(const tablet::TabletSuperBlockPB& superblock) {
+    const tablet::RowSetDataPB& rowset = superblock.rowsets(0);
+    const tablet::ColumnDataPB& column = rowset.columns(0);
+    const BlockIdPB& block_id_pb = column.block();
+    return BlockId::FromPB(block_id_pb);
+  }
+
+  // Check that the contents and CRC32C of a DataChunkPB are equal to a local buffer.
+  static void AssertDataEqual(const uint8_t* local, int64_t size, const DataChunkPB& remote) {
+    ASSERT_EQ(size, remote.data().size());
+    ASSERT_TRUE(strings::memeq(local, remote.data().data(), size));
+    uint32_t crc32 = crc::Crc32c(local, size);
+    ASSERT_EQ(crc32, remote.crc32());
+  }
+
+  // Generate the test data for the tablet and do the flushing we assume will be
+  // done in the unit tests for tablet copy.
+  void GenerateTestData() {
+    const int kIncr = 50;
+    LOG_TIMING(INFO, "Loading test data") {
+      for (int row_id = 0; row_id < kNumLogRolls * kIncr; row_id += kIncr) {
+        InsertTestRowsRemote(0, row_id, kIncr);
+        ASSERT_OK(tablet_peer_->tablet()->Flush());
+        ASSERT_OK(tablet_peer_->log()->AllocateSegmentAndRollOver());
+      }
+    }
+  }
+
+  // Return the permanent_uuid of the local service.
+  const std::string GetLocalUUID() const {
+    return tablet_peer_->permanent_uuid();
+  }
+
+  const std::string& GetTabletId() const {
+    return tablet_peer_->tablet()->tablet_id();
+  }
+
+  // Read a block file from the file system fully into memory and return a
+  // Slice pointing to it.
+  Status ReadLocalBlockFile(FsManager* fs_manager, const BlockId& block_id,
+                            faststring* scratch, Slice* slice) {
+    gscoped_ptr<fs::ReadableBlock> block;
+    RETURN_NOT_OK(fs_manager->OpenBlock(block_id, &block));
+
+    uint64_t size = 0;
+    RETURN_NOT_OK(block->Size(&size));
+    scratch->resize(size);
+    RETURN_NOT_OK(block->Read(0, size, slice, scratch->data()));
+
+    // Since the mmap will go away on return, copy the data into scratch.
+    if (slice->data() != scratch->data()) {
+      memcpy(scratch->data(), slice->data(), slice->size());
+      *slice = Slice(scratch->data(), slice->size());
+    }
+    return Status::OK();
+  }
+
+  log::LogAnchor anchor_;
+};
+
+} // namespace tserver
+} // namespace kudu
+
+#endif // KUDU_TSERVER_TABLET_COPY_TEST_BASE_H_

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/tablet_copy.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy.proto b/src/kudu/tserver/tablet_copy.proto
new file mode 100644
index 0000000..1e89919
--- /dev/null
+++ b/src/kudu/tserver/tablet_copy.proto
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package kudu.tserver;
+
+option java_package = "org.apache.kudu.tserver";
+
+import "kudu/common/wire_protocol.proto";
+import "kudu/consensus/metadata.proto";
+import "kudu/fs/fs.proto";
+import "kudu/rpc/rpc_header.proto";
+import "kudu/tablet/metadata.proto";
+
+// RaftConfig tablet copy RPC calls.
+service TabletCopyService {
+  // Establish a tablet copy session.
+  rpc BeginTabletCopySession(BeginTabletCopySessionRequestPB)
+      returns (BeginTabletCopySessionResponsePB);
+
+  // Check whether the specified session is active.
+  rpc CheckSessionActive(CheckTabletCopySessionActiveRequestPB)
+      returns (CheckTabletCopySessionActiveResponsePB);
+
+  // Fetch data (blocks, logs) from the server.
+  rpc FetchData(FetchDataRequestPB)
+      returns (FetchDataResponsePB);
+
+  // End a tablet copy session, allow server to release resources.
+  rpc EndTabletCopySession(EndTabletCopySessionRequestPB)
+      returns (EndTabletCopySessionResponsePB);
+}
+
+// Tablet Copy-specific errors use this protobuf.
+message TabletCopyErrorPB {
+  extend kudu.rpc.ErrorStatusPB {
+    optional TabletCopyErrorPB tablet_copy_error_ext = 102;
+  }
+
+  enum Code {
+    // An error which has no more specific error code.
+    // The code and message in 'status' may reveal more details.
+    //
+    // RPCs should avoid returning this, since callers will not be
+    // able to easily parse the error.
+    UNKNOWN_ERROR = 1;
+
+    // The specified tablet copy session either never existed or has expired.
+    NO_SESSION = 2;
+
+    // Unknown tablet.
+    TABLET_NOT_FOUND = 3;
+
+    // Unknown data block.
+    BLOCK_NOT_FOUND = 4;
+
+    // Unknown WAL segment.
+    WAL_SEGMENT_NOT_FOUND = 5;
+
+    // Invalid request. Possibly missing parameters.
+    INVALID_TABLET_COPY_REQUEST = 6;
+
+    // Error reading or transferring data.
+    IO_ERROR = 7;
+  }
+
+  // The error code.
+  required Code code = 1 [ default = UNKNOWN_ERROR ];
+
+  // The Status object for the error. This will include a textual
+  // message that may be more useful to present in log messages, etc,
+  // though its error code is less specific.
+  required AppStatusPB status = 2;
+}
+
+message BeginTabletCopySessionRequestPB {
+  // permanent_uuid of the requesting peer.
+  required bytes requestor_uuid = 1;
+
+  // tablet_id of the tablet the requestor desires to copy.
+  required bytes tablet_id = 2;
+}
+
+message BeginTabletCopySessionResponsePB {
+  // Opaque session id assigned by the server.
+  // No guarantees are made as to the format of the session id.
+  required bytes session_id = 1;
+
+  // Maximum session idle timeout between requests.
+  // Learners will have to start over again if they reach this timeout.
+  // A value of 0 means there is no timeout.
+  required uint64 session_idle_timeout_millis = 2;
+
+  // Active superblock at the time of the request.
+  required tablet.TabletSuperBlockPB superblock = 3;
+
+  // Identifiers for the WAL segments available for download.
+  // Each WAL segment is keyed by its sequence number.
+  repeated uint64 wal_segment_seqnos = 4;
+
+  // A snapshot of the committed Consensus state at the time that the
+  // tablet copy session was started.
+  required consensus.ConsensusStatePB initial_committed_cstate = 5;
+
+  // permanent_uuid of the responding peer.
+  optional bytes responder_uuid = 6;
+}
+
+message CheckTabletCopySessionActiveRequestPB {
+  // Valid Session ID returned by a BeginTabletCopySession() RPC call.
+  required bytes session_id = 1;
+
+  // Set keepalive to true to reset the session timeout timer.
+  optional bool keepalive = 2 [default = false];
+}
+
+message CheckTabletCopySessionActiveResponsePB {
+  // Whether the given session id represents an active tablet copy session.
+  required bool session_is_active = 1;
+}
+
+// A "union" type that allows the same RPC call to fetch different types of
+// data (data blocks or log files).
+message DataIdPB {
+  enum IdType {
+    UNKNOWN = 0;
+    BLOCK = 1;
+    LOG_SEGMENT = 2;
+  }
+
+  // Indicator whether it's a block or log segment id.
+  required IdType type = 1;
+
+  // Exactly one of these must be set.
+  optional BlockIdPB block_id = 2;          // To fetch a block.
+  optional uint64 wal_segment_seqno = 3;    // To fetch a log segment.
+}
+
+message FetchDataRequestPB {
+  // Valid Session ID returned by a BeginTabletCopySession() RPC call.
+  required bytes session_id = 1;
+
+  // The server will use this ID to determine the key and type of data
+  // that was requested.
+  required DataIdPB data_id = 2;
+
+  // Offset into data to start reading from.
+  // If not specified, the server will send the data from offset 0.
+  optional uint64 offset = 3 [default = 0];
+
+  // Maximum length of the chunk of data to return.
+  // If max_length is not specified, or if the server's max is less than the
+  // requested max, the server will use its own max.
+  optional int64 max_length = 4 [default = 0];
+}
+
+// A chunk of data (a slice of a block, file, etc).
+message DataChunkPB {
+  // Offset into the complete data block or file that 'data' starts at.
+  required uint64 offset = 1;
+
+  // Actual bytes of data from the data block, starting at 'offset'.
+  required bytes data = 2;
+
+  // CRC32C of the bytes contained in 'data'.
+  required fixed32 crc32 = 3;
+
+  // Full length, in bytes, of the complete data block or file on the server.
+  // The number of bytes returned in 'data' can certainly be less than this.
+  required int64 total_data_length = 4;
+}
+
+message FetchDataResponsePB {
+  // The server will automatically release the resources (i.e. close file, free
+  // read buffers) for a given data resource after the last byte is read.
+  // So, per-resource, chunks are optimized to be fetched in-order.
+  required DataChunkPB chunk = 1;
+}
+
+message EndTabletCopySessionRequestPB {
+  required bytes session_id = 1;
+
+  // Set to true if the tablet copy is successful.
+  required bool is_success = 2;
+
+  // Client-provided error message. The server will log this error so that an
+  // admin can identify when bad things are happening with tablet copy.
+  optional AppStatusPB error = 3;
+}
+
+message EndTabletCopySessionResponsePB {
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/tablet_copy_client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc b/src/kudu/tserver/tablet_copy_client-test.cc
new file mode 100644
index 0000000..5fd3a6c
--- /dev/null
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/tserver/tablet_copy-test-base.h"
+
+#include "kudu/consensus/quorum_util.h"
+#include "kudu/gutil/strings/fastmem.h"
+#include "kudu/tablet/tablet_bootstrap.h"
+#include "kudu/tserver/tablet_copy_client.h"
+#include "kudu/util/env_util.h"
+
+using std::shared_ptr;
+
+namespace kudu {
+namespace tserver {
+
+using consensus::GetRaftConfigLeader;
+using consensus::RaftPeerPB;
+using tablet::TabletMetadata;
+using tablet::TabletStatusListener;
+
+class TabletCopyClientTest : public TabletCopyTest {
+ public:
+  virtual void SetUp() OVERRIDE {
+    NO_FATALS(TabletCopyTest::SetUp());
+
+    fs_manager_.reset(new FsManager(Env::Default(), GetTestPath("client_tablet")));
+    ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
+    ASSERT_OK(fs_manager_->Open());
+
+    tablet_peer_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10.0));
+    rpc::MessengerBuilder(CURRENT_TEST_NAME()).Build(&messenger_);
+    client_.reset(new TabletCopyClient(GetTabletId(),
+                                            fs_manager_.get(),
+                                            messenger_));
+    ASSERT_OK(GetRaftConfigLeader(tablet_peer_->consensus()
+        ->ConsensusState(consensus::CONSENSUS_CONFIG_COMMITTED), &leader_));
+
+    HostPort host_port;
+    ASSERT_OK(HostPortFromPB(leader_.last_known_addr(), &host_port));
+    ASSERT_OK(client_->Start(host_port, &meta_));
+  }
+
+ protected:
+  Status CompareFileContents(const string& path1, const string& path2);
+
+  gscoped_ptr<FsManager> fs_manager_;
+  shared_ptr<rpc::Messenger> messenger_;
+  gscoped_ptr<TabletCopyClient> client_;
+  scoped_refptr<TabletMetadata> meta_;
+  RaftPeerPB leader_;
+};
+
+Status TabletCopyClientTest::CompareFileContents(const string& path1, const string& path2) {
+  shared_ptr<RandomAccessFile> file1, file2;
+  RETURN_NOT_OK(env_util::OpenFileForRandom(fs_manager_->env(), path1, &file1));
+  RETURN_NOT_OK(env_util::OpenFileForRandom(fs_manager_->env(), path2, &file2));
+
+  uint64_t size1, size2;
+  RETURN_NOT_OK(file1->Size(&size1));
+  RETURN_NOT_OK(file2->Size(&size2));
+  if (size1 != size2) {
+    return Status::Corruption("Sizes of files don't match",
+                              strings::Substitute("$0 vs $1 bytes", size1, size2));
+  }
+
+  Slice slice1, slice2;
+  faststring scratch1, scratch2;
+  scratch1.resize(size1);
+  scratch2.resize(size2);
+  RETURN_NOT_OK(env_util::ReadFully(file1.get(), 0, size1, &slice1, scratch1.data()));
+  RETURN_NOT_OK(env_util::ReadFully(file2.get(), 0, size2, &slice2, scratch2.data()));
+  int result = strings::fastmemcmp_inlined(slice1.data(), slice2.data(), size1);
+  if (result != 0) {
+    return Status::Corruption("Files do not match");
+  }
+  return Status::OK();
+}
+
+// Basic begin / end tablet copy session.
+TEST_F(TabletCopyClientTest, TestBeginEndSession) {
+  TabletStatusListener listener(meta_);
+  ASSERT_OK(client_->FetchAll(&listener));
+  ASSERT_OK(client_->Finish());
+}
+
+// Basic data block download unit test.
+TEST_F(TabletCopyClientTest, TestDownloadBlock) {
+  TabletStatusListener listener(meta_);
+  BlockId block_id = FirstColumnBlockId(*client_->superblock_);
+  Slice slice;
+  faststring scratch;
+
+  // Ensure the block wasn't there before (it shouldn't be, we use our own FsManager dir).
+  Status s;
+  s = ReadLocalBlockFile(fs_manager_.get(), block_id, &scratch, &slice);
+  ASSERT_TRUE(s.IsNotFound()) << "Expected block not found: " << s.ToString();
+
+  // Check that the client downloaded the block and verification passed.
+  BlockId new_block_id;
+  ASSERT_OK(client_->DownloadBlock(block_id, &new_block_id));
+
+  // Ensure it placed the block where we expected it to.
+  s = ReadLocalBlockFile(fs_manager_.get(), block_id, &scratch, &slice);
+  ASSERT_TRUE(s.IsNotFound()) << "Expected block not found: " << s.ToString();
+  ASSERT_OK(ReadLocalBlockFile(fs_manager_.get(), new_block_id, &scratch, &slice));
+}
+
+// Basic WAL segment download unit test.
+TEST_F(TabletCopyClientTest, TestDownloadWalSegment) {
+  ASSERT_OK(fs_manager_->CreateDirIfMissing(fs_manager_->GetTabletWalDir(GetTabletId())));
+
+  uint64_t seqno = client_->wal_seqnos_[0];
+  string path = fs_manager_->GetWalSegmentFileName(GetTabletId(), seqno);
+
+  ASSERT_FALSE(fs_manager_->Exists(path));
+  ASSERT_OK(client_->DownloadWAL(seqno));
+  ASSERT_TRUE(fs_manager_->Exists(path));
+
+  log::SegmentSequence local_segments;
+  ASSERT_OK(tablet_peer_->log()->reader()->GetSegmentsSnapshot(&local_segments));
+  const scoped_refptr<log::ReadableLogSegment>& segment = local_segments[0];
+  string server_path = segment->path();
+
+  // Compare the downloaded file with the source file.
+  ASSERT_OK(CompareFileContents(path, server_path));
+}
+
+// Ensure that we detect data corruption at the per-transfer level.
+TEST_F(TabletCopyClientTest, TestVerifyData) {
+  string good = "This is a known good string";
+  string bad = "This is a known bad! string";
+  const int kGoodOffset = 0;
+  const int kBadOffset = 1;
+  const int64_t kDataTotalLen = std::numeric_limits<int64_t>::max(); // Ignored.
+
+  // Create a known-good PB.
+  DataChunkPB valid_chunk;
+  valid_chunk.set_offset(0);
+  valid_chunk.set_data(good);
+  valid_chunk.set_crc32(crc::Crc32c(good.data(), good.length()));
+  valid_chunk.set_total_data_length(kDataTotalLen);
+
+  // Make sure we work on the happy case.
+  ASSERT_OK(client_->VerifyData(kGoodOffset, valid_chunk));
+
+  // Test unexpected offset.
+  DataChunkPB bad_offset = valid_chunk;
+  bad_offset.set_offset(kBadOffset);
+  Status s;
+  s = client_->VerifyData(kGoodOffset, bad_offset);
+  ASSERT_TRUE(s.IsInvalidArgument()) << "Bad offset expected: " << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Offset did not match");
+  LOG(INFO) << "Expected error returned: " << s.ToString();
+
+  // Test bad checksum.
+  DataChunkPB bad_checksum = valid_chunk;
+  bad_checksum.set_data(bad);
+  s = client_->VerifyData(kGoodOffset, bad_checksum);
+  ASSERT_TRUE(s.IsCorruption()) << "Invalid checksum expected: " << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "CRC32 does not match");
+  LOG(INFO) << "Expected error returned: " << s.ToString();
+}
+
+namespace {
+
+vector<BlockId> GetAllSortedBlocks(const tablet::TabletSuperBlockPB& sb) {
+  vector<BlockId> data_blocks;
+
+  for (const tablet::RowSetDataPB& rowset : sb.rowsets()) {
+    for (const tablet::DeltaDataPB& redo : rowset.redo_deltas()) {
+      data_blocks.push_back(BlockId::FromPB(redo.block()));
+    }
+    for (const tablet::DeltaDataPB& undo : rowset.undo_deltas()) {
+      data_blocks.push_back(BlockId::FromPB(undo.block()));
+    }
+    for (const tablet::ColumnDataPB& column : rowset.columns()) {
+      data_blocks.push_back(BlockId::FromPB(column.block()));
+    }
+    if (rowset.has_bloom_block()) {
+      data_blocks.push_back(BlockId::FromPB(rowset.bloom_block()));
+    }
+    if (rowset.has_adhoc_index_block()) {
+      data_blocks.push_back(BlockId::FromPB(rowset.adhoc_index_block()));
+    }
+  }
+
+  std::sort(data_blocks.begin(), data_blocks.end(), BlockIdCompare());
+  return data_blocks;
+}
+
+} // anonymous namespace
+
+TEST_F(TabletCopyClientTest, TestDownloadAllBlocks) {
+  // Download all the blocks.
+  ASSERT_OK(client_->DownloadBlocks());
+
+  // Verify that the new superblock reflects the changes in block IDs.
+  //
+  // As long as block IDs are generated with UUIDs or something equally
+  // unique, there's no danger of a block in the new superblock somehow
+  // being assigned the same ID as a block in the existing superblock.
+  vector<BlockId> old_data_blocks = GetAllSortedBlocks(*client_->superblock_.get());
+  vector<BlockId> new_data_blocks = GetAllSortedBlocks(*client_->new_superblock_.get());
+  vector<BlockId> result;
+  std::set_intersection(old_data_blocks.begin(), old_data_blocks.end(),
+                        new_data_blocks.begin(), new_data_blocks.end(),
+                        std::back_inserter(result), BlockIdCompare());
+  ASSERT_TRUE(result.empty());
+  ASSERT_EQ(old_data_blocks.size(), new_data_blocks.size());
+
+  // Verify that the old blocks aren't found. We're using a different
+  // FsManager than 'tablet_peer', so the only way an old block could end
+  // up in ours is due to a tablet copy client bug.
+  for (const BlockId& block_id : old_data_blocks) {
+    gscoped_ptr<fs::ReadableBlock> block;
+    Status s = fs_manager_->OpenBlock(block_id, &block);
+    ASSERT_TRUE(s.IsNotFound()) << "Expected block not found: " << s.ToString();
+  }
+  // And the new blocks are all present.
+  for (const BlockId& block_id : new_data_blocks) {
+    gscoped_ptr<fs::ReadableBlock> block;
+    ASSERT_OK(fs_manager_->OpenBlock(block_id, &block));
+  }
+}
+
+} // namespace tserver
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/tablet_copy_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
new file mode 100644
index 0000000..56d1837
--- /dev/null
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -0,0 +1,563 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tserver/tablet_copy_client.h"
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/consensus_meta.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/fs/block_id.h"
+#include "kudu/fs/block_manager.h"
+#include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/tablet/tablet.pb.h"
+#include "kudu/tablet/tablet_bootstrap.h"
+#include "kudu/tablet/tablet_peer.h"
+#include "kudu/tserver/tablet_copy.pb.h"
+#include "kudu/tserver/tablet_copy.proxy.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/util/crc.h"
+#include "kudu/util/env.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/net/net_util.h"
+
+// Timeout, in milliseconds, set on the RPC controllers used in this file for
+// the BeginTabletCopySession and EndTabletCopySession calls.
+DEFINE_int32(tablet_copy_begin_session_timeout_ms, 3000,
+             "Tablet server RPC client timeout for BeginTabletCopySession calls. "
+             "Also used for EndTabletCopySession calls.");
+TAG_FLAG(tablet_copy_begin_session_timeout_ms, hidden);
+
+// When true, Finish() and WriteConsensusMetadata() copy the tablet metadata
+// and consensus metadata files to "<path>.copy.<start time in micros>.tmp".
+DEFINE_bool(tablet_copy_save_downloaded_metadata, false,
+            "Save copies of the downloaded tablet copy files for debugging purposes. "
+            "Note: This is only intended for debugging and should not be normally used!");
+TAG_FLAG(tablet_copy_save_downloaded_metadata, advanced);
+TAG_FLAG(tablet_copy_save_downloaded_metadata, hidden);
+TAG_FLAG(tablet_copy_save_downloaded_metadata, runtime);
+
+// Sleep, in milliseconds, performed by DownloadFile() after each chunk it
+// appends; values <= 0 disable the sleep.
+// NOTE(review): "dowload" is misspelled, but the spelling is part of the
+// flag's name, so fixing it would rename the flag for anything that sets it.
+DEFINE_int32(tablet_copy_dowload_file_inject_latency_ms, 0,
+             "Injects latency into the loop that downloads files, causing tablet copy "
+             "to take much longer. For use in tests only.");
+TAG_FLAG(tablet_copy_dowload_file_inject_latency_ms, hidden);
+
+// Defined in another translation unit; used here as the 'max_length' of each
+// FetchData request issued by DownloadFile().
+DECLARE_int32(tablet_copy_transfer_chunk_size_bytes);
+
+// RETURN_NOT_OK_PREPEND() with a remote-error unwinding step: 'status' is
+// first passed through UnwindRemoteError() together with 'controller', so a
+// RemoteError status has the details decoded from the RPC error response
+// appended before 'msg' is prepended. UnwindRemoteError is referenced
+// unqualified, so the macro is intended for use inside TabletCopyClient
+// member functions.
+#define RETURN_NOT_OK_UNWIND_PREPEND(status, controller, msg) \
+  RETURN_NOT_OK_PREPEND(UnwindRemoteError(status, controller), msg)
+
+namespace kudu {
+namespace tserver {
+
+using consensus::ConsensusMetadata;
+using consensus::ConsensusStatePB;
+using consensus::OpId;
+using consensus::RaftConfigPB;
+using consensus::RaftPeerPB;
+using env_util::CopyFile;
+using fs::WritableBlock;
+using rpc::Messenger;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+using strings::Substitute;
+using tablet::ColumnDataPB;
+using tablet::DeltaDataPB;
+using tablet::RowSetDataPB;
+using tablet::TabletDataState;
+using tablet::TabletDataState_Name;
+using tablet::TabletMetadata;
+using tablet::TabletStatusListener;
+using tablet::TabletSuperBlockPB;
+
+// Constructs a client for copying the tablet 'tablet_id'. 'fs_manager' is
+// stored as a raw pointer and 'messenger' is moved into the client. No RPC is
+// issued here: every progress flag starts out false, there is no status
+// listener, and the session idle timeout and start time stay zero until
+// Start() fills them in.
+TabletCopyClient::TabletCopyClient(std::string tablet_id,
+                                             FsManager* fs_manager,
+                                             shared_ptr<Messenger> messenger)
+    : tablet_id_(std::move(tablet_id)),
+      fs_manager_(fs_manager),
+      messenger_(std::move(messenger)),
+      started_(false),
+      downloaded_wal_(false),
+      downloaded_blocks_(false),
+      replace_tombstoned_tablet_(false),
+      status_listener_(nullptr),
+      session_idle_timeout_millis_(0),
+      start_time_micros_(0) {}
+
+// Ends the remote session, if one was started. EndRemoteSession() returns OK
+// without issuing an RPC when 'started_' is false; any failure is handed to
+// WARN_NOT_OK with the message below rather than propagated to a caller.
+TabletCopyClient::~TabletCopyClient() {
+  // Note: Ending the tablet copy session releases anchors on the remote.
+  WARN_NOT_OK(EndRemoteSession(), "Unable to close tablet copy session");
+}
+
+// Registers an existing, tombstoned tablet as the target to be replaced by
+// the copy, instead of having Start() create fresh tablet metadata.
+//
+// 'meta' must belong to this client's tablet (CHECK-enforced) and be in the
+// TABLET_DATA_TOMBSTONED state. 'caller_term' is compared against the term of
+// the tombstoned replica's last-logged opid.
+//
+// Returns:
+//   - IllegalState if 'meta' is not tombstoned;
+//   - InvalidArgument if the tombstoned replica's last-logged term is higher
+//     than 'caller_term';
+//   - any error other than NotFound from loading the consensus metadata;
+//   - OK otherwise. A NotFound consensus metadata file is not an error; in
+//     that case 'cmeta_' is simply left unset.
+Status TabletCopyClient::SetTabletToReplace(const scoped_refptr<TabletMetadata>& meta,
+                                                 int64_t caller_term) {
+  CHECK_EQ(tablet_id_, meta->tablet_id());
+  TabletDataState data_state = meta->tablet_data_state();
+  if (data_state != tablet::TABLET_DATA_TOMBSTONED) {
+    return Status::IllegalState(Substitute("Tablet $0 not in tombstoned state: $1 ($2)",
+                                           tablet_id_,
+                                           TabletDataState_Name(data_state),
+                                           data_state));
+  }
+
+  // NOTE(review): these members are assigned before the term check below, so
+  // they remain set even when this function goes on to return
+  // InvalidArgument. Confirm that callers discard the client on failure.
+  replace_tombstoned_tablet_ = true;
+  meta_ = meta;
+
+  int64_t last_logged_term = meta->tombstone_last_logged_opid().term();
+  if (last_logged_term > caller_term) {
+    return Status::InvalidArgument(
+        Substitute("Leader has term $0 but the last log entry written by the tombstoned replica "
+                   "for tablet $1 has higher term $2. Refusing tablet copy from leader",
+                   caller_term, tablet_id_, last_logged_term));
+  }
+
+  // Load the old consensus metadata, if it exists.
+  gscoped_ptr<ConsensusMetadata> cmeta;
+  Status s = ConsensusMetadata::Load(fs_manager_, tablet_id_,
+                                     fs_manager_->uuid(), &cmeta);
+  if (s.IsNotFound()) {
+    // The consensus metadata was not written to disk, possibly due to a failed
+    // tablet copy.
+    return Status::OK();
+  }
+  RETURN_NOT_OK(s);
+  // Keep the loaded metadata so WriteConsensusMetadata() can merge the remote
+  // committed state into it instead of creating a new file.
+  cmeta_.swap(cmeta);
+  return Status::OK();
+}
+
+// Opens a tablet copy session with the peer at 'copy_source_addr' and
+// prepares the local tablet metadata for the copy. Must be called at most
+// once (CHECK-enforced through 'started_').
+//
+// On success the session ID, session idle timeout, remote superblock, WAL
+// segment sequence numbers and remote committed consensus state are stored
+// in the client, the local tablet metadata is in the TABLET_DATA_COPYING
+// state, and, if 'meta' is non-null, '*meta' is set to that metadata.
+//
+// Returns a non-OK Status if the address cannot be resolved or is a
+// wildcard, the BeginTabletCopySession RPC fails, the remote superblock is
+// not TABLET_DATA_READY, the remote schema or partition schema cannot be
+// deserialized, the term sanity check fails when replacing a tombstoned
+// tablet, or the local superblock cannot be written.
+Status TabletCopyClient::Start(const HostPort& copy_source_addr,
+                                    scoped_refptr<TabletMetadata>* meta) {
+  CHECK(!started_);
+  // Recorded up front; later used to name the optional debug copies of the
+  // metadata files.
+  start_time_micros_ = GetCurrentTimeMicros();
+
+  Sockaddr addr;
+  RETURN_NOT_OK(SockaddrFromHostPort(copy_source_addr, &addr));
+  if (addr.IsWildcard()) {
+    return Status::InvalidArgument("Invalid wildcard address to tablet copy from",
+                                   Substitute("$0 (resolved to $1)",
+                                              copy_source_addr.host(), addr.host()));
+  }
+  LOG_WITH_PREFIX(INFO) << "Beginning tablet copy session"
+                        << " from remote peer at address " << copy_source_addr.ToString();
+
+  // Set up an RPC proxy for the TabletCopyService.
+  proxy_.reset(new TabletCopyServiceProxy(messenger_, addr));
+
+  BeginTabletCopySessionRequestPB req;
+  req.set_requestor_uuid(fs_manager_->uuid());
+  req.set_tablet_id(tablet_id_);
+
+  rpc::RpcController controller;
+  controller.set_timeout(MonoDelta::FromMilliseconds(
+      FLAGS_tablet_copy_begin_session_timeout_ms));
+
+  // Begin the tablet copy session with the remote peer.
+  BeginTabletCopySessionResponsePB resp;
+  RETURN_NOT_OK_UNWIND_PREPEND(proxy_->BeginTabletCopySession(req, &resp, &controller),
+                               controller,
+                               "Unable to begin tablet copy session");
+  // NOTE(review): from this point on the remote session exists, but
+  // 'started_' is only set at the very end of this function and
+  // EndRemoteSession() is a no-op while it is false. Any early return below
+  // therefore leaves the remote session to be cleaned up by the remote side
+  // (presumably via its idle timeout -- confirm).
+  string copy_peer_uuid = resp.has_responder_uuid()
+      ? resp.responder_uuid() : "(unknown uuid)";
+  // NOTE(review): this branch is taken for every state other than
+  // TABLET_DATA_READY, yet the message always says the peer is "copying
+  // itself"; the superblock debug string in the second argument carries the
+  // actual state.
+  if (resp.superblock().tablet_data_state() != tablet::TABLET_DATA_READY) {
+    Status s = Status::IllegalState("Remote peer (" + copy_peer_uuid + ")" +
+                                    " is currently copying itself!",
+                                    resp.superblock().ShortDebugString());
+    LOG_WITH_PREFIX(WARNING) << s.ToString();
+    return s;
+  }
+
+  // Take ownership of the pieces of the response that the later download and
+  // finish steps need.
+  session_id_ = resp.session_id();
+  session_idle_timeout_millis_ = resp.session_idle_timeout_millis();
+  superblock_.reset(resp.release_superblock());
+  // Mark our copy of the superblock as COPYING before anything is persisted.
+  superblock_->set_tablet_data_state(tablet::TABLET_DATA_COPYING);
+  wal_seqnos_.assign(resp.wal_segment_seqnos().begin(), resp.wal_segment_seqnos().end());
+  remote_committed_cstate_.reset(resp.release_initial_committed_cstate());
+
+  Schema schema;
+  RETURN_NOT_OK_PREPEND(SchemaFromPB(superblock_->schema(), &schema),
+                        "Cannot deserialize schema from remote superblock");
+
+  if (replace_tombstoned_tablet_) {
+    // Also validate the term of the source peer, in case they are
+    // different. This is a sanity check that protects us in case a bug or
+    // misconfiguration causes us to attempt to copy from an out-of-date
+    // source peer, even after passing the term check from the caller in
+    // SetTabletToReplace().
+    int64_t last_logged_term = meta_->tombstone_last_logged_opid().term();
+    if (last_logged_term > remote_committed_cstate_->current_term()) {
+      return Status::InvalidArgument(
+          Substitute("Tablet $0: source peer has term $1 but "
+                     "tombstoned replica has last-logged opid with higher term $2. "
+                      "Refusing tablet copy from source peer $3",
+                      tablet_id_,
+                      remote_committed_cstate_->current_term(),
+                      last_logged_term,
+                      copy_peer_uuid));
+    }
+
+    // This will flush to disk, but we set the data state to COPYING above.
+    RETURN_NOT_OK_PREPEND(meta_->ReplaceSuperBlock(*superblock_),
+                          "Tablet Copy unable to replace superblock on tablet " +
+                          tablet_id_);
+  } else {
+
+    // No existing tablet to replace: build brand new tablet metadata from
+    // the partition information carried in the remote superblock.
+    Partition partition;
+    Partition::FromPB(superblock_->partition(), &partition);
+    PartitionSchema partition_schema;
+    RETURN_NOT_OK(PartitionSchema::FromPB(superblock_->partition_schema(),
+                                          schema, &partition_schema));
+
+    // Create the superblock on disk.
+    RETURN_NOT_OK(TabletMetadata::CreateNew(fs_manager_, tablet_id_,
+                                            superblock_->table_name(),
+                                            superblock_->table_id(),
+                                            schema,
+                                            partition_schema,
+                                            partition,
+                                            tablet::TABLET_DATA_COPYING,
+                                            &meta_));
+  }
+
+  started_ = true;
+  // 'meta' is an optional out-parameter.
+  if (meta) {
+    *meta = meta_;
+  }
+  return Status::OK();
+}
+
+// Downloads all data blocks and then all WAL segments for the session opened
+// by Start(). Progress is reported through 'status_listener', which may be
+// null. Returns the first download error encountered, or OK.
+Status TabletCopyClient::FetchAll(TabletStatusListener* status_listener) {
+  CHECK(started_);
+  status_listener_ = status_listener;
+
+  // The two phases run one after the other for now; they could be
+  // parallelized in the future.
+  RETURN_NOT_OK(DownloadBlocks());
+  return DownloadWALs();
+}
+
+// Commits a completed copy: writes the consensus metadata, then replaces the
+// tablet superblock with the rewritten one marked TABLET_DATA_READY.
+// Start() and both download phases must have completed (CHECK-enforced).
+Status TabletCopyClient::Finish() {
+  CHECK(meta_);
+  CHECK(started_);
+  CHECK(downloaded_wal_);
+  CHECK(downloaded_blocks_);
+
+  RETURN_NOT_OK(WriteConsensusMetadata());
+
+  // Swap in the rewritten superblock. Flipping it to TABLET_DATA_READY is
+  // safe because Start() already verified that the superblock sent by the
+  // remote was in a valid state to bootstrap from.
+  LOG_WITH_PREFIX(INFO) << "Tablet Copy complete. Replacing tablet superblock.";
+  UpdateStatusMessage("Replacing tablet superblock");
+  new_superblock_->set_tablet_data_state(tablet::TABLET_DATA_READY);
+  RETURN_NOT_OK(meta_->ReplaceSuperBlock(*new_superblock_));
+
+  if (!FLAGS_tablet_copy_save_downloaded_metadata) {
+    return Status::OK();
+  }
+
+  // Debugging aid: keep a timestamped copy of the tablet metadata file that
+  // was just written.
+  const string live_path = fs_manager_->GetTabletMetadataPath(tablet_id_);
+  const string saved_path = Substitute("$0.copy.$1.tmp", live_path, start_time_micros_);
+  RETURN_NOT_OK_PREPEND(CopyFile(Env::Default(), live_path, saved_path,
+                                 WritableFileOptions()),
+                        "Unable to make copy of tablet metadata");
+  return Status::OK();
+}
+
+// Converts the tablet-copy extension carried by 'remote_error' into a Status
+// whose message is prefixed with the remote error code name. If the
+// extension is absent, returns InvalidArgument carrying the debug string of
+// 'remote_error'.
+Status TabletCopyClient::ExtractRemoteError(const rpc::ErrorStatusPB& remote_error) {
+  if (PREDICT_FALSE(!remote_error.HasExtension(TabletCopyErrorPB::tablet_copy_error_ext))) {
+    return Status::InvalidArgument("Unable to decode tablet copy RPC error message",
+                                   remote_error.ShortDebugString());
+  }
+
+  const TabletCopyErrorPB& copy_error =
+      remote_error.GetExtension(TabletCopyErrorPB::tablet_copy_error_ext);
+  const string code_prefix = "Received error code " +
+      TabletCopyErrorPB::Code_Name(copy_error.code()) + " from remote service";
+  return StatusFromPB(copy_error.status()).CloneAndPrepend(code_prefix);
+}
+
+// Returns 'status' unchanged unless it is a RemoteError; in that case the
+// details decoded from the error response held by 'controller' are appended
+// to its message.
+Status TabletCopyClient::UnwindRemoteError(const Status& status,
+                                                const rpc::RpcController& controller) {
+  if (status.IsRemoteError()) {
+    const Status remote_detail = ExtractRemoteError(*controller.error_response());
+    return status.CloneAndAppend(remote_detail.ToString());
+  }
+  return status;
+}
+
+// Forwards 'message', prefixed with "TabletCopy: ", to the status listener.
+// Does nothing when no listener has been registered.
+void TabletCopyClient::UpdateStatusMessage(const string& message) {
+  if (status_listener_ == nullptr) {
+    return;
+  }
+  status_listener_->StatusMessage("TabletCopy: " + message);
+}
+
+// Tells the remote peer that the tablet copy session is over, reporting
+// success. Returns OK immediately if Start() never completed; otherwise
+// returns the (unwound) RPC status.
+Status TabletCopyClient::EndRemoteSession() {
+  if (!started_) {
+    return Status::OK();
+  }
+
+  EndTabletCopySessionRequestPB end_req;
+  end_req.set_session_id(session_id_);
+  end_req.set_is_success(true);
+
+  // The begin-session timeout flag doubles as the end-session timeout.
+  rpc::RpcController end_controller;
+  end_controller.set_timeout(MonoDelta::FromMilliseconds(
+      FLAGS_tablet_copy_begin_session_timeout_ms));
+
+  EndTabletCopySessionResponsePB end_resp;
+  RETURN_NOT_OK_UNWIND_PREPEND(
+      proxy_->EndTabletCopySession(end_req, &end_resp, &end_controller),
+      end_controller,
+      "Failure ending tablet copy session");
+  return Status::OK();
+}
+
+// Recreates the tablet's WAL directory from scratch and downloads into it
+// every WAL segment listed by the remote when the session began. Sets
+// 'downloaded_wal_' once all segments have been fetched.
+Status TabletCopyClient::DownloadWALs() {
+  CHECK(started_);
+
+  // Start from an empty WAL directory so that no stray files survive from
+  // earlier copies or runs.
+  auto* env = fs_manager_->env();
+  const string wal_dir = fs_manager_->GetTabletWalDir(tablet_id_);
+  if (env->FileExists(wal_dir)) {
+    RETURN_NOT_OK(env->DeleteRecursively(wal_dir));
+  }
+  RETURN_NOT_OK(env->CreateDir(wal_dir));
+  RETURN_NOT_OK(env->SyncDir(DirName(wal_dir))); // fsync() parent dir.
+
+  // Fetch the segments one at a time, reporting a 1-based position.
+  const int total_segments = wal_seqnos_.size();
+  LOG_WITH_PREFIX(INFO) << "Starting download of " << total_segments << " WAL segments...";
+  uint64_t position = 1;
+  for (uint64_t seqno : wal_seqnos_) {
+    UpdateStatusMessage(Substitute("Downloading WAL segment with seq. number $0 ($1/$2)",
+                                   seqno, position, total_segments));
+    RETURN_NOT_OK(DownloadWAL(seqno));
+    ++position;
+  }
+
+  downloaded_wal_ = true;
+  return Status::OK();
+}
+
+// Downloads every data block referenced by the remote superblock and builds
+// 'new_superblock_', a copy of the remote superblock in which each block ID
+// has been replaced by the ID of the corresponding locally written block.
+// Sets 'downloaded_blocks_' on success.
+Status TabletCopyClient::DownloadBlocks() {
+  CHECK(started_);
+
+  // Tally the blocks up front so progress can be reported as "i/total".
+  int total_blocks = 0;
+  for (const RowSetDataPB& rs : superblock_->rowsets()) {
+    total_blocks += rs.columns_size();
+    total_blocks += rs.redo_deltas_size();
+    total_blocks += rs.undo_deltas_size();
+    total_blocks += rs.has_bloom_block() ? 1 : 0;
+    total_blocks += rs.has_adhoc_index_block() ? 1 : 0;
+  }
+
+  // Work on a private copy of the superblock, rewriting each block ID in
+  // place as soon as its block has been downloaded.
+  gscoped_ptr<TabletSuperBlockPB> rewritten_sb(new TabletSuperBlockPB());
+  rewritten_sb->CopyFrom(*superblock_);
+  int fetched_blocks = 0;
+  LOG_WITH_PREFIX(INFO) << "Starting download of " << total_blocks << " data blocks...";
+  for (RowSetDataPB& rs : *rewritten_sb->mutable_rowsets()) {
+    for (ColumnDataPB& column : *rs.mutable_columns()) {
+      RETURN_NOT_OK(DownloadAndRewriteBlock(column.mutable_block(),
+                                            &fetched_blocks, total_blocks));
+    }
+    for (DeltaDataPB& redo_delta : *rs.mutable_redo_deltas()) {
+      RETURN_NOT_OK(DownloadAndRewriteBlock(redo_delta.mutable_block(),
+                                            &fetched_blocks, total_blocks));
+    }
+    for (DeltaDataPB& undo_delta : *rs.mutable_undo_deltas()) {
+      RETURN_NOT_OK(DownloadAndRewriteBlock(undo_delta.mutable_block(),
+                                            &fetched_blocks, total_blocks));
+    }
+    if (rs.has_bloom_block()) {
+      RETURN_NOT_OK(DownloadAndRewriteBlock(rs.mutable_bloom_block(),
+                                            &fetched_blocks, total_blocks));
+    }
+    if (rs.has_adhoc_index_block()) {
+      RETURN_NOT_OK(DownloadAndRewriteBlock(rs.mutable_adhoc_index_block(),
+                                            &fetched_blocks, total_blocks));
+    }
+  }
+
+  // Orphaned block IDs refer to physical blocks on the remote and mean
+  // nothing locally, so drop them before publishing the new superblock.
+  rewritten_sb->clear_orphaned_blocks();
+  new_superblock_.swap(rewritten_sb);
+  downloaded_blocks_ = true;
+  return Status::OK();
+}
+
+// Downloads the WAL segment with sequence number 'wal_segment_seqno' into
+// the tablet's WAL directory. The destination file is opened with
+// 'sync_on_close' set.
+Status TabletCopyClient::DownloadWAL(uint64_t wal_segment_seqno) {
+  VLOG_WITH_PREFIX(1) << "Downloading WAL segment with seqno " << wal_segment_seqno;
+
+  // Open the local destination file.
+  const string segment_path =
+      fs_manager_->GetWalSegmentFileName(tablet_id_, wal_segment_seqno);
+  WritableFileOptions file_opts;
+  file_opts.sync_on_close = true;
+  gscoped_ptr<WritableFile> segment_file;
+  RETURN_NOT_OK_PREPEND(
+      fs_manager_->env()->NewWritableFile(file_opts, segment_path, &segment_file),
+      "Unable to open file for writing");
+
+  // Identify the remote segment and stream it into the file.
+  DataIdPB segment_id;
+  segment_id.set_type(DataIdPB::LOG_SEGMENT);
+  segment_id.set_wal_segment_seqno(wal_segment_seqno);
+  RETURN_NOT_OK_PREPEND(DownloadFile(segment_id, segment_file.get()),
+                        Substitute("Unable to download WAL segment with seq. number $0",
+                                   wal_segment_seqno));
+  return Status::OK();
+}
+
+// Persists consensus metadata reflecting the config and term sent by the
+// tablet copy source. If a previous consensus metadata file was loaded, the
+// remote committed state is merged into it and flushed; otherwise a new file
+// is created from the remote state.
+Status TabletCopyClient::WriteConsensusMetadata() {
+  if (cmeta_) {
+    // Existing metadata: fold in the remote committed state and persist it.
+    cmeta_->MergeCommittedConsensusStatePB(*remote_committed_cstate_);
+    RETURN_NOT_OK(cmeta_->Flush());
+
+    if (FLAGS_tablet_copy_save_downloaded_metadata) {
+      // Debugging aid: keep a timestamped copy of the flushed file.
+      const string live_path = fs_manager_->GetConsensusMetadataPath(tablet_id_);
+      const string saved_path = Substitute("$0.copy.$1.tmp", live_path, start_time_micros_);
+      RETURN_NOT_OK_PREPEND(CopyFile(Env::Default(), live_path, saved_path,
+                                     WritableFileOptions()),
+                            "Unable to make copy of consensus metadata");
+    }
+    return Status::OK();
+  }
+
+  // No previous consensus metadata file was found, so create one.
+  // NOTE(review): this path returns without making the debug copy that the
+  // merge path above makes -- confirm whether that is intentional.
+  gscoped_ptr<ConsensusMetadata> created_cmeta;
+  return ConsensusMetadata::Create(fs_manager_, tablet_id_, fs_manager_->uuid(),
+                                   remote_committed_cstate_->config(),
+                                   remote_committed_cstate_->current_term(),
+                                   &created_cmeta);
+}
+
+// Downloads the remote block identified by '*block_id' and overwrites
+// '*block_id' with the ID of the newly written local block.
+//
+// 'block_count' points to the number of blocks downloaded so far and is
+// incremented only after a successful download; 'num_blocks' is the total
+// used in the progress message. On failure, returns the DownloadBlock()
+// error with the remote block ID prepended and leaves '*block_id' and
+// '*block_count' untouched.
+Status TabletCopyClient::DownloadAndRewriteBlock(BlockIdPB* block_id,
+                                                      int* block_count, int num_blocks) {
+  BlockId old_block_id(BlockId::FromPB(*block_id));
+  // Report a 1-based position, so the first block reads "(1/N)" and the last
+  // "(N/N)", matching the WAL segment progress messages in DownloadWALs().
+  UpdateStatusMessage(Substitute("Downloading block $0 ($1/$2)",
+                                 old_block_id.ToString(), *block_count + 1,
+                                 num_blocks));
+  BlockId new_block_id;
+  RETURN_NOT_OK_PREPEND(DownloadBlock(old_block_id, &new_block_id),
+      "Unable to download block with id " + old_block_id.ToString());
+
+  new_block_id.CopyToPB(block_id);
+  (*block_count)++;
+  return Status::OK();
+}
+
+// Downloads the remote block 'old_block_id' into a newly created local
+// block and stores the local block's ID in '*new_block_id'. Note that
+// '*new_block_id' is assigned before the block is closed, so it is set even
+// if closing fails.
+Status TabletCopyClient::DownloadBlock(const BlockId& old_block_id,
+                                            BlockId* new_block_id) {
+  VLOG_WITH_PREFIX(1) << "Downloading block with block_id " << old_block_id.ToString();
+
+  // Describe the remote block to fetch.
+  DataIdPB remote_id;
+  remote_id.set_type(DataIdPB::BLOCK);
+  old_block_id.CopyToPB(remote_id.mutable_block_id());
+
+  // Allocate the local block that will receive the data.
+  gscoped_ptr<WritableBlock> local_block;
+  RETURN_NOT_OK_PREPEND(fs_manager_->CreateNewBlock(&local_block),
+                        "Unable to create new block");
+
+  RETURN_NOT_OK_PREPEND(DownloadFile(remote_id, local_block.get()),
+                        Substitute("Unable to download block $0",
+                                   old_block_id.ToString()));
+
+  *new_block_id = local_block->id();
+  RETURN_NOT_OK_PREPEND(local_block->Close(), "Unable to close block");
+  return Status::OK();
+}
+
+// Downloads the data item 'data_id' from the remote peer in chunks of at
+// most FLAGS_tablet_copy_transfer_chunk_size_bytes, appending each verified
+// chunk to 'appendable' (any type with an Append() method that accepts the
+// chunk bytes and returns a Status).
+//
+// Returns a non-OK Status if an RPC fails, a chunk fails VerifyData(), the
+// remote sends a chunk that overruns the total length it reported, the
+// remote sends an empty chunk before the end of the data, or an append
+// fails.
+template<class Appendable>
+Status TabletCopyClient::DownloadFile(const DataIdPB& data_id,
+                                           Appendable* appendable) {
+  uint64_t offset = 0;
+  rpc::RpcController controller;
+  controller.set_timeout(MonoDelta::FromMilliseconds(session_idle_timeout_millis_));
+  FetchDataRequestPB req;
+
+  bool done = false;
+  while (!done) {
+    controller.Reset();
+    req.set_session_id(session_id_);
+    req.mutable_data_id()->CopyFrom(data_id);
+    req.set_offset(offset);
+    req.set_max_length(FLAGS_tablet_copy_transfer_chunk_size_bytes);
+
+    FetchDataResponsePB resp;
+    RETURN_NOT_OK_UNWIND_PREPEND(proxy_->FetchData(req, &resp, &controller),
+                                controller,
+                                "Unable to fetch data from remote");
+
+    // Sanity-check for corruption.
+    RETURN_NOT_OK_PREPEND(VerifyData(offset, resp.chunk()),
+                          Substitute("Error validating data item $0", data_id.ShortDebugString()));
+
+    // Make sure the loop is guaranteed to terminate: each chunk must stay
+    // within the total length reported by the remote, and must make progress
+    // unless it completes the download. Without these checks a misbehaving
+    // remote could keep this loop spinning forever.
+    const uint64_t chunk_size = resp.chunk().data().size();
+    const uint64_t total_size = static_cast<uint64_t>(resp.chunk().total_data_length());
+    const uint64_t chunk_end = offset + chunk_size;
+    if (PREDICT_FALSE(chunk_end > total_size)) {
+      return Status::InvalidArgument(
+          Substitute("Chunk of data item $0 ends past the reported total length",
+                     data_id.ShortDebugString()),
+          Substitute("offset $0 size $1 total $2", offset, chunk_size, total_size));
+    }
+    if (PREDICT_FALSE(chunk_size == 0 && chunk_end < total_size)) {
+      return Status::InvalidArgument(
+          Substitute("Remote returned an empty chunk before the end of data item $0",
+                     data_id.ShortDebugString()),
+          Substitute("offset $0 total $1", offset, total_size));
+    }
+
+    // Write the data.
+    RETURN_NOT_OK(appendable->Append(resp.chunk().data()));
+
+    // Test-only hook: slow the download down artificially.
+    if (PREDICT_FALSE(FLAGS_tablet_copy_dowload_file_inject_latency_ms > 0)) {
+      LOG_WITH_PREFIX(INFO) << "Injecting latency into file download: " <<
+          FLAGS_tablet_copy_dowload_file_inject_latency_ms;
+      SleepFor(MonoDelta::FromMilliseconds(FLAGS_tablet_copy_dowload_file_inject_latency_ms));
+    }
+
+    // The download is complete once the chunks cover the reported length.
+    done = (chunk_end == total_size);
+    offset = chunk_end;
+  }
+
+  return Status::OK();
+}
+
+// Validates a chunk received from the remote: its offset must equal the
+// requested 'offset' (InvalidArgument otherwise) and the CRC32C computed
+// over its data must equal the checksum it carries (Corruption otherwise).
+Status TabletCopyClient::VerifyData(uint64_t offset, const DataChunkPB& chunk) {
+  // The chunk must start exactly where we asked.
+  const auto received_offset = chunk.offset();
+  if (received_offset != offset) {
+    return Status::InvalidArgument("Offset did not match what was asked for",
+        Substitute("$0 vs $1", offset, received_offset));
+  }
+
+  // Recompute the checksum over the payload and compare it with the one
+  // sent by the remote.
+  const string& payload = chunk.data();
+  const uint32_t computed_crc = crc::Crc32c(payload.data(), payload.length());
+  if (PREDICT_TRUE(computed_crc == chunk.crc32())) {
+    return Status::OK();
+  }
+  return Status::Corruption(
+      Substitute("CRC32 does not match at offset $0 size $1: $2 vs $3",
+        offset, payload.size(), computed_crc, chunk.crc32()));
+}
+
+// Builds the prefix used by the *_WITH_PREFIX logging macros in this file:
+// the tablet ID followed by the UUID reported by the local FsManager.
+string TabletCopyClient::LogPrefix() {
+  const string& local_uuid = fs_manager_->uuid();
+  return Substitute("T $0 P $1: Tablet Copy client: ", tablet_id_, local_uuid);
+}
+
+} // namespace tserver
+} // namespace kudu


Mime
View raw message