hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From whe...@apache.org
Subject [06/51] [abbrv] [partial] hadoop git commit: HDFS-9207. Move the implementation to the hdfs-native-client module. Contributed by Haohui Mai.
Date Fri, 16 Oct 2015 18:56:38 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea310d75/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
new file mode 100644
index 0000000..71e28ac
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_library(reader remote_block_reader.cc datatransfer.cc)  # "reader" target built from the two .cc files of this directory
+add_dependencies(reader proto)  # build target "proto" first; NOTE(review): presumably it generates datatransfer.pb.h - confirm

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea310d75/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
new file mode 100644
index 0000000..81636b9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef BLOCK_READER_H_
+#define BLOCK_READER_H_
+
+#include "libhdfspp/status.h"
+#include "datatransfer.pb.h"
+
+#include <memory>
+
+namespace hdfs {
+
+struct CacheStrategy { // Cache hints carried by BlockReaderOptions; all fields start out false/zero.
+  bool drop_behind_specified; // NOTE(review): presumably "drop_behind was set explicitly" - confirm
+  bool drop_behind;
+  bool read_ahead_specified; // NOTE(review): presumably "read_ahead was set explicitly" - confirm
+  unsigned long long read_ahead; // an integer quantity, not a flag
+  CacheStrategy()
+      : drop_behind_specified(false), drop_behind(false),
+        read_ahead_specified(false), read_ahead(0) {} // 0 rather than `false`: read_ahead is an integer
+};
+
+enum DropBehindStrategy { // not referenced by any other code shown in this patch excerpt
+  kUnspecified = 0,
+  kEnableDropBehind = 1,
+  kDisableDropBehind = 2,
+};
+
+enum EncryptionScheme { // value type of BlockReaderOptions::encryption_scheme
+  kNone = 0, // the value BlockReaderOptions() selects by default
+  kAESCTRNoPadding = 1,
+};
+
+struct BlockReaderOptions { // Settings copied into each RemoteBlockReader.
+  bool verify_checksum; // passed to ReadBlockProto() and consulted by AckRead
+  CacheStrategy cache_strategy;
+  EncryptionScheme encryption_scheme;
+
+  BlockReaderOptions() // defaults: checksums on, default cache hints, no encryption
+      : verify_checksum(true), cache_strategy(), encryption_scheme(kNone) {}
+};
+
+template <class Stream>
+class RemoteBlockReader // Reads block packets from *stream; members are defined in remote_block_reader_impl.h
+    : public std::enable_shared_from_this<RemoteBlockReader<Stream>> {
+public:
+  explicit RemoteBlockReader(const BlockReaderOptions &options, Stream *stream)
+      : stream_(stream), state_(kOpen), options_(options), packet_len_(0),
+        packet_data_read_bytes_(0), chunk_padding_bytes_(0), bytes_to_read_(0) {}
+
+  template <class MutableBufferSequence, class ReadHandler>
+  void async_read_some(const MutableBufferSequence &buffers, // calls shared_from_this(): reader must be shared_ptr-owned
+                       const ReadHandler &handler); // invoked as handler(Status, size_t bytes)
+
+  template <class MutableBufferSequence>
+  size_t read_some(const MutableBufferSequence &buffers, Status *status); // blocking form of async_read_some
+
+  Status connect(const std::string &client_name, // blocking form of async_connect
+                 const hadoop::common::TokenProto *token,
+                 const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+                 uint64_t offset);
+
+  template <class ConnectHandler>
+  void async_connect(const std::string &client_name,
+                     const hadoop::common::TokenProto *token, // may be null
+                     const hadoop::hdfs::ExtendedBlockProto *block,
+                     uint64_t length, uint64_t offset,
+                     const ConnectHandler &handler); // invoked as handler(Status)
+
+private:
+  struct ReadPacketHeader; // continuation stages pushed, in this order, by async_read_some
+  struct ReadChecksum;
+  struct ReadPadding;
+  template <class MutableBufferSequence> struct ReadData;
+  struct AckRead;
+  enum State {
+    kOpen, // initial state; async_read_some asserts the reader has left it
+    kReadPacketHeader,
+    kReadChecksum,
+    kReadPadding,
+    kReadData,
+    kFinished, // set by AckRead once its status message was written
+  };
+
+  Stream *stream_; // never deleted by this class
+  hadoop::hdfs::PacketHeaderProto header_; // header of the packet currently being read
+  State state_;
+  BlockReaderOptions options_;
+  size_t packet_len_; // length prefix of the current packet
+  int packet_data_read_bytes_; // data bytes of the current packet consumed so far
+  int chunk_padding_bytes_; // requested offset minus the chunk offset reported by the server
+  long long bytes_to_read_; // starts at the requested length, decreases as data arrives
+  std::vector<char> checksum_; // checksum bytes of the current packet
+};
+}
+
+#include "remote_block_reader_impl.h"
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea310d75/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc
new file mode 100644
index 0000000..d936407
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "datatransfer.h"
+
+#include "libhdfspp/status.h"
+
+namespace hdfs {
+
+namespace DataTransferSaslStreamUtil {
+
+static const auto kSUCCESS = hadoop::hdfs::DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS; // file-local shorthand used by PrepareInitialHandshake()
+
+using hadoop::hdfs::DataTransferEncryptorMessageProto;
+
+Status ConvertToStatus(const DataTransferEncryptorMessageProto *msg, std::string *payload) {
+  using namespace hadoop::hdfs;
+  switch (msg->status()) { // only the two error codes get special treatment
+  case DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR_UNKNOWN_KEY:
+    payload->clear();
+    return Status::Exception("InvalidEncryptionKeyException", msg->message().c_str());
+  case DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR:
+    payload->clear();
+    return Status::Error(msg->message().c_str());
+  default: // any other status: hand the message payload to the caller
+    *payload = msg->payload();
+    return Status::OK();
+  }
+}
+
+void PrepareInitialHandshake(DataTransferEncryptorMessageProto *msg) {
+  msg->set_payload(""); // the opening message carries an empty payload...
+  msg->set_status(kSUCCESS); // ...and a SUCCESS status
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea310d75/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
new file mode 100644
index 0000000..511c2eb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_READER_DATA_TRANSFER_H_
+#define LIB_READER_DATA_TRANSFER_H_
+
+#include "common/sasl_authenticator.h"
+
+namespace hdfs {
+
+enum {
+  kDataTransferVersion = 28, // second byte of the 3-byte request prefix built in RemoteBlockReader::async_connect
+  kDataTransferSasl = 0xdeadbeef, // passed through htonl() and written first by Handshake()
+};
+
+enum Operation {
+  kWriteBlock = 80, // not used by the code shown in this patch excerpt
+  kReadBlock = 81, // last byte of the request prefix built in RemoteBlockReader::async_connect
+};
+
+template <class Stream> class DataTransferSaslStream { // wraps *stream and adds Handshake()
+public:
+  DataTransferSaslStream(Stream *stream, const std::string &username,
+                         const std::string &password)
+      : stream_(stream), authenticator_(username, password) {}
+
+  template <typename NextHandler> void Handshake(const NextHandler &next);
+
+  template <typename MutableBufferSequence, typename ReadHandler>
+  void async_read_some(const MutableBufferSequence &buffers,
+                       ReadHandler &&handler); // delegates to stream_->async_read_some
+
+  template <typename ConstBufferSequence, typename WriteHandler>
+  void async_write_some(const ConstBufferSequence &buffers,
+                        WriteHandler &&handler); // delegates to stream_->async_write_some
+
+private:
+  struct ReadSaslMessage; // continuation types, defined in datatransfer_impl.h
+  struct Authenticator;
+  DataTransferSaslStream(const DataTransferSaslStream &) = delete; // non-copyable
+  DataTransferSaslStream &operator=(const DataTransferSaslStream &) = delete;
+  Stream *stream_;
+  DigestMD5Authenticator authenticator_;
+};
+}
+
+#include "datatransfer_impl.h"
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea310d75/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
new file mode 100644
index 0000000..088b86e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_READER_DATATRANFER_IMPL_H_
+#define LIB_READER_DATATRANFER_IMPL_H_
+
+#include "datatransfer.pb.h"
+#include "common/continuation/continuation.h"
+#include "common/continuation/asio.h"
+#include "common/continuation/protobuf.h"
+
+#include <asio/read.hpp>
+#include <asio/buffer.hpp>
+#include <utility>
+namespace hdfs {
+
+namespace DataTransferSaslStreamUtil { // both functions are defined in datatransfer.cc
+Status
+ConvertToStatus(const ::hadoop::hdfs::DataTransferEncryptorMessageProto *msg,
+                std::string *payload); // *payload gets msg's payload, or is cleared for the two error statuses
+void PrepareInitialHandshake(
+    ::hadoop::hdfs::DataTransferEncryptorMessageProto *msg); // sets SUCCESS status and an empty payload
+}
+
+template <class Stream>
+struct DataTransferSaslStream<Stream>::Authenticator
+    : continuation::Continuation { // stage: runs the authenticator on *request_, fills *msg_
+  Authenticator(DigestMD5Authenticator *authenticator,
+                const std::string *request,
+                hadoop::hdfs::DataTransferEncryptorMessageProto *msg)
+      : authenticator_(authenticator), request_(request), msg_(msg) {}
+
+  virtual void Run(const Next &next) override {
+    using namespace ::hadoop::hdfs;
+    std::string reply;
+    const Status verdict = authenticator_->EvaluateResponse(*request_, &reply);
+    msg_->Clear();
+    if (!verdict.ok()) {
+      msg_->set_status(
+          DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR);
+    } else {
+      // TODO: Handle encryption scheme
+      msg_->set_status(
+          DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS);
+      msg_->set_payload(reply);
+    }
+    next(Status::OK()); // a failure is recorded in *msg_ only; the stage itself reports OK
+  }
+
+private:
+  DigestMD5Authenticator *authenticator_;
+  const std::string *request_;
+  hadoop::hdfs::DataTransferEncryptorMessageProto *msg_;
+};
+
+template <class Stream>
+struct DataTransferSaslStream<Stream>::ReadSaslMessage
+    : continuation::Continuation { // stage: reads one delimited message into resp_
+  ReadSaslMessage(Stream *stream, std::string *data)
+      : stream_(stream), data_(data), read_pb_(stream, &resp_) {}
+
+  virtual void Run(const Next &next) override {
+    // A failed read is passed on untouched; a successful one is mapped to a
+    // Status (and *data_ is filled in) by ConvertToStatus().
+    auto on_read = [this, next](const Status &read_status) {
+      if (!read_status.ok()) {
+        next(read_status);
+        return;
+      }
+      next(DataTransferSaslStreamUtil::ConvertToStatus(&resp_, data_));
+    };
+    read_pb_.Run(on_read);
+  }
+
+private:
+  Stream *stream_;
+  std::string *data_;
+  hadoop::hdfs::DataTransferEncryptorMessageProto resp_;
+  continuation::ReadDelimitedPBMessageContinuation<Stream, 1024> read_pb_;
+};
+
+template <class Stream>
+template <class Handler>
+void DataTransferSaslStream<Stream>::Handshake(const Handler &next) { // next(Status) is called when the pipeline ends
+  using ::hadoop::hdfs::DataTransferEncryptorMessageProto;
+  using ::hdfs::continuation::Write;
+  using ::hdfs::continuation::WriteDelimitedPBMessage;
+
+  static const int kMagicNumber = htonl(kDataTransferSasl); // magic in network byte order
+  static const asio::const_buffers_1 kMagicNumberBuffer = asio::buffer(
+      reinterpret_cast<const char *>(&kMagicNumber), sizeof(kMagicNumber)); // the bytes OF the constant, hence '&'
+
+  struct State { // storage obtained from the pipeline through m->state()
+    DataTransferEncryptorMessageProto req0;
+    std::string resp0;
+    DataTransferEncryptorMessageProto req1;
+    std::string resp1;
+    Stream *stream;
+  };
+  auto m = continuation::Pipeline<State>::Create();
+  State *s = &m->state();
+  s->stream = stream_;
+
+  DataTransferSaslStreamUtil::PrepareInitialHandshake(&s->req0);
+
+  m->Push(Write(stream_, kMagicNumberBuffer)) // magic, req0, read resp0, authenticate, req1, read resp1
+      .Push(WriteDelimitedPBMessage(stream_, &s->req0))
+      .Push(new ReadSaslMessage(stream_, &s->resp0))
+      .Push(new Authenticator(&authenticator_, &s->resp0, &s->req1))
+      .Push(WriteDelimitedPBMessage(stream_, &s->req1))
+      .Push(new ReadSaslMessage(stream_, &s->resp1));
+  m->Run([next](const Status &status, const State &) { next(status); });
+}
+
+template <class Stream>
+template <class MutableBufferSequence, class ReadHandler>
+void DataTransferSaslStream<Stream>::async_read_some( // plain pass-through to the wrapped stream
+    const MutableBufferSequence &buffers, ReadHandler &&handler) {
+  stream_->async_read_some(buffers, std::forward<ReadHandler>(handler)); // forward: an rvalue handler stays an rvalue
+}
+
+template <class Stream>
+template <typename ConstBufferSequence, typename WriteHandler>
+void DataTransferSaslStream<Stream>::async_write_some( // plain pass-through to the wrapped stream
+    const ConstBufferSequence &buffers, WriteHandler &&handler) {
+  stream_->async_write_some(buffers, std::forward<WriteHandler>(handler)); // forward: an rvalue handler stays an rvalue
+}
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea310d75/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
new file mode 100644
index 0000000..68bc4ee
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "block_reader.h"
+
+namespace hdfs {
+
+hadoop::hdfs::OpReadBlockProto
+ReadBlockProto(const std::string &client_name, bool verify_checksum,
+               const hadoop::common::TokenProto *token,
+               const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+               uint64_t offset) {
+  using namespace hadoop::hdfs;
+  using namespace hadoop::common;
+  // Build the nested messages in place: mutable_*() makes the parent message
+  // own each sub-message from the start, so nothing is held by a raw pointer.
+  OpReadBlockProto p;
+  ClientOperationHeaderProto *h = p.mutable_header();
+  h->set_clientname(client_name);
+  BaseHeaderProto *base_h = h->mutable_baseheader();
+  base_h->mutable_block()->CopyFrom(*block); // block is required and must not be null
+  if (token) { // the token is optional
+    base_h->mutable_token()->CopyFrom(*token);
+  }
+
+  p.set_offset(offset);
+  p.set_len(length);
+  p.set_sendchecksums(verify_checksum);
+  // TODO: p.set_allocated_cachingstrategy();
+  return p;
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea310d75/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
new file mode 100644
index 0000000..68ea6ad
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_
+#define LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_
+
+#include "datatransfer.h"
+#include "common/continuation/asio.h"
+#include "common/continuation/protobuf.h"
+
+#include <asio/buffers_iterator.hpp>
+#include <asio/streambuf.hpp>
+#include <asio/write.hpp>
+
+#include <arpa/inet.h>
+
+#include <future>
+
+namespace hdfs {
+
+hadoop::hdfs::OpReadBlockProto // defined in remote_block_reader.cc
+ReadBlockProto(const std::string &client_name, bool verify_checksum,
+               const hadoop::common::TokenProto *token,
+               const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+               uint64_t offset); // token may be null; block is dereferenced unconditionally
+
+template <class Stream>
+template <class ConnectHandler>
+void RemoteBlockReader<Stream>::async_connect(
+    const std::string &client_name, const hadoop::common::TokenProto *token,
+    const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+    uint64_t offset, const ConnectHandler &handler) {
+  // Sends a read-block request (3-byte prefix followed by a delimited
+  // OpReadBlockProto), reads the BlockOpResponseProto reply and reports the
+  // result through `handler`. Only the requested `length` is tracked here;
+  // the chunk-alignment padding is derived from the reply further down.
+  bytes_to_read_ = length;
+
+  struct Exchange { // request/response storage obtained from the pipeline
+    std::string header;
+    hadoop::hdfs::OpReadBlockProto request;
+    hadoop::hdfs::BlockOpResponseProto response;
+  };
+
+  auto m = continuation::Pipeline<Exchange>::Create();
+  Exchange *x = &m->state();
+
+  x->header.insert(x->header.begin(),
+                   {0, kDataTransferVersion, Operation::kReadBlock});
+  x->request = ReadBlockProto(client_name, options_.verify_checksum, token,
+                              block, length, offset);
+
+  auto read_reply =
+      new continuation::ReadDelimitedPBMessageContinuation<Stream, 16384>(
+          stream_, &x->response);
+
+  m->Push(continuation::Write(stream_, asio::buffer(x->header)))
+      .Push(continuation::WriteDelimitedPBMessage(stream_, &x->request))
+      .Push(read_reply);
+
+  m->Run([this, handler, offset](const Status &status, const Exchange &s) {
+    Status outcome = status;
+    const auto &reply = s.response;
+    if (!outcome.ok()) {
+      // pipeline failure: handed to the caller unchanged
+    } else if (reply.status() != ::hadoop::hdfs::Status::SUCCESS) {
+      outcome = Status::Error(reply.message().c_str());
+    } else {
+      if (reply.has_readopchecksuminfo()) {
+        chunk_padding_bytes_ =
+            offset - reply.readopchecksuminfo().chunkoffset();
+      }
+      state_ = kReadPacketHeader;
+    }
+    handler(outcome);
+  });
+}
+
+template <class Stream>
+struct RemoteBlockReader<Stream>::ReadPacketHeader
+    : continuation::Continuation { // stage: reads both length prefixes and the PacketHeaderProto
+  ReadPacketHeader(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
+
+  virtual void Run(const Next &next) override { // NOTE(review): runs whatever state_ is, unlike ReadChecksum/ReadPadding - confirm
+    parent_->packet_data_read_bytes_ = 0;
+    parent_->packet_len_ = 0;
+    auto handler = [next, this](const asio::error_code &ec, size_t) {
+      Status status;
+      if (ec) {
+        status = Status(ec.value(), ec.message().c_str());
+      } else if (header_length() > kMaxHeaderSize - kHeaderStart ||
+                 !parent_->header_.ParseFromArray(&buf_[kHeaderStart],
+                                                  header_length())) {
+        status = Status::Error("Failed to parse the header"); // too big for buf_, or malformed
+      } else { // ParseFromArray has replaced the previous contents of header_
+        parent_->packet_len_ = packet_length();
+        parent_->state_ = kReadChecksum;
+      }
+      next(status);
+    };
+
+    asio::async_read(*parent_->stream_, asio::buffer(buf_),
+                     std::bind(&ReadPacketHeader::CompletionHandler, this,
+                               std::placeholders::_1, std::placeholders::_2),
+                     handler);
+  }
+
+private:
+  static const size_t kMaxHeaderSize = 512; // capacity of buf_, both prefixes included
+  static const size_t kPayloadLenOffset = 0;
+  static const size_t kPayloadLenSize = sizeof(int);
+  static const size_t kHeaderLenOffset = 4;
+  static const size_t kHeaderLenSize = sizeof(short);
+  static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize; // first byte of the serialized header
+
+  RemoteBlockReader<Stream> *parent_;
+  std::array<char, kMaxHeaderSize> buf_;
+
+  size_t packet_length() const { // first prefix, converted from network byte order
+    return ntohl(*reinterpret_cast<const unsigned *>(&buf_[kPayloadLenOffset]));
+  }
+
+  size_t header_length() const { // second prefix: size of the serialized header
+    return ntohs(*reinterpret_cast<const short *>(&buf_[kHeaderLenOffset]));
+  }
+
+  size_t CompletionHandler(const asio::error_code &ec, size_t transferred) { // returns how many more bytes async_read may fetch
+    if (ec) {
+      return 0;
+    } else if (transferred < kHeaderStart) {
+      return kHeaderStart - transferred; // prefixes first; header_length() is unknown until then
+    }
+    const size_t want = kHeaderStart + header_length();
+    return want > kMaxHeaderSize ? 0 : want - transferred; // 0 ends the read: header cannot fit in buf_
+  }
+};
+
+template <class Stream>
+struct RemoteBlockReader<Stream>::ReadChecksum : continuation::Continuation { // stage: reads the packet's checksum bytes
+  ReadChecksum(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
+
+  virtual void Run(const Next &next) override {
+    auto parent = parent_;
+    if (parent->state_ != kReadChecksum) { // nothing to do unless a header was just read
+      next(Status::OK());
+      return;
+    }
+    // checksum bytes = packet_len_ - sizeof(int) - datalen; refuse a wrap-around
+    const size_t overhead = sizeof(int) + parent->header_.datalen();
+    if (parent->header_.datalen() < 0 || parent->packet_len_ < overhead) {
+      next(Status::Error("Invalid packet length")); // lengths from the wire are inconsistent
+      return;
+    }
+    auto handler = [parent, next](const asio::error_code &ec, size_t) {
+      if (!ec) {
+        parent->state_ =
+            parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
+      }
+      next(ec ? Status(ec.value(), ec.message().c_str()) : Status());
+    };
+    parent->checksum_.resize(parent->packet_len_ - overhead);
+    asio::async_read(*parent->stream_, asio::buffer(parent->checksum_), handler);
+  }
+
+private:
+  RemoteBlockReader<Stream> *parent_;
+};
+
+template <class Stream>
+struct RemoteBlockReader<Stream>::ReadPadding : continuation::Continuation { // stage: reads chunk_padding_bytes_ into a scratch buffer
+  ReadPadding(RemoteBlockReader<Stream> *parent)
+      : parent_(parent), padding_(parent->chunk_padding_bytes_),
+        bytes_transferred_(std::make_shared<size_t>(0)),
+        read_data_(new ReadData<asio::mutable_buffers_1>( // the read itself is delegated to a ReadData stage
+            parent, bytes_transferred_, asio::buffer(padding_))) {}
+
+  virtual void Run(const Next &next) override {
+    if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) { // no padding pending
+      next(Status::OK());
+      return;
+    }
+
+    auto h = [next, this](const Status &status) {
+      if (status.ok()) {
+        assert(static_cast<int>(*bytes_transferred_) == // value conversion; no punning of size_t storage
+               parent_->chunk_padding_bytes_);
+        parent_->chunk_padding_bytes_ = 0;
+        parent_->state_ = kReadData;
+      }
+      next(status);
+    };
+    read_data_->Run(h);
+  }
+
+private:
+  RemoteBlockReader<Stream> *parent_;
+  std::vector<char> padding_; // scratch buffer sized from chunk_padding_bytes_ at construction
+  std::shared_ptr<size_t> bytes_transferred_; // private counter handed to read_data_
+  std::shared_ptr<continuation::Continuation> read_data_;
+  ReadPadding(const ReadPadding &) = delete;
+  ReadPadding &operator=(const ReadPadding &) = delete;
+};
+
+template <class Stream>
+template <class MutableBufferSequence>
+struct RemoteBlockReader<Stream>::ReadData : continuation::Continuation { // stage: reads packet payload into buf_
+  ReadData(RemoteBlockReader<Stream> *parent,
+           std::shared_ptr<size_t> bytes_transferred,
+           const MutableBufferSequence &buf)
+      : parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) {}
+
+  virtual void Run(const Next &next) override {
+    // Only what is left of the current packet's payload is requested.
+    auto remaining =
+        parent_->header_.datalen() - parent_->packet_data_read_bytes_;
+    auto on_data = [next, this](const asio::error_code &ec, size_t count) {
+      // The byte counters advance even when the read ended with an error.
+      auto *reader = parent_;
+      *bytes_transferred_ += count;
+      reader->bytes_to_read_ -= count;
+      reader->packet_data_read_bytes_ += count;
+      if (reader->packet_data_read_bytes_ >= reader->header_.datalen()) {
+        reader->state_ = kReadPacketHeader;
+      }
+      Status status;
+      if (ec) {
+        status = Status(ec.value(), ec.message().c_str());
+      }
+      next(status);
+    };
+    async_read(*parent_->stream_, buf_, asio::transfer_exactly(remaining), on_data);
+  }
+
+private:
+  RemoteBlockReader<Stream> *parent_;
+  std::shared_ptr<size_t> bytes_transferred_;
+  MutableBufferSequence buf_;
+};
+
+template <class Stream>
+struct RemoteBlockReader<Stream>::AckRead : continuation::Continuation { // stage: writes the final read status
+  AckRead(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
+
+  virtual void Run(const Next &next) override {
+    typedef hadoop::hdfs::ClientReadStatusProto Ack;
+    if (parent_->bytes_to_read_ > 0) { // data still outstanding: nothing to send yet
+      next(Status::OK());
+      return;
+    }
+    auto m = continuation::Pipeline<Ack>::Create();
+    if (parent_->options_.verify_checksum) {
+      m->state().set_status(hadoop::hdfs::Status::CHECKSUM_OK);
+    } else {
+      m->state().set_status(hadoop::hdfs::Status::SUCCESS);
+    }
+    m->Push(
+        continuation::WriteDelimitedPBMessage(parent_->stream_, &m->state()));
+
+    auto *reader = parent_;
+    m->Run([reader, next](const Status &status, const Ack &) {
+      if (status.ok()) {
+        reader->state_ = RemoteBlockReader<Stream>::kFinished;
+      }
+      next(status);
+    });
+  }
+
+private:
+  RemoteBlockReader<Stream> *parent_;
+};
+
+template <class Stream>
+template <class MutableBufferSequence, class ReadHandler>
+void RemoteBlockReader<Stream>::async_read_some(
+    const MutableBufferSequence &buffers, const ReadHandler &handler) {
+  assert(state_ != kOpen && "Not connected");
+
+  struct Progress { // byte counter shared with the ReadData stage
+    std::shared_ptr<size_t> bytes_transferred;
+  };
+  auto pipeline = continuation::Pipeline<Progress>::Create();
+  auto &counter = pipeline->state().bytes_transferred;
+  counter = std::make_shared<size_t>(0);
+  // One pass: header -> checksums -> padding -> payload -> final status.
+  pipeline->Push(new ReadPacketHeader(this))
+      .Push(new ReadChecksum(this))
+      .Push(new ReadPadding(this))
+      .Push(new ReadData<MutableBufferSequence>(this, counter, buffers))
+      .Push(new AckRead(this));
+
+  auto self = this->shared_from_this(); // the callback holds a reference to the reader
+  pipeline->Run([self, handler](const Status &status, const Progress &done) {
+    handler(status, *done.bytes_transferred);
+  });
+}
+
+template <class Stream>
+template <class MutableBufferSequence>
+size_t
+RemoteBlockReader<Stream>::read_some(const MutableBufferSequence &buffers,
+                                     Status *status) {
+  size_t byte_count = 0;
+  auto finished = std::make_shared<std::promise<void>>();
+  std::future<void> gate = finished->get_future();
+  auto on_done = [status, &byte_count, finished](const Status &s, size_t n) {
+    *status = s;
+    byte_count = n;
+    finished->set_value();
+  };
+  async_read_some(buffers, on_done);
+  gate.wait(); // blocks until the handler above has stored the outcome
+  return byte_count;
+}
+
+template <class Stream>
+Status RemoteBlockReader<Stream>::connect(
+    const std::string &client_name, const hadoop::common::TokenProto *token,
+    const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+    uint64_t offset) {
+  auto result = std::make_shared<std::promise<Status>>();
+  auto pending = result->get_future(); // obtained before the request is started
+  auto on_connected = [result](const Status &s) { result->set_value(s); };
+  async_connect(client_name, token, block, length, offset, on_connected);
+  return pending.get(); // blocks until async_connect's handler has run
+}
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea310d75/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt
new file mode 100644
index 0000000..aa3951c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt
@@ -0,0 +1,3 @@
+include_directories(${OPENSSL_INCLUDE_DIRS}) # rpc_engine.cc includes <openssl/rand.h>.
+add_library(rpc rpc_connection.cc rpc_engine.cc)
+add_dependencies(rpc proto) # The sources include *.pb.h headers; presumably generated by `proto`.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea310d75/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
new file mode 100644
index 0000000..8c4130f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "rpc_engine.h"
+
+#include "RpcHeader.pb.h"
+#include "ProtobufRpcEngine.pb.h"
+#include "IpcConnectionContext.pb.h"
+
+#include "common/logging.h"
+#include "common/util.h"
+
+#include <asio/read.hpp>
+
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
+namespace hdfs {
+
+namespace pb = ::google::protobuf;
+namespace pbio = ::google::protobuf::io;
+
+using namespace ::hadoop::common;
+using namespace ::std::placeholders;
+
+static void
+ConstructPacket(std::string *res,
+                std::initializer_list<const pb::MessageLite *> headers,
+                const std::string *request) {
+  // Pass 1: add up the sizes reported for the headers and optional request.
+  int body_len = 0;
+  for (const pb::MessageLite *msg : headers) {
+    body_len += DelimitedPBMessageSize(msg);
+  }
+  if (request != nullptr) {
+    body_len += pbio::CodedOutputStream::VarintSize32(request->size()) +
+                request->size();
+  }
+  // The packet opens with that total, converted by htonl().
+  int wire_len = htonl(body_len);
+  res->reserve(res->size() + sizeof(wire_len) + body_len);
+  pbio::StringOutputStream sink(res);
+  pbio::CodedOutputStream out(&sink);
+  out.WriteRaw(reinterpret_cast<const char *>(&wire_len), sizeof(wire_len));
+
+  // Pass 2: write each part as <varint size><bytes> into the claimed span.
+  uint8_t *cursor = out.GetDirectBufferForNBytesAndAdvance(body_len);
+  for (const pb::MessageLite *msg : headers) {
+    cursor =
+        pbio::CodedOutputStream::WriteVarint32ToArray(msg->ByteSize(), cursor);
+    cursor = msg->SerializeWithCachedSizesToArray(cursor);
+  }
+  if (request != nullptr) {
+    cursor =
+        pbio::CodedOutputStream::WriteVarint32ToArray(request->size(), cursor);
+    cursor = out.WriteStringToArray(*request, cursor);
+  }
+}
+
+static void SetRequestHeader(RpcEngine *engine, int call_id,
+                             const std::string &method_name,
+                             RpcRequestHeaderProto *rpc_header,
+                             RequestHeaderProto *req_header) { // Fills both per-call headers in place.
+  rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER);
+  rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
+  rpc_header->set_callid(call_id); // HandleRpcResponse() matches responses on this id.
+  rpc_header->set_clientid(engine->client_name());
+
+  req_header->set_methodname(method_name);
+  req_header->set_declaringclassprotocolname(engine->protocol_name());
+  req_header->set_clientprotocolversion(engine->protocol_version());
+}
+
+RpcConnection::~RpcConnection() {} // Defined out of line; performs no explicit cleanup.
+
+RpcConnection::Request::Request(RpcConnection *parent,
+                                const std::string &method_name,
+                                const std::string &request, Handler &&handler)
+    : call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()),
+      handler_(std::move(handler)) { // Variant for a request body that is already a byte string.
+  RpcRequestHeaderProto rpc_header;
+  RequestHeaderProto req_header;
+  SetRequestHeader(parent->engine_, call_id_, method_name, &rpc_header,
+                   &req_header);
+  ConstructPacket(&payload_, {&rpc_header, &req_header}, &request); // Raw bytes go in as the trailing part.
+}
+
+RpcConnection::Request::Request(RpcConnection *parent,
+                                const std::string &method_name,
+                                const pb::MessageLite *request,
+                                Handler &&handler)
+    : call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()),
+      handler_(std::move(handler)) { // Variant for a protobuf request message.
+  RpcRequestHeaderProto rpc_header;
+  RequestHeaderProto req_header;
+  SetRequestHeader(parent->engine_, call_id_, method_name, &rpc_header,
+                   &req_header);
+  ConstructPacket(&payload_, {&rpc_header, &req_header, request}, nullptr); // The message is framed like a third header.
+}
+
+void RpcConnection::Request::OnResponseArrived(pbio::CodedInputStream *is,
+                                               const Status &status) {
+  handler_(is, status); // Callers in this file pass is == nullptr on timeout and disconnect.
+}
+
+RpcConnection::RpcConnection(RpcEngine *engine)
+    : engine_(engine), resp_state_(kReadLength), resp_length_(0) {} // Receive side starts out expecting a length prefix.
+
+::asio::io_service &RpcConnection::io_service() {
+  return engine_->io_service(); // The connection uses the engine's io_service.
+}
+
+void RpcConnection::Start() {
+  io_service().post(std::bind(&RpcConnection::OnRecvCompleted, this,
+                              ::asio::error_code(), 0)); // Success code and 0 bytes: only advances the receive state machine.
+}
+
+void RpcConnection::FlushPendingRequests() {
+  io_service().post([this]() { // Deferred to the io_service instead of sending inline.
+    if (!request_over_the_wire_) { // NOTE(review): read without engine_state_lock_ — confirm this is safe.
+      OnSendCompleted(::asio::error_code(), 0); // A faked successful completion makes the implementation pick up the next pending request.
+    }
+  });
+}
+
+void RpcConnection::HandleRpcResponse(const std::vector<char> &data) {
+  /* Parses one response frame and completes the matching request. Assumed to
+   * be called from a context that already holds engine_state_lock_. */
+  pbio::ArrayInputStream ar(data.data(), data.size()); // data() is valid for an empty frame; &data[0] is not.
+  pbio::CodedInputStream in(&ar);
+  in.PushLimit(data.size());
+  RpcResponseHeaderProto h;
+  ReadDelimitedPBMessage(&in, &h);
+
+  auto req = RemoveFromRunningQueue(h.callid());
+  if (!req) { // Not in flight, e.g. HandleRpcTimeout() already removed it.
+    LOG_WARN() << "RPC response with Unknown call id " << h.callid();
+    return;
+  }
+
+  Status stat;
+  if (h.has_exceptionclassname()) { // A server-side failure becomes an exception Status.
+    stat =
+        Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str());
+  }
+  req->OnResponseArrived(&in, stat); // `in` is positioned after the response header.
+}
+
+void RpcConnection::HandleRpcTimeout(std::shared_ptr<Request> req,
+                                     const ::asio::error_code &ec) { // Completion handler of a request's timer.
+  if (ec.value() == asio::error::operation_aborted) { // The timer wait was cancelled; nothing to do.
+    return;
+  }
+
+  std::lock_guard<std::mutex> state_lock(engine_state_lock_);
+  auto r = RemoveFromRunningQueue(req->call_id());
+  if (!r) {
+    // The RPC might have been finished and removed from the queue
+    return;
+  }
+
+  Status stat = ToStatus(ec ? ec : make_error_code(::asio::error::timed_out)); // A clean expiry (no error) is reported as timed_out.
+
+  r->OnResponseArrived(nullptr, stat); // No response body exists, hence the null stream.
+}
+
+std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() {
+  static const char kHandshakeHeader[] = {'h', 'r', 'p', 'c',
+                                          RpcEngine::kRpcVersion, 0, 0}; // Magic, version, then two zero bytes (presumably service class and auth protocol — confirm).
+  auto res =
+      std::make_shared<std::string>(kHandshakeHeader, sizeof(kHandshakeHeader));
+
+  RpcRequestHeaderProto h;
+  h.set_rpckind(RPC_PROTOCOL_BUFFER);
+  h.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
+  h.set_callid(RpcEngine::kCallIdConnectionContext); // Negative sentinel id declared in RpcEngine.
+  h.set_clientid(engine_->client_name());
+
+  IpcConnectionContextProto handshake;
+  handshake.set_protocol(engine_->protocol_name());
+  ConstructPacket(res.get(), {&h, &handshake}, nullptr); // Appended after the 7-byte preamble.
+  return res;
+}
+
+void RpcConnection::AsyncRpc(
+    const std::string &method_name, const ::google::protobuf::MessageLite *req,
+    std::shared_ptr<::google::protobuf::MessageLite> resp,
+    const Callback &handler) { // Queues a protobuf RPC; handler is invoked with the final Status.
+  std::lock_guard<std::mutex> state_lock(engine_state_lock_);
+
+  auto wrapped_handler =
+      [resp, handler](pbio::CodedInputStream *is, const Status &status) { // Copies keep resp and handler alive until completion.
+        if (status.ok()) { // Parse only on success; failure paths may pass a null stream.
+          ReadDelimitedPBMessage(is, resp.get());
+        }
+        handler(status);
+      };
+
+  auto r = std::make_shared<Request>(this, method_name, req,
+                                     std::move(wrapped_handler));
+  pending_requests_.push_back(r); // Picked up later by OnSendCompleted().
+  FlushPendingRequests();
+}
+
+void RpcConnection::AsyncRawRpc(const std::string &method_name,
+                                const std::string &req,
+                                std::shared_ptr<std::string> resp,
+                                Callback &&handler) { // Queues an RPC whose request and response are raw bytes.
+  std::lock_guard<std::mutex> state_lock(engine_state_lock_);
+
+  auto wrapped_handler =
+      [resp, handler](pbio::CodedInputStream *is, const Status &status) { // Captures only what the body uses.
+        if (status.ok()) {
+          uint32_t size = 0;
+          is->ReadVarint32(&size); // Length prefix of the response body.
+          auto limit = is->PushLimit(size); // Returns the previous limit, needed only for PopLimit().
+          is->ReadString(resp.get(), size); // Read exactly the announced number of bytes.
+          is->PopLimit(limit);
+        }
+        handler(status);
+      };
+
+  auto r = std::make_shared<Request>(this, method_name, req,
+                                     std::move(wrapped_handler));
+  pending_requests_.push_back(r); // Picked up later by OnSendCompleted().
+  FlushPendingRequests();
+}
+
+void RpcConnection::ClearAndDisconnect(const ::asio::error_code &ec) {
+  Shutdown(); // Tear the transport down before notifying anyone.
+  std::vector<std::shared_ptr<Request>> abandoned;
+  for (const auto &in_flight : requests_on_fly_) {
+    abandoned.push_back(in_flight.second);
+  }
+  requests_on_fly_.clear();
+  for (auto &queued : pending_requests_) {
+    abandoned.push_back(std::move(queued));
+  }
+  pending_requests_.clear();
+  for (const auto &request : abandoned) { // In-flight requests first, then queued ones.
+    request->OnResponseArrived(nullptr, ToStatus(ec));
+  }
+}
+
+std::shared_ptr<RpcConnection::Request>
+RpcConnection::RemoveFromRunningQueue(int call_id) {
+  // Detach and return the request in flight under call_id (empty if none).
+  std::shared_ptr<Request> detached;
+  const auto pos = requests_on_fly_.find(call_id);
+  if (pos != requests_on_fly_.end()) {
+    detached = pos->second;
+    requests_on_fly_.erase(pos);
+  }
+  return detached;
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea310d75/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
new file mode 100644
index 0000000..439a730
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_RPC_RPC_CONNECTION_H_
+#define LIB_RPC_RPC_CONNECTION_H_
+
+#include "rpc_engine.h"
+
+#include "common/logging.h"
+#include "common/util.h"
+
+#include <asio/connect.hpp>
+#include <asio/read.hpp>
+#include <asio/write.hpp>
+
+namespace hdfs {
+
+template <class NextLayer> class RpcConnectionImpl : public RpcConnection { // RpcConnection bound to a concrete transport type.
+public:
+  RpcConnectionImpl(RpcEngine *engine);
+  virtual void Connect(const ::asio::ip::tcp::endpoint &server,
+                       Callback &&handler) override;
+  virtual void Handshake(Callback &&handler) override;
+  virtual void Shutdown() override;
+  virtual void OnSendCompleted(const ::asio::error_code &ec,
+                               size_t transferred) override;
+  virtual void OnRecvCompleted(const ::asio::error_code &ec,
+                               size_t transferred) override;
+
+  NextLayer &next_layer() { return next_layer_; } // Direct access to the underlying transport object.
+private:
+  const Options options_; // Copy of the engine's options taken at construction.
+  NextLayer next_layer_; // Constructed from the engine's io_service.
+};
+
+template <class NextLayer>
+RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
+    : RpcConnection(engine), options_(engine->options()),
+      next_layer_(engine->io_service()) {} // NextLayer must be constructible from an io_service.
+
+template <class NextLayer>
+void RpcConnectionImpl<NextLayer>::Connect(
+    const ::asio::ip::tcp::endpoint &server, Callback &&handler) {
+  next_layer_.async_connect(server,
+      [handler](const ::asio::error_code &ec) { // handler is copied into the closure.
+        handler(ToStatus(ec)); // Report the asio result as a Status.
+      });
+}
+
+template <class NextLayer>
+void RpcConnectionImpl<NextLayer>::Handshake(Callback &&handler) {
+  auto handshake_packet = PrepareHandshakePacket();
+  ::asio::async_write(
+      next_layer_, asio::buffer(*handshake_packet),
+      [handshake_packet, handler](const ::asio::error_code &ec, size_t) { // Capturing the packet keeps the buffer alive during the write.
+        handler(ToStatus(ec));
+      });
+}
+
+template <class NextLayer>
+void RpcConnectionImpl<NextLayer>::OnSendCompleted(const ::asio::error_code &ec,
+                                                   size_t) { // Write-completion handler; also starts the next queued write.
+  using std::placeholders::_1;
+  using std::placeholders::_2;
+  std::lock_guard<std::mutex> state_lock(engine_state_lock_);
+
+  request_over_the_wire_.reset(); // The previous write, if any, is finished.
+  if (ec) {
+    // Current RPC has failed -- abandon the
+    // connection and do proper clean up
+    ClearAndDisconnect(ec);
+    return;
+  }
+
+  if (!pending_requests_.size()) { // Nothing queued: stay idle until FlushPendingRequests().
+    return;
+  }
+
+  std::shared_ptr<Request> req = pending_requests_.front(); // Requests are sent in FIFO order.
+  pending_requests_.erase(pending_requests_.begin());
+  requests_on_fly_[req->call_id()] = req; // Registered so the response can be matched by call id.
+  request_over_the_wire_ = req;
+
+  req->timer().expires_from_now(
+      std::chrono::milliseconds(options_.rpc_timeout)); // rpc_timeout is interpreted as milliseconds.
+  req->timer().async_wait(std::bind(
+      &RpcConnectionImpl<NextLayer>::HandleRpcTimeout, this, req, _1));
+
+  asio::async_write(
+      next_layer_, asio::buffer(req->payload()),
+      std::bind(&RpcConnectionImpl<NextLayer>::OnSendCompleted, this, _1, _2)); // Re-enters this function when the write completes.
+}
+
+template <class NextLayer>
+void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ec,
+                                                   size_t) { // Read-completion handler driving the three-state receive loop.
+  using std::placeholders::_1;
+  using std::placeholders::_2;
+  std::lock_guard<std::mutex> state_lock(engine_state_lock_);
+
+  switch (ec.value()) {
+  case 0:
+    // No errors
+    break;
+  case asio::error::operation_aborted:
+    // The event loop has been shut down. Ignore the error.
+    return;
+  default:
+    LOG_WARN() << "Network error during RPC: " << ec.message();
+    ClearAndDisconnect(ec);
+    return;
+  }
+
+  if (resp_state_ == kReadLength) { // Step 1: read the fixed-size length prefix.
+    resp_state_ = kReadContent;
+    auto buf = ::asio::buffer(reinterpret_cast<char *>(&resp_length_),
+                              sizeof(resp_length_));
+    asio::async_read(next_layer_, buf,
+                     std::bind(&RpcConnectionImpl<NextLayer>::OnRecvCompleted,
+                               this, _1, _2));
+
+  } else if (resp_state_ == kReadContent) { // Step 2: prefix received; read that many body bytes.
+    resp_state_ = kParseResponse;
+    resp_length_ = ntohl(resp_length_); // The prefix is converted from network byte order.
+    resp_data_.resize(resp_length_); // NOTE(review): peer-supplied length is used unchecked.
+    asio::async_read(next_layer_, ::asio::buffer(resp_data_),
+                     std::bind(&RpcConnectionImpl<NextLayer>::OnRecvCompleted,
+                               this, _1, _2));
+
+  } else if (resp_state_ == kParseResponse) { // Step 3: body received; dispatch it and start over.
+    resp_state_ = kReadLength;
+    HandleRpcResponse(resp_data_);
+    resp_data_.clear();
+    Start(); // Posts this handler again to read the next frame.
+  }
+}
+
+template <class NextLayer> void RpcConnectionImpl<NextLayer>::Shutdown() {
+  next_layer_.cancel(); // On an asio socket this cancels outstanding asynchronous operations.
+  next_layer_.close();
+}
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea310d75/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
new file mode 100644
index 0000000..83721a7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "rpc_engine.h"
+#include "rpc_connection.h"
+#include "common/util.h"
+
+#include <openssl/rand.h>
+
+#include <sstream>
+#include <future>
+
+namespace hdfs {
+
+RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
+                     const std::string &client_name, const char *protocol_name,
+                     int protocol_version)
+    : io_service_(io_service), options_(options), client_name_(client_name),
+      protocol_name_(protocol_name), protocol_version_(protocol_version),
+      call_id_(0) { // NextCallId() pre-increments, so the first id handed out is 1.
+}
+
+void RpcEngine::Connect(const ::asio::ip::tcp::endpoint &server,
+                        const std::function<void(const Status &)> &handler) {
+  conn_.reset(new RpcConnectionImpl<::asio::ip::tcp::socket>(this)); // Replaces any previous connection.
+  conn_->Connect(server, [this, handler](const Status &stat) {
+    if (!stat.ok()) {
+      handler(stat); // Connect failed: report it without attempting the handshake.
+    } else {
+      conn_->Handshake([handler](const Status &s) { handler(s); }); // The handshake status is the final result.
+    }
+  });
+}
+
+void RpcEngine::Start() { conn_->Start(); } // conn_ is dereferenced unchecked: call Connect() or TEST_SetRpcConnection() first.
+
+void RpcEngine::Shutdown() {
+  io_service_->post([this]() { conn_->Shutdown(); }); // Deferred to the io_service; does not wait for completion.
+}
+
+void RpcEngine::TEST_SetRpcConnection(std::unique_ptr<RpcConnection> *conn) {
+  conn_.reset(conn->release()); // Takes ownership; *conn is left empty.
+}
+
+void RpcEngine::AsyncRpc(
+    const std::string &method_name, const ::google::protobuf::MessageLite *req,
+    const std::shared_ptr<::google::protobuf::MessageLite> &resp,
+    const std::function<void(const Status &)> &handler) {
+  conn_->AsyncRpc(method_name, req, resp, handler); // Plain forward to the current connection.
+}
+
+Status
+RpcEngine::Rpc(const std::string &method_name,
+               const ::google::protobuf::MessageLite *req,
+               const std::shared_ptr<::google::protobuf::MessageLite> &resp) { // Blocking wrapper around AsyncRpc().
+  auto stat = std::make_shared<std::promise<Status>>(); // Shared so the handler's copy stays valid.
+  std::future<Status> future(stat->get_future());
+  AsyncRpc(method_name, req, resp,
+           [stat](const Status &status) { stat->set_value(status); });
+  return future.get(); // NOTE(review): blocks the caller; presumably must not run on the io_service thread — confirm.
+}
+
+Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
+                         std::shared_ptr<std::string> resp) { // Blocking wrapper around RpcConnection::AsyncRawRpc().
+  auto stat = std::make_shared<std::promise<Status>>(); // Shared so the handler's copy stays valid.
+  std::future<Status> future(stat->get_future());
+  conn_->AsyncRawRpc(method_name, req, resp,
+                     [stat](const Status &status) { stat->set_value(status); });
+  return future.get(); // Blocks until the handler publishes the RPC status.
+}
+
+std::string RpcEngine::GetRandomClientName() {
+  // Zero-initialised so the name stays well-defined even if the RNG fails.
+  unsigned char buf[6] = {0};
+  // RAND_bytes() supersedes the deprecated RAND_pseudo_bytes(); as before,
+  // the return value is not inspected.
+  RAND_bytes(buf, sizeof(buf));
+  std::stringstream ss;
+  ss << "libhdfs++_"
+     << Base64Encode(std::string(reinterpret_cast<char *>(buf), sizeof(buf)));
+  return ss.str();
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea310d75/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
new file mode 100644
index 0000000..ee04fd5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_RPC_RPC_ENGINE_H_
+#define LIB_RPC_RPC_ENGINE_H_
+
+#include "libhdfspp/options.h"
+#include "libhdfspp/status.h"
+
+#include <google/protobuf/message_lite.h>
+
+#include <asio/ip/tcp.hpp>
+#include <asio/deadline_timer.hpp>
+
+#include <atomic>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+#include <mutex>
+
+namespace hdfs {
+
+class RpcEngine;
+class RpcConnection { // Transport-independent part of one RPC connection.
+public:
+  typedef std::function<void(const Status &)> Callback;
+  virtual ~RpcConnection();
+  RpcConnection(RpcEngine *engine);
+  virtual void Connect(const ::asio::ip::tcp::endpoint &server,
+                       Callback &&handler) = 0;
+  virtual void Handshake(Callback &&handler) = 0;
+  virtual void Shutdown() = 0;
+
+  void Start(); // Begins the receive loop.
+  void AsyncRpc(const std::string &method_name,
+                const ::google::protobuf::MessageLite *req,
+                std::shared_ptr<::google::protobuf::MessageLite> resp,
+                const Callback &handler);
+
+  void AsyncRawRpc(const std::string &method_name, const std::string &request,
+                   std::shared_ptr<std::string> resp, Callback &&handler);
+
+protected:
+  class Request;
+  RpcEngine *const engine_; // Not owned; the destructor does not touch it.
+  virtual void OnSendCompleted(const ::asio::error_code &ec,
+                               size_t transferred) = 0;
+  virtual void OnRecvCompleted(const ::asio::error_code &ec,
+                               size_t transferred) = 0;
+
+  ::asio::io_service &io_service();
+  std::shared_ptr<std::string> PrepareHandshakePacket();
+  static std::string
+  SerializeRpcRequest(const std::string &method_name,
+                      const ::google::protobuf::MessageLite *req); // NOTE(review): no definition appears in rpc_connection.cc — confirm it is still needed.
+  void HandleRpcResponse(const std::vector<char> &data);
+  void HandleRpcTimeout(std::shared_ptr<Request> req,
+                        const ::asio::error_code &ec);
+  void FlushPendingRequests();
+  void ClearAndDisconnect(const ::asio::error_code &ec);
+  std::shared_ptr<Request> RemoveFromRunningQueue(int call_id);
+
+  enum ResponseState {
+    kReadLength, // The next read fetches the length prefix.
+    kReadContent, // Prefix read; the next read fetches the body.
+    kParseResponse, // Body read; ready to dispatch.
+  } resp_state_;
+  unsigned resp_length_; // Length prefix of the frame currently being received.
+  std::vector<char> resp_data_; // Body of the frame currently being received.
+
+  class Request { // One queued or in-flight call.
+  public:
+    typedef std::function<void(::google::protobuf::io::CodedInputStream *is,
+                               const Status &status)> Handler;
+    Request(RpcConnection *parent, const std::string &method_name,
+            const std::string &request, Handler &&callback);
+    Request(RpcConnection *parent, const std::string &method_name,
+            const ::google::protobuf::MessageLite *request, Handler &&callback);
+
+    int call_id() const { return call_id_; }
+    ::asio::deadline_timer &timer() { return timer_; }
+    const std::string &payload() const { return payload_; }
+    void OnResponseArrived(::google::protobuf::io::CodedInputStream *is,
+                           const Status &status);
+
+  private:
+    const int call_id_; // Taken from RpcEngine::NextCallId() at construction.
+    ::asio::deadline_timer timer_; // Armed with the RPC timeout when the request is sent.
+    std::string payload_; // Fully framed bytes written to the wire.
+    Handler handler_; // Invoked exactly as passed by OnResponseArrived().
+  };
+
+  // The request currently being written to the wire (empty when idle)
+  std::shared_ptr<Request> request_over_the_wire_;
+  // Requests queued to be sent over the wire, oldest first
+  std::vector<std::shared_ptr<Request>> pending_requests_;
+  // Requests that are waiting for responses, keyed by call id
+  typedef std::unordered_map<int, std::shared_ptr<Request>> RequestOnFlyMap;
+  RequestOnFlyMap requests_on_fly_;
+  // Lock for mutable parts of this class that need to be thread safe
+  std::mutex engine_state_lock_;
+};
+
+class RpcEngine { // Owns one RpcConnection and hands out call ids for it.
+public:
+  enum { kRpcVersion = 9 }; // Version byte written into the handshake preamble.
+  enum { // Negative call ids with special meaning.
+    kCallIdAuthorizationFailed = -1,
+    kCallIdInvalid = -2,
+    kCallIdConnectionContext = -3, // Used for the connection-context (handshake) header.
+    kCallIdPing = -4
+  };
+
+  RpcEngine(::asio::io_service *io_service, const Options &options,
+            const std::string &client_name, const char *protocol_name,
+            int protocol_version);
+
+  void AsyncRpc(const std::string &method_name,
+                const ::google::protobuf::MessageLite *req,
+                const std::shared_ptr<::google::protobuf::MessageLite> &resp,
+                const std::function<void(const Status &)> &handler);
+
+  Status Rpc(const std::string &method_name,
+             const ::google::protobuf::MessageLite *req,
+             const std::shared_ptr<::google::protobuf::MessageLite> &resp);
+  /**
+   * Send raw bytes as RPC payload. This is intended to be used in JNI
+   * bindings only. Blocks until the call completes.
+   **/
+  Status RawRpc(const std::string &method_name, const std::string &req,
+                std::shared_ptr<std::string> resp);
+  void Connect(const ::asio::ip::tcp::endpoint &server,
+               const std::function<void(const Status &)> &handler);
+  void Start();
+  void Shutdown();
+  void TEST_SetRpcConnection(std::unique_ptr<RpcConnection> *conn);
+
+  int NextCallId() { return ++call_id_; } // Pre-increment on an atomic counter.
+
+  const std::string &client_name() const { return client_name_; }
+  const std::string &protocol_name() const { return protocol_name_; }
+  int protocol_version() const { return protocol_version_; }
+  ::asio::io_service &io_service() { return *io_service_; }
+  const Options &options() { return options_; }
+  static std::string GetRandomClientName();
+
+private:
+  ::asio::io_service *io_service_; // Not owned.
+  Options options_;
+  const std::string client_name_;
+  const std::string protocol_name_;
+  const int protocol_version_;
+  std::atomic_int call_id_; // Last call id handed out.
+  std::unique_ptr<RpcConnection> conn_; // Empty until Connect() or TEST_SetRpcConnection().
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea310d75/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
new file mode 100644
index 0000000..eca878e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_library(test_common OBJECT mock_connection.cc) # Object library reused by two of the tests below.
+
+set(PROTOBUF_IMPORT_DIRS ${PROTO_HADOOP_TEST_DIR})
+
+protobuf_generate_cpp(PROTO_TEST_SRCS PROTO_TEST_HDRS
+  ${PROTO_HADOOP_TEST_DIR}/test.proto
+  ${PROTO_HADOOP_TEST_DIR}/test_rpc_service.proto
+) # The generated sources are compiled into rpc_engine_test only.
+
+add_executable(remote_block_reader_test remote_block_reader_test.cc $<TARGET_OBJECTS:test_common>)
+target_link_libraries(remote_block_reader_test reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
+add_test(remote_block_reader remote_block_reader_test)
+
+add_executable(sasl_digest_md5_test sasl_digest_md5_test.cc)
+target_link_libraries(sasl_digest_md5_test common ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
+add_test(sasl_digest_md5 sasl_digest_md5_test)
+
+add_executable(inputstream_test inputstream_test.cc)
+target_link_libraries(inputstream_test fs rpc reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
+add_test(inputstream inputstream_test)
+
+include_directories(${CMAKE_CURRENT_BINARY_DIR}) # Where protobuf_generate_cpp writes the generated headers.
+add_executable(rpc_engine_test rpc_engine_test.cc ${PROTO_TEST_SRCS} ${PROTO_TEST_HDRS} $<TARGET_OBJECTS:test_common>)
+target_link_libraries(rpc_engine_test rpc proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
+add_test(rpc_engine rpc_engine_test)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea310d75/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc
new file mode 100644
index 0000000..aa95256
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fs/filesystem.h"
+#include <gmock/gmock.h>
+
+using hadoop::common::TokenProto;
+using hadoop::hdfs::DatanodeInfoProto;
+using hadoop::hdfs::DatanodeIDProto;
+using hadoop::hdfs::ExtendedBlockProto;
+using hadoop::hdfs::LocatedBlockProto;
+using hadoop::hdfs::LocatedBlocksProto;
+
+using ::testing::_;
+using ::testing::InvokeArgument;
+using ::testing::Return;
+
+using namespace hdfs;
+
+namespace hdfs {
+
+// gmock stand-in for a block reader. It is plugged into the read path through
+// MockBlockReaderTrait below, and each test scripts its two async methods.
+class MockReader {
+public:
+  virtual ~MockReader() {}
+  // Mocked read: takes the destination buffer and a completion callback that
+  // receives (status, number of bytes transferred).
+  MOCK_METHOD2(
+      async_read_some,
+      void(const asio::mutable_buffers_1 &,
+           const std::function<void(const Status &, size_t transferred)> &));
+
+  // Mocked connect: the sixth argument is the completion callback.
+  // NOTE(review): the first five arguments look like client name, token,
+  // block, length and offset -- confirm against the real reader's signature.
+  MOCK_METHOD6(async_connect,
+               void(const std::string &, TokenProto *, ExtendedBlockProto *,
+                    uint64_t, uint64_t,
+                    const std::function<void(const Status &)> &));
+};
+
+/**
+ * Block-reader trait that swaps in MockReader as the reader type.
+ *
+ * The Trait template argument must provide a static
+ * InitializeMockReader(MockReader *) that installs the gmock expectations
+ * for the reader owned by a freshly created pipeline.
+ */
+template <class Trait> struct MockBlockReaderTrait {
+  using Reader = MockReader;
+
+  // Pipeline state: the mocked reader and the transferred-size counter,
+  // both exposed through pointer accessors.
+  struct State {
+    MockReader reader_;
+    size_t transferred_;
+
+    Reader *reader() { return &reader_; }
+    size_t *transferred() { return &transferred_; }
+    const size_t *transferred() const { return &transferred_; }
+  };
+
+  // Creates a pipeline, zeroes its counter and lets Trait prime the reader.
+  // Both arguments are ignored by the mock.
+  static continuation::Pipeline<State> *
+  CreatePipeline(::asio::io_service *, const DatanodeInfoProto &) {
+    auto pipeline = continuation::Pipeline<State>::Create();
+    *pipeline->state().transferred() = 0;
+    Trait::InitializeMockReader(pipeline->state().reader());
+    return pipeline;
+  }
+};
+}
+
+// Reads a block through a mocked reader whose single async_read_some()
+// completion reports the whole buffer, and checks that the read handler sees
+// an OK status and sizeof(buf) bytes.
+TEST(InputStreamTest, TestReadSingleTrunk) {
+  LocatedBlocksProto blocks;
+  LocatedBlockProto block;
+  DatanodeInfoProto dn;
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  Options options;
+  FileSystemImpl fs(&io_service, options);
+  InputStreamImpl is(&fs, &blocks);
+  Status stat;
+  size_t read = 0;
+  struct Trait {
+    // Connect succeeds, then one read completes with sizeof(buf) bytes.
+    static void InitializeMockReader(MockReader *reader) {
+      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
+          .WillOnce(InvokeArgument<5>(Status::OK()));
+
+      EXPECT_CALL(*reader, async_read_some(_, _))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
+    }
+  };
+
+  // The assertions run right after this call without driving io_service, so
+  // the handler is expected to have fired via the mock's InvokeArgument
+  // actions by the time AsyncReadBlock returns.
+  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
+      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, const std::string &,
+                     size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(sizeof(buf), read);
+}
+
+// Reads a block through a mocked reader that needs four async_read_some()
+// completions, each reporting a quarter of the buffer, and checks that the
+// read handler sees an OK status and the full sizeof(buf) in total.
+TEST(InputStreamTest, TestReadMultipleTrunk) {
+  LocatedBlocksProto blocks;
+  LocatedBlockProto block;
+  DatanodeInfoProto dn;
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  Options options;
+  FileSystemImpl fs(&io_service, options);
+  InputStreamImpl is(&fs, &blocks);
+  Status stat;
+  size_t read = 0;
+  struct Trait {
+    // Connect succeeds, then exactly four reads of sizeof(buf) / 4 bytes.
+    static void InitializeMockReader(MockReader *reader) {
+      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
+          .WillOnce(InvokeArgument<5>(Status::OK()));
+
+      EXPECT_CALL(*reader, async_read_some(_, _))
+          .Times(4)
+          .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4));
+    }
+  };
+
+  // The assertions run right after this call without driving io_service, so
+  // the handler is expected to have fired via the mock's InvokeArgument
+  // actions by the time AsyncReadBlock returns.
+  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
+      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, const std::string &,
+                     size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(sizeof(buf), read);
+}
+
+// Reads a block through a mocked reader that delivers three quarter-buffer
+// reads and then fails the fourth. The read handler must see a non-OK status
+// together with the three quarters that were transferred before the error.
+TEST(InputStreamTest, TestReadError) {
+  LocatedBlocksProto blocks;
+  LocatedBlockProto block;
+  DatanodeInfoProto dn;
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  Options options;
+  FileSystemImpl fs(&io_service, options);
+  InputStreamImpl is(&fs, &blocks);
+  Status stat;
+  size_t read = 0;
+  struct Trait {
+    // Connect succeeds; reads 1-3 return sizeof(buf) / 4 bytes each, read 4
+    // reports Status::Error("error") with zero bytes.
+    static void InitializeMockReader(MockReader *reader) {
+      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
+          .WillOnce(InvokeArgument<5>(Status::OK()));
+
+      EXPECT_CALL(*reader, async_read_some(_, _))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+          .WillOnce(InvokeArgument<1>(Status::Error("error"), 0));
+    }
+  };
+
+  // The assertions run right after this call without driving io_service, so
+  // the handler is expected to have fired via the mock's InvokeArgument
+  // actions by the time AsyncReadBlock returns.
+  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
+      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, const std::string &,
+                     size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_FALSE(stat.ok());
+  ASSERT_EQ(sizeof(buf) / 4 * 3, read);
+}
+
+// Builds a single located block whose only location is the datanode with
+// UUID "foo", then issues a positional read with "foo" in the excluded set.
+// The handler must report resource_unavailable_try_again and zero bytes.
+TEST(InputStreamTest, TestExcludeDataNode) {
+  LocatedBlocksProto blocks;
+  LocatedBlockProto *block = blocks.add_blocks();
+  ExtendedBlockProto *b = block->mutable_b();
+  b->set_poolid("");
+  b->set_blockid(1);
+  b->set_generationstamp(1);
+  b->set_numbytes(4096);
+
+  DatanodeInfoProto *di = block->add_locs();
+  DatanodeIDProto *dnid = di->mutable_id();
+  dnid->set_datanodeuuid("foo");
+
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  Options options;
+  FileSystemImpl fs(&io_service, options);
+  InputStreamImpl is(&fs, &blocks);
+  Status stat;
+  size_t read = 0;
+
+  // No mock reader trait is set up here: AsyncPreadSome is not given one, and
+  // the expected outcome is a failure before any bytes are read.
+  std::set<std::string> excluded_dn({"foo"});
+  is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), excluded_dn,
+      [&stat, &read](const Status &status, const std::string &,
+                     size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again),
+            stat.code());
+  ASSERT_EQ(0UL, read);
+}
+
+// Test entry point. Google Mock has to be initialized first (this also
+// initializes Google Test); afterwards every registered test is run and the
+// result becomes the process exit status.
+int main(int argc, char *argv[]) {
+  ::testing::InitGoogleMock(&argc, argv);
+  const int exit_code = RUN_ALL_TESTS();
+  return exit_code;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea310d75/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.cc
new file mode 100644
index 0000000..93a3099
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.cc
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "mock_connection.h"
+
+namespace hdfs {
+
+// Remembers the io_service that the mock uses to post completion handlers.
+MockConnectionBase::MockConnectionBase(::asio::io_service *io_service)
+    : io_service_(io_service) {}
+
+// Defined out of line, next to the constructor.
+MockConnectionBase::~MockConnectionBase() = default;
+
+} // namespace hdfs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea310d75/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
new file mode 100644
index 0000000..8c0ef8c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_TEST_MOCK_CONNECTION_H_
+#define LIBHDFSPP_TEST_MOCK_CONNECTION_H_
+
+#include <asio/error_code.hpp>
+#include <asio/buffer.hpp>
+#include <asio/streambuf.hpp>
+#include <asio/io_service.hpp>
+
+#include <gmock/gmock.h>
+
+namespace hdfs {
+
+/**
+ * Base class for the fake stream connections used by the tests.
+ *
+ * Subclasses implement Produce() to supply the bytes, or the error, that the
+ * next refill should yield. Completion handlers are not called directly from
+ * the async_* methods; they are posted to the io_service passed to the
+ * constructor.
+ */
+class MockConnectionBase {
+public:
+  // Result of one Produce() call: an error code and the produced bytes.
+  typedef std::pair<asio::error_code, std::string> ProducerResult;
+
+  MockConnectionBase(::asio::io_service *io_service);
+  virtual ~MockConnectionBase();
+
+  // Serves a read from the internal buffer. When the buffer is empty it is
+  // refilled with a single Produce() call first; a producer error is posted
+  // to the handler with zero bytes. Otherwise the handler is posted with
+  // min(size of buf, buffered bytes), and that many bytes are consumed.
+  template <class MutableBufferSequence, class Handler>
+  void async_read_some(const MutableBufferSequence &buf, Handler &&handler) {
+    const bool needs_refill = (produced_.size() == 0);
+    if (needs_refill) {
+      ProducerResult next = Produce();
+      if (next.first) {
+        io_service_->post(std::bind(handler, next.first, 0));
+        return;
+      }
+      // Append the freshly produced bytes to the streambuf's readable area.
+      asio::mutable_buffers_1 room = produced_.prepare(next.second.size());
+      asio::buffer_copy(room, asio::buffer(next.second));
+      produced_.commit(next.second.size());
+    }
+
+    const size_t wanted = asio::buffer_size(buf);
+    const size_t available = produced_.size();
+    const size_t len = std::min(wanted, available);
+    asio::buffer_copy(buf, produced_.data());
+    produced_.consume(len);
+    io_service_->post(std::bind(handler, asio::error_code(), len));
+  }
+
+  // Pretends the whole buffer was written: posts the handler with no error
+  // and the full size of buf. The data itself is discarded.
+  template <class ConstBufferSequence, class Handler>
+  void async_write_some(const ConstBufferSequence &buf, Handler &&handler) {
+    // CompletionResult res = OnWrite(buf);
+    const size_t written = asio::buffer_size(buf);
+    io_service_->post(std::bind(handler, asio::error_code(), written));
+  }
+
+protected:
+  // Supplies the data (or error) for the next refill of the read buffer.
+  virtual ProducerResult Produce() = 0;
+  ::asio::io_service *io_service_;
+
+private:
+  // Bytes produced but not yet handed out to a reader.
+  asio::streambuf produced_;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea310d75/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
new file mode 100644
index 0000000..388a106
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "mock_connection.h"
+
+#include "datatransfer.pb.h"
+#include "common/util.h"
+#include "reader/block_reader.h"
+
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+using namespace hdfs;
+
+using ::hadoop::common::TokenProto;
+using ::hadoop::hdfs::BlockOpResponseProto;
+using ::hadoop::hdfs::ChecksumProto;
+using ::hadoop::hdfs::DataTransferEncryptorMessageProto;
+using ::hadoop::hdfs::ExtendedBlockProto;
+using ::hadoop::hdfs::PacketHeaderProto;
+using ::hadoop::hdfs::ReadOpChecksumInfoProto;
+
+using ::asio::buffer;
+using ::asio::error_code;
+using ::asio::mutable_buffers_1;
+using ::testing::Return;
+using std::make_pair;
+using std::string;
+
+namespace pb = ::google::protobuf;
+namespace pbio = pb::io;
+
+namespace hdfs {
+
+// Connection mock used by the block reader tests. Produce() is a gmock
+// method, so each test scripts the exact sequence of byte strings (or errors)
+// that MockConnectionBase hands to the reader.
+class MockDNConnection : public MockConnectionBase {
+public:
+  MockDNConnection(::asio::io_service &io_service)
+      : MockConnectionBase(&io_service) {}
+  MOCK_METHOD0(Produce, ProducerResult());
+};
+}
+
+// Serializes msg in length-delimited form: its byte size as a varint32,
+// followed by the serialized message bytes.
+static inline string ToDelimitedString(const pb::MessageLite *msg) {
+  string res;
+  res.reserve(hdfs::DelimitedPBMessageSize(msg));
+  {
+    // Keep the streams in their own scope. CodedOutputStream returns its
+    // unused buffer space to the underlying string only when it is destroyed,
+    // so it has to be gone before res is returned; otherwise the result is
+    // only correctly sized if the compiler happens to apply NRVO.
+    pbio::StringOutputStream os(&res);
+    pbio::CodedOutputStream out(&os);
+    out.WriteVarint32(msg->ByteSize());
+    msg->SerializeToCodedStream(&out);
+  }
+  return res;
+}
+
+// Wraps s as a successful producer result, i.e. paired with a
+// default-constructed error_code.
+static inline std::pair<error_code, string> Produce(const std::string &s) {
+  return std::pair<error_code, string>(error_code(), s);
+}
+
+// Builds the byte image of one data packet for the mock connection:
+//   - 4 bytes, big-endian: data size + checksum size + sizeof(int)
+//     (presumably the extra sizeof(int) accounts for the length field itself)
+//   - 2 bytes, big-endian: size of the serialized PacketHeaderProto
+//   - the serialized PacketHeaderProto
+//   - the checksum bytes, then the data bytes
+// The result is paired with a default-constructed (success) error_code.
+static inline std::pair<error_code, string>
+ProducePacket(const std::string &data, const std::string &checksum,
+              int offset_in_block, int seqno, bool last_packet) {
+  PacketHeaderProto proto;
+  proto.set_datalen(data.size());
+  proto.set_offsetinblock(offset_in_block);
+  proto.set_seqno(seqno);
+  proto.set_lastpacketinblock(last_packet);
+
+  // Write the prefix one byte at a time in network byte order. This avoids
+  // storing through unsigned*/short* pointers into a char array, which is
+  // neither alignment-safe nor permitted by the aliasing rules.
+  const uint32_t packet_len =
+      static_cast<uint32_t>(data.size() + checksum.size() + sizeof(int));
+  const uint16_t header_len = static_cast<uint16_t>(proto.ByteSize());
+  const char prefix[6] = {
+      static_cast<char>((packet_len >> 24) & 0xff),
+      static_cast<char>((packet_len >> 16) & 0xff),
+      static_cast<char>((packet_len >> 8) & 0xff),
+      static_cast<char>(packet_len & 0xff),
+      static_cast<char>((header_len >> 8) & 0xff),
+      static_cast<char>(header_len & 0xff),
+  };
+
+  std::string payload(prefix, sizeof(prefix));
+  payload.reserve(payload.size() + proto.ByteSize() + checksum.size() +
+                  data.size());
+  proto.AppendToString(&payload);
+  payload += checksum;
+  payload += data;
+  return std::make_pair(error_code(), std::move(payload));
+}
+
+// Creates a RemoteBlockReader on conn, calls async_connect() with the given
+// token, block, length and offset, and on a successful connect starts one
+// async_read_some() into buf.
+//
+// handler receives (status, bytes): either the connect failure with 0 bytes,
+// or whatever the read reports. The reader is returned to the caller; the
+// connect callback also keeps a shared_ptr copy of it.
+template <class Stream = MockDNConnection, class Handler>
+static std::shared_ptr<RemoteBlockReader<Stream>>
+ReadContent(Stream *conn, TokenProto *token, const ExtendedBlockProto &block,
+            uint64_t length, uint64_t offset, const mutable_buffers_1 &buf,
+            const Handler &handler) {
+  BlockReaderOptions options;
+  auto reader = std::make_shared<RemoteBlockReader<Stream>>(options, conn);
+  reader->async_connect("libhdfs++", token, &block, length, offset,
+                        [buf, reader, handler](const Status &stat) {
+                          if (!stat.ok()) {
+                            handler(stat, 0);
+                          } else {
+                            reader->async_read_some(buf, handler);
+                          }
+                        });
+  return reader;
+}
+
+// Scripts the connection to answer with a SUCCESS BlockOpResponseProto and
+// then a single last packet carrying 512 'a' bytes. Reads kChunkSize bytes at
+// offset 0 and checks that exactly that data arrives.
+TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
+  static const size_t kChunkSize = 512;
+  static const string kChunkData(kChunkSize, 'a');
+  ::asio::io_service io_service;
+  MockDNConnection conn(io_service);
+  BlockOpResponseProto block_op_resp;
+
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+  EXPECT_CALL(conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
+
+  ExtendedBlockProto block;
+  block.set_poolid("foo");
+  block.set_blockid(0);
+  block.set_generationstamp(0);
+
+  std::string data(kChunkSize, 0);
+  // Use &data[0] for the writable buffer: the array returned by c_str() must
+  // not be modified, even through a const_cast.
+  ReadContent(&conn, nullptr, block, kChunkSize, 0,
+              buffer(&data[0], data.size()),
+              [&data, &io_service](const Status &stat, size_t transferred) {
+                ASSERT_TRUE(stat.ok());
+                ASSERT_EQ(kChunkSize, transferred);
+                ASSERT_EQ(kChunkData, data);
+                io_service.stop();
+              });
+  // Handlers are posted to io_service, so nothing happens until it runs.
+  io_service.run();
+}
+
+// Reads a range that starts inside the packet data. The packet carries
+// kOffset 'a' bytes followed by kLength 'b' bytes; the read asks for kLength
+// bytes at kOffset and must receive kChunkData.substr(kOffset, kLength).
+// The block-op response advertises CHECKSUM_NULL with 512 bytes per checksum
+// and chunk offset 0.
+TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
+  static const size_t kChunkSize = 1024;
+  static const size_t kLength = kChunkSize / 4 * 3;
+  static const size_t kOffset = kChunkSize / 4;
+  static const string kChunkData = string(kOffset, 'a') + string(kLength, 'b');
+
+  ::asio::io_service io_service;
+  MockDNConnection conn(io_service);
+  BlockOpResponseProto block_op_resp;
+  ReadOpChecksumInfoProto *checksum_info =
+      block_op_resp.mutable_readopchecksuminfo();
+  checksum_info->set_chunkoffset(0);
+  ChecksumProto *checksum = checksum_info->mutable_checksum();
+  checksum->set_type(::hadoop::hdfs::ChecksumTypeProto::CHECKSUM_NULL);
+  checksum->set_bytesperchecksum(512);
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+
+  EXPECT_CALL(conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", kOffset, 1, true)));
+
+  ExtendedBlockProto block;
+  block.set_poolid("foo");
+  block.set_blockid(0);
+  block.set_generationstamp(0);
+
+  string data(kLength, 0);
+  // Use &data[0] for the writable buffer: the array returned by c_str() must
+  // not be modified, even through a const_cast.
+  ReadContent(&conn, nullptr, block, data.size(), kOffset,
+              buffer(&data[0], data.size()),
+              [&data, &io_service](const Status &stat, size_t transferred) {
+                ASSERT_TRUE(stat.ok());
+                ASSERT_EQ(kLength, transferred);
+                ASSERT_EQ(kChunkData.substr(kOffset, kLength), data);
+                io_service.stop();
+              });
+  // Handlers are posted to io_service, so nothing happens until it runs.
+  io_service.run();
+}
+
+// Scripts two consecutive packets of kChunkSize 'a' bytes (the second one
+// flagged as last) and issues two chained reads into the same buffer, checking
+// that each read delivers one full packet.
+TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
+  static const size_t kChunkSize = 1024;
+  static const string kChunkData(kChunkSize, 'a');
+
+  ::asio::io_service io_service;
+  MockDNConnection conn(io_service);
+  BlockOpResponseProto block_op_resp;
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+
+  EXPECT_CALL(conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false)))
+      .WillOnce(Return(ProducePacket(kChunkData, "", kChunkSize, 2, true)));
+
+  ExtendedBlockProto block;
+  block.set_poolid("foo");
+  block.set_blockid(0);
+  block.set_generationstamp(0);
+
+  string data(kChunkSize, 0);
+  // Use &data[0] for the writable buffer: the array returned by c_str() must
+  // not be modified, even through a const_cast.
+  mutable_buffers_1 buf = buffer(&data[0], data.size());
+  BlockReaderOptions options;
+  auto reader =
+      std::make_shared<RemoteBlockReader<MockDNConnection>>(options, &conn);
+  reader->async_connect(
+      "libhdfs++", nullptr, &block, data.size(), 0,
+      [buf, reader, &data, &io_service](const Status &stat) {
+        ASSERT_TRUE(stat.ok());
+        reader->async_read_some(
+            buf, [buf, reader, &data, &io_service](const Status &stat,
+                                                   size_t transferred) {
+              ASSERT_TRUE(stat.ok());
+              ASSERT_EQ(kChunkSize, transferred);
+              ASSERT_EQ(kChunkData, data);
+              // Zero the buffer before the second read. NOTE: buf still
+              // refers to data's storage, so this relies on clear() followed
+              // by resize() to the same size not reallocating.
+              data.clear();
+              data.resize(kChunkSize);
+              reader->async_read_some(
+                  buf, [&data, &io_service](const Status &stat,
+                                            size_t transferred) {
+                    ASSERT_TRUE(stat.ok());
+                    ASSERT_EQ(kChunkSize, transferred);
+                    ASSERT_EQ(kChunkData, data);
+                    io_service.stop();
+                  });
+            });
+      });
+  // Handlers are posted to io_service, so nothing happens until it runs.
+  io_service.run();
+}
+
+// Runs a read over a DataTransferSaslStream layered on the mock connection.
+// The connection is scripted with two SUCCESS encryptor messages (the first
+// carrying kAuthPayload), then the block-op response and one last packet of
+// 512 'a' bytes. After a successful handshake the read must return that data.
+TEST(RemoteBlockReaderTest, TestSaslConnection) {
+  static const size_t kChunkSize = 512;
+  static const string kChunkData(kChunkSize, 'a');
+  static const string kAuthPayload = "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/"
+                                     "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\","
+                                     "charset=utf-8,algorithm=md5-sess";
+  ::asio::io_service io_service;
+  MockDNConnection conn(io_service);
+  BlockOpResponseProto block_op_resp;
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+
+  DataTransferEncryptorMessageProto sasl_resp0, sasl_resp1;
+  sasl_resp0.set_status(
+      ::hadoop::hdfs::
+          DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS);
+  sasl_resp0.set_payload(kAuthPayload);
+  sasl_resp1.set_status(
+      ::hadoop::hdfs::
+          DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS);
+
+  EXPECT_CALL(conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp0))))
+      .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp1))))
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
+
+  DataTransferSaslStream<MockDNConnection> sasl_conn(&conn, "foo", "bar");
+  ExtendedBlockProto block;
+  block.set_poolid("foo");
+  block.set_blockid(0);
+  block.set_generationstamp(0);
+
+  std::string data(kChunkSize, 0);
+  sasl_conn.Handshake([&sasl_conn, &block, &data, &io_service](
+      const Status &s) {
+    ASSERT_TRUE(s.ok());
+    // Use &data[0] for the writable buffer: the array returned by c_str()
+    // must not be modified, even through a const_cast.
+    ReadContent(&sasl_conn, nullptr, block, kChunkSize, 0,
+                buffer(&data[0], data.size()),
+                [&data, &io_service](const Status &stat, size_t transferred) {
+                  ASSERT_TRUE(stat.ok());
+                  ASSERT_EQ(kChunkSize, transferred);
+                  ASSERT_EQ(kChunkData, data);
+                  io_service.stop();
+                });
+  });
+  // Handlers are posted to io_service, so nothing happens until it runs.
+  io_service.run();
+}
+
+// Test entry point. Google Mock has to be initialized first (this also
+// initializes Google Test); afterwards every registered test is run and the
+// result becomes the process exit status.
+int main(int argc, char *argv[]) {
+  ::testing::InitGoogleMock(&argc, argv);
+  const int exit_code = RUN_ALL_TESTS();
+  return exit_code;
+}


Mime
View raw message