hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bobhan...@apache.org
Subject hadoop git commit: HDFS-10526: libhdfs++: Add connect timeouts to async_connect calls. Contributed by Bob Hansen.
Date Mon, 20 Jun 2016 19:40:14 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 6c264b1cb -> 71af40868


HDFS-10526: libhdfs++: Add connect timeouts to async_connect calls.  Contributed by Bob Hansen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/71af4086
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/71af4086
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/71af4086

Branch: refs/heads/HDFS-8707
Commit: 71af40868aa3c2461aa3962c814a8be6571ac269
Parents: 6c264b1
Author: Bob Hansen <bob@hpe.com>
Authored: Mon Jun 20 15:37:14 2016 -0400
Committer: Bob Hansen <bob@hpe.com>
Committed: Mon Jun 20 15:39:41 2016 -0400

----------------------------------------------------------------------
 .../native/libhdfspp/include/hdfspp/options.h   |  7 +++
 .../libhdfspp/lib/common/hdfs_configuration.cc  |  3 +-
 .../libhdfspp/lib/common/hdfs_configuration.h   |  3 +-
 .../main/native/libhdfspp/lib/common/logging.cc |  6 ++-
 .../main/native/libhdfspp/lib/common/logging.h  |  3 ++
 .../main/native/libhdfspp/lib/common/options.cc |  3 +-
 .../native/libhdfspp/lib/rpc/rpc_connection.cc  |  5 +-
 .../native/libhdfspp/lib/rpc/rpc_connection.h   | 55 +++++++++++++++++---
 .../main/native/libhdfspp/lib/rpc/rpc_engine.cc |  2 +-
 .../main/native/libhdfspp/lib/rpc/rpc_engine.h  |  1 +
 .../libhdfspp/tests/hdfs_configuration_test.cc  | 14 +++--
 11 files changed, 84 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/71af4086/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h
index 1828b2a..8562f6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h
@@ -34,6 +34,13 @@ struct Options {
   static const int kDefaultRpcTimeout = 30000;
 
   /**
+   * Time to wait for an RPC connection before failing
+   * Default: 30000
+   **/
+  int rpc_connect_timeout;
+  static const int kDefaultRpcConnectTimeout = 30000;
+
+  /**
    * Maximum number of retries for RPC operations
    **/
   int max_rpc_retries;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71af4086/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc
index 13ba279..ef67af9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc
@@ -45,11 +45,12 @@ Options HdfsConfiguration::GetOptions() {
   Options result;
 
   OptionalSet(result.rpc_timeout, GetInt(kDfsClientSocketTimeoutKey));
+  OptionalSet(result.rpc_connect_timeout, GetInt(kIpcClientConnectTimeoutKey));
   OptionalSet(result.max_rpc_retries, GetInt(kIpcClientConnectMaxRetriesKey));
   OptionalSet(result.rpc_retry_delay_ms, GetInt(kIpcClientConnectRetryIntervalKey));
   OptionalSet(result.defaultFS, GetUri(kFsDefaultFsKey));
 
-  optional<std::string> authentication_value = Get(kHadoopSecurityAuthentication);
+  optional<std::string> authentication_value = Get(kHadoopSecurityAuthenticationKey);
   if (authentication_value ) {
       std::string fixed_case_value = fixCase(authentication_value.value());
       if (fixed_case_value == fixCase(kHadoopSecurityAuthentication_kerberos))

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71af4086/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h
index 7db9d37..c6ead66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h
@@ -39,9 +39,10 @@ class HdfsConfiguration : public Configuration {
     // Keys to look for in the configuration file
     static constexpr const char * kFsDefaultFsKey = "fs.defaultFS";
     static constexpr const char * kDfsClientSocketTimeoutKey = "dfs.client.socket-timeout";
+    static constexpr const char * kIpcClientConnectTimeoutKey = "ipc.client.connect.timeout";
     static constexpr const char * kIpcClientConnectMaxRetriesKey = "ipc.client.connect.max.retries";
     static constexpr const char * kIpcClientConnectRetryIntervalKey = "ipc.client.connect.retry.interval";
-    static constexpr const char * kHadoopSecurityAuthentication = "hadoop.security.authentication";
+    static constexpr const char * kHadoopSecurityAuthenticationKey = "hadoop.security.authentication";
     static constexpr const char * kHadoopSecurityAuthentication_simple = "simple";
     static constexpr const char * kHadoopSecurityAuthentication_kerberos = "kerberos";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71af4086/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc
index c299761..39ed944 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc
@@ -136,6 +136,11 @@ LogMessage& LogMessage::operator<<(const std::string& str) {
   return *this;
 }
 
+LogMessage& LogMessage::operator<<(const ::asio::ip::tcp::endpoint& endpoint) {
+  msg_buffer_ << endpoint;
+  return *this;
+}
+
 LogMessage& LogMessage::operator<<(const char *str) {
   if(str)
     msg_buffer_ << str;
@@ -213,4 +218,3 @@ const char * LogMessage::component_string() const {
 }
 
 }
-

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71af4086/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
index 3403646..9dc0c5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
@@ -26,6 +26,8 @@
 #include <mutex>
 #include <memory>
 
+#include <asio/ip/tcp.hpp>
+
 namespace hdfs {
 
 /**
@@ -177,6 +179,7 @@ class LogMessage {
   LogMessage& operator<<(const std::string*);
   LogMessage& operator<<(const std::string&);
 
+  LogMessage& operator<<(const ::asio::ip::tcp::endpoint& endpoint);
 
   //convert to a string "true"/"false"
   LogMessage& operator<<(bool);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71af4086/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
index c7dd2ed..305ea1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
@@ -27,7 +27,8 @@ const int Options::kDefaultMaxRpcRetries;
 const int Options::kDefaultRpcRetryDelayMs;
 const unsigned int Options::kDefaultHostExclusionDuration;
 
-Options::Options() : rpc_timeout(kDefaultRpcTimeout), max_rpc_retries(kDefaultMaxRpcRetries),
+Options::Options() : rpc_timeout(kDefaultRpcTimeout), rpc_connect_timeout(kDefaultRpcConnectTimeout),
+                     max_rpc_retries(kDefaultMaxRpcRetries),
                      rpc_retry_delay_ms(kDefaultRpcRetryDelayMs),
                      host_exclusion_duration(kDefaultHostExclusionDuration),
                      defaultFS(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71af4086/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
index 749195a..8567932 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
@@ -183,7 +183,7 @@ void RpcConnection::HandshakeComplete(const Status &s) {
   LOG_TRACE(kRPC, << "RpcConnectionImpl::HandshakeComplete called");
 
   if (s.ok()) {
-    if (connected_ == kConnecting) {
+    if (connected_ == kHandshaking) {
       auto shared_this = shared_from_this();
 
       connected_ = kAuthenticating;
@@ -407,7 +407,7 @@ void RpcConnection::SendRpcRequests(const std::vector<std::shared_ptr<Request> >
       else
         auth_requests_.push_back(r);
     }
-    if (connected_ == kConnected || connected_ == kAuthenticating) { // Dont flush if we're waiting or handshaking
+    if (connected_ == kConnected || connected_ == kHandshaking || connected_ == kAuthenticating) { // Dont flush if we're waiting or handshaking
       FlushPendingRequests();
     }
   }
@@ -494,6 +494,7 @@ std::string RpcConnection::ToString(ConnectedState connected) {
   switch(connected) {
     case kNotYetConnected: return "NotYetConnected";
     case kConnecting: return "Connecting";
+    case kHandshaking: return "Handshaking";
     case kAuthenticating: return "Authenticating";
     case kConnected: return "Connected";
     case kDisconnected: return "Disconnected";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71af4086/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
index 255b98b..2b47ce1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
@@ -59,17 +59,20 @@ public:
 
  private:
   const Options options_;
+  ::asio::ip::tcp::endpoint current_endpoint_;
   std::vector<::asio::ip::tcp::endpoint> additional_endpoints_;
   NextLayer next_layer_;
+  ::asio::deadline_timer connect_timer_;
 
-  void ConnectComplete(const ::asio::error_code &ec);
+  void ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote);
 };
 
 template <class NextLayer>
 RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
     : RpcConnection(engine),
       options_(engine->options()),
-      next_layer_(engine->io_service()) {
+      next_layer_(engine->io_service()),
+      connect_timer_(engine->io_service()) {
     LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called");
 }
 
@@ -129,20 +132,43 @@ void RpcConnectionImpl<NextLayer>::ConnectAndFlush(
   additional_endpoints_ = server;
   ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front();
   additional_endpoints_.erase(additional_endpoints_.begin());
+  current_endpoint_ = first_endpoint;
 
   auto shared_this = shared_from_this();
-  next_layer_.async_connect(first_endpoint, [shared_this, this](const ::asio::error_code &ec) {
-    ConnectComplete(ec);
+  next_layer_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) {
+    ConnectComplete(ec, first_endpoint);
+  });
+
+  // Prompt the timer to timeout
+  auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
+  connect_timer_.expires_from_now(
+        std::chrono::milliseconds(options_.rpc_connect_timeout));
+  connect_timer_.async_wait([shared_this, this, first_endpoint](const ::asio::error_code &ec) {
+      if (ec)
+        ConnectComplete(ec, first_endpoint);
+      else
+        ConnectComplete(make_error_code(asio::error::host_unreachable), first_endpoint);
   });
 }
 
 template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec) {
+void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) {
   auto shared_this = RpcConnectionImpl<NextLayer>::shared_from_this();
   std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+  connect_timer_.cancel();
 
   LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called");
 
+  // Could be an old async connect returning a result after we've moved on
+  if (remote != current_endpoint_) {
+      LOG_DEBUG(kRPC, << "Got ConnectComplete for " << remote << " but current_endpoint_ is " << current_endpoint_);
+      return;
+  }
+  if (connected_ != kConnecting) {
+      LOG_DEBUG(kRPC, << "Got ConnectComplete but current state is " << connected_);;
+      return;
+  }
+
   Status status = ToStatus(ec);
   if(event_handlers_) {
     auto event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
@@ -159,6 +185,7 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec)
       HandshakeComplete(s);
     });
   } else {
+    LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());;
     std::string err = SafeDisconnect(get_asio_socket_ptr(&next_layer_));
     if(!err.empty()) {
       LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err);
@@ -169,10 +196,19 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec)
       //    hit one
       ::asio::ip::tcp::endpoint next_endpoint = additional_endpoints_.front();
       additional_endpoints_.erase(additional_endpoints_.begin());
+      current_endpoint_ = next_endpoint;
 
-      next_layer_.async_connect(next_endpoint, [shared_this, this](const ::asio::error_code &ec) {
-        ConnectComplete(ec);
+      next_layer_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) {
+        ConnectComplete(ec, next_endpoint);
       });
+      connect_timer_.expires_from_now(
+            std::chrono::milliseconds(options_.rpc_connect_timeout));
+      connect_timer_.async_wait([shared_this, this, next_endpoint](const ::asio::error_code &ec) {
+          if (ec)
+            ConnectComplete(ec, next_endpoint);
+          else
+            ConnectComplete(make_error_code(asio::error::host_unreachable), next_endpoint);
+        });
     } else {
       CommsError(status);
     }
@@ -184,6 +220,7 @@ void RpcConnectionImpl<NextLayer>::SendHandshake(RpcCallback &handler) {
   assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
 
   LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called");
+  connected_ = kHandshaking;
 
   auto shared_this = shared_from_this();
   auto handshake_packet = PrepareHandshakePacket();
@@ -250,6 +287,8 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
     return;
   case kConnecting:
     return;
+  case kHandshaking:
+    return;
   case kAuthenticating:
     if (auth_requests_.empty()) {
       return;
@@ -379,7 +418,7 @@ void RpcConnectionImpl<NextLayer>::Disconnect() {
   LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called");
 
   request_over_the_wire_.reset();
-  if (connected_ == kConnecting || connected_ == kAuthenticating || connected_ == kConnected) {
+  if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == kAuthenticating || connected_ == kConnected) {
     // Don't print out errors, we were expecting a disconnect here
     SafeDisconnect(get_asio_socket_ptr(&next_layer_));
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71af4086/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
index f8df97f..a8438b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
@@ -197,7 +197,7 @@ void RpcEngine::RpcCommsError(
       if (head_action->delayMillis > 0) {
         auto weak_conn = std::weak_ptr<RpcConnection>(conn_);
         retry_timer.expires_from_now(
-            std::chrono::milliseconds(options_.rpc_retry_delay_ms));
+            std::chrono::milliseconds(head_action->delayMillis));
         retry_timer.async_wait([this, weak_conn](asio::error_code ec) {
           auto strong_conn = weak_conn.lock();
           if ( (!ec) && (strong_conn) ) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71af4086/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
index 066c01f..d0365c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
@@ -207,6 +207,7 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
   enum ConnectedState {
       kNotYetConnected,
       kConnecting,
+      kHandshaking,
       kAuthenticating,
       kConnected,
       kDisconnected

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71af4086/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc
index 035c044..7e9ca66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc
@@ -41,17 +41,25 @@ TEST(HdfsConfigurationTest, TestSetOptions)
   // Completely empty stream
   {
     std::stringstream stream;
-    simpleConfigStream(stream, HdfsConfiguration::kDfsClientSocketTimeoutKey, 100,
-                               HdfsConfiguration::kIpcClientConnectMaxRetriesKey, 101,
-                               HdfsConfiguration::kIpcClientConnectRetryIntervalKey, 102);
+    simpleConfigStream(stream,
+                       HdfsConfiguration::kFsDefaultFsKey, "/FDFK",
+                       HdfsConfiguration::kDfsClientSocketTimeoutKey, 100,
+                       HdfsConfiguration::kIpcClientConnectMaxRetriesKey, 101,
+                       HdfsConfiguration::kIpcClientConnectRetryIntervalKey, 102,
+                       HdfsConfiguration::kIpcClientConnectTimeoutKey, 103,
+                       HdfsConfiguration::kHadoopSecurityAuthenticationKey, HdfsConfiguration::kHadoopSecurityAuthentication_kerberos
+            );
 
     optional<HdfsConfiguration> config = ConfigurationLoader().Load<HdfsConfiguration>(stream.str());
     EXPECT_TRUE(config && "Read stream");
     Options options = config->GetOptions();
 
+    EXPECT_EQ("/FDFK", options.defaultFS.str());
     EXPECT_EQ(100, options.rpc_timeout);
     EXPECT_EQ(101, options.max_rpc_retries);
     EXPECT_EQ(102, options.rpc_retry_delay_ms);
+    EXPECT_EQ(103, options.rpc_connect_timeout);
+    EXPECT_EQ(Options::kKerberos, options.authentication);
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message