hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From whe...@apache.org
Subject [45/46] hadoop git commit: HDFS-8737. Initial implementation of a Hadoop RPC v9 client. Contributed by Haohui Mai.
Date Sat, 11 Jul 2015 00:10:04 GMT
HDFS-8737. Initial implementation of a Hadoop RPC v9 client. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5b9ff2ee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5b9ff2ee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5b9ff2ee

Branch: refs/heads/HDFS-8707
Commit: 5b9ff2eef7162738c4d1a27992a28b3cee9a2e4d
Parents: 53c169a
Author: Haohui Mai <wheat9@apache.org>
Authored: Wed Jul 8 13:13:03 2015 -0700
Committer: Haohui Mai <wheat9@apache.org>
Committed: Fri Jul 10 17:08:44 2015 -0700

----------------------------------------------------------------------
 .../src/main/native/libhdfspp/CMakeLists.txt    |  12 +-
 .../native/libhdfspp/include/libhdfspp/status.h |  97 ++++++++
 .../main/native/libhdfspp/lib/CMakeLists.txt    |   2 +
 .../src/main/native/libhdfspp/lib/common/util.h |  58 +++++
 .../native/libhdfspp/lib/proto/CMakeLists.txt   |  21 ++
 .../native/libhdfspp/lib/rpc/CMakeLists.txt     |   3 +
 .../native/libhdfspp/lib/rpc/rpc_connection.cc  | 225 +++++++++++++++++++
 .../native/libhdfspp/lib/rpc/rpc_connection.h   | 149 ++++++++++++
 .../main/native/libhdfspp/lib/rpc/rpc_engine.cc |  98 ++++++++
 .../main/native/libhdfspp/lib/rpc/rpc_engine.h  | 160 +++++++++++++
 10 files changed, 824 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b9ff2ee/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
index 2986b88..f4bc8b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
@@ -18,6 +18,9 @@
 
 project (libhdfspp)
 
+find_package(Protobuf REQUIRED)
+find_package(Threads)
+
 add_definitions(-DASIO_STANDALONE -DASIO_CPP11_DATE_TIME)
 
 if(UNIX)
@@ -30,6 +33,13 @@ if(APPLE)
 set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++ -Wno-deprecated-declarations")
 endif()
 
-include_directories(third_party/gmock-1.7.0)
+include_directories(
+  include
+  lib
+  ${PROJECT_BINARY_DIR}/lib/proto
+  third_party/asio-1.10.2/include
+  third_party/gmock-1.7.0
+)
 
 add_subdirectory(third_party/gmock-1.7.0)
+add_subdirectory(lib)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b9ff2ee/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h
new file mode 100644
index 0000000..9436c8b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_STATUS_H_
+#define LIBHDFSPP_STATUS_H_
+
+#include <string>
+#include <system_error>
+
+namespace hdfs {
+
+class StatusHelper;
+/**
+ * Outcome of an operation: success, or an error code with a message.
+ *
+ * A success value owns no memory (state_ is null). An error value owns a
+ * new[]-allocated buffer whose layout is described next to state_ below.
+ */
+class Status {
+ public:
+  // Create a success status.
+  Status() : state_(nullptr) { }
+  ~Status() { delete[] state_; }
+  // Create a status from an error code and a message (defined out of line).
+  explicit Status(int code, const char *msg);
+
+  // Copy the specified status.
+  Status(const Status& s);
+  // Returns *this so that assignments chain the way built-in types do.
+  Status& operator=(const Status& s);
+
+  // Return a success status.
+  static Status OK() { return Status(); }
+  // Factories for the error kinds listed in Code.
+  static Status InvalidArgument(const char *msg)
+  { return Status(kInvalidArgument, msg); }
+  static Status ResourceUnavailable(const char *msg)
+  { return Status(kResourceUnavailable, msg); }
+  static Status Unimplemented()
+  { return Status(kUnimplemented, ""); }
+  static Status Exception(const char *exception_class_name, const char *error_message)
+  { return Status(kException, exception_class_name, error_message); }
+
+  // Returns true iff the status indicates success.
+  bool ok() const { return (state_ == nullptr); }
+
+  // Return a string representation of this status suitable for printing.
+  // Returns the string "OK" for success.
+  std::string ToString() const;
+
+  // Returns kOk for a success status, otherwise the code byte of the state.
+  // NOTE(review): the code is read back from the single char state_[4], yet
+  // kException is 256, which does not fit in one byte -- confirm how
+  // ConstructState encodes it.
+  int code() const {
+    return (state_ == nullptr) ? kOk : static_cast<int>(state_[4]);
+  }
+
+ private:
+  // OK status has a null state_.  Otherwise, state_ is a new[] array
+  // of the following form:
+  //    state_[0..3] == length of message
+  //    state_[4]    == code
+  //    state_[5..]  == message
+  const char* state_;
+  friend class StatusHelper;
+
+  enum Code {
+    kOk = 0,
+    kInvalidArgument = static_cast<unsigned>(std::errc::invalid_argument),
+    kResourceUnavailable = static_cast<unsigned>(std::errc::resource_unavailable_try_again),
+    kUnimplemented = static_cast<unsigned>(std::errc::function_not_supported),
+    kException = 256,
+  };
+
+  explicit Status(int code, const char *msg1, const char *msg2);
+  static const char *CopyState(const char* s);
+  static const char *ConstructState(int code, const char *msg1, const char *msg2);
+};
+
+inline Status::Status(const Status& s)
+    : state_((s.state_ == nullptr) ? nullptr : CopyState(s.state_)) {}
+
+inline Status& Status::operator=(const Status& s) {
+  // The following condition catches both aliasing (when this == &s),
+  // and the common case where both s and *this are ok.
+  if (state_ != s.state_) {
+    // Copy before releasing the old buffer: if CopyState throws, *this still
+    // points at valid memory instead of an already-deleted array.
+    const char *copy = (s.state_ == nullptr) ? nullptr : CopyState(s.state_);
+    delete[] state_;
+    state_ = copy;
+  }
+  return *this;
+}
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b9ff2ee/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
new file mode 100644
index 0000000..7458453
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
@@ -0,0 +1,2 @@
+add_subdirectory(rpc)    # defines the `rpc` library target (lib/rpc)
+add_subdirectory(proto)  # defines the `proto` library target built from the generated protobuf sources

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b9ff2ee/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/util.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/util.h
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/util.h
new file mode 100644
index 0000000..ff9f36c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/util.h
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_COMMON_UTIL_H_
+#define LIB_COMMON_UTIL_H_
+
+#include "libhdfspp/status.h"
+
+#include <asio/error_code.hpp>
+
+#include <google/protobuf/message_lite.h>
+#include <google/protobuf/io/coded_stream.h>
+
+namespace hdfs {
+
+static inline Status ToStatus(const ::asio::error_code &ec) {
+  if (ec) {
+    return Status(ec.value(), ec.message().c_str());
+  } else {
+    return Status::OK();
+  }
+}
+
+// Returns the number of bytes `msg` occupies when written length-delimited:
+// the varint32 encoding of its serialized size plus the serialized size.
+static inline int DelimitedPBMessageSize(
+    const ::google::protobuf::MessageLite *msg) {
+  const size_t payload_bytes = msg->ByteSize();
+  const auto prefix_bytes =
+      ::google::protobuf::io::CodedOutputStream::VarintSize32(payload_bytes);
+  return prefix_bytes + payload_bytes;
+}
+
+// Reads one length-delimited protobuf message from `in` into `msg`: a varint32
+// byte count followed by that many bytes of serialized message.
+//
+// Returns true when both the length prefix and the message body were read
+// successfully, and false otherwise. Callers that do not need the result may
+// ignore it. Whenever a limit is pushed here it is popped again before
+// returning, so the limit active on entry is restored.
+static inline bool ReadDelimitedPBMessage(
+    ::google::protobuf::io::CodedInputStream *in,
+    ::google::protobuf::MessageLite *msg) {
+  uint32_t size = 0;
+  if (!in->ReadVarint32(&size)) {
+    // Missing or malformed length prefix: there is nothing to parse.
+    return false;
+  }
+  auto limit = in->PushLimit(size);
+  const bool parsed = msg->ParseFromCodedStream(in);
+  in->PopLimit(limit);
+  return parsed;
+}
+
+std::string Base64Encode(const std::string &src);
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b9ff2ee/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt
new file mode 100644
index 0000000..156a7f4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt
@@ -0,0 +1,21 @@
+# Locations of the .proto definitions, relative to the top-level source dir.
+set(CLIENT_PROTO_DIR ${CMAKE_SOURCE_DIR}/../proto)
+set(COMMON_PROTO_DIR ${CMAKE_SOURCE_DIR}/../../../../../hadoop-common-project/hadoop-common/src/main/proto)
+# Lets protoc resolve `import` statements across both directories.
+set(PROTOBUF_IMPORT_DIRS ${CLIENT_PROTO_DIR} ${COMMON_PROTO_DIR})
+
+# Generate C++ sources/headers for every protocol file. Each file is listed
+# exactly once (datatransfer.proto previously appeared twice).
+protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS
+  ${CLIENT_PROTO_DIR}/ClientDatanodeProtocol.proto
+  ${CLIENT_PROTO_DIR}/ClientNamenodeProtocol.proto
+  ${CLIENT_PROTO_DIR}/acl.proto
+  ${CLIENT_PROTO_DIR}/datatransfer.proto
+  ${CLIENT_PROTO_DIR}/encryption.proto
+  ${CLIENT_PROTO_DIR}/hdfs.proto
+  ${CLIENT_PROTO_DIR}/inotify.proto
+  ${CLIENT_PROTO_DIR}/xattr.proto
+  ${COMMON_PROTO_DIR}/IpcConnectionContext.proto
+  ${COMMON_PROTO_DIR}/ProtobufRpcEngine.proto
+  ${COMMON_PROTO_DIR}/RpcHeader.proto
+  ${COMMON_PROTO_DIR}/Security.proto
+)
+
+add_library(proto ${PROTO_SRCS} ${PROTO_HDRS})

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b9ff2ee/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt
new file mode 100644
index 0000000..aa3951c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt
@@ -0,0 +1,3 @@
+include_directories(${OPENSSL_INCLUDE_DIRS})  # NOTE(review): no find_package(OpenSSL) is visible in this patch -- confirm a parent CMakeLists sets this variable
+add_library(rpc rpc_connection.cc rpc_engine.cc)  # the RPC connection and engine sources
+add_dependencies(rpc proto)  # rpc sources include the generated *.pb.h headers, so build proto first

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b9ff2ee/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
new file mode 100644
index 0000000..4d08bd1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "rpc_engine.h"
+
+#include "RpcHeader.pb.h"
+#include "ProtobufRpcEngine.pb.h"
+#include "IpcConnectionContext.pb.h"
+
+#include "common/util.h"
+
+#include <asio/read.hpp>
+
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
+namespace hdfs {
+
+namespace pb = ::google::protobuf;
+namespace pbio = ::google::protobuf::io;
+
+using namespace ::hadoop::common;
+using namespace ::std::placeholders;
+
+// Appends one RPC packet to *res: a 4-byte length in network byte order,
+// followed by each entry of `headers` as a varint-length-delimited message
+// and, when `request` is non-null, the already-serialized request bytes with
+// their own varint length prefix.
+static void
+ConstructPacket(std::string *res,
+                std::initializer_list<const pb::MessageLite *> headers,
+                const std::string *request) {
+  // First pass: size of everything that follows the length word.
+  int body_len = 0;
+  for (const pb::MessageLite *header : headers) {
+    body_len += DelimitedPBMessageSize(header);
+  }
+  if (request) {
+    body_len += pbio::CodedOutputStream::VarintSize32(request->size()) +
+                request->size();
+  }
+
+  int wire_len = htonl(body_len);
+  res->reserve(res->size() + sizeof(wire_len) + body_len);
+
+  pbio::StringOutputStream sink(res);
+  pbio::CodedOutputStream coded(&sink);
+  coded.WriteRaw(reinterpret_cast<const char *>(&wire_len), sizeof(wire_len));
+
+  // Second pass: serialize directly into one contiguous span of the output.
+  uint8_t *cursor = coded.GetDirectBufferForNBytesAndAdvance(body_len);
+  assert(cursor && "Cannot allocate memory");
+
+  for (const pb::MessageLite *header : headers) {
+    cursor = pbio::CodedOutputStream::WriteVarint32ToArray(header->ByteSize(),
+                                                           cursor);
+    cursor = header->SerializeWithCachedSizesToArray(cursor);
+  }
+
+  if (request) {
+    cursor =
+        pbio::CodedOutputStream::WriteVarint32ToArray(request->size(), cursor);
+    cursor = coded.WriteStringToArray(*request, cursor);
+  }
+}
+
+// Fills in the two headers that precede every RPC request, taking the client
+// and protocol identity from `engine`.
+static void SetRequestHeader(RpcEngine *engine, int call_id,
+                             const std::string &method_name,
+                             RpcRequestHeaderProto *rpc_header,
+                             RequestHeaderProto *req_header) {
+  // Call header: which method of which protocol (and version) is invoked.
+  req_header->set_declaringclassprotocolname(engine->protocol_name());
+  req_header->set_clientprotocolversion(engine->protocol_version());
+  req_header->set_methodname(method_name);
+
+  // RPC header: who is calling and which call this is.
+  rpc_header->set_clientid(engine->client_name());
+  rpc_header->set_callid(call_id);
+  rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
+  rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER);
+}
+
+// Out-of-line destructor; members clean up after themselves.
+RpcConnection::~RpcConnection() = default;
+
+// Builds a request whose body is an already-serialized message: allocates the
+// next call id from the engine and assembles the wire packet into payload_.
+RpcConnection::Request::Request(RpcConnection *parent,
+                                const std::string &method_name,
+                                const std::string &request, Handler &&handler)
+    : call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()),
+      handler_(std::move(handler)) {
+  RpcRequestHeaderProto transport_header;
+  RequestHeaderProto call_header;
+  SetRequestHeader(parent->engine_, call_id_, method_name, &transport_header,
+                   &call_header);
+  // The raw request bytes are appended after the two headers.
+  ConstructPacket(&payload_, {&transport_header, &call_header}, &request);
+}
+
+// Builds a request whose body is a protobuf message: allocates the next call
+// id from the engine and assembles the wire packet into payload_.
+RpcConnection::Request::Request(RpcConnection *parent,
+                                const std::string &method_name,
+                                const pb::MessageLite *request,
+                                Handler &&handler)
+    : call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()),
+      handler_(std::move(handler)) {
+  RpcRequestHeaderProto transport_header;
+  RequestHeaderProto call_header;
+  SetRequestHeader(parent->engine_, call_id_, method_name, &transport_header,
+                   &call_header);
+  // The request is serialized as a third delimited message after the headers.
+  ConstructPacket(&payload_, {&transport_header, &call_header, request},
+                  nullptr);
+}
+
+void RpcConnection::Request::OnResponseArrived(pbio::CodedInputStream *is,
+                                               const Status &status) {
+  handler_(is, status);  // hand the response stream and status to the handler stored at construction
+}
+
+RpcConnection::RpcConnection(RpcEngine *engine)
+    : engine_(engine), resp_state_(kReadLength), resp_length_(0) {}  // the receive state machine starts out expecting a length word
+
+::asio::io_service &RpcConnection::io_service() {
+  return engine_->io_service();  // the connection uses the io_service owned by its engine
+}
+
+// Kicks the receive state machine by queuing a call to OnRecvCompleted with
+// a "no error, zero bytes" completion on the io_service.
+void RpcConnection::Start() {
+  io_service().post(
+      [this]() { OnRecvCompleted(::asio::error_code(), 0); });
+}
+
+// Queues a task on the io_service that starts sending pending requests,
+// unless a request is already being written.
+void RpcConnection::FlushPendingRequests() {
+  io_service().post([this]() {
+    if (request_over_the_wire_) {
+      // A write is in progress; its completion handler sends the next one.
+      return;
+    }
+    OnSendCompleted(::asio::error_code(), 0);
+  });
+}
+
+// Parses one complete response frame and dispatches it to the request that
+// is waiting for it.
+//
+// `data` holds the frame body (everything after the 4-byte length word). The
+// response header is read first; its call id selects the in-flight request,
+// which is removed from requests_on_fly_ and notified with either an OK
+// status or a Status built from the server's exception class and message.
+// A response with an unknown call id is dropped.
+void RpcConnection::HandleRpcResponse(const std::vector<char> &data) {
+  /* assumed to be called from a context that has already acquired the
+   * engine_state_lock */
+  // data.data() is well-defined for an empty vector, unlike &data[0].
+  pbio::ArrayInputStream ar(data.data(), data.size());
+  pbio::CodedInputStream in(&ar);
+  in.PushLimit(data.size());
+  RpcResponseHeaderProto h;
+  ReadDelimitedPBMessage(&in, &h);
+
+  auto it = requests_on_fly_.find(h.callid());
+  if (it == requests_on_fly_.end()) {
+    // TODO: out of line RPC request
+    assert(false && "Out of line request with unknown call id");
+    // With NDEBUG the assert disappears; bail out rather than dereference
+    // the end iterator below.
+    return;
+  }
+
+  auto req = it->second;
+  requests_on_fly_.erase(it);
+  Status stat;
+  if (h.has_exceptionclassname()) {
+    stat =
+        Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str());
+  }
+  // The stream is positioned just past the header, at the response body.
+  req->OnResponseArrived(&in, stat);
+}
+
+// Builds the bytes sent right after connecting: the fixed "hrpc" preamble
+// (magic, RPC version and two zero bytes) followed by a packet that carries
+// the connection-context message naming the protocol.
+std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() {
+  static const char kHandshakeHeader[] = {'h', 'r', 'p', 'c',
+                                          RpcEngine::kRpcVersion, 0, 0};
+  auto packet =
+      std::make_shared<std::string>(kHandshakeHeader, sizeof(kHandshakeHeader));
+
+  IpcConnectionContextProto context;
+  context.set_protocol(engine_->protocol_name());
+
+  RpcRequestHeaderProto rpc_header;
+  rpc_header.set_clientid(engine_->client_name());
+  rpc_header.set_callid(RpcEngine::kCallIdConnectionContext);
+  rpc_header.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
+  rpc_header.set_rpckind(RPC_PROTOCOL_BUFFER);
+
+  ConstructPacket(packet.get(), {&rpc_header, &context}, nullptr);
+  return packet;
+}
+
+// Queues an RPC whose request and response are protobuf messages.
+//
+// On a successful response the body is parsed into *resp before `handler`
+// runs; on failure `handler` receives the error status and *resp is left
+// untouched. The request is appended to pending_requests_ under
+// engine_state_lock_ and a flush is scheduled.
+void RpcConnection::AsyncRpc(
+    const std::string &method_name, const ::google::protobuf::MessageLite *req,
+    std::shared_ptr<::google::protobuf::MessageLite> resp, Callback &&handler) {
+  std::lock_guard<std::mutex> state_lock(engine_state_lock_);
+
+  auto on_response =
+      [resp, handler](pbio::CodedInputStream *is, const Status &status) {
+        if (status.ok()) {
+          ReadDelimitedPBMessage(is, resp.get());
+        }
+        handler(status);
+      };
+
+  pending_requests_.push_back(std::make_shared<Request>(
+      this, method_name, req, std::move(on_response)));
+  FlushPendingRequests();
+}
+
+// Queues an RPC whose request and response are raw serialized bytes.
+//
+// On a successful response the length-delimited body is copied into *resp
+// before `handler` runs; on failure `handler` receives the error status and
+// *resp is left untouched. The request is appended to pending_requests_ under
+// engine_state_lock_ and a flush is scheduled.
+void RpcConnection::AsyncRawRpc(const std::string &method_name,
+                                const std::string &req,
+                                std::shared_ptr<std::string> resp,
+                                Callback &&handler) {
+  std::lock_guard<std::mutex> state_lock(engine_state_lock_);
+
+  // The handler needs only `resp` and `handler`; it does not touch `this`.
+  auto wrapped_handler =
+      [resp, handler](pbio::CodedInputStream *is, const Status &status) {
+        if (status.ok()) {
+          uint32_t size = 0;
+          is->ReadVarint32(&size);
+          auto limit = is->PushLimit(size);
+          // Read exactly `size` bytes. `limit` is only the token needed to
+          // restore the previous limit, not a byte count.
+          is->ReadString(resp.get(), size);
+          is->PopLimit(limit);
+        }
+        handler(status);
+      };
+
+  auto r = std::make_shared<Request>(this, method_name, req,
+                                     std::move(wrapped_handler));
+  pending_requests_.push_back(r);
+  FlushPendingRequests();
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b9ff2ee/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
new file mode 100644
index 0000000..a8eecf4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_RPC_RPC_CONNECTION_H_
+#define LIB_RPC_RPC_CONNECTION_H_
+
+#include "rpc_engine.h"
+#include "common/util.h"
+
+#include <asio/connect.hpp>
+#include <asio/read.hpp>
+#include <asio/write.hpp>
+
+namespace hdfs {
+
+// RpcConnection bound to a concrete transport. NextLayer is the stream type
+// (constructed from an io_service) used for connect, read and write.
+template <class NextLayer> class RpcConnectionImpl : public RpcConnection {
+public:
+  RpcConnectionImpl(RpcEngine *engine);
+
+  // Connection lifecycle.
+  void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
+               Callback &&handler) override;
+  void Handshake(Callback &&handler) override;
+  void Shutdown() override;
+
+  // Completion handlers for the asynchronous write and read chains.
+  void OnSendCompleted(const ::asio::error_code &ec,
+                       size_t transferred) override;
+  void OnRecvCompleted(const ::asio::error_code &ec,
+                       size_t transferred) override;
+
+private:
+  NextLayer next_layer_;
+};
+
+template <class NextLayer>
+RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
+    : RpcConnection(engine)
+    , next_layer_(engine->io_service())  // the transport is created on the engine's io_service
+{}
+
+// Starts an asynchronous connect that tries the endpoints in `server`, and
+// reports the outcome to `handler` as a Status.
+// NOTE: only iterators into `server` are handed to asio, so the vector must
+// stay alive until `handler` has run.
+template <class NextLayer>
+void RpcConnectionImpl<NextLayer>::Connect(
+    const std::vector<::asio::ip::tcp::endpoint> &server, Callback &&handler) {
+  typedef std::vector<::asio::ip::tcp::endpoint>::const_iterator EndpointIter;
+  auto on_connected = [handler](const ::asio::error_code &ec, EndpointIter) {
+    handler(ToStatus(ec));
+  };
+  ::asio::async_connect(next_layer_, server.begin(), server.end(),
+                        std::move(on_connected));
+}
+
+// Writes the handshake packet and reports the outcome to `handler`.
+template <class NextLayer>
+void RpcConnectionImpl<NextLayer>::Handshake(Callback &&handler) {
+  auto handshake_packet = PrepareHandshakePacket();
+  // The completion handler holds a copy of the shared_ptr, which keeps the
+  // buffer alive for the duration of the asynchronous write.
+  auto on_written = [handshake_packet, handler](const ::asio::error_code &ec,
+                                                size_t) {
+    handler(ToStatus(ec));
+  };
+  ::asio::async_write(next_layer_, asio::buffer(*handshake_packet),
+                      std::move(on_written));
+}
+
+// Completion handler of the write chain: marks the previous write finished
+// and, if another request is pending, moves it to the in-flight table and
+// starts writing it. Runs under engine_state_lock_.
+template <class NextLayer>
+void RpcConnectionImpl<NextLayer>::OnSendCompleted(const ::asio::error_code &ec,
+                                                   size_t) {
+  std::lock_guard<std::mutex> state_lock(engine_state_lock_);
+
+  request_over_the_wire_.reset();
+  if (ec) {
+    // TODO: Current RPC has failed -- we should abandon the
+    // connection and do proper clean up
+    assert(false && "Unimplemented");
+  }
+
+  if (pending_requests_.empty()) {
+    return;
+  }
+
+  // Take the oldest pending request; it is now both awaiting a response
+  // (requests_on_fly_) and being written (request_over_the_wire_).
+  std::shared_ptr<Request> next = pending_requests_.front();
+  pending_requests_.erase(pending_requests_.begin());
+  requests_on_fly_[next->call_id()] = next;
+  request_over_the_wire_ = next;
+
+  // TODO: set the timeout for the RPC request
+
+  asio::async_write(
+      next_layer_, asio::buffer(next->payload()),
+      std::bind(&RpcConnectionImpl<NextLayer>::OnSendCompleted, this,
+                std::placeholders::_1, std::placeholders::_2));
+}
+
+// Completion handler of the read chain; a three-state machine driven by
+// resp_state_:
+//   kReadLength    -> issue a read of the 4-byte length word
+//   kReadContent   -> convert the length to host order, read that many bytes
+//   kParseResponse -> dispatch the frame, then restart via Start()
+// Runs under engine_state_lock_.
+//
+// An aborted operation ends the chain silently. Any other error is not
+// handled yet: it asserts in debug builds and stops the chain otherwise.
+//
+// NOTE(review): HandleRpcResponse runs while engine_state_lock_ (a
+// std::mutex) is held, so a response handler that calls AsyncRpc/AsyncRawRpc
+// on this connection would try to lock it again -- confirm handlers never
+// do that.
+template <class NextLayer>
+void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ec,
+                                                   size_t) {
+  using std::placeholders::_1;
+  using std::placeholders::_2;
+  std::lock_guard<std::mutex> state_lock(engine_state_lock_);
+
+  switch (ec.value()) {
+  case 0:
+    // No errors
+    break;
+  case asio::error::operation_aborted:
+    // The event loop has been shut down. Ignore the error.
+    return;
+  default:
+    assert(false && "Unimplemented");
+    // With NDEBUG the assert disappears; stop here rather than advance the
+    // state machine with a length or buffer that was never filled in.
+    return;
+  }
+
+  if (resp_state_ == kReadLength) {
+    resp_state_ = kReadContent;
+    auto buf = ::asio::buffer(reinterpret_cast<char *>(&resp_length_),
+                              sizeof(resp_length_));
+    asio::async_read(next_layer_, buf,
+                     std::bind(&RpcConnectionImpl<NextLayer>::OnRecvCompleted,
+                               this, _1, _2));
+
+  } else if (resp_state_ == kReadContent) {
+    resp_state_ = kParseResponse;
+    // NOTE(review): the peer-supplied length is used unchecked to size the
+    // buffer -- consider enforcing an upper bound.
+    resp_length_ = ntohl(resp_length_);
+    resp_data_.resize(resp_length_);
+    asio::async_read(next_layer_, ::asio::buffer(resp_data_),
+                     std::bind(&RpcConnectionImpl<NextLayer>::OnRecvCompleted,
+                               this, _1, _2));
+
+  } else if (resp_state_ == kParseResponse) {
+    resp_state_ = kReadLength;
+    HandleRpcResponse(resp_data_);
+    resp_data_.clear();
+    Start();
+  }
+}
+
+template <class NextLayer> void RpcConnectionImpl<NextLayer>::Shutdown() {
+  next_layer_.close();  // close the transport; OnRecvCompleted treats operation_aborted as a clean stop
+}
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b9ff2ee/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
new file mode 100644
index 0000000..50dce86
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "rpc_engine.h"
+#include "rpc_connection.h"
+#include "common/util.h"
+
+#include <openssl/rand.h>
+
+#include <sstream>
+#include <future>
+
+namespace hdfs {
+
+RpcEngine::RpcEngine(::asio::io_service *io_service,
+                     const std::string &client_name, const char *protocol_name,
+                     int protocol_version)
+    : io_service_(io_service), client_name_(client_name),
+      protocol_name_(protocol_name), protocol_version_(protocol_version),
+      call_id_(0)  // call ids start from zero
+    , conn_(new RpcConnectionImpl<::asio::ip::tcp::socket>(this))  // a single connection over a plain TCP socket
+{}
+
+// Connects to one of `servers` and performs the RPC handshake, blocking the
+// calling thread until both steps finish or one of them fails.
+// Returns the first failing status, or the handshake status on success.
+Status
+RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &servers) {
+  auto result = std::make_shared<std::promise<Status>>();
+  std::future<Status> pending(result->get_future());
+  conn_->Connect(servers, [this, result](const Status &connect_status) {
+    if (connect_status.ok()) {
+      // Connected: the handshake decides the final outcome.
+      conn_->Handshake([result](const Status &handshake_status) {
+        result->set_value(handshake_status);
+      });
+      return;
+    }
+    result->set_value(connect_status);
+  });
+  return pending.get();
+}
+
+void RpcEngine::Start() { conn_->Start(); }  // delegates to the connection, which queues its first receive step
+
+void RpcEngine::Shutdown() {
+  io_service_->post([this]() { conn_->Shutdown(); });  // the connection is shut down from a task queued on the io_service, not inline
+}
+
+void RpcEngine::AsyncRpc(
+    const std::string &method_name, const ::google::protobuf::MessageLite *req,
+    const std::shared_ptr<::google::protobuf::MessageLite> &resp,
+    std::function<void(const Status &)> &&handler) {
+  conn_->AsyncRpc(method_name, req, resp, std::move(handler));  // forwarded unchanged to the engine's connection
+}
+
+// Synchronous wrapper around AsyncRpc: issues the call and blocks the calling
+// thread until the completion handler delivers a status, which is returned.
+Status
+RpcEngine::Rpc(const std::string &method_name,
+               const ::google::protobuf::MessageLite *req,
+               const std::shared_ptr<::google::protobuf::MessageLite> &resp) {
+  auto result = std::make_shared<std::promise<Status>>();
+  std::future<Status> pending(result->get_future());
+  AsyncRpc(method_name, req, resp, [result](const Status &outcome) {
+    result->set_value(outcome);
+  });
+  return pending.get();
+}
+
+// Synchronous raw-bytes RPC: issues the call on the connection and blocks the
+// calling thread until the completion handler delivers a status.
+Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
+                         std::shared_ptr<std::string> resp) {
+  auto result = std::make_shared<std::promise<Status>>();
+  std::future<Status> pending(result->get_future());
+  conn_->AsyncRawRpc(method_name, req, resp, [result](const Status &outcome) {
+    result->set_value(outcome);
+  });
+  return pending.get();
+}
+
+// Returns a client name of the form "libhdfs++_<suffix>", where the suffix is
+// the Base64 encoding of six random bytes.
+//
+// Uses RAND_bytes (RAND_pseudo_bytes is deprecated in OpenSSL) and checks its
+// result. If no random bytes can be produced the suffix is built from zero
+// bytes, so the function still returns a usable, though not unique, name.
+std::string RpcEngine::GetRandomClientName() {
+  unsigned char buf[6] = {
+      0,
+  };
+  if (RAND_bytes(buf, sizeof(buf)) != 1) {
+    // Do not trust a partially filled buffer after a failure.
+    for (unsigned char &b : buf) {
+      b = 0;
+    }
+  }
+
+  std::stringstream ss;
+  ss << "libhdfs++_"
+     << Base64Encode(std::string(reinterpret_cast<char *>(buf), sizeof(buf)));
+  return ss.str();
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b9ff2ee/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
new file mode 100644
index 0000000..cd5c0e6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_RPC_RPC_ENGINE_H_
+#define LIB_RPC_RPC_ENGINE_H_
+
+#include "libhdfspp/status.h"
+
+#include <google/protobuf/message_lite.h>
+
+#include <asio/ip/tcp.hpp>
+#include <asio/deadline_timer.hpp>
+
+#include <atomic>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+namespace hdfs {
+
+class RpcEngine;
+/**
+ * Abstract base class for the connection an RpcEngine talks through.
+ * Subclasses provide the pure virtual operations (Connect, Handshake,
+ * Shutdown and the send/receive completion hooks); this class declares the
+ * request queues and response-reading state shared by them.
+ **/
+class RpcConnection {
+public:
+  // Completion callback that receives the outcome of an operation.
+  typedef std::function<void(const Status &)> Callback;
+  virtual ~RpcConnection();
+  // NOTE(review): single-argument constructor is not `explicit`; confirm that
+  // implicit conversion from RpcEngine* is not intended.
+  RpcConnection(RpcEngine *engine);
+  // Connects using the given endpoints and reports the outcome via handler.
+  // How multiple endpoints are tried is up to the subclass.
+  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
+                       Callback &&handler) = 0;
+  // Performs the handshake and reports the outcome via handler.
+  virtual void Handshake(Callback &&handler) = 0;
+  virtual void Shutdown() = 0;
+
+  void Start();
+  // Asynchronous call with a protobuf request and response; this is the
+  // method RpcEngine::AsyncRpc forwards to.
+  void AsyncRpc(const std::string &method_name,
+                const ::google::protobuf::MessageLite *req,
+                std::shared_ptr<::google::protobuf::MessageLite> resp,
+                Callback &&handler);
+
+  // Asynchronous call whose request and response are raw strings; this is
+  // the method RpcEngine::RawRpc calls.
+  void AsyncRawRpc(const std::string &method_name, const std::string &request,
+                   std::shared_ptr<std::string> resp, Callback &&handler);
+
+protected:
+  // The engine passed at construction. The pointer itself is const, so it
+  // cannot be re-seated afterwards.
+  RpcEngine *const engine_;
+  // Completion hooks for send and receive operations, implemented by the
+  // subclass. `transferred` is presumably the byte count -- confirm.
+  virtual void OnSendCompleted(const ::asio::error_code &ec,
+                               size_t transferred) = 0;
+  virtual void OnRecvCompleted(const ::asio::error_code &ec,
+                               size_t transferred) = 0;
+
+  ::asio::io_service &io_service();
+  std::shared_ptr<std::string> PrepareHandshakePacket();
+  static std::string
+  SerializeRpcRequest(const std::string &method_name,
+                      const ::google::protobuf::MessageLite *req);
+  void HandleRpcResponse(const std::vector<char> &data);
+  void FlushPendingRequests();
+
+  // Progress of reading the current response. The enumerator names suggest a
+  // length-prefixed read (length, then content, then parse) -- confirm
+  // against the implementation.
+  enum ResponseState {
+    kReadLength,
+    kReadContent,
+    kParseResponse,
+  } resp_state_;
+  unsigned resp_length_;
+  std::vector<char> resp_data_;
+
+  // A single RPC request: its call id, a timer, the payload string and the
+  // handler to run when a response (or error status) arrives.
+  class Request {
+  public:
+    // Invoked with a stream positioned on the response and the call status.
+    typedef std::function<void(::google::protobuf::io::CodedInputStream *is,
+                               const Status &status)> Handler;
+    // Two ways to build a request: from a raw string or from a protobuf
+    // message.
+    Request(RpcConnection *parent, const std::string &method_name,
+            const std::string &request, Handler &&callback);
+    Request(RpcConnection *parent, const std::string &method_name,
+            const ::google::protobuf::MessageLite *request, Handler &&callback);
+
+    int call_id() const { return call_id_; }
+    ::asio::deadline_timer &timer() { return timer_; }
+    const std::string &payload() const { return payload_; }
+    void OnResponseArrived(::google::protobuf::io::CodedInputStream *is,
+                           const Status &status); 
+  private:
+    const int call_id_;
+    // NOTE(review): presumably used to time the request out -- confirm.
+    ::asio::deadline_timer timer_;
+    std::string payload_;
+    Handler handler_;
+  };
+
+  // The request being sent over the wire
+  std::shared_ptr<Request> request_over_the_wire_;
+  // Requests to be sent over the wire
+  std::vector<std::shared_ptr<Request>> pending_requests_;
+  // Requests that are waiting for responses, keyed by an int (presumably the
+  // request's call id -- confirm)
+  std::unordered_map<int, std::shared_ptr<Request>> requests_on_fly_;
+  // Lock for mutable parts of this class that need to be thread safe
+  std::mutex engine_state_lock_;
+};
+
+/**
+ * Client-side entry point for issuing RPCs. Holds the client and protocol
+ * identity, hands out call ids, and forwards calls to the RpcConnection it
+ * owns.
+ **/
+class RpcEngine {
+public:
+  // Version of the Hadoop RPC protocol this client implements.
+  enum { kRpcVersion = 9 };
+  // Negative call id constants. Judging by their names they are reserved for
+  // non-application traffic -- confirm against the Hadoop RPC protocol.
+  enum {
+    kCallIdAuthorizationFailed = -1,
+    kCallIdInvalid = -2,
+    kCallIdConnectionContext = -3,
+    kCallIdPing = -4
+  };
+
+  RpcEngine(::asio::io_service *io_service, const std::string &client_name,
+            const char *protocol_name, int protocol_version);
+
+  /**
+   * Start an RPC and return immediately; handler is invoked with the
+   * resulting Status. The call is forwarded to the connection.
+   **/
+  void AsyncRpc(const std::string &method_name,
+                const ::google::protobuf::MessageLite *req,
+                const std::shared_ptr<::google::protobuf::MessageLite> &resp,
+                std::function<void(const Status &)> &&handler);
+
+  /**
+   * Blocking variant of AsyncRpc(): waits for the completion handler and
+   * returns the Status it received.
+   **/
+  Status Rpc(const std::string &method_name,
+             const ::google::protobuf::MessageLite *req,
+             const std::shared_ptr<::google::protobuf::MessageLite> &resp);
+  /**
+   * Send raw bytes as RPC payload. This is intended to be used in JNI
+   * bindings only. Blocks until the call completes.
+   **/
+  Status RawRpc(const std::string &method_name, const std::string &req,
+                std::shared_ptr<std::string> resp);
+  Status Connect(const std::vector<::asio::ip::tcp::endpoint> &server);
+  void Start();
+  void Shutdown();
+
+  // Atomically increments the call id counter and returns the new value.
+  int NextCallId() { return ++call_id_; }
+
+  const std::string &client_name() const { return client_name_; }
+  const std::string &protocol_name() const { return protocol_name_; }
+  int protocol_version() const { return protocol_version_; }
+  ::asio::io_service &io_service() { return *io_service_; }
+
+  // Returns "libhdfs++_" followed by the Base64 encoding of 6 random bytes.
+  static std::string GetRandomClientName();
+
+private:
+  // Raw pointer supplied by the caller of the constructor; presumably not
+  // owned by this class -- confirm.
+  ::asio::io_service *io_service_;
+  const std::string client_name_;
+  const std::string protocol_name_;
+  const int protocol_version_;
+  // Source of call ids handed out by NextCallId().
+  std::atomic_int call_id_;
+  // The connection that AsyncRpc() and RawRpc() forward to.
+  std::unique_ptr<RpcConnection> conn_;
+};
+}
+
+#endif


Mime
View raw message