hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject [2/2] hadoop git commit: HDFS-9144. Refactoring libhdfs++ into stateful/ephemeral objects. Contributed by Bob Hansen.
Date Thu, 03 Dec 2015 12:31:35 GMT
HDFS-9144.  Refactoring libhdfs++ into stateful/ephemeral objects.  Contributed by Bob Hansen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a06bc8e1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a06bc8e1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a06bc8e1

Branch: refs/heads/HDFS-8707
Commit: a06bc8e1232622aab0b7e4f26b547125de9ab300
Parents: d6d056d
Author: James <jhc@apache.org>
Authored: Thu Dec 3 07:30:22 2015 -0500
Committer: James <jhc@apache.org>
Committed: Thu Dec 3 07:30:22 2015 -0500

----------------------------------------------------------------------
 .../native/libhdfspp/include/libhdfspp/hdfs.h   |  38 +-
 .../libhdfspp/include/libhdfspp/options.h       |  11 +
 .../main/native/libhdfspp/lib/CMakeLists.txt    |   1 +
 .../libhdfspp/lib/bindings/c/CMakeLists.txt     |   2 +-
 .../native/libhdfspp/lib/bindings/c/hdfs.cc     |  37 +-
 .../native/libhdfspp/lib/bindings/c/hdfs_cpp.cc | 216 ---------
 .../native/libhdfspp/lib/bindings/c/hdfs_cpp.h  | 105 -----
 .../native/libhdfspp/lib/common/CMakeLists.txt  |  19 +-
 .../native/libhdfspp/lib/common/async_stream.h  |  49 +++
 .../libhdfspp/lib/common/continuation/asio.h    |   4 +-
 .../lib/common/continuation/protobuf.h          |  21 +-
 .../libhdfspp/lib/common/hdfs_public_api.cc     |  16 -
 .../main/native/libhdfspp/lib/common/options.cc |   3 +-
 .../main/native/libhdfspp/lib/common/util.cc    |  35 ++
 .../src/main/native/libhdfspp/lib/common/util.h |   7 +
 .../libhdfspp/lib/connection/CMakeLists.txt     |   2 +
 .../lib/connection/datanodeconnection.cc        |  57 +++
 .../lib/connection/datanodeconnection.h         |  66 +++
 .../main/native/libhdfspp/lib/fs/CMakeLists.txt |   2 +-
 .../main/native/libhdfspp/lib/fs/filehandle.cc  | 240 ++++++++++
 .../main/native/libhdfspp/lib/fs/filehandle.h   | 115 +++++
 .../main/native/libhdfspp/lib/fs/filesystem.cc  | 212 +++++++--
 .../main/native/libhdfspp/lib/fs/filesystem.h   | 137 +++---
 .../main/native/libhdfspp/lib/fs/inputstream.cc |  48 --
 .../native/libhdfspp/lib/fs/inputstream_impl.h  | 207 ---------
 .../native/libhdfspp/lib/reader/CMakeLists.txt  |   2 +-
 .../native/libhdfspp/lib/reader/block_reader.cc | 433 +++++++++++++++++++
 .../native/libhdfspp/lib/reader/block_reader.h  |  83 +++-
 .../native/libhdfspp/lib/reader/datatransfer.h  |  28 +-
 .../libhdfspp/lib/reader/datatransfer_impl.h    |  23 +-
 .../main/native/libhdfspp/lib/reader/fileinfo.h |  36 ++
 .../libhdfspp/lib/reader/remote_block_reader.cc |  46 --
 .../lib/reader/remote_block_reader_impl.h       | 342 ---------------
 .../main/native/libhdfspp/lib/rpc/rpc_engine.cc |  14 -
 .../main/native/libhdfspp/tests/CMakeLists.txt  |  12 +-
 .../native/libhdfspp/tests/bad_datanode_test.cc | 208 ++++++---
 .../native/libhdfspp/tests/inputstream_test.cc  | 232 ----------
 .../native/libhdfspp/tests/mock_connection.h    |  15 +-
 .../libhdfspp/tests/remote_block_reader_test.cc | 202 +++++++--
 39 files changed, 1802 insertions(+), 1524 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
index 06619fd..dfff20b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
@@ -24,6 +24,7 @@
 #include <functional>
 #include <memory>
 #include <set>
+#include <iostream>
 
 namespace hdfs {
 
@@ -68,10 +69,10 @@ class NodeExclusionRule {
 };
 
 /**
- * Applications opens an InputStream to read files in HDFS.
+ * Applications opens a FileHandle to read files in HDFS.
  **/
-class InputStream {
- public:
+class FileHandle {
+public:
   /**
    * Read data from a specific position. The current implementation
    * stops at the block boundary.
@@ -83,10 +84,14 @@ class InputStream {
    * The handler returns the datanode that serves the block and the number of
    * bytes has read.
    **/
-  virtual void PositionRead(
-      void *buf, size_t nbyte, uint64_t offset,
-      const std::function<void(const Status &, const std::string &, size_t)> &
-          handler) = 0;
+  virtual void
+  PositionRead(void *buf, size_t nbyte, uint64_t offset,
+               const std::function<void(const Status &, size_t)> &handler) = 0;
+
+  virtual Status PositionRead(void *buf, size_t *nbyte, off_t offset) = 0;
+  virtual Status Read(void *buf, size_t *nbyte) = 0;
+  virtual Status Seek(off_t *offset, std::ios_base::seekdir whence) = 0;
+
   /**
    * Determine if a datanode should be excluded from future operations
    * based on the return Status.
@@ -97,7 +102,7 @@ class InputStream {
    **/
   static bool ShouldExclude(const Status &status);
 
-  virtual ~InputStream();
+  virtual ~FileHandle();
 };
 
 /**
@@ -114,15 +119,24 @@ class FileSystem {
       IoService *io_service, const Options &options, const std::string &server,
       const std::string &service,
       const std::function<void(const Status &, FileSystem *)> &handler);
+
+  /* Synchronous call of New*/
+  static FileSystem *
+  New(IoService *io_service, const Options &options, const std::string &server,
+      const std::string &service);
+
   /**
    * Open a file on HDFS. The call issues an RPC to the NameNode to
    * gather the locations of all blocks in the file and to return a
    * new instance of the @ref InputStream object.
    **/
-  virtual void Open(
-      const std::string &path,
-      const std::function<void(const Status &, InputStream *)> &handler) = 0;
-  virtual ~FileSystem();
+  virtual void
+  Open(const std::string &path,
+       const std::function<void(const Status &, FileHandle *)> &handler) = 0;
+  virtual Status Open(const std::string &path, FileHandle **handle) = 0;
+
+  virtual ~FileSystem() {};
+
 };
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h
index 0abdfa0..79b9c54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h
@@ -31,6 +31,17 @@ struct Options {
   int rpc_timeout;
 
   /**
+   * Maximum number of retries for RPC operations
+   **/
+  const static int NO_RPC_RETRY = -1;
+  int max_rpc_retries;
+
+  /**
+   * Number of ms to wait between retry of RPC operations
+   **/
+  int rpc_retry_delay_ms;
+
+  /**
    * Exclusion time for failed datanodes in milliseconds.
    * Default: 60000
    **/

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt
index 434dc4e..c851597 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt
@@ -21,4 +21,5 @@ add_subdirectory(fs)
 add_subdirectory(reader)
 add_subdirectory(rpc)
 add_subdirectory(proto)
+add_subdirectory(connection)
 add_subdirectory(bindings)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt
index e170370..664518a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt
@@ -16,5 +16,5 @@
 # under the License.
 
 
-add_library(bindings_c hdfs.cc hdfs_cpp.cc)
+add_library(bindings_c hdfs.cc)
 add_dependencies(bindings_c fs rpc reader proto common fs rpc reader proto common)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
index 853e9d3..802b3ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
@@ -16,8 +16,10 @@
  * limitations under the License.
  */
 
-#include "hdfs_cpp.h"
+#include "fs/filesystem.h"
 
+#include <hdfs/hdfs.h>
+#include <string>
 #include <cstring>
 #include <iostream>
 
@@ -25,15 +27,15 @@ using namespace hdfs;
 
 /* Seperate the handles used by the C api from the C++ API*/
 struct hdfs_internal {
-  hdfs_internal(HadoopFileSystem *p) : filesystem_(p) {}
-  hdfs_internal(std::unique_ptr<HadoopFileSystem> p)
+  hdfs_internal(FileSystem *p) : filesystem_(p) {}
+  hdfs_internal(std::unique_ptr<FileSystem> p)
       : filesystem_(std::move(p)) {}
   virtual ~hdfs_internal(){};
-  HadoopFileSystem *get_impl() { return filesystem_.get(); }
-  const HadoopFileSystem *get_impl() const { return filesystem_.get(); }
+  FileSystem *get_impl() { return filesystem_.get(); }
+  const FileSystem *get_impl() const { return filesystem_.get(); }
 
  private:
-  std::unique_ptr<HadoopFileSystem> filesystem_;
+  std::unique_ptr<FileSystem> filesystem_;
 };
 
 struct hdfsFile_internal {
@@ -102,17 +104,23 @@ bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) {
 int hdfsFileIsOpenForRead(hdfsFile file) {
   /* files can only be open for reads at the moment, do a quick check */
   if (file) {
-    return file->get_impl()->IsOpenForRead();
+    return true; // Update implementation when we get file writing
   }
   return false;
 }
 
 hdfsFS hdfsConnect(const char *nn, tPort port) {
-  HadoopFileSystem *fs = new HadoopFileSystem();
-  Status stat = fs->Connect(nn, port);
-  if (!stat.ok()) {
+  std::string port_as_string = std::to_string(port);
+  IoService * io_service = IoService::New();
+  FileSystem *fs = FileSystem::New(io_service, Options(), nn, port_as_string);
+  if (!fs) {
     ReportError(ENODEV, "Unable to connect to NameNode.");
-    delete fs;
+
+    // FileSystem's ctor might take ownership of the io_service; if it does,
+    //    it will null out the pointer
+    if (io_service)
+      delete io_service;
+
     return nullptr;
   }
   return new hdfs_internal(fs);
@@ -139,7 +147,7 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
     return nullptr;
   }
   FileHandle *f = nullptr;
-  Status stat = fs->get_impl()->OpenFileForRead(path, &f);
+  Status stat = fs->get_impl()->Open(path, &f);
   if (!stat.ok()) {
     return nullptr;
   }
@@ -150,7 +158,6 @@ int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
   if (!CheckSystemAndHandle(fs, file)) {
     return -1;
   }
-
   delete file;
   return 0;
 }
@@ -162,8 +169,8 @@ tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
   }
 
   size_t len = length;
-  Status stat = file->get_impl()->Pread(buffer, &len, position);
-  if (!stat.ok()) {
+  Status stat = file->get_impl()->PositionRead(buffer, &len, position);
+  if(!stat.ok()) {
     return Error(stat);
   }
   return (tSize)len;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc
deleted file mode 100644
index 8872b1d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "hdfs_cpp.h"
-
-#include <cstdint>
-#include <cerrno>
-#include <string>
-#include <future>
-#include <memory>
-#include <thread>
-#include <vector>
-#include <set>
-#include <tuple>
-
-#include <hdfs/hdfs.h>
-#include "libhdfspp/hdfs.h"
-#include "libhdfspp/status.h"
-#include "fs/filesystem.h"
-#include "common/hdfs_public_api.h"
-
-namespace hdfs {
-
-FileHandle::FileHandle(InputStream *is) : input_stream_(is), offset_(0){}
-
-Status FileHandle::Pread(void *buf, size_t *nbyte, off_t offset) {
-  auto callstate = std::make_shared<std::promise<std::tuple<Status, std::string, size_t>>>();
-  std::future<std::tuple<Status, std::string, size_t>> future(callstate->get_future());
-
-  /* wrap async call with promise/future to make it blocking */
-  auto callback = [callstate](
-      const Status &s, const std::string &dn, size_t bytes) {
-    callstate->set_value(std::make_tuple(s, dn, bytes));
-  };
-
-  input_stream_->PositionRead(buf, *nbyte, offset, callback);
-
-  /* wait for async to finish */
-  auto returnstate = future.get();
-  auto stat = std::get<0>(returnstate);
-
-  if (!stat.ok()) {
-    /* determine if DN gets marked bad */
-    if (InputStream::ShouldExclude(stat)) {
-      InputStreamImpl *impl =
-          static_cast<InputStreamImpl *>(input_stream_.get());
-      impl->bad_node_tracker_->AddBadNode(std::get<1>(returnstate));
-    }
-
-    return stat;
-  }
-  *nbyte = std::get<2>(returnstate);
-  return Status::OK();
-}
-
-Status FileHandle::Read(void *buf, size_t *nbyte) {
-  Status stat = Pread(buf, nbyte, offset_);
-  if (!stat.ok()) {
-    return stat;
-  }
-
-  offset_ += *nbyte;
-  return Status::OK();
-}
-
-Status FileHandle::Seek(off_t *offset, std::ios_base::seekdir whence) {
-  off_t new_offset = -1;
-
-  switch (whence) {
-    case std::ios_base::beg:
-      new_offset = *offset;
-      break;
-    case std::ios_base::cur:
-      new_offset = offset_ + *offset;
-      break;
-    case std::ios_base::end:
-      new_offset = static_cast<InputStreamImpl *>(input_stream_.get())
-                       ->get_file_length() +
-                   *offset;
-      break;
-    default:
-      /* unsupported */
-      return Status::InvalidArgument("Invalid Seek whence argument");
-  }
-
-  if (!CheckSeekBounds(new_offset)) {
-    return Status::InvalidArgument("Seek offset out of bounds");
-  }
-  offset_ = new_offset;
-
-  *offset = offset_;
-  return Status::OK();
-}
-
-/* return false if seek will be out of bounds */
-bool FileHandle::CheckSeekBounds(ssize_t desired_position) {
-  ssize_t file_length =
-      static_cast<InputStreamImpl *>(input_stream_.get())->get_file_length();
-
-  if (desired_position < 0 || desired_position >= file_length) {
-    return false;
-  }
-
-  return true;
-}
-
-bool FileHandle::IsOpenForRead() {
-  /* for now just check if InputStream exists */
-  if (!input_stream_) {
-    return false;
-  }
-  return true;
-}
-
-HadoopFileSystem::~HadoopFileSystem() {
-  /**
-   * Note: IoService must be stopped before getting rid of worker threads.
-   * Once worker threads are joined and deleted the service can be deleted.
-   **/
-
-  file_system_.reset(nullptr);
-  service_->Stop();
-  worker_threads_.clear();
-  service_.reset(nullptr);
-}
-
-Status HadoopFileSystem::Connect(const char *nn, tPort port,
-                                 unsigned int threads) {
-  /* IoService::New can return nullptr */
-  if (!service_) {
-    return Status::Error("Null IoService");
-  }
-  /* spawn background threads for asio delegation */
-  for (unsigned int i = 0; i < threads; i++) {
-    AddWorkerThread();
-  }
-  /* synchronized */
-  auto callstate = std::make_shared<std::promise<std::tuple<Status, FileSystem*>>>();
-  std::future<std::tuple<Status, FileSystem*>> future(callstate->get_future());
-
-  auto callback = [callstate](const Status &s, FileSystem *f) {
-    callstate->set_value(std::make_tuple(s,f));
-  };
-
-  /* dummy options object until this is hooked up to HDFS-9117 */
-  Options options_object;
-  FileSystem::New(service_.get(), options_object, nn, std::to_string(port),
-                  callback);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-  Status stat = std::get<0>(returnstate);
-  FileSystem *fs = std::get<1>(returnstate);
-
-  /* check and see if it worked */
-  if (!stat.ok() || !fs) {
-    service_->Stop();
-    worker_threads_.clear();
-    return stat;
-  }
-
-  file_system_ = std::unique_ptr<FileSystem>(fs);
-  return stat;
-}
-
-int HadoopFileSystem::AddWorkerThread() {
-  auto service_task = [](IoService *service) { service->Run(); };
-  worker_threads_.push_back(
-      WorkerPtr(new std::thread(service_task, service_.get())));
-  return worker_threads_.size();
-}
-
-Status HadoopFileSystem::OpenFileForRead(const std::string &path,
-                                         FileHandle **handle) {
-  auto callstate = std::make_shared<std::promise<std::tuple<Status, InputStream*>>>();
-  std::future<std::tuple<Status, InputStream*>> future(callstate->get_future());
-
-  /* wrap async FileSystem::Open with promise to make it a blocking call */
-  auto h = [callstate](const Status &s, InputStream *is) {
-    callstate->set_value(std::make_tuple(s, is));
-  };
-
-  file_system_->Open(path, h);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-  Status stat = std::get<0>(returnstate);
-  InputStream *input_stream = std::get<1>(returnstate);
-
-  if (!stat.ok()) {
-    delete input_stream;
-    return stat;
-  }
-  if (!input_stream) {
-    return stat;
-  }
-
-  *handle = new FileHandle(input_stream);
-  return stat;
-}
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h
deleted file mode 100644
index b36ee8f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef LIBHDFSPP_BINDINGS_HDFSCPP_H
-#define LIBHDFSPP_BINDINGS_HDFSCPP_H
-
-#include <cstdint>
-#include <thread>
-#include <vector>
-#include <mutex>
-#include <chrono>
-#include <iostream>
-
-#include "libhdfspp/hdfs.h"
-#include "fs/bad_datanode_tracker.h"
-#include <hdfs/hdfs.h>
-
-namespace hdfs {
-
-/**
- * Implement a very simple 'it just works' interface in C++
- * that provides posix-like file operations + extra stuff for hadoop.
- * Then provide very thin C wrappers over each method.
- */
-
-class HadoopFileSystem;
-
-class FileHandle {
- public:
-  virtual ~FileHandle(){};
-  /**
-   * Note:  The nbyte argument for Read and Pread as well as the
-   * offset argument for Seek are in/out parameters.
-   *
-   * For Read and Pread the value referenced by nbyte should
-   * be set to the number of bytes to read. Before returning
-   * the value referenced will be set by the callee to the number
-   * of bytes that was successfully read.
-   *
-   * For Seek the value referenced by offset should be the number
-   * of bytes to shift from the specified whence position.  The
-   * referenced value will be set to the new offset before returning.
-   **/
-  Status Pread(void *buf, size_t *nbyte, off_t offset);
-  Status Read(void *buf, size_t *nbyte);
-  Status Seek(off_t *offset, std::ios_base::seekdir whence);
-  bool IsOpenForRead();
-
- private:
-  /* handle should only be created by fs */
-  friend class HadoopFileSystem;
-  FileHandle(InputStream *is);
-  bool CheckSeekBounds(ssize_t desired_position);
-  std::unique_ptr<InputStream> input_stream_;
-  off_t offset_;
-};
-
-class HadoopFileSystem {
- public:
-  HadoopFileSystem() : service_(IoService::New()) {}
-  virtual ~HadoopFileSystem();
-
-  /* attempt to connect to namenode, return false on failure */
-  Status Connect(const char *nn, tPort port, unsigned int threads = 1);
-
-  /* how many worker threads are servicing asio requests */
-  int WorkerThreadCount() { return worker_threads_.size(); }
-
-  /* add a new thread to handle asio requests, return number of threads in pool
-   */
-  int AddWorkerThread();
-
-  Status OpenFileForRead(const std::string &path, FileHandle **handle);
-
- private:
-  std::unique_ptr<IoService> service_;
-  /* std::thread needs to join before deletion */
-  struct WorkerDeleter {
-    void operator()(std::thread *t) {
-      t->join();
-      delete t;
-    }
-  };
-  typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
-  std::vector<WorkerPtr> worker_threads_;
-  std::unique_ptr<FileSystem> file_system_;
-};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
index 8dbcd03..0d3752a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
@@ -1 +1,18 @@
-add_library(common base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc)
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_library(common base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc util.cc)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h
new file mode 100644
index 0000000..575904c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIB_COMMON_ASYNC_STREAM_H_
+#define LIB_COMMON_ASYNC_STREAM_H_
+
+#include <asio.hpp>
+
+namespace hdfs {
+
+typedef asio::mutable_buffers_1 MutableBuffers;
+typedef asio::const_buffers_1   ConstBuffers;
+
+/*
+ * asio-compatible stream implementation.
+ *
+ * Lifecycle: should be managed using std::shared_ptr so the object can be
+ *    handed from consumer to consumer
+ * Threading model: async_read_some and async_write_some are not thread-safe.
+ */
+class AsyncStream  {
+public:
+  virtual void async_read_some(const MutableBuffers &buf,
+          std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) = 0;
+
+  virtual void async_write_some(const ConstBuffers &buf,
+            std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) = 0;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
index 5630934..a5a0446 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
@@ -29,7 +29,9 @@
 #include <asio/ip/tcp.hpp>
 
 namespace hdfs {
-namespace continuation {
+namespace asio_continuation {
+
+using namespace continuation;
 
 template <class Stream, class MutableBufferSequence>
 class ReadContinuation : public Continuation {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
index 08caf0d..54caeed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
@@ -33,7 +33,7 @@ namespace continuation {
 
 template <class Stream, size_t MaxMessageSize = 512>
 struct ReadDelimitedPBMessageContinuation : public Continuation {
-  ReadDelimitedPBMessageContinuation(Stream *stream,
+  ReadDelimitedPBMessageContinuation(std::shared_ptr<Stream> stream,
                                      ::google::protobuf::MessageLite *msg)
       : stream_(stream), msg_(msg) {}
 
@@ -56,8 +56,8 @@ struct ReadDelimitedPBMessageContinuation : public Continuation {
       }
       next(status);
     };
-    asio::async_read(
-        *stream_, asio::buffer(buf_),
+    asio::async_read(*stream_,
+        asio::buffer(buf_),
         std::bind(&ReadDelimitedPBMessageContinuation::CompletionHandler, this,
                   std::placeholders::_1, std::placeholders::_2),
         handler);
@@ -82,14 +82,14 @@ private:
     return offset ? len + offset - transferred : 1;
   }
 
-  Stream *stream_;
+  std::shared_ptr<Stream> stream_;
   ::google::protobuf::MessageLite *msg_;
   std::array<char, MaxMessageSize> buf_;
 };
 
 template <class Stream>
 struct WriteDelimitedPBMessageContinuation : Continuation {
-  WriteDelimitedPBMessageContinuation(Stream *stream,
+  WriteDelimitedPBMessageContinuation(std::shared_ptr<Stream> stream,
                                       const google::protobuf::MessageLite *msg)
       : stream_(stream), msg_(msg) {}
 
@@ -101,28 +101,25 @@ struct WriteDelimitedPBMessageContinuation : Continuation {
     pbio::CodedOutputStream os(&ss);
     os.WriteVarint32(size);
     msg_->SerializeToCodedStream(&os);
-    write_coroutine_ =
-        std::shared_ptr<Continuation>(Write(stream_, asio::buffer(buf_)));
-    write_coroutine_->Run([next](const Status &stat) { next(stat); });
+    asio::async_write(*stream_, asio::buffer(buf_), [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); } );
   }
 
 private:
-  Stream *stream_;
+  std::shared_ptr<Stream> stream_;
   const google::protobuf::MessageLite *msg_;
   std::string buf_;
-  std::shared_ptr<Continuation> write_coroutine_;
 };
 
 template <class Stream, size_t MaxMessageSize = 512>
 static inline Continuation *
-ReadDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
+ReadDelimitedPBMessage(std::shared_ptr<Stream> stream, ::google::protobuf::MessageLite *msg) {
   return new ReadDelimitedPBMessageContinuation<Stream, MaxMessageSize>(stream,
                                                                         msg);
 }
 
 template <class Stream>
 static inline Continuation *
-WriteDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
+WriteDelimitedPBMessage(std::shared_ptr<Stream> stream, ::google::protobuf::MessageLite *msg) {
   return new WriteDelimitedPBMessageContinuation<Stream>(stream, msg);
 }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc
index 37408de..0251ce5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc
@@ -24,20 +24,4 @@ IoService::~IoService() {}
 
 IoService *IoService::New() { return new IoServiceImpl(); }
 
-bool InputStream::ShouldExclude(const Status &s) {
-  if (s.ok()) {
-    return false;
-  }
-
-  switch (s.code()) {
-    /* client side resource exhaustion */
-    case Status::kResourceUnavailable:
-      return false;
-    case Status::kInvalidArgument:
-    case Status::kUnimplemented:
-    case Status::kException:
-    default:
-      return true;
-  }
-}
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
index e71b6b3..9e1acbd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
@@ -20,5 +20,6 @@
 
 namespace hdfs {
 
-Options::Options() : rpc_timeout(30000), host_exclusion_duration(600000) {}
+Options::Options() : rpc_timeout(30000), max_rpc_retries(0),
+                     rpc_retry_delay_ms(10000), host_exclusion_duration(600000) {}
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
new file mode 100644
index 0000000..eaef2d0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/util.h"
+
+namespace hdfs {
+
+/*
+ * Builds a client name of the form "libhdfs++_<base64>", where the suffix
+ * is the Base64 encoding of 6 bytes drawn from OpenSSL's random generator.
+ */
+std::string GetRandomClientName() {
+  unsigned char buf[6] = {
+      0,
+  };
+  // RAND_bytes replaces the deprecated RAND_pseudo_bytes and reports
+  // success by returning 1.
+  if (RAND_bytes(buf, sizeof(buf)) != 1) {
+    // The buffer contents are not guaranteed after a failed call; reset
+    // them so the generated name is at least well-defined.
+    for (unsigned char &b : buf) {
+      b = 0;
+    }
+  }
+
+  std::stringstream ss;
+  ss << "libhdfs++_"
+     << Base64Encode(std::string(reinterpret_cast<char *>(buf), sizeof(buf)));
+  return ss.str();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
index ff9f36c..a6acc4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
@@ -20,7 +20,10 @@
 
 #include "libhdfspp/status.h"
 
+#include <sstream>
+
 #include <asio/error_code.hpp>
+#include <openssl/rand.h>
 
 #include <google/protobuf/message_lite.h>
 #include <google/protobuf/io/coded_stream.h>
@@ -53,6 +56,10 @@ static inline void ReadDelimitedPBMessage(
 
 std::string Base64Encode(const std::string &src);
 
+/*
+ * Returns a new high-entropy client name
+ */
+std::string GetRandomClientName();
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt
new file mode 100644
index 0000000..54ba96c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt
@@ -0,0 +1,2 @@
+add_library(connection datanodeconnection.cc)
+add_dependencies(connection proto)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
new file mode 100644
index 0000000..b053e7f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "datanodeconnection.h"
+#include "common/util.h"
+
+namespace hdfs {
+
+// Out-of-line definitions of the virtual destructors; neither does any
+// work beyond destroying the members.
+DataNodeConnection::~DataNodeConnection() = default;
+DataNodeConnectionImpl::~DataNodeConnectionImpl() = default;
+
+/*
+ * Prepares (but does not open) a connection to the datanode described by
+ * dn_proto: creates the socket on io_service, records the datanode's
+ * transfer endpoint and UUID, and keeps a private copy of token when one is
+ * supplied.
+ */
+DataNodeConnectionImpl::DataNodeConnectionImpl(asio::io_service * io_service,
+                                                const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
+                                                const hadoop::common::TokenProto *token)
+{
+  namespace ip = ::asio::ip;
+
+  conn_.reset(new ip::tcp::socket(*io_service));
+
+  const auto &dn_id = dn_proto.id();
+  endpoints_[0] = ip::tcp::endpoint(ip::address::from_string(dn_id.ipaddr()),
+                                    dn_id.xferport());
+  uuid_ = dn_id.datanodeuuid();
+
+  if (token != nullptr) {
+    // Merge into a fresh message so the caller's token need not outlive us.
+    token_.reset(new hadoop::common::TokenProto());
+    token_->CheckTypeAndMergeFrom(*token);
+  }
+}
+
+
+/*
+ * Starts an asynchronous connect to the stored endpoint.  handler receives
+ * the resulting Status together with a shared pointer to this connection.
+ */
+void DataNodeConnectionImpl::Connect(
+             std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) {
+  typedef std::array<asio::ip::tcp::endpoint, 1>::iterator EndpointIterator;
+
+  // The completion lambda owns a shared_ptr to this object, which keeps the
+  // connection alive until the connect attempt has finished.
+  auto self = shared_from_this();
+  asio::async_connect(*conn_, endpoints_.begin(), endpoints_.end(),
+          [self, handler](const asio::error_code &ec, EndpointIterator) {
+            handler(ToStatus(ec), self);
+          });
+}
+
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
new file mode 100644
index 0000000..d5bbb92
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
+#define LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
+
+#include "common/hdfs_public_api.h"
+#include "common/async_stream.h"
+#include "ClientNamenodeProtocol.pb.h"
+
+#include "asio.hpp"
+
+namespace hdfs {
+
+/*
+ * DataNodeConnection: an AsyncStream that additionally carries the identity
+ * (uuid_) and an optional token (token_) for one datanode, and that can be
+ * connected asynchronously.
+ */
+class DataNodeConnection : public AsyncStream {
+public:
+    // UUID string identifying the datanode.
+    std::string uuid_;
+    // Token associated with this connection; null when none was supplied.
+    std::unique_ptr<hadoop::common::TokenProto> token_;
+
+    virtual ~DataNodeConnection();
+    // Begins connecting; handler is called with the outcome and a shared
+    // pointer to the connection.
+    virtual void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) = 0;
+};
+
+
+/*
+ * DataNodeConnectionImpl: DataNodeConnection backed by an asio TCP socket.
+ *
+ * uuid_ and token_ are inherited from DataNodeConnection and must not be
+ * redeclared here: a second uuid_ would hide the base member, so the value
+ * stored by the constructor would be invisible to code that reads uuid_
+ * through a DataNodeConnection pointer.
+ */
+class DataNodeConnectionImpl : public DataNodeConnection, public std::enable_shared_from_this<DataNodeConnectionImpl>{
+public:
+  // Socket used for every read and write on this connection.
+  std::unique_ptr<asio::ip::tcp::socket> conn_;
+  // The single endpoint (address and transfer port) of the datanode.
+  std::array<asio::ip::tcp::endpoint, 1> endpoints_;
+
+  virtual ~DataNodeConnectionImpl();
+  DataNodeConnectionImpl(asio::io_service * io_service, const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
+                          const hadoop::common::TokenProto *token);
+
+  // Asynchronously connects the socket to endpoints_ and reports the result.
+  void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override;
+
+  // Forwards directly to the socket's async_read_some.
+  void async_read_some(const MutableBuffers &buf,
+        std::function<void (const asio::error_code & error,
+                               std::size_t bytes_transferred) > handler) override {
+    conn_->async_read_some(buf, handler);
+  }
+
+  // Forwards directly to the socket's async_write_some.
+  void async_write_some(const ConstBuffers &buf,
+            std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) override {
+    conn_->async_write_some(buf, handler);
+  }
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
index 0870fb3..ae56b3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
@@ -1,2 +1,2 @@
-add_library(fs filesystem.cc inputstream.cc bad_datanode_tracker.cc)
+add_library(fs filesystem.cc filehandle.cc bad_datanode_tracker.cc)
 add_dependencies(fs proto)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
new file mode 100644
index 0000000..8cf41ce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filehandle.h"
+#include "common/continuation/continuation.h"
+#include "connection/datanodeconnection.h"
+#include "reader/block_reader.h"
+
+#include <future>
+#include <tuple>
+
+namespace hdfs {
+
+using ::hadoop::hdfs::LocatedBlocksProto;
+
+// Out-of-line definition of the FileHandle destructor; it has no work to do.
+FileHandle::~FileHandle() = default;
+
+// Stores the supplied collaborators and starts the read cursor at offset 0.
+FileHandleImpl::FileHandleImpl(::asio::io_service *io_service, const std::string &client_name,
+                                 const std::shared_ptr<const struct FileInfo> file_info,
+                                 std::shared_ptr<BadDataNodeTracker> bad_data_nodes)
+    : io_service_(io_service),
+      client_name_(client_name),
+      file_info_(file_info),
+      bad_node_tracker_(bad_data_nodes),
+      offset_(0) {}
+
+/*
+ * Asynchronous positional read of up to nbyte bytes at offset into buf.
+ * The request is delegated to AsyncPreadSome using the handle's own bad-node
+ * tracker as the exclusion rule; handler receives the final status and the
+ * number of bytes read.
+ */
+void FileHandleImpl::PositionRead(
+    void *buf, size_t nbyte, uint64_t offset,
+    const std::function<void(const Status &, size_t)>
+        &handler) {
+
+  AsyncPreadSome(
+      offset, asio::buffer(buf, nbyte), bad_node_tracker_,
+      [this, handler](const Status &status,
+                      const std::string &contacted_datanode,
+                      size_t bytes_read) {
+        // Record the datanode as bad when the failure is one that
+        // ShouldExclude attributes to it.
+        const bool exclude = ShouldExclude(status);
+        if (exclude) {
+          bad_node_tracker_->AddBadNode(contacted_datanode);
+        }
+
+        handler(status, bytes_read);
+      });
+}
+
+/*
+ * Blocking positional read.  *nbyte is the requested size on entry and, when
+ * the read succeeds, the number of bytes read on return; on failure it is
+ * left untouched.
+ */
+Status FileHandleImpl::PositionRead(void *buf, size_t *nbyte, off_t offset) {
+  typedef std::tuple<Status, size_t> Result;
+
+  /* the promise is shared with the callback so it outlives this frame if needed */
+  auto result_promise = std::make_shared<std::promise<Result>>();
+  std::future<Result> pending(result_promise->get_future());
+
+  PositionRead(buf, *nbyte, offset,
+               [result_promise](const Status &s, size_t bytes) {
+                 result_promise->set_value(std::make_tuple(s, bytes));
+               });
+
+  /* block until the asynchronous read has reported back */
+  const Result outcome = pending.get();
+  Status stat = std::get<0>(outcome);
+
+  if (stat.ok()) {
+    *nbyte = std::get<1>(outcome);
+  }
+  return stat;
+}
+
+/*
+ * Blocking read at the handle's current offset.  The offset advances by the
+ * number of bytes read only when the read succeeds.
+ */
+Status FileHandleImpl::Read(void *buf, size_t *nbyte) {
+  const Status stat = PositionRead(buf, nbyte, offset_);
+  if (stat.ok()) {
+    offset_ += *nbyte;
+    return Status::OK();
+  }
+  return stat;
+}
+
+/*
+ * Moves the read cursor.  *offset is the displacement relative to whence on
+ * entry and the resulting absolute offset on success.  Returns
+ * InvalidArgument for an unknown whence or an out-of-bounds target, leaving
+ * the cursor and *offset unchanged.
+ */
+Status FileHandleImpl::Seek(off_t *offset, std::ios_base::seekdir whence) {
+  off_t target = -1;
+
+  if (whence == std::ios_base::beg) {
+    target = *offset;
+  } else if (whence == std::ios_base::cur) {
+    target = offset_ + *offset;
+  } else if (whence == std::ios_base::end) {
+    target = file_info_->file_length_ + *offset;
+  } else {
+    /* unsupported */
+    return Status::InvalidArgument("Invalid Seek whence argument");
+  }
+
+  const bool in_bounds = CheckSeekBounds(target);
+  if (!in_bounds) {
+    return Status::InvalidArgument("Seek offset out of bounds");
+  }
+
+  offset_ = target;
+  *offset = offset_;
+  return Status::OK();
+}
+
+/* A position is seekable only when it lies in [0, file length). */
+bool FileHandleImpl::CheckSeekBounds(ssize_t desired_position) {
+  const ssize_t file_length = file_info_->file_length_;
+  return desired_position >= 0 && desired_position < file_length;
+}
+
+/*
+ * Note that this method must be thread-safe w.r.t. the unsafe operations occurring
+ * on the FileHandle
+ */
+/*
+ * Reads from the single block that contains offset, using the first
+ * datanode the exclusion rule does not reject.  At most the remainder of
+ * that block (and never more than the buffer size) is requested.
+ *
+ * handler is called exactly once with the status, the UUID of the contacted
+ * datanode (empty when none was contacted) and the number of bytes read:
+ *   - InvalidArgument if no block covers offset
+ *   - ResourceUnavailable if every datanode for the block is excluded
+ *   - otherwise the result of the connect or of the block read
+ */
+void FileHandleImpl::AsyncPreadSome(
+    size_t offset, const MutableBuffers &buffers,
+    std::shared_ptr<NodeExclusionRule> excluded_nodes,
+    const std::function<void(const Status &, const std::string &, size_t)> handler) {
+  using ::hadoop::hdfs::DatanodeInfoProto;
+  using ::hadoop::hdfs::LocatedBlockProto;
+
+  // Hold our own reference to the file metadata.  `block` below is an
+  // iterator into its block list and is dereferenced from the connect
+  // callback, which may run after this FileHandle has been destroyed.
+  const std::shared_ptr<const struct FileInfo> file_info = file_info_;
+
+  /**
+   *  Note: block and chosen_dn point to things inside the blocks_ vector.
+   *  They shouldn't be directly deleted.
+   **/
+  auto block = std::find_if(
+      file_info->blocks_.begin(), file_info->blocks_.end(), [offset](const LocatedBlockProto &p) {
+        return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
+      });
+
+  if (block == file_info->blocks_.end()) {
+    handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0);
+    return;
+  }
+
+  /**
+   * If user supplies a rule use it, otherwise use the tracker.
+   * User is responsible for making sure one of them isn't null.
+   **/
+  std::shared_ptr<NodeExclusionRule> rule =
+      excluded_nodes != nullptr ? excluded_nodes : bad_node_tracker_;
+
+  // Walk the location list in place; copying it would leave chosen_dn
+  // referring to a temporary copy rather than to blocks_.
+  const auto &datanodes = block->locs();
+  auto it = std::find_if(datanodes.begin(), datanodes.end(),
+                         [rule](const DatanodeInfoProto &dn) {
+                           return !rule->IsBadNode(dn.id().datanodeuuid());
+                         });
+
+  if (it == datanodes.end()) {
+    handler(Status::ResourceUnavailable("No datanodes available"), "", 0);
+    return;
+  }
+
+  const DatanodeInfoProto &chosen_dn = *it;
+
+  uint64_t offset_within_block = offset - block->offset();
+  uint64_t size_within_block = std::min<uint64_t>(
+      block->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
+
+  // This is where we will put the logic for re-using a DN connection; we can
+  //    steal the FileHandle's dn and put it back when we're done
+  std::shared_ptr<DataNodeConnection> dn = CreateDataNodeConnection(io_service_, chosen_dn, nullptr /*token*/);
+  std::string dn_id = dn->uuid_;
+  std::string client_name = client_name_;
+
+  // Wrap the DN in a block reader to handle the state and logic of the
+  //    block request protocol
+  std::shared_ptr<BlockReader> reader;
+  reader = CreateBlockReader(BlockReaderOptions(), dn);
+
+  // Capturing reader keeps it alive until the read has completed.
+  auto read_handler = [reader, dn_id, handler](const Status & status, size_t transferred) {
+    handler(status, dn_id, transferred);
+  };
+
+  // file_info is captured solely to keep *block valid inside the callback.
+  dn->Connect([handler,read_handler,file_info,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name]
+          (Status status, std::shared_ptr<DataNodeConnection>) {
+    if (status.ok()) {
+      reader->AsyncReadBlock(
+          client_name, *block, offset_within_block,
+          asio::buffer(buffers, size_within_block), read_handler);
+    } else {
+      handler(status, dn_id, 0);
+    }
+  });
+}
+
+// Factory for the reader that drives the block request over dn.
+std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options,
+                                               std::shared_ptr<DataNodeConnection> dn)
+{
+  std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn);
+  return reader;
+}
+
+// Factory for the (not yet connected) connection to the datanode dn.
+std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection(
+    ::asio::io_service * io_service,
+    const ::hadoop::hdfs::DatanodeInfoProto & dn,
+    const hadoop::common::TokenProto * token) {
+  std::shared_ptr<DataNodeConnection> connection =
+      std::make_shared<DataNodeConnectionImpl>(io_service, dn, token);
+  return connection;
+}
+
+/*
+ * Decides whether a read that ended with status s should get its datanode
+ * excluded.  Success never excludes; neither does kResourceUnavailable,
+ * which signals client side resource exhaustion.  Every other code does.
+ */
+bool FileHandle::ShouldExclude(const Status &s) {
+  if (s.ok()) {
+    return false;
+  }
+
+  return s.code() != Status::kResourceUnavailable;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
new file mode 100644
index 0000000..95b5869
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_LIB_FS_FILEHANDLE_H_
+#define LIBHDFSPP_LIB_FS_FILEHANDLE_H_
+
+#include "common/hdfs_public_api.h"
+#include "common/async_stream.h"
+#include "reader/fileinfo.h"
+
+#include "asio.hpp"
+#include "bad_datanode_tracker.h"
+#include "ClientNamenodeProtocol.pb.h"
+
+#include <mutex>
+#include <iostream>
+
+namespace hdfs {
+
+class BlockReader;
+class BlockReaderOptions;
+class DataNodeConnection;
+
+/*
+ * FileHandle: coordinates operations on a particular file in HDFS
+ *
+ * Threading model: not thread-safe; consumers and io_service should not call
+ *    concurrently.  PositionRead is the exception; it can be
+ *    called concurrently and repeatedly.
+ * Lifetime: pointer returned to consumer by FileSystem::Open.  Consumer is
+ *    responsible for freeing the object.
+ */
+class FileHandleImpl : public FileHandle {
+public:
+  /*
+   * Binds the handle to the io_service used for datanode connections, the
+   * client name sent with block requests, the file's length and block
+   * metadata, and the tracker that records datanodes judged bad.
+   */
+  FileHandleImpl(::asio::io_service *io_service, const std::string &client_name,
+                  const std::shared_ptr<const struct FileInfo> file_info,
+                  std::shared_ptr<BadDataNodeTracker> bad_data_nodes);
+
+  /*
+   * [Some day reliably] Reads a particular offset into the data file.
+   * On error, bytes_read returns the number of bytes successfully read; on
+   * success, bytes_read will equal nbyte
+   */
+  void PositionRead(
+		void *buf,
+		size_t nbyte,
+		uint64_t offset,
+    const std::function<void(const Status &status, size_t bytes_read)> &handler
+    ) override;
+
+  /**
+   * Note:  The nbyte argument for Read and Pread as well as the
+   * offset argument for Seek are in/out parameters.
+   *
+   * For Read and Pread the value referenced by nbyte should
+   * be set to the number of bytes to read. Before returning
+   * the value referenced will be set by the callee to the number
+   * of bytes that was successfully read.
+   *
+   * For Seek the value referenced by offset should be the number
+   * of bytes to shift from the specified whence position.  The
+   * referenced value will be set to the new offset before returning.
+   *
+   * "Pread" above refers to the blocking PositionRead overload below, whose
+   * in/out size parameter is named bytes_read in this declaration.
+   **/
+  Status PositionRead(void *buf, size_t *bytes_read, off_t offset) override;
+  Status Read(void *buf, size_t *nbyte) override;
+  Status Seek(off_t *offset, std::ios_base::seekdir whence) override;
+
+
+  /*
+   * Reads some amount of data into the buffer.  Will attempt to find the best
+   * datanode and read data from it.
+   *
+   * If an error occurs during connection or transfer, the callback will be
+   * called with bytes_read equal to the number of bytes successfully transferred.
+   * If no data nodes can be found, status will be Status::ResourceUnavailable.
+   *
+   */
+  void AsyncPreadSome(size_t offset, const MutableBuffers &buffers,
+                      std::shared_ptr<NodeExclusionRule> excluded_nodes,
+                      const std::function<void(const Status &status,
+                      const std::string &dn_id, size_t bytes_read)> handler);
+
+protected:
+  /*
+   * Factory hooks used by AsyncPreadSome; virtual so that a subclass can
+   * substitute its own BlockReader or DataNodeConnection.
+   */
+  virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
+                                                 std::shared_ptr<DataNodeConnection> dn);
+  virtual std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
+      ::asio::io_service *io_service,
+      const ::hadoop::hdfs::DatanodeInfoProto & dn,
+      const hadoop::common::TokenProto * token);
+private:
+  // io_service on which datanode connections are created (not owned here).
+  ::asio::io_service * const io_service_;
+  // Client name passed along with every block read request.
+  const std::string client_name_;
+  // File length and located blocks for the file.
+  const std::shared_ptr<const struct FileInfo> file_info_;
+  // Default exclusion rule; also receives datanodes judged bad after a read.
+  std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
+  // Returns true when desired_position is a valid Seek target.
+  bool CheckSeekBounds(ssize_t desired_position);
+  // Current cursor used by Read and Seek.
+  off_t offset_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index e0c27ac..7404606 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -22,7 +22,10 @@
 
 #include <asio/ip/tcp.hpp>
 
+#include <functional>
 #include <limits>
+#include <future>
+#include <tuple>
 
 namespace hdfs {
 
@@ -32,38 +35,17 @@ static const int kNamenodeProtocolVersion = 1;
 
 using ::asio::ip::tcp;
 
-FileSystem::~FileSystem() {}
+/*****************************************************************************
+ *                    NAMENODE OPERATIONS
+ ****************************************************************************/
 
-void FileSystem::New(
-    IoService *io_service, const Options &options, const std::string &server,
-    const std::string &service,
-    const std::function<void(const Status &, FileSystem *)> &handler) {
-  FileSystemImpl *impl = new FileSystemImpl(io_service, options);
-  impl->Connect(server, service, [impl, handler](const Status &stat) {
-    if (stat.ok()) {
-      handler(stat, impl);
-    } else {
-      delete impl;
-      handler(stat, nullptr);
-    }
-  });
-}
-
-FileSystemImpl::FileSystemImpl(IoService *io_service, const Options &options)
-    : io_service_(static_cast<IoServiceImpl *>(io_service)),
-      engine_(&io_service_->io_service(), options,
-              RpcEngine::GetRandomClientName(), kNamenodeProtocol,
-              kNamenodeProtocolVersion),
-      namenode_(&engine_),
-      bad_node_tracker_(std::make_shared<BadDataNodeTracker>()) {}
-
-void FileSystemImpl::Connect(const std::string &server,
+void NameNodeOperations::Connect(const std::string &server,
                              const std::string &service,
-                             std::function<void(const Status &)> &&handler) {
-  using namespace continuation;
+                             std::function<void(const Status &)> &handler) {
+  using namespace asio_continuation;
   typedef std::vector<tcp::endpoint> State;
   auto m = Pipeline<State>::Create();
-  m->Push(Resolve(&io_service_->io_service(), server, service,
+  m->Push(Resolve(io_service_, server, service,
                   std::back_inserter(m->state())))
       .Push(Bind([this, m](const Continuation::Next &next) {
         engine_.Connect(m->state().front(), next);
@@ -76,9 +58,9 @@ void FileSystemImpl::Connect(const std::string &server,
   });
 }
 
-void FileSystemImpl::Open(
-    const std::string &path,
-    const std::function<void(const Status &, InputStream *)> &handler) {
+void NameNodeOperations::GetBlockLocations(const std::string & path,
+  std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler)
+{
   using ::hadoop::hdfs::GetBlockLocationsRequestProto;
   using ::hadoop::hdfs::GetBlockLocationsResponseProto;
 
@@ -99,10 +81,174 @@ void FileSystemImpl::Open(
       [this, s](const continuation::Continuation::Next &next) {
         namenode_.GetBlockLocations(&s->req, s->resp, next);
       }));
+
   m->Run([this, handler](const Status &stat, const State &s) {
-    handler(stat, stat.ok() ? new InputStreamImpl(this, &s.resp->locations(),
-                                                  bad_node_tracker_)
+    if (stat.ok()) {
+      auto file_info = std::make_shared<struct FileInfo>();
+      auto locations = s.resp->locations();
+
+      file_info->file_length_ = locations.filelength();
+
+      for (const auto &block : locations.blocks()) {
+        file_info->blocks_.push_back(block);
+      }
+
+      if (locations.has_lastblock() && locations.lastblock().b().numbytes()) {
+        file_info->blocks_.push_back(locations.lastblock());
+      }
+
+      handler(stat, file_info);
+    } else {
+      handler(stat, nullptr);
+    }
+  });
+}
+
+
+/*****************************************************************************
+ *                    FILESYSTEM BASE CLASS
+ ****************************************************************************/
+
+/*
+ * Asynchronously creates a FileSystem and connects it to server/service.
+ * handler receives the connect status and, on success, the new FileSystem;
+ * on failure the instance is deleted and handler receives nullptr.
+ */
+void FileSystem::New(
+    IoService *io_service, const Options &options, const std::string &server,
+    const std::string &service,
+    const std::function<void(const Status &, FileSystem *)> &handler) {
+  FileSystemImpl *fs = new FileSystemImpl(io_service, options);
+  fs->Connect(server, service, [fs, handler](const Status &stat) {
+    if (!stat.ok()) {
+      delete fs;
+      handler(stat, nullptr);
+      return;
+    }
+    handler(stat, fs);
+  });
+}
+
+/*
+ * Blocking variant of New: waits for the asynchronous New to report back and
+ * returns the connected FileSystem, or nullptr when connecting failed.
+ */
+FileSystem * FileSystem::New(
+    IoService *io_service, const Options &options, const std::string &server,
+    const std::string &service) {
+  typedef std::tuple<Status, FileSystem *> Result;
+
+  auto result_promise = std::make_shared<std::promise<Result>>();
+  std::future<Result> pending(result_promise->get_future());
+
+  New(io_service, options, server, service,
+      [result_promise](const Status &s, FileSystem * fs) {
+        result_promise->set_value(std::make_tuple(s, fs));
+      });
+
+  /* block until promise is set */
+  const Result outcome = pending.get();
+
+  return std::get<0>(outcome).ok() ? std::get<1>(outcome) : nullptr;
+}
+
+/*****************************************************************************
+ *                    FILESYSTEM IMPLEMENTATION
+ ****************************************************************************/
+
+/*
+ * Takes over the caller's IoService (the caller's pointer is set to nullptr),
+ * sets up the namenode operations object and the bad-datanode tracker, and
+ * starts the worker thread(s) that run the IoService.
+ *
+ * NOTE(review): io_service is dereferenced in the initializer list, so a
+ * null IoService fails here, before Connect gets to check for it -- confirm
+ * whether callers can pass nullptr.
+ * NOTE(review): nn_ and client_name_ each receive a separately generated
+ * random name -- confirm that two different names are intended.
+ */
+FileSystemImpl::FileSystemImpl(IoService *&io_service, const Options &options)
+  :   io_service_(static_cast<IoServiceImpl *>(io_service)),
+      nn_(&io_service_->io_service(), options,
+          GetRandomClientName(), kNamenodeProtocol,
+          kNamenodeProtocolVersion),
+      client_name_(GetRandomClientName()),
+      // Every FileHandle created by Open shares this tracker and
+      // dereferences it, so it must never be left null.
+      bad_node_tracker_(std::make_shared<BadDataNodeTracker>())
+{
+  // Poor man's move
+  io_service = nullptr;
+
+  /* spawn background threads for asio delegation */
+  unsigned int threads = 1 /* options.io_threads_, pending HDFS-9117 */;
+  for (unsigned int i = 0; i < threads; i++) {
+    AddWorkerThread();
+  }
+}
+
+FileSystemImpl::~FileSystemImpl() {
+  /**
+   * Note: IoService must be stopped before getting rid of worker threads.
+   * Once worker threads are joined and deleted the service can be deleted.
+   *
+   * The three statements below therefore have to stay in this order:
+   * stop the service, drop the workers, release the service.
+   **/
+  io_service_->Stop();
+  worker_threads_.clear();
+  io_service_.reset(nullptr);
+}
+
+/*
+ * Asynchronously connects to the namenode at server/service.  handler is
+ * called with the outcome; when there is no IoService it is called
+ * immediately with an error and no connection attempt is made.
+ */
+void FileSystemImpl::Connect(const std::string &server,
+                             const std::string &service,
+                             std::function<void(const Status &)> &&handler) {
+  /* IoService::New can return nullptr */
+  if (!io_service_) {
+    handler (Status::Error("Null IoService"));
+    // The failure has been reported; going on to nn_.Connect would start a
+    // connection anyway and could invoke handler a second time.
+    return;
+  }
+  nn_.Connect(server, service, handler);
+}
+
+/*
+ * Blocking variant of Connect: starts the asynchronous connect and waits
+ * for its status.
+ */
+Status FileSystemImpl::Connect(const std::string &server, const std::string &service) {
+  /* synchronized */
+  auto done = std::make_shared<std::promise<Status>>();
+  std::future<Status> pending = done->get_future();
+
+  Connect(server, service, [done](const Status &s) {
+    done->set_value(s);
+  });
+
+  /* block until promise is set */
+  return pending.get();
+}
+
+
+/*
+ * Starts one more thread that runs the IoService and returns the number of
+ * worker threads now held.
+ */
+int FileSystemImpl::AddWorkerThread() {
+  IoService *service = io_service_.get();
+  worker_threads_.push_back(
+      WorkerPtr(new std::thread([service]() { service->Run(); })));
+  return worker_threads_.size();
+}
+
+void FileSystemImpl::Open(
+    const std::string &path,
+    const std::function<void(const Status &, FileHandle *)> &handler) {
+
+  nn_.GetBlockLocations(path, [this, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) {
+    handler(stat, stat.ok() ? new FileHandleImpl(&io_service_->io_service(), client_name_, file_info, bad_node_tracker_)
                             : nullptr);
   });
 }
+
+/*
+ * Blocking variant of Open.  On success *handle receives the new FileHandle;
+ * on failure *handle is left untouched and the error status is returned.
+ */
+Status FileSystemImpl::Open(const std::string &path,
+                                         FileHandle **handle) {
+  typedef std::tuple<Status, FileHandle*> Result;
+
+  auto result_promise = std::make_shared<std::promise<Result>>();
+  std::future<Result> pending(result_promise->get_future());
+
+  /* wrap async FileSystem::Open with promise to make it a blocking call */
+  Open(path, [result_promise](const Status &s, FileHandle *is) {
+    result_promise->set_value(std::make_tuple(s, is));
+  });
+
+  /* block until promise is set */
+  const Result outcome = pending.get();
+  Status stat = std::get<0>(outcome);
+  FileHandle *file_handle = std::get<1>(outcome);
+
+  if (!stat.ok()) {
+    delete file_handle;
+    return stat;
+  }
+
+  if (file_handle) {
+    *handle = file_handle;
+  }
+  return stat;
+}
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
index dfe8b0c..cc8a8e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -18,75 +18,110 @@
 #ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_
 #define LIBHDFSPP_LIB_FS_FILESYSTEM_H_
 
+#include "filehandle.h"
 #include "common/hdfs_public_api.h"
+#include "common/async_stream.h"
 #include "libhdfspp/hdfs.h"
 #include "fs/bad_datanode_tracker.h"
 #include "rpc/rpc_engine.h"
+#include "reader/block_reader.h"
+#include "reader/fileinfo.h"
 #include "ClientNamenodeProtocol.pb.h"
 #include "ClientNamenodeProtocol.hrpc.inl"
 
+#include "asio.hpp"
+
+#include <thread>
+
 namespace hdfs {
 
-class FileHandle;
-class HadoopFileSystem;
+/**
+ * NameNodeOperations: abstracts the details of communicating with a NameNode
+ * and the implementation of the communications protocol.
+ *
+ * Will eventually handle retry and failover.
+ *
+ * Threading model: thread-safe; all operations can be called concurrently
+ * Lifetime: owned by a FileSystemImpl
+ */
+class NameNodeOperations {
+public:
+  NameNodeOperations(::asio::io_service *io_service, const Options &options,
+            const std::string &client_name, const char *protocol_name,
+            int protocol_version) :
+  io_service_(io_service),
+  engine_(io_service, options, client_name, protocol_name, protocol_version),
+  namenode_(& engine_) {}
+
+  void Connect(const std::string &server,
+               const std::string &service,
+               std::function<void(const Status &)> &handler);
+
+  void GetBlockLocations(const std::string & path,
+    std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler);
 
+private:
+  ::asio::io_service * io_service_;
+  RpcEngine engine_;
+  ClientNamenodeProtocol namenode_;
+};
+
+/*
+ * FileSystem: The consumer's main point of interaction with the cluster as
+ * a whole.
+ *
+ * Initially constructed in a disconnected state; call Connect before operating
+ * on the FileSystem.
+ *
+ * All open files must be closed before the FileSystem is destroyed.
+ *
+ * Threading model: thread-safe for all operations
+ * Lifetime: pointer created for consumer who is responsible for deleting it
+ */
 class FileSystemImpl : public FileSystem {
- public:
-  FileSystemImpl(IoService *io_service, const Options &options);
+public:
+  FileSystemImpl(IoService *&io_service, const Options &options);
+  ~FileSystemImpl() override;
+
+  /* attempt to connect to namenode; handler receives bad status on failure */
   void Connect(const std::string &server, const std::string &service,
                std::function<void(const Status &)> &&handler);
+  /* attempt to connect to namenode, return bad status on failure */
+  Status Connect(const std::string &server, const std::string &service);
+
+
   virtual void Open(const std::string &path,
-                    const std::function<void(const Status &, InputStream *)> &
-                        handler) override;
-  RpcEngine &rpc_engine() { return engine_; }
+                    const std::function<void(const Status &, FileHandle *)>
+                        &handler) override;
+  Status Open(const std::string &path, FileHandle **handle) override;
 
- private:
-  IoServiceImpl *io_service_;
-  RpcEngine engine_;
-  ClientNamenodeProtocol namenode_;
-  std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
-};
 
-class InputStreamImpl : public InputStream {
- public:
-  InputStreamImpl(FileSystemImpl *fs,
-                  const ::hadoop::hdfs::LocatedBlocksProto *blocks,
-                  std::shared_ptr<BadDataNodeTracker> tracker);
-  virtual void PositionRead(
-      void *buf, size_t nbyte, uint64_t offset,
-      const std::function<void(const Status &, const std::string &, size_t)> &
-          handler) override;
-  /**
-   * If optional_rule_override is null then use the bad_datanode_tracker.  If
-   * non-null use the provided NodeExclusionRule to determine eligible
-   * datanodes.
-   **/
-  template <class MutableBufferSequence, class Handler>
-  void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers,
-                      std::shared_ptr<NodeExclusionRule> excluded_nodes,
-                      const Handler &handler);
-
-  template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
-  void AsyncReadBlock(const std::string &client_name,
-                      const hadoop::hdfs::LocatedBlockProto &block,
-                      const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
-                      const MutableBufferSequence &buffers,
-                      const Handler &handler);
-  uint64_t get_file_length() const;
- private:
-  FileSystemImpl *fs_;
-  unsigned long long file_length_;
-  std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
-  template <class Reader>
-  struct HandshakeContinuation;
-  template <class Reader, class MutableBufferSequence>
-  struct ReadBlockContinuation;
-  struct RemoteBlockReaderTrait;
-  friend class FileHandle;
+  /* add a new thread to handle asio requests, return number of threads in pool
+   */
+  int AddWorkerThread();
+
+  /* how many worker threads are servicing asio requests */
+  int WorkerThreadCount() { return worker_threads_.size(); }
+
+
+private:
+  std::unique_ptr<IoServiceImpl> io_service_;
+  NameNodeOperations nn_;
+  const std::string client_name_;
   std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
+
+  struct WorkerDeleter {
+    void operator()(std::thread *t) {
+      t->join();
+      delete t;
+    }
+  };
+  typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
+  std::vector<WorkerPtr> worker_threads_;
+
 };
-}
 
-#include "inputstream_impl.h"
+
+}
 
 #endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
deleted file mode 100644
index 0b78c93..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "filesystem.h"
-
-namespace hdfs {
-
-using ::hadoop::hdfs::LocatedBlocksProto;
-
-InputStream::~InputStream() {}
-
-InputStreamImpl::InputStreamImpl(FileSystemImpl *fs,
-                                 const LocatedBlocksProto *blocks,
-                                 std::shared_ptr<BadDataNodeTracker> tracker)
-    : fs_(fs), file_length_(blocks->filelength()), bad_node_tracker_(tracker) {
-  for (const auto &block : blocks->blocks()) {
-    blocks_.push_back(block);
-  }
-
-  if (blocks->has_lastblock() && blocks->lastblock().b().numbytes()) {
-    blocks_.push_back(blocks->lastblock());
-  }
-}
-
-void InputStreamImpl::PositionRead(
-    void *buf, size_t nbyte, uint64_t offset,
-    const std::function<void(const Status &, const std::string &, size_t)> &
-        handler) {
-  AsyncPreadSome(offset, asio::buffer(buf, nbyte), bad_node_tracker_, handler);
-}
-
-uint64_t InputStreamImpl::get_file_length() const { return file_length_; }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
deleted file mode 100644
index 2f64d39..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef FS_INPUTSTREAM_IMPL_H_
-#define FS_INPUTSTREAM_IMPL_H_
-
-#include "reader/block_reader.h"
-
-#include "common/continuation/asio.h"
-#include "common/continuation/protobuf.h"
-
-#include <functional>
-#include <future>
-#include <type_traits>
-#include <algorithm>
-
-namespace hdfs {
-
-struct InputStreamImpl::RemoteBlockReaderTrait {
-  typedef RemoteBlockReader<asio::ip::tcp::socket> Reader;
-  struct State {
-    std::unique_ptr<asio::ip::tcp::socket> conn_;
-    std::shared_ptr<Reader> reader_;
-    std::array<asio::ip::tcp::endpoint, 1> endpoints_;
-    size_t transferred_;
-    Reader *reader() { return reader_.get(); }
-    size_t *transferred() { return &transferred_; }
-    const size_t *transferred() const { return &transferred_; }
-  };
-  static continuation::Pipeline<State> *CreatePipeline(
-      ::asio::io_service *io_service,
-      const ::hadoop::hdfs::DatanodeInfoProto &dn) {
-    using namespace ::asio::ip;
-    auto m = continuation::Pipeline<State>::Create();
-    auto &s = m->state();
-    s.conn_.reset(new tcp::socket(*io_service));
-    s.reader_ = std::make_shared<Reader>(BlockReaderOptions(), s.conn_.get());
-    auto datanode = dn.id();
-    s.endpoints_[0] = tcp::endpoint(address::from_string(datanode.ipaddr()),
-                                    datanode.xferport());
-
-    m->Push(continuation::Connect(s.conn_.get(), s.endpoints_.begin(),
-                                  s.endpoints_.end()));
-    return m;
-  }
-};
-
-template <class Reader>
-struct InputStreamImpl::HandshakeContinuation : continuation::Continuation {
-  HandshakeContinuation(Reader *reader, const std::string &client_name,
-                        const hadoop::common::TokenProto *token,
-                        const hadoop::hdfs::ExtendedBlockProto *block,
-                        uint64_t length, uint64_t offset)
-      : reader_(reader),
-        client_name_(client_name),
-        length_(length),
-        offset_(offset) {
-    if (token) {
-      token_.reset(new hadoop::common::TokenProto());
-      token_->CheckTypeAndMergeFrom(*token);
-    }
-    block_.CheckTypeAndMergeFrom(*block);
-  }
-
-  virtual void Run(const Next &next) override {
-    reader_->async_connect(client_name_, token_.get(), &block_, length_,
-                           offset_, next);
-  }
-
- private:
-  Reader *reader_;
-  const std::string client_name_;
-  std::unique_ptr<hadoop::common::TokenProto> token_;
-  hadoop::hdfs::ExtendedBlockProto block_;
-  uint64_t length_;
-  uint64_t offset_;
-};
-
-template <class Reader, class MutableBufferSequence>
-struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {
-  ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer,
-                        size_t *transferred)
-      : reader_(reader),
-        buffer_(buffer),
-        buffer_size_(asio::buffer_size(buffer)),
-        transferred_(transferred) {
-    static_assert(!std::is_reference<MutableBufferSequence>::value,
-                  "Buffer must not be a reference type");
-  }
-
-  virtual void Run(const Next &next) override {
-    *transferred_ = 0;
-    next_ = next;
-    OnReadData(Status::OK(), 0);
-  }
-
- private:
-  Reader *reader_;
-  const MutableBufferSequence buffer_;
-  const size_t buffer_size_;
-  size_t *transferred_;
-  std::function<void(const Status &)> next_;
-
-  void OnReadData(const Status &status, size_t transferred) {
-    using std::placeholders::_1;
-    using std::placeholders::_2;
-    *transferred_ += transferred;
-    if (!status.ok()) {
-      next_(status);
-    } else if (*transferred_ >= buffer_size_) {
-      next_(status);
-    } else {
-      reader_->async_read_some(
-          asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
-          std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
-    }
-  }
-};
-
-template <class MutableBufferSequence, class Handler>
-void InputStreamImpl::AsyncPreadSome(
-    size_t offset, const MutableBufferSequence &buffers,
-    std::shared_ptr<NodeExclusionRule> excluded_nodes, const Handler &handler) {
-  using ::hadoop::hdfs::DatanodeInfoProto;
-  using ::hadoop::hdfs::LocatedBlockProto;
-
-  /**
-   *  Note: block and chosen_dn will end up pointing to things inside
-   *  the blocks_ vector.  They shouldn't be directly deleted.
-   **/
-  auto block = std::find_if(
-      blocks_.begin(), blocks_.end(), [offset](const LocatedBlockProto &p) {
-        return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
-      });
-
-  if (block == blocks_.end()) {
-    handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0);
-    return;
-  }
-
-  /**
-   * If user supplies a rule use it, otherwise use the tracker.
-   * User is responsible for making sure one of them isn't null.
-   **/
-  std::shared_ptr<NodeExclusionRule> rule =
-      excluded_nodes != nullptr ? excluded_nodes : bad_node_tracker_;
-
-  auto datanodes = block->locs();
-  auto it = std::find_if(datanodes.begin(), datanodes.end(),
-                         [rule](const DatanodeInfoProto &dn) {
-                           return !rule->IsBadNode(dn.id().datanodeuuid());
-                         });
-
-  if (it == datanodes.end()) {
-    handler(Status::ResourceUnavailable("No datanodes available"), "", 0);
-    return;
-  }
-
-  DatanodeInfoProto *chosen_dn = &*it;
-
-  uint64_t offset_within_block = offset - block->offset();
-  uint64_t size_within_block = std::min<uint64_t>(
-      block->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
-
-  AsyncReadBlock<RemoteBlockReaderTrait>(
-      fs_->rpc_engine().client_name(), *block, *chosen_dn, offset_within_block,
-      asio::buffer(buffers, size_within_block), handler);
-}
-
-template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
-void InputStreamImpl::AsyncReadBlock(
-    const std::string &client_name,
-    const hadoop::hdfs::LocatedBlockProto &block,
-    const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
-    const MutableBufferSequence &buffers, const Handler &handler) {
-  typedef typename BlockReaderTrait::Reader Reader;
-  auto m =
-      BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), dn);
-  auto &s = m->state();
-  size_t size = asio::buffer_size(buffers);
-  m->Push(new HandshakeContinuation<Reader>(s.reader(), client_name, nullptr,
-                                            &block.b(), size, offset))
-      .Push(new ReadBlockContinuation<Reader, MutableBufferSequence>(
-          s.reader(), buffers, s.transferred()));
-  const std::string &dnid = dn.id().datanodeuuid();
-  m->Run([handler, dnid](const Status &status,
-                         const typename BlockReaderTrait::State &state) {
-    handler(status, dnid, *state.transferred());
-  });
-}
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
index 71e28ac..0dcae29 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
@@ -16,5 +16,5 @@
 # limitations under the License.
 #
 
-add_library(reader remote_block_reader.cc datatransfer.cc)
+add_library(reader block_reader.cc datatransfer.cc)
 add_dependencies(reader proto)


Mime
View raw message