hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject hadoop git commit: HDFS-9643. libhdfs++: Support async cancellation of read operations. Contributed by James Clampffer.
Date Sun, 24 Jan 2016 04:46:05 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 011e79c83 -> 6df167c85


HDFS-9643. libhdfs++: Support async cancellation of read operations. Contributed by James
Clampffer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6df167c8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6df167c8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6df167c8

Branch: refs/heads/HDFS-8707
Commit: 6df167c851dfd8af805b7423ae8f82a120e39f29
Parents: 011e79c
Author: James <jhc@apache.org>
Authored: Sat Jan 23 23:45:30 2016 -0500
Committer: James <jhc@apache.org>
Committed: Sat Jan 23 23:45:30 2016 -0500

----------------------------------------------------------------------
 .../native/libhdfspp/include/hdfspp/hdfs_ext.h  |  10 +-
 .../native/libhdfspp/include/hdfspp/hdfspp.h    |   6 +
 .../native/libhdfspp/include/hdfspp/status.h    |   3 +
 .../native/libhdfspp/lib/bindings/c/hdfs.cc     |   7 +
 .../native/libhdfspp/lib/common/CMakeLists.txt  |   2 +-
 .../libhdfspp/lib/common/cancel_tracker.cc      |  37 ++++++
 .../libhdfspp/lib/common/cancel_tracker.h       |  40 ++++++
 .../lib/common/continuation/continuation.h      |  15 ++-
 .../lib/connection/datanodeconnection.cc        |   6 +
 .../lib/connection/datanodeconnection.h         |   3 +
 .../main/native/libhdfspp/lib/fs/filehandle.cc  |  35 ++++-
 .../main/native/libhdfspp/lib/fs/filehandle.h   |  12 ++
 .../native/libhdfspp/lib/reader/CMakeLists.txt  |   2 +-
 .../native/libhdfspp/lib/reader/block_reader.cc |  14 +-
 .../native/libhdfspp/lib/reader/block_reader.h  |  11 +-
 .../native/libhdfspp/lib/reader/datatransfer.h  |   2 +
 .../libhdfspp/lib/reader/datatransfer_impl.h    |   5 +
 .../native/libhdfspp/lib/reader/readergroup.cc  |  55 ++++++++
 .../native/libhdfspp/lib/reader/readergroup.h   |  52 ++++++++
 .../native/libhdfspp/tests/bad_datanode_test.cc |   8 ++
 .../libhdfspp/tests/remote_block_reader_test.cc | 128 ++++++++++++++++++-
 21 files changed, 433 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
index 8f4548d..3017fe1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
@@ -64,6 +64,15 @@ void hdfsGetLastError(char *buf, int len);
 
 
 /**
+ *  Cancels operations being made by the FileHandle.
+ *  Note: Cancel cannot be reversed.  This is intended
+ *  to be used before hdfsClose to avoid waiting for
+ *  operations to complete.
+ **/
+LIBHDFS_EXTERNAL
+int hdfsCancel(hdfsFS fs, hdfsFile file);
+
+/**
  * Create an HDFS builder, using the configuration XML files from the indicated
  * directory.  If the directory does not exist, or contains no configuration
  * XML files, a Builder using all default values will be returned.
@@ -99,6 +108,5 @@ int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key,
      */
 int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val);
 
-
 } /* end extern "C" */
 #endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
index effdecb..2cbb62c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
@@ -93,6 +93,12 @@ public:
   virtual Status Seek(off_t *offset, std::ios_base::seekdir whence) = 0;
 
   /**
+   * Cancel outstanding file operations.  This is not reversable, once called
+   * the handle should be disposed of.
+   **/
+  virtual void CancelOperations(void) = 0;
+
+  /**
    * Determine if a datanode should be excluded from future operations
    * based on the return Status.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
index 6b58799..89be771 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
@@ -47,6 +47,8 @@ class Status {
   { return Status(kException, expception_class_name, error_message); }
   static Status Error(const char *error_message)
   { return Exception("Exception", error_message); }
+  static Status Canceled()
+  { return Status(kOperationCanceled,""); }
 
   // Returns true iff the status indicates success.
   bool ok() const { return (state_ == NULL); }
@@ -64,6 +66,7 @@ class Status {
     kInvalidArgument = static_cast<unsigned>(std::errc::invalid_argument),
     kResourceUnavailable = static_cast<unsigned>(std::errc::resource_unavailable_try_again),
     kUnimplemented = static_cast<unsigned>(std::errc::function_not_supported),
+    kOperationCanceled = static_cast<unsigned>(std::errc::operation_canceled),
     kException = 255,
   };
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
index 7798680..d5b5d6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
@@ -267,6 +267,13 @@ tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
   return offset;
 }
 
+int hdfsCancel(hdfsFS fs, hdfsFile file) {
+  if (!CheckSystemAndHandle(fs, file)) {
+    return -1;
+  }
+  static_cast<FileHandleImpl*>(file->get_impl())->CancelOperations();
+  return 0;
+}
 
 /*******************************************************************
  *                BUILDER INTERFACE

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
index c02e3db..0344a7c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
@@ -19,6 +19,6 @@ if(NEED_LINK_DL)
    set(LIB_DL dl)
 endif()
 
-add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc
configuration.cc configuration_loader.cc hdfs_configuration.cc util.cc retry_policy.cc)
+add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc
configuration.cc configuration_loader.cc hdfs_configuration.cc util.cc retry_policy.cc cancel_tracker.cc)
 add_library(common $<TARGET_OBJECTS:common_obj>)
 target_link_libraries(common ${LIB_DL})

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.cc
new file mode 100644
index 0000000..0f60ed7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.cc
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "cancel_tracker.h"
+
+namespace hdfs {
+
+CancelTracker::CancelTracker() : canceled_(false) {}
+
+std::shared_ptr<CancelTracker> CancelTracker::New() {
+  return std::make_shared<CancelTracker>();
+}
+
+bool CancelTracker::is_canceled() {
+  return canceled_;
+}
+
+void CancelTracker::set_canceled() {
+  canceled_ = true;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.h
new file mode 100644
index 0000000..ba61926
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.h
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef COMMON_CANCELTRACKER_H
+#define COMMON_CANCELTRACKER_H
+
+#include <memory>
+#include <atomic>
+
+namespace hdfs {
+
+class CancelTracker : public std::enable_shared_from_this<CancelTracker> {
+ public:
+  CancelTracker();
+  static std::shared_ptr<CancelTracker> New();
+  void set_canceled();
+  bool is_canceled();
+ private:
+  std::atomic_bool canceled_;
+};
+
+typedef std::shared_ptr<CancelTracker> CancelHandle;
+
+}
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
index 58a1b46..4c9b8ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
@@ -19,6 +19,7 @@
 #define LIB_COMMON_CONTINUATION_CONTINUATION_H_
 
 #include "hdfspp/status.h"
+#include "common/cancel_tracker.h"
 
 #include <functional>
 #include <memory>
@@ -81,6 +82,9 @@ template <class State> class Pipeline {
 public:
   typedef std::function<void(const Status &, const State &)> UserHandler;
   static Pipeline *Create() { return new Pipeline(); }
+  static Pipeline *Create(CancelHandle cancel_handle) {
+    return new Pipeline(cancel_handle);
+  }
   Pipeline &Push(Continuation *stage);
   void Run(UserHandler &&handler);
   State &state() { return state_; }
@@ -91,9 +95,11 @@ private:
   size_t stage_;
   std::function<void(const Status &, const State &)> handler_;
 
-  Pipeline() : stage_(0) {}
+  Pipeline() : stage_(0), cancel_handle_(CancelTracker::New()) {}
+  Pipeline(CancelHandle cancel_handle) : stage_(0), cancel_handle_(cancel_handle) {}
   ~Pipeline() = default;
   void Schedule(const Status &status);
+  CancelHandle cancel_handle_;
 };
 
 template <class State>
@@ -104,7 +110,12 @@ inline Pipeline<State> &Pipeline<State>::Push(Continuation
*stage) {
 
 template <class State>
 inline void Pipeline<State>::Schedule(const Status &status) {
-  if (!status.ok() || stage_ >= routines_.size()) {
+  // catch cancelation signalled from outside of pipeline
+  if(cancel_handle_->is_canceled()) {
+    handler_(Status::Canceled(), state_);
+    routines_.clear();
+    delete this;
+  } else if (!status.ok() || stage_ >= routines_.size()) {
     handler_(status, state_);
     routines_.clear();
     delete this;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
index b053e7f..247c75e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
@@ -53,5 +53,11 @@ void DataNodeConnectionImpl::Connect(
             handler(ToStatus(ec), shared_this); });
 }
 
+void DataNodeConnectionImpl::Cancel() {
+  // best to do a shutdown() first for portability
+  conn_->shutdown(asio::ip::tcp::socket::shutdown_both);
+  conn_->close();
+}
+
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
index d5bbb92..8f64110 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
@@ -33,6 +33,7 @@ public:
 
     virtual ~DataNodeConnection();
     virtual void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection>
dn)> handler) = 0;
+    virtual void Cancel() = 0;
 };
 
 
@@ -48,6 +49,8 @@ public:
 
   void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection>
dn)> handler) override;
 
+  void Cancel() override;
+
   void async_read_some(const MutableBuffers &buf,
         std::function<void (const asio::error_code & error,
                                std::size_t bytes_transferred) > handler) override {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
index 758fe5c..de9ccca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
@@ -34,13 +34,17 @@ FileHandleImpl::FileHandleImpl(::asio::io_service *io_service, const std::string
                                  const std::shared_ptr<const struct FileInfo> file_info,
                                  std::shared_ptr<BadDataNodeTracker> bad_data_nodes)
     : io_service_(io_service), client_name_(client_name), file_info_(file_info),
-      bad_node_tracker_(bad_data_nodes), offset_(0) {
+      bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New())
{
 }
 
 void FileHandleImpl::PositionRead(
     void *buf, size_t nbyte, uint64_t offset,
-    const std::function<void(const Status &, size_t)>
-        &handler) {
+    const std::function<void(const Status &, size_t)> &handler) {
+  /* prevent usage after cancelation */
+  if(cancel_state_->is_canceled()) {
+    handler(Status::Canceled(), 0);
+    return;
+  }
 
   auto callback = [this, handler](const Status &status,
                                   const std::string &contacted_datanode,
@@ -90,6 +94,10 @@ Status FileHandleImpl::Read(void *buf, size_t *nbyte) {
 }
 
 Status FileHandleImpl::Seek(off_t *offset, std::ios_base::seekdir whence) {
+  if(cancel_state_->is_canceled()) {
+    return Status::Canceled();
+  }
+
   off_t new_offset = -1;
 
   switch (whence) {
@@ -138,6 +146,11 @@ void FileHandleImpl::AsyncPreadSome(
   using ::hadoop::hdfs::DatanodeInfoProto;
   using ::hadoop::hdfs::LocatedBlockProto;
 
+  if(cancel_state_->is_canceled()) {
+    handler(Status::Canceled(), "", 0);
+    return;
+  }
+
   /**
    *  Note: block and chosen_dn will end up pointing to things inside
    *  the blocks_ vector.  They shouldn't be directly deleted.
@@ -210,7 +223,9 @@ void FileHandleImpl::AsyncPreadSome(
 std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions
&options,
                                                std::shared_ptr<DataNodeConnection>
dn)
 {
-  return std::make_shared<BlockReaderImpl>(options, dn);
+  std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options,
dn, cancel_state_);
+  readers_.AddReader(reader);
+  return reader;
 }
 
 std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection(
@@ -220,6 +235,17 @@ std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection(
   return std::make_shared<DataNodeConnectionImpl>(io_service, dn, token);
 }
 
+void FileHandleImpl::CancelOperations() {
+  cancel_state_->set_canceled();
+
+  /* Push update to BlockReaders that may be hung in an asio call */
+  std::vector<std::shared_ptr<BlockReader>> live_readers = readers_.GetLiveReaders();
+  for(auto reader : live_readers) {
+    reader->CancelOperation();
+  }
+}
+
+
 bool FileHandle::ShouldExclude(const Status &s) {
   if (s.ok()) {
     return false;
@@ -228,6 +254,7 @@ bool FileHandle::ShouldExclude(const Status &s) {
   switch (s.code()) {
     /* client side resource exhaustion */
     case Status::kResourceUnavailable:
+    case Status::kOperationCanceled:
       return false;
     case Status::kInvalidArgument:
     case Status::kUnimplemented:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
index 95b5869..8c03b37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
@@ -20,7 +20,9 @@
 
 #include "common/hdfs_public_api.h"
 #include "common/async_stream.h"
+#include "common/cancel_tracker.h"
 #include "reader/fileinfo.h"
+#include "reader/readergroup.h"
 
 #include "asio.hpp"
 #include "bad_datanode_tracker.h"
@@ -94,6 +96,14 @@ public:
                       const std::function<void(const Status &status,
                       const std::string &dn_id, size_t bytes_read)> handler);
 
+
+  /**
+   *  Cancels all operations instantiated from this FileHandle.
+   *  Will set a flag to abort continuation pipelines when they try to move to the next step.
+   *  Closes TCP connections to Datanode in order to abort pipelines waiting on slow IO.
+   **/
+  virtual void CancelOperations(void) override;
+
 protected:
   virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
                                                  std::shared_ptr<DataNodeConnection>
dn);
@@ -108,6 +118,8 @@ private:
   std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
   bool CheckSeekBounds(ssize_t desired_position);
   off_t offset_;
+  CancelHandle cancel_state_;
+  ReaderGroup readers_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
index d4bb837..2bcfd92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
@@ -16,6 +16,6 @@
 # limitations under the License.
 #
 
-add_library(reader_obj OBJECT block_reader.cc datatransfer.cc)
+add_library(reader_obj OBJECT block_reader.cc datatransfer.cc readergroup.cc)
 add_dependencies(reader_obj proto)
 add_library(reader $<TARGET_OBJECTS:reader_obj>)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
index a4e21de..594aaf5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
@@ -22,6 +22,7 @@
 
 #include <future>
 
+
 namespace hdfs {
 
 hadoop::hdfs::OpReadBlockProto
@@ -65,7 +66,7 @@ void BlockReaderImpl::AsyncRequestBlock(
     hadoop::hdfs::BlockOpResponseProto response;
   };
 
-  auto m = continuation::Pipeline<State>::Create();
+  auto m = continuation::Pipeline<State>::Create(cancel_state_);
   State *s = &m->state();
 
   s->header.insert(s->header.begin(),
@@ -287,7 +288,7 @@ struct BlockReaderImpl::AckRead : continuation::Continuation {
     }
 
     auto m =
-        continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create();
+        continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create(parent_->cancel_state_);
     m->state().set_status(parent_->options_.verify_checksum
                               ? hadoop::hdfs::Status::CHECKSUM_OK
                               : hadoop::hdfs::Status::SUCCESS);
@@ -316,7 +317,7 @@ void BlockReaderImpl::AsyncReadPacket(
   struct State {
     std::shared_ptr<size_t> bytes_transferred;
   };
-  auto m = continuation::Pipeline<State>::Create();
+  auto m = continuation::Pipeline<State>::Create(cancel_state_);
   m->state().bytes_transferred = std::make_shared<size_t>(0);
 
   m->Push(new ReadPacketHeader(this))
@@ -415,7 +416,7 @@ void BlockReaderImpl::AsyncReadBlock(
     const MutableBuffers &buffers,
     const std::function<void(const Status &, size_t)> handler) {
 
-  auto m = continuation::Pipeline<size_t>::Create();
+  auto m = continuation::Pipeline<size_t>::Create(cancel_state_);
   size_t * bytesTransferred = &m->state();
 
   size_t size = asio::buffer_size(buffers);
@@ -430,4 +431,9 @@ void BlockReaderImpl::AsyncReadBlock(
   });
 }
 
+void BlockReaderImpl::CancelOperation() {
+  /* just forward cancel to DNConnection */
+  dn_->Cancel();
+}
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
index 7984c7e..3d5ece4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
@@ -20,6 +20,7 @@
 
 #include "hdfspp/status.h"
 #include "common/async_stream.h"
+#include "common/cancel_tracker.h"
 #include "datatransfer.pb.h"
 #include "connection/datanodeconnection.h"
 
@@ -82,14 +83,17 @@ public:
     uint64_t length,
     uint64_t offset,
     const std::function<void(Status)> &handler) = 0;
+
+  virtual void CancelOperation() = 0;
 };
 
 class BlockReaderImpl
     : public BlockReader, public std::enable_shared_from_this<BlockReaderImpl> {
 public:
-  explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection>
dn)
+  explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection>
dn,
+                           CancelHandle cancel_state)
       : dn_(dn), state_(kOpen), options_(options),
-        chunk_padding_bytes_(0) {}
+        chunk_padding_bytes_(0), cancel_state_(cancel_state) {}
 
   virtual void AsyncReadPacket(
     const MutableBuffers &buffers,
@@ -108,6 +112,8 @@ public:
     const MutableBuffers &buffers,
     const std::function<void(const Status &, size_t)> handler) override;
 
+  virtual void CancelOperation() override;
+
   size_t ReadPacket(const MutableBuffers &buffers, Status *status);
 
   Status RequestBlock(
@@ -143,6 +149,7 @@ private:
   int chunk_padding_bytes_;
   long long bytes_to_read_;
   std::vector<char> checksum_;
+  CancelHandle cancel_state_;
 };
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
index 8be9ef8..93103c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
@@ -58,6 +58,8 @@ public:
 
   void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection>
dn)> handler) override
   {(void)handler;  /*TODO: Handshaking goes here*/};
+
+  void Cancel();
 private:
   DataTransferSaslStream(const DataTransferSaslStream &) = delete;
   DataTransferSaslStream &operator=(const DataTransferSaslStream &) = delete;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
index 3ca16e9..e2cd790 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
@@ -126,6 +126,11 @@ void DataTransferSaslStream<Stream>::Handshake(const Handler &next)
{
   m->Run([next](const Status &status, const State &) { next(status); });
 }
 
+template <class Stream>
+void DataTransferSaslStream<Stream>::Cancel() {
+  /* implement with secured reads */
+}
+
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc
new file mode 100644
index 0000000..a64800a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "readergroup.h"
+
+#include <algorithm>
+
+namespace hdfs {
+
+void ReaderGroup::AddReader(std::shared_ptr<BlockReader> reader) {
+  std::lock_guard<std::recursive_mutex> state_lock(state_lock_);
+  ClearDeadReaders();
+  std::weak_ptr<BlockReader> weak_ref = reader;
+  readers_.push_back(weak_ref);
+}
+
+std::vector<std::shared_ptr<BlockReader>> ReaderGroup::GetLiveReaders() {
+  std::lock_guard<std::recursive_mutex> state_lock(state_lock_);
+
+  std::vector<std::shared_ptr<BlockReader>> live_readers;
+  for(auto it=readers_.begin(); it != readers_.end(); it++) {
+    std::shared_ptr<BlockReader> live_reader = it->lock();
+    if(live_reader) {
+      live_readers.push_back(live_reader);
+    }
+  }
+  return live_readers;
+}
+
+void ReaderGroup::ClearDeadReaders() {
+  std::lock_guard<std::recursive_mutex> state_lock(state_lock_);
+
+  auto reader_is_dead = [](const std::weak_ptr<BlockReader> &ptr) {
+    return ptr.expired();
+  };
+
+  auto it = std::remove_if(readers_.begin(), readers_.end(), reader_is_dead);
+  readers_.erase(it, readers_.end());
+}
+
+} // end namespace hdfs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h
new file mode 100644
index 0000000..e6173f7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef READER_READER_GROUP_H_
+#define READER_READER_GROUP_H_
+
+#include "block_reader.h"
+
+#include <memory>
+#include <vector>
+#include <mutex>
+
+namespace hdfs {
+
+/**
+ * Provide a way of logically grouping ephemeral block readers
+ * so that their status can be monitored or changed.
+ *
+ * Note: This does not attempt to extend the reader life
+ * cycle.  Readers are assumed to be owned by something else
+ * using a shared_ptr.
+ **/
+
+class ReaderGroup {
+ public:
+  ReaderGroup() {};
+  void AddReader(std::shared_ptr<BlockReader> reader);
+  /* find live readers, promote to shared_ptr */
+  std::vector<std::shared_ptr<BlockReader>> GetLiveReaders();
+ private:
+  /* remove weak_ptrs that don't point to live object */
+  void ClearDeadReaders();
+  std::recursive_mutex state_lock_;
+  std::vector<std::weak_ptr<BlockReader>> readers_;
+};
+
+} // end namespace hdfs
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
index 771d85f..4741817 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
@@ -55,6 +55,10 @@ public:
     size_t offset,
     const MutableBuffers &buffers,
     const std::function<void(const Status &, size_t)> handler));
+
+  virtual void CancelOperation() override {
+    /* no-op, declared pure virtual */
+  }
 };
 
 class MockDNConnection : public DataNodeConnection, public std::enable_shared_from_this<MockDNConnection>
{
@@ -75,6 +79,10 @@ class MockDNConnection : public DataNodeConnection, public std::enable_shared_fr
       (void)buf;
       handler(asio::error::fault, 0);
   }
+
+  virtual void Cancel() override {
+    /* no-op, declared pure virtual */
+  }
 };
 
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
index 7d926ba..80127f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
@@ -20,6 +20,7 @@
 
 #include "datatransfer.pb.h"
 #include "common/util.h"
+#include "common/cancel_tracker.h"
 #include "reader/block_reader.h"
 #include "reader/datatransfer.h"
 #include "reader/fileinfo.h"
@@ -29,6 +30,8 @@
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
+#include <iostream>
+
 using namespace hdfs;
 
 using ::hadoop::common::TokenProto;
@@ -58,14 +61,18 @@ namespace hdfs {
 class MockDNConnection : public MockConnectionBase, public DataNodeConnection{
 public:
   MockDNConnection(::asio::io_service &io_service)
-      : MockConnectionBase(&io_service) {}
+      : MockConnectionBase(&io_service), OnRead([](){}) {}
   MOCK_METHOD0(Produce, ProducerResult());
 
   MOCK_METHOD1(Connect, void(std::function<void(Status status, std::shared_ptr<DataNodeConnection>
dn)>));
 
+  /* event handler to trigger side effects */
+  std::function<void(void)> OnRead;
+
   void async_read_some(const MutableBuffers &buf,
         std::function<void (const asio::error_code & error,
                                std::size_t bytes_transferred) > handler) override {
+      this->OnRead();
       this->MockConnectionBase::async_read_some(buf, handler);
   }
 
@@ -74,6 +81,10 @@ public:
                                  std::size_t bytes_transferred) > handler) override {
     this->MockConnectionBase::async_write_some(buf, handler);
   }
+
+  void Cancel() {
+    /* no-op, declared pure virtual */
+  }
 };
 
 // Mocks AsyncReadPacket and AsyncRequestBlock but not AsyncReadBlock, so we
@@ -81,7 +92,7 @@ public:
 class PartialMockReader : public BlockReaderImpl {
 public:
   PartialMockReader() :
-    BlockReaderImpl(BlockReaderOptions(), std::shared_ptr<DataNodeConnection>()) {};
+    BlockReaderImpl(BlockReaderOptions(), std::shared_ptr<DataNodeConnection>(), CancelTracker::New())
{};
 
   MOCK_METHOD2(
       AsyncReadPacket,
@@ -221,9 +232,9 @@ template <class Stream = MockDNConnection, class Handler>
 static std::shared_ptr<BlockReaderImpl>
 ReadContent(std::shared_ptr<Stream> conn, const ExtendedBlockProto &block,
             uint64_t length, uint64_t offset, const mutable_buffers_1 &buf,
-            const Handler &handler) {
+            const Handler &handler, CancelHandle cancel_handle = CancelTracker::New())
{
   BlockReaderOptions options;
-  auto reader = std::make_shared<BlockReaderImpl>(options, conn);
+  auto reader = std::make_shared<BlockReaderImpl>(options, conn, cancel_handle);
   Status result;
   reader->AsyncRequestBlock("libhdfs++", &block, length, offset,
                         [buf, reader, handler](const Status &stat) {
@@ -268,6 +279,59 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
   ASSERT_TRUE(done);
 }
 
+/* used for cancelation tests, global to avoid cluttering capture lists */
+CancelHandle packet_canceller;
+
+TEST(RemoteBlockReaderTest, TestCancelWhileReceiving) {
+  packet_canceller = CancelTracker::New();
+
+  static const size_t kChunkSize = 512;
+  static const string kChunkData(kChunkSize, 'a');
+  ::asio::io_service io_service;
+  auto conn = std::make_shared<MockDNConnection>(io_service);
+  BlockOpResponseProto block_op_resp;
+
+  /**
+   * async_read would normally get called 5 times here; once for each
+   * continuation in the pipeline.  Cancel will be triggered on the
+   * fourth call to catch the pipeline mid-execution.
+   **/
+  int call_count = 0;
+  int trigger_at_count = 4;
+  auto cancel_trigger = [&call_count, &trigger_at_count]() {
+    call_count += 1;
+    std::cout << "read called " << call_count << " times" << std::endl;
+    if(call_count == trigger_at_count)
+       packet_canceller->set_canceled();
+  };
+
+  conn->OnRead = cancel_trigger;
+
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+  EXPECT_CALL(*conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
+
+  ExtendedBlockProto block;
+  block.set_poolid("foo");
+  block.set_blockid(0);
+  block.set_generationstamp(0);
+
+  bool done = false;
+  std::string data(kChunkSize, 0);
+  ReadContent(conn, block, kChunkSize, 0,
+              buffer(const_cast<char *>(data.c_str()), data.size()),
+              [&data, &io_service, &done](const Status &stat, size_t transferred)
{
+                ASSERT_EQ(stat.code(), Status::kOperationCanceled);
+                ASSERT_EQ(0, transferred);
+                done = true;
+                io_service.stop();
+              }, packet_canceller);
+
+  io_service.run();
+  ASSERT_TRUE(done);
+}
+
 TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
   static const size_t kChunkSize = 1024;
   static const size_t kLength = kChunkSize / 4 * 3;
@@ -332,7 +396,7 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
   string data(kChunkSize, 0);
   mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size());
   BlockReaderOptions options;
-  auto reader = std::make_shared<BlockReaderImpl>(options, conn);
+  auto reader = std::make_shared<BlockReaderImpl>(options, conn, CancelTracker::New());
   Status result;
   reader->AsyncRequestBlock(
       "libhdfs++", &block, data.size(), 0,
@@ -358,6 +422,60 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
   io_service.run();
 }
 
+TEST(RemoteBlockReaderTest, TestReadCancelBetweenPackets) {
+  packet_canceller = CancelTracker::New();
+
+  static const size_t kChunkSize = 1024;
+  static const string kChunkData(kChunkSize, 'a');
+
+  ::asio::io_service io_service;
+  auto conn = std::make_shared<MockDNConnection>(io_service);
+  BlockOpResponseProto block_op_resp;
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+
+  EXPECT_CALL(*conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false)));
+      /* the second AsyncReadPacket should never attempt to read */
+
+  ExtendedBlockProto block;
+  block.set_poolid("foo");
+  block.set_blockid(0);
+  block.set_generationstamp(0);
+
+  string data(kChunkSize, 0);
+  mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size());
+  BlockReaderOptions options;
+  auto reader = std::make_shared<BlockReaderImpl>(options, conn, packet_canceller);
+  Status result;
+  reader->AsyncRequestBlock(
+      "libhdfs++", &block, data.size(), 0,
+      [buf, reader, &data, &io_service](const Status &stat) {
+        ASSERT_TRUE(stat.ok());
+        reader->AsyncReadPacket(
+            buf, [buf, reader, &data, &io_service](const Status &stat, size_t
transferred) {
+              ASSERT_TRUE(stat.ok());
+              ASSERT_EQ(kChunkSize, transferred);
+              ASSERT_EQ(kChunkData, data);
+              data.clear();
+              data.resize(kChunkSize);
+              transferred = 0;
+
+              /* Cancel the operation.*/
+              packet_canceller->set_canceled();
+
+              reader->AsyncReadPacket(
+                  buf, [&data,&io_service](const Status &stat, size_t transferred)
{
+                    ASSERT_EQ(stat.code(), Status::kOperationCanceled);
+                    ASSERT_EQ(0, transferred);
+                    io_service.stop();
+                  });
+            });
+      });
+  io_service.run();
+}
+
+
 TEST(RemoteBlockReaderTest, TestSaslConnection) {
   static const size_t kChunkSize = 512;
   static const string kChunkData(kChunkSize, 'a');


Mime
View raw message