From: aengineer@apache.org
To: common-commits@hadoop.apache.org
Date: Thu, 22 Mar 2018 22:04:27 -0000
Subject: [03/50] [abbrv] hadoop git commit: HDFS-10441: libhdfs++: HA namenode support. Contributed by James Clampffer.

HDFS-10441: libhdfs++: HA namenode support. Contributed by James Clampffer.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4cb0dad5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4cb0dad5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4cb0dad5

Branch: refs/heads/HDFS-7240
Commit: 4cb0dad5e535f4ee79c32a54f8958bf7d4402123
Parents: d75c835
Author: James Clampffer
Authored: Fri Jul 29 20:10:53 2016 -0400
Committer: James Clampffer
Committed: Thu Mar 22 17:19:47 2018 -0400

----------------------------------------------------------------------
 .../main/native/libhdfspp/examples/cat/c/cat.c  |   1 +
 .../native/libhdfspp/include/hdfspp/events.h    |   9 +-
 .../native/libhdfspp/include/hdfspp/options.h   |  42 +++-
 .../native/libhdfspp/include/hdfspp/status.h    |  24 +-
 .../native/libhdfspp/lib/common/CMakeLists.txt  |   2 +-
 .../libhdfspp/lib/common/hdfs_configuration.cc  | 123 ++++++++++
 .../libhdfspp/lib/common/hdfs_configuration.h   |   5 +
 .../main/native/libhdfspp/lib/common/logging.cc |   2 +-
 .../main/native/libhdfspp/lib/common/logging.h  |   9 +-
 .../libhdfspp/lib/common/namenode_info.cc       | 132 ++++++++++
 .../native/libhdfspp/lib/common/namenode_info.h |  49 ++++
 .../main/native/libhdfspp/lib/common/options.cc |  21 +-
 .../native/libhdfspp/lib/common/retry_policy.cc |  43 +++-
 .../native/libhdfspp/lib/common/retry_policy.h  |  83 ++++++-
 .../main/native/libhdfspp/lib/common/status.cc  |  73 ++++--
 .../src/main/native/libhdfspp/lib/common/uri.cc |  48 +++-
 .../src/main/native/libhdfspp/lib/common/uri.h  |   6 +-
 .../main/native/libhdfspp/lib/fs/filesystem.cc  |  39 ++-
 .../libhdfspp/lib/fs/namenode_operations.cc     |  18 +-
 .../libhdfspp/lib/fs/namenode_operations.h      |   7 +-
 .../native/libhdfspp/lib/rpc/rpc_connection.cc  |  56 ++++-
 .../native/libhdfspp/lib/rpc/rpc_connection.h   |  13 +-
 .../main/native/libhdfspp/lib/rpc/rpc_engine.cc | 241 ++++++++++++++++---
 .../main/native/libhdfspp/lib/rpc/rpc_engine.h  |  75 +++++-
 .../native/libhdfspp/tests/rpc_engine_test.cc   |  24 +-
 25 files changed, 1016 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/c/cat.c ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/c/cat.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/c/cat.c index 5839308..dec8758 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/c/cat.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/c/cat.c @@ -86,6 +86,7 @@ void parse_uri(const char * uri_string, struct Uri * uri) { }; int main(int argc, char** argv) { + char error_text[1024]; if (argc != 2) { fprintf(stderr, "usage: cat [hdfs://[<host>:<port>]]/<path>\n"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h index 43187a5..80c3712 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h @@ -29,13 +29,20 @@ namespace hdfs { * Supported event names. These names will stay consistent in libhdfs callbacks. * * Other events not listed here may be seen, but they are not stable and - * should not be counted on. + * should not be counted on. May need to be broken up into more components + * as more events are added. */ static constexpr const char * FS_NN_CONNECT_EVENT = "NN::connect"; static constexpr const char * FS_NN_READ_EVENT = "NN::read"; static constexpr const char * FS_NN_WRITE_EVENT = "NN::write"; +// NN failover event due to issues with the current NN; might be standby, might be dead. +// Invokes the fs_event_callback using the nameservice name in the cluster string. +// The uint64_t value argument holds an address that can be reinterpreted as a const char * +// and provides the full URI of the node the failover will attempt to connect to next.
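For illustration, a client-side event handler could decode that failover payload along these lines. This is a sketch only: OnFsEvent is a hypothetical callback, and the exact registration API and handler signature used by libhdfs++ are not shown in this diff.

    #include <cstdint>
    #include <cstring>
    #include <iostream>

    // Hypothetical fs event callback: 'event' is the event name, 'cluster' the
    // nameservice name, and 'value' the uint64_t payload described above.
    void OnFsEvent(const char *event, const char *cluster, uint64_t value) {
      if (std::strcmp(event, "NN::failover") == 0) {
        // Per the comment above, the value holds an address that can be
        // reinterpreted as a const char * giving the URI of the node that
        // the failover will try next.
        const char *next_nn_uri = reinterpret_cast<const char *>(value);
        std::cout << "failover on " << cluster << " -> " << next_nn_uri << "\n";
      }
    }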
+static constexpr const char * FS_NN_FAILOVER_EVENT = "NN::failover"; + static constexpr const char * FILE_DN_CONNECT_EVENT = "DN::connect"; static constexpr const char * FILE_DN_READ_EVENT = "DN::read"; static constexpr const char * FILE_DN_WRITE_EVENT = "DN::write"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h index 8562f6d..1acfe1a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h @@ -20,8 +20,29 @@ #include "common/uri.h" +#include <map> +#include <string> +#include <vector> + namespace hdfs { + +struct NamenodeInfo { + NamenodeInfo(const std::string &nameservice, const std::string &nodename, const URI &uri) : + nameservice(nameservice), name(nodename), uri(uri) {} + NamenodeInfo(){}; + //nameservice this belongs to + std::string nameservice; + //node name + std::string name; + //host:port + URI uri; + + //get server hostname and port (aka service) + std::string get_host() const; + std::string get_port() const; +}; + /** * Options to control the behavior of the libhdfspp library. **/ @@ -44,7 +65,7 @@ struct Options { * Maximum number of retries for RPC operations **/ int max_rpc_retries; - static const int kNoRetry = -1; + static const int kNoRetry = 0; static const int kDefaultMaxRpcRetries = kNoRetry; /** @@ -66,6 +87,25 @@ struct Options { URI defaultFS; /** + * Namenodes used to provide HA for this cluster if applicable + **/ + std::map<std::string, std::vector<NamenodeInfo>> services; + + + /** + * Client failover attempts before failover gives up + **/ + int failover_max_retries; + static const unsigned int kDefaultFailoverMaxRetries = 15; + + /** + * Client failover attempts before failover gives up if server + * connection is timing out. + **/ + int failover_connection_max_retries; + static const unsigned int kDefaultFailoverConnectionMaxRetries = 0; + + /* * Which form of authentication to use with the server * Default: simple */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h index 1628d8c..f217cad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h @@ -27,15 +27,20 @@ class Status { public: // Create a success status.
Status() : code_(0) {}; + + // Note: Avoid calling the Status constructors directly, call the factory methods instead + + // Used for common status types Status(int code, const char *msg); - Status(int code, const char *msg1, const char *msg2); + // Used for server side exceptions reported through RpcResponseProto and similar + Status(int code, const char *exception_class, const char *exception_details); // Factory methods static Status OK(); static Status InvalidArgument(const char *msg); static Status ResourceUnavailable(const char *msg); static Status Unimplemented(); - static Status Exception(const char *expception_class_name, const char *error_message); + static Status Exception(const char *exception_class_name, const char *exception_details); static Status Error(const char *error_message); static Status AuthenticationFailed(); static Status Canceled(); @@ -61,13 +66,28 @@ class Status { kNotADirectory = static_cast<int>(std::errc::not_a_directory), kFileAlreadyExists = static_cast<int>(std::errc::file_exists), kPathIsNotEmptyDirectory = static_cast<int>(std::errc::directory_not_empty), + + // non-errc codes start at 256 kException = 256, kAuthenticationFailed = 257, + kAccessControlException = 258, + kStandbyException = 259, + kSnapshotProtocolException = 260, }; + std::string get_exception_class_str() const { + return exception_class_; + } + + int get_server_exception_type() const { + return code_; + } + private: int code_; std::string msg_; + + std::string exception_class_; }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt index 484d1a4..501d3b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt @@ -19,6 +19,6 @@ if(NEED_LINK_DL) set(LIB_DL dl) endif() -add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc) +add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc) add_library(common $ $) target_link_libraries(common ${LIB_DL}) http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc index ef67af9..70775b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc @@
-17,6 +17,13 @@ */ #include "common/hdfs_configuration.h" +#include "common/logging.h" + +#include <exception> + +#ifndef DEFAULT_SCHEME + #define DEFAULT_SCHEME "hdfs://" +#endif namespace hdfs { @@ -40,6 +47,94 @@ void OptionalSet(T& target, optional value) { target = *value; } +std::vector<std::string> SplitOnComma(const std::string &s, bool include_empty_strings) { + std::vector<std::string> res; + std::string buf; + + for(unsigned int i=0;i<s.size();i++) { + char c = s[i]; + if(c != ',') { + buf += c; + } else { + if(include_empty_strings || !buf.empty()) + res.push_back(buf); + buf.clear(); + } + } + + if(buf.size() > 0) + res.push_back(buf); + + return res; +} + +// Prepend hdfs:// to string if there isn't already a scheme +// Converts unset optional into empty string +std::string PrependHdfsScheme(optional<std::string> str) { + if(!str) + return ""; + + if(str.value().find("://") == std::string::npos) + return DEFAULT_SCHEME + str.value(); + return str.value(); +} + +// It's either use this, goto, or a lot of returns w/ status checks +struct ha_parse_error : public std::exception { + std::string desc; + ha_parse_error(const std::string &val) : desc(val) {}; + const char *what() const noexcept override { + return desc.c_str(); + }; +}; + +std::vector<NamenodeInfo> HdfsConfiguration::LookupNameService(const std::string &nameservice) { + std::vector<NamenodeInfo> namenodes; + try { + // Find namenodes that belong to nameservice + std::vector<std::string> namenode_ids; + { + std::string service_nodes = std::string("dfs.ha.namenodes.") + nameservice; + optional<std::string> namenode_list = Get(service_nodes); + if(namenode_list) + namenode_ids = SplitOnComma(namenode_list.value(), false); + else + throw ha_parse_error("unable to find " + service_nodes); + + for(auto it=namenode_ids.begin(); it != namenode_ids.end(); it++) + LOG_INFO(kRPC, << "Namenode: " << *it); + } + + // should this error if we only find 1 NN? + if(namenode_ids.empty()) + throw ha_parse_error("No namenodes found for nameservice " + nameservice); + + // Get URI for each HA namenode + for(auto node_id=namenode_ids.begin(); node_id != namenode_ids.end(); node_id++) { + // find URI + std::string dom_node_name = std::string("dfs.namenode.rpc-address.") + nameservice + "." + *node_id; + optional<URI> node_uri = URI::parse_from_string(PrependHdfsScheme(Get(dom_node_name))); + + if(!node_uri) { + throw ha_parse_error("unable to find " + dom_node_name); + } + + URI uri = node_uri.value(); + LOG_INFO(kRPC, << "Read the following HA Namenode URI from config" << uri.GetDebugString()); + + NamenodeInfo node(nameservice, *node_id, uri); + namenodes.push_back(node); + } + } catch (ha_parse_error e) { + LOG_ERROR(kRPC, << "HA cluster detected but failed because : " << e.what()); + namenodes.clear(); // Don't return inconsistent view + } + return namenodes; +} + // Interprets the resources to build an Options object Options HdfsConfiguration::GetOptions() { Options result; @@ -50,7 +145,35 @@ Options HdfsConfiguration::GetOptions() { OptionalSet(result.rpc_retry_delay_ms, GetInt(kIpcClientConnectRetryIntervalKey)); OptionalSet(result.defaultFS, GetUri(kFsDefaultFsKey)); + + OptionalSet(result.failover_max_retries, GetInt(kDfsClientFailoverMaxAttempts)); + OptionalSet(result.failover_connection_max_retries, GetInt(kDfsClientFailoverConnectionRetriesOnTimeouts)); + + // Load all nameservices if it's HA configured + optional<std::string> dfs_nameservices = Get("dfs.nameservices"); + if(dfs_nameservices) { + std::string nameservice = dfs_nameservices.value(); + + std::vector<std::string> all_services = SplitOnComma(nameservice, false); + + // Look up nodes for each nameservice so that FileSystem object can support + // multiple nameservices by ID.
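As a concrete illustration of the keys read above (dfs.nameservices, dfs.ha.namenodes.<nameservice>, and dfs.namenode.rpc-address.<nameservice>.<node>), a configuration like the following would make LookupNameService produce two NamenodeInfo entries for nameservice ns1. The nameservice, node names, and addresses here are made-up examples, not values from this commit:

    <!-- hdfs-site.xml (hypothetical values) -->
    <property>
      <name>dfs.nameservices</name>
      <value>ns1</value>
    </property>
    <property>
      <name>dfs.ha.namenodes.ns1</name>
      <value>namenode1,namenode2</value>
    </property>
    <property>
      <name>dfs.namenode.rpc-address.ns1.namenode1</name>
      <value>nn1.example.com:8020</value>
    </property>
    <property>
      <name>dfs.namenode.rpc-address.ns1.namenode2</name>
      <value>nn2.example.com:8020</value>
    </property>

Because the rpc-address values carry no scheme, PrependHdfsScheme turns each one into hdfs://host:port before URI parsing.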
+ for(const std::string &service : all_services) { + if(service.empty()) + continue; + + LOG_DEBUG(kFileSystem, << "Parsing info for nameservice: " << service); + std::vector nodes = LookupNameService(service); + if(nodes.empty()) { + LOG_WARN(kFileSystem, << "Nameservice \"" << service << "\" declared in config but nodes aren't"); + } else { + result.services[service] = nodes; + } + } + } + optional authentication_value = Get(kHadoopSecurityAuthenticationKey); + if (authentication_value ) { std::string fixed_case_value = fixCase(authentication_value.value()); if (fixed_case_value == fixCase(kHadoopSecurityAuthentication_kerberos)) http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h index c6ead66..459364f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h @@ -46,6 +46,10 @@ class HdfsConfiguration : public Configuration { static constexpr const char * kHadoopSecurityAuthentication_simple = "simple"; static constexpr const char * kHadoopSecurityAuthentication_kerberos = "kerberos"; + static constexpr const char * kDfsClientFailoverMaxAttempts = "dfs.client.failover.max.attempts"; + static constexpr const char * kDfsClientFailoverConnectionRetriesOnTimeouts = "dfs.client.failover.connection.retries.on.timeouts"; + + private: friend class ConfigurationLoader; @@ -57,6 +61,7 @@ private: HdfsConfiguration(const ConfigMap &src_map); static std::vector GetDefaultFilenames(); + std::vector LookupNameService(const std::string &nameservice); }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc index 39ed944..ac1c336 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc @@ -177,12 +177,12 @@ LogMessage& LogMessage::operator<<(uint64_t val) { return *this; } - LogMessage& LogMessage::operator<<(void *ptr) { msg_buffer_ << ptr; return *this; } + std::string LogMessage::MsgString() const { return msg_buffer_.str(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h index 9dc0c5f..f807fc4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h +++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h @@ -19,6 +19,8 @@ #ifndef LIB_COMMON_LOGGING_H_ #define LIB_COMMON_LOGGING_H_ +#include + #include "hdfspp/log.h" #include @@ -179,11 +181,10 @@ class LogMessage { LogMessage& operator<<(const std::string*); LogMessage& operator<<(const std::string&); - LogMessage& operator<<(const ::asio::ip::tcp::endpoint& endpoint); - //convert to a string "true"/"false" LogMessage& operator<<(bool); + //integral types LogMessage& operator<<(int32_t); LogMessage& operator<<(uint32_t); LogMessage& operator<<(int64_t); @@ -192,6 +193,10 @@ class LogMessage { //print address as hex LogMessage& operator<<(void *); + //asio types + LogMessage& operator<<(const ::asio::ip::tcp::endpoint& endpoint); + + std::string MsgString() const; private: http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc new file mode 100644 index 0000000..bc38be7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "namenode_info.h" + +#include "common/continuation/asio.h" +#include "common/logging.h" + +#include +#include +#include +#include + +namespace hdfs { + +ResolvedNamenodeInfo& ResolvedNamenodeInfo::operator=(const NamenodeInfo &info) { + nameservice = info.nameservice; + name = info.name; + uri = info.uri; + return *this; +} + + + +std::string ResolvedNamenodeInfo::str() const { + std::stringstream ss; + ss << "ResolvedNamenodeInfo {nameservice: " << nameservice << ", name: " << name << ", uri: " << uri.str(); + ss << ", host: " << uri.get_host(); + auto port = uri.get_port(); + if(port) + ss << ", port: " << port.value(); + else + ss << ", port: unable to parse"; + + ss << ", scheme: " << uri.get_scheme(); + + ss << " ["; + for(unsigned int i=0;i resolved = BulkResolve(ioservice, {info}); + if(resolved.size() != 1) + return false; + + info.endpoints = resolved[0].endpoints; + if(info.endpoints.size() == 0) + return false; + return true; +} + + +std::vector BulkResolve(::asio::io_service *ioservice, const std::vector &nodes) { + using namespace asio_continuation; + + typedef std::vector endpoint_vector; + typedef Pipeline resolve_pipeline_t; + + + std::vector>>> pipelines; + pipelines.reserve(nodes.size()); + + std::vector resolved_info; + // This must never reallocate once async ops begin + resolved_info.reserve(nodes.size()); + + for(unsigned int i=0; istate())); + pipeline->Push(resolve_step); + + // make a status associated with current pipeline + std::shared_ptr> active_stat = std::make_shared>(); + pipelines.push_back(std::make_pair(pipeline, active_stat)); + + pipeline->Run([i,active_stat, &resolved_info](const Status &s, const endpoint_vector &ends){ + resolved_info[i].endpoints = ends; + active_stat->set_value(s); + }); + + } + + // Join all async operations + std::vector return_set; + for(unsigned int i=0; i> promise = pipelines[i].second; + + std::future future = promise->get_future(); + Status stat = future.get(); + + // Clear endpoints if we hit an error + if(!stat.ok()) { + LOG_WARN(kRPC, << "Unable to resolve endpoints for " << nodes[i].uri.str()); + resolved_info[i].endpoints.clear(); + } + } + + return resolved_info; +} + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h new file mode 100644 index 0000000..fdee8d7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef COMMON_HDFS_NAMENODE_INFO_H_ +#define COMMON_HDFS_NAMENODE_INFO_H_ + +#include +#include + +#include +#include + +namespace hdfs { + +// Internal representation of namenode info that keeps track +// of its endpoints. +struct ResolvedNamenodeInfo : public NamenodeInfo { + ResolvedNamenodeInfo& operator=(const NamenodeInfo &info); + std::string str() const; + + std::vector<::asio::ip::tcp::endpoint> endpoints; +}; + +// Clear endpoints if set and resolve all of them in parallel. +// Only successful lookups will be placed in the result set. +std::vector BulkResolve(::asio::io_service *ioservice, const std::vector &nodes); + +// Clear endpoints, if any, and resolve them again +// Return true if endpoints were resolved +bool ResolveInPlace(::asio::io_service *ioservice, ResolvedNamenodeInfo &info); + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc index 305ea1a..29b45b2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc @@ -26,12 +26,27 @@ const int Options::kNoRetry; const int Options::kDefaultMaxRpcRetries; const int Options::kDefaultRpcRetryDelayMs; const unsigned int Options::kDefaultHostExclusionDuration; +const unsigned int Options::kDefaultFailoverMaxRetries; +const unsigned int Options::kDefaultFailoverConnectionMaxRetries; -Options::Options() : rpc_timeout(kDefaultRpcTimeout), rpc_connect_timeout(kDefaultRpcConnectTimeout), +Options::Options() : rpc_timeout(kDefaultRpcTimeout), + rpc_connect_timeout(kDefaultRpcConnectTimeout), max_rpc_retries(kDefaultMaxRpcRetries), rpc_retry_delay_ms(kDefaultRpcRetryDelayMs), host_exclusion_duration(kDefaultHostExclusionDuration), defaultFS(), - authentication(kDefaultAuthentication) -{} + failover_max_retries(kDefaultFailoverMaxRetries), + failover_connection_max_retries(kDefaultFailoverConnectionMaxRetries), + authentication(kDefaultAuthentication) {} + +std::string NamenodeInfo::get_host() const { + return uri.get_host(); +} +std::string NamenodeInfo::get_port() const { + optional p = uri.get_port(); + if(!p) + return std::to_string(-1); + return std::to_string(p.value()); +} + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc index 1857e20..a885d53 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc @@ -17,31 +17,64 @@ */ #include "common/retry_policy.h" +#include "common/logging.h" + +#include namespace hdfs { RetryAction 
FixedDelayRetryPolicy::ShouldRetry( const Status &s, uint64_t retries, uint64_t failovers, bool isIdempotentOrAtMostOnce) const { - (void)s; + LOG_TRACE(kRPC, << "FixedDelayRetryPolicy::ShouldRetry(retries=" << retries << ", failovers=" << failovers << ")"); (void)isIdempotentOrAtMostOnce; if (retries + failovers >= max_retries_) { return RetryAction::fail( - "Failovers (" + std::to_string(retries + failovers) + - ") exceeded maximum retries (" + std::to_string(max_retries_) + ")"); + "Failovers and retries(" + std::to_string(retries + failovers) + + ") exceeded maximum retries (" + std::to_string(max_retries_) + "), Status: " + + s.ToString()); } else { return RetryAction::retry(delay_); } } + RetryAction NoRetryPolicy::ShouldRetry( const Status &s, uint64_t retries, uint64_t failovers, bool isIdempotentOrAtMostOnce) const { - (void)s; + LOG_TRACE(kRPC, << "NoRetryPolicy::ShouldRetry(retries=" << retries << ", failovers=" << failovers << ")"); (void)retries; (void)failovers; (void)isIdempotentOrAtMostOnce; - return RetryAction::fail("No retry"); + return RetryAction::fail("No retry, Status: " + s.ToString()); +} + + +RetryAction FixedDelayWithFailover::ShouldRetry(const Status &s, uint64_t retries, + uint64_t failovers, + bool isIdempotentOrAtMostOnce) const { + (void)isIdempotentOrAtMostOnce; + LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry(retries=" << retries << ", failovers=" << failovers << ")"); + + if(s.code() == ::asio::error::timed_out && failovers < max_failover_retries_) { + // Try connecting to another NN in case this one keeps timing out + // Can add the backoff wait specified by dfs.client.failover.sleep.base.millis here + return RetryAction::failover(delay_); + } + + if(retries < max_retries_) { + LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry: retries < max_retries_"); + return RetryAction::retry(delay_); + } else if (retries >= max_retries_ && failovers < max_failover_retries_) { + LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry: retries >= max_retries_ && failovers < max_failover_retries_"); + return RetryAction::failover(delay_); + } else if (retries >= max_retries_ && failovers == max_failover_retries_) { + LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry: retries >= max_retries_ && failovers == max_failover_retries_"); + // 1 last retry on new connection + return RetryAction::retry(delay_); + } + + return RetryAction::fail("Retry and failover didn't work, Status: " + s.ToString()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h index afa4f7d..0b5bd80 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h @@ -43,13 +43,31 @@ class RetryAction { static RetryAction retry(uint64_t delay) { return RetryAction(RETRY, delay, ""); } - static RetryAction failover() { - return RetryAction(FAILOVER_AND_RETRY, 0, ""); + static RetryAction failover(uint64_t delay) { + return RetryAction(FAILOVER_AND_RETRY, delay, ""); } + + std::string decision_str() const { + switch(action) { + case FAIL: return 
"FAIL"; + case RETRY: return "RETRY"; + case FAILOVER_AND_RETRY: return "FAILOVER_AND_RETRY"; + default: return "UNDEFINED ACTION"; + } + }; }; class RetryPolicy { + protected: + uint64_t delay_; + uint64_t max_retries_; + RetryPolicy(uint64_t delay, uint64_t max_retries) : + delay_(delay), max_retries_(max_retries) {} + public: + RetryPolicy() {}; + + virtual ~RetryPolicy() {} /* * If there was an error in communications, responds with the configured * action to take. @@ -58,23 +76,71 @@ class RetryPolicy { uint64_t failovers, bool isIdempotentOrAtMostOnce) const = 0; - virtual ~RetryPolicy() {} + virtual std::string str() const { return "Base RetryPolicy"; } }; + +/* + * Overview of how the failover retry policy works: + * + * 1) Acts the same as FixedDelayRetryPolicy in terms of connection retries against a single NN + * with two differences: + * a) If we have retried more than the maximum number of retries we will failover to the + * other node and reset the retry counter rather than error out. It will begin the same + * routine on the other node. + * b) If an attempted connection times out and max_failover_conn_retries_ is less than the + * normal number of retries it will failover sooner. The connection timeout retry limit + * defaults to zero; the idea being that if a node is unresponsive it's better to just + * try the secondary rather than incur the timeout cost multiple times. + * + * 2) Keeps track of the failover count in the same way that the retry count is tracked. If failover + * is triggered more than a set number (dfs.client.failover.max.attempts) of times then the operation + * will error out in the same way that a non-HA operation would error if it ran out of retries. + * + * 3) Failover between namenodes isn't instantaneous so the RPC retry delay is reused to add a small + * delay between failover attempts. This helps prevent the client from quickly using up all of + * its failover attempts while thrashing between namenodes that are both temporarily marked standby. + * Note: The java client implements exponential backoff here with a base other than the rpc delay, + * and this will do the same here in the future. This doesn't do any sort of exponential backoff + * and the name can be changed to ExponentialDelayWithFailover when backoff is implemented. + */ +class FixedDelayWithFailover : public RetryPolicy { + public: + FixedDelayWithFailover(uint64_t delay, uint64_t max_retries, + uint64_t max_failover_retries, + uint64_t max_failover_conn_retries) + : RetryPolicy(delay, max_retries), max_failover_retries_(max_failover_retries), + max_failover_conn_retries_(max_failover_conn_retries) {} + + RetryAction ShouldRetry(const Status &s, uint64_t retries, + uint64_t failovers, + bool isIdempotentOrAtMostOnce) const override; + + std::string str() const override { return "FixedDelayWithFailover"; } + + private: + // Attempts to fail over + uint64_t max_failover_retries_; + // Attempts to fail over if connection times out rather than + // tring to connect and wait for the timeout delay failover_retries_ + // times. 
+ uint64_t max_failover_conn_retries_; +}; + + /* * Returns a fixed delay up to a certain number of retries */ class FixedDelayRetryPolicy : public RetryPolicy { public: FixedDelayRetryPolicy(uint64_t delay, uint64_t max_retries) - : delay_(delay), max_retries_(max_retries) {} + : RetryPolicy(delay, max_retries) {} RetryAction ShouldRetry(const Status &s, uint64_t retries, uint64_t failovers, bool isIdempotentOrAtMostOnce) const override; - private: - uint64_t delay_; - uint64_t max_retries_; + + std::string str() const override { return "FixedDelayRetryPolicy"; } }; /* @@ -82,9 +148,12 @@ class FixedDelayRetryPolicy : public RetryPolicy { */ class NoRetryPolicy : public RetryPolicy { public: + NoRetryPolicy() {}; RetryAction ShouldRetry(const Status &s, uint64_t retries, uint64_t failovers, bool isIdempotentOrAtMostOnce) const override; + + std::string str() const override { return "NoRetryPolicy"; } }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc index d6c9875..796b1a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc @@ -21,33 +21,55 @@ #include #include #include +#include namespace hdfs { -const char * kStatusAccessControlException = "org.apache.hadoop.security.AccessControlException"; -const char * kStatusSaslException = "javax.security.sasl.SaslException"; -const char * kPathNotFoundException = "org.apache.hadoop.fs.InvalidPathException"; -const char * kPathNotFoundException2 = "java.io.FileNotFoundException"; -const char * kPathIsNotDirectoryException = "org.apache.hadoop.fs.PathIsNotDirectoryException"; -const char * kSnapshotException = "org.apache.hadoop.hdfs.protocol.SnapshotException"; -const char * kFileAlreadyExistsException = "org.apache.hadoop.fs.FileAlreadyExistsException"; +// Server side exceptions that we capture from the RpcResponseHeaderProto +const char * kStatusAccessControlException = "org.apache.hadoop.security.AccessControlException"; +const char * kPathIsNotDirectoryException = "org.apache.hadoop.fs.PathIsNotDirectoryException"; +const char * kSnapshotException = "org.apache.hadoop.hdfs.protocol.SnapshotException"; +const char * kStatusStandbyException = "org.apache.hadoop.ipc.StandbyException"; +const char * kStatusSaslException = "javax.security.sasl.SaslException"; +const char * kPathNotFoundException = "org.apache.hadoop.fs.InvalidPathException"; +const char * kPathNotFoundException2 = "java.io.FileNotFoundException"; +const char * kFileAlreadyExistsException = "org.apache.hadoop.fs.FileAlreadyExistsException"; const char * kPathIsNotEmptyDirectoryException = "org.apache.hadoop.fs.PathIsNotEmptyDirectoryException"; -Status::Status(int code, const char *msg1) : code_(code) { + +const static std::map kKnownServerExceptionClasses = { + {kStatusAccessControlException, Status::kAccessControlException}, + {kPathIsNotDirectoryException, Status::kNotADirectory}, + {kSnapshotException, Status::kSnapshotProtocolException}, + {kStatusStandbyException, Status::kStandbyException}, + {kStatusSaslException, Status::kAuthenticationFailed}, + 
{kPathNotFoundException, Status::kPathNotFound}, + {kPathNotFoundException2, Status::kPathNotFound}, + {kFileAlreadyExistsException, Status::kFileAlreadyExists}, + {kPathIsNotEmptyDirectoryException, Status::kPathIsNotEmptyDirectory} + }; + + +Status::Status(int code, const char *msg1) + : code_(code) { if(msg1) { msg_ = msg1; } } -Status::Status(int code, const char *msg1, const char *msg2) : code_(code) { - std::stringstream ss; - if(msg1) { - ss << msg1; - if(msg2) { - ss << ":" << msg2; - } +Status::Status(int code, const char *exception_class_name, const char *exception_details) + : code_(code) { + // If we can assure this never gets nullptr args this can be + // in the initializer list. + if(exception_class_name) + exception_class_ = exception_class_name; + if(exception_details) + msg_ = exception_details; + + std::map::const_iterator it = kKnownServerExceptionClasses.find(exception_class_); + if(it != kKnownServerExceptionClasses.end()) { + code_ = it->second; } - msg_ = ss.str(); } @@ -72,6 +94,7 @@ Status Status::Unimplemented() { } Status Status::Exception(const char *exception_class_name, const char *error_message) { + // Server side exception but can be represented by std::errc codes if (exception_class_name && (strcmp(exception_class_name, kStatusAccessControlException) == 0) ) return Status(kPermissionDenied, error_message); else if (exception_class_name && (strcmp(exception_class_name, kStatusSaslException) == 0)) @@ -81,13 +104,13 @@ Status Status::Exception(const char *exception_class_name, const char *error_mes else if (exception_class_name && (strcmp(exception_class_name, kPathNotFoundException2) == 0)) return Status(kPathNotFound, error_message); else if (exception_class_name && (strcmp(exception_class_name, kPathIsNotDirectoryException) == 0)) - return Status(kNotADirectory, error_message); + return Status(kNotADirectory, error_message); else if (exception_class_name && (strcmp(exception_class_name, kSnapshotException) == 0)) - return Status(kInvalidArgument, error_message); + return Status(kInvalidArgument, error_message); else if (exception_class_name && (strcmp(exception_class_name, kFileAlreadyExistsException) == 0)) - return Status(kFileAlreadyExists, error_message); + return Status(kFileAlreadyExists, error_message); else if (exception_class_name && (strcmp(exception_class_name, kPathIsNotEmptyDirectoryException) == 0)) - return Status(kPathIsNotEmptyDirectory, error_message); + return Status(kPathIsNotEmptyDirectory, error_message); else return Status(kException, exception_class_name, error_message); } @@ -101,15 +124,19 @@ Status Status::AuthenticationFailed() { } Status Status::Canceled() { - return Status(kOperationCanceled,"Operation canceled"); + return Status(kOperationCanceled, "Operation canceled"); } - std::string Status::ToString() const { if (code_ == kOk) { return "OK"; } - return msg_; + std::stringstream ss; + if(!exception_class_.empty()) { + ss << exception_class_ << ":"; + } + ss << msg_; + return ss.str(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc index 616cf3f..a54be8a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc 
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc @@ -133,11 +133,11 @@ std::vector<std::string> split(const std::string input, char separator) std::string copy_range(const UriTextRangeA *r) { - const int size = r->afterLast - r->first; - if (size) { + const int size = r->afterLast - r->first; + if (size) { return std::string(r->first, size); - } - return ""; + } + return ""; } bool parse_int(const UriTextRangeA *r, optional * result) { @@ -161,14 +161,14 @@ bool parse_int(const UriTextRangeA *r, optional * result) { std::vector<std::string> copy_path(const UriPathSegmentA *ps) { std::vector<std::string> result; - if (nullptr == ps) + if (nullptr == ps) return result; - for (; ps != 0; ps = ps->next) { - result.push_back(copy_range(&ps->text)); - } + for (; ps != 0; ps = ps->next) { + result.push_back(copy_range(&ps->text)); + } - return result; + return result; } void parse_user_info(const UriTextRangeA *r, std::string * user, std::string * pass) { @@ -204,7 +204,7 @@ std::vector<std::pair<std::string, std::string> > parse_query(const char *first uriFreeQueryListA(query); } - return result; + return result; } @@ -368,4 +368,32 @@ void URI::remove_queries(const std::string &q_name, bool encoded_input) } } +std::string URI::GetDebugString() const { + std::stringstream ss; + ss << std::endl; + ss << "\t" << "uri.str() = \"" << str() << "\"" << std::endl; + ss << "\t" << "uri.get_scheme() = \"" << get_scheme() << "\"" << std::endl; + ss << "\t" << "uri.get_host() = \"" << get_host() << "\"" << std::endl; + + if(!port) + ss << "\t" << "uri.get_port() = unset optional" << std::endl; + else + ss << "\t" << "uri.get_port() = \"" << port.value() << "\"" << std::endl; + + ss << "\t" << "uri.get_path() = \"" << get_path() << "\"" << std::endl; + ss << "\t" << "uri.get_fragment() = \"" << get_fragment() << "\"" << std::endl; + + + std::vector<std::pair<std::string, std::string> > elems = get_query_elements(); + + if(elems.size() > 0) + ss << "\t" << "Query elements:" << std::endl; + + for(auto qry = elems.begin(); qry != elems.end(); qry++) { + ss << "\t\t" << qry->first << " -> " << qry->second << std::endl; + } + + return ss.str(); +} + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.h index e6fbd78..87f6919 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.h @@ -118,11 +118,13 @@ public: { fragment = to_encoded(encoded_input,f); } std::string str(bool encoded_output=true) const; + + // Get a string with each URI field printed on a separate line + std::string GetDebugString() const; }; inline std::ostream& operator<<(std::ostream &out, const URI &uri) { return out << uri.str(); } } - -#endif \ No newline at end of file +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc index
f3c32c2..ae55e0b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -18,6 +18,8 @@ #include "filesystem.h" +#include "common/namenode_info.h" + #include #include #include @@ -126,9 +128,35 @@ void FileSystemImpl::Connect(const std::string &server, handler (Status::Error("Null IoService"), this); } - cluster_name_ = server + ":" + service; + // DNS lookup here for namenode(s) + std::vector resolved_namenodes; + + auto name_service = options_.services.find(server); + if(name_service != options_.services.end()) { + cluster_name_ = name_service->first; + resolved_namenodes = BulkResolve(&io_service_->io_service(), name_service->second); + } else { + cluster_name_ = server + ":" + service; + + // tmp namenode info just to get this in the right format for BulkResolve + NamenodeInfo tmp_info; + optional uri = URI::parse_from_string("hdfs://" + cluster_name_); + if(!uri) { + LOG_ERROR(kFileSystem, << "Unable to use URI for cluster " << cluster_name_); + handler(Status::Error(("Invalid namenode " + cluster_name_ + " in config").c_str()), this); + } + tmp_info.uri = uri.value(); + + resolved_namenodes = BulkResolve(&io_service_->io_service(), {tmp_info}); + } + + for(unsigned int i=0;i file_info) { + if(!stat.ok()) { + LOG_INFO(kFileSystem, << "FileSystemImpl::Open failed to get block locations. status=" << stat.ToString()); + if(stat.get_server_exception_type() == Status::kStandbyException) { + LOG_INFO(kFileSystem, << "Operation not allowed on standby datanode"); + } + } + handler(stat, stat.ok() ? new FileHandleImpl(cluster_name_, path, &io_service_->io_service(), client_name_, file_info, bad_node_tracker_, event_handlers_) : nullptr); }); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc index ee566ea..8b4c126 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc @@ -27,6 +27,7 @@ #include #include #include +#include #define FMT_THIS_ADDR "this=" << (void*)this @@ -51,20 +52,9 @@ Status NameNodeOperations::CheckValidPermissionMask(short permissions) { } void NameNodeOperations::Connect(const std::string &cluster_name, - const std::string &server, - const std::string &service, - std::function &&handler) { - using namespace asio_continuation; - typedef std::vector State; - auto m = Pipeline::Create(); - m->Push(Resolve(io_service_, server, service, - std::back_inserter(m->state()))) - .Push(Bind([this, m, cluster_name](const Continuation::Next &next) { - engine_.Connect(cluster_name, m->state(), next); - })); - m->Run([this, handler](const Status &status, const State &) { - handler(status); - }); + const std::vector &servers, + std::function &&handler) { + engine_.Connect(cluster_name, servers, handler); } void NameNodeOperations::GetBlockLocations(const std::string & path, 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h index 4f4d63e..9651570 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h @@ -21,6 +21,7 @@ #include "rpc/rpc_engine.h" #include "hdfspp/statinfo.h" #include "hdfspp/fsinfo.h" +#include "common/namenode_info.h" #include "ClientNamenodeProtocol.pb.h" #include "ClientNamenodeProtocol.hrpc.inl" @@ -45,13 +46,12 @@ public: const char *protocol_name, int protocol_version) : io_service_(io_service), engine_(io_service, options, client_name, user_name, protocol_name, protocol_version), - namenode_(& engine_) {} + namenode_(& engine_), options_(options) {} static Status CheckValidPermissionMask(short permissions); void Connect(const std::string &cluster_name, - const std::string &server, - const std::string &service, + const std::vector &servers, std::function &&handler); void GetBlockLocations(const std::string & path, @@ -104,6 +104,7 @@ private: ::asio::io_service * io_service_; RpcEngine engine_; ClientNamenodeProtocol namenode_; + const Options options_; }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc index be6d7bd..03f83f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc @@ -118,7 +118,8 @@ Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id_(call_id), timer_(engine->io_service()), handler_(std::move(handler)), - retry_count_(engine->retry_policy() ? 0 : kNoRetry) { + retry_count_(engine->retry_policy() ? 0 : kNoRetry), + failover_count_(0) { ConstructPayload(&payload_, &request); } @@ -129,7 +130,8 @@ Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id_(call_id), timer_(engine->io_service()), handler_(std::move(handler)), - retry_count_(engine->retry_policy() ? 0 : kNoRetry) { + retry_count_(engine->retry_policy() ? 0 : kNoRetry), + failover_count_(0) { ConstructPayload(&payload_, request); } @@ -138,10 +140,13 @@ Request::Request(LockFreeRpcEngine *engine, Handler &&handler) call_id_(-1), timer_(engine->io_service()), handler_(std::move(handler)), - retry_count_(engine->retry_policy() ? 0 : kNoRetry) { + retry_count_(engine->retry_policy() ? 
0 : kNoRetry), + failover_count_(0) { } void Request::GetPacket(std::string *res) const { + LOG_TRACE(kRPC, << "Request::GetPacket called"); + if (payload_.empty()) return; @@ -159,9 +164,27 @@ void Request::GetPacket(std::string *res) const { void Request::OnResponseArrived(pbio::CodedInputStream *is, const Status &status) { + LOG_TRACE(kRPC, << "Request::OnResponseArrived called"); handler_(is, status); } +std::string Request::GetDebugString() const { + // Basic description of this object, aimed at debugging + std::stringstream ss; + ss << "\nRequest Object:\n"; + ss << "\tMethod name = \"" << method_name_ << "\"\n"; + ss << "\tCall id = " << call_id_ << "\n"; + ss << "\tRetry Count = " << retry_count_ << "\n"; + ss << "\tFailover count = " << failover_count_ << "\n"; + return ss.str(); +} + +int Request::IncrementFailoverCount() { + // reset retry count when failing over + retry_count_ = 0; + return failover_count_++; +} + RpcConnection::RpcConnection(LockFreeRpcEngine *engine) : engine_(engine), connected_(kNotYetConnected) {} @@ -258,7 +281,7 @@ void RpcConnection::AsyncFlushPendingRequests() { }); } -void RpcConnection::HandleRpcResponse(std::shared_ptr response) { +Status RpcConnection::HandleRpcResponse(std::shared_ptr response) { assert(lock_held(connection_state_lock_)); // Must be holding lock before calling response->ar.reset(new pbio::ArrayInputStream(&response->data_[0], response->data_.size())); @@ -270,7 +293,7 @@ void RpcConnection::HandleRpcResponse(std::shared_ptr response) { auto req = RemoveFromRunningQueue(h.callid()); if (!req) { LOG_WARN(kRPC, << "RPC response with Unknown call id " << h.callid()); - return; + return Status::Error("Rpc response with unknown call id"); } Status status; @@ -288,9 +311,22 @@ void RpcConnection::HandleRpcResponse(std::shared_ptr response) { Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str()); } + if(status.get_server_exception_type() == Status::kStandbyException) { + LOG_WARN(kRPC, << "Tried to connect to standby. 
status = " << status.ToString()); + + // We got the request back, but it needs to be resent to the other NN + std::vector> reqs_to_redirect = {req}; + PrependRequests_locked(reqs_to_redirect); + + CommsError(status); + return status; + } + io_service().post([req, response, status]() { req->OnResponseArrived(response->in.get(), status); // Never call back while holding a lock }); + + return Status::OK(); } void RpcConnection::HandleRpcTimeout(std::shared_ptr req, @@ -437,6 +473,15 @@ void RpcConnection::PreEnqueueRequests( // Don't start sending yet; will flush when connected } +// Only call when already holding conn state lock +void RpcConnection::PrependRequests_locked( std::vector> requests) { + LOG_DEBUG(kRPC, << "RpcConnection::PrependRequests called"); + + pending_requests_.insert(pending_requests_.begin(), requests.begin(), + requests.end()); + // Don't start sending yet; will flush when connected +} + void RpcConnection::SetEventHandlers(std::shared_ptr event_handlers) { std::lock_guard state_lock(connection_state_lock_); event_handlers_ = event_handlers; @@ -452,6 +497,7 @@ void RpcConnection::SetClusterName(std::string cluster_name) { void RpcConnection::CommsError(const Status &status) { assert(lock_held(connection_state_lock_)); // Must be holding lock before calling + LOG_DEBUG(kRPC, << "RpcConnection::CommsError called"); Disconnect(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index 330f9b1..869be40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -112,6 +112,8 @@ void RpcConnectionImpl::Connect( template void RpcConnectionImpl::ConnectAndFlush( const std::vector<::asio::ip::tcp::endpoint> &server) { + + LOG_INFO(kRPC, << "ConnectAndFlush called"); std::lock_guard state_lock(connection_state_lock_); if (server.empty()) { @@ -408,7 +410,16 @@ void RpcConnectionImpl::OnRecvCompleted(const ::asio::error_code &ori OnRecvCompleted(ec, size); }); } else if (current_response_state_->state_ == Response::kParseResponse) { - HandleRpcResponse(current_response_state_); + // Check return status from the RPC response. We may have received a msg + // indicating a server side error. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
index a8438b1..be69d95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
@@ -19,15 +19,126 @@
 #include "rpc_connection.h"
 #include "common/util.h"
 #include "common/logging.h"
+#include "common/namenode_info.h"
 #include "optional.hpp"

 #include <future>
+#include <algorithm>

 namespace hdfs {

 template <class T>
 using optional = std::experimental::optional<T>;

+HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
+                                     ::asio::io_service *ioservice,
+                                     std::shared_ptr<LibhdfsEvents> event_handlers)
+                  : enabled_(false), resolved_(false),
+                    ioservice_(ioservice), event_handlers_(event_handlers)
+{
+  LOG_TRACE(kRPC, << "HANamenodeTracker got the following nodes");
+  for (unsigned int i = 0; i < servers.size(); i++)
+    LOG_TRACE(kRPC, << servers[i].str());
+
+  if (servers.size() >= 2) {
+    LOG_TRACE(kRPC, << "Creating HA namenode tracker");
+    if (servers.size() > 2) {
+      LOG_WARN(kRPC, << "Nameservice declares more than two nodes. Some won't be used.");
+    }
+
+    active_info_ = servers[0];
+    standby_info_ = servers[1];
+    LOG_INFO(kRPC, << "Active namenode url = " << active_info_.uri.str());
+    LOG_INFO(kRPC, << "Standby namenode url = " << standby_info_.uri.str());
+
+    enabled_ = true;
+    if (!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) {
+      resolved_ = true;
+    }
+  }
+}
+
+HANamenodeTracker::~HANamenodeTracker() { }
+
+// Pack endpoint debug info into a string
+static std::string format_endpoints(const std::vector<::asio::ip::tcp::endpoint> &pts) {
+  std::stringstream ss;
+  for (unsigned int i = 0; i < pts.size(); i++)
+    ss << pts[i] << (i == pts.size() - 1 ? "" : ", ");
+  return ss.str();
+}
+
+ResolvedNamenodeInfo HANamenodeTracker::GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint) {
+  std::lock_guard<std::mutex> swap_lock(swap_lock_);
+
+  ResolvedNamenodeInfo failover_node;
+  if (IsCurrentActive_locked(current_endpoint)) {
+    // Connected to the active: swap, then fail over to the standby
+    std::swap(active_info_, standby_info_);
+    if (event_handlers_)
+      event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
+                            reinterpret_cast<uint64_t>(active_info_.uri.str().c_str()));
+    failover_node = active_info_;
+  } else if (IsCurrentStandby_locked(current_endpoint)) {
+    // Connected to standby
+    if (event_handlers_)
+      event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
+                            reinterpret_cast<uint64_t>(active_info_.uri.str().c_str()));
+    failover_node = active_info_;
+  } else {
+    // Invalid state, throw for testing
+    std::string ep1 = format_endpoints(active_info_.endpoints);
+    std::string ep2 = format_endpoints(standby_info_.endpoints);
+
+    std::stringstream msg;
+    msg << "Looked for " << current_endpoint << " in\n";
+    msg << ep1 << " and\n";
+    msg << ep2 << std::endl;
+
+    LOG_ERROR(kRPC, << "Unable to find RPC connection in config " << msg.str() << ". Bailing out.");
+    throw std::runtime_error(msg.str());
+  }
+
+  if (failover_node.endpoints.empty()) {
+    LOG_WARN(kRPC, << "No endpoints for node " << failover_node.uri.str() << ", attempting to resolve again");
+    if (!ResolveInPlace(ioservice_, failover_node)) {
+      LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << failover_node.uri.str()
+                      << " failed. Please make sure your configuration is up to date.");
+    }
+  }
+  return failover_node;
+}
+
+bool HANamenodeTracker::IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const {
+  for (unsigned int i = 0; i < active_info_.endpoints.size(); i++) {
+    if (ep == active_info_.endpoints[i])
+      return true;
+  }
+  return false;
+}
+
+bool HANamenodeTracker::IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const {
+  for (unsigned int i = 0; i < standby_info_.endpoints.size(); i++) {
+    if (ep == standby_info_.endpoints[i])
+      return true;
+  }
+  return false;
+}
+
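GetFailoverAndUpdate above publishes FS_NN_FAILOVER_EVENT with the next node's
URI smuggled through the integer value argument as a pointer. A client-side
callback can decode it as below; this is a sketch, and the callback signature
is assumed to match the fs_event_callback contract described in events.h:

    #include "hdfspp/events.h"
    #include <cstdint>
    #include <cstring>
    #include <iostream>

    // Illustrative callback body; registration is whatever mechanism the
    // application already uses for libhdfs++ event handlers.
    void on_fs_event(const char *event, const char *cluster, std::uint64_t value) {
      if (std::strcmp(event, hdfs::FS_NN_FAILOVER_EVENT) == 0) {
        const char *next_node_uri = reinterpret_cast<const char *>(value);
        std::cout << "nameservice " << cluster << " failing over to "
                  << next_node_uri << std::endl;
      }
    }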
 RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
                      const std::string &client_name, const std::string &user_name,
                      const char *protocol_name, int protocol_version)
     : io_service_(io_service),
       options_(options),
       client_name_(client_name),
       protocol_name_(protocol_name),
       protocol_version_(protocol_version),
-      retry_policy_(MakeRetryPolicy(options)),
       call_id_(0),
       retry_timer(*io_service),
-      event_handlers_(std::make_shared<LibhdfsEvents>()) {
-
-  auth_info_.setUser(user_name);
-  if (options.authentication == Options::kKerberos) {
-    auth_info_.setMethod(AuthInfo::kKerberos);
-  }
+      event_handlers_(std::make_shared<LibhdfsEvents>())
+{
+  LOG_DEBUG(kRPC, << "RpcEngine::RpcEngine called");

-  LOG_DEBUG(kRPC, << "RpcEngine::RpcEngine called");
+  auth_info_.setUser(user_name);
+  if (options.authentication == Options::kKerberos) {
+    auth_info_.setMethod(AuthInfo::kKerberos);
+  }
 }

 void RpcEngine::Connect(const std::string &cluster_name,
-                        const std::vector<::asio::ip::tcp::endpoint> &server,
+                        const std::vector<ResolvedNamenodeInfo> servers,
                         RpcCallback &handler) {
   std::lock_guard<std::mutex> state_lock(engine_state_lock_);
   LOG_DEBUG(kRPC, << "RpcEngine::Connect called");

-  last_endpoints_ = server;
+  last_endpoints_ = servers[0].endpoints;
   cluster_name_ = cluster_name;
+  LOG_TRACE(kRPC, << "Got cluster name \"" << cluster_name << "\" in RpcEngine::Connect");
+
+  ha_persisted_info_.reset(new HANamenodeTracker(servers, io_service_, event_handlers_));
+  if (!ha_persisted_info_->is_enabled()) {
+    ha_persisted_info_.reset();
+  }
+
+  // Construct retry policy after we determine if config is HA
+  retry_policy_ = std::move(MakeRetryPolicy(options_));

   conn_ = InitializeConnection();
   conn_->Connect(last_endpoints_, auth_info_, handler);
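MakeRetryPolicy below selects FixedDelayWithFailover whenever the tracker
exists. That policy's implementation is in this patch's retry_policy.{h,cc}
changes, not shown in this portion of the diff; roughly, the decision it must
encode looks like this sketch (the function name is illustrative, and
RetryAction::retry is assumed to exist alongside the fail/failover
constructors used elsewhere in this diff):

    // Illustrative decision logic, not the actual FixedDelayWithFailover source.
    RetryAction should_retry_sketch(const Status &s, int retries, int failovers,
                                    const Options &opts) {
      // A StandbyException means this node cannot serve us; switch namenodes.
      if (s.get_server_exception_type() == Status::kStandbyException &&
          failovers < opts.failover_max_retries)
        return RetryAction::failover(opts.rpc_retry_delay_ms);
      // Otherwise plain bounded retry against the current node.
      if (retries < opts.max_rpc_retries)
        return RetryAction::retry(opts.rpc_retry_delay_ms);
      return RetryAction::fail("Exceeded retry and failover limits");
    }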
@@ -72,8 +191,16 @@ void RpcEngine::Shutdown() {

 std::unique_ptr<const RetryPolicy> RpcEngine::MakeRetryPolicy(const Options &options) {
   LOG_DEBUG(kRPC, << "RpcEngine::MakeRetryPolicy called");
-  if (options.max_rpc_retries > 0) {
-    return std::unique_ptr<RetryPolicy>(new FixedDelayRetryPolicy(options.rpc_retry_delay_ms, options.max_rpc_retries));
+
+  if (ha_persisted_info_) {
+    LOG_INFO(kRPC, << "Cluster is HA configured so policy will default to HA until a knob is implemented");
+    return std::unique_ptr<RetryPolicy>(new FixedDelayWithFailover(options.rpc_retry_delay_ms,
+                                                                   options.max_rpc_retries,
+                                                                   options.failover_max_retries,
+                                                                   options.failover_connection_max_retries));
+  } else if (options.max_rpc_retries > 0) {
+    return std::unique_ptr<RetryPolicy>(new FixedDelayRetryPolicy(options.rpc_retry_delay_ms,
+                                                                  options.max_rpc_retries));
   } else {
     return nullptr;
   }
@@ -81,6 +208,15 @@ std::unique_ptr<const RetryPolicy> RpcEngine::MakeRetryPolicy(const Options &opt

 void RpcEngine::TEST_SetRpcConnection(std::shared_ptr<RpcConnection> conn) {
   conn_ = conn;
+  retry_policy_ = std::move(MakeRetryPolicy(options_));
+}
+
+void RpcEngine::TEST_SetRetryPolicy(std::unique_ptr<const RetryPolicy> policy) {
+  retry_policy_ = std::move(policy);
+}
+
+std::unique_ptr<const RetryPolicy> RpcEngine::TEST_GenerateRetryPolicyUsingOptions() {
+  return MakeRetryPolicy(options_);
 }

 void RpcEngine::AsyncRpc(
@@ -131,7 +267,7 @@ void RpcEngine::AsyncRpcCommsError(
     const Status &status,
     std::shared_ptr<RpcConnection> failedConnection,
     std::vector<std::shared_ptr<Request>> pendingRequests) {
-  LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called; conn=" << failedConnection.get() << " reqs=" << pendingRequests.size());
+  LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called; status=\"" << status.ToString() << "\" conn=" << failedConnection.get() << " reqs=" << pendingRequests.size());

   io_service().post([this, status, failedConnection, pendingRequests]() {
     RpcCommsError(status, failedConnection, pendingRequests);
@@ -142,44 +278,52 @@ void RpcEngine::RpcCommsError(
     const Status &status,
     std::shared_ptr<RpcConnection> failedConnection,
     std::vector<std::shared_ptr<Request>> pendingRequests) {
-  (void)status;
-
-  LOG_ERROR(kRPC, << "RpcEngine::RpcCommsError called; conn=" << failedConnection.get() << " reqs=" << pendingRequests.size());
+  LOG_WARN(kRPC, << "RpcEngine::RpcCommsError called; status=\"" << status.ToString() << "\" conn=" << failedConnection.get() << " reqs=" << pendingRequests.size());

   std::lock_guard<std::mutex> state_lock(engine_state_lock_);

   // If the failed connection is the current one, shut it down
   // It will be reconnected when there is work to do
   if (failedConnection == conn_) {
-    conn_.reset();
+    LOG_INFO(kRPC, << "Disconnecting from failed RpcConnection");
+    conn_.reset();
   }

-  auto head_action = optional<RetryAction>();
+  optional<RetryAction> head_action = optional<RetryAction>();

-  // Filter out anything with too many retries already
-  for (auto it = pendingRequests.begin(); it < pendingRequests.end();) {
-    auto req = *it;
+  // We are talking to the standby NN, let's talk to the active one instead.
+  if (ha_persisted_info_ && status.get_server_exception_type() == Status::kStandbyException) {
+    LOG_INFO(kRPC, << "Received StandbyException. Failing over.");
+    head_action = RetryAction::failover(std::max(0, options_.rpc_retry_delay_ms));
+  } else {
+    // Filter out anything with too many retries already
+    for (auto it = pendingRequests.begin(); it < pendingRequests.end();) {
+      auto req = *it;

-    RetryAction retry = RetryAction::fail(""); // Default to fail
-    if (retry_policy()) {
-      retry = retry_policy()->ShouldRetry(status, req->IncrementRetryCount(), 0, true);
-    }
+      LOG_DEBUG(kRPC, << req->GetDebugString());

-    if (retry.action == RetryAction::FAIL) {
-      // If we've exceeded the maximum retry, take the latest error and pass it
-      // on.  There might be a good argument for caching the first error
-      // rather than the last one, that gets messy
+      RetryAction retry = RetryAction::fail(""); // Default to fail

-      io_service().post([req, status]() {
-        req->OnResponseArrived(nullptr, status);  // Never call back while holding a lock
-      });
-      it = pendingRequests.erase(it);
-    } else {
-      if (!head_action) {
-        head_action = retry;
+      if (retry_policy()) {
+        retry = retry_policy()->ShouldRetry(status, req->IncrementRetryCount(), req->get_failover_count(), true);
       }

-      ++it;
+      if (retry.action == RetryAction::FAIL) {
+        // If we've exceeded the maximum retry, take the latest error and pass it
+        // on.  There might be a good argument for caching the first error
+        // rather than the last one, that gets messy
+
+        io_service().post([req, status]() {
+          req->OnResponseArrived(nullptr, status);  // Never call back while holding a lock
+        });
+        it = pendingRequests.erase(it);
+      } else {
+        if (!head_action) {
+          head_action = retry;
+        }
+
+        ++it;
+      }
     }
   }

@@ -189,8 +333,31 @@ void RpcEngine::RpcCommsError(
       head_action && head_action->action != RetryAction::FAIL;

   if (haveRequests) {
+    LOG_TRACE(kRPC, << "Have " << pendingRequests.size() << " requests to resend");
     bool needNewConnection = !conn_;
     if (needNewConnection) {
+      LOG_DEBUG(kRPC, << "Creating a new NN connection");
+
+      // If HA is enabled and we have valid HA info then fail over to the standby (hopefully now active)
+      if (head_action->action == RetryAction::FAILOVER_AND_RETRY && ha_persisted_info_) {
+        for (unsigned int i = 0; i < pendingRequests.size(); i++)
+          pendingRequests[i]->IncrementFailoverCount();
+
+        ResolvedNamenodeInfo new_active_nn_info =
+            ha_persisted_info_->GetFailoverAndUpdate(last_endpoints_[0] /*reverse lookup*/);
+
+        LOG_INFO(kRPC, << "Going to try connecting to alternate namenode: " << new_active_nn_info.uri.str());
+
+        if (ha_persisted_info_->is_resolved()) {
+          last_endpoints_ = new_active_nn_info.endpoints;
+        } else {
+          LOG_WARN(kRPC, << "It looks like HA is turned on, but unable to fail over. has info="
+                         << ha_persisted_info_->is_enabled() << " resolved=" << ha_persisted_info_->is_resolved());
+        }
+      }
+
       conn_ = InitializeConnection();

       conn_->PreEnqueueRequests(pendingRequests);
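The contract used by RpcCommsError above: the endpoint that just failed is the
lookup key, and GetFailoverAndUpdate hands back the other namenode while
swapping its own active/standby bookkeeping. A minimal caller sketch, with
names illustrative and error handling elided:

    void fail_over_and_reconnect(hdfs::HANamenodeTracker &tracker,
                                 const ::asio::ip::tcp::endpoint &failed_endpoint) {
      if (!tracker.is_enabled())
        return;  // single-NN config; nothing to fail over to

      // Mutates tracker state: what was standby is now assumed active.
      hdfs::ResolvedNamenodeInfo next = tracker.GetFailoverAndUpdate(failed_endpoint);

      // A real caller now builds a new RpcConnection against next.endpoints and
      // re-enqueues outstanding requests, as RpcCommsError does above.
      (void)next;
    }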
has info=" + << ha_persisted_info_->is_enabled() << " resolved=" << ha_persisted_info_->is_resolved()); + } + } + conn_ = InitializeConnection(); conn_->PreEnqueueRequests(pendingRequests); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h index 5de7d53..47618a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h @@ -24,7 +24,11 @@ #include "common/auth_info.h" #include "common/retry_policy.h" #include "common/libhdfs_events_impl.h" +#include "common/util.h" +#include "common/continuation/asio.h" +#include "common/logging.h" #include "common/new_delete.h" +#include "common/namenode_info.h" #include #include @@ -38,6 +42,7 @@ #include #include #include +#include namespace hdfs { @@ -84,10 +89,15 @@ class Request { std::string method_name() const { return method_name_; } ::asio::deadline_timer &timer() { return timer_; } int IncrementRetryCount() { return retry_count_++; } + int IncrementFailoverCount(); void GetPacket(std::string *res) const; void OnResponseArrived(::google::protobuf::io::CodedInputStream *is, const Status &status); + int get_failover_count() {return failover_count_;} + + std::string GetDebugString() const; + private: LockFreeRpcEngine *const engine_; const std::string method_name_; @@ -98,6 +108,7 @@ class Request { const Handler handler_; int retry_count_; + int failover_count_; }; /* @@ -137,6 +148,9 @@ class RpcConnection : public std::enable_shared_from_this { // on connect void PreEnqueueRequests(std::vector> requests); + // Put requests at the front of the current request queue + void PrependRequests_locked(std::vector> requests); + void SetEventHandlers(std::shared_ptr event_handlers); void SetClusterName(std::string cluster_name); @@ -190,7 +204,7 @@ class RpcConnection : public std::enable_shared_from_this { static std::string SerializeRpcRequest( const std::string &method_name, const ::google::protobuf::MessageLite *req); - void HandleRpcResponse(std::shared_ptr response); + Status HandleRpcResponse(std::shared_ptr response); void HandleRpcTimeout(std::shared_ptr req, const ::asio::error_code &ec); void CommsError(const Status &status); @@ -261,6 +275,55 @@ public: virtual const Options &options() const = 0; }; + +/* + * Tracker gives the RpcEngine a quick way to use an endpoint that just + * failed in order to lookup a set of endpoints for a failover node. + * + * Note: For now this only deals with 2 NameNodes, but that's the default + * anyway. + */ +class HANamenodeTracker { + public: + HANamenodeTracker(const std::vector &servers, + ::asio::io_service *ioservice, + std::shared_ptr event_handlers_); + + virtual ~HANamenodeTracker(); + + bool is_enabled() const { return enabled_; } + bool is_resolved() const { return resolved_; } + + // Get node opposite of the current one if possible (swaps active/standby) + // Note: This will always mutate internal state. 
+
+/*
+ * Tracker gives the RpcEngine a quick way to use an endpoint that just
+ * failed in order to look up a set of endpoints for a failover node.
+ *
+ * Note: For now this only deals with 2 NameNodes, but that's the default
+ * anyway.
+ */
+class HANamenodeTracker {
+ public:
+  HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
+                    ::asio::io_service *ioservice,
+                    std::shared_ptr<LibhdfsEvents> event_handlers);
+
+  virtual ~HANamenodeTracker();
+
+  bool is_enabled() const { return enabled_; }
+  bool is_resolved() const { return resolved_; }
+
+  // Get the node opposite of the current one if possible (swaps active/standby).
+  // Note: This will always mutate internal state. Use IsCurrentActive/Standby to
+  // get info without changing state.
+  ResolvedNamenodeInfo GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint);
+
+  bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const;
+  bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const;
+
+ private:
+  // If HA should be enabled, according to our options and runtime info like # of nodes provided
+  bool enabled_;
+  // If we were able to resolve at least 1 HA namenode
+  bool resolved_;
+
+  // Keep service in case a second round of DNS lookup is required
+  ::asio::io_service *ioservice_;
+
+  // Event handlers; for now this is the simplest place to catch all failover events
+  // and push info out to the client application. Possibly move into RpcEngine.
+  std::shared_ptr<LibhdfsEvents> event_handlers_;
+
+  // Only support 1 active and 1 standby for now.
+  ResolvedNamenodeInfo active_info_;
+  ResolvedNamenodeInfo standby_info_;
+
+  // Acquire when swapping active and standby
+  std::mutex swap_lock_;
+};
+
 /*
  * An engine for reliable communication with a NameNode. Handles connection,
  * retry, and (someday) failover of the requested messages.
@@ -285,7 +348,7 @@ class RpcEngine : public LockFreeRpcEngine {
             const char *protocol_name, int protocol_version);

   void Connect(const std::string &cluster_name,
-               const std::vector<::asio::ip::tcp::endpoint> &server,
+               const std::vector<ResolvedNamenodeInfo> servers,
                RpcCallback &handler);

   void AsyncRpc(const std::string &method_name,
@@ -297,7 +360,6 @@ class RpcEngine : public LockFreeRpcEngine {
                const ::google::protobuf::MessageLite *req,
                const std::shared_ptr<::google::protobuf::MessageLite> &resp);

-  void Start();
   void Shutdown();

   /* Enqueues a CommsError without acquiring a lock */
@@ -313,6 +375,8 @@ class RpcEngine : public LockFreeRpcEngine {
   int NextCallId() override { return ++call_id_; }

   void TEST_SetRpcConnection(std::shared_ptr<RpcConnection> conn);
+  void TEST_SetRetryPolicy(std::unique_ptr<const RetryPolicy> policy);
+  std::unique_ptr<const RetryPolicy> TEST_GenerateRetryPolicyUsingOptions();

   const std::string &client_name() const override { return client_name_; }
   const std::string &user_name() const override { return auth_info_.getUser(); }
@@ -338,7 +402,7 @@ private:
   const std::string client_name_;
   const std::string protocol_name_;
   const int protocol_version_;
-  const std::unique_ptr<const RetryPolicy> retry_policy_; //null --> no retry
+  std::unique_ptr<const RetryPolicy> retry_policy_; //null --> no retry
   AuthInfo auth_info_;
   std::string cluster_name_;
   std::atomic_int call_id_;
@@ -348,6 +412,9 @@ private:

   std::mutex engine_state_lock_;
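With the signature change above, Connect's callers hand over resolved namenode
descriptions rather than a flat endpoint list, which is what lets the engine
seed the HANamenodeTracker. A sketch of a caller under the new contract
(surrounding FileSystem plumbing elided; names illustrative):

    // servers[0] is dialed first; the full vector seeds the HA tracker.
    void connect_to_nameservice(hdfs::RpcEngine &engine,
                                const std::string &nameservice,
                                const std::vector<hdfs::ResolvedNamenodeInfo> &namenodes,
                                hdfs::RpcCallback &on_connected) {
      engine.Connect(nameservice, namenodes, on_connected);
    }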
+  // Keep endpoint info for all HA connections; a non-null ptr indicates
+  // that HA info was found in the configuration.
+  std::unique_ptr<HANamenodeTracker> ha_persisted_info_;
 };

 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cb0dad5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
index defe95d..3e8c93f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
@@ -20,6 +20,7 @@
 #include "test.pb.h"
 #include "RpcHeader.pb.h"
 #include "rpc/rpc_connection.h"
+#include "common/namenode_info.h"

 #include

@@ -43,10 +44,10 @@ namespace pbio = ::google::protobuf::io;

 namespace hdfs {

-std::vector<asio::ip::basic_endpoint<asio::ip::tcp>> make_endpoint() {
-  std::vector<asio::ip::basic_endpoint<asio::ip::tcp>> result;
-  result.push_back(asio::ip::basic_endpoint<asio::ip::tcp>());
-  return result;
+std::vector<ResolvedNamenodeInfo> make_endpoint() {
+  ResolvedNamenodeInfo result;
+  result.endpoints.push_back(asio::ip::basic_endpoint<asio::ip::tcp>());
+  return std::vector<ResolvedNamenodeInfo>({result});
 }

 class MockRPCConnection : public MockConnectionBase {
@@ -68,7 +69,7 @@ class SharedConnectionEngine : public RpcEngine {
 protected:
   std::shared_ptr<RpcConnection> NewConnection() override {
     // Stuff in some dummy endpoints so we don't error out
-    last_endpoints_ = make_endpoint();
+    last_endpoints_ = make_endpoint()[0].endpoints;

     return std::make_shared<RpcConnectionImpl<MockRPCConnection>>(this);
   }

@@ -180,6 +181,11 @@ TEST(RpcEngineTest, TestConnectionResetAndRecover) {
   options.rpc_retry_delay_ms = 0;
   SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);

+  // Normally determined during RpcEngine::Connect, but in this case options
+  // provides enough info to determine policy here.
+  engine.TEST_SetRetryPolicy(engine.TEST_GenerateRetryPolicyUsingOptions());
+
   EchoResponseProto server_resp;
   server_resp.set_message("foo");

@@ -215,6 +221,10 @@ TEST(RpcEngineTest, TestConnectionResetAndRecoverWithDelay) {
   options.rpc_retry_delay_ms = 1;
   SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);

+  // Normally determined during RpcEngine::Connect, but in this case options
+  // provides enough info to determine policy here.
+  engine.TEST_SetRetryPolicy(engine.TEST_GenerateRetryPolicyUsingOptions());
+
   EchoResponseProto server_resp;
   server_resp.set_message("foo");

@@ -339,6 +349,10 @@ TEST(RpcEngineTest, TestEventCallbacks)
   options.rpc_retry_delay_ms = 0;
   SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);

+  // Normally determined during RpcEngine::Connect, but in this case options
+  // provides enough info to determine policy here.
+  engine.TEST_SetRetryPolicy(engine.TEST_GenerateRetryPolicyUsingOptions());
+
   // Set up event callbacks
   int calls = 0;
   std::vector<std::string> callbacks;

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org