hadoop-common-commits mailing list archives

From j..@apache.org
Subject [1/2] hadoop git commit: HDFS-11106: libhdfs++: Some refactoring to better organize files (part 2). Contributed by James Clampffer.
Date Mon, 06 Mar 2017 17:34:28 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 511d9c5ae -> 4040c5489


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4040c548/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h
new file mode 100644
index 0000000..12f7f0e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h
@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_RPC_RPC_CONNECTION_IMPL_H_
+#define LIB_RPC_RPC_CONNECTION_IMPL_H_
+
+#include "rpc_connection.h"
+#include "rpc_engine.h"
+#include "request.h"
+
+#include "common/auth_info.h"
+#include "common/logging.h"
+#include "common/util.h"
+#include "common/libhdfs_events_impl.h"
+
+#include <asio/connect.hpp>
+#include <asio/read.hpp>
+#include <asio/write.hpp>
+
+#include <system_error>
+
+namespace hdfs {
+
+template <class Socket>
+class RpcConnectionImpl : public RpcConnection {
+public:
+  MEMCHECKED_CLASS(RpcConnectionImpl);
+
+  RpcConnectionImpl(RpcEngine *engine);
+  virtual ~RpcConnectionImpl() override;
+
+  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
+                       const AuthInfo & auth_info,
+                       RpcCallback &handler);
+  virtual void ConnectAndFlush(
+      const std::vector<::asio::ip::tcp::endpoint> &server) override;
+  virtual void SendHandshake(RpcCallback &handler) override;
+  virtual void SendContext(RpcCallback &handler) override;
+  virtual void Disconnect() override;
+  virtual void OnSendCompleted(const ::asio::error_code &ec,
+                               size_t transferred) override;
+  virtual void OnRecvCompleted(const ::asio::error_code &ec,
+                               size_t transferred) override;
+  virtual void FlushPendingRequests() override;
+
+
+  Socket &TEST_get_mutable_socket() { return socket_; }
+
+  void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; }
+
+ private:
+  const Options options_;
+  ::asio::ip::tcp::endpoint current_endpoint_;
+  std::vector<::asio::ip::tcp::endpoint> additional_endpoints_;
+  Socket socket_;
+  ::asio::deadline_timer connect_timer_;
+
+  void ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote);
+};
+
+template <class Socket>
+RpcConnectionImpl<Socket>::RpcConnectionImpl(RpcEngine *engine)
+    : RpcConnection(engine),
+      options_(engine->options()),
+      socket_(engine->io_service()),
+      connect_timer_(engine->io_service())
+{
+      LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this);
+}
+
+template <class Socket>
+RpcConnectionImpl<Socket>::~RpcConnectionImpl() {
+  LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this);
+
+  if (pending_requests_.size() > 0)
+    LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue");
+  if (requests_on_fly_.size() > 0)
+    LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue");
+}
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::Connect(
+    const std::vector<::asio::ip::tcp::endpoint> &server,
+    const AuthInfo & auth_info,
+    RpcCallback &handler) {
+  LOG_TRACE(kRPC, << "RpcConnectionImpl::Connect called");
+
+  this->auth_info_ = auth_info;
+
+  auto connectionSuccessfulReq = std::make_shared<Request>(
+      engine_, [handler](::google::protobuf::io::CodedInputStream *is,
+                         const Status &status) {
+        (void)is;
+        handler(status);
+      });
+  pending_requests_.push_back(connectionSuccessfulReq);
+  this->ConnectAndFlush(server);  // explicit "this->" so the compiler can resolve ConnectAndFlush inside this template
+}
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::ConnectAndFlush(
+    const std::vector<::asio::ip::tcp::endpoint> &server) {
+
+  LOG_INFO(kRPC, << "ConnectAndFlush called");
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+
+  if (server.empty()) {
+    Status s = Status::InvalidArgument("No endpoints provided");
+    CommsError(s);
+    return;
+  }
+
+  if (connected_ == kConnected) {
+    FlushPendingRequests();
+    return;
+  }
+  if (connected_ != kNotYetConnected) {
+    LOG_WARN(kRPC, << "RpcConnectionImpl::ConnectAndFlush called while connected="
<< ToString(connected_));
+    return;
+  }
+  connected_ = kConnecting;
+
+  // Take the first endpoint, but remember the alternatives for later
+  additional_endpoints_ = server;
+  ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front();
+  additional_endpoints_.erase(additional_endpoints_.begin());
+  current_endpoint_ = first_endpoint;
+
+  auto shared_this = shared_from_this();
+  socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) {
+    ConnectComplete(ec, first_endpoint);
+  });
+
+  // Arm the connect timeout timer
+  auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
+  connect_timer_.expires_from_now(
+        std::chrono::milliseconds(options_.rpc_connect_timeout));
+  connect_timer_.async_wait([shared_this, this, first_endpoint](const ::asio::error_code &ec) {
+      if (ec)
+        ConnectComplete(ec, first_endpoint);
+      else
+        ConnectComplete(make_error_code(asio::error::host_unreachable), first_endpoint);
+  });
+}
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote) {
+  auto shared_this = RpcConnectionImpl<Socket>::shared_from_this();
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+  connect_timer_.cancel();
+
+  LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called");
+
+  // Could be an old async connect returning a result after we've moved on
+  if (remote != current_endpoint_) {
+      LOG_DEBUG(kRPC, << "Got ConnectComplete for " << remote << " but
current_endpoint_ is " << current_endpoint_);
+      return;
+  }
+  if (connected_ != kConnecting) {
+      LOG_DEBUG(kRPC, << "Got ConnectComplete but current state is " << connected_);;
+      return;
+  }
+
+  Status status = ToStatus(ec);
+  if(event_handlers_) {
+    event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+    if (event_resp.response() == event_response::kTest_Error) {
+      status = event_resp.status();
+    }
+#endif
+  }
+
+  if (status.ok()) {
+    StartReading();
+    SendHandshake([shared_this, this](const Status & s) {
+      HandshakeComplete(s);
+    });
+  } else {
+    LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());;
+    std::string err = SafeDisconnect(get_asio_socket_ptr(&socket_));
+    if(!err.empty()) {
+      LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err);
+    }
+
+    if (!additional_endpoints_.empty()) {
+      // If we have additional endpoints, keep trying until we either run out
+      //    or one of them connects
+      ::asio::ip::tcp::endpoint next_endpoint = additional_endpoints_.front();
+      additional_endpoints_.erase(additional_endpoints_.begin());
+      current_endpoint_ = next_endpoint;
+
+      socket_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) {
+        ConnectComplete(ec, next_endpoint);
+      });
+      connect_timer_.expires_from_now(
+            std::chrono::milliseconds(options_.rpc_connect_timeout));
+      connect_timer_.async_wait([shared_this, this, next_endpoint](const ::asio::error_code &ec) {
+          if (ec)
+            ConnectComplete(ec, next_endpoint);
+          else
+            ConnectComplete(make_error_code(asio::error::host_unreachable), next_endpoint);
+        });
+    } else {
+      CommsError(status);
+    }
+  }
+}
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::SendHandshake(RpcCallback &handler) {
+  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
+
+  LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called");
+  connected_ = kHandshaking;
+
+  auto shared_this = shared_from_this();
+  auto handshake_packet = PrepareHandshakePacket();
+  ::asio::async_write(socket_, asio::buffer(*handshake_packet),
+                      [handshake_packet, handler, shared_this, this](
+                          const ::asio::error_code &ec, size_t) {
+                        Status status = ToStatus(ec);
+                        handler(status);
+                      });
+}
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::SendContext(RpcCallback &handler) {
+  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
+
+  LOG_TRACE(kRPC, << "RpcConnectionImpl::SendContext called");
+
+  auto shared_this = shared_from_this();
+  auto context_packet = PrepareContextPacket();
+  ::asio::async_write(socket_, asio::buffer(*context_packet),
+                      [context_packet, handler, shared_this, this](
+                          const ::asio::error_code &ec, size_t) {
+                        Status status = ToStatus(ec);
+                        handler(status);
+                      });
+}
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::OnSendCompleted(const ::asio::error_code &ec,
+                                                   size_t) {
+  using std::placeholders::_1;
+  using std::placeholders::_2;
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+
+  LOG_TRACE(kRPC, << "RpcConnectionImpl::OnSendCompleted called");
+
+  request_over_the_wire_.reset();
+  if (ec) {
+    LOG_WARN(kRPC, << "Network error during RPC write: " << ec.message());
+    CommsError(ToStatus(ec));
+    return;
+  }
+
+  FlushPendingRequests();
+}
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::FlushPendingRequests() {
+  using namespace ::std::placeholders;
+
+  // Lock should be held
+  assert(lock_held(connection_state_lock_));
+
+  LOG_TRACE(kRPC, << "RpcConnectionImpl::FlushPendingRequests called");
+
+  // Don't send if we don't need to
+  if (request_over_the_wire_) {
+    return;
+  }
+
+  std::shared_ptr<Request> req;
+  switch (connected_) {
+  case kNotYetConnected:
+    return;
+  case kConnecting:
+    return;
+  case kHandshaking:
+    return;
+  case kAuthenticating:
+    if (auth_requests_.empty()) {
+      return;
+    }
+    req = auth_requests_.front();
+    auth_requests_.erase(auth_requests_.begin());
+    break;
+  case kConnected:
+    if (pending_requests_.empty()) {
+      return;
+    }
+    req = pending_requests_.front();
+    pending_requests_.erase(pending_requests_.begin());
+    break;
+  case kDisconnected:
+    LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to flush
a " << ToString(connected_) << " connection");
+    return;
+  default:
+    LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests invalid state: " <<
ToString(connected_));
+    return;
+  }
+
+  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
+  auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
+  auto weak_req = std::weak_ptr<Request>(req);
+
+  std::shared_ptr<std::string> payload = std::make_shared<std::string>();
+  req->GetPacket(payload.get());
+  if (!payload->empty()) {
+    assert(requests_on_fly_.find(req->call_id()) == requests_on_fly_.end());
+    requests_on_fly_[req->call_id()] = req;
+    request_over_the_wire_ = req;
+
+    req->timer().expires_from_now(
+        std::chrono::milliseconds(options_.rpc_timeout));
+    req->timer().async_wait([weak_this, weak_req, this](const ::asio::error_code &ec) {
+        auto timeout_this = weak_this.lock();
+        auto timeout_req = weak_req.lock();
+        if (timeout_this && timeout_req)
+          this->HandleRpcTimeout(timeout_req, ec);
+    });
+
+    asio::async_write(socket_, asio::buffer(*payload),
+                      [shared_this, this, payload](const ::asio::error_code &ec,
+                                                   size_t size) {
+                        OnSendCompleted(ec, size);
+                      });
+  } else {  // Nothing to send for this request, inform the handler immediately
+    io_service().post(
+        // Never hold locks when calling a callback
+        [req]() { req->OnResponseArrived(nullptr, Status::OK()); }
+    );
+
+    // Reschedule to flush the next one
+    AsyncFlushPendingRequests();
+  }
+}
+
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::OnRecvCompleted(const ::asio::error_code &original_ec,
+                                                   size_t) {
+  using std::placeholders::_1;
+  using std::placeholders::_2;
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+
+  ::asio::error_code my_ec(original_ec);
+
+  LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called");
+
+  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
+
+  if(event_handlers_) {
+    event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+    if (event_resp.response() == event_response::kTest_Error) {
+      my_ec = std::make_error_code(std::errc::network_down);
+    }
+#endif
+  }
+
+  switch (my_ec.value()) {
+    case 0:
+      // No errors
+      break;
+    case asio::error::operation_aborted:
+      // The event loop has been shut down. Ignore the error.
+      return;
+    default:
+      LOG_WARN(kRPC, << "Network error during RPC read: " << my_ec.message());
+      CommsError(ToStatus(my_ec));
+      return;
+  }
+
+  if (!current_response_state_) { /* start a new one */
+    current_response_state_ = std::make_shared<Response>();
+  }
+
+  if (current_response_state_->state_ == Response::kReadLength) {
+    current_response_state_->state_ = Response::kReadContent;
+    auto buf = ::asio::buffer(reinterpret_cast<char *>(&current_response_state_->length_),
+                              sizeof(current_response_state_->length_));
+    asio::async_read(
+        socket_, buf,
+        [shared_this, this](const ::asio::error_code &ec, size_t size) {
+          OnRecvCompleted(ec, size);
+        });
+  } else if (current_response_state_->state_ == Response::kReadContent) {
+    current_response_state_->state_ = Response::kParseResponse;
+    current_response_state_->length_ = ntohl(current_response_state_->length_);
+    current_response_state_->data_.resize(current_response_state_->length_);
+    asio::async_read(
+        socket_, ::asio::buffer(current_response_state_->data_),
+        [shared_this, this](const ::asio::error_code &ec, size_t size) {
+          OnRecvCompleted(ec, size);
+        });
+  } else if (current_response_state_->state_ == Response::kParseResponse) {
+    // Check return status from the RPC response.  We may have received a msg
+    // indicating a server side error.
+
+    Status stat = HandleRpcResponse(current_response_state_);
+
+    if(stat.get_server_exception_type() == Status::kStandbyException) {
+      // May need to bail out, connect to new NN, and restart loop
+      LOG_INFO(kRPC, << "Communicating with standby NN, attempting to reconnect");
+    }
+
+    current_response_state_ = nullptr;
+    StartReading();
+  }
+}
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::Disconnect() {
+  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
+
+  LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called");
+
+  request_over_the_wire_.reset();
+  if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == kAuthenticating || connected_ == kConnected) {
+    // Don't print out errors; we were expecting a disconnect here
+    SafeDisconnect(get_asio_socket_ptr(&socket_));
+  }
+  connected_ = kDisconnected;
+}
+}
+
+#endif
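
The connect path in this new header is a standard asio idiom: each attempt races an async_connect against a deadline timer, and the timer handler reports host_unreachable if it fires before the connect returns. A minimal standalone sketch of the same race, with illustrative names and a plain steady_timer instead of the class members used above; here the timeout simply closes the socket, which makes the pending connect complete with operation_aborted:

#include <asio.hpp>
#include <chrono>
#include <iostream>

// Race an async_connect against a timer; whichever handler runs first
// decides the outcome, and the loser observes a cancellation error.
void connect_with_timeout(asio::ip::tcp::socket &sock,
                          const asio::ip::tcp::endpoint &ep,
                          asio::steady_timer &timer) {
  sock.async_connect(ep, [&timer](const asio::error_code &ec) {
    timer.cancel();  // connect finished first; timer handler sees operation_aborted
    std::cout << (ec ? "connect failed: " + ec.message()
                     : std::string("connected")) << "\n";
  });
  timer.expires_from_now(std::chrono::seconds(5));
  timer.async_wait([&sock](const asio::error_code &ec) {
    if (!ec) sock.close();  // timer fired first: abort the pending connect
  });
}

The sketch captures by reference and assumes the caller keeps the socket and timer alive; RpcConnectionImpl instead captures shared_from_this() in both handlers so the object cannot be destroyed while a callback is still in flight.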

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4040c548/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
index 651225a..22c0e74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
@@ -16,13 +16,12 @@
  * limitations under the License.
  */
 #include "rpc_engine.h"
-#include "rpc_connection.h"
+#include "rpc_connection_impl.h"
 #include "common/util.h"
 #include "common/logging.h"
 #include "common/namenode_info.h"
 #include "optional.hpp"
 
-#include <future>
 #include <algorithm>
 
 namespace hdfs {
@@ -30,114 +29,6 @@ namespace hdfs {
 template <class T>
 using optional = std::experimental::optional<T>;
 
-HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
-                                     ::asio::io_service *ioservice,
-                                     std::shared_ptr<LibhdfsEvents> event_handlers)
-                  : enabled_(false), resolved_(false),
-                    ioservice_(ioservice), event_handlers_(event_handlers)
-{
-  LOG_TRACE(kRPC, << "HANamenodeTracker got the following nodes");
-  for(unsigned int i=0;i<servers.size();i++)
-    LOG_TRACE(kRPC, << servers[i].str());
-
-  if(servers.size() >= 2) {
-    LOG_TRACE(kRPC, << "Creating HA namenode tracker");
-    if(servers.size() > 2) {
-      LOG_WARN(kRPC, << "Nameservice declares more than two nodes.  Some won't be used.");
-    }
-
-    active_info_ = servers[0];
-    standby_info_ = servers[1];
-    LOG_INFO(kRPC, << "Active namenode url  = " << active_info_.uri.str());
-    LOG_INFO(kRPC, << "Standby namenode url = " << standby_info_.uri.str());
-
-    enabled_ = true;
-    if(!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) {
-      resolved_ = true;
-    }
-  }
-}
-
-
-HANamenodeTracker::~HANamenodeTracker() { }
-
-
-static std::string format_endpoints(const std::vector<::asio::ip::tcp::endpoint> &pts) {
-  std::stringstream ss;
-  for(unsigned int i=0; i<pts.size(); i++)
-    if(i == pts.size() - 1)
-      ss << pts[i];
-    else
-      ss << pts[i] << ", ";
-  return ss.str();
-}
-
-//  Pass in endpoint from current connection, this will do a reverse lookup
-//  and return the info for the standby node. It will also swap its state internally.
-ResolvedNamenodeInfo HANamenodeTracker::GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint) {
-  LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoint);
-  mutex_guard swap_lock(swap_lock_);
-
-  ResolvedNamenodeInfo failover_node;
-
-  // Connected to standby, switch standby to active
-  if(IsCurrentActive_locked(current_endpoint)) {
-    std::swap(active_info_, standby_info_);
-    if(event_handlers_)
-      event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
-                            reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
-    failover_node = active_info_;
-  } else if(IsCurrentStandby_locked(current_endpoint)) {
-    // Connected to standby
-    if(event_handlers_)
-      event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
-                            reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
-    failover_node = active_info_;
-  } else {
-    // Invalid state, throw for testing
-    std::string ep1 = format_endpoints(active_info_.endpoints);
-    std::string ep2 = format_endpoints(standby_info_.endpoints);
-
-    std::stringstream msg;
-    msg << "Looked for " << current_endpoint << " in\n";
-    msg << ep1 << " and\n";
-    msg << ep2 << std::endl;
-
-    LOG_ERROR(kRPC, << "Unable to find RPC connection in config " << msg.str()
<< ". Bailing out.");
-    throw std::runtime_error(msg.str());
-  }
-
-  if(failover_node.endpoints.empty()) {
-    LOG_WARN(kRPC, << "No endpoints for node " << failover_node.uri.str() <<
" attempting to resolve again");
-    if(!ResolveInPlace(ioservice_, failover_node)) {
-      LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << failover_node.uri.str()
-                      << "failed.  Please make sure your configuration is up to date.");
-    }
-  }
-  return failover_node;
-}
-
-bool HANamenodeTracker::IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const {
-  for(unsigned int i=0;i<active_info_.endpoints.size();i++) {
-    if(ep.address() == active_info_.endpoints[i].address()) {
-      if(ep.port() != active_info_.endpoints[i].port())
-        LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << active_info_.endpoints[i]
<< " trying anyway..");
-      return true;
-    }
-  }
-  return false;
-}
-
-bool HANamenodeTracker::IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const {
-  for(unsigned int i=0;i<standby_info_.endpoints.size();i++) {
-    if(ep.address() == standby_info_.endpoints[i].address()) {
-      if(ep.port() != standby_info_.endpoints[i].port())
-        LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << standby_info_.endpoints[i]
<< " trying anyway..");
-      return true;
-    }
-  }
-  return false;
-}
 
 RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
                      const std::string &client_name, const std::string &user_name,
@@ -276,19 +167,6 @@ void RpcEngine::AsyncRpc(
   conn_->AsyncRpc(method_name, req, resp, handler);
 }
 
-Status RpcEngine::Rpc(
-    const std::string &method_name, const ::google::protobuf::MessageLite *req,
-    const std::shared_ptr<::google::protobuf::MessageLite> &resp) {
-
-  LOG_TRACE(kRPC, << "RpcEngine::Rpc called");
-
-  auto stat = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(stat->get_future());
-  AsyncRpc(method_name, req, resp,
-           [stat](const Status &status) { stat->set_value(status); });
-  return future.get();
-}
-
 std::shared_ptr<RpcConnection> RpcEngine::NewConnection()
 {
   LOG_DEBUG(kRPC, << "RpcEngine::NewConnection called");
@@ -304,7 +182,6 @@ std::shared_ptr<RpcConnection> RpcEngine::InitializeConnection()
   return result;
 }
 
-
 void RpcEngine::AsyncRpcCommsError(
     const Status &status,
     std::shared_ptr<RpcConnection> failedConnection,
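
One of the deletions in this file, RpcEngine::Rpc(), was the only synchronous entry point: it parked a std::promise, handed AsyncRpc a handler that fulfills it, and blocked on the matching future. Callers that still want blocking semantics can reproduce the pattern themselves. A sketch, assuming the AsyncRpc signature shown above; BlockingRpc is a hypothetical free function, not part of libhdfs++:

#include <future>
#include <memory>
#include <string>
// also assumes rpc_engine.h for RpcEngine and Status

Status BlockingRpc(RpcEngine &engine, const std::string &method_name,
                   const ::google::protobuf::MessageLite *req,
                   const std::shared_ptr<::google::protobuf::MessageLite> &resp) {
  auto stat = std::make_shared<std::promise<Status>>();
  std::future<Status> future = stat->get_future();
  engine.AsyncRpc(method_name, req, resp,
                  [stat](const Status &status) { stat->set_value(status); });
  return future.get();  // blocks until an asio worker thread runs the handler
}

The one caveat the wrapper inherits: it must never be called from the io_service thread that would run the completion handler, or future.get() deadlocks waiting on itself.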

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4040c548/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
index b4aef00..dc4054e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
@@ -25,25 +25,19 @@
 #include "common/retry_policy.h"
 #include "common/libhdfs_events_impl.h"
 #include "common/util.h"
-#include "common/continuation/asio.h"
-#include "common/logging.h"
 #include "common/new_delete.h"
 #include "common/namenode_info.h"
+#include "namenode_tracker.h"
 
 #include <google/protobuf/message_lite.h>
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
 
 #include <asio/ip/tcp.hpp>
 #include <asio/deadline_timer.hpp>
 
 #include <atomic>
 #include <memory>
-#include <unordered_map>
 #include <vector>
-#include <deque>
 #include <mutex>
-#include <future>
 
 namespace hdfs {
 
@@ -64,193 +58,8 @@ typedef const std::function<void(const Status &)> RpcCallback;
 class LockFreeRpcEngine;
 class RpcConnection;
 class SaslProtocol;
-
-/*
- * Internal bookkeeping for an outstanding request from the consumer.
- *
- * Threading model: not thread-safe; should only be accessed from a single
- *   thread at a time
- */
-class Request {
- public:
-  MEMCHECKED_CLASS(Request)
-  typedef std::function<void(::google::protobuf::io::CodedInputStream *is,
-                             const Status &status)> Handler;
-
-  Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
-          const std::string &request, Handler &&callback);
-  Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
-          const ::google::protobuf::MessageLite *request, Handler &&callback);
-
-  // Null request (with no actual message) used to track the state of an
-  //    initial Connect call
-  Request(LockFreeRpcEngine *engine, Handler &&handler);
-
-  int call_id() const { return call_id_; }
-  std::string  method_name() const { return method_name_; }
-  ::asio::deadline_timer &timer() { return timer_; }
-  int IncrementRetryCount() { return retry_count_++; }
-  int IncrementFailoverCount();
-  void GetPacket(std::string *res) const;
-  void OnResponseArrived(::google::protobuf::io::CodedInputStream *is,
-                         const Status &status);
-
-  int get_failover_count() {return failover_count_;}
-
-  std::string GetDebugString() const;
-
- private:
-  LockFreeRpcEngine *const engine_;
-  const std::string method_name_;
-  const int call_id_;
-
-  ::asio::deadline_timer timer_;
-  std::string payload_;
-  const Handler handler_;
-
-  int retry_count_;
-  int failover_count_;
-};
-
-/*
- * Encapsulates a persistent connection to the NameNode, and the sending of
- * RPC requests and evaluating their responses.
- *
- * Can have multiple RPC requests in-flight simultaneously, but they are
- * evaluated in-order on the server side in a blocking manner.
- *
- * Threading model: public interface is thread-safe
- * All handlers passed in to method calls will be called from an asio thread,
- *   and will not be holding any internal RpcConnection locks.
- */
-class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
- public:
-  MEMCHECKED_CLASS(RpcConnection)
-  RpcConnection(LockFreeRpcEngine *engine);
-  virtual ~RpcConnection();
-
-  // Note that a single server can have multiple endpoints - especially both
-  //   an ipv4 and ipv6 endpoint
-  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
-                       const AuthInfo & auth_info,
-                       RpcCallback &handler) = 0;
-  virtual void ConnectAndFlush(const std::vector<::asio::ip::tcp::endpoint> &server) = 0;
-  virtual void Disconnect() = 0;
-
-  void StartReading();
-  void AsyncRpc(const std::string &method_name,
-                const ::google::protobuf::MessageLite *req,
-                std::shared_ptr<::google::protobuf::MessageLite> resp,
-                const RpcCallback &handler);
-
-  void AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests);
-
-  // Enqueue requests before the connection is connected.  Will be flushed
-  //   on connect
-  void PreEnqueueRequests(std::vector<std::shared_ptr<Request>> requests);
-
-  // Put requests at the front of the current request queue
-  void PrependRequests_locked(std::vector<std::shared_ptr<Request>> requests);
-
-  void SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers);
-  void SetClusterName(std::string cluster_name);
-
-  LockFreeRpcEngine *engine() { return engine_; }
-  ::asio::io_service &io_service();
-
- protected:
-  struct Response {
-    enum ResponseState {
-      kReadLength,
-      kReadContent,
-      kParseResponse,
-    } state_;
-    unsigned length_;
-    std::vector<char> data_;
-
-    std::unique_ptr<::google::protobuf::io::ArrayInputStream> ar;
-    std::unique_ptr<::google::protobuf::io::CodedInputStream> in;
-
-    Response() : state_(kReadLength), length_(0) {}
-  };
-
-
-  // Initial handshaking protocol: connect->handshake-->(auth)?-->context->connected
-  virtual void SendHandshake(RpcCallback &handler) = 0;
-  void HandshakeComplete(const Status &s);
-  void AuthComplete(const Status &s, const AuthInfo & new_auth_info);
-  void AuthComplete_locked(const Status &s, const AuthInfo & new_auth_info);
-  virtual void SendContext(RpcCallback &handler) = 0;
-  void ContextComplete(const Status &s);
-
-
-  virtual void OnSendCompleted(const ::asio::error_code &ec,
-                               size_t transferred) = 0;
-  virtual void OnRecvCompleted(const ::asio::error_code &ec,
-                               size_t transferred) = 0;
-  virtual void FlushPendingRequests()=0;      // Synchronously write the next request
-
-  void AsyncRpc_locked(
-                const std::string &method_name,
-                const ::google::protobuf::MessageLite *req,
-                std::shared_ptr<::google::protobuf::MessageLite> resp,
-                const RpcCallback &handler);
-  void SendRpcRequests(const std::vector<std::shared_ptr<Request> > & requests);
-  void AsyncFlushPendingRequests(); // Queue requests to be flushed at a later time
-
-
-
-  std::shared_ptr<std::string> PrepareHandshakePacket();
-  std::shared_ptr<std::string> PrepareContextPacket();
-  static std::string SerializeRpcRequest(
-      const std::string &method_name,
-      const ::google::protobuf::MessageLite *req);
-  Status HandleRpcResponse(std::shared_ptr<Response> response);
-  void HandleRpcTimeout(std::shared_ptr<Request> req,
-                        const ::asio::error_code &ec);
-  void CommsError(const Status &status);
-
-  void ClearAndDisconnect(const ::asio::error_code &ec);
-  std::shared_ptr<Request> RemoveFromRunningQueue(int call_id);
-
-  LockFreeRpcEngine *const engine_;
-  std::shared_ptr<Response> current_response_state_;
-  AuthInfo auth_info_;
-
-  // Connection can have deferred connection, especially when we're pausing
-  //   during retry
-  enum ConnectedState {
-      kNotYetConnected,
-      kConnecting,
-      kHandshaking,
-      kAuthenticating,
-      kConnected,
-      kDisconnected
-  };
-  static std::string ToString(ConnectedState connected);
-  ConnectedState connected_;
-
-  // State machine for performing a SASL handshake
-  std::shared_ptr<SaslProtocol> sasl_protocol_;
-  // The request being sent over the wire; will also be in requests_on_fly_
-  std::shared_ptr<Request> request_over_the_wire_;
-  // Requests to be sent over the wire
-  std::deque<std::shared_ptr<Request>> pending_requests_;
-  // Requests to be sent over the wire during authentication; not retried if
-  //   there is a connection error
-  std::deque<std::shared_ptr<Request>> auth_requests_;
-  // Requests that are waiting for responses
-  typedef std::unordered_map<int, std::shared_ptr<Request>> RequestOnFlyMap;
-  RequestOnFlyMap requests_on_fly_;
-  std::shared_ptr<LibhdfsEvents> event_handlers_;
-  std::string cluster_name_;
-
-  // Lock for mutable parts of this class that need to be thread safe
-  std::mutex connection_state_lock_;
-
-  friend class SaslProtocol;
-};
-
+class RpcConnection;
+class Request;
 
 /*
  * These methods of the RpcEngine will never acquire locks, and are safe for
@@ -259,6 +68,7 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
 class LockFreeRpcEngine {
 public:
   MEMCHECKED_CLASS(LockFreeRpcEngine)
+
   /* Enqueues a CommsError without acquiring a lock*/
   virtual void AsyncRpcCommsError(const Status &status,
                       std::shared_ptr<RpcConnection> failedConnection,
@@ -279,54 +89,6 @@ public:
 
 
 /*
- *  Tracker gives the RpcEngine a quick way to use an endpoint that just
- *  failed in order to lookup a set of endpoints for a failover node.
- *
- *  Note: For now this only deals with 2 NameNodes, but that's the default
- *  anyway.
- */
-class HANamenodeTracker {
- public:
-  HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
-                    ::asio::io_service *ioservice,
-                    std::shared_ptr<LibhdfsEvents> event_handlers_);
-
-  virtual ~HANamenodeTracker();
-
-  bool is_enabled() const { return enabled_; }
-  bool is_resolved() const { return resolved_; }
-
-  // Get node opposite of the current one if possible (swaps active/standby)
-  // Note: This will always mutate internal state.  Use IsCurrentActive/Standby to
-  // get info without changing state
-  ResolvedNamenodeInfo GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint);
-
-  bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const;
-  bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const;
-
- private:
-  // If HA should be enabled, according to our options and runtime info like # nodes provided
-  bool enabled_;
-  // If we were able to resolve at least 1 HA namenode
-  bool resolved_;
-
-  // Keep service in case a second round of DNS lookup is required
-  ::asio::io_service *ioservice_;
-
-  // Event handlers, for now this is the simplest place to catch all failover events
-  // and push info out to client application.  Possibly move into RPCEngine.
-  std::shared_ptr<LibhdfsEvents> event_handlers_;
-
-  // Only support 1 active and 1 standby for now.
-  ResolvedNamenodeInfo active_info_;
-  ResolvedNamenodeInfo standby_info_;
-
-  // Aquire when switching from active-standby
-  std::mutex swap_lock_;
-};
-
-
-/*
  * An engine for reliable communication with a NameNode.  Handles connection,
  * retry, and (someday) failover of the requested messages.
  *
@@ -360,10 +122,6 @@ class RpcEngine : public LockFreeRpcEngine {
                 const std::shared_ptr<::google::protobuf::MessageLite> &resp,
                 const std::function<void(const Status &)> &handler);
 
-  Status Rpc(const std::string &method_name,
-             const ::google::protobuf::MessageLite *req,
-             const std::shared_ptr<::google::protobuf::MessageLite> &resp);
-
   void Shutdown();
 
   /* Enqueues a CommsError without acquiring a lock*/
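
The bulk of this hunk deletes the inline Request and RpcConnection definitions and leaves only forward declarations; the full classes now live in request.h and rpc_connection.h, with the template in rpc_connection_impl.h. This is the usual compile-firewall idiom: a header that only stores pointers to a type can forward-declare it, and only the .cc files that call through the type include the real definition. A generic sketch with illustrative file names:

// engine.h -- stays lightweight
#include <memory>
class RpcConnection;  // forward declaration is enough for a pointer member

class Engine {
 public:
  void Start();
 private:
  std::shared_ptr<RpcConnection> conn_;  // shared_ptr tolerates incomplete types
};

// engine.cc -- the only place that needs the complete class
#include "engine.h"
#include "rpc_connection.h"

void Engine::Start() {
  conn_->StartReading();  // member access requires the full definition
}

The payoff is build isolation: editing rpc_connection.h now recompiles only the translation units that include it, instead of everything that pulls in rpc_engine.h.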

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4040c548/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc
index b35a2f9..83d4f88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc
@@ -17,6 +17,7 @@
  */
 
 #include "rpc_engine.h"
+#include "rpc_connection.h"
 #include "common/logging.h"
 
 #include "sasl_engine.h"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4040c548/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
index 08218f6..66a5f1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
@@ -19,7 +19,7 @@
 #include "mock_connection.h"
 #include "test.pb.h"
 #include "RpcHeader.pb.h"
-#include "rpc/rpc_connection.h"
+#include "rpc/rpc_connection_impl.h"
 #include "common/namenode_info.h"
 
 #include <google/protobuf/io/coded_stream.h>
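
The test switches to rpc/rpc_connection_impl.h because that is where the RpcConnectionImpl<Socket> template now lives, and the Socket template parameter is exactly what makes it testable: the suite can instantiate the connection with the mock from mock_connection.h instead of a real asio::ip::tcp::socket, then reach in through the TEST_ accessors. A stripped-down sketch of the injection idea; MockSocket here is hypothetical, not the actual mock used by the test:

#include <string>
#include <utility>

// A connection templated on its socket type: production code instantiates
// it with asio::ip::tcp::socket, tests with a recording fake.
template <class Socket>
class Connection {
 public:
  explicit Connection(Socket s) : socket_(std::move(s)) {}
  Socket &TEST_get_mutable_socket() { return socket_; }
  void Send(const std::string &buf) { socket_.write(buf); }
 private:
  Socket socket_;
};

struct MockSocket {
  std::string written;  // records outgoing bytes instead of touching the network
  void write(const std::string &buf) { written += buf; }
};

// In a test:
//   Connection<MockSocket> conn(MockSocket{});
//   conn.Send("ping");
//   assert(conn.TEST_get_mutable_socket().written == "ping");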

