kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [2/5] kudu git commit: Rename remote bootstrap files to 'tablet copy'
Date Sun, 07 Aug 2016 03:56:50 GMT
http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/tablet_copy_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.h b/src/kudu/tserver/tablet_copy_client.h
new file mode 100644
index 0000000..01253ee
--- /dev/null
+++ b/src/kudu/tserver/tablet_copy_client.h
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_TSERVER_TABLET_COPY_CLIENT_H
+#define KUDU_TSERVER_TABLET_COPY_CLIENT_H
+
+#include <string>
+#include <memory>
+#include <vector>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class BlockId;
+class BlockIdPB;
+class FsManager;
+class HostPort;
+
+namespace consensus {
+class ConsensusMetadata;
+class ConsensusStatePB;
+class RaftConfigPB;
+class RaftPeerPB;
+} // namespace consensus
+
+namespace rpc {
+class ErrorStatusPB;
+class Messenger;
+class RpcController;
+} // namespace rpc
+
+namespace tablet {
+class TabletMetadata;
+class TabletPeer;
+class TabletStatusListener;
+class TabletSuperBlockPB;
+} // namespace tablet
+
+namespace tserver {
+class DataIdPB;
+class DataChunkPB;
+class TabletCopyServiceProxy;
+
+// Client class for using tablet copy to copy a tablet from another host.
+// This class is not thread-safe.
+//
+// The call sequence implied by the method comments below is: optionally
+// SetTabletToReplace() (which must precede Start()), then Start(), FetchAll(),
+// and finally Finish().
+//
+// TODO:
+// * Parallelize download of blocks and WAL segments.
+//
+class TabletCopyClient {
+ public:
+
+  // Construct the tablet copy client for the tablet identified by 'tablet_id'.
+  // 'fs_manager' and 'messenger' must remain valid until this object is destroyed.
+  TabletCopyClient(std::string tablet_id, FsManager* fs_manager,
+                        std::shared_ptr<rpc::Messenger> messenger);
+
+  // Attempt to clean up resources on the remote end by sending an
+  // EndTabletCopySession() RPC.
+  ~TabletCopyClient();
+
+  // Pass in the existing metadata for a tombstoned tablet, which will be
+  // replaced if validation checks pass in Start().
+  // 'meta' is the metadata for the tombstoned tablet and 'caller_term' is the
+  // term provided by the caller (assumed to be the current leader of the
+  // consensus config) for validation purposes.
+  // If the consensus metadata exists on disk for this tablet, and if
+  // 'caller_term' is lower than the current term stored in that consensus
+  // metadata, then this method will fail with a Status::InvalidArgument error.
+  Status SetTabletToReplace(const scoped_refptr<tablet::TabletMetadata>& meta,
+                            int64_t caller_term);
+
+  // Start up a tablet copy session to copy from the peer at
+  // 'copy_source_addr'. Place a new superblock indicating that tablet copy is
+  // in progress. If the 'metadata' pointer is passed as NULL, it is ignored,
+  // otherwise the TabletMetadata object resulting from the response to the
+  // initial session request is returned.
+  Status Start(const HostPort& copy_source_addr,
+               scoped_refptr<tablet::TabletMetadata>* metadata);
+
+  // Runs a "full" tablet copy, copying the physical layout of a tablet
+  // from the copy source. NOTE(review): presumably this is the peer whose
+  // session was opened by Start(), and progress is presumably reported to
+  // 'status_listener' -- confirm in the implementation.
+  Status FetchAll(tablet::TabletStatusListener* status_listener);
+
+  // After downloading all files successfully, write out the completed
+  // replacement superblock.
+  Status Finish();
+
+ private:
+  FRIEND_TEST(TabletCopyClientTest, TestBeginEndSession);
+  FRIEND_TEST(TabletCopyClientTest, TestDownloadBlock);
+  FRIEND_TEST(TabletCopyClientTest, TestVerifyData);
+  FRIEND_TEST(TabletCopyClientTest, TestDownloadWalSegment);
+  FRIEND_TEST(TabletCopyClientTest, TestDownloadAllBlocks);
+
+  // Extract the embedded Status message from the given ErrorStatusPB.
+  // The given ErrorStatusPB must extend TabletCopyErrorPB.
+  static Status ExtractRemoteError(const rpc::ErrorStatusPB& remote_error);
+
+  // Presumably enriches a RemoteError 'status' with the application error
+  // carried by 'controller' (see ExtractRemoteError()) -- confirm in the .cc.
+  static Status UnwindRemoteError(const Status& status, const rpc::RpcController& controller);
+
+  // Update the tablet status listener ('status_listener_') with a message.
+  // The string "TabletCopy: " will be prepended to each message.
+  void UpdateStatusMessage(const std::string& message);
+
+  // End the tablet copy session.
+  Status EndRemoteSession();
+
+  // Download all WAL files sequentially.
+  Status DownloadWALs();
+
+  // Download a single WAL file.
+  // Assumes the WAL directories have already been created.
+  // WAL file is opened with options so that it will fsync() on close.
+  Status DownloadWAL(uint64_t wal_segment_seqno);
+
+  // Write out the Consensus Metadata file based on the ConsensusStatePB
+  // downloaded as part of initiating the tablet copy session.
+  Status WriteConsensusMetadata();
+
+  // Download all blocks belonging to a tablet sequentially.
+  //
+  // Blocks are given new IDs upon creation. On success, 'new_superblock_'
+  // is populated to reflect the new block IDs and should be used in lieu
+  // of 'superblock_' henceforth.
+  Status DownloadBlocks();
+
+  // Download the block specified by 'block_id'.
+  //
+  // On success:
+  // - 'block_id' is set to the new ID of the downloaded block.
+  // - 'block_count' is incremented.
+  // 'num_blocks' is presumably the total used for progress reporting -- confirm.
+  Status DownloadAndRewriteBlock(BlockIdPB* block_id, int* block_count, int num_blocks);
+
+  // Download a single block.
+  // Data block is opened with options so that it will fsync() on close.
+  //
+  // On success, 'new_block_id' is set to the new ID of the downloaded block.
+  Status DownloadBlock(const BlockId& old_block_id, BlockId* new_block_id);
+
+  // Download a single remote file. The block and WAL implementations delegate
+  // to this method when downloading files.
+  //
+  // An Appendable is typically a WritableBlock (block) or WritableFile (WAL).
+  //
+  // Only used in one compilation unit, otherwise the implementation would
+  // need to be in the header.
+  template<class Appendable>
+  Status DownloadFile(const DataIdPB& data_id, Appendable* appendable);
+
+  // Presumably validates the data chunk 'resp' received for position 'offset'
+  // of the file being downloaded -- the exact checks live in the .cc.
+  Status VerifyData(uint64_t offset, const DataChunkPB& resp);
+
+  // Return standard log prefix.
+  std::string LogPrefix();
+
+  // Set-once members.
+  const std::string tablet_id_;
+  FsManager* const fs_manager_;
+  const std::shared_ptr<rpc::Messenger> messenger_;
+
+  // State flags that enforce the progress of tablet copy.
+  bool started_;            // Session started.
+  bool downloaded_wal_;     // WAL segments downloaded.
+  bool downloaded_blocks_;  // Data blocks downloaded.
+
+  // Session-specific data items.
+  // NOTE(review): presumably set when SetTabletToReplace() is used -- confirm.
+  bool replace_tombstoned_tablet_;
+
+  // Local tablet metadata file.
+  scoped_refptr<tablet::TabletMetadata> meta_;
+
+  // Local Consensus metadata file. This may initially be NULL if this is
+  // bootstrapping a new replica (rather than replacing an old one).
+  gscoped_ptr<consensus::ConsensusMetadata> cmeta_;
+
+  tablet::TabletStatusListener* status_listener_;
+  std::shared_ptr<TabletCopyServiceProxy> proxy_;
+  std::string session_id_;
+  uint64_t session_idle_timeout_millis_;
+  // 'superblock_' is the original superblock; 'new_superblock_' replaces it
+  // once DownloadBlocks() has rewritten the block IDs (see DownloadBlocks()).
+  gscoped_ptr<tablet::TabletSuperBlockPB> superblock_;
+  gscoped_ptr<tablet::TabletSuperBlockPB> new_superblock_;
+  gscoped_ptr<consensus::ConsensusStatePB> remote_committed_cstate_;
+  std::vector<uint64_t> wal_seqnos_;
+  int64_t start_time_micros_;
+
+  DISALLOW_COPY_AND_ASSIGN(TabletCopyClient);
+};
+
+} // namespace tserver
+} // namespace kudu
+#endif /* KUDU_TSERVER_TABLET_COPY_CLIENT_H */

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/tablet_copy_service-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_service-test.cc b/src/kudu/tserver/tablet_copy_service-test.cc
new file mode 100644
index 0000000..b50c5fe
--- /dev/null
+++ b/src/kudu/tserver/tablet_copy_service-test.cc
@@ -0,0 +1,491 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/tserver/tablet_copy-test-base.h"
+
+#include <gflags/gflags.h>
+#include <limits>
+#include <thread>
+#include <vector>
+
+#include "kudu/consensus/log.h"
+#include "kudu/consensus/log_anchor_registry.h"
+#include "kudu/consensus/log_util.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/opid_util.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/tserver/tablet_copy.pb.h"
+#include "kudu/tserver/tserver_service.pb.h"
+#include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/crc.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_util.h"
+
+// Checks, via the fixture's AssertRemoteError(), that 'st' is a RemoteError
+// whose ErrorStatusPB 'err_pb' carries tablet copy error 'app_code' and an
+// embedded status whose code string is 'code_str'. ASSERT_NO_FATAL_FAILURE
+// propagates a fatal failure inside the helper to the calling test.
+#define ASSERT_REMOTE_ERROR(st, err_pb, app_code, code_str) \
+    ASSERT_NO_FATAL_FAILURE(AssertRemoteError(st, err_pb, app_code, code_str))
+
+DECLARE_uint64(tablet_copy_idle_timeout_ms);
+DECLARE_uint64(tablet_copy_timeout_poll_period_ms);
+
+namespace kudu {
+namespace tserver {
+
+using consensus::MaximumOpId;
+using consensus::MinimumOpId;
+using consensus::OpIdEquals;
+using env_util::ReadFully;
+using log::ReadableLogSegment;
+using rpc::ErrorStatusPB;
+using rpc::RpcController;
+using std::thread;
+using std::vector;
+
+class TabletCopyServiceTest : public TabletCopyTest {
+ public:
+  TabletCopyServiceTest() {
+    // Poll for session expiration every 10 ms for the session timeout test.
+    FLAGS_tablet_copy_timeout_poll_period_ms = 10;
+  }
+
+ protected:
+  void SetUp() OVERRIDE {
+    TabletCopyTest::SetUp();
+    tablet_copy_proxy_.reset(
+        new TabletCopyServiceProxy(client_messenger_, mini_server_->bound_rpc_addr()));
+  }
+
+  // Sends BeginTabletCopySession for 'tablet_id' on behalf of 'requestor_uuid'
+  // with a 1 second RPC timeout. Application-level remote errors are folded
+  // into the returned Status by UnwindRemoteError().
+  Status DoBeginTabletCopySession(const string& tablet_id,
+                                  const string& requestor_uuid,
+                                  BeginTabletCopySessionResponsePB* resp,
+                                  RpcController* controller) {
+    controller->set_timeout(MonoDelta::FromSeconds(1.0));
+    BeginTabletCopySessionRequestPB req;
+    req.set_tablet_id(tablet_id);
+    req.set_requestor_uuid(requestor_uuid);
+    return UnwindRemoteError(
+        tablet_copy_proxy_->BeginTabletCopySession(req, resp, controller), controller);
+  }
+
+  // Begins a session for the test tablet using the local UUID as requestor and
+  // copies the corresponding response fields into every non-null out-parameter.
+  Status DoBeginValidTabletCopySession(string* session_id,
+                                       tablet::TabletSuperBlockPB* superblock = nullptr,
+                                       uint64_t* idle_timeout_millis = nullptr,
+                                       vector<uint64_t>* sequence_numbers = nullptr) {
+    BeginTabletCopySessionResponsePB resp;
+    RpcController controller;
+    RETURN_NOT_OK(DoBeginTabletCopySession(GetTabletId(), GetLocalUUID(), &resp, &controller));
+    *session_id = resp.session_id();
+    if (superblock) {
+      *superblock = resp.superblock();
+    }
+    if (idle_timeout_millis) {
+      *idle_timeout_millis = resp.session_idle_timeout_millis();
+    }
+    if (sequence_numbers) {
+      sequence_numbers->assign(resp.wal_segment_seqnos().begin(), resp.wal_segment_seqnos().end());
+    }
+    return Status::OK();
+  }
+
+  // Sends CheckSessionActive for 'session_id' with a 1 second RPC timeout.
+  Status DoCheckSessionActive(const string& session_id,
+                              CheckTabletCopySessionActiveResponsePB* resp,
+                              RpcController* controller) {
+    controller->set_timeout(MonoDelta::FromSeconds(1.0));
+    CheckTabletCopySessionActiveRequestPB req;
+    req.set_session_id(session_id);
+    return UnwindRemoteError(
+        tablet_copy_proxy_->CheckSessionActive(req, resp, controller), controller);
+  }
+
+  // Sends FetchData for 'data_id' within 'session_id'. 'offset' and
+  // 'max_length' are optional: a null pointer leaves the field unset.
+  Status DoFetchData(const string& session_id, const DataIdPB& data_id,
+                     uint64_t* offset, int64_t* max_length,
+                     FetchDataResponsePB* resp,
+                     RpcController* controller) {
+    controller->set_timeout(MonoDelta::FromSeconds(1.0));
+    FetchDataRequestPB req;
+    req.set_session_id(session_id);
+    req.mutable_data_id()->CopyFrom(data_id);
+    if (offset) {
+      req.set_offset(*offset);
+    }
+    if (max_length) {
+      req.set_max_length(*max_length);
+    }
+    return UnwindRemoteError(
+        tablet_copy_proxy_->FetchData(req, resp, controller), controller);
+  }
+
+  // Sends EndTabletCopySession for 'session_id'. When 'error_msg' is non-null
+  // it is serialized into the request's 'error' field.
+  Status DoEndTabletCopySession(const string& session_id, bool is_success,
+                                const Status* error_msg,
+                                EndTabletCopySessionResponsePB* resp,
+                                RpcController* controller) {
+    controller->set_timeout(MonoDelta::FromSeconds(1.0));
+    EndTabletCopySessionRequestPB req;
+    req.set_session_id(session_id);
+    req.set_is_success(is_success);
+    if (error_msg) {
+      StatusToPB(*error_msg, req.mutable_error());
+    }
+    return UnwindRemoteError(
+        tablet_copy_proxy_->EndTabletCopySession(req, resp, controller), controller);
+  }
+
+  // Decode the remote error into a Status object.
+  // 'remote_error' must be non-null.
+  Status ExtractRemoteError(const ErrorStatusPB* remote_error) {
+    const TabletCopyErrorPB& error =
+        remote_error->GetExtension(TabletCopyErrorPB::tablet_copy_error_ext);
+    return StatusFromPB(error.status());
+  }
+
+  // Enhance a RemoteError Status message with additional details from the remote.
+  // Any other status, or a remote error that is not ERROR_APPLICATION, is
+  // returned unchanged.
+  Status UnwindRemoteError(const Status& status, const RpcController* controller) {
+    if (!status.IsRemoteError()) {
+      return status;
+    }
+    // Guard against a RemoteError that arrives without an error response
+    // attached; dereferencing a null pointer would crash the whole test binary.
+    const ErrorStatusPB* err = controller->error_response();
+    if (err == nullptr || err->code() != ErrorStatusPB::ERROR_APPLICATION) {
+      return status;
+    }
+    Status remote_error = ExtractRemoteError(err);
+    return status.CloneAndPrepend(remote_error.ToString());
+  }
+
+  // Asserts that 'status' is a RemoteError whose error response carries the
+  // tablet copy code 'app_code' and an embedded status whose code string is
+  // 'status_code_string'.
+  void AssertRemoteError(const Status& status, const ErrorStatusPB* remote_error,
+                         const TabletCopyErrorPB::Code app_code,
+                         const string& status_code_string) {
+    ASSERT_TRUE(status.IsRemoteError()) << "Unexpected status code: " << status.ToString()
+                                        << ", app code: "
+                                        << TabletCopyErrorPB::Code_Name(app_code)
+                                        << ", status code string: " << status_code_string;
+    // Fail cleanly rather than crash if the RPC layer attached no error response.
+    ASSERT_TRUE(remote_error != nullptr) << "RemoteError without an error response: "
+                                         << status.ToString();
+    const Status app_status = ExtractRemoteError(remote_error);
+    const TabletCopyErrorPB& error =
+        remote_error->GetExtension(TabletCopyErrorPB::tablet_copy_error_ext);
+    ASSERT_EQ(app_code, error.code()) << error.ShortDebugString();
+    ASSERT_EQ(status_code_string, app_status.CodeAsString()) << app_status.ToString();
+    LOG(INFO) << app_status.ToString();
+  }
+
+  // Return BlockId in format suitable for a FetchData() call.
+  static DataIdPB AsDataTypeId(const BlockId& block_id) {
+    DataIdPB data_id;
+    data_id.set_type(DataIdPB::BLOCK);
+    block_id.CopyToPB(data_id.mutable_block_id());
+    return data_id;
+  }
+
+  gscoped_ptr<TabletCopyServiceProxy> tablet_copy_proxy_;
+};
+
+// Begin a tablet copy session, sanity-check the parameters the server hands
+// back, then close the session successfully.
+TEST_F(TabletCopyServiceTest, TestSimpleBeginEndSession) {
+  string sid;
+  tablet::TabletSuperBlockPB sb;
+  uint64_t timeout_ms;
+  vector<uint64_t> seqnos;
+  ASSERT_OK(DoBeginValidTabletCopySession(&sid, &sb, &timeout_ms, &seqnos));
+
+  // The session must have an id, report the configured idle timeout, and
+  // carry a fully-initialized superblock.
+  ASSERT_FALSE(sid.empty());
+  ASSERT_EQ(FLAGS_tablet_copy_idle_timeout_ms, timeout_ms);
+  ASSERT_TRUE(sb.IsInitialized());
+  // One segment per log roll, plus the currently active segment.
+  ASSERT_EQ(kNumLogRolls + 1, seqnos.size());
+
+  RpcController rpc;
+  EndTabletCopySessionResponsePB end_resp;
+  ASSERT_OK(DoEndTabletCopySession(sid, true, nullptr, &end_resp, &rpc));
+}
+
+// Begin a session twice in a row. The current implementation silently keeps a
+// single session, so the repeated begin must succeed like the first one.
+TEST_F(TabletCopyServiceTest, TestBeginTwice) {
+  for (int attempt = 0; attempt < 2; ++attempt) {
+    string sid;
+    ASSERT_OK(DoBeginValidTabletCopySession(&sid));
+    ASSERT_FALSE(sid.empty());
+  }
+}
+
+// Regression test for KUDU-1436: race conditions if multiple requests
+// to begin the same tablet copy session arrive at more or less the
+// same time.
+TEST_F(TabletCopyServiceTest, TestBeginConcurrently) {
+  const int kNumThreads = 5;
+  vector<thread> threads;
+  vector<tablet::TabletSuperBlockPB> sblocks(kNumThreads);
+  for (int i = 0; i < kNumThreads; i++) {
+    // Each thread writes only its own slot of 'sblocks'; join() below makes
+    // the results visible to this thread, so no extra locking is required.
+    threads.emplace_back([this, &sblocks, i]{
+        string session_id;
+        CHECK_OK(DoBeginValidTabletCopySession(&session_id, &sblocks[i]));
+        CHECK(!session_id.empty());
+      });
+  }
+  for (auto& t : threads) {
+    t.join();
+  }
+  // Verify that all threads got the same result. Index with size_t so the
+  // comparison against the container size is not signed/unsigned.
+  for (size_t i = 1; i < sblocks.size(); i++) {
+    ASSERT_EQ(sblocks[i].DebugString(), sblocks[0].DebugString());
+  }
+}
+
+// Requests naming a session the server never created must be rejected with
+// NO_SESSION / NotFound, for both FetchData and EndTabletCopySession.
+TEST_F(TabletCopyServiceTest, TestInvalidSessionId) {
+  const vector<string> bad_session_ids = { "hodor", GetLocalUUID() };
+  const string not_found = Status::NotFound("").CodeAsString();
+
+  // Fetch a block for a non-existent session.
+  for (const string& bogus_id : bad_session_ids) {
+    DataIdPB block_data_id;
+    block_data_id.set_type(DataIdPB::BLOCK);
+    block_data_id.mutable_block_id()->set_id(1);
+    RpcController rpc;
+    FetchDataResponsePB fetch_resp;
+    Status s = DoFetchData(bogus_id, block_data_id, nullptr, nullptr, &fetch_resp, &rpc);
+    ASSERT_REMOTE_ERROR(s, rpc.error_response(), TabletCopyErrorPB::NO_SESSION, not_found);
+  }
+
+  // End a non-existent session.
+  for (const string& bogus_id : bad_session_ids) {
+    RpcController rpc;
+    EndTabletCopySessionResponsePB end_resp;
+    Status s = DoEndTabletCopySession(bogus_id, true, nullptr, &end_resp, &rpc);
+    ASSERT_REMOTE_ERROR(s, rpc.error_response(), TabletCopyErrorPB::NO_SESSION, not_found);
+  }
+}
+
+// Beginning a session for a tablet the server does not host must fail with
+// TABLET_NOT_FOUND / NotFound.
+TEST_F(TabletCopyServiceTest, TestInvalidTabletId) {
+  const string unknown_tablet_id = "some-unknown-tablet";
+  RpcController rpc;
+  BeginTabletCopySessionResponsePB begin_resp;
+  Status s = DoBeginTabletCopySession(unknown_tablet_id, GetLocalUUID(), &begin_resp, &rpc);
+  ASSERT_REMOTE_ERROR(s, rpc.error_response(), TabletCopyErrorPB::TABLET_NOT_FOUND,
+                      Status::NotFound("").CodeAsString());
+}
+
+// Test DataIdPB validation: each unknown or malformed data id sent to
+// FetchData must be rejected with the matching error.
+TEST_F(TabletCopyServiceTest, TestInvalidBlockOrOpId) {
+  string session_id;
+  ASSERT_OK(DoBeginValidTabletCopySession(&session_id));
+
+  // Issues FetchData for 'data_id' on the session opened above, without an
+  // offset or length limit. The response body is discarded; callers inspect
+  // the returned Status and 'rpc'.
+  auto fetch = [&](const DataIdPB& data_id, RpcController* rpc) {
+    FetchDataResponsePB resp;
+    return DoFetchData(session_id, data_id, nullptr, nullptr, &resp, rpc);
+  };
+  const string not_found = Status::NotFound("").CodeAsString();
+  const string invalid_arg = Status::InvalidArgument("").CodeAsString();
+
+  // Invalid BlockId.
+  {
+    DataIdPB id;
+    id.set_type(DataIdPB::BLOCK);
+    id.mutable_block_id()->set_id(1);
+    RpcController rpc;
+    Status s = fetch(id, &rpc);
+    ASSERT_REMOTE_ERROR(s, rpc.error_response(),
+                        TabletCopyErrorPB::BLOCK_NOT_FOUND, not_found);
+  }
+
+  // Invalid Segment Sequence Number for log fetch.
+  {
+    DataIdPB id;
+    id.set_type(DataIdPB::LOG_SEGMENT);
+    id.set_wal_segment_seqno(31337);
+    RpcController rpc;
+    Status s = fetch(id, &rpc);
+    ASSERT_REMOTE_ERROR(s, rpc.error_response(),
+                        TabletCopyErrorPB::WAL_SEGMENT_NOT_FOUND, not_found);
+  }
+
+  // Empty data type with BlockId.
+  // The server will reject the request since we are missing the required 'type' field.
+  {
+    DataIdPB id;
+    id.mutable_block_id()->set_id(1);
+    RpcController rpc;
+    Status s = fetch(id, &rpc);
+    ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "Invalid argument: invalid parameter for call "
+                        "kudu.tserver.TabletCopyService.FetchData: "
+                        "missing fields: data_id.type");
+  }
+
+  // Empty data type id (no BlockId, no Segment Sequence Number).
+  {
+    DataIdPB id;
+    id.set_type(DataIdPB::LOG_SEGMENT);
+    RpcController rpc;
+    Status s = fetch(id, &rpc);
+    ASSERT_REMOTE_ERROR(s, rpc.error_response(),
+                        TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST, invalid_arg);
+  }
+
+  // Both BlockId and Segment Sequence Number in the same "union" PB (illegal).
+  {
+    DataIdPB id;
+    id.set_type(DataIdPB::BLOCK);
+    id.mutable_block_id()->set_id(1);
+    id.set_wal_segment_seqno(0);
+    RpcController rpc;
+    Status s = fetch(id, &rpc);
+    ASSERT_REMOTE_ERROR(s, rpc.error_response(),
+                        TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST, invalid_arg);
+  }
+}
+
+// A FetchData request whose offset is the largest uint64_t (beyond any real
+// block) must be rejected as an invalid tablet copy request.
+TEST_F(TabletCopyServiceTest, TestFetchInvalidBlockOffset) {
+  string session_id;
+  tablet::TabletSuperBlockPB superblock;
+  ASSERT_OK(DoBeginValidTabletCopySession(&session_id, &superblock));
+
+  const DataIdPB first_block = AsDataTypeId(FirstColumnBlockId(superblock));
+  uint64_t bogus_offset = std::numeric_limits<uint64_t>::max();  // Impossible offset.
+  RpcController rpc;
+  FetchDataResponsePB fetch_resp;
+  Status s = DoFetchData(session_id, first_block, &bogus_offset, nullptr, &fetch_resp, &rpc);
+  ASSERT_REMOTE_ERROR(s, rpc.error_response(),
+                      TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST,
+                      Status::InvalidArgument("").CodeAsString());
+}
+
+// Fetching a block with no offset or length limit must return the same bytes
+// that are stored for that block locally.
+TEST_F(TabletCopyServiceTest, TestFetchBlockAtOnce) {
+  string session_id;
+  tablet::TabletSuperBlockPB superblock;
+  ASSERT_OK(DoBeginValidTabletCopySession(&session_id, &superblock));
+  BlockId block_id = FirstColumnBlockId(superblock);
+
+  // Expected bytes, read straight from the server's local block storage.
+  faststring buf;
+  Slice expected;
+  ASSERT_OK(ReadLocalBlockFile(mini_server_->server()->fs_manager(), block_id,
+                               &buf, &expected));
+
+  // The same block, pulled through the tablet copy service in one request.
+  RpcController rpc;
+  FetchDataResponsePB fetch_resp;
+  ASSERT_OK(DoFetchData(session_id, AsDataTypeId(block_id), nullptr, nullptr,
+                        &fetch_resp, &rpc));
+
+  AssertDataEqual(expected.data(), expected.size(), fetch_resp.chunk());
+}
+
+// Test that we are able to incrementally fetch blocks.
+TEST_F(TabletCopyServiceTest, TestFetchBlockIncrementally) {
+  string session_id;
+  tablet::TabletSuperBlockPB superblock;
+  ASSERT_OK(DoBeginValidTabletCopySession(&session_id, &superblock));
+
+  BlockId block_id = FirstColumnBlockId(superblock);
+  Slice local_data;
+  faststring scratch;
+  ASSERT_OK(ReadLocalBlockFile(mini_server_->server()->fs_manager(), block_id,
+                               &scratch, &local_data));
+
+  // Grab the remote data in several (roughly five) chunks. For blocks smaller
+  // than five bytes, block_size / 5 is 0, which would request a zero-byte
+  // chunk, so never ask for less than one byte at a time.
+  const int64_t block_size = local_data.size();
+  int64_t max_chunk_size = block_size >= 5 ? block_size / 5 : 1;
+  uint64_t offset = 0;
+  while (offset < static_cast<uint64_t>(block_size)) {
+    FetchDataResponsePB resp;
+    RpcController controller;
+    ASSERT_OK(DoFetchData(session_id, AsDataTypeId(block_id),
+                          &offset, &max_chunk_size, &resp, &controller));
+    int64_t returned_bytes = resp.chunk().data().size();
+    // An empty chunk would leave 'offset' unchanged and spin this loop
+    // forever; fail the test instead of hanging.
+    ASSERT_GT(returned_bytes, 0) << "Empty chunk returned at offset " << offset;
+    ASSERT_LE(returned_bytes, max_chunk_size);
+    AssertDataEqual(local_data.data() + offset, returned_bytes, resp.chunk());
+    offset += returned_bytes;
+  }
+}
+
+// Test that we are able to fetch log segments: the first WAL segment served by
+// the service must match the first local log segment byte for byte.
+TEST_F(TabletCopyServiceTest, TestFetchLog) {
+  string session_id;
+  tablet::TabletSuperBlockPB superblock;
+  uint64_t idle_timeout_millis;
+  vector<uint64_t> segment_seqnos;
+  ASSERT_OK(DoBeginValidTabletCopySession(&session_id,
+                                          &superblock,
+                                          &idle_timeout_millis,
+                                          &segment_seqnos));
+
+  ASSERT_EQ(kNumLogRolls + 1, segment_seqnos.size());
+  uint64_t seg_seqno = segment_seqnos.front();
+
+  // Fetch the remote data.
+  FetchDataResponsePB resp;
+  RpcController controller;
+  DataIdPB data_id;
+  data_id.set_type(DataIdPB::LOG_SEGMENT);
+  data_id.set_wal_segment_seqno(seg_seqno);
+  ASSERT_OK(DoFetchData(session_id, data_id, nullptr, nullptr, &resp, &controller));
+
+  // Fetch the local data. Check the snapshot is non-empty before touching its
+  // first element; indexing an empty sequence would be undefined behavior.
+  log::SegmentSequence local_segments;
+  ASSERT_OK(tablet_peer_->log()->reader()->GetSegmentsSnapshot(&local_segments));
+  ASSERT_FALSE(local_segments.empty()) << "No local log segments found";
+  const scoped_refptr<ReadableLogSegment>& segment = local_segments[0];
+
+  uint64_t first_seg_seqno = segment->header().sequence_number();
+  ASSERT_EQ(seg_seqno, first_seg_seqno)
+      << "Expected equal sequence numbers: " << seg_seqno
+      << " and " << first_seg_seqno;
+
+  faststring scratch;
+  int64_t size = segment->file_size();
+  scratch.resize(size);
+  Slice slice;
+  ASSERT_OK(ReadFully(segment->readable_file().get(), 0, size, &slice, scratch.data()));
+
+  AssertDataEqual(slice.data(), slice.size(), resp.chunk());
+}
+
+// Test that the tablet copy session timeout works properly: with a 1 ms idle
+// timeout the service should report the session inactive well within the
+// 10 second polling window below.
+TEST_F(TabletCopyServiceTest, TestSessionTimeout) {
+  // Shrink the idle timeout before beginning the session so the session is
+  // created with it. The expiration poll period was already lowered to 10 ms
+  // in the fixture's constructor.
+  FLAGS_tablet_copy_idle_timeout_ms = 1; // Expire the session almost immediately.
+
+  string session_id;
+  ASSERT_OK(DoBeginValidTabletCopySession(&session_id));
+
+  const MonoTime polling_start = MonoTime::Now(MonoTime::FINE);
+  CheckTabletCopySessionActiveResponsePB resp;
+  // Poll about once per millisecond, giving up after 10 seconds.
+  for (;;) {
+    RpcController rpc;
+    ASSERT_OK(DoCheckSessionActive(session_id, &resp, &rpc));
+    if (!resp.session_is_active()) {
+      break;
+    }
+    SleepFor(MonoDelta::FromMilliseconds(1)); // 1 ms
+    if (MonoTime::Now(MonoTime::FINE).GetDeltaSince(polling_start).ToSeconds() >= 10) {
+      break;
+    }
+  }
+
+  ASSERT_FALSE(resp.session_is_active()) << "Tablet Copy session did not time out!";
+}
+
+} // namespace tserver
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/tablet_copy_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_service.cc b/src/kudu/tserver/tablet_copy_service.cc
new file mode 100644
index 0000000..62168a7
--- /dev/null
+++ b/src/kudu/tserver/tablet_copy_service.cc
@@ -0,0 +1,357 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/tserver/tablet_copy_service.h"
+
+#include <algorithm>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <string>
+#include <vector>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/log.h"
+#include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/tserver/tablet_copy_session.h"
+#include "kudu/tserver/tablet_peer_lookup.h"
+#include "kudu/tablet/tablet_peer.h"
+#include "kudu/util/crc.h"
+#include "kudu/util/fault_injection.h"
+#include "kudu/util/flag_tags.h"
+
+// Note, this macro assumes the existence of a local var named 'context'.
+#define RPC_RETURN_APP_ERROR(app_err, message, s) \
+  do { \
+    SetupErrorAndRespond(context, app_err, message, s); \
+    return; \
+  } while (false)
+
+#define RPC_RETURN_NOT_OK(expr, app_err, message) \
+  do { \
+    Status s = (expr); \
+    if (!s.ok()) { \
+      RPC_RETURN_APP_ERROR(app_err, message, s); \
+    } \
+  } while (false)
+
+// Idle timeout for a tablet copy session. Each sign of activity pushes the
+// session's deadline to now + this value; the expiration thread ends sessions
+// whose deadline has passed. The value is also reported to clients in the
+// BeginTabletCopySession response.
+DEFINE_uint64(tablet_copy_idle_timeout_ms, 180000,
+              "Amount of time without activity before a tablet copy "
+              "session will expire, in millis");
+TAG_FLAG(tablet_copy_idle_timeout_ms, hidden);
+
+// Sleep interval between passes of the session expiration thread. Expiration
+// is therefore only checked this often.
+DEFINE_uint64(tablet_copy_timeout_poll_period_ms, 10000,
+              "How often the tablet_copy service polls for expired "
+              "tablet copy sessions, in millis");
+TAG_FLAG(tablet_copy_timeout_poll_period_ms, hidden);
+
+// Fault-injection knob consulted via MAYBE_FAULT() in FetchData(); testing only.
+DEFINE_double(fault_crash_on_handle_tc_fetch_data, 0.0,
+              "Fraction of the time when the tablet will crash while "
+              "servicing a TabletCopyService FetchData() RPC call. "
+              "(For testing only!)");
+TAG_FLAG(fault_crash_on_handle_tc_fetch_data, unsafe);
+
+namespace kudu {
+namespace tserver {
+
+using crc::Crc32c;
+using strings::Substitute;
+using tablet::TabletPeer;
+
+// Logs 's' at WARNING level and answers the RPC held by 'context' with an
+// application error. The attached TabletCopyErrorPB carries 'code' and the
+// serialized status; 'message' is sent as the error text.
+static void SetupErrorAndRespond(rpc::RpcContext* context,
+                                 TabletCopyErrorPB::Code code,
+                                 const string& message,
+                                 const Status& s) {
+  LOG(WARNING) << "Error handling TabletCopyService RPC request from "
+               << context->requestor_string() << ": "
+               << s.ToString();
+
+  TabletCopyErrorPB app_error_pb;
+  app_error_pb.set_code(code);
+  StatusToPB(s, app_error_pb.mutable_status());
+
+  const int ext_number = TabletCopyErrorPB::tablet_copy_error_ext.number();
+  context->RespondApplicationError(ext_number, message, app_error_pb);
+}
+
+TabletCopyServiceImpl::TabletCopyServiceImpl(
+    FsManager* fs_manager,
+    TabletPeerLookupIf* tablet_peer_lookup,
+    const scoped_refptr<MetricEntity>& metric_entity,
+    const scoped_refptr<rpc::ResultTracker>& result_tracker)
+    : TabletCopyServiceIf(metric_entity, result_tracker),
+      fs_manager_(CHECK_NOTNULL(fs_manager)),
+      tablet_peer_lookup_(CHECK_NOTNULL(tablet_peer_lookup)),
+      shutdown_latch_(1) {
+  // Launch the background thread that periodically ends idle sessions. It keeps
+  // running until Shutdown() counts down 'shutdown_latch_'. Failing to start it
+  // is fatal.
+  const Status thread_status =
+      Thread::Create("tablet-copy", "tc-session-exp",
+                     &TabletCopyServiceImpl::EndExpiredSessions, this,
+                     &session_expiration_thread_);
+  CHECK_OK(thread_status);
+}
+
+void TabletCopyServiceImpl::BeginTabletCopySession(
+        const BeginTabletCopySessionRequestPB* req,
+        BeginTabletCopySessionResponsePB* resp,
+        rpc::RpcContext* context) {
+  const string& tablet_id = req->tablet_id();
+  const string& requestor_uuid = req->requestor_uuid();
+
+  // The session id is currently "<requestor_uuid>-<tablet_id>". Callers must
+  // not depend on this format; it may change.
+  const string session_id = Substitute("$0-$1", requestor_uuid, tablet_id);
+
+  scoped_refptr<TabletPeer> tablet_peer;
+  RPC_RETURN_NOT_OK(tablet_peer_lookup_->GetTabletPeer(tablet_id, &tablet_peer),
+                    TabletCopyErrorPB::TABLET_NOT_FOUND,
+                    Substitute("Unable to find specified tablet: $0", tablet_id));
+
+  // Reuse the session already registered under this id, or create, initialize
+  // and register a new one. In both cases the idle deadline is refreshed.
+  scoped_refptr<TabletCopySession> session;
+  {
+    MutexLock l(sessions_lock_);
+    if (FindCopy(sessions_, session_id, &session)) {
+      LOG(INFO) << Substitute(
+          "Re-sending initialization info for existing tablet copy session on tablet $0"
+          " from peer $1 at $2: session_id = $3",
+          tablet_id, requestor_uuid, context->requestor_string(), session_id);
+    } else {
+      LOG(INFO) << Substitute(
+          "Beginning new tablet copy session on tablet $0 from peer $1"
+          " at $2: session id = $3",
+          tablet_id, requestor_uuid, context->requestor_string(), session_id);
+      session.reset(new TabletCopySession(tablet_peer, session_id,
+                                          requestor_uuid, fs_manager_));
+      RPC_RETURN_NOT_OK(session->Init(),
+                        TabletCopyErrorPB::UNKNOWN_ERROR,
+                        Substitute("Error initializing tablet copy session for tablet $0",
+                                   tablet_id));
+      InsertOrDie(&sessions_, session_id, session);
+    }
+    ResetSessionExpirationUnlocked(session_id);
+  }
+
+  // Describe the session's snapshot (superblock, committed consensus state and
+  // WAL segment sequence numbers) back to the requestor.
+  resp->set_responder_uuid(fs_manager_->uuid());
+  resp->set_session_id(session_id);
+  resp->set_session_idle_timeout_millis(FLAGS_tablet_copy_idle_timeout_ms);
+  resp->mutable_superblock()->CopyFrom(session->tablet_superblock());
+  resp->mutable_initial_committed_cstate()->CopyFrom(session->initial_committed_cstate());
+  for (const auto& segment : session->log_segments()) {
+    resp->add_wal_segment_seqnos(segment->header().sequence_number());
+  }
+
+  context->RespondSuccess();
+}
+
+void TabletCopyServiceImpl::CheckSessionActive(
+        const CheckTabletCopySessionActiveRequestPB* req,
+        CheckTabletCopySessionActiveResponsePB* resp,
+        rpc::RpcContext* context) {
+  const string& session_id = req->session_id();
+
+  // The lock is held for the remainder of the call, so the lookup and the
+  // optional keepalive are not interleaved with the expiration thread.
+  scoped_refptr<TabletCopySession> session;
+  MutexLock l(sessions_lock_);
+  TabletCopyErrorPB::Code app_error;
+  const Status lookup_status = FindSessionUnlocked(session_id, &app_error, &session);
+
+  if (lookup_status.ok()) {
+    // The session exists; optionally push out its idle deadline.
+    if (req->keepalive()) {
+      ResetSessionExpirationUnlocked(session_id);
+    }
+    resp->set_session_is_active(true);
+    context->RespondSuccess();
+    return;
+  }
+
+  if (app_error == TabletCopyErrorPB::NO_SESSION) {
+    // An unknown session is a normal "inactive" answer, not an RPC error.
+    resp->set_session_is_active(false);
+    context->RespondSuccess();
+    return;
+  }
+
+  RPC_RETURN_NOT_OK(lookup_status, app_error,
+                    Substitute("Error trying to check whether session $0 is active", session_id));
+}
+
+void TabletCopyServiceImpl::FetchData(const FetchDataRequestPB* req,
+                                      FetchDataResponsePB* resp,
+                                      rpc::RpcContext* context) {
+  const string& session_id = req->session_id();
+
+  // Resolve the session and count this request as activity on it.
+  scoped_refptr<TabletCopySession> session;
+  {
+    MutexLock l(sessions_lock_);
+    TabletCopyErrorPB::Code app_error;
+    RPC_RETURN_NOT_OK(FindSessionUnlocked(session_id, &app_error, &session),
+                      app_error, "No such session");
+    ResetSessionExpirationUnlocked(session_id);
+  }
+
+  // Test-only crash injection point.
+  MAYBE_FAULT(FLAGS_fault_crash_on_handle_tc_fetch_data);
+
+  const DataIdPB& data_id = req->data_id();
+  uint64_t offset = req->offset();
+  int64_t client_maxlen = req->max_length();
+
+  TabletCopyErrorPB::Code error_code = TabletCopyErrorPB::UNKNOWN_ERROR;
+  RPC_RETURN_NOT_OK(ValidateFetchRequestDataId(data_id, &error_code, session),
+                    error_code, "Invalid DataId");
+
+  DataChunkPB* chunk = resp->mutable_chunk();
+  string* chunk_bytes = chunk->mutable_data();
+  int64_t total_data_length = 0;
+  if (data_id.type() != DataIdPB::BLOCK) {
+    // Every non-BLOCK id is served from a WAL segment.
+    uint64_t segment_seqno = data_id.wal_segment_seqno();
+    RPC_RETURN_NOT_OK(session->GetLogSegmentPiece(segment_seqno, offset, client_maxlen,
+                                                  chunk_bytes, &total_data_length,
+                                                  &error_code),
+                      error_code, "Unable to get piece of log segment");
+  } else {
+    const BlockId& block_id = BlockId::FromPB(data_id.block_id());
+    RPC_RETURN_NOT_OK(session->GetBlockPiece(block_id, offset, client_maxlen,
+                                             chunk_bytes, &total_data_length, &error_code),
+                      error_code, "Unable to get piece of data block");
+  }
+
+  chunk->set_total_data_length(total_data_length);
+  chunk->set_offset(offset);
+  // Attach a CRC32C of exactly the bytes being returned.
+  chunk->set_crc32(Crc32c(chunk_bytes->data(), chunk_bytes->length()));
+
+  context->RespondSuccess();
+}
+
+void TabletCopyServiceImpl::EndTabletCopySession(
+        const EndTabletCopySessionRequestPB* req,
+        EndTabletCopySessionResponsePB* resp,
+        rpc::RpcContext* context) {
+  const string& session_id = req->session_id();
+  // Remove the session under the lock. A missing session becomes an
+  // application error response.
+  {
+    MutexLock l(sessions_lock_);
+    LOG(INFO) << "Request end of tablet copy session " << session_id
+              << " received from " << context->requestor_string();
+    TabletCopyErrorPB::Code app_error;
+    RPC_RETURN_NOT_OK(DoEndTabletCopySessionUnlocked(session_id, &app_error),
+                      app_error, "No such session");
+  }
+  context->RespondSuccess();
+}
+
+// Stops the session expiration thread, then ends every remaining session.
+void TabletCopyServiceImpl::Shutdown() {
+  // Stop the expiration thread before taking 'sessions_lock_'.
+  // EndExpiredSessions() acquires that lock on every pass, so joining it while
+  // holding the lock could deadlock.
+  shutdown_latch_.CountDown();
+  session_expiration_thread_->Join();
+
+  // DoEndTabletCopySessionUnlocked() and the map iteration below require
+  // 'sessions_lock_', as at every other call site. RPC handlers may still be
+  // running concurrently and mutating the session maps.
+  MutexLock l(sessions_lock_);
+
+  // Destroy all tablet copy sessions. Collect the ids first because ending a
+  // session erases it from 'session_expirations_'.
+  vector<string> session_ids;
+  session_ids.reserve(session_expirations_.size());
+  for (const MonoTimeMap::value_type& entry : session_expirations_) {
+    session_ids.push_back(entry.first);
+  }
+  for (const string& session_id : session_ids) {
+    LOG(INFO) << "Destroying tablet copy session " << session_id << " due to service shutdown";
+    TabletCopyErrorPB::Code app_error;
+    CHECK_OK(DoEndTabletCopySessionUnlocked(session_id, &app_error));
+  }
+}
+
+// Copies the session registered under 'session_id' into '*session'. If none
+// exists, sets '*app_error' to NO_SESSION and returns NotFound.
+Status TabletCopyServiceImpl::FindSessionUnlocked(
+        const string& session_id,
+        TabletCopyErrorPB::Code* app_error,
+        scoped_refptr<TabletCopySession>* session) const {
+  if (FindCopy(sessions_, session_id, session)) {
+    return Status::OK();
+  }
+  *app_error = TabletCopyErrorPB::NO_SESSION;
+  return Status::NotFound(
+      Substitute("Tablet Copy session with Session ID \"$0\" not found", session_id));
+}
+
+// Validates the DataIdPB of a FetchData request. Exactly one of 'block_id' or
+// 'wal_segment_seqno' must be set, and it must be the one matching the
+// declared type. On any failure, '*app_error' is set to
+// INVALID_TABLET_COPY_REQUEST and an InvalidArgument status is returned.
+// 'session' is not currently consulted.
+Status TabletCopyServiceImpl::ValidateFetchRequestDataId(
+        const DataIdPB& data_id,
+        TabletCopyErrorPB::Code* app_error,
+        const scoped_refptr<TabletCopySession>& session) const {
+  if (PREDICT_FALSE(data_id.has_block_id() && data_id.has_wal_segment_seqno())) {
+    *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
+    return Status::InvalidArgument(
+        Substitute("Only one of BlockId or segment sequence number are required, "
+            "but both were specified. DataTypeID: $0", data_id.ShortDebugString()));
+  } else if (PREDICT_FALSE(!data_id.has_block_id() && !data_id.has_wal_segment_seqno())) {
+    *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
+    return Status::InvalidArgument(
+        Substitute("Only one of BlockId or segment sequence number are required, "
+            "but neither were specified. DataTypeID: $0", data_id.ShortDebugString()));
+  }
+
+  if (data_id.type() == DataIdPB::BLOCK) {
+    if (PREDICT_FALSE(!data_id.has_block_id())) {
+      // Set the error code on this path too, so a failed status is never
+      // paired with whatever code the caller happened to pre-initialize.
+      *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
+      return Status::InvalidArgument("block_id must be specified for type == BLOCK",
+                                     data_id.ShortDebugString());
+    }
+  } else {
+    // Test for presence rather than value, so an explicitly-set sequence
+    // number of 0 is not reported as "not specified".
+    if (PREDICT_FALSE(!data_id.has_wal_segment_seqno())) {
+      *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
+      return Status::InvalidArgument(
+          "segment sequence number must be specified for type == LOG_SEGMENT",
+          data_id.ShortDebugString());
+    }
+  }
+
+  return Status::OK();
+}
+
+// Records activity on 'session_id' by setting its expiration deadline to now
+// plus FLAGS_tablet_copy_idle_timeout_ms, inserting the entry if it is absent.
+void TabletCopyServiceImpl::ResetSessionExpirationUnlocked(const std::string& session_id) {
+  MonoTime deadline = MonoTime::Now(MonoTime::FINE);
+  const MonoDelta idle_timeout = MonoDelta::FromMilliseconds(FLAGS_tablet_copy_idle_timeout_ms);
+  deadline.AddDelta(idle_timeout);
+  InsertOrUpdate(&session_expirations_, session_id, deadline);
+}
+
+// Unregisters 'session_id' from both session maps. If no such session exists,
+// the lookup error (and '*app_error') from FindSessionUnlocked() is passed through.
+Status TabletCopyServiceImpl::DoEndTabletCopySessionUnlocked(
+        const std::string& session_id,
+        TabletCopyErrorPB::Code* app_error) {
+  scoped_refptr<TabletCopySession> session;
+  RETURN_NOT_OK(FindSessionUnlocked(session_id, app_error, &session));
+
+  LOG(INFO) << "Ending tablet copy session " << session_id << " on tablet "
+            << session->tablet_id() << " with peer " << session->requestor_uuid();
+
+  // Drop the service's references. The session object itself goes away once
+  // its last outstanding reference (e.g. an in-flight FetchData) is released.
+  CHECK_EQ(1, sessions_.erase(session_id));
+  CHECK_EQ(1, session_expirations_.erase(session_id));
+  return Status::OK();
+}
+
+// Body of the session expiration thread. Each pass ends every session whose
+// deadline has passed, then waits one poll period. The loop exits once
+// Shutdown() counts down 'shutdown_latch_'.
+void TabletCopyServiceImpl::EndExpiredSessions() {
+  for (;;) {
+    {
+      MutexLock l(sessions_lock_);
+      MonoTime now = MonoTime::Now(MonoTime::FINE);
+
+      // Gather ids first: ending a session erases it from the map being scanned.
+      vector<string> expired;
+      for (const auto& id_and_deadline : session_expirations_) {
+        if (id_and_deadline.second.ComesBefore(now)) {
+          expired.push_back(id_and_deadline.first);
+        }
+      }
+
+      for (const string& session_id : expired) {
+        LOG(INFO) << "Tablet Copy session " << session_id
+                  << " has expired. Terminating session.";
+        TabletCopyErrorPB::Code app_error;
+        CHECK_OK(DoEndTabletCopySessionUnlocked(session_id, &app_error));
+      }
+    }
+
+    // The lock is released before sleeping, so RPCs are not blocked meanwhile.
+    if (shutdown_latch_.WaitFor(
+            MonoDelta::FromMilliseconds(FLAGS_tablet_copy_timeout_poll_period_ms))) {
+      break;
+    }
+  }
+}
+
+} // namespace tserver
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/tablet_copy_service.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_service.h b/src/kudu/tserver/tablet_copy_service.h
new file mode 100644
index 0000000..180dc1b
--- /dev/null
+++ b/src/kudu/tserver/tablet_copy_service.h
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_TSERVER_TABLET_COPY_SERVICE_H_
+#define KUDU_TSERVER_TABLET_COPY_SERVICE_H_
+
+#include <string>
+#include <unordered_map>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/tserver/tablet_copy.service.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+namespace kudu {
+class FsManager;
+
+namespace log {
+class ReadableLogSegment;
+} // namespace log
+
+namespace tserver {
+
+class TabletCopySession;
+class TabletPeerLookupIf;
+
+// Server-side implementation of the TabletCopyService RPC interface. Sessions
+// are tracked by id, each with an idle-expiration deadline. A background thread
+// started by the constructor ends sessions whose deadline has passed.
+class TabletCopyServiceImpl : public TabletCopyServiceIf {
+ public:
+  TabletCopyServiceImpl(FsManager* fs_manager,
+                        TabletPeerLookupIf* tablet_peer_lookup,
+                        const scoped_refptr<MetricEntity>& metric_entity,
+                        const scoped_refptr<rpc::ResultTracker>& result_tracker);
+
+  // Creates a session for the requested tablet, or re-describes an existing one.
+  virtual void BeginTabletCopySession(const BeginTabletCopySessionRequestPB* req,
+                                      BeginTabletCopySessionResponsePB* resp,
+                                      rpc::RpcContext* context) OVERRIDE;
+
+  // Reports whether a session exists, optionally refreshing its idle deadline.
+  virtual void CheckSessionActive(const CheckTabletCopySessionActiveRequestPB* req,
+                                  CheckTabletCopySessionActiveResponsePB* resp,
+                                  rpc::RpcContext* context) OVERRIDE;
+
+  // Returns one checksummed chunk of a data block or WAL segment.
+  virtual void FetchData(const FetchDataRequestPB* req,
+                         FetchDataResponsePB* resp,
+                         rpc::RpcContext* context) OVERRIDE;
+
+  // Ends a session at the client's request.
+  virtual void EndTabletCopySession(const EndTabletCopySessionRequestPB* req,
+                                    EndTabletCopySessionResponsePB* resp,
+                                    rpc::RpcContext* context) OVERRIDE;
+
+  // Stops the expiration thread and ends all remaining sessions.
+  virtual void Shutdown() OVERRIDE;
+
+ private:
+  using SessionMap = std::unordered_map<std::string, scoped_refptr<TabletCopySession> >;
+  using MonoTimeMap = std::unordered_map<std::string, MonoTime>;
+
+  // Methods suffixed "Unlocked" do not acquire 'sessions_lock_' themselves;
+  // callers are expected to hold it.
+
+  // Looks up 'session_id'. If it is absent, sets '*app_error' and returns NotFound.
+  Status FindSessionUnlocked(const std::string& session_id,
+                             TabletCopyErrorPB::Code* app_error,
+                             scoped_refptr<TabletCopySession>* session) const;
+
+  // Checks that 'data_id' from a FetchData request names exactly one resource
+  // consistent with its declared type.
+  Status ValidateFetchRequestDataId(const DataIdPB& data_id,
+                                    TabletCopyErrorPB::Code* app_error,
+                                    const scoped_refptr<TabletCopySession>& session) const;
+
+  // Records activity on a session by pushing out its idle deadline.
+  void ResetSessionExpirationUnlocked(const std::string& session_id);
+
+  // Removes the given session from both session maps.
+  Status DoEndTabletCopySessionUnlocked(const std::string& session_id,
+                                        TabletCopyErrorPB::Code* app_error);
+
+  // Body of the expiration thread: periodically ends sessions past their deadline.
+  void EndExpiredSessions();
+
+  FsManager* fs_manager_;
+  TabletPeerLookupIf* tablet_peer_lookup_;
+
+  // Guards both 'sessions_' and 'session_expirations_'.
+  mutable Mutex sessions_lock_;
+  SessionMap sessions_;
+  MonoTimeMap session_expirations_;
+
+  // Shutdown signal and handle for the session expiration thread.
+  // TODO: this is a hack, replace with some kind of timer impl. See KUDU-286.
+  CountDownLatch shutdown_latch_;
+  scoped_refptr<Thread> session_expiration_thread_;
+};
+
+} // namespace tserver
+} // namespace kudu
+
+#endif // KUDU_TSERVER_TABLET_COPY_SERVICE_H_

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/tablet_copy_session-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_session-test.cc b/src/kudu/tserver/tablet_copy_session-test.cc
new file mode 100644
index 0000000..aa104b5
--- /dev/null
+++ b/src/kudu/tserver/tablet_copy_session-test.cc
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/tablet/tablet-test-util.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <memory>
+
+#include "kudu/common/partial_row.h"
+#include "kudu/common/row_operations.h"
+#include "kudu/common/schema.h"
+#include "kudu/consensus/consensus_meta.h"
+#include "kudu/consensus/log.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/opid_util.h"
+#include "kudu/fs/block_id.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/fastmem.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/tserver/tablet_copy_session.h"
+#include "kudu/tablet/tablet_peer.h"
+#include "kudu/util/crc.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/threadpool.h"
+
+METRIC_DECLARE_entity(tablet);
+
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+
+namespace kudu {
+namespace tserver {
+
+using consensus::ConsensusMetadata;
+using consensus::OpId;
+using consensus::RaftConfigPB;
+using consensus::RaftPeerPB;
+using fs::ReadableBlock;
+using log::Log;
+using log::LogOptions;
+using log::LogAnchorRegistry;
+using rpc::Messenger;
+using rpc::MessengerBuilder;
+using strings::Substitute;
+using tablet::ColumnDataPB;
+using tablet::DeltaDataPB;
+using tablet::KuduTabletTest;
+using tablet::RowSetDataPB;
+using tablet::TabletPeer;
+using tablet::TabletSuperBlockPB;
+using tablet::WriteTransactionState;
+
+// Fixture that brings up a single-replica, self-elected-leader tablet peer,
+// writes and flushes test rows, and opens a TabletCopySession on it.
+class TabletCopyTest : public KuduTabletTest {
+ public:
+  TabletCopyTest()
+    : KuduTabletTest(Schema({ ColumnSchema("key", STRING),
+                              ColumnSchema("val", INT32) }, 1)) {
+    CHECK_OK(ThreadPoolBuilder("test-exec").Build(&apply_pool_));
+  }
+
+  virtual void SetUp() OVERRIDE {
+    NO_FATALS(KuduTabletTest::SetUp());
+    NO_FATALS(SetUpTabletPeer());
+    NO_FATALS(PopulateTablet());
+    NO_FATALS(InitSession());
+  }
+
+  virtual void TearDown() OVERRIDE {
+    // Release the session before shutting down the peer it was created from.
+    session_.reset();
+    // SetUp() may have failed before the tablet peer was created.
+    if (tablet_peer_) {
+      tablet_peer_->Shutdown();
+    }
+    KuduTabletTest::TearDown();
+  }
+
+ protected:
+  // Opens a log, creates and starts a TabletPeer with a one-voter config made
+  // of this server, and waits until it is running and leader.
+  void SetUpTabletPeer() {
+    scoped_refptr<Log> log;
+    ASSERT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(),
+                        *tablet()->schema(),
+                        0, // schema_version
+                        nullptr, &log));
+
+    scoped_refptr<MetricEntity> metric_entity =
+      METRIC_ENTITY_tablet.Instantiate(&metric_registry_, CURRENT_TEST_NAME());
+
+    RaftPeerPB config_peer;
+    config_peer.set_permanent_uuid(fs_manager()->uuid());
+    config_peer.mutable_last_known_addr()->set_host("0.0.0.0");
+    config_peer.mutable_last_known_addr()->set_port(0);
+    config_peer.set_member_type(RaftPeerPB::VOTER);
+
+    tablet_peer_.reset(
+        new TabletPeer(tablet()->metadata(),
+                       config_peer,
+                       apply_pool_.get(),
+                       Bind(&TabletCopyTest::TabletPeerStateChangedCallback,
+                            Unretained(this),
+                            tablet()->tablet_id())));
+
+    // TODO similar to code in tablet_peer-test, consider refactor.
+    RaftConfigPB config;
+    config.add_peers()->CopyFrom(config_peer);
+    config.set_opid_index(consensus::kInvalidOpIdIndex);
+
+    gscoped_ptr<ConsensusMetadata> cmeta;
+    ASSERT_OK(ConsensusMetadata::Create(tablet()->metadata()->fs_manager(),
+                                        tablet()->tablet_id(), fs_manager()->uuid(),
+                                        config, consensus::kMinimumTerm, &cmeta));
+
+    // Don't silently continue with a null messenger if construction fails.
+    shared_ptr<Messenger> messenger;
+    MessengerBuilder mbuilder(CURRENT_TEST_NAME());
+    ASSERT_OK(mbuilder.Build(&messenger));
+
+    log_anchor_registry_.reset(new LogAnchorRegistry());
+    tablet_peer_->SetBootstrapping();
+    ASSERT_OK(tablet_peer_->Init(tablet(),
+                                 clock(),
+                                 messenger,
+                                 scoped_refptr<rpc::ResultTracker>(),
+                                 log,
+                                 metric_entity));
+    consensus::ConsensusBootstrapInfo boot_info;
+    ASSERT_OK(tablet_peer_->Start(boot_info));
+    ASSERT_OK(tablet_peer_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10)));
+    ASSERT_OK(tablet_peer_->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+  }
+
+  void TabletPeerStateChangedCallback(const string& tablet_id, const string& reason) {
+    LOG(INFO) << "Tablet peer state changed for tablet " << tablet_id << ". Reason: " << reason;
+  }
+
+  // Inserts 1000 rows ("key<i>", i) one write at a time through the tablet
+  // peer, waiting for each to complete, and then flushes the tablet.
+  void PopulateTablet() {
+    for (int32_t i = 0; i < 1000; i++) {
+      WriteRequestPB req;
+      req.set_tablet_id(tablet_peer_->tablet_id());
+      ASSERT_OK(SchemaToPB(client_schema_, req.mutable_schema()));
+      RowOperationsPB* data = req.mutable_row_operations();
+      RowOperationsPBEncoder enc(data);
+      KuduPartialRow row(&client_schema_);
+
+      string key = Substitute("key$0", i);
+      ASSERT_OK(row.SetString(0, key));
+      ASSERT_OK(row.SetInt32(1, i));
+      enc.Add(RowOperationsPB::INSERT, row);
+
+      WriteResponsePB resp;
+      CountDownLatch latch(1);
+
+      unique_ptr<tablet::WriteTransactionState> state(
+          new tablet::WriteTransactionState(tablet_peer_.get(),
+                                            &req,
+                                            nullptr, // No RequestIdPB
+                                            &resp));
+      state->set_completion_callback(gscoped_ptr<tablet::TransactionCompletionCallback>(
+          new tablet::LatchTransactionCompletionCallback<WriteResponsePB>(&latch, &resp)));
+      ASSERT_OK(tablet_peer_->SubmitWrite(std::move(state)));
+      latch.Wait();
+      ASSERT_FALSE(resp.has_error()) << "Request failed: " << resp.error().ShortDebugString();
+      ASSERT_EQ(0, resp.per_row_errors_size()) << "Insert error: " << resp.ShortDebugString();
+    }
+    ASSERT_OK(tablet()->Flush());
+  }
+
+  // Creates and initializes the TabletCopySession under test.
+  void InitSession() {
+    session_.reset(new TabletCopySession(tablet_peer_.get(), "TestSession", "FakeUUID",
+                   fs_manager()));
+    ASSERT_OK(session_->Init());
+  }
+
+  // Read the specified BlockId, via the TabletCopySession, into a file.
+  // 'path' will be populated with the name of the file used.
+  // 'file' will be set to point to the SequentialFile containing the data.
+  void FetchBlockToFile(const BlockId& block_id,
+                        string* path,
+                        gscoped_ptr<SequentialFile>* file) {
+    string data;
+    int64_t block_file_size = 0;
+    TabletCopyErrorPB::Code error_code;
+    CHECK_OK(session_->GetBlockPiece(block_id, 0, 0, &data, &block_file_size, &error_code));
+    if (block_file_size > 0) {
+      CHECK_GT(data.size(), 0);
+    }
+
+    // Write the file to a temporary location.
+    WritableFileOptions opts;
+    string path_template =
+        GetTestPath(Substitute("test_block_$0.tmp.XXXXXX", block_id.ToString()));
+    gscoped_ptr<WritableFile> writable_file;
+    CHECK_OK(Env::Default()->NewTempWritableFile(opts, path_template, path, &writable_file));
+    CHECK_OK(writable_file->Append(Slice(data.data(), data.size())));
+    CHECK_OK(writable_file->Close());
+
+    CHECK_OK(Env::Default()->NewSequentialFile(*path, file));
+  }
+
+  MetricRegistry metric_registry_;
+  scoped_refptr<LogAnchorRegistry> log_anchor_registry_;
+  gscoped_ptr<ThreadPool> apply_pool_;
+  scoped_refptr<TabletPeer> tablet_peer_;
+  scoped_refptr<TabletCopySession> session_;
+};
+
+// The superblock captured by the TabletCopySession must serialize to exactly
+// the same bytes as the tablet's live superblock (the tablet is quiesced here).
+TEST_F(TabletCopyTest, TestSuperBlocksEqual) {
+  faststring session_buf;
+  faststring tablet_buf;
+
+  // Encode the session's snapshot of the superblock.
+  {
+    const TabletSuperBlockPB& session_superblock = session_->tablet_superblock();
+    session_buf.resize(session_superblock.ByteSize());
+    session_superblock.SerializeWithCachedSizesToArray(session_buf.data());
+  }
+
+  // Encode the tablet's current superblock.
+  {
+    TabletSuperBlockPB tablet_superblock;
+    ASSERT_OK(tablet()->metadata()->ToSuperBlock(&tablet_superblock));
+    tablet_buf.resize(tablet_superblock.ByteSize());
+    tablet_superblock.SerializeWithCachedSizesToArray(tablet_buf.data());
+  }
+
+  // Same length, then same bytes.
+  ASSERT_EQ(session_buf.size(), tablet_buf.size());
+  int size = tablet_buf.size();
+  ASSERT_EQ(0, strings::fastmemcmp_inlined(session_buf.data(), tablet_buf.data(), size));
+}
+
+// For every column block, the bytes served through the session must match the
+// bytes stored by the block manager, in both total size and CRC32C.
+TEST_F(TabletCopyTest, TestBlocksEqual) {
+  TabletSuperBlockPB tablet_superblock;
+  ASSERT_OK(tablet()->metadata()->ToSuperBlock(&tablet_superblock));
+  for (const RowSetDataPB& rowset : tablet_superblock.rowsets()) {
+    for (const ColumnDataPB& column : rowset.columns()) {
+      BlockId block_id = BlockId::FromPB(column.block());
+
+      // Side 1: pull the block through the session into a temp file.
+      string path;
+      gscoped_ptr<SequentialFile> file;
+      FetchBlockToFile(block_id, &path, &file);
+      uint64_t session_block_size = 0;
+      ASSERT_OK(Env::Default()->GetFileSize(path, &session_block_size));
+      faststring buf;
+      buf.resize(session_block_size);
+      Slice data;
+      ASSERT_OK(file->Read(session_block_size, &data, buf.data()));
+      const uint32_t session_crc = crc::Crc32c(data.data(), data.size());
+      LOG(INFO) << "session block file has size of " << session_block_size
+                << " and CRC32C of " << session_crc << ": " << path;
+
+      // Side 2: read the same block straight from the block manager.
+      gscoped_ptr<ReadableBlock> tablet_block;
+      ASSERT_OK(fs_manager()->OpenBlock(block_id, &tablet_block));
+      uint64_t tablet_block_size = 0;
+      ASSERT_OK(tablet_block->Size(&tablet_block_size));
+      buf.resize(tablet_block_size);
+      ASSERT_OK(tablet_block->Read(0, tablet_block_size, &data, buf.data()));
+      const uint32_t tablet_crc = crc::Crc32c(data.data(), data.size());
+      LOG(INFO) << "tablet block file has size of " << tablet_block_size
+                << " and CRC32C of " << tablet_crc
+                << ": " << block_id;
+
+      // Both sides must agree.
+      ASSERT_EQ(tablet_block_size, session_block_size);
+      ASSERT_EQ(tablet_crc, session_crc);
+    }
+  }
+}
+
+// Blocks the session has open must remain readable through it even after the
+// block manager has deleted them.
+TEST_F(TabletCopyTest, TestBlocksAreFetchableAfterBeingDeleted) {
+  TabletSuperBlockPB tablet_superblock;
+  ASSERT_OK(tablet()->metadata()->ToSuperBlock(&tablet_superblock));
+
+  // Collect every block referenced by the superblock, rowset by rowset, in the
+  // order: redo deltas, undo deltas, columns, bloom, ad hoc index.
+  vector<BlockId> data_blocks;
+  auto collect = [&data_blocks](const BlockIdPB& pb) {
+    data_blocks.push_back(BlockId::FromPB(pb));
+  };
+  for (const RowSetDataPB& rowset : tablet_superblock.rowsets()) {
+    for (const DeltaDataPB& redo : rowset.redo_deltas()) collect(redo.block());
+    for (const DeltaDataPB& undo : rowset.undo_deltas()) collect(undo.block());
+    for (const ColumnDataPB& column : rowset.columns()) collect(column.block());
+    if (rowset.has_bloom_block()) collect(rowset.bloom_block());
+    if (rowset.has_adhoc_index_block()) collect(rowset.adhoc_index_block());
+  }
+
+  // Remove them from the block manager.
+  for (const BlockId& block_id : data_blocks) {
+    ASSERT_OK(fs_manager()->DeleteBlock(block_id));
+  }
+
+  // Each one must still be open in, and readable through, the session.
+  for (const BlockId& block_id : data_blocks) {
+    ASSERT_TRUE(session_->IsBlockOpenForTests(block_id));
+    string data;
+    TabletCopyErrorPB::Code error_code;
+    int64_t piece_size;
+    ASSERT_OK(session_->GetBlockPiece(block_id, 0, 0,
+                                      &data, &piece_size, &error_code));
+  }
+}
+
+}  // namespace tserver
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/tablet_copy_session.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_session.cc b/src/kudu/tserver/tablet_copy_session.cc
new file mode 100644
index 0000000..0b601c4
--- /dev/null
+++ b/src/kudu/tserver/tablet_copy_session.cc
@@ -0,0 +1,386 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/tserver/tablet_copy_session.h"
+
+#include <algorithm>
+#include <mutex>
+
+#include "kudu/consensus/log.h"
+#include "kudu/consensus/log_reader.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/fs/block_manager.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/type_traits.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/tablet/tablet_peer.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/trace.h"
+
+// Default and upper bound for the size of a single data chunk served by a
+// tablet copy session; see DetermineReadLength() below for how it is applied.
+DEFINE_int32(tablet_copy_transfer_chunk_size_bytes, 4 * 1024 * 1024,
+             "Size of chunks to transfer when copying tablets between tablet servers.");
+TAG_FLAG(tablet_copy_transfer_chunk_size_bytes, hidden);
+
+namespace kudu {
+namespace tserver {
+
+using consensus::MinimumOpId;
+using consensus::OpId;
+using fs::ReadableBlock;
+using log::LogAnchorRegistry;
+using log::ReadableLogSegment;
+using std::shared_ptr;
+using strings::Substitute;
+using tablet::ColumnDataPB;
+using tablet::DeltaDataPB;
+using tablet::RowSetDataPB;
+using tablet::TabletMetadata;
+using tablet::TabletPeer;
+using tablet::TabletSuperBlockPB;
+
+// Records the session's identity and collaborators. The two ValueDeleters are
+// bound to the block and log caches so that the heap-allocated info objects
+// stored in them are freed together with the session.
+TabletCopySession::TabletCopySession(const scoped_refptr<TabletPeer>& tablet_peer,
+                                     std::string session_id,
+                                     std::string requestor_uuid,
+                                     FsManager* fs_manager)
+    : tablet_peer_(tablet_peer),
+      session_id_(std::move(session_id)),
+      requestor_uuid_(std::move(requestor_uuid)),
+      fs_manager_(fs_manager),
+      blocks_deleter_(&blocks_),
+      logs_deleter_(&logs_) {
+}
+
+// Releases the session's log anchor, if it is still registered.
+TabletCopySession::~TabletCopySession() {
+  // No lock taken in the destructor, should only be 1 thread with access now.
+  // A failure to unregister the anchor is treated as fatal (CHECK_OK).
+  CHECK_OK(UnregisterAnchorIfNeededUnlocked());
+}
+
+// Initializes the session while holding session_lock_. In order, it:
+//   1. registers a log anchor at MinimumOpId() to prevent log GC while the
+//      session snapshots the tablet's state,
+//   2. reads the tablet superblock from disk,
+//   3. opens every data block referenced by the superblock and caches it,
+//   4. snapshots the log reader's current segments and caches their lengths,
+//   5. captures the committed consensus state, and
+//   6. moves the log anchor up to the last OpId that was logged before the
+//      segment snapshot in step 4.
+// On any failure it returns early with initted_ still false; the log anchor
+// stays registered until the destructor unregisters it.
+// Must not be called more than once (a second call CHECK-fails).
+Status TabletCopySession::Init() {
+  MutexLock l(session_lock_);
+  CHECK(!initted_);
+
+  const string& tablet_id = tablet_peer_->tablet_id();
+
+  // Prevent log GC while we grab log segments and Tablet metadata.
+  string anchor_owner_token = Substitute("TabletCopy-$0", session_id_);
+  tablet_peer_->log_anchor_registry()->Register(
+      MinimumOpId().index(), anchor_owner_token, &log_anchor_);
+
+  // Read the SuperBlock from disk.
+  const scoped_refptr<TabletMetadata>& metadata = tablet_peer_->tablet_metadata();
+  RETURN_NOT_OK_PREPEND(metadata->ReadSuperBlockFromDisk(&tablet_superblock_),
+                        Substitute("Unable to access superblock for tablet $0",
+                                   tablet_id));
+
+  // Anchor the data blocks by opening them and adding them to the cache.
+  //
+  // All subsequent requests should reuse the opened blocks.
+  vector<BlockIdPB> data_blocks;
+  TabletMetadata::CollectBlockIdPBs(tablet_superblock_, &data_blocks);
+  for (const BlockIdPB& block_id : data_blocks) {
+    VLOG(1) << "Opening block " << block_id.DebugString();
+    RETURN_NOT_OK(OpenBlockUnlocked(BlockId::FromPB(block_id)));
+  }
+
+  // Get the latest opid in the log at this point in time so we can re-anchor.
+  // This is captured *before* the segment snapshot below, so every op up to
+  // and including it is contained in the snapshotted segments.
+  OpId last_logged_opid;
+  tablet_peer_->log()->GetLatestEntryOpId(&last_logged_opid);
+
+  // Get the current segments from the log, including the active segment.
+  // The Log doesn't add the active segment to the log reader's list until
+  // a header has been written to it (but it will not have a footer).
+  shared_ptr<log::LogReader> reader = tablet_peer_->log()->reader();
+  if (!reader) {
+    tablet::TabletStatePB tablet_state = tablet_peer_->state();
+    return Status::IllegalState(Substitute(
+        "Unable to initialize tablet copy session for tablet $0. "
+        "Log reader is not available. Tablet state: $1 ($2)",
+        tablet_id, tablet::TabletStatePB_Name(tablet_state), tablet_state));
+  }
+  reader->GetSegmentsSnapshot(&log_segments_);
+  // Cache the readable length of each snapshotted segment.
+  for (const scoped_refptr<ReadableLogSegment>& segment : log_segments_) {
+    RETURN_NOT_OK(OpenLogSegmentUnlocked(segment->header().sequence_number()));
+  }
+
+  // Look up the committed consensus state.
+  // We do this after snapshotting the log to avoid a scenario where the latest
+  // entry in the log has a term higher than the term stored in the consensus
+  // metadata, which would result in a CHECK failure on RaftConsensus init.
+  scoped_refptr<consensus::Consensus> consensus = tablet_peer_->shared_consensus();
+  if (!consensus) {
+    tablet::TabletStatePB tablet_state = tablet_peer_->state();
+    return Status::IllegalState(Substitute(
+        "Unable to initialize tablet copy session for tablet $0. "
+        "Consensus is not available. Tablet state: $1 ($2)",
+        tablet_id, tablet::TabletStatePB_Name(tablet_state), tablet_state));
+  }
+  initial_committed_cstate_ = consensus->ConsensusState(consensus::CONSENSUS_CONFIG_COMMITTED);
+
+  // Re-anchor on the highest OpId that was in the log right before we
+  // snapshotted the log segments. This helps ensure that we don't end up in a
+  // tablet copy loop due to a follower falling too far behind the
+  // leader's log when tablet copy is slow. The remote controls when
+  // this anchor is released by ending the tablet copy session.
+  RETURN_NOT_OK(tablet_peer_->log_anchor_registry()->UpdateRegistration(
+      last_logged_opid.index(), anchor_owner_token, &log_anchor_));
+
+  LOG(INFO) << Substitute(
+      "T $0 P $1: Tablet Copy: opened $2 blocks and $3 log segments",
+      tablet_id, consensus->peer_uuid(), data_blocks.size(), log_segments_.size());
+  // Only mark the session initialized once every step above has succeeded.
+  initted_ = true;
+  return Status::OK();
+}
+
+// Returns the ID of the tablet being copied. Valid only after Init().
+const std::string& TabletCopySession::tablet_id() const {
+  DCHECK(initted_);
+  TabletPeer* peer = tablet_peer_.get();
+  return peer->tablet_id();
+}
+
+// Returns the UUID of the peer that requested this copy. Valid only after
+// Init().
+const std::string& TabletCopySession::requestor_uuid() const {
+  DCHECK(initted_);
+  return this->requestor_uuid_;
+}
+
+// Chooses how many bytes to send back for a single chunk request.
+//
+// A non-positive 'requested_len' means "use the default", i.e.
+// --tablet_copy_transfer_chunk_size_bytes; a positive value is capped by that
+// flag. The length is then capped so that the chunk plus a fixed allowance for
+// RPC framing fits within --rpc_max_message_size (CHECK-failing if nothing
+// fits), and finally capped by 'bytes_remaining'.
+static int64_t DetermineReadLength(int64_t bytes_remaining, int64_t requested_len) {
+  // Allowance for RPC headers, protobuf fields, etc.
+  static const int kOverhead = 4096;
+
+  const int64_t chunk_limit = FLAGS_tablet_copy_transfer_chunk_size_bytes;
+  requested_len = (requested_len > 0) ? std::min(requested_len, chunk_limit)
+                                      : chunk_limit;
+  requested_len = std::min<int64_t>(requested_len, FLAGS_rpc_max_message_size - kOverhead);
+  CHECK_GT(requested_len, 0) << "rpc_max_message_size is too low to transfer data: "
+                             << FLAGS_rpc_max_message_size;
+  return std::min(bytes_remaining, requested_len);
+}
+
+// Computes how many bytes to return for a read of an object that is
+// 'total_size' bytes long, starting at 'offset', given the client's requested
+// maximum 'client_maxlen' (<= 0 means "no preference"). On success the
+// result, stored in '*data_size', is positive.
+//
+// Returns InvalidArgument and sets '*error_code' to
+// INVALID_TABLET_COPY_REQUEST if 'offset' is at or beyond 'total_size'.
+static Status GetResponseDataSize(int64_t total_size,
+                                  uint64_t offset, int64_t client_maxlen,
+                                  TabletCopyErrorPB::Code* error_code, int64_t* data_size) {
+  const bool offset_out_of_range = offset >= total_size;
+  if (offset_out_of_range) {
+    *error_code = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
+    return Status::InvalidArgument(Substitute(
+        "Requested offset ($0) is beyond the data size ($1)", offset, total_size));
+  }
+
+  *data_size = DetermineReadLength(total_size - offset, client_maxlen);
+
+  // Debug-only sanity checks on the chosen length.
+  DCHECK_GT(*data_size, 0);
+  if (client_maxlen > 0) {
+    DCHECK_LE(*data_size, client_maxlen);
+  }
+  return Status::OK();
+}
+
+// Reads one chunk of the object described by 'info', starting at 'offset',
+// into '*data'. The chunk length is chosen by GetResponseDataSize() from
+// 'client_maxlen' and server-side limits.
+//
+// 'Info' must expose a 'size' member (the object's total length) and a
+// 'ReadFully(offset, length, Slice*, uint8_t* scratch)' method.
+// 'data_name' describes the block/log segment in error messages.
+// On success, '*file_size' is set to 'info->size'. On failure, a non-OK
+// Status is returned and '*error_code' is filled in.
+template <class Info>
+static Status ReadFileChunkToBuf(const Info* info,
+                                 uint64_t offset, int64_t client_maxlen,
+                                 const string& data_name,
+                                 string* data, int64_t* file_size,
+                                 TabletCopyErrorPB::Code* error_code) {
+  int64_t response_data_size = 0;
+  RETURN_NOT_OK_PREPEND(GetResponseDataSize(info->size, offset, client_maxlen, error_code,
+                                            &response_data_size),
+                        Substitute("Error reading $0", data_name));
+
+  Stopwatch chunk_timer(Stopwatch::THIS_THREAD);
+  chunk_timer.start();
+
+  // Read directly into the string's storage to avoid an extra copy.
+  // Since C++11 std::string storage is contiguous and may be written through
+  // the non-const operator[]; writing through the const char* returned by
+  // data() is not permitted before C++17, so it is not used here.
+  data->resize(response_data_size);
+  uint8_t* buf = reinterpret_cast<uint8_t*>(&(*data)[0]);
+  Slice slice;
+  Status s = info->ReadFully(offset, response_data_size, &slice, buf);
+  if (PREDICT_FALSE(!s.ok())) {
+    s = s.CloneAndPrepend(
+        Substitute("Unable to read existing file for $0", data_name));
+    LOG(WARNING) << s.ToString();
+    *error_code = TabletCopyErrorPB::IO_ERROR;
+    return s;
+  }
+  // The returned Slice may point at memory owned by the reader (e.g. an mmap)
+  // rather than at 'buf'; in that case copy the bytes into 'buf'.
+  if (slice.data() != buf) {
+    memcpy(buf, slice.data(), slice.size());
+  }
+  chunk_timer.stop();
+  TRACE("Tablet Copy: $0: $1 total bytes read. Total time elapsed: $2",
+        data_name, response_data_size, chunk_timer.elapsed().ToString());
+
+  *file_size = info->size;
+  return Status::OK();
+}
+
+// Serves one chunk of a data block from the session's block cache (populated
+// by Init()). A block missing from the cache yields BLOCK_NOT_FOUND.
+Status TabletCopySession::GetBlockPiece(const BlockId& block_id,
+                                        uint64_t offset, int64_t client_maxlen,
+                                        string* data, int64_t* block_file_size,
+                                        TabletCopyErrorPB::Code* error_code) {
+  ImmutableReadableBlockInfo* block_info;
+  RETURN_NOT_OK(FindBlock(block_id, &block_info, error_code));
+
+  // The block is intentionally left open after the read: closing it could
+  // delete the underlying data if this was its last reader and it had been
+  // marked for deletion, leaving parallel readers in the same session unable
+  // to find it.
+  const string block_name = Substitute("block $0", block_id.ToString());
+  return ReadFileChunkToBuf(block_info, offset, client_maxlen, block_name,
+                            data, block_file_size, error_code);
+}
+
+// Serves one chunk of a WAL segment from the session's segment cache
+// (populated by Init()). A segment missing from the cache yields
+// WAL_SEGMENT_NOT_FOUND. '*log_file_size' receives the segment length that
+// was cached when the segment was opened.
+Status TabletCopySession::GetLogSegmentPiece(uint64_t segment_seqno,
+                                             uint64_t offset, int64_t client_maxlen,
+                                             std::string* data, int64_t* log_file_size,
+                                             TabletCopyErrorPB::Code* error_code) {
+  ImmutableRandomAccessFileInfo* file_info;
+  RETURN_NOT_OK(FindLogSegment(segment_seqno, &file_info, error_code));
+  RETURN_NOT_OK(ReadFileChunkToBuf(file_info, offset, client_maxlen,
+                                   Substitute("log segment $0", segment_seqno),
+                                   data, log_file_size, error_code));
+
+  // Note: We do not eagerly close log segment files, since we share ownership
+  // of the LogSegment objects with the Log itself.
+
+  return Status::OK();
+}
+
+// Test-only: reports whether 'block_id' is present in the session's block
+// cache. Takes session_lock_.
+bool TabletCopySession::IsBlockOpenForTests(const BlockId& block_id) const {
+  MutexLock guard(session_lock_);
+  return blocks_.find(block_id) != blocks_.end();
+}
+
+// Heap-allocates a new info object built from ('readable', 'size') and inserts
+// it into '*cache' under 'key'; 'key' must not already be present. The map's
+// mapped_type must be a pointer to that info type, and whoever owns '*cache'
+// is responsible for deleting the inserted object.
+// Returns Corruption (and inserts nothing) if 'size' is 0.
+template <class Collection, class Key, class Readable>
+static Status AddImmutableFileToMap(Collection* const cache,
+                                    const Key& key,
+                                    const Readable& readable,
+                                    uint64_t size) {
+  if (size != 0) {
+    using Info = typename base::remove_pointer<typename Collection::mapped_type>::type;
+    InsertOrDie(cache, key, new Info(readable, size));
+    return Status::OK();
+  }
+  // Zero-length objects are never expected here.
+  return Status::Corruption("Found 0-length object");
+}
+
+// Opens 'block_id' through the FsManager and caches it, together with its
+// size, in 'blocks_'. On success the cache entry owns the ReadableBlock.
+// Requires session_lock_ to be held.
+Status TabletCopySession::OpenBlockUnlocked(const BlockId& block_id) {
+  session_lock_.AssertAcquired();
+
+  gscoped_ptr<ReadableBlock> block;
+  Status s = fs_manager_->OpenBlock(block_id, &block);
+  if (PREDICT_FALSE(!s.ok())) {
+    LOG(WARNING) << "Unable to open requested (existing) block file: "
+                 << block_id.ToString() << ": " << s.ToString();
+    return s.CloneAndPrepend(Substitute("Unable to open block file for block $0",
+                                        block_id.ToString()));
+  }
+
+  uint64_t block_size;
+  s = block->Size(&block_size);
+  if (PREDICT_FALSE(!s.ok())) {
+    return s.CloneAndPrepend("Unable to get size of block");
+  }
+
+  s = AddImmutableFileToMap(&blocks_, block_id, block.get(), block_size);
+  if (PREDICT_FALSE(!s.ok())) {
+    s = s.CloneAndPrepend(Substitute("Error accessing data for block $0", block_id.ToString()));
+    LOG(DFATAL) << "Data block disappeared: " << s.ToString();
+    // Nothing was inserted, so 'block' still owns (and will free) the block.
+    return s;
+  }
+  // The cache entry now owns the block; relinquish our reference.
+  ignore_result(block.release());
+  return s;
+}
+
+// Looks up 'block_id' in the block cache under session_lock_. If it is absent,
+// sets '*error_code' to BLOCK_NOT_FOUND and returns NotFound.
+Status TabletCopySession::FindBlock(const BlockId& block_id,
+                                    ImmutableReadableBlockInfo** block_info,
+                                    TabletCopyErrorPB::Code* error_code) {
+  MutexLock l(session_lock_);
+  if (FindCopy(blocks_, block_id, block_info)) {
+    return Status::OK();
+  }
+  *error_code = TabletCopyErrorPB::BLOCK_NOT_FOUND;
+  return Status::NotFound("Block not found", block_id.ToString());
+}
+
+// Caches the readable length of the snapshotted log segment whose sequence
+// number is 'segment_seqno'. Returns NotFound if no such segment is in
+// 'log_segments_'. Requires session_lock_ to be held.
+Status TabletCopySession::OpenLogSegmentUnlocked(uint64_t segment_seqno) {
+  session_lock_.AssertAcquired();
+
+  // The segment's index is derived from its distance to the first segment's
+  // sequence number (the CHECK_EQ below verifies that assumption). The range
+  // check is done in uint64_t: narrowing the difference to a 32-bit int could
+  // wrap a far-out-of-range seqno back into range and pick the wrong segment.
+  size_t position = log_segments_.size();  // Sentinel meaning "not found".
+  if (!log_segments_.empty()) {
+    const uint64_t first_seqno = log_segments_[0]->header().sequence_number();
+    if (segment_seqno >= first_seqno &&
+        segment_seqno - first_seqno < log_segments_.size()) {
+      position = static_cast<size_t>(segment_seqno - first_seqno);
+    }
+  }
+  if (position >= log_segments_.size()) {
+    return Status::NotFound(Substitute("Segment with sequence number $0 not found",
+                                       segment_seqno));
+  }
+  const scoped_refptr<log::ReadableLogSegment>& log_segment = log_segments_[position];
+  CHECK_EQ(log_segment->header().sequence_number(), segment_seqno);
+
+  uint64_t size = log_segment->readable_up_to();
+  Status s = AddImmutableFileToMap(&logs_, segment_seqno, log_segment->readable_file(), size);
+  if (!s.ok()) {
+    s = s.CloneAndPrepend(
+            Substitute("Error accessing data for log segment with seqno $0",
+                       segment_seqno));
+    LOG(INFO) << s.ToString();
+  }
+  return s;
+}
+
+// Looks up the cached info for log segment 'segment_seqno' under
+// session_lock_. If it is absent, sets '*error_code' to WAL_SEGMENT_NOT_FOUND
+// and returns NotFound.
+Status TabletCopySession::FindLogSegment(uint64_t segment_seqno,
+                                         ImmutableRandomAccessFileInfo** file_info,
+                                         TabletCopyErrorPB::Code* error_code) {
+  MutexLock l(session_lock_);
+  const bool found = FindCopy(logs_, segment_seqno, file_info);
+  if (found) {
+    return Status::OK();
+  }
+  *error_code = TabletCopyErrorPB::WAL_SEGMENT_NOT_FOUND;
+  return Status::NotFound(Substitute("Segment with sequence number $0 not found",
+                                     segment_seqno));
+}
+
+// Unregisters this session's log anchor if it is currently registered.
+// Does not take session_lock_.
+Status TabletCopySession::UnregisterAnchorIfNeededUnlocked() {
+  const auto& registry = tablet_peer_->log_anchor_registry();
+  return registry->UnregisterIfAnchored(&log_anchor_);
+}
+
+} // namespace tserver
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/tablet_copy_session.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_session.h b/src/kudu/tserver/tablet_copy_session.h
new file mode 100644
index 0000000..d05f70a
--- /dev/null
+++ b/src/kudu/tserver/tablet_copy_session.h
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_TSERVER_TABLET_COPY_SESSION_H_
+#define KUDU_TSERVER_TABLET_COPY_SESSION_H_
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "kudu/consensus/log_anchor_registry.h"
+#include "kudu/consensus/log_util.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/opid_util.h"
+#include "kudu/fs/block_id.h"
+#include "kudu/fs/block_manager.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/tserver/tablet_copy.pb.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class FsManager;
+
+namespace tablet {
+class TabletPeer;
+} // namespace tablet
+
+namespace tserver {
+
+class TabletPeerLookupIf;
+
+// Caches file size and holds a shared_ptr reference to a RandomAccessFile.
+// Assumes that the file underlying the RandomAccessFile is immutable.
+struct ImmutableRandomAccessFileInfo {
+  std::shared_ptr<RandomAccessFile> readable;
+  int64_t size;
+
+  ImmutableRandomAccessFileInfo(std::shared_ptr<RandomAccessFile> readable,
+                                int64_t size)
+      : readable(std::move(readable)), size(size) {}
+
+  // Reads 'length' bytes at 'offset' via env_util::ReadFully(), using
+  // 'scratch' as the destination buffer; '*data' is set to the bytes read.
+  // The parameter is named 'length' so it does not shadow the 'size' member.
+  Status ReadFully(uint64_t offset, int64_t length, Slice* data, uint8_t* scratch) const {
+    return env_util::ReadFully(readable.get(), offset, length, data, scratch);
+  }
+};
+
+// Caches block size and holds an exclusive reference to a ReadableBlock.
+// Assumes that the block underlying the ReadableBlock is immutable.
+struct ImmutableReadableBlockInfo {
+  gscoped_ptr<fs::ReadableBlock> readable;
+  int64_t size;
+
+  // Takes ownership of 'readable'.
+  ImmutableReadableBlockInfo(fs::ReadableBlock* readable,
+                             int64_t size)
+      : readable(readable),
+        size(size) {
+  }
+
+  // Reads 'length' bytes at 'offset' via ReadableBlock::Read(), offering
+  // 'scratch' as the destination buffer; '*data' is set to the bytes read,
+  // which are not guaranteed to live in 'scratch'.
+  // The parameter is named 'length' so it does not shadow the 'size' member.
+  Status ReadFully(uint64_t offset, int64_t length, Slice* data, uint8_t* scratch) const {
+    return readable->Read(offset, length, data, scratch);
+  }
+};
+
+// A potential Learner must establish a TabletCopySession with the leader in order
+// to fetch the needed superblock, blocks, and log segments.
+// This class is refcounted to make it easy to remove it from the session map
+// on expiration while it is in use by another thread.
+class TabletCopySession : public RefCountedThreadSafe<TabletCopySession> {
+ public:
+  TabletCopySession(const scoped_refptr<tablet::TabletPeer>& tablet_peer,
+                    std::string session_id, std::string requestor_uuid,
+                    FsManager* fs_manager);
+
+  // Initialize the session: anchor the tablet's WAL against GC, read the
+  // tablet superblock from disk, open and cache every data block it
+  // references, snapshot the current WAL segments, and capture the committed
+  // consensus state.
+  //
+  // Must not be called more than once (a second call CHECK-fails).
+  Status Init();
+
+  // Return ID of tablet corresponding to this session.
+  const std::string& tablet_id() const;
+
+  // Return UUID of the requestor that initiated this session.
+  const std::string& requestor_uuid() const;
+
+  // Read a piece of a data block that was opened and cached by Init().
+  //
+  // If 'client_maxlen' is <= 0, a system-selected length is used; otherwise at
+  // most 'client_maxlen' bytes are returned (the length is also capped by
+  // server-side limits). The bytes are written into '*data', which is resized
+  // to fit. A string is used because the RPC interface is sending data
+  // serialized as protobuf and we want to minimize copying.
+  // '*block_file_size' is set to the total size of the block.
+  //
+  // On error, a non-OK Status is returned and '*error_code' is filled in
+  // (e.g. BLOCK_NOT_FOUND if the block is not in the session's cache).
+  //
+  // This method is thread-safe.
+  Status GetBlockPiece(const BlockId& block_id,
+                       uint64_t offset, int64_t client_maxlen,
+                       std::string* data, int64_t* block_file_size,
+                       TabletCopyErrorPB::Code* error_code);
+
+  // Get a piece of a WAL segment snapshotted by Init().
+  // The behavior and params are the same as GetBlockPiece(), but this one is
+  // only for sending WAL segment files; a segment that is not in the session's
+  // cache yields WAL_SEGMENT_NOT_FOUND.
+  Status GetLogSegmentPiece(uint64_t segment_seqno,
+                            uint64_t offset, int64_t client_maxlen,
+                            std::string* data, int64_t* log_file_size,
+                            TabletCopyErrorPB::Code* error_code);
+
+  // The accessors below are only valid after a successful Init().
+
+  const tablet::TabletSuperBlockPB& tablet_superblock() const {
+    DCHECK(initted_);
+    return tablet_superblock_;
+  }
+
+  const consensus::ConsensusStatePB& initial_committed_cstate() const {
+    DCHECK(initted_);
+    return initial_committed_cstate_;
+  }
+
+  const log::SegmentSequence& log_segments() const {
+    DCHECK(initted_);
+    return log_segments_;
+  }
+
+  // Check if a block is currently open (i.e. present in the session's cache).
+  bool IsBlockOpenForTests(const BlockId& block_id) const;
+
+ private:
+  friend class RefCountedThreadSafe<TabletCopySession>;
+
+  using BlockMap = std::unordered_map<BlockId, ImmutableReadableBlockInfo*, BlockIdHash>;
+  using LogMap = std::unordered_map<uint64_t, ImmutableRandomAccessFileInfo*>;
+
+  ~TabletCopySession();
+
+  // Open the block and add it to the block map. Requires session_lock_.
+  Status OpenBlockUnlocked(const BlockId& block_id);
+
+  // Look up cached block information. Takes session_lock_.
+  Status FindBlock(const BlockId& block_id,
+                   ImmutableReadableBlockInfo** block_info,
+                   TabletCopyErrorPB::Code* error_code);
+
+  // Snapshot the log segment's length and put it into the segment map.
+  // Requires session_lock_.
+  Status OpenLogSegmentUnlocked(uint64_t segment_seqno);
+
+  // Look up a log segment in the segment map. Takes session_lock_.
+  Status FindLogSegment(uint64_t segment_seqno,
+                        ImmutableRandomAccessFileInfo** file_info,
+                        TabletCopyErrorPB::Code* error_code);
+
+  // Unregister log anchor, if it's registered.
+  Status UnregisterAnchorIfNeededUnlocked();
+
+  scoped_refptr<tablet::TabletPeer> tablet_peer_;
+  const std::string session_id_;
+  const std::string requestor_uuid_;
+  FsManager* const fs_manager_;
+
+  mutable Mutex session_lock_;
+  bool initted_ = false;
+  BlockMap blocks_; // Protected by session_lock_.
+  LogMap logs_;     // Protected by session_lock_.
+  // Free the heap-allocated info objects stored in blocks_ and logs_.
+  // Declared after the maps so they are destroyed before the maps are.
+  ValueDeleter blocks_deleter_;
+  ValueDeleter logs_deleter_;
+
+  tablet::TabletSuperBlockPB tablet_superblock_;
+
+  consensus::ConsensusStatePB initial_committed_cstate_;
+
+  // The sequence of log segments that will be sent in the course of this
+  // session.
+  log::SegmentSequence log_segments_;
+
+  log::LogAnchor log_anchor_;
+
+  DISALLOW_COPY_AND_ASSIGN(TabletCopySession);
+};
+
+} // namespace tserver
+} // namespace kudu
+
+#endif // KUDU_TSERVER_TABLET_COPY_SESSION_H_

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/tablet_server-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h
index b60caba..736c5ca 100644
--- a/src/kudu/tserver/tablet_server-test-base.h
+++ b/src/kudu/tserver/tablet_server-test-base.h
@@ -43,7 +43,7 @@
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_peer.h"
 #include "kudu/tserver/mini_tablet_server.h"
-#include "kudu/tserver/remote_bootstrap.proxy.h"
+#include "kudu/tserver/tablet_copy.proxy.h"
 #include "kudu/tserver/scanners.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/tablet_server_test_util.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/tablet_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index 14f2049..0ae70b9 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -32,7 +32,7 @@
 #include "kudu/tserver/tablet_service.h"
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/tserver/tserver-path-handlers.h"
-#include "kudu/tserver/remote_bootstrap_service.h"
+#include "kudu/tserver/tablet_copy_service.h"
 #include "kudu/util/maintenance_manager.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 8ae9424..586e82b 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -42,7 +42,7 @@
 #include "kudu/tablet/tablet_peer.h"
 #include "kudu/tablet/transactions/alter_schema_transaction.h"
 #include "kudu/tablet/transactions/write_transaction.h"
-#include "kudu/tserver/remote_bootstrap_service.h"
+#include "kudu/tserver/tablet_copy_service.h"
 #include "kudu/tserver/scanners.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/ts_tablet_manager.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 14558c5..a1fff30 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -45,7 +45,7 @@
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_peer.h"
 #include "kudu/tserver/heartbeater.h"
-#include "kudu/tserver/remote_bootstrap_client.h"
+#include "kudu/tserver/tablet_copy_client.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/tablet_service.h"
 #include "kudu/util/debug/trace_event.h"


Mime
View raw message