hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject [1/2] hadoop git commit: HDFS-9144. Refactoring libhdfs++ into stateful/ephemeral objects. Contributed by Bob Hansen.
Date Thu, 03 Dec 2015 12:31:34 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 d6d056d3b -> a06bc8e12


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
new file mode 100644
index 0000000..a4e21de
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "reader/block_reader.h"
+#include "reader/datatransfer.h"
+#include "common/continuation/continuation.h"
+#include "common/continuation/asio.h"
+
+#include <future>
+
+namespace hdfs {
+
+// Builds the OP_READ_BLOCK wire message asking a DataNode to stream `length`
+// bytes of `block` starting at `offset`.  The heap-allocated sub-messages are
+// handed to the protos via set_allocated_*, which transfers ownership, so the
+// bare `new`s below are not leaks.
+hadoop::hdfs::OpReadBlockProto
+ReadBlockProto(const std::string &client_name, bool verify_checksum,
+               const hadoop::common::TokenProto *token,
+               const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+               uint64_t offset) {
+  using namespace hadoop::hdfs;
+  using namespace hadoop::common;
+  BaseHeaderProto *base_h = new BaseHeaderProto();
+  base_h->set_allocated_block(new ExtendedBlockProto(*block));
+  // The security token is optional; attach it only when the caller has one.
+  if (token) {
+    base_h->set_allocated_token(new TokenProto(*token));
+  }
+  ClientOperationHeaderProto *h = new ClientOperationHeaderProto();
+  h->set_clientname(client_name);
+  h->set_allocated_baseheader(base_h);
+
+  OpReadBlockProto p;
+  p.set_allocated_header(h);
+  p.set_offset(offset);
+  p.set_len(length);
+  p.set_sendchecksums(verify_checksum);
+  // TODO: p.set_allocated_cachingstrategy();
+  return p;
+}
+
+// Sends the OP_READ_BLOCK request to the DataNode and parses its
+// BlockOpResponseProto reply.  On SUCCESS the reader transitions to
+// kReadPacketHeader so packets can then be consumed via AsyncReadPacket();
+// otherwise the DN's error message is surfaced through `handler`, which is
+// invoked exactly once with the final status.
+void BlockReaderImpl::AsyncRequestBlock(
+    const std::string &client_name,
+    const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+    uint64_t offset, const std::function<void(Status)> &handler) {
+  // The total number of bytes that we need to transfer from the DN is
+  // the amount that the user wants (bytesToRead), plus the padding at
+  // the beginning in order to chunk-align. Note that the DN may elect
+  // to send more than this amount if the read starts/ends mid-chunk.
+  bytes_to_read_ = length;
+
+  // Storage owned by the pipeline so it outlives the async writes/reads.
+  struct State {
+    std::string header;
+    hadoop::hdfs::OpReadBlockProto request;
+    hadoop::hdfs::BlockOpResponseProto response;
+  };
+
+  auto m = continuation::Pipeline<State>::Create();
+  State *s = &m->state();
+
+  // Data-transfer preamble: two version bytes {0, kDataTransferVersion}
+  // followed by the one-byte opcode.
+  s->header.insert(s->header.begin(),
+                   {0, kDataTransferVersion, Operation::kReadBlock});
+  s->request = std::move(ReadBlockProto(client_name, options_.verify_checksum,
+                                        dn_->token_.get(), block, length, offset));
+
+  auto read_pb_message =
+      new continuation::ReadDelimitedPBMessageContinuation<AsyncStream, 16384>(
+          dn_, &s->response);
+
+  // Stage order: raw preamble, delimited request proto, then read the reply.
+  m->Push(asio_continuation::Write(dn_.get(), asio::buffer(s->header)))
+      .Push(asio_continuation::WriteDelimitedPBMessage(dn_, &s->request))
+      .Push(read_pb_message);
+
+  m->Run([this, handler, offset](const Status &status, const State &s) {    Status stat = status;
+    if (stat.ok()) {
+      const auto &resp = s.response;
+      if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
+        if (resp.has_readopchecksuminfo()) {
+          // The DN may begin at an earlier chunk boundary; record how many
+          // leading padding bytes must be discarded before user data starts.
+          const auto &checksum_info = resp.readopchecksuminfo();
+          chunk_padding_bytes_ = offset - checksum_info.chunkoffset();
+        }
+        state_ = kReadPacketHeader;
+      } else {
+        stat = Status::Error(s.response.message().c_str());
+      }
+    }
+    handler(stat);
+  });
+}
+
+// Synchronous counterpart of AsyncRequestBlock(): blocks the calling thread
+// on a promise/future pair until the asynchronous request completes, then
+// returns the resulting status.
+Status BlockReaderImpl::RequestBlock(
+    const std::string &client_name,
+    const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+    uint64_t offset) {
+  auto stat = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(stat->get_future());
+  AsyncRequestBlock(client_name, block, length, offset,
+                [stat](const Status &status) { stat->set_value(status); });
+  return future.get();
+}
+
+// NOTE(review): this forward declaration is redundant -- ReadBlockProto is
+// already defined earlier in this translation unit and could be removed.
+hadoop::hdfs::OpReadBlockProto
+ReadBlockProto(const std::string &client_name, bool verify_checksum,
+               const hadoop::common::TokenProto *token,
+               const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+               uint64_t offset);
+
+// Continuation that reads one packet header from the stream: a fixed 6-byte
+// prefix (4-byte payload length + 2-byte header length, both big-endian per
+// the ntohl/ntohs conversions below) followed by a variable-length
+// PacketHeaderProto of exactly header_length() bytes.
+struct BlockReaderImpl::ReadPacketHeader
+    : continuation::Continuation {
+  ReadPacketHeader(BlockReaderImpl *parent) : parent_(parent) {}
+
+  virtual void Run(const Next &next) override {
+    // Reset per-packet accounting before reading the next header.
+    parent_->packet_data_read_bytes_ = 0;
+    parent_->packet_len_ = 0;
+    auto handler = [next, this](const asio::error_code &ec, size_t) {
+      Status status;
+      if (ec) {
+        status = Status(ec.value(), ec.message().c_str());
+      } else {
+        parent_->packet_len_ = packet_length();
+        parent_->header_.Clear();
+        bool v = parent_->header_.ParseFromArray(&buf_[kHeaderStart],
+                                                 header_length());
+        // NOTE(review): with NDEBUG defined this assert compiles away and a
+        // header parse failure would go undetected; consider propagating an
+        // error status instead.
+        assert(v && "Failed to parse the header");
+        parent_->state_ = kReadChecksum;
+      }
+      next(status);
+    };
+
+    // CompletionHandler serves as asio's completion condition: it keeps the
+    // read going until the fixed prefix plus the advertised proto header
+    // have been fully received.
+    asio::async_read(*parent_->dn_, asio::buffer(buf_),
+                     std::bind(&ReadPacketHeader::CompletionHandler, this,
+                               std::placeholders::_1, std::placeholders::_2),
+                     handler);
+  }
+
+private:
+  // Wire layout of the fixed packet prefix.
+  static const size_t kMaxHeaderSize = 512;
+  static const size_t kPayloadLenOffset = 0;
+  static const size_t kPayloadLenSize = sizeof(int32_t);
+  static const size_t kHeaderLenOffset = 4;
+  static const size_t kHeaderLenSize = sizeof(int16_t);
+  static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize;
+
+  BlockReaderImpl *parent_;
+  std::array<char, kMaxHeaderSize> buf_;
+
+  // Big-endian payload length from the fixed prefix.
+  size_t packet_length() const {
+    return ntohl(*reinterpret_cast<const unsigned *>(&buf_[kPayloadLenOffset]));
+  }
+
+  // Big-endian length of the serialized PacketHeaderProto that follows.
+  size_t header_length() const {
+    return ntohs(*reinterpret_cast<const short *>(&buf_[kHeaderLenOffset]));
+  }
+
+  // Returns how many more bytes asio should read: first complete the fixed
+  // prefix, then the variable-length proto header; 0 stops the read.
+  size_t CompletionHandler(const asio::error_code &ec, size_t transferred) {
+    if (ec) {
+      return 0;
+    } else if (transferred < kHeaderStart) {
+      return kHeaderStart - transferred;
+    } else {
+      return kHeaderStart + header_length() - transferred;
+    }
+  }
+};
+
+// Continuation that reads the per-packet checksum bytes into the parent's
+// checksum_ buffer.  Runs only while the reader is in kReadChecksum;
+// otherwise it completes immediately as a no-op.
+struct BlockReaderImpl::ReadChecksum : continuation::Continuation {
+  ReadChecksum(BlockReaderImpl *parent) : parent_(parent) {}
+
+  virtual void Run(const Next &next) override {
+    auto parent = parent_;
+    if (parent->state_ != kReadChecksum) {
+      next(Status::OK());
+      return;
+    }
+
+    auto handler = [parent, next](const asio::error_code &ec, size_t) {
+      Status status;
+      if (ec) {
+        status = Status(ec.value(), ec.message().c_str());
+      } else {
+        // Any leading chunk padding must be drained before user data.
+        parent->state_ =
+            parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
+      }
+      next(status);
+    };
+    // Checksum size = packet payload minus a 4-byte word and the data length
+    // -- assumes packet_len_ includes the payload-length word itself; TODO
+    // confirm against the DataNode packet wire format.
+    parent->checksum_.resize(parent->packet_len_ - sizeof(int) -
+                             parent->header_.datalen());
+    asio::async_read(*parent->dn_, asio::buffer(parent->checksum_), handler);
+  }
+
+private:
+  BlockReaderImpl *parent_;
+};
+
+// Continuation that reads the remaining payload bytes of the current packet
+// into the supplied buffer, updating the shared transfer counter and the
+// parent's byte accounting.  When the packet's data is exhausted the reader
+// state returns to kReadPacketHeader for the next packet.
+struct BlockReaderImpl::ReadData : continuation::Continuation {
+  ReadData(BlockReaderImpl *parent,
+           std::shared_ptr<size_t> bytes_transferred,
+           const asio::mutable_buffers_1 &buf)
+      : parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) {
+    // NOTE(review): buf_.begin() returns an iterator that is immediately
+    // discarded -- this call has no effect; presumably debugging leftover.
+    buf_.begin();
+  }
+
+  ~ReadData() {
+    // NOTE(review): no-op, same as buf_.begin() in the constructor.
+    buf_.end();
+  }
+
+  virtual void Run(const Next &next) override {
+    auto handler =
+        [next, this](const asio::error_code &ec, size_t transferred) {
+          Status status;
+          if (ec) {
+            status = Status(ec.value(), ec.message().c_str());
+          }
+          // Update both the caller-visible counter and the reader's totals,
+          // even on error, so accounting reflects what actually arrived.
+          *bytes_transferred_ += transferred;
+          parent_->bytes_to_read_ -= transferred;
+          parent_->packet_data_read_bytes_ += transferred;
+          if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
+            parent_->state_ = kReadPacketHeader;
+          }
+          next(status);
+        };
+
+    // Read exactly the bytes of this packet not yet consumed.
+    auto data_len =
+        parent_->header_.datalen() - parent_->packet_data_read_bytes_;
+    asio::async_read(*parent_->dn_, buf_, asio::transfer_exactly(data_len),
+               handler);
+  }
+
+private:
+  BlockReaderImpl *parent_;
+  std::shared_ptr<size_t> bytes_transferred_;
+  const asio::mutable_buffers_1 buf_;
+};
+
+// Continuation that drains the leading chunk-alignment padding into a
+// throwaway vector by delegating to a ReadData stage, then switches the
+// reader to kReadData.  A no-op unless state is kReadPadding and padding
+// bytes remain.
+struct BlockReaderImpl::ReadPadding : continuation::Continuation {
+  ReadPadding(BlockReaderImpl *parent)
+      : parent_(parent), padding_(parent->chunk_padding_bytes_),
+        bytes_transferred_(std::make_shared<size_t>(0)),
+        read_data_(new ReadData(
+            parent, bytes_transferred_, asio::buffer(padding_))) {}
+
+  virtual void Run(const Next &next) override {
+    if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) {
+      next(Status::OK());
+      return;
+    }
+
+    auto h = [next, this](const Status &status) {
+      if (status.ok()) {
+        // NOTE(review): reinterpret_cast of a size_t as int reads raw bytes
+        // and is endian-sensitive; a static_cast (or widening the other
+        // operand) would be the safe way to express this comparison.
+        assert(reinterpret_cast<const int &>(*bytes_transferred_) ==
+               parent_->chunk_padding_bytes_);
+        parent_->chunk_padding_bytes_ = 0;
+        parent_->state_ = kReadData;
+      }
+      next(status);
+    };
+    read_data_->Run(h);
+  }
+
+private:
+  BlockReaderImpl *parent_;
+  std::vector<char> padding_;
+  std::shared_ptr<size_t> bytes_transferred_;
+  std::shared_ptr<continuation::Continuation> read_data_;
+  // Non-copyable: owns a continuation bound to this instance's buffers.
+  ReadPadding(const ReadPadding &) = delete;
+  ReadPadding &operator=(const ReadPadding &) = delete;
+};
+
+
+// Continuation that acknowledges a completed read back to the DataNode.
+// Does nothing while bytes remain outstanding; once everything has been
+// received it writes a ClientReadStatusProto (CHECKSUM_OK when checksums
+// were verified, otherwise SUCCESS) and marks the reader kFinished.
+struct BlockReaderImpl::AckRead : continuation::Continuation {
+  AckRead(BlockReaderImpl *parent) : parent_(parent) {}
+
+  virtual void Run(const Next &next) override {
+    if (parent_->bytes_to_read_ > 0) {
+      next(Status::OK());
+      return;
+    }
+
+    // The pipeline owns the status proto so it survives the async write.
+    auto m =
+        continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create();
+    m->state().set_status(parent_->options_.verify_checksum
+                              ? hadoop::hdfs::Status::CHECKSUM_OK
+                              : hadoop::hdfs::Status::SUCCESS);
+
+    m->Push(
+        continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state()));
+
+    m->Run([this, next](const Status &status,
+                        const hadoop::hdfs::ClientReadStatusProto &) {
+      if (status.ok()) {
+        parent_->state_ = BlockReaderImpl::kFinished;
+      }
+      next(status);
+    });
+  }
+
+private:
+  BlockReaderImpl *parent_;
+};
+
+// Reads one packet's worth of data into `buffers` by running the
+// header / checksum / padding / data / ack continuations in sequence.
+// Precondition: a block request has already been made, i.e. state_ has left
+// kOpen (enforced by the assert below).
+void BlockReaderImpl::AsyncReadPacket(
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t bytes_transferred)> &handler) {
+  assert(state_ != kOpen && "Not connected");
+
+  struct State {
+    std::shared_ptr<size_t> bytes_transferred;
+  };
+  auto m = continuation::Pipeline<State>::Create();
+  m->state().bytes_transferred = std::make_shared<size_t>(0);
+
+  m->Push(new ReadPacketHeader(this))
+      .Push(new ReadChecksum(this))
+      .Push(new ReadPadding(this))
+      .Push(new ReadData(
+          this, m->state().bytes_transferred, buffers))
+      .Push(new AckRead(this));
+
+  // Keep this reader alive until the pipeline's completion callback fires.
+  auto self = this->shared_from_this();
+  m->Run([self, handler](const Status &status, const State &state) {
+    handler(status, *state.bytes_transferred);
+  });
+}
+
+
+// Blocking counterpart of AsyncReadPacket(): waits on a promise/future for
+// the packet to complete, stores the status through `status`, and returns
+// the number of bytes copied into `buffers`.
+size_t
+BlockReaderImpl::ReadPacket(const MutableBuffers &buffers,
+                                     Status *status) {
+  size_t transferred = 0;
+  auto done = std::make_shared<std::promise<void>>();
+  auto future = done->get_future();
+  AsyncReadPacket(buffers,
+                  [status, &transferred, done](const Status &stat, size_t t) {
+                    *status = stat;
+                    transferred = t;
+                    done->set_value();
+                  });
+  future.wait();
+  return transferred;
+}
+
+
+// Continuation adapter: runs BlockReader::AsyncRequestBlock as one stage of
+// a pipeline.  The block proto is copied (CheckTypeAndMergeFrom) so the
+// caller's argument need not outlive this continuation.
+struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation {
+  RequestBlockContinuation(BlockReader *reader, const std::string &client_name,
+                        const hadoop::hdfs::ExtendedBlockProto *block,
+                        uint64_t length, uint64_t offset)
+      : reader_(reader), client_name_(client_name), length_(length),
+        offset_(offset) {
+    block_.CheckTypeAndMergeFrom(*block);
+  }
+
+  virtual void Run(const Next &next) override {
+    reader_->AsyncRequestBlock(client_name_, &block_, length_,
+                           offset_, next);
+  }
+
+private:
+  BlockReader *reader_;
+  const std::string client_name_;
+  hadoop::hdfs::ExtendedBlockProto block_;
+  uint64_t length_;
+  uint64_t offset_;
+};
+
+// Continuation adapter that calls AsyncReadPacket repeatedly until the
+// destination buffer is full (or an error occurs), accumulating the running
+// byte count in *transferred_.
+struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation {
+  ReadBlockContinuation(BlockReader *reader, MutableBuffers buffer,
+                        size_t *transferred)
+      : reader_(reader), buffer_(buffer),
+        buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {
+  }
+
+  virtual void Run(const Next &next) override {
+    *transferred_ = 0;
+    next_ = next;
+    // Kick off the read loop with a zero-byte "previous" transfer.
+    OnReadData(Status::OK(), 0);
+  }
+
+private:
+  BlockReader *reader_;
+  const MutableBuffers buffer_;
+  const size_t buffer_size_;
+  size_t *transferred_;
+  std::function<void(const Status &)> next_;
+
+  // Per-packet callback: finish on error or when the buffer is full,
+  // otherwise request the next packet into the remaining buffer space.
+  void OnReadData(const Status &status, size_t transferred) {
+    using std::placeholders::_1;
+    using std::placeholders::_2;
+    *transferred_ += transferred;
+    if (!status.ok()) {
+      next_(status);
+    } else if (*transferred_ >= buffer_size_) {
+      next_(status);
+    } else {
+      reader_->AsyncReadPacket(
+          asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
+          std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
+    }
+  }
+};
+
+// Top-level asynchronous read: requests the block from the DataNode, then
+// pulls packets until `buffers` is full.  `handler` receives the final
+// status and the total number of bytes copied.
+void BlockReaderImpl::AsyncReadBlock(
+    const std::string & client_name,
+    const hadoop::hdfs::LocatedBlockProto &block,
+    size_t offset,
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t)> handler) {
+
+  // The pipeline's state is the running byte count shared by both stages.
+  auto m = continuation::Pipeline<size_t>::Create();
+  size_t * bytesTransferred = &m->state();
+
+  size_t size = asio::buffer_size(buffers);
+
+  m->Push(new RequestBlockContinuation(this, client_name,
+                                            &block.b(), size, offset))
+    .Push(new ReadBlockContinuation(this, buffers, bytesTransferred));
+
+  m->Run([handler] (const Status &status,
+                         const size_t totalBytesTransferred) {
+    handler(status, totalBytesTransferred);
+  });
+}
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
index 81636b9..140286b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
@@ -19,7 +19,9 @@
 #define BLOCK_READER_H_
 
 #include "libhdfspp/status.h"
+#include "common/async_stream.h"
 #include "datatransfer.pb.h"
+#include "connection/datanodeconnection.h"
 
 #include <memory>
 
@@ -55,38 +57,73 @@ struct BlockReaderOptions {
       : verify_checksum(true), encryption_scheme(EncryptionScheme::kNone) {}
 };
 
-template <class Stream>
-class RemoteBlockReader
-    : public std::enable_shared_from_this<RemoteBlockReader<Stream>> {
+/**
+ * Handles the operational state of request and reading a block (or portion of
+ * a block) from a DataNode.
+ *
+ * Threading model: not thread-safe.
+ * Lifecycle: should be created, used for a single read, then freed.
+ */
+class BlockReader {
 public:
-  explicit RemoteBlockReader(const BlockReaderOptions &options, Stream *stream)
-      : stream_(stream), state_(kOpen), options_(options),
+  virtual void AsyncReadBlock(
+    const std::string & client_name,
+    const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t)> handler) = 0;
+
+  virtual void AsyncReadPacket(
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t bytes_transferred)> &handler) = 0;
+
+  virtual void AsyncRequestBlock(
+    const std::string &client_name,
+    const hadoop::hdfs::ExtendedBlockProto *block,
+    uint64_t length,
+    uint64_t offset,
+    const std::function<void(Status)> &handler) = 0;
+};
+
+class BlockReaderImpl
+    : public BlockReader, public std::enable_shared_from_this<BlockReaderImpl> {
+public:
+  explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn)
+      : dn_(dn), state_(kOpen), options_(options),
         chunk_padding_bytes_(0) {}
 
-  template <class MutableBufferSequence, class ReadHandler>
-  void async_read_some(const MutableBufferSequence &buffers,
-                       const ReadHandler &handler);
+  virtual void AsyncReadPacket(
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t bytes_transferred)> &handler) override;
+
+  virtual void AsyncRequestBlock(
+    const std::string &client_name,
+    const hadoop::hdfs::ExtendedBlockProto *block,
+    uint64_t length,
+    uint64_t offset,
+    const std::function<void(Status)> &handler) override;
 
-  template <class MutableBufferSequence>
-  size_t read_some(const MutableBufferSequence &buffers, Status *status);
+  virtual void AsyncReadBlock(
+    const std::string & client_name,
+    const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t)> handler) override;
 
-  Status connect(const std::string &client_name,
-                 const hadoop::common::TokenProto *token,
-                 const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
-                 uint64_t offset);
+  size_t ReadPacket(const MutableBuffers &buffers, Status *status);
 
-  template <class ConnectHandler>
-  void async_connect(const std::string &client_name,
-                     const hadoop::common::TokenProto *token,
-                     const hadoop::hdfs::ExtendedBlockProto *block,
-                     uint64_t length, uint64_t offset,
-                     const ConnectHandler &handler);
+  Status RequestBlock(
+    const std::string &client_name,
+    const hadoop::hdfs::ExtendedBlockProto *block,
+    uint64_t length,
+    uint64_t offset);
 
 private:
+  struct RequestBlockContinuation;
+  struct ReadBlockContinuation;
+
   struct ReadPacketHeader;
   struct ReadChecksum;
   struct ReadPadding;
-  template <class MutableBufferSequence> struct ReadData;
+  struct ReadData;
   struct AckRead;
   enum State {
     kOpen,
@@ -97,7 +134,7 @@ private:
     kFinished,
   };
 
-  Stream *stream_;
+  std::shared_ptr<DataNodeConnection> dn_;
   hadoop::hdfs::PacketHeaderProto header_;
   State state_;
   BlockReaderOptions options_;
@@ -109,6 +146,4 @@ private:
 };
 }
 
-#include "remote_block_reader_impl.h"
-
 #endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
index 511c2eb..8be9ef8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
@@ -19,6 +19,10 @@
 #define LIB_READER_DATA_TRANSFER_H_
 
 #include "common/sasl_authenticator.h"
+#include "common/async_stream.h"
+#include "connection/datanodeconnection.h"
+#include <memory>
+
 
 namespace hdfs {
 
@@ -32,26 +36,32 @@ enum Operation {
   kReadBlock = 81,
 };
 
-template <class Stream> class DataTransferSaslStream {
+template <class Stream> class DataTransferSaslStream : public DataNodeConnection {
 public:
-  DataTransferSaslStream(Stream *stream, const std::string &username,
+  DataTransferSaslStream(std::shared_ptr<Stream> stream, const std::string &username,
                          const std::string &password)
       : stream_(stream), authenticator_(username, password) {}
 
   template <class Handler> void Handshake(const Handler &next);
 
-  template <class MutableBufferSequence, class ReadHandler>
-  void async_read_some(const MutableBufferSequence &buffers,
-                       ReadHandler &&handler);
+  void async_read_some(const MutableBuffers &buf,
+          std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) override {
+    stream_->async_read_some(buf, handler);
+  }
 
-  template <class ConstBufferSequence, class WriteHandler>
-  void async_write_some(const ConstBufferSequence &buffers,
-                        WriteHandler &&handler);
+  void async_write_some(const ConstBuffers &buf,
+            std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) override {
+    stream_->async_write_some(buf, handler);
+  }
 
+  void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override
+  {(void)handler;  /*TODO: Handshaking goes here*/};
 private:
   DataTransferSaslStream(const DataTransferSaslStream &) = delete;
   DataTransferSaslStream &operator=(const DataTransferSaslStream &) = delete;
-  Stream *stream_;
+  std::shared_ptr<Stream> stream_;
   DigestMD5Authenticator authenticator_;
   struct ReadSaslMessage;
   struct Authenticator;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
index 088b86e..3ca16e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
@@ -70,7 +70,7 @@ private:
 template <class Stream>
 struct DataTransferSaslStream<Stream>::ReadSaslMessage
     : continuation::Continuation {
-  ReadSaslMessage(Stream *stream, std::string *data)
+  ReadSaslMessage(std::shared_ptr<Stream> stream, std::string *data)
       : stream_(stream), data_(data), read_pb_(stream, &resp_) {}
 
   virtual void Run(const Next &next) override {
@@ -87,7 +87,7 @@ struct DataTransferSaslStream<Stream>::ReadSaslMessage
   }
 
 private:
-  Stream *stream_;
+  std::shared_ptr<Stream> stream_;
   std::string *data_;
   hadoop::hdfs::DataTransferEncryptorMessageProto resp_;
   continuation::ReadDelimitedPBMessageContinuation<Stream, 1024> read_pb_;
@@ -97,7 +97,7 @@ template <class Stream>
 template <class Handler>
 void DataTransferSaslStream<Stream>::Handshake(const Handler &next) {
   using ::hadoop::hdfs::DataTransferEncryptorMessageProto;
-  using ::hdfs::continuation::Write;
+  using ::hdfs::asio_continuation::Write;
   using ::hdfs::continuation::WriteDelimitedPBMessage;
 
   static const int kMagicNumber = htonl(kDataTransferSasl);
@@ -109,7 +109,7 @@ void DataTransferSaslStream<Stream>::Handshake(const Handler &next) {
     std::string resp0;
     DataTransferEncryptorMessageProto req1;
     std::string resp1;
-    Stream *stream;
+    std::shared_ptr<Stream> stream;
   };
   auto m = continuation::Pipeline<State>::Create();
   State *s = &m->state();
@@ -117,7 +117,7 @@ void DataTransferSaslStream<Stream>::Handshake(const Handler &next) {
 
   DataTransferSaslStreamUtil::PrepareInitialHandshake(&s->req0);
 
-  m->Push(Write(stream_, kMagicNumberBuffer))
+  m->Push(Write(stream_.get(), kMagicNumberBuffer))
       .Push(WriteDelimitedPBMessage(stream_, &s->req0))
       .Push(new ReadSaslMessage(stream_, &s->resp0))
       .Push(new Authenticator(&authenticator_, &s->resp0, &s->req1))
@@ -126,19 +126,6 @@ void DataTransferSaslStream<Stream>::Handshake(const Handler &next) {
   m->Run([next](const Status &status, const State &) { next(status); });
 }
 
-template <class Stream>
-template <class MutableBufferSequence, class ReadHandler>
-void DataTransferSaslStream<Stream>::async_read_some(
-    const MutableBufferSequence &buffers, ReadHandler &&handler) {
-  stream_->async_read_some(buffers, handler);
-}
-
-template <class Stream>
-template <typename ConstBufferSequence, typename WriteHandler>
-void DataTransferSaslStream<Stream>::async_write_some(
-    const ConstBufferSequence &buffers, WriteHandler &&handler) {
-  stream_->async_write_some(buffers, handler);
-}
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h
new file mode 100644
index 0000000..ad10165
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_READER_FILEINFO_H_
+#define LIB_READER_FILEINFO_H_
+
+#include "ClientNamenodeProtocol.pb.h"
+
+namespace hdfs {
+
+/**
+ * Information that is assumed to be unchanging about a file for the duration of
+ * the operations.
+ */
+struct FileInfo {
+  // Total file length in bytes.
+  unsigned long long file_length_;
+  // LocatedBlockProto entries describing the blocks that make up the file.
+  std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
deleted file mode 100644
index 68bc4ee..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "block_reader.h"
-
-namespace hdfs {
-
-hadoop::hdfs::OpReadBlockProto
-ReadBlockProto(const std::string &client_name, bool verify_checksum,
-               const hadoop::common::TokenProto *token,
-               const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
-               uint64_t offset) {
-  using namespace hadoop::hdfs;
-  using namespace hadoop::common;
-  BaseHeaderProto *base_h = new BaseHeaderProto();
-  base_h->set_allocated_block(new ExtendedBlockProto(*block));
-  if (token) {
-    base_h->set_allocated_token(new TokenProto(*token));
-  }
-  ClientOperationHeaderProto *h = new ClientOperationHeaderProto();
-  h->set_clientname(client_name);
-  h->set_allocated_baseheader(base_h);
-
-  OpReadBlockProto p;
-  p.set_allocated_header(h);
-  p.set_offset(offset);
-  p.set_len(length);
-  p.set_sendchecksums(verify_checksum);
-  // TODO: p.set_allocated_cachingstrategy();
-  return p;
-}
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
deleted file mode 100644
index 35c2ce4..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
+++ /dev/null
@@ -1,342 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_
-#define LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_
-
-#include "datatransfer.h"
-#include "common/continuation/asio.h"
-#include "common/continuation/protobuf.h"
-
-#include <asio/buffers_iterator.hpp>
-#include <asio/streambuf.hpp>
-#include <asio/write.hpp>
-
-#include <arpa/inet.h>
-
-#include <future>
-
-namespace hdfs {
-
-hadoop::hdfs::OpReadBlockProto
-ReadBlockProto(const std::string &client_name, bool verify_checksum,
-               const hadoop::common::TokenProto *token,
-               const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
-               uint64_t offset);
-
-template <class Stream>
-template <class ConnectHandler>
-void RemoteBlockReader<Stream>::async_connect(
-    const std::string &client_name, const hadoop::common::TokenProto *token,
-    const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
-    uint64_t offset, const ConnectHandler &handler) {
-  // The total number of bytes that we need to transfer from the DN is
-  // the amount that the user wants (bytesToRead), plus the padding at
-  // the beginning in order to chunk-align. Note that the DN may elect
-  // to send more than this amount if the read starts/ends mid-chunk.
-  bytes_to_read_ = length;
-
-  struct State {
-    std::string header;
-    hadoop::hdfs::OpReadBlockProto request;
-    hadoop::hdfs::BlockOpResponseProto response;
-  };
-
-  auto m = continuation::Pipeline<State>::Create();
-  State *s = &m->state();
-
-  s->header.insert(s->header.begin(),
-                   {0, kDataTransferVersion, Operation::kReadBlock});
-  s->request = std::move(ReadBlockProto(client_name, options_.verify_checksum,
-                                        token, block, length, offset));
-
-  auto read_pb_message =
-      new continuation::ReadDelimitedPBMessageContinuation<Stream, 16384>(
-          stream_, &s->response);
-
-  m->Push(continuation::Write(stream_, asio::buffer(s->header)))
-      .Push(continuation::WriteDelimitedPBMessage(stream_, &s->request))
-      .Push(read_pb_message);
-
-  m->Run([this, handler, offset](const Status &status, const State &s) {
-    Status stat = status;
-    if (stat.ok()) {
-      const auto &resp = s.response;
-      if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
-        if (resp.has_readopchecksuminfo()) {
-          const auto &checksum_info = resp.readopchecksuminfo();
-          chunk_padding_bytes_ = offset - checksum_info.chunkoffset();
-        }
-        state_ = kReadPacketHeader;
-      } else {
-        stat = Status::Error(s.response.message().c_str());
-      }
-    }
-    handler(stat);
-  });
-}
-
-template <class Stream>
-struct RemoteBlockReader<Stream>::ReadPacketHeader
-    : continuation::Continuation {
-  ReadPacketHeader(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
-
-  virtual void Run(const Next &next) override {
-    parent_->packet_data_read_bytes_ = 0;
-    parent_->packet_len_ = 0;
-    auto handler = [next, this](const asio::error_code &ec, size_t) {
-      Status status;
-      if (ec) {
-        status = Status(ec.value(), ec.message().c_str());
-      } else {
-        parent_->packet_len_ = packet_length();
-        parent_->header_.Clear();
-        bool v = parent_->header_.ParseFromArray(&buf_[kHeaderStart],
-                                                 header_length());
-        assert(v && "Failed to parse the header");
-        parent_->state_ = kReadChecksum;
-      }
-      next(status);
-    };
-
-    asio::async_read(*parent_->stream_, asio::buffer(buf_),
-                     std::bind(&ReadPacketHeader::CompletionHandler, this,
-                               std::placeholders::_1, std::placeholders::_2),
-                     handler);
-  }
-
-private:
-  static const size_t kMaxHeaderSize = 512;
-  static const size_t kPayloadLenOffset = 0;
-  static const size_t kPayloadLenSize = sizeof(int32_t);
-  static const size_t kHeaderLenOffset = 4;
-  static const size_t kHeaderLenSize = sizeof(int16_t);
-  static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize;
-
-  RemoteBlockReader<Stream> *parent_;
-  std::array<char, kMaxHeaderSize> buf_;
-
-  size_t packet_length() const {
-    return ntohl(*reinterpret_cast<const unsigned *>(&buf_[kPayloadLenOffset]));
-  }
-
-  size_t header_length() const {
-    return ntohs(*reinterpret_cast<const short *>(&buf_[kHeaderLenOffset]));
-  }
-
-  size_t CompletionHandler(const asio::error_code &ec, size_t transferred) {
-    if (ec) {
-      return 0;
-    } else if (transferred < kHeaderStart) {
-      return kHeaderStart - transferred;
-    } else {
-      return kHeaderStart + header_length() - transferred;
-    }
-  }
-};
-
-template <class Stream>
-struct RemoteBlockReader<Stream>::ReadChecksum : continuation::Continuation {
-  ReadChecksum(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
-
-  virtual void Run(const Next &next) override {
-    auto parent = parent_;
-    if (parent->state_ != kReadChecksum) {
-      next(Status::OK());
-      return;
-    }
-
-    auto handler = [parent, next](const asio::error_code &ec, size_t) {
-      Status status;
-      if (ec) {
-        status = Status(ec.value(), ec.message().c_str());
-      } else {
-        parent->state_ =
-            parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
-      }
-      next(status);
-    };
-    parent->checksum_.resize(parent->packet_len_ - sizeof(int) -
-                             parent->header_.datalen());
-    asio::async_read(*parent->stream_, asio::buffer(parent->checksum_),
-                     handler);
-  }
-
-private:
-  RemoteBlockReader<Stream> *parent_;
-};
-
-template <class Stream>
-struct RemoteBlockReader<Stream>::ReadPadding : continuation::Continuation {
-  ReadPadding(RemoteBlockReader<Stream> *parent)
-      : parent_(parent), padding_(parent->chunk_padding_bytes_),
-        bytes_transferred_(std::make_shared<size_t>(0)),
-        read_data_(new ReadData<asio::mutable_buffers_1>(
-            parent, bytes_transferred_, asio::buffer(padding_))) {}
-
-  virtual void Run(const Next &next) override {
-    if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) {
-      next(Status::OK());
-      return;
-    }
-
-    auto h = [next, this](const Status &status) {
-      if (status.ok()) {
-        assert(reinterpret_cast<const int &>(*bytes_transferred_) ==
-               parent_->chunk_padding_bytes_);
-        parent_->chunk_padding_bytes_ = 0;
-        parent_->state_ = kReadData;
-      }
-      next(status);
-    };
-    read_data_->Run(h);
-  }
-
-private:
-  RemoteBlockReader<Stream> *parent_;
-  std::vector<char> padding_;
-  std::shared_ptr<size_t> bytes_transferred_;
-  std::shared_ptr<continuation::Continuation> read_data_;
-  ReadPadding(const ReadPadding &) = delete;
-  ReadPadding &operator=(const ReadPadding &) = delete;
-};
-
-template <class Stream>
-template <class MutableBufferSequence>
-struct RemoteBlockReader<Stream>::ReadData : continuation::Continuation {
-  ReadData(RemoteBlockReader<Stream> *parent,
-           std::shared_ptr<size_t> bytes_transferred,
-           const MutableBufferSequence &buf)
-      : parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) {}
-
-  virtual void Run(const Next &next) override {
-    auto handler =
-        [next, this](const asio::error_code &ec, size_t transferred) {
-          Status status;
-          if (ec) {
-            status = Status(ec.value(), ec.message().c_str());
-          }
-          *bytes_transferred_ += transferred;
-          parent_->bytes_to_read_ -= transferred;
-          parent_->packet_data_read_bytes_ += transferred;
-          if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
-            parent_->state_ = kReadPacketHeader;
-          }
-          next(status);
-        };
-
-    auto data_len =
-        parent_->header_.datalen() - parent_->packet_data_read_bytes_;
-    async_read(*parent_->stream_, buf_, asio::transfer_exactly(data_len),
-               handler);
-  }
-
-private:
-  RemoteBlockReader<Stream> *parent_;
-  std::shared_ptr<size_t> bytes_transferred_;
-  MutableBufferSequence buf_;
-};
-
-template <class Stream>
-struct RemoteBlockReader<Stream>::AckRead : continuation::Continuation {
-  AckRead(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
-
-  virtual void Run(const Next &next) override {
-    if (parent_->bytes_to_read_ > 0) {
-      next(Status::OK());
-      return;
-    }
-
-    auto m =
-        continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create();
-    m->state().set_status(parent_->options_.verify_checksum
-                              ? hadoop::hdfs::Status::CHECKSUM_OK
-                              : hadoop::hdfs::Status::SUCCESS);
-
-    m->Push(
-        continuation::WriteDelimitedPBMessage(parent_->stream_, &m->state()));
-
-    m->Run([this, next](const Status &status,
-                        const hadoop::hdfs::ClientReadStatusProto &) {
-      if (status.ok()) {
-        parent_->state_ = RemoteBlockReader<Stream>::kFinished;
-      }
-      next(status);
-    });
-  }
-
-private:
-  RemoteBlockReader<Stream> *parent_;
-};
-
-template <class Stream>
-template <class MutableBufferSequence, class ReadHandler>
-void RemoteBlockReader<Stream>::async_read_some(
-    const MutableBufferSequence &buffers, const ReadHandler &handler) {
-  assert(state_ != kOpen && "Not connected");
-
-  struct State {
-    std::shared_ptr<size_t> bytes_transferred;
-  };
-  auto m = continuation::Pipeline<State>::Create();
-  m->state().bytes_transferred = std::make_shared<size_t>(0);
-
-  m->Push(new ReadPacketHeader(this))
-      .Push(new ReadChecksum(this))
-      .Push(new ReadPadding(this))
-      .Push(new ReadData<MutableBufferSequence>(
-          this, m->state().bytes_transferred, buffers))
-      .Push(new AckRead(this));
-
-  auto self = this->shared_from_this();
-  m->Run([self, handler](const Status &status, const State &state) {
-    handler(status, *state.bytes_transferred);
-  });
-}
-
-template <class Stream>
-template <class MutableBufferSequence>
-size_t
-RemoteBlockReader<Stream>::read_some(const MutableBufferSequence &buffers,
-                                     Status *status) {
-  size_t transferred = 0;
-  auto done = std::make_shared<std::promise<void>>();
-  auto future = done->get_future();
-  async_read_some(buffers,
-                  [status, &transferred, done](const Status &stat, size_t t) {
-                    *status = stat;
-                    transferred = t;
-                    done->set_value();
-                  });
-  future.wait();
-  return transferred;
-}
-
-template <class Stream>
-Status RemoteBlockReader<Stream>::connect(
-    const std::string &client_name, const hadoop::common::TokenProto *token,
-    const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
-    uint64_t offset) {
-  auto stat = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(stat->get_future());
-  async_connect(client_name, token, block, length, offset,
-                [stat](const Status &status) { stat->set_value(status); });
-  return future.get();
-}
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
index 83721a7..29d455f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
@@ -19,9 +19,6 @@
 #include "rpc_connection.h"
 #include "common/util.h"
 
-#include <openssl/rand.h>
-
-#include <sstream>
 #include <future>
 
 namespace hdfs {
@@ -83,15 +80,4 @@ Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
   return future.get();
 }
 
-std::string RpcEngine::GetRandomClientName() {
-  unsigned char buf[6] = {
-      0,
-  };
-  RAND_pseudo_bytes(buf, sizeof(buf));
-
-  std::stringstream ss;
-  ss << "libhdfs++_"
-     << Base64Encode(std::string(reinterpret_cast<char *>(buf), sizeof(buf)));
-  return ss.str();
-}
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
index 7d06141..cc5ab01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
@@ -32,7 +32,7 @@ include_directories(
     ${LIBHDFS_SRC_DIR}
     ${OS_DIR}
 )
-add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfspp_wrapper.cc ${LIBHDFSPP_BINDING_C}/hdfs_cpp.cc)
+add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfspp_wrapper.cc ${LIBHDFSPP_BINDING_C}/hdfs.cc)
 
 add_library(test_common OBJECT mock_connection.cc)
 
@@ -44,24 +44,20 @@ protobuf_generate_cpp(PROTO_TEST_SRCS PROTO_TEST_HDRS
 )
 
 add_executable(remote_block_reader_test remote_block_reader_test.cc $<TARGET_OBJECTS:test_common>)
-target_link_libraries(remote_block_reader_test reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
+target_link_libraries(remote_block_reader_test reader proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_test(remote_block_reader remote_block_reader_test)
 
 add_executable(sasl_digest_md5_test sasl_digest_md5_test.cc)
 target_link_libraries(sasl_digest_md5_test common ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_test(sasl_digest_md5 sasl_digest_md5_test)
 
-add_executable(inputstream_test inputstream_test.cc)
-target_link_libraries(inputstream_test fs rpc reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
-add_test(inputstream inputstream_test)
-
 include_directories(${CMAKE_CURRENT_BINARY_DIR})
 add_executable(rpc_engine_test rpc_engine_test.cc ${PROTO_TEST_SRCS} ${PROTO_TEST_HDRS} $<TARGET_OBJECTS:test_common>)
 target_link_libraries(rpc_engine_test rpc proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_test(rpc_engine rpc_engine_test)
 
 add_executable(bad_datanode_test bad_datanode_test.cc)
-target_link_libraries(bad_datanode_test rpc reader proto fs bindings_c rpc proto common reader  ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
+target_link_libraries(bad_datanode_test rpc reader proto fs bindings_c rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_test(bad_datanode bad_datanode_test)
 
 add_executable(node_exclusion_test node_exclusion_test.cc)
@@ -73,5 +69,5 @@ target_link_libraries(configuration_test common gmock_main ${CMAKE_THREAD_LIBS_I
 add_test(configuration configuration_test)
 
 build_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static expect.c test_libhdfs_threaded.c ${OS_DIR}/thread.c)
-link_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static fs reader rpc proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY})
+link_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY})
 add_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
index cf1fdbb..0f69195 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
@@ -19,6 +19,8 @@
 #include "fs/filesystem.h"
 #include "fs/bad_datanode_tracker.h"
 
+#include "common/util.h"
+
 #include <gmock/gmock.h>
 
 using hadoop::common::TokenProto;
@@ -34,70 +36,140 @@ using ::testing::Return;
 
 using namespace hdfs;
 
-class MockReader {
- public:
-  virtual ~MockReader() {}
+class MockReader : public BlockReader {
+public:
   MOCK_METHOD2(
-      async_read_some,
+      AsyncReadPacket,
       void(const asio::mutable_buffers_1 &,
            const std::function<void(const Status &, size_t transferred)> &));
 
-  MOCK_METHOD6(async_connect,
-               void(const std::string &, TokenProto *, ExtendedBlockProto *,
-                    uint64_t, uint64_t,
-                    const std::function<void(const Status &)> &));
+  MOCK_METHOD5(AsyncRequestBlock,
+               void(const std::string &client_name,
+                     const hadoop::hdfs::ExtendedBlockProto *block,
+                     uint64_t length, uint64_t offset,
+                     const std::function<void(Status)> &handler));
+
+  MOCK_METHOD5(AsyncReadBlock, void(
+    const std::string & client_name,
+    const hadoop::hdfs::LocatedBlockProto &block,
+    size_t offset,
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t)> handler));
 };
 
-template <class Trait>
-struct MockBlockReaderTrait {
-  typedef MockReader Reader;
-  struct State {
-    MockReader reader_;
-    size_t transferred_;
-    Reader *reader() { return &reader_; }
-    size_t *transferred() { return &transferred_; }
-    const size_t *transferred() const { return &transferred_; }
-  };
+class MockDNConnection : public DataNodeConnection, public std::enable_shared_from_this<MockDNConnection> {
+    void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override {
+      handler(Status::OK(), shared_from_this());
+    }
+
+  void async_read_some(const MutableBuffers &buf,
+        std::function<void (const asio::error_code & error,
+                               std::size_t bytes_transferred) > handler) override {
+      (void)buf;
+      handler(asio::error::fault, 0);
+  }
 
-  static continuation::Pipeline<State> *CreatePipeline(
-      ::asio::io_service *, const DatanodeInfoProto &) {
-    auto m = continuation::Pipeline<State>::Create();
-    *m->state().transferred() = 0;
-    Trait::InitializeMockReader(m->state().reader());
-    return m;
+  void async_write_some(const ConstBuffers &buf,
+            std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) override {
+      (void)buf;
+      handler(asio::error::fault, 0);
   }
 };
 
+
+class PartialMockFileHandle : public FileHandleImpl {
+  using FileHandleImpl::FileHandleImpl;
+public:
+  std::shared_ptr<MockReader> mock_reader_ = std::make_shared<MockReader>();
+protected:
+  std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
+                                                 std::shared_ptr<DataNodeConnection> dn) override
+  {
+    (void) options; (void) dn;
+    assert(mock_reader_);
+    return mock_reader_;
+  }
+  std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
+      ::asio::io_service *io_service,
+      const ::hadoop::hdfs::DatanodeInfoProto & dn,
+      const hadoop::common::TokenProto * token) override {
+    (void) io_service; (void) dn; (void) token;
+    return std::make_shared<MockDNConnection>();
+  }
+
+
+};
+
+TEST(BadDataNodeTest, TestNoNodes) {
+  auto file_info = std::make_shared<struct FileInfo>();
+  file_info->blocks_.push_back(LocatedBlockProto());
+  LocatedBlockProto & block = file_info->blocks_[0];
+  ExtendedBlockProto *b = block.mutable_b();
+  b->set_poolid("");
+  b->set_blockid(1);
+  b->set_generationstamp(1);
+  b->set_numbytes(4096);
+
+  // Set up the one block to have one datanode holding it
+  DatanodeInfoProto *di = block.add_locs();
+  DatanodeIDProto *dnid = di->mutable_id();
+  dnid->set_datanodeuuid("foo");
+
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  auto bad_node_tracker = std::make_shared<BadDataNodeTracker>();
+  bad_node_tracker->AddBadNode("foo");
+
+  PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, bad_node_tracker);
+  Status stat;
+  size_t read = 0;
+
+  // Exclude the one datanode with the data
+  is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), nullptr,
+      [&stat, &read](const Status &status, const std::string &, size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+
+  // Should fail with no resource available
+  ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again), stat.code());
+  ASSERT_EQ(0UL, read);
+}
+
 TEST(BadDataNodeTest, RecoverableError) {
-  LocatedBlocksProto blocks;
-  LocatedBlockProto block;
-  DatanodeInfoProto dn;
+  auto file_info = std::make_shared<struct FileInfo>();
+  file_info->blocks_.push_back(LocatedBlockProto());
+  LocatedBlockProto & block = file_info->blocks_[0];
+  ExtendedBlockProto *b = block.mutable_b();
+  b->set_poolid("");
+  b->set_blockid(1);
+  b->set_generationstamp(1);
+  b->set_numbytes(4096);
+
+  // Set up the one block to have one datanode holding it
+  DatanodeInfoProto *di = block.add_locs();
+  DatanodeIDProto *dnid = di->mutable_id();
+  dnid->set_datanodeuuid("foo");
+
   char buf[4096] = {
       0,
   };
   IoServiceImpl io_service;
-  Options default_options;
-  FileSystemImpl fs(&io_service, default_options);
   auto tracker = std::make_shared<BadDataNodeTracker>();
-  InputStreamImpl is(&fs, &blocks, tracker);
+  PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(),  file_info, tracker);
   Status stat;
   size_t read = 0;
-  struct Trait {
-    static void InitializeMockReader(MockReader *reader) {
-      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
-          .WillOnce(InvokeArgument<5>(Status::OK()));
-
-      EXPECT_CALL(*reader, async_read_some(_, _))
-          // resource unavailable error
-          .WillOnce(InvokeArgument<1>(
-              Status::ResourceUnavailable(
-                  "Unable to get some resource, try again later"),
-              sizeof(buf)));
-    }
-  };
+  EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))
+      // resource unavailable error
+      .WillOnce(InvokeArgument<4>(
+          Status::ResourceUnavailable("Unable to get some resource, try again later"), 0));
 
-  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
-      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
+
+  is.AsyncPreadSome(
+      0, asio::buffer(buf, sizeof(buf)), nullptr,
       [&stat, &read](const Status &status, const std::string &,
                      size_t transferred) {
         stat = status;
@@ -108,7 +180,7 @@ TEST(BadDataNodeTest, RecoverableError) {
 
   std::string failing_dn = "id_of_bad_datanode";
   if (!stat.ok()) {
-    if (InputStream::ShouldExclude(stat)) {
+    if (FileHandle::ShouldExclude(stat)) {
       tracker->AddBadNode(failing_dn);
     }
   }
@@ -117,35 +189,37 @@ TEST(BadDataNodeTest, RecoverableError) {
 }
 
 TEST(BadDataNodeTest, InternalError) {
-  LocatedBlocksProto blocks;
-  LocatedBlockProto block;
-  DatanodeInfoProto dn;
+  auto file_info = std::make_shared<struct FileInfo>();
+  file_info->blocks_.push_back(LocatedBlockProto());
+  LocatedBlockProto & block = file_info->blocks_[0];
+  ExtendedBlockProto *b = block.mutable_b();
+  b->set_poolid("");
+  b->set_blockid(1);
+  b->set_generationstamp(1);
+  b->set_numbytes(4096);
+
+  // Set up the one block to have one datanode holding it
+  DatanodeInfoProto *di = block.add_locs();
+  DatanodeIDProto *dnid = di->mutable_id();
+  dnid->set_datanodeuuid("foo");
+
   char buf[4096] = {
       0,
   };
   IoServiceImpl io_service;
-  Options default_options;
   auto tracker = std::make_shared<BadDataNodeTracker>();
-  FileSystemImpl fs(&io_service, default_options);
-  InputStreamImpl is(&fs, &blocks, tracker);
+  PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(),  file_info, tracker);
   Status stat;
   size_t read = 0;
-  struct Trait {
-    static void InitializeMockReader(MockReader *reader) {
-      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
-          .WillOnce(InvokeArgument<5>(Status::OK()));
-
-      EXPECT_CALL(*reader, async_read_some(_, _))
-          // something bad happened on the DN, calling again isn't going to help
-          .WillOnce(
-              InvokeArgument<1>(Status::Exception("server_explosion_exception",
-                                                  "the server exploded"),
+  EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))
+      // resource unavailable error
+      .WillOnce(InvokeArgument<4>(
+              Status::Exception("server_explosion_exception",
+                                "the server exploded"),
                                 sizeof(buf)));
-    }
-  };
 
-  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
-      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
+  is.AsyncPreadSome(
+      0, asio::buffer(buf, sizeof(buf)), nullptr,
       [&stat, &read](const Status &status, const std::string &,
                      size_t transferred) {
         stat = status;
@@ -156,7 +230,7 @@ TEST(BadDataNodeTest, InternalError) {
 
   std::string failing_dn = "id_of_bad_datanode";
   if (!stat.ok()) {
-    if (InputStream::ShouldExclude(stat)) {
+    if (FileHandle::ShouldExclude(stat)) {
       tracker->AddBadNode(failing_dn);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc
deleted file mode 100644
index 786b846..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "fs/filesystem.h"
-#include "fs/bad_datanode_tracker.h"
-#include <gmock/gmock.h>
-
-using hadoop::common::TokenProto;
-using hadoop::hdfs::DatanodeInfoProto;
-using hadoop::hdfs::DatanodeIDProto;
-using hadoop::hdfs::ExtendedBlockProto;
-using hadoop::hdfs::LocatedBlockProto;
-using hadoop::hdfs::LocatedBlocksProto;
-
-using ::testing::_;
-using ::testing::InvokeArgument;
-using ::testing::Return;
-
-using namespace hdfs;
-
-namespace hdfs {
-
-class MockReader {
- public:
-  virtual ~MockReader() {}
-  MOCK_METHOD2(
-      async_read_some,
-      void(const asio::mutable_buffers_1 &,
-           const std::function<void(const Status &, size_t transferred)> &));
-
-  MOCK_METHOD6(async_connect,
-               void(const std::string &, TokenProto *, ExtendedBlockProto *,
-                    uint64_t, uint64_t,
-                    const std::function<void(const Status &)> &));
-};
-
-template <class Trait>
-struct MockBlockReaderTrait {
-  typedef MockReader Reader;
-  struct State {
-    MockReader reader_;
-    size_t transferred_;
-    Reader *reader() { return &reader_; }
-    size_t *transferred() { return &transferred_; }
-    const size_t *transferred() const { return &transferred_; }
-  };
-
-  static continuation::Pipeline<State> *CreatePipeline(
-      ::asio::io_service *, const DatanodeInfoProto &) {
-    auto m = continuation::Pipeline<State>::Create();
-    *m->state().transferred() = 0;
-    Trait::InitializeMockReader(m->state().reader());
-    return m;
-  }
-};
-}
-
-TEST(InputStreamTest, TestReadSingleTrunk) {
-  LocatedBlocksProto blocks;
-  LocatedBlockProto block;
-  DatanodeInfoProto dn;
-  char buf[4096] = {
-      0,
-  };
-  IoServiceImpl io_service;
-  Options options;
-  FileSystemImpl fs(&io_service, options);
-  InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>());
-  Status stat;
-  size_t read = 0;
-  struct Trait {
-    static void InitializeMockReader(MockReader *reader) {
-      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
-          .WillOnce(InvokeArgument<5>(Status::OK()));
-
-      EXPECT_CALL(*reader, async_read_some(_, _))
-          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
-    }
-  };
-
-  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
-      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
-      [&stat, &read](const Status &status, const std::string &,
-                     size_t transferred) {
-        stat = status;
-        read = transferred;
-      });
-  ASSERT_TRUE(stat.ok());
-  ASSERT_EQ(sizeof(buf), read);
-  read = 0;
-}
-
-TEST(InputStreamTest, TestReadMultipleTrunk) {
-  LocatedBlocksProto blocks;
-  LocatedBlockProto block;
-  DatanodeInfoProto dn;
-  char buf[4096] = {
-      0,
-  };
-  IoServiceImpl io_service;
-  Options options;
-  FileSystemImpl fs(&io_service, options);
-  InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>());
-  Status stat;
-  size_t read = 0;
-  struct Trait {
-    static void InitializeMockReader(MockReader *reader) {
-      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
-          .WillOnce(InvokeArgument<5>(Status::OK()));
-
-      EXPECT_CALL(*reader, async_read_some(_, _))
-          .Times(4)
-          .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4));
-    }
-  };
-
-  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
-      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
-      [&stat, &read](const Status &status, const std::string &,
-                     size_t transferred) {
-        stat = status;
-        read = transferred;
-      });
-  ASSERT_TRUE(stat.ok());
-  ASSERT_EQ(sizeof(buf), read);
-  read = 0;
-}
-
-TEST(InputStreamTest, TestReadError) {
-  LocatedBlocksProto blocks;
-  LocatedBlockProto block;
-  DatanodeInfoProto dn;
-  char buf[4096] = {
-      0,
-  };
-  IoServiceImpl io_service;
-  Options options;
-  FileSystemImpl fs(&io_service, options);
-  InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>());
-  Status stat;
-  size_t read = 0;
-  struct Trait {
-    static void InitializeMockReader(MockReader *reader) {
-      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
-          .WillOnce(InvokeArgument<5>(Status::OK()));
-
-      EXPECT_CALL(*reader, async_read_some(_, _))
-          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
-          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
-          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
-          .WillOnce(InvokeArgument<1>(Status::Error("error"), 0));
-    }
-  };
-
-  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
-      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
-      [&stat, &read](const Status &status, const std::string &,
-                     size_t transferred) {
-        stat = status;
-        read = transferred;
-      });
-  ASSERT_FALSE(stat.ok());
-  ASSERT_EQ(sizeof(buf) / 4 * 3, read);
-  read = 0;
-}
-
-TEST(InputStreamTest, TestExcludeDataNode) {
-  LocatedBlocksProto blocks;
-  LocatedBlockProto *block = blocks.add_blocks();
-  ExtendedBlockProto *b = block->mutable_b();
-  b->set_poolid("");
-  b->set_blockid(1);
-  b->set_generationstamp(1);
-  b->set_numbytes(4096);
-
-  DatanodeInfoProto *di = block->add_locs();
-  DatanodeIDProto *dnid = di->mutable_id();
-  dnid->set_datanodeuuid("foo");
-
-  char buf[4096] = {
-      0,
-  };
-  IoServiceImpl io_service;
-  Options options;
-  FileSystemImpl fs(&io_service, options);
-  InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>());
-  Status stat;
-  size_t read = 0;
-  struct Trait {
-    static void InitializeMockReader(MockReader *reader) {
-      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
-          .WillOnce(InvokeArgument<5>(Status::OK()));
-
-      EXPECT_CALL(*reader, async_read_some(_, _))
-          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
-    }
-  };
-
-  std::shared_ptr<NodeExclusionRule> exclude_set =
-      std::make_shared<ExclusionSet>(std::set<std::string>({"foo"}));
-  is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), exclude_set,
-                    [&stat, &read](const Status &status, const std::string &,
-                                   size_t transferred) {
-                      stat = status;
-                      read = transferred;
-                    });
-  ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again),
-            stat.code());
-  ASSERT_EQ(0UL, read);
-}
-
-int main(int argc, char *argv[]) {
-  // The following line must be executed to initialize Google Mock
-  // (and Google Test) before running the tests.
-  ::testing::InitGoogleMock(&argc, argv);
-  return RUN_ALL_TESTS();
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
index 8c0ef8c..4c15375 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
@@ -18,6 +18,8 @@
 #ifndef LIBHDFSPP_TEST_MOCK_CONNECTION_H_
 #define LIBHDFSPP_TEST_MOCK_CONNECTION_H_
 
+#include "common/async_stream.h"
+
 #include <asio/error_code.hpp>
 #include <asio/buffer.hpp>
 #include <asio/streambuf.hpp>
@@ -27,13 +29,15 @@
 
 namespace hdfs {
 
-class MockConnectionBase {
+class MockConnectionBase : public AsyncStream{
 public:
   MockConnectionBase(::asio::io_service *io_service);
   virtual ~MockConnectionBase();
   typedef std::pair<asio::error_code, std::string> ProducerResult;
-  template <class MutableBufferSequence, class Handler>
-  void async_read_some(const MutableBufferSequence &buf, Handler &&handler) {
+
+  void async_read_some(const MutableBuffers &buf,
+          std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) override {
     if (produced_.size() == 0) {
       ProducerResult r = Produce();
       if (r.first) {
@@ -51,8 +55,9 @@ public:
     io_service_->post(std::bind(handler, asio::error_code(), len));
   }
 
-  template <class ConstBufferSequence, class Handler>
-  void async_write_some(const ConstBufferSequence &buf, Handler &&handler) {
+  void async_write_some(const ConstBuffers &buf,
+            std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) override {
     // CompletionResult res = OnWrite(buf);
     io_service_->post(std::bind(handler, asio::error_code(), asio::buffer_size(buf)));
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
index f54b14f..6f3122e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
@@ -21,6 +21,8 @@
 #include "datatransfer.pb.h"
 #include "common/util.h"
 #include "reader/block_reader.h"
+#include "reader/datatransfer.h"
+#include "reader/fileinfo.h"
 
 #include <google/protobuf/io/coded_stream.h>
 #include <google/protobuf/io/zero_copy_stream_impl.h>
@@ -36,10 +38,14 @@ using ::hadoop::hdfs::DataTransferEncryptorMessageProto;
 using ::hadoop::hdfs::ExtendedBlockProto;
 using ::hadoop::hdfs::PacketHeaderProto;
 using ::hadoop::hdfs::ReadOpChecksumInfoProto;
+using ::hadoop::hdfs::LocatedBlockProto;
+using ::hadoop::hdfs::LocatedBlocksProto;
 
 using ::asio::buffer;
 using ::asio::error_code;
 using ::asio::mutable_buffers_1;
+using ::testing::_;
+using ::testing::InvokeArgument;
 using ::testing::Return;
 using std::make_pair;
 using std::string;
@@ -49,12 +55,47 @@ namespace pbio = pb::io;
 
 namespace hdfs {
 
-class MockDNConnection : public MockConnectionBase {
- public:
+class MockDNConnection : public MockConnectionBase, public DataNodeConnection{
+public:
   MockDNConnection(::asio::io_service &io_service)
       : MockConnectionBase(&io_service) {}
   MOCK_METHOD0(Produce, ProducerResult());
+
+  MOCK_METHOD1(Connect, void(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)>));
+
+  void async_read_some(const MutableBuffers &buf,
+        std::function<void (const asio::error_code & error,
+                               std::size_t bytes_transferred) > handler) override {
+      this->MockConnectionBase::async_read_some(buf, handler);
+  }
+
+  void async_write_some(const ConstBuffers &buf,
+            std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) override {
+    this->MockConnectionBase::async_write_some(buf, handler);
+  }
+};
+
+// Mocks AsyncReadPacket and AsyncRequestBlock but not AsyncReadBlock, so we
+//     can test the logic of AsyncReadBlock
+class PartialMockReader : public BlockReaderImpl {
+public:
+  PartialMockReader() :
+    BlockReaderImpl(BlockReaderOptions(), std::shared_ptr<DataNodeConnection>()) {};
+
+  MOCK_METHOD2(
+      AsyncReadPacket,
+      void(const asio::mutable_buffers_1 &,
+           const std::function<void(const Status &, size_t transferred)> &));
+
+  MOCK_METHOD5(AsyncRequestBlock,
+               void(const std::string &client_name,
+                     const hadoop::hdfs::ExtendedBlockProto *block,
+                     uint64_t length, uint64_t offset,
+                     const std::function<void(Status)> &handler));
 };
+
+
 }
 
 static inline string ToDelimitedString(const pb::MessageLite *msg) {
@@ -94,20 +135,102 @@ static inline std::pair<error_code, string> ProducePacket(
   return std::make_pair(error_code(), std::move(payload));
 }
 
+TEST(RemoteBlockReaderTest, TestReadSingleTrunk) {
+  auto file_info = std::make_shared<struct FileInfo>();
+  LocatedBlocksProto blocks;
+  LocatedBlockProto block;
+  char buf[4096] = {
+      0,
+  };
+
+  Status stat;
+  size_t read = 0;
+  PartialMockReader reader;
+  EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _))
+      .WillOnce(InvokeArgument<4>(Status::OK()));
+  EXPECT_CALL(reader, AsyncReadPacket(_, _))
+      .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
+
+  reader.AsyncReadBlock(
+       GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(sizeof(buf), read);
+  read = 0;
+}
+
+TEST(RemoteBlockReaderTest, TestReadMultipleTrunk) {
+  LocatedBlockProto block;
+  char buf[4096] = {
+      0,
+  };
+  Status stat;
+  size_t read = 0;
+
+  PartialMockReader reader;
+  EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _))
+      .WillOnce(InvokeArgument<4>(Status::OK()));
+
+  EXPECT_CALL(reader, AsyncReadPacket(_, _))
+      .Times(4)
+      .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4));
+
+  reader.AsyncReadBlock(
+       GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(sizeof(buf), read);
+  read = 0;
+}
+
+TEST(RemoteBlockReaderTest, TestReadError) {
+  LocatedBlockProto block;
+  char buf[4096] = {
+      0,
+  };
+  Status stat;
+  size_t read = 0;
+  PartialMockReader reader;
+  EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _))
+      .WillOnce(InvokeArgument<4>(Status::OK()));
+
+  EXPECT_CALL(reader, AsyncReadPacket(_, _))
+      .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+      .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+      .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+      .WillOnce(InvokeArgument<1>(Status::Error("error"), 0));
+
+  reader.AsyncReadBlock(
+       GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_FALSE(stat.ok());
+  ASSERT_EQ(sizeof(buf) / 4 * 3, read);
+  read = 0;
+}
+
 template <class Stream = MockDNConnection, class Handler>
-static std::shared_ptr<RemoteBlockReader<Stream>> ReadContent(
-    Stream *conn, TokenProto *token, const ExtendedBlockProto &block,
-    uint64_t length, uint64_t offset, const mutable_buffers_1 &buf,
-    const Handler &handler) {
+static std::shared_ptr<BlockReaderImpl>
+ReadContent(std::shared_ptr<Stream> conn, const ExtendedBlockProto &block,
+            uint64_t length, uint64_t offset, const mutable_buffers_1 &buf,
+            const Handler &handler) {
   BlockReaderOptions options;
-  auto reader = std::make_shared<RemoteBlockReader<Stream>>(options, conn);
+  auto reader = std::make_shared<BlockReaderImpl>(options, conn);
   Status result;
-  reader->async_connect("libhdfs++", token, &block, length, offset,
+  reader->AsyncRequestBlock("libhdfs++", &block, length, offset,
                         [buf, reader, handler](const Status &stat) {
                           if (!stat.ok()) {
                             handler(stat, 0);
                           } else {
-                            reader->async_read_some(buf, handler);
+                            reader->AsyncReadPacket(buf, handler);
                           }
                         });
   return reader;
@@ -117,11 +240,11 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
   static const size_t kChunkSize = 512;
   static const string kChunkData(kChunkSize, 'a');
   ::asio::io_service io_service;
-  MockDNConnection conn(io_service);
+  auto conn = std::make_shared<MockDNConnection>(io_service);
   BlockOpResponseProto block_op_resp;
 
   block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
-  EXPECT_CALL(conn, Produce())
+  EXPECT_CALL(*conn, Produce())
       .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
       .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
 
@@ -130,16 +253,19 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
   block.set_blockid(0);
   block.set_generationstamp(0);
 
+  bool done = false;
   std::string data(kChunkSize, 0);
-  ReadContent(&conn, nullptr, block, kChunkSize, 0,
+  ReadContent(conn, block, kChunkSize, 0,
               buffer(const_cast<char *>(data.c_str()), data.size()),
-              [&data, &io_service](const Status &stat, size_t transferred) {
+              [&data, &io_service, &done](const Status &stat, size_t transferred) {
                 ASSERT_TRUE(stat.ok());
                 ASSERT_EQ(kChunkSize, transferred);
                 ASSERT_EQ(kChunkData, data);
+                done = true;
                 io_service.stop();
               });
   io_service.run();
+  ASSERT_TRUE(done);
 }
 
 TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
@@ -149,7 +275,7 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
   static const string kChunkData = string(kOffset, 'a') + string(kLength, 'b');
 
   ::asio::io_service io_service;
-  MockDNConnection conn(io_service);
+  auto conn = std::make_shared<MockDNConnection>(io_service);
   BlockOpResponseProto block_op_resp;
   ReadOpChecksumInfoProto *checksum_info =
       block_op_resp.mutable_readopchecksuminfo();
@@ -159,7 +285,7 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
   checksum->set_bytesperchecksum(512);
   block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
 
-  EXPECT_CALL(conn, Produce())
+  EXPECT_CALL(*conn, Produce())
       .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
       .WillOnce(Return(ProducePacket(kChunkData, "", kOffset, 1, true)));
 
@@ -168,16 +294,20 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
   block.set_blockid(0);
   block.set_generationstamp(0);
 
+  bool done = false;
+
   string data(kLength, 0);
-  ReadContent(&conn, nullptr, block, data.size(), kOffset,
+  ReadContent(conn, block, data.size(), kOffset,
               buffer(const_cast<char *>(data.c_str()), data.size()),
-              [&data, &io_service](const Status &stat, size_t transferred) {
+              [&data, &io_service,&done](const Status &stat, size_t transferred) {
                 ASSERT_TRUE(stat.ok());
                 ASSERT_EQ(kLength, transferred);
                 ASSERT_EQ(kChunkData.substr(kOffset, kLength), data);
+                done = true;
                 io_service.stop();
               });
   io_service.run();
+  ASSERT_TRUE(done);
 }
 
 TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
@@ -185,11 +315,11 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
   static const string kChunkData(kChunkSize, 'a');
 
   ::asio::io_service io_service;
-  MockDNConnection conn(io_service);
+  auto conn = std::make_shared<MockDNConnection>(io_service);
   BlockOpResponseProto block_op_resp;
   block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
 
-  EXPECT_CALL(conn, Produce())
+  EXPECT_CALL(*conn, Produce())
       .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
       .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false)))
       .WillOnce(Return(ProducePacket(kChunkData, "", kChunkSize, 2, true)));
@@ -202,25 +332,22 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
   string data(kChunkSize, 0);
   mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size());
   BlockReaderOptions options;
-  auto reader =
-      std::make_shared<RemoteBlockReader<MockDNConnection>>(options, &conn);
+  auto reader = std::make_shared<BlockReaderImpl>(options, conn);
   Status result;
-  reader->async_connect(
-      "libhdfs++", nullptr, &block, data.size(), 0,
+  reader->AsyncRequestBlock(
+      "libhdfs++", &block, data.size(), 0,
       [buf, reader, &data, &io_service](const Status &stat) {
         ASSERT_TRUE(stat.ok());
-        reader->async_read_some(
-            buf, [buf, reader, &data, &io_service](const Status &stat,
-                                                   size_t transferred) {
+        reader->AsyncReadPacket(
+            buf, [buf, reader, &data, &io_service](const Status &stat, size_t transferred) {
               ASSERT_TRUE(stat.ok());
               ASSERT_EQ(kChunkSize, transferred);
               ASSERT_EQ(kChunkData, data);
               data.clear();
               data.resize(kChunkSize);
               transferred = 0;
-              reader->async_read_some(
-                  buf,
-                  [&data, &io_service](const Status &stat, size_t transferred) {
+              reader->AsyncReadPacket(
+                  buf, [&data,&io_service](const Status &stat, size_t transferred) {
                     ASSERT_TRUE(stat.ok());
                     ASSERT_EQ(kChunkSize, transferred);
                     ASSERT_EQ(kChunkData, data);
@@ -234,12 +361,11 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
 TEST(RemoteBlockReaderTest, TestSaslConnection) {
   static const size_t kChunkSize = 512;
   static const string kChunkData(kChunkSize, 'a');
-  static const string kAuthPayload =
-      "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/"
-      "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\","
-      "charset=utf-8,algorithm=md5-sess";
+  static const string kAuthPayload = "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/"
+                                     "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\","
+                                     "charset=utf-8,algorithm=md5-sess";
   ::asio::io_service io_service;
-  MockDNConnection conn(io_service);
+  auto conn = std::make_shared<MockDNConnection>(io_service);
   BlockOpResponseProto block_op_resp;
   block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
 
@@ -252,23 +378,23 @@ TEST(RemoteBlockReaderTest, TestSaslConnection) {
       ::hadoop::hdfs::
           DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS);
 
-  EXPECT_CALL(conn, Produce())
+  EXPECT_CALL(*conn, Produce())
       .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp0))))
       .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp1))))
       .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
       .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
 
-  DataTransferSaslStream<MockDNConnection> sasl_conn(&conn, "foo", "bar");
+  auto sasl_conn = std::make_shared<DataTransferSaslStream<MockDNConnection> >(conn, "foo", "bar");
   ExtendedBlockProto block;
   block.set_poolid("foo");
   block.set_blockid(0);
   block.set_generationstamp(0);
 
   std::string data(kChunkSize, 0);
-  sasl_conn.Handshake([&sasl_conn, &block, &data, &io_service](
+  sasl_conn->Handshake([sasl_conn, &block, &data, &io_service](
       const Status &s) {
     ASSERT_TRUE(s.ok());
-    ReadContent(&sasl_conn, nullptr, block, kChunkSize, 0,
+    ReadContent(sasl_conn, block, kChunkSize, 0,
                 buffer(const_cast<char *>(data.c_str()), data.size()),
                 [&data, &io_service](const Status &stat, size_t transferred) {
                   ASSERT_TRUE(stat.ok());


Mime
View raw message