From: aengineer@apache.org
To: common-commits@hadoop.apache.org
Date: Fri, 23 Mar 2018 00:57:04 -0000
Subject: [44/51] [abbrv] [partial] hadoop git commit: Revert "Merge branch 'trunk' into HDFS-7240"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.h deleted file mode 100644 index 1d596ad..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.h +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -#ifndef LIBHDFSPP_BADDATANODETRACKER_H -#define LIBHDFSPP_BADDATANODETRACKER_H - -#include -#include -#include -#include -#include - -#include "hdfspp/options.h" -#include "hdfspp/hdfspp.h" - -namespace hdfs { - -/** - * ExclusionSet is a simple override that can be filled with known - * bad node UUIDs and passed to AsyncPreadSome.
- **/ -class ExclusionSet : public NodeExclusionRule { - public: - ExclusionSet(const std::set& excluded); - virtual ~ExclusionSet(); - virtual bool IsBadNode(const std::string& node_uuid); - - private: - std::set excluded_; -}; - -/** - * BadDataNodeTracker keeps a timestamped list of datanodes that have - * failed during past operations. Entries present in this list will - * not be used for new requests. Entries will be evicted from the list - * after a period of time has elapsed; the default is 10 minutes. - */ -class BadDataNodeTracker : public NodeExclusionRule { - public: - BadDataNodeTracker(const Options& options = Options()); - virtual ~BadDataNodeTracker(); - /* add a bad DN to the list */ - void AddBadNode(const std::string& dn); - /* check if a node should be excluded */ - virtual bool IsBadNode(const std::string& dn); - /* only for tests, shift clock by t milliseconds*/ - void TEST_set_clock_shift(int t); - - private: - typedef std::chrono::steady_clock Clock; - typedef std::chrono::time_point TimePoint; - bool TimeoutExpired(const TimePoint& t); - /* after timeout_duration_ elapses remove DN */ - const unsigned int timeout_duration_; /* milliseconds */ - std::map datanodes_; - std::mutex datanodes_update_lock_; - int test_clock_shift_; -}; -} -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc deleted file mode 100644 index ba702b0..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc +++ /dev/null @@ -1,370 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include "filehandle.h" -#include "common/continuation/continuation.h" -#include "common/logging.h" -#include "connection/datanodeconnection.h" -#include "reader/block_reader.h" -#include "hdfspp/events.h" - -#include -#include - -#define FMT_THIS_ADDR "this=" << (void*)this - -namespace hdfs { - -using ::hadoop::hdfs::LocatedBlocksProto; - -FileHandle::~FileHandle() {} - -FileHandleImpl::FileHandleImpl(const std::string & cluster_name, - const std::string & path, - ::asio::io_service *io_service, const std::string &client_name, - const std::shared_ptr file_info, - std::shared_ptr bad_data_nodes, - std::shared_ptr event_handlers) - : cluster_name_(cluster_name), path_(path), io_service_(io_service), client_name_(client_name), file_info_(file_info), - bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()), event_handlers_(event_handlers), bytes_read_(0) { - LOG_TRACE(kFileHandle, << "FileHandleImpl::FileHandleImpl(" - << FMT_THIS_ADDR << ", ...) called"); - -} - -void FileHandleImpl::PositionRead( - void *buf, size_t buf_size, uint64_t offset, - const std::function &handler) { - LOG_DEBUG(kFileHandle, << "FileHandleImpl::PositionRead(" - << FMT_THIS_ADDR << ", buf=" << buf - << ", buf_size=" << std::to_string(buf_size) << ") called"); - - /* prevent usage after cancelation */ - if(cancel_state_->is_canceled()) { - handler(Status::Canceled(), 0); - return; - } - - auto callback = [this, handler](const Status &status, - const std::string &contacted_datanode, - size_t bytes_read) { - /* determine if DN gets marked bad */ - if (ShouldExclude(status)) { - bad_node_tracker_->AddBadNode(contacted_datanode); - } - - bytes_read_ += bytes_read; - handler(status, bytes_read); - }; - - AsyncPreadSome(offset, asio::buffer(buf, buf_size), bad_node_tracker_, callback); -} - -Status FileHandleImpl::PositionRead(void *buf, size_t buf_size, off_t offset, size_t *bytes_read) { - LOG_DEBUG(kFileHandle, << "FileHandleImpl::[sync]PositionRead(" - << FMT_THIS_ADDR << ", buf=" << buf - << ", buf_size=" << std::to_string(buf_size) - << ", offset=" << offset << ") called"); - - auto callstate = std::make_shared>>(); - std::future> future(callstate->get_future()); - - /* wrap async call with promise/future to make it blocking */ - auto callback = [callstate](const Status &s, size_t bytes) { - callstate->set_value(std::make_tuple(s,bytes)); - }; - - PositionRead(buf, buf_size, offset, callback); - - /* wait for async to finish */ - auto returnstate = future.get(); - auto stat = std::get<0>(returnstate); - - if (!stat.ok()) { - return stat; - } - - *bytes_read = std::get<1>(returnstate); - return stat; -} - -Status FileHandleImpl::Read(void *buf, size_t buf_size, size_t *bytes_read) { - LOG_DEBUG(kFileHandle, << "FileHandleImpl::Read(" - << FMT_THIS_ADDR << ", buf=" << buf - << ", buf_size=" << std::to_string(buf_size) << ") called"); - - Status stat = PositionRead(buf, buf_size, offset_, bytes_read); - if(!stat.ok()) { - return stat; - } - - offset_ += *bytes_read; - return Status::OK(); -} - -Status FileHandleImpl::Seek(off_t *offset, std::ios_base::seekdir whence) { - LOG_DEBUG(kFileHandle, << "FileHandleImpl::Seek(" - << ", offset=" << *offset << ", ...) 
called"); - - if(cancel_state_->is_canceled()) { - return Status::Canceled(); - } - - off_t new_offset = -1; - - switch (whence) { - case std::ios_base::beg: - new_offset = *offset; - break; - case std::ios_base::cur: - new_offset = offset_ + *offset; - break; - case std::ios_base::end: - new_offset = file_info_->file_length_ + *offset; - break; - default: - /* unsupported */ - return Status::InvalidArgument("Invalid Seek whence argument"); - } - - if(!CheckSeekBounds(new_offset)) { - return Status::InvalidArgument("Seek offset out of bounds"); - } - offset_ = new_offset; - - *offset = offset_; - return Status::OK(); -} - -/* return false if seek will be out of bounds */ -bool FileHandleImpl::CheckSeekBounds(ssize_t desired_position) { - ssize_t file_length = file_info_->file_length_; - - if (desired_position < 0 || desired_position > file_length) { - return false; - } - - return true; -} - -/* - * Note that this method must be thread-safe w.r.t. the unsafe operations occurring - * on the FileHandle - */ -void FileHandleImpl::AsyncPreadSome( - size_t offset, const MutableBuffers &buffers, - std::shared_ptr excluded_nodes, - const std::function handler) { - using ::hadoop::hdfs::DatanodeInfoProto; - using ::hadoop::hdfs::LocatedBlockProto; - - LOG_DEBUG(kFileHandle, << "FileHandleImpl::AsyncPreadSome(" - << FMT_THIS_ADDR << ", ...) called"); - - if(cancel_state_->is_canceled()) { - handler(Status::Canceled(), "", 0); - return; - } - - if(offset == file_info_->file_length_) { - handler(Status::OK(), "", 0); - return; - } else if(offset > file_info_->file_length_){ - handler(Status::InvalidOffset("AsyncPreadSome: trying to begin a read past the EOF"), "", 0); - return; - } - - /** - * Note: block and chosen_dn will end up pointing to things inside - * the blocks_ vector. They shouldn't be directly deleted. - **/ - auto block = std::find_if( - file_info_->blocks_.begin(), file_info_->blocks_.end(), [offset](const LocatedBlockProto &p) { - return p.offset() <= offset && offset < p.offset() + p.b().numbytes(); - }); - - if (block == file_info_->blocks_.end()) { - LOG_WARN(kFileHandle, << "FileHandleImpl::AsyncPreadSome(" << FMT_THIS_ADDR - << ", ...) Cannot find corresponding blocks"); - handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0); - return; - } - - /** - * If user supplies a rule use it, otherwise use the tracker. - * User is responsible for making sure one of them isn't null. - **/ - std::shared_ptr rule = - excluded_nodes != nullptr ? excluded_nodes : bad_node_tracker_; - - auto datanodes = block->locs(); - auto it = std::find_if(datanodes.begin(), datanodes.end(), - [rule](const DatanodeInfoProto &dn) { - return !rule->IsBadNode(dn.id().datanodeuuid()); - }); - - if (it == datanodes.end()) { - LOG_WARN(kFileHandle, << "FileHandleImpl::AsyncPreadSome(" - << FMT_THIS_ADDR << ", ...) No datanodes available"); - - handler(Status::ResourceUnavailable("No datanodes available"), "", 0); - return; - } - - DatanodeInfoProto &chosen_dn = *it; - - std::string dnIpAddr = chosen_dn.id().ipaddr(); - std::string dnHostName = chosen_dn.id().hostname(); - - uint64_t offset_within_block = offset - block->offset(); - uint64_t size_within_block = std::min( - block->b().numbytes() - offset_within_block, asio::buffer_size(buffers)); - - LOG_DEBUG(kFileHandle, << "FileHandleImpl::AsyncPreadSome(" - << FMT_THIS_ADDR << "), ...) 
Datanode hostname=" << dnHostName << ", IP Address=" << dnIpAddr - << ", file path=\"" << path_ << "\", offset=" << std::to_string(offset) << ", read size=" << size_within_block); - - // This is where we will put the logic for re-using a DN connection; we can - // steal the FileHandle's dn and put it back when we're done - std::shared_ptr dn = CreateDataNodeConnection(io_service_, chosen_dn, &block->blocktoken()); - std::string dn_id = dn->uuid_; - std::string client_name = client_name_; - - // Wrap the DN in a block reader to handle the state and logic of the - // block request protocol - std::shared_ptr reader; - reader = CreateBlockReader(BlockReaderOptions(), dn, event_handlers_); - - // Lambdas cannot capture copies of member variables so we'll make explicit - // copies for it - auto event_handlers = event_handlers_; - auto path = path_; - auto cluster_name = cluster_name_; - - auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) { - event_response event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred); -#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED - if (event_resp.response_type() == event_response::kTest_Error) { - handler(event_resp.status(), dn_id, transferred); - return; - } -#endif - - handler(status, dn_id, transferred); - }; - - auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name] - (Status status, std::shared_ptr dn) { - (void)dn; - event_response event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0); -#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED - if (event_resp.response_type() == event_response::kTest_Error) { - status = event_resp.status(); - } -#endif - - if (status.ok()) { - reader->AsyncReadBlock( - client_name, *block, offset_within_block, - asio::buffer(buffers, size_within_block), read_handler); - } else { - handler(status, dn_id, 0); - } - }; - - dn->Connect(connect_handler); - - return; -} - -std::shared_ptr FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options, - std::shared_ptr dn, - std::shared_ptr event_handlers) -{ - std::shared_ptr reader = std::make_shared(options, dn, cancel_state_, event_handlers); - - LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateBlockReader(" << FMT_THIS_ADDR - << ", ..., dnconn=" << dn.get() - << ") called. New BlockReader = " << reader.get()); - - readers_.AddReader(reader); - return reader; -} - -std::shared_ptr FileHandleImpl::CreateDataNodeConnection( - ::asio::io_service * io_service, - const ::hadoop::hdfs::DatanodeInfoProto & dn, - const hadoop::common::TokenProto * token) { - LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateDataNodeConnection(" - << FMT_THIS_ADDR << ", ...) 
called"); - return std::make_shared(io_service, dn, token, event_handlers_.get()); -} - -std::shared_ptr FileHandleImpl::get_event_handlers() { - return event_handlers_; -} - -void FileHandleImpl::CancelOperations() { - LOG_INFO(kFileHandle, << "FileHandleImpl::CancelOperations(" - << FMT_THIS_ADDR << ") called"); - - cancel_state_->set_canceled(); - - /* Push update to BlockReaders that may be hung in an asio call */ - std::vector> live_readers = readers_.GetLiveReaders(); - for(auto reader : live_readers) { - reader->CancelOperation(); - } -} - -void FileHandleImpl::SetFileEventCallback(file_event_callback callback) { - std::shared_ptr new_event_handlers; - if (event_handlers_) { - new_event_handlers = std::make_shared(*event_handlers_); - } else { - new_event_handlers = std::make_shared(); - } - new_event_handlers->set_file_callback(callback); - event_handlers_ = new_event_handlers; -} - - - -bool FileHandle::ShouldExclude(const Status &s) { - if (s.ok()) { - return false; - } - - switch (s.code()) { - /* client side resource exhaustion */ - case Status::kResourceUnavailable: - case Status::kOperationCanceled: - return false; - case Status::kInvalidArgument: - case Status::kUnimplemented: - case Status::kException: - default: - return true; - } -} - -uint64_t FileHandleImpl::get_bytes_read() { return bytes_read_.load(); } - -void FileHandleImpl::clear_bytes_read() { bytes_read_.store(0); } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h deleted file mode 100644 index 4135156..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBHDFSPP_LIB_FS_FILEHANDLE_H_ -#define LIBHDFSPP_LIB_FS_FILEHANDLE_H_ - -#include "common/hdfs_ioservice.h" -#include "common/async_stream.h" -#include "common/cancel_tracker.h" -#include "common/libhdfs_events_impl.h" -#include "common/new_delete.h" -#include "reader/fileinfo.h" -#include "reader/readergroup.h" - -#include "asio.hpp" -#include "bad_datanode_tracker.h" -#include "ClientNamenodeProtocol.pb.h" - -#include -#include - -namespace hdfs { - -class BlockReader; -struct BlockReaderOptions; -class DataNodeConnection; - -/* - * FileHandle: coordinates operations on a particular file in HDFS - * - * Threading model: not thread-safe; consumers and io_service should not call - * concurrently. 
PositionRead is the exceptions; they can be - * called concurrently and repeatedly. - * Lifetime: pointer returned to consumer by FileSystem::Open. Consumer is - * resonsible for freeing the object. - */ -class FileHandleImpl : public FileHandle { -public: - MEMCHECKED_CLASS(FileHandleImpl) - FileHandleImpl(const std::string & cluster_name, - const std::string & path, - ::asio::io_service *io_service, const std::string &client_name, - const std::shared_ptr file_info, - std::shared_ptr bad_data_nodes, - std::shared_ptr event_handlers); - - /* - * Reads the file at the specified offset into the buffer. - * bytes_read returns the number of bytes successfully read on success - * and on error. Status::InvalidOffset is returned when trying to begin - * a read past the EOF. - */ - void PositionRead( - void *buf, - size_t buf_size, - uint64_t offset, - const std::function &handler - ) override; - - /** - * Reads the file at the specified offset into the buffer. - * @param buf output buffer - * @param buf_size size of the output buffer - * @param offset offset at which to start reading - * @param bytes_read number of bytes successfully read - */ - Status PositionRead(void *buf, size_t buf_size, off_t offset, size_t *bytes_read) override; - Status Read(void *buf, size_t buf_size, size_t *bytes_read) override; - Status Seek(off_t *offset, std::ios_base::seekdir whence) override; - - - /* - * Reads some amount of data into the buffer. Will attempt to find the best - * datanode and read data from it. - * - * If an error occurs during connection or transfer, the callback will be - * called with bytes_read equal to the number of bytes successfully transferred. - * If no data nodes can be found, status will be Status::ResourceUnavailable. - * If trying to begin a read past the EOF, status will be Status::InvalidOffset. - * - */ - void AsyncPreadSome(size_t offset, const MutableBuffers &buffers, - std::shared_ptr excluded_nodes, - const std::function handler); - - /** - * Cancels all operations instantiated from this FileHandle. - * Will set a flag to abort continuation pipelines when they try to move to the next step. - * Closes TCP connections to Datanode in order to abort pipelines waiting on slow IO. - **/ - virtual void CancelOperations(void) override; - - virtual void SetFileEventCallback(file_event_callback callback) override; - - /** - * Ephemeral objects created by the filehandle will need to get the event - * handler registry owned by the FileSystem. 
- **/ - std::shared_ptr get_event_handlers(); - - /* how many bytes have been successfully read */ - virtual uint64_t get_bytes_read() override; - - /* resets the number of bytes read to zero */ - virtual void clear_bytes_read() override; - -protected: - virtual std::shared_ptr CreateBlockReader(const BlockReaderOptions &options, - std::shared_ptr dn, - std::shared_ptr event_handlers); - virtual std::shared_ptr CreateDataNodeConnection( - ::asio::io_service *io_service, - const ::hadoop::hdfs::DatanodeInfoProto & dn, - const hadoop::common::TokenProto * token); -private: - const std::string cluster_name_; - const std::string path_; - ::asio::io_service * const io_service_; - const std::string client_name_; - const std::shared_ptr file_info_; - std::shared_ptr bad_node_tracker_; - bool CheckSeekBounds(ssize_t desired_position); - off_t offset_; - CancelHandle cancel_state_; - ReaderGroup readers_; - std::shared_ptr event_handlers_; - std::atomic bytes_read_; -}; - -} - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc deleted file mode 100644 index 56d02d8..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ /dev/null @@ -1,859 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include "filesystem.h" - -#include "common/namenode_info.h" - -#include -#include -#include -#include -#include -#include -#include - -#define FMT_THIS_ADDR "this=" << (void*)this - -namespace hdfs { - -static const char kNamenodeProtocol[] = "org.apache.hadoop.hdfs.protocol.ClientProtocol"; -static const int kNamenodeProtocolVersion = 1; - -using ::asio::ip::tcp; - -static constexpr uint16_t kDefaultPort = 8020; - -// forward declarations -const std::string get_effective_user_name(const std::string &); - -uint32_t FileSystem::GetDefaultFindMaxDepth() { - return std::numeric_limits::max(); -} - -uint16_t FileSystem::GetDefaultPermissionMask() { - return 0755; -} - -Status FileSystem::CheckValidPermissionMask(uint16_t permissions) { - if (permissions > 01777) { - std::stringstream errormsg; - errormsg << "CheckValidPermissionMask: argument 'permissions' is " << std::oct - << std::showbase << permissions << " (should be between 0 and 01777)"; - return Status::InvalidArgument(errormsg.str().c_str()); - } - return Status::OK(); -} - -Status FileSystem::CheckValidReplication(uint16_t replication) { - if (replication < 1 || replication > 512) { - std::stringstream errormsg; - errormsg << "CheckValidReplication: argument 'replication' is " - << replication << " (should be between 1 and 512)"; - return Status::InvalidArgument(errormsg.str().c_str()); - } - return Status::OK(); -} - -FileSystem::~FileSystem() {} - -/***************************************************************************** - * FILESYSTEM BASE CLASS - ****************************************************************************/ - -FileSystem *FileSystem::New( - IoService *&io_service, const std::string &user_name, const Options &options) { - return new FileSystemImpl(io_service, user_name, options); -} - -FileSystem *FileSystem::New( - std::shared_ptr io_service, const std::string &user_name, const Options &options) { - return new FileSystemImpl(io_service, user_name, options); -} - -FileSystem *FileSystem::New() { - // No, this pointer won't be leaked. The FileSystem takes ownership. 
- std::shared_ptr io_service = IoService::MakeShared(); - if(!io_service) - return nullptr; - int thread_count = io_service->InitDefaultWorkers(); - if(thread_count < 1) - return nullptr; - - std::string user_name = get_effective_user_name(""); - Options options; - return new FileSystemImpl(io_service, user_name, options); -} - -/***************************************************************************** - * FILESYSTEM IMPLEMENTATION - ****************************************************************************/ - -const std::string get_effective_user_name(const std::string &user_name) { - if (!user_name.empty()) - return user_name; - - // If no user name was provided, try the HADOOP_USER_NAME and USER environment - // variables - const char * env = getenv("HADOOP_USER_NAME"); - if (env) { - return env; - } - - env = getenv("USER"); - if (env) { - return env; - } - - // If running on POSIX, use the currently logged in user -#if defined(_POSIX_VERSION) - uid_t uid = geteuid(); - struct passwd *pw = getpwuid(uid); - if (pw && pw->pw_name) - { - return pw->pw_name; - } -#endif - - return "unknown_user"; -} - -FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_name, const Options &options) : - io_service_(static_cast(io_service)), options_(options), - client_name_(GetRandomClientName()), - nn_( - &io_service_->io_service(), options, client_name_, - get_effective_user_name(user_name), kNamenodeProtocol, - kNamenodeProtocolVersion - ), - bad_node_tracker_(std::make_shared()), - event_handlers_(std::make_shared()) -{ - - LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl(" - << FMT_THIS_ADDR << ") called"); - - // Poor man's move - io_service = nullptr; - - unsigned int running_workers = 0; - if(options.io_threads_ < 1) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl Initializing default number of worker threads"); - running_workers = io_service_->InitDefaultWorkers(); - } else { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystenImpl Initializing " << options_.io_threads_ << " worker threads."); - running_workers = io_service->InitWorkers(options_.io_threads_); - } - - if(running_workers < 1) { - LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl was unable to start worker threads"); - } -} - -FileSystemImpl::FileSystemImpl(std::shared_ptr io_service, const std::string& user_name, const Options &options) : - io_service_(std::static_pointer_cast(io_service)), options_(options), - client_name_(GetRandomClientName()), - nn_( - &io_service_->io_service(), options, client_name_, - get_effective_user_name(user_name), kNamenodeProtocol, - kNamenodeProtocolVersion - ), - bad_node_tracker_(std::make_shared()), - event_handlers_(std::make_shared()) -{ - LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl(" - << FMT_THIS_ADDR << ", shared IoService@" << io_service_.get() << ") called"); - int worker_thread_count = io_service_->get_worker_thread_count(); - if(worker_thread_count < 1) { - LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl IoService provided doesn't have any worker threads. " - << "It needs at least 1 worker to connect to an HDFS cluster.") - } else { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl using " << worker_thread_count << " worker threads."); - } -} - -FileSystemImpl::~FileSystemImpl() { - LOG_TRACE(kFileSystem, << "FileSystemImpl::~FileSystemImpl(" - << FMT_THIS_ADDR << ") called"); - - /** - * Note: IoService must be stopped before getting rid of worker threads. 
- * Once worker threads are joined and deleted the service can be deleted. - **/ - io_service_->Stop(); -} - -void FileSystemImpl::Connect(const std::string &server, - const std::string &service, - const std::function &handler) { - LOG_INFO(kFileSystem, << "FileSystemImpl::Connect(" << FMT_THIS_ADDR - << ", server=" << server << ", service=" - << service << ") called"); - connect_callback_.SetCallback(handler); - - /* IoService::New can return nullptr */ - if (!io_service_) { - handler (Status::Error("Null IoService"), this); - } - - // DNS lookup here for namenode(s) - std::vector resolved_namenodes; - - auto name_service = options_.services.find(server); - if(name_service != options_.services.end()) { - cluster_name_ = name_service->first; - resolved_namenodes = BulkResolve(&io_service_->io_service(), name_service->second); - } else { - cluster_name_ = server + ":" + service; - - // tmp namenode info just to get this in the right format for BulkResolve - NamenodeInfo tmp_info; - try { - tmp_info.uri = URI::parse_from_string("hdfs://" + cluster_name_); - } catch (const uri_parse_error& e) { - LOG_ERROR(kFileSystem, << "Unable to use URI for cluster " << cluster_name_); - handler(Status::Error(("Invalid namenode " + cluster_name_ + " in config").c_str()), this); - } - - resolved_namenodes = BulkResolve(&io_service_->io_service(), {tmp_info}); - } - - for(unsigned int i=0;i &handler) { - std::string scheme = options_.defaultFS.get_scheme(); - if (strcasecmp(scheme.c_str(), "hdfs") != 0) { - std::string error_message; - error_message += "defaultFS of [" + options_.defaultFS.str() + "] is not supported"; - handler(Status::InvalidArgument(error_message.c_str()), nullptr); - return; - } - - std::string host = options_.defaultFS.get_host(); - if (host.empty()) { - handler(Status::InvalidArgument("defaultFS must specify a hostname"), nullptr); - return; - } - - int16_t port = options_.defaultFS.get_port_or_default(kDefaultPort); - std::string port_as_string = std::to_string(port); - - Connect(host, port_as_string, handler); -} - -int FileSystemImpl::AddWorkerThread() { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::AddWorkerThread(" - << FMT_THIS_ADDR << ") called." 
- << " Existing thread count = " << WorkerThreadCount()); - - if(!io_service_) - return -1; - - io_service_->AddWorkerThread(); - return 1; -} - -int FileSystemImpl::WorkerThreadCount() { - if(!io_service_) { - return -1; - } else { - return io_service_->get_worker_thread_count(); - } -} - -bool FileSystemImpl::CancelPendingConnect() { - if(connect_callback_.IsCallbackAccessed()) { - // Temp fix for failover hangs, allow CancelPendingConnect to be called so it can push a flag through the RPC engine - LOG_DEBUG(kFileSystem, << "FileSystemImpl@" << this << "::CancelPendingConnect called after Connect completed"); - return nn_.CancelPendingConnect(); - } - - if(!connect_callback_.IsCallbackSet()) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl@" << this << "::CancelPendingConnect called before Connect started"); - return false; - } - - // First invoke callback, then do proper teardown in RpcEngine and RpcConnection - ConnectCallback noop_callback = [](const Status &stat, FileSystem *fs) { - LOG_DEBUG(kFileSystem, << "Dummy callback invoked for canceled FileSystem@" << fs << "::Connect with status: " << stat.ToString()); - }; - - bool callback_swapped = false; - ConnectCallback original_callback = connect_callback_.AtomicSwapCallback(noop_callback, callback_swapped); - - if(callback_swapped) { - // Take original callback and invoke it as if it was canceled. - LOG_DEBUG(kFileSystem, << "Swapped in dummy callback. Invoking connect callback with canceled status."); - std::function wrapped_callback = [original_callback, this](){ - // handling code expected to check status before dereferenceing 'this' - original_callback(Status::Canceled(), this); - }; - io_service_->PostTask(wrapped_callback); - } else { - LOG_INFO(kFileSystem, << "Unable to cancel FileSystem::Connect. It hasn't been invoked yet or may have already completed.") - return false; - } - - // Now push cancel down to clean up where possible and make sure the RpcEngine - // won't try to do retries in the background. The rest of the memory cleanup - // happens when this FileSystem is deleted by the user. - return nn_.CancelPendingConnect(); -} - -void FileSystemImpl::Open( - const std::string &path, - const std::function &handler) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::Open(" - << FMT_THIS_ADDR << ", path=" - << path << ") called"); - - nn_.GetBlockLocations(path, 0, std::numeric_limits::max(), [this, path, handler](const Status &stat, std::shared_ptr file_info) { - if(!stat.ok()) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::Open failed to get block locations. status=" << stat.ToString()); - if(stat.get_server_exception_type() == Status::kStandbyException) { - LOG_DEBUG(kFileSystem, << "Operation not allowed on standby datanode"); - } - } - handler(stat, stat.ok() ? 
new FileHandleImpl(cluster_name_, path, &io_service_->io_service(), client_name_, file_info, bad_node_tracker_, event_handlers_) - : nullptr); - }); -} - - -BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto & locatedBlock) -{ - BlockLocation result; - - result.setCorrupt(locatedBlock.corrupt()); - result.setOffset(locatedBlock.offset()); - - std::vector dn_info; - dn_info.reserve(locatedBlock.locs_size()); - for (const hadoop::hdfs::DatanodeInfoProto & datanode_info: locatedBlock.locs()) { - const hadoop::hdfs::DatanodeIDProto &id = datanode_info.id(); - DNInfo newInfo; - if (id.has_ipaddr()) - newInfo.setIPAddr(id.ipaddr()); - if (id.has_hostname()) - newInfo.setHostname(id.hostname()); - if (id.has_xferport()) - newInfo.setXferPort(id.xferport()); - if (id.has_infoport()) - newInfo.setInfoPort(id.infoport()); - if (id.has_ipcport()) - newInfo.setIPCPort(id.ipcport()); - if (id.has_infosecureport()) - newInfo.setInfoSecurePort(id.infosecureport()); - if (datanode_info.has_location()) - newInfo.setNetworkLocation(datanode_info.location()); - dn_info.push_back(newInfo); - } - result.setDataNodes(dn_info); - - if (locatedBlock.has_b()) { - const hadoop::hdfs::ExtendedBlockProto & b=locatedBlock.b(); - result.setLength(b.numbytes()); - } - - - return result; -} - -void FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, - const std::function locations)> handler) -{ - LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetBlockLocations(" - << FMT_THIS_ADDR << ", path=" - << path << ") called"); - - //Protobuf gives an error 'Negative value is not supported' - //if the high bit is set in uint64 in GetBlockLocations - if (IsHighBitSet(offset)) { - handler(Status::InvalidArgument("GetBlockLocations: argument 'offset' cannot have high bit set"), nullptr); - return; - } - if (IsHighBitSet(length)) { - handler(Status::InvalidArgument("GetBlockLocations: argument 'length' cannot have high bit set"), nullptr); - return; - } - - auto conversion = [handler](const Status & status, std::shared_ptr fileInfo) { - if (status.ok()) { - auto result = std::make_shared(); - - result->setFileLength(fileInfo->file_length_); - result->setLastBlockComplete(fileInfo->last_block_complete_); - result->setUnderConstruction(fileInfo->under_construction_); - - std::vector blocks; - for (const hadoop::hdfs::LocatedBlockProto & locatedBlock: fileInfo->blocks_) { - auto newLocation = LocatedBlockToBlockLocation(locatedBlock); - blocks.push_back(newLocation); - } - result->setBlockLocations(blocks); - - handler(status, result); - } else { - handler(status, std::shared_ptr()); - } - }; - - nn_.GetBlockLocations(path, offset, length, conversion); -} - -void FileSystemImpl::GetPreferredBlockSize(const std::string &path, - const std::function &handler) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetPreferredBlockSize(" - << FMT_THIS_ADDR << ", path=" - << path << ") called"); - - nn_.GetPreferredBlockSize(path, handler); -} - - -void FileSystemImpl::SetReplication(const std::string & path, int16_t replication, std::function handler) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::SetReplication(" << FMT_THIS_ADDR << ", path=" << path << - ", replication=" << replication << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty")); - return; - } - Status replStatus = FileSystem::CheckValidReplication(replication); - if (!replStatus.ok()) { - handler(replStatus); - return; - } - - 
nn_.SetReplication(path, replication, handler); -} - - -void FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, - std::function handler) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::SetTimes(" << FMT_THIS_ADDR << ", path=" << path << - ", mtime=" << mtime << ", atime=" << atime << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("SetTimes: argument 'path' cannot be empty")); - return; - } - - nn_.SetTimes(path, mtime, atime, handler); -} - - -void FileSystemImpl::GetFileInfo( - const std::string &path, - const std::function &handler) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetFileInfo(" - << FMT_THIS_ADDR << ", path=" - << path << ") called"); - - nn_.GetFileInfo(path, handler); -} - -void FileSystemImpl::GetContentSummary( - const std::string &path, - const std::function &handler) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetContentSummary(" - << FMT_THIS_ADDR << ", path=" - << path << ") called"); - - nn_.GetContentSummary(path, handler); -} - -void FileSystemImpl::GetFsStats( - const std::function &handler) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::GetFsStats(" << FMT_THIS_ADDR << ") called"); - - nn_.GetFsStats(handler); -} - - -/** - * Helper function for recursive GetListing calls. - * - * Some compilers don't like recursive lambdas, so we make the lambda call a - * method, which in turn creates a lambda calling itself. - */ -void FileSystemImpl::GetListingShim(const Status &stat, const std::vector & stat_infos, bool has_more, - std::string path, const std::function &, bool)> &handler) { - bool has_next = !stat_infos.empty(); - bool get_more = handler(stat, stat_infos, has_more && has_next); - if (get_more && has_more && has_next ) { - auto callback = [this, path, handler](const Status &stat, const std::vector & stat_infos, bool has_more) { - GetListingShim(stat, stat_infos, has_more, path, handler); - }; - - std::string last = stat_infos.back().path; - nn_.GetListing(path, callback, last); - } -} - -void FileSystemImpl::GetListing( - const std::string &path, - const std::function &, bool)> &handler) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetListing(" - << FMT_THIS_ADDR << ", path=" - << path << ") called"); - std::string path_fixed = path; - if(path.back() != '/'){ - path_fixed += "/"; - } - // Caputure the state and push it into the shim - auto callback = [this, path_fixed, handler](const Status &stat, const std::vector & stat_infos, bool has_more) { - GetListingShim(stat, stat_infos, has_more, path_fixed, handler); - }; - - nn_.GetListing(path_fixed, callback); -} - - -void FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent, - std::function handler) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::Mkdirs(" << FMT_THIS_ADDR << ", path=" << path << - ", permissions=" << permissions << ", createparent=" << createparent << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("Mkdirs: argument 'path' cannot be empty")); - return; - } - - Status permStatus = FileSystem::CheckValidPermissionMask(permissions); - if (!permStatus.ok()) { - handler(permStatus); - return; - } - - nn_.Mkdirs(path, permissions, createparent, handler); -} - - -void FileSystemImpl::Delete(const std::string &path, bool recursive, - const std::function &handler) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("Delete: argument 
'path' cannot be empty")); - return; - } - - nn_.Delete(path, recursive, handler); -} - - -void FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath, - const std::function &handler) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called"); - - if (oldPath.empty()) { - handler(Status::InvalidArgument("Rename: argument 'oldPath' cannot be empty")); - return; - } - - if (newPath.empty()) { - handler(Status::InvalidArgument("Rename: argument 'newPath' cannot be empty")); - return; - } - - nn_.Rename(oldPath, newPath, handler); -} - - -void FileSystemImpl::SetPermission(const std::string & path, - uint16_t permissions, const std::function &handler) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("SetPermission: argument 'path' cannot be empty")); - return; - } - Status permStatus = FileSystem::CheckValidPermissionMask(permissions); - if (!permStatus.ok()) { - handler(permStatus); - return; - } - - nn_.SetPermission(path, permissions, handler); -} - - -void FileSystemImpl::SetOwner(const std::string & path, const std::string & username, - const std::string & groupname, const std::function &handler) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("SetOwner: argument 'path' cannot be empty")); - return; - } - - nn_.SetOwner(path, username, groupname, handler); -} - - -/** - * Helper function for recursive Find calls. - * - * Some compilers don't like recursive lambdas, so we make the lambda call a - * method, which in turn creates a lambda calling itself. - * - * ***High-level explanation*** - * - * Since we are allowing to use wild cards in both path and name, we start by expanding the path first. - * Boolean search_path is set to true when we search for the path and false when we search for the name. - * When we search for the path we break the given path pattern into sub-directories. Starting from the - * first sub-directory we list them one-by-one and recursively continue into directories that matched the - * path pattern at the current depth. Directories that are large will be requested to continue sending - * the results. We keep track of the current depth within the path pattern in the 'depth' variable. - * This continues recursively until the depth reaches the end of the path. Next that we start matching - * the name pattern. All directories that we find we recurse now, and all names that match the given name - * pattern are being stored in outputs and later sent back to the user. 
- */ -void FileSystemImpl::FindShim(const Status &stat, const std::vector & stat_infos, bool directory_has_more, - std::shared_ptr operational_state, std::shared_ptr shared_state) { - //We buffer the outputs then send them back at the end - std::vector outputs; - //Return on error - if(!stat.ok()){ - std::lock_guard find_lock(shared_state->lock); - //We send true becuase we do not want the user code to exit before all our requests finished - shared_state->handler(stat, outputs, true); - shared_state->aborted = true; - } - if(!shared_state->aborted){ - //User did not abort the operation - if (directory_has_more) { - //Directory is large and has more results - //We launch another async call to get more results - shared_state->outstanding_requests++; - auto callback = [this, operational_state, shared_state](const Status &stat, const std::vector & stat_infos, bool has_more) { - FindShim(stat, stat_infos, has_more, operational_state, shared_state); - }; - std::string last = stat_infos.back().path; - nn_.GetListing(operational_state->path, callback, last); - } - if(operational_state->search_path && operational_state->depth < shared_state->dirs.size() - 1){ - //We are searching for the path and did not reach the end of the path yet - for (StatInfo const& si : stat_infos) { - //If we are at the last depth and it matches both path and name, we need to output it. - if (operational_state->depth == shared_state->dirs.size() - 2 - && !fnmatch(shared_state->dirs[operational_state->depth + 1].c_str(), si.path.c_str(), 0) - && !fnmatch(shared_state->name.c_str(), si.path.c_str(), 0)) { - outputs.push_back(si); - } - //Skip if not directory - if(si.file_type != StatInfo::IS_DIR) { - continue; - } - //Checking for a match with the path at the current depth - if(!fnmatch(shared_state->dirs[operational_state->depth + 1].c_str(), si.path.c_str(), 0)){ - //Launch a new requests for every matched directory - shared_state->outstanding_requests++; - auto callback = [this, si, operational_state, shared_state](const Status &stat, const std::vector & stat_infos, bool has_more) { - std::shared_ptr new_current_state = std::make_shared(si.full_path, operational_state->depth + 1, true); //true because searching for the path - FindShim(stat, stat_infos, has_more, new_current_state, shared_state); - }; - nn_.GetListing(si.full_path, callback); - } - } - } - else if(shared_state->maxdepth > operational_state->depth - shared_state->dirs.size() + 1){ - //We are searching for the name now and maxdepth has not been reached - for (StatInfo const& si : stat_infos) { - //Launch a new request for every directory - if(si.file_type == StatInfo::IS_DIR) { - shared_state->outstanding_requests++; - auto callback = [this, si, operational_state, shared_state](const Status &stat, const std::vector & stat_infos, bool has_more) { - std::shared_ptr new_current_state = std::make_shared(si.full_path, operational_state->depth + 1, false); //false because searching for the name - FindShim(stat, stat_infos, has_more, new_current_state, shared_state); - }; - nn_.GetListing(si.full_path, callback); - } - //All names that match the specified name are saved to outputs - if(!fnmatch(shared_state->name.c_str(), si.path.c_str(), 0)){ - outputs.push_back(si); - } - } - } - } - //This section needs a lock to make sure we return the final chunk only once - //and no results are sent after aborted is set - std::lock_guard find_lock(shared_state->lock); - //Decrement the counter once since we are done with this chunk - shared_state->outstanding_requests--; - 
if(shared_state->outstanding_requests == 0){ - //Send the outputs back to the user and notify that this is the final chunk - shared_state->handler(stat, outputs, false); - } else { - //There will be more results and we are not aborting - if (outputs.size() > 0 && !shared_state->aborted){ - //Send the outputs back to the user and notify that there is more - bool user_wants_more = shared_state->handler(stat, outputs, true); - if(!user_wants_more) { - //Abort if user doesn't want more - shared_state->aborted = true; - } - } - } -} - -void FileSystemImpl::Find( - const std::string &path, const std::string &name, const uint32_t maxdepth, - const std::function &, bool)> &handler) { - LOG_DEBUG(kFileSystem, << "FileSystemImpl::Find(" - << FMT_THIS_ADDR << ", path=" - << path << ", name=" - << name << ") called"); - - //Populating the operational state, which includes: - //current search path, depth within the path, and the indication that we are currently searching for a path (not name yet). - std::shared_ptr operational_state = std::make_shared(path, 0, true); - //Populating the shared state, which includes: - //vector of sub-directories constructed from path, name to search, handler to use for result returning, outstanding_requests counter, and aborted flag. - std::shared_ptr shared_state = std::make_shared(path, name, maxdepth, handler, 1, false); - auto callback = [this, operational_state, shared_state](const Status &stat, const std::vector & stat_infos, bool directory_has_more) { - FindShim(stat, stat_infos, directory_has_more, operational_state, shared_state); - }; - nn_.GetListing("/", callback); -} - - -void FileSystemImpl::CreateSnapshot(const std::string &path, - const std::string &name, - const std::function &handler) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("CreateSnapshot: argument 'path' cannot be empty")); - return; - } - - nn_.CreateSnapshot(path, name, handler); -} - - -void FileSystemImpl::DeleteSnapshot(const std::string &path, - const std::string &name, - const std::function &handler) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("DeleteSnapshot: argument 'path' cannot be empty")); - return; - } - if (name.empty()) { - handler(Status::InvalidArgument("DeleteSnapshot: argument 'name' cannot be empty")); - return; - } - - nn_.DeleteSnapshot(path, name, handler); -} - -void FileSystemImpl::RenameSnapshot(const std::string &path, - const std::string &old_name, const std::string &new_name, - const std::function &handler) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::RenameSnapshot(" << FMT_THIS_ADDR << ", path=" << path << - ", old_name=" << old_name << ", new_name=" << new_name << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("RenameSnapshot: argument 'path' cannot be empty")); - return; - } - if (old_name.empty()) { - handler(Status::InvalidArgument("RenameSnapshot: argument 'old_name' cannot be empty")); - return; - } - if (new_name.empty()) { - handler(Status::InvalidArgument("RenameSnapshot: argument 'new_name' cannot be empty")); - return; - } - - nn_.RenameSnapshot(path, old_name, new_name, handler); -} - -void FileSystemImpl::AllowSnapshot(const std::string &path, - const std::function &handler) { - LOG_DEBUG(kFileSystem, - << 
"FileSystemImpl::AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("AllowSnapshot: argument 'path' cannot be empty")); - return; - } - - nn_.AllowSnapshot(path, handler); -} - - -void FileSystemImpl::DisallowSnapshot(const std::string &path, - const std::function &handler) { - LOG_DEBUG(kFileSystem, - << "FileSystemImpl::DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called"); - - if (path.empty()) { - handler(Status::InvalidArgument("DisallowSnapshot: argument 'path' cannot be empty")); - return; - } - - nn_.DisallowSnapshot(path, handler); -} - -void FileSystemImpl::SetFsEventCallback(fs_event_callback callback) { - if (event_handlers_) { - event_handlers_->set_fs_callback(callback); - nn_.SetFsEventCallback(callback); - } -} - - - -std::shared_ptr FileSystemImpl::get_event_handlers() { - return event_handlers_; -} - -Options FileSystemImpl::get_options() { - return options_; -} - -std::string FileSystemImpl::get_cluster_name() { - return cluster_name_; -} - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h deleted file mode 100644 index f2e9abd..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h +++ /dev/null @@ -1,291 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_ -#define LIBHDFSPP_LIB_FS_FILESYSTEM_H_ - -#include "filehandle.h" -#include "hdfspp/hdfspp.h" -#include "fs/bad_datanode_tracker.h" -#include "reader/block_reader.h" -#include "reader/fileinfo.h" - -#include "asio.hpp" - -#include -#include "namenode_operations.h" - -namespace hdfs { - -/* - * FileSystem: The consumer's main point of interaction with the cluster as - * a whole. - * - * Initially constructed in a disconnected state; call Connect before operating - * on the FileSystem. - * - * All open files must be closed before the FileSystem is destroyed. 
- * - * Threading model: thread-safe for all operations - * Lifetime: pointer created for consumer who is responsible for deleting it - */ -class FileSystemImpl : public FileSystem { -public: - MEMCHECKED_CLASS(FileSystemImpl) - typedef std::function ConnectCallback; - - explicit FileSystemImpl(IoService *&io_service, const std::string& user_name, const Options &options); - explicit FileSystemImpl(std::shared_ptr, const std::string& user_name, const Options &options); - ~FileSystemImpl() override; - - /* attempt to connect to namenode, return bad status on failure */ - void Connect(const std::string &server, const std::string &service, - const std::function &handler) override; - /* attempt to connect to namenode, return bad status on failure */ - Status Connect(const std::string &server, const std::string &service) override; - - /* Connect to the NN indicated in options.defaultFs */ - virtual void ConnectToDefaultFs( - const std::function &handler) override; - virtual Status ConnectToDefaultFs() override; - - /* Cancel connection if FS is in the middle of one */ - virtual bool CancelPendingConnect() override; - - virtual void Open(const std::string &path, - const std::function - &handler) override; - Status Open(const std::string &path, FileHandle **handle) override; - - virtual void GetPreferredBlockSize(const std::string &path, - const std::function &handler) override; - virtual Status GetPreferredBlockSize(const std::string &path, uint64_t & block_size) override; - - virtual void SetReplication(const std::string & path, int16_t replication, std::function handler) override; - virtual Status SetReplication(const std::string & path, int16_t replication) override; - - void SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, std::function handler) override; - Status SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) override; - - void GetFileInfo( - const std::string &path, - const std::function &handler) override; - - Status GetFileInfo(const std::string &path, StatInfo & stat_info) override; - - void GetContentSummary(const std::string &path, - const std::function &handler) override; - Status GetContentSummary(const std::string &path, ContentSummary & stat_info) override; - - /** - * Retrieves the file system information such as the total raw size of all files in the filesystem - * and the raw capacity of the filesystem - * - * @param FsInfo struct to be populated by GetFsStats - **/ - void GetFsStats( - const std::function &handler) override; - - Status GetFsStats(FsInfo & fs_info) override; - - void GetListing( - const std::string &path, - const std::function &, bool)> &handler) override; - - Status GetListing(const std::string &path, std::vector * stat_infos) override; - - virtual void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, - const std::function locations)> ) override; - virtual Status GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, - std::shared_ptr * locations) override; - - virtual void Mkdirs(const std::string & path, uint16_t permissions, bool createparent, - std::function handler) override; - virtual Status Mkdirs(const std::string & path, uint16_t permissions, bool createparent) override; - - virtual void Delete(const std::string &path, bool recursive, - const std::function &handler) override; - virtual Status Delete(const std::string &path, bool recursive) override; - - virtual void Rename(const std::string &oldPath, const std::string &newPath, - const std::function &handler) 
-      const std::function &handler) override;
-  virtual Status Rename(const std::string &oldPath, const std::string &newPath) override;
-
-  virtual void SetPermission(const std::string & path, uint16_t permissions,
-      const std::function &handler) override;
-  virtual Status SetPermission(const std::string & path, uint16_t permissions) override;
-
-  virtual void SetOwner(const std::string & path, const std::string & username,
-      const std::string & groupname, const std::function &handler) override;
-  virtual Status SetOwner(const std::string & path,
-      const std::string & username, const std::string & groupname) override;
-
-  void Find(
-      const std::string &path, const std::string &name, const uint32_t maxdepth,
-      const std::function &, bool)> &handler) override;
-  Status Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector * stat_infos) override;
-
-  /*****************************************************************************
-   *                    FILE SYSTEM SNAPSHOT FUNCTIONS
-   ****************************************************************************/
-
-  /**
-   * Creates a snapshot of a snapshottable directory specified by path
-   *
-   * @param path    Path to the directory to be snapshotted (must be non-empty)
-   * @param name    Name to be given to the created snapshot (may be empty)
-   **/
-  void CreateSnapshot(const std::string &path, const std::string &name,
-      const std::function &handler) override;
-  Status CreateSnapshot(const std::string &path, const std::string &name) override;
-
-  /**
-   * Deletes the directory snapshot specified by path and name
-   *
-   * @param path    Path to the snapshotted directory (must be non-empty)
-   * @param name    Name of the snapshot to be deleted (must be non-empty)
-   **/
-  void DeleteSnapshot(const std::string &path, const std::string &name,
-      const std::function &handler) override;
-  Status DeleteSnapshot(const std::string &path, const std::string &name) override;
-
-  /**
-   * Renames the directory snapshot specified by path from old_name to new_name
-   *
-   * @param path       Path to the snapshotted directory (must be non-blank)
-   * @param old_name   Current name of the snapshot (must be non-blank)
-   * @param new_name   New name of the snapshot (must be non-blank)
-   **/
-  void RenameSnapshot(const std::string &path, const std::string &old_name,
-      const std::string &new_name, const std::function &handler) override;
-  Status RenameSnapshot(const std::string &path, const std::string &old_name,
-      const std::string &new_name) override;
-
-  /**
-   * Allows snapshots to be made on the specified directory
-   *
-   * @param path    Path to the directory to be made snapshottable (must be non-empty)
-   **/
-  void AllowSnapshot(const std::string &path, const std::function &handler) override;
-  Status AllowSnapshot(const std::string &path) override;
-
-  /**
-   * Disallows snapshots to be made on the specified directory
-   *
-   * @param path    Path to the directory to be made non-snapshottable (must be non-empty)
-   **/
-  void DisallowSnapshot(const std::string &path, const std::function &handler) override;
-  Status DisallowSnapshot(const std::string &path) override;
-
-  void SetFsEventCallback(fs_event_callback callback) override;
-
-  /* add a new thread to handle asio requests, return number of threads in pool
-   */
-  int AddWorkerThread();
-
-  /* how many worker threads are servicing asio requests */
-  int WorkerThreadCount();
-
-  /* all monitored events will need to lookup handlers */
-  std::shared_ptr get_event_handlers();
-
-  Options get_options() override;
-
-  std::string get_cluster_name() override;
-
-private:
-  /**
-   *  The IoService must be the first member variable to ensure that it gets
-   *  destroyed last. This allows other members to dequeue things from the
-   *  service in their own destructors.
-   *  A side effect of this is that requests may outlive the RpcEngine they
-   *  reference.
-   **/
-  std::shared_ptr io_service_;
-  const Options options_;
-  const std::string client_name_;
-  std::string cluster_name_;
-  NameNodeOperations nn_;
-  std::shared_ptr bad_node_tracker_;
-
-  // Keep connect callback around in case it needs to be canceled
-  SwappableCallbackHolder connect_callback_;
-
-  /**
-   * Runtime event monitoring handlers.
-   * Note: This is really handy to have for advanced usage but
-   * exposes implementation details that may change at any time.
-   **/
-  std::shared_ptr event_handlers_;
-
-  void GetListingShim(const Status &stat, const std::vector & stat_infos, bool has_more,
-      std::string path, const std::function &, bool)> &handler);
-
-  struct FindSharedState {
-    //Name pattern (can have wild-cards) to find
-    const std::string name;
-    //Maximum depth to recurse after the end of path is reached.
-    //Can be set to 0 for pure path globbing and ignoring name pattern entirely.
-    const uint32_t maxdepth;
-    //Vector of all sub-directories from the path argument (each can have wild-cards)
-    std::vector dirs;
-    //Callback from Find
-    const std::function &, bool)> handler;
-    //outstanding_requests is incremented once for every GetListing call.
-    std::atomic outstanding_requests;
-    //Boolean needed to abort all recursion on error or on user command
-    std::atomic aborted;
-    //Shared variables will need protection with a lock
-    std::mutex lock;
-    FindSharedState(const std::string path_, const std::string name_, const uint32_t maxdepth_,
-        const std::function &, bool)> handler_,
-        uint64_t outstanding_recuests_, bool aborted_)
-        : name(name_),
-          maxdepth(maxdepth_),
-          handler(handler_),
-          outstanding_requests(outstanding_recuests_),
-          aborted(aborted_),
-          lock() {
-      //Constructing the list of sub-directories
-      std::stringstream ss(path_);
-      if(path_.back() != '/'){
-        ss << "/";
-      }
-      for (std::string token; std::getline(ss, token, '/'); ) {
-        dirs.push_back(token);
-      }
-    }
-  };
-
-  struct FindOperationalState {
-    const std::string path;
-    const uint32_t depth;
-    const bool search_path;
-    FindOperationalState(const std::string path_, const uint32_t depth_, const bool search_path_)
-        : path(path_),
-          depth(depth_),
-          search_path(search_path_) {
-    }
-  };
-
-  void FindShim(const Status &stat, const std::vector & stat_infos,
-      bool directory_has_more, std::shared_ptr current_state, std::shared_ptr shared_state);
-
-};
-}
-
-#endif
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
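
For readers following this revert: the declarations removed above are the consumer-facing entry points of the libhdfs++ FileSystem class, including the synchronous and asynchronous snapshot operations. The sketch below is illustrative only and is not part of this commit. It assumes an hdfs::FileSystem pointer has already been created elsewhere, and the host name, port, and paths are placeholder values; only calls declared in the deleted header (Connect, AllowSnapshot, CreateSnapshot) are used.

// Illustrative sketch (not part of this commit): driving the synchronous
// FileSystem calls declared in the deleted filesystem.h. The construction of
// the FileSystem instance and the host/port/path values are assumptions.
#include <iostream>
#include "hdfspp/hdfspp.h"

void snapshot_example(hdfs::FileSystem *fs) {
  // The class is documented as starting in a disconnected state, so Connect
  // must succeed before any other call is issued.
  hdfs::Status status = fs->Connect("namenode.example.com", "8020");
  if (!status.ok()) {
    std::cerr << "connect failed: " << status.ToString() << std::endl;
    return;
  }

  // Mark a directory snapshottable, then take a named snapshot of it.
  // Each of these also has an asynchronous overload taking a handler callback.
  status = fs->AllowSnapshot("/data");
  if (status.ok()) {
    status = fs->CreateSnapshot("/data", "nightly");
  }
  if (!status.ok()) {
    std::cerr << "snapshot failed: " << status.ToString() << std::endl;
  }
}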