hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject hadoop git commit: HDFS-10311: DatanodeConnection::Cancel should not delete the underlying socket. Contributed by James Clampffer
Date Fri, 22 Apr 2016 02:08:13 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 d8653c8dc -> 4ece444fc


HDFS-10311: DatanodeConnection::Cancel should not delete the underlying socket.  Contributed
by James Clampffer


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ece444f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ece444f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ece444f

Branch: refs/heads/HDFS-8707
Commit: 4ece444fc4f9fd3e253a79b490d97d4746e4e765
Parents: d8653c8
Author: James <jhc@apache.org>
Authored: Thu Apr 21 22:07:26 2016 -0400
Committer: James <jhc@apache.org>
Committed: Thu Apr 21 22:07:26 2016 -0400

----------------------------------------------------------------------
 .../main/native/libhdfspp/lib/common/util.cc    | 31 +++++++++++++++
 .../src/main/native/libhdfspp/lib/common/util.h | 13 ++++++-
 .../lib/connection/datanodeconnection.cc        |  7 +++-
 .../lib/connection/datanodeconnection.h         | 41 +++++++-------------
 4 files changed, 64 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ece444f/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
index cc65ec2..32359f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
@@ -19,6 +19,7 @@
 #include "common/util.h"
 
 #include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+#include <exception>
 
 namespace hdfs {
 
@@ -64,4 +65,34 @@ std::string GetRandomClientName() {
   return ss.str();
 }
 
+std::string SafeDisconnect(asio::ip::tcp::socket *sock) {
+  std::string err;
+  if(sock && sock->is_open()) {
+    /**
+     *  Even though we just checked that the socket is open it's possible
+     *  it isn't in a state where it can properly send or receive.  If that's
+     *  the case asio will turn the underlying error codes from shutdown()
+     *  and close() into unhelpfully named std::exceptions.  These are usually
+     *  innocuous, so catch them and return a message describing the first
+     *  failure for the caller to log; close() is attempted either way.
+     **/
+
+    try {
+      sock->shutdown(asio::ip::tcp::socket::shutdown_both);
+    } catch (const std::exception &e) {
+      err = std::string("shutdown() threw: ") + e.what();
+    }
+
+    try {
+      sock->close();
+    } catch (const std::exception &e) {
+      // keep only the first failure; it is the most useful one to report
+      if(err.empty())
+        err = std::string("close() threw: ") + e.what();
+    }
+
+  }
+  return err;
+}
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ece444f/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
index 870ce2e..a6616c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
@@ -21,16 +21,22 @@
 #include "hdfspp/status.h"
 
 #include <sstream>
+#include <mutex>
+#include <string>
 
 #include <asio/error_code.hpp>
 #include <openssl/rand.h>
 
 #include <google/protobuf/message_lite.h>
 #include <google/protobuf/io/coded_stream.h>
-
+#include <asio.hpp>
 
 namespace hdfs {
 
+// Shorthand for the std::lock_guard<std::mutex> declaration repeated throughout the library.
+using mutex_guard = std::lock_guard<std::mutex>;
+
+
 static inline Status ToStatus(const ::asio::error_code &ec) {
   if (ec) {
     return Status(ec.value(), ec.message().c_str());
@@ -71,6 +77,11 @@ bool lock_held(T & mutex) {
   return result;
 }
 
+// Shut down and close a socket, tolerating a null or already-closed socket and
+// catching any std::exception thrown by asio's shutdown()/close().
+// Returns a message describing the first failure, or an empty string if none occurred.
+std::string SafeDisconnect(asio::ip::tcp::socket *sock);
+
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ece444f/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
index be36fce..acc80c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
@@ -47,6 +47,7 @@ DataNodeConnectionImpl::DataNodeConnectionImpl(asio::io_service * io_service,
 void DataNodeConnectionImpl::Connect(
              std::function<void(Status status, std::shared_ptr<DataNodeConnection>
dn)> handler) {
   // Keep the DN from being freed until we're done
+  mutex_guard state_lock(state_lock_);
   auto shared_this = shared_from_this();
   asio::async_connect(*conn_, endpoints_.begin(), endpoints_.end(),
           [shared_this, handler](const asio::error_code &ec, std::array<asio::ip::tcp::endpoint,
1>::iterator it) {
@@ -55,7 +56,11 @@ void DataNodeConnectionImpl::Connect(
 }
 
 void DataNodeConnectionImpl::Cancel() {
-  conn_.reset();
+  mutex_guard state_lock(state_lock_);  // serialize with async ops being posted on conn_
+  std::string err = SafeDisconnect(conn_.get());  // close the socket but keep conn_ allocated
+  if(!err.empty()) {
+    LOG_WARN(kBlockReader, << "Error disconnecting socket in DataNodeConnectionImpl::Cancel, " << err);
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ece444f/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
index 96f2659..aa193f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
@@ -23,11 +23,10 @@
 #include "ClientNamenodeProtocol.pb.h"
 #include "common/libhdfs_events_impl.h"
 #include "common/logging.h"
+#include "common/util.h"
 
 #include "asio.hpp"
 
-#include <exception>
-
 namespace hdfs {
 
 class DataNodeConnection : public AsyncStream {
@@ -43,31 +42,19 @@ public:
 
 struct SocketDeleter {
   inline void operator()(asio::ip::tcp::socket *sock) {
-    if(sock->is_open()) {
-      /**
-       *  Even though we just checked that the socket is open it's possible
-       *  it isn't in a state where it can properly send or receive.  If that's
-       *  the case asio will turn the underlying error codes from shutdown()
-       *  and close() into unhelpfully named std::exceptions.  Due to the
-       *  relatively innocuous nature of most of these error codes it's better
-       *  to just catch, give a warning, and move on with life.
-       **/
-      try {
-        sock->shutdown(asio::ip::tcp::socket::shutdown_both);
-      } catch (const std::exception &e) {
-        LOG_WARN(kBlockReader, << "Error calling socket->shutdown");
-      }
-      try {
-        sock->close();
-      } catch (const std::exception &e) {
-        LOG_WARN(kBlockReader, << "Error calling socket->close");
-      }
+    // Cancel() may already have closed the socket; SafeDisconnect() skips closed sockets.
+    std::string err = SafeDisconnect(sock);
+    if(!err.empty()) {
+      LOG_WARN(kBlockReader, << "Error disconnecting socket: " << err);
     }
     delete sock;
   }
 };
 
 class DataNodeConnectionImpl : public DataNodeConnection, public std::enable_shared_from_this<DataNodeConnectionImpl>{
+private:
+  // held (briefly) while posting async ops to the asio task queue
+  std::mutex state_lock_;
 public:
   std::unique_ptr<asio::ip::tcp::socket, SocketDeleter> conn_;
   std::array<asio::ip::tcp::endpoint, 1> endpoints_;
@@ -84,19 +71,21 @@ public:
   void Cancel() override;
 
   void async_read_some(const MutableBuffers &buf,
-        std::function<void (const asio::error_code & error,
-                               std::size_t bytes_transferred) > handler) override {
+                         std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler)
+                       override {
     event_handlers_->call("DN_read_req", "", "", buf.end() - buf.begin());
 
+    // Hold state_lock_ so Cancel() cannot close conn_ while this op is being posted.
+    mutex_guard state_lock(state_lock_);
     conn_->async_read_some(buf, handler);
   };
 
   void async_write_some(const ConstBuffers &buf,
-            std::function<void (const asio::error_code & error,
-                                 std::size_t bytes_transferred) > handler) override {
-
+                          std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler)
+                        override {
     event_handlers_->call("DN_write_req", "", "", buf.end() - buf.begin());
 
+    mutex_guard state_lock(state_lock_);  // serialize with Cancel() closing conn_
     conn_->async_write_some(buf, handler);
   }
 };


Mime
View raw message