From: wheat9@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Subject: hadoop git commit: HDFS-8766. Implement a libhdfs(3) compatible API. Contributed by James Clampffer.
Date: Wed, 28 Oct 2015 22:53:05 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 6d687592d -> 07c904dda


HDFS-8766. Implement a libhdfs(3) compatible API. Contributed by James Clampffer.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/07c904dd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/07c904dd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/07c904dd

Branch: refs/heads/HDFS-8707
Commit: 07c904ddadb10554dfef30eb80a80b1a83e0b86c
Parents: 6d68759
Author: Haohui Mai
Authored: Wed Oct 28 15:50:26 2015 -0700
Committer: Haohui Mai
Committed: Wed Oct 28 15:50:26 2015 -0700

----------------------------------------------------------------------
 .../src/main/native/libhdfspp/CMakeLists.txt    |   1 +
 .../main/native/libhdfspp/lib/CMakeLists.txt    |   1 +
 .../libhdfspp/lib/bindings/CMakeLists.txt       |  19 +++
 .../libhdfspp/lib/bindings/c/CMakeLists.txt     |  20 +++
 .../native/libhdfspp/lib/bindings/c/hdfs.cc     | 137 ++++++++++++++++
 .../native/libhdfspp/lib/bindings/c/hdfs_cpp.cc | 157 +++++++++++++++++++
 .../native/libhdfspp/lib/bindings/c/hdfs_cpp.h  |  82 ++++++++++
 7 files changed, 417 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/07c904dd/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt
index 17612cf..c544b4d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt
@@ -51,6 +51,7 @@ include_directories(
   third_party/asio-1.10.2/include
   third_party/gmock-1.7.0
   ${OPENSSL_INCLUDE_DIR}
+  ../libhdfs/include
 )
 
 set(PROTO_HDFS_DIR ${CMAKE_CURRENT_LIST_DIR}/../../../../../hadoop-hdfs-client/src/main/proto)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/07c904dd/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt
index a0e3379..434dc4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt
@@ -21,3 +21,4 @@ add_subdirectory(fs)
 add_subdirectory(reader)
 add_subdirectory(rpc)
 add_subdirectory(proto)
+add_subdirectory(bindings)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/07c904dd/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/CMakeLists.txt
new file mode 100644
index 0000000..93139ce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/CMakeLists.txt
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_subdirectory(c)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/07c904dd/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt
new file mode 100644
index 0000000..e170370
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+add_library(bindings_c hdfs.cc hdfs_cpp.cc)
+add_dependencies(bindings_c fs rpc reader proto common fs rpc reader proto common)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/07c904dd/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
new file mode 100644
index 0000000..9b985a9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfs_cpp.h"
+
+#include
+#include
+
+using namespace hdfs;
+
+/* Separate the handles used by the C api from the C++ API*/
+struct hdfs_internal {
+  hdfs_internal(HadoopFileSystem *p) : filesystem_(p) {}
+  hdfs_internal(std::unique_ptr<HadoopFileSystem> p)
+      : filesystem_(std::move(p)) {}
+  virtual ~hdfs_internal(){};
+  HadoopFileSystem *get_impl() { return filesystem_.get(); }
+  const HadoopFileSystem *get_impl() const { return filesystem_.get(); }
+
+ private:
+  std::unique_ptr<HadoopFileSystem> filesystem_;
+};
+
+struct hdfsFile_internal {
+  hdfsFile_internal(FileHandle *p) : file_(p) {}
+  hdfsFile_internal(std::unique_ptr<FileHandle> p) : file_(std::move(p)) {}
+  virtual ~hdfsFile_internal(){};
+  FileHandle *get_impl() { return file_.get(); }
+  const FileHandle *get_impl() const { return file_.get(); }
+
+ private:
+  std::unique_ptr<FileHandle> file_;
+};
+
+/* Error handling with optional debug to stderr */
+static void ReportError(int errnum, std::string msg) {
+  errno = errnum;
+#ifdef LIBHDFSPP_C_API_ENABLE_DEBUG
+  std::cerr << "Error: errno=" << strerror(errnum) << " message=\"" << msg
+            << "\"" << std::endl;
+#else
+  (void)msg;
+#endif
+}
+
+/**
+ * C API implementations
+ **/
+
+int hdfsFileIsOpenForRead(hdfsFile file) {
+  /* files can only be open for reads at the moment, do a quick check */
+  if (file) {
+    return file->get_impl()->IsOpenForRead();
+  }
+  return false;
+}
+
+hdfsFS hdfsConnect(const char *nn, tPort port) {
+  HadoopFileSystem *fs = new HadoopFileSystem();
+  Status stat = fs->Connect(nn, port);
+  if (!stat.ok()) {
+    ReportError(ENODEV, "Unable to connect to NameNode.");
+    delete fs;
+    return nullptr;
+  }
+  return new hdfs_internal(fs);
+}
+
+int hdfsDisconnect(hdfsFS fs) {
+  if (!fs) {
+    ReportError(ENODEV, "Cannot disconnect null FS handle.");
+    return -1;
+  }
+
+  delete fs;
+  return 0;
+}
+
+hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
+                      short replication, tSize blocksize) {
+  (void)flags;
+  (void)bufferSize;
+  (void)replication;
+  (void)blocksize;
+  if (!fs) {
+    ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
+    return nullptr;
+  }
+  FileHandle *f = nullptr;
+  Status stat = fs->get_impl()->OpenFileForRead(path, &f);
+  if (!stat.ok()) {
+    return nullptr;
+  }
+  return new hdfsFile_internal(f);
+}
+
+int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
+  if (!fs) {
+    ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
+    return -1;
+  }
+  if (!file) {
+    ReportError(EBADF, "Cannot perform FS operations with null File handle.");
+    return -1;
+  }
+  delete file;
+  return 0;
+}
+
+tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
+                tSize length) {
+  if (!fs) {
+    ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
+    return -1;
+  }
+  if (!file) {
+    ReportError(EBADF, "Cannot perform FS operations with null File handle.");
+    return -1;
+  }
+
+  return file->get_impl()->Pread(buffer, length, position);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/07c904dd/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc
new file mode 100644
index 0000000..3f3fb8d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfs_cpp.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include "libhdfspp/hdfs.h"
+#include "libhdfspp/status.h"
+#include "fs/filesystem.h"
+#include "common/hdfs_public_api.h"
+
+namespace hdfs {
+
+ssize_t FileHandle::Pread(void *buf, size_t nbyte, off_t offset) {
+  auto stat = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(stat->get_future());
+
+  /* wrap async call with promise/future to make it blocking */
+  size_t read_count = 0;
+  auto callback = [stat, &read_count](const Status &s, const std::string &dn,
+                                      size_t bytes) {
+    (void)dn;
+    stat->set_value(s);
+    read_count = bytes;
+  };
+
+  input_stream_->PositionRead(buf, nbyte, offset, std::set<std::string>(),
+                              callback);
+
+  /* wait for async to finish */
+  auto s = future.get();
+
+  if (!s.ok()) {
+    return -1;
+  }
+  return (ssize_t)read_count;
+}
+
+bool FileHandle::IsOpenForRead() {
+  /* for now just check if InputStream exists */
+  if (!input_stream_) {
+    return false;
+  }
+  return true;
+}
+
+HadoopFileSystem::~HadoopFileSystem() {
+  /**
+   * Note: IoService must be stopped before getting rid of worker threads.
+   * Once worker threads are joined and deleted the service can be deleted.
+   **/
+
+  file_system_.reset(nullptr);
+  service_->Stop();
+  worker_threads_.clear();
+  service_.reset(nullptr);
+}
+
+Status HadoopFileSystem::Connect(const char *nn, tPort port,
+                                 unsigned int threads) {
+  /* IoService::New can return nullptr */
+  if (!service_) {
+    return Status::Error("Null IoService");
+  }
+  /* spawn background threads for asio delegation */
+  for (unsigned int i = 0; i < threads; i++) {
+    AddWorkerThread();
+  }
+  /* synchronized */
+  FileSystem *fs = nullptr;
+  auto stat = std::make_shared<std::promise<Status>>();
+  std::future<Status> future = stat->get_future();
+
+  auto callback = [stat, &fs](const Status &s, FileSystem *f) {
+    fs = f;
+    stat->set_value(s);
+  };
+
+  /* dummy options object until this is hooked up to HDFS-9117 */
+  Options options_object;
+  FileSystem::New(service_.get(), options_object, nn, std::to_string(port),
+                  callback);
+
+  /* block until promise is set */
+  auto s = future.get();
+
+  /* check and see if it worked */
+  if (!fs) {
+    service_->Stop();
+    worker_threads_.clear();
+    return s;
+  }
+
+  file_system_ = std::unique_ptr<FileSystem>(fs);
+  return s;
+}
+
+int HadoopFileSystem::AddWorkerThread() {
+  auto service_task = [](IoService *service) { service->Run(); };
+  worker_threads_.push_back(
+      WorkerPtr(new std::thread(service_task, service_.get())));
+  return worker_threads_.size();
+}
+
+Status HadoopFileSystem::OpenFileForRead(const std::string &path,
+                                         FileHandle **handle) {
+  auto stat = std::make_shared<std::promise<Status>>();
+  std::future<Status> future = stat->get_future();
+
+  /* wrap async FileSystem::Open with promise to make it a blocking call */
+  InputStream *input_stream = nullptr;
+  auto h = [stat, &input_stream](const Status &s, InputStream *is) {
+    stat->set_value(s);
+    input_stream = is;
+  };
+
+  file_system_->Open(path, h);
+
+  /* block until promise is set */
+  auto s = future.get();
+
+  if (!s.ok()) {
+    delete input_stream;
+    return s;
+  }
+  if (!input_stream) {
+    return s;
+  }
+
+  *handle = new FileHandle(input_stream);
+  return s;
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/07c904dd/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h
new file mode 100644
index 0000000..5822ff4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBHDFSPP_BINDINGS_HDFSCPP_H
+#define LIBHDFSPP_BINDINGS_HDFSCPP_H
+
+#include
+#include
+#include
+
+#include "libhdfspp/hdfs.h"
+#include
+
+namespace hdfs {
+
+/**
+ * Implement a very simple 'it just works' interface in C++
+ * that provides posix-like file operations + extra stuff for hadoop.
+ * Then provide very thin C wrappers over each method.
+ */
+
+class FileHandle {
+ public:
+  virtual ~FileHandle(){};
+  ssize_t Pread(void *buf, size_t nbyte, off_t offset);
+  bool IsOpenForRead();
+
+ private:
+  /* handle should only be created by fs */
+  friend class HadoopFileSystem;
+  FileHandle(InputStream *is) : input_stream_(is){};
+  std::unique_ptr<InputStream> input_stream_;
+};
+
+class HadoopFileSystem {
+ public:
+  HadoopFileSystem() : service_(IoService::New()) {}
+  virtual ~HadoopFileSystem();
+
+  /* attempt to connect to namenode, return false on failure */
+  Status Connect(const char *nn, tPort port, unsigned int threads = 1);
+
+  /* how many worker threads are servicing asio requests */
+  int WorkerThreadCount() { return worker_threads_.size(); }
+
+  /* add a new thread to handle asio requests, return number of threads in pool
+   */
+  int AddWorkerThread();
+
+  Status OpenFileForRead(const std::string &path, FileHandle **handle);
+
+ private:
+  std::unique_ptr<IoService> service_;
+  /* std::thread needs to join before deletion */
+  struct WorkerDeleter {
+    void operator()(std::thread *t) {
+      t->join();
+      delete t;
+    }
+  };
+  typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
+  std::vector<WorkerPtr> worker_threads_;
+  std::unique_ptr<FileSystem> file_system_;
+};
+}
+
+#endif
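
Reader's note (not part of the commit): the blocking calls in hdfs_cpp.cc all wrap an asynchronous, callback-based operation in a std::promise/std::future pair. The sketch below shows that pattern in isolation using only the standard library. AsyncRead, ReadCallback, ReadResult and BlockingRead are hypothetical stand-ins invented for illustration; they are not libhdfspp names, and the real InputStream::PositionRead takes different arguments.

```cpp
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <thread>

// Hypothetical result type: carries the status flag and the byte count together.
struct ReadResult {
  bool ok;
  std::size_t bytes;
};

// Hypothetical callback signature, loosely modelled on an async read handler.
using ReadCallback = std::function<void(bool ok, std::size_t bytes)>;

// Hypothetical async operation: completes on another thread and reports
// through the callback. A zero-length request is treated as a failure here
// purely so that the sketch has an error path.
std::thread AsyncRead(std::size_t nbyte, ReadCallback done) {
  return std::thread([nbyte, done]() { done(nbyte > 0, nbyte); });
}

// Blocking wrapper: returns the byte count, or -1 on failure.
long BlockingRead(std::size_t nbyte) {
  // The promise is shared so the callback keeps it alive until it has run.
  auto promise = std::make_shared<std::promise<ReadResult>>();
  std::future<ReadResult> future = promise->get_future();

  std::thread worker = AsyncRead(nbyte, [promise](bool ok, std::size_t bytes) {
    // Everything the caller needs travels inside the promise's value.
    promise->set_value(ReadResult{ok, bytes});
  });

  // Block until the callback has fulfilled the promise.
  ReadResult result = future.get();
  worker.join();
  return result.ok ? static_cast<long>(result.bytes) : -1L;
}
```

One design choice worth noting: the sketch passes the byte count through the promise itself instead of writing it to a captured local after set_value, so the waiting thread cannot wake up and read the count before the callback has stored it. A call such as BlockingRead(16) blocks until the callback fires and then returns the reported byte count.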