hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject incubator-hawq git commit: HAWQ-676. Apply fix for common codes to libyarn
Date Fri, 15 Apr 2016 02:09:23 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master 856991354 -> 641176646


HAWQ-676. Apply fix for common codes to libyarn


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/64117664
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/64117664
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/64117664

Branch: refs/heads/master
Commit: 641176646831ac9674e77e2f552d573e2d83853e
Parents: 8569913
Author: Wen Lin <wlin@pivotal.io>
Authored: Fri Apr 15 10:06:18 2016 +0800
Committer: Wen Lin <wlin@pivotal.io>
Committed: Fri Apr 15 10:06:18 2016 +0800

----------------------------------------------------------------------
 depends/libyarn/CMake/Options.cmake             |  1 +
 depends/libyarn/src/CMakeLists.txt              |  2 +-
 .../libyarn/src/common/ExceptionInternal.cpp    | 54 ++++++++--------
 depends/libyarn/src/common/ExceptionInternal.h  | 65 +++++++++++++-------
 depends/libyarn/src/platform.h.in               |  3 +-
 depends/libyarn/src/rpc/RpcChannel.cpp          | 17 ++---
 depends/libyarn/src/rpc/RpcClient.cpp           |  7 ++-
 .../libyarn/test/function/TestLibYarnClient.cpp | 31 +++-------
 .../TestMockApplicationClientProtocol.cpp       |  4 +-
 9 files changed, 99 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/CMake/Options.cmake
----------------------------------------------------------------------
diff --git a/depends/libyarn/CMake/Options.cmake b/depends/libyarn/CMake/Options.cmake
index 5de057b..bcd3d94 100644
--- a/depends/libyarn/CMake/Options.cmake
+++ b/depends/libyarn/CMake/Options.cmake
@@ -22,6 +22,7 @@ OPTION(ENABLE_SSE "enable SSE4.2 buildin function" ON)
 OPTION(ENABLE_FRAME_POINTER "enable frame pointer on 64bit system with flag -fno-omit-frame-pointer, on 32bit system, it is always enabled" ON)
 OPTION(ENABLE_LIBCPP "using libc++ instead of libstdc++, only valid for clang compiler" OFF)
 OPTION(ENABLE_BOOST "using boost instead of native compiler c++0x support" OFF)
+OPTION(STRERROR_R_RETURN_INT "checking strerror_r return type is int or not" ON)
 
 INCLUDE (CheckFunctionExists)
 CHECK_FUNCTION_EXISTS(dladdr HAVE_DLADDR)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/CMakeLists.txt b/depends/libyarn/src/CMakeLists.txt
index a1b2444..c2a3cff 100644
--- a/depends/libyarn/src/CMakeLists.txt
+++ b/depends/libyarn/src/CMakeLists.txt
@@ -2,7 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
 
 SET(libyarn_VERSION_MAJOR 0)
 SET(libyarn_VERSION_MINOR 1)
-SET(libyarn_VERSION_PATCH 15)
+SET(libyarn_VERSION_PATCH 16)
 SET(libyarn_VERSION_STRING "${libyarn_VERSION_MAJOR}.${libyarn_VERSION_MINOR}.${libyarn_VERSION_PATCH}")
 SET(libyarn_VERSION_API 1)
 SET(libyarn_ROOT_SOURCES_DIR ${CMAKE_SOURCE_DIR}/src)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/src/common/ExceptionInternal.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/common/ExceptionInternal.cpp b/depends/libyarn/src/common/ExceptionInternal.cpp
index 40f94e4..f559214 100644
--- a/depends/libyarn/src/common/ExceptionInternal.cpp
+++ b/depends/libyarn/src/common/ExceptionInternal.cpp
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include "platform.h"
 
 #include "Exception.h"
 #include "ExceptionInternal.h"
@@ -40,24 +41,18 @@ bool CheckOperationCanceled() {
 }
 
 const char * GetSystemErrorInfo(int eno) {
-    static THREAD_LOCAL char buffer[64];
     static THREAD_LOCAL char message[64];
+    char buffer[64], *pbuffer;
+    pbuffer = buffer;
+#ifdef STRERROR_R_RETURN_INT
     strerror_r(eno, buffer, sizeof(buffer));
-    snprintf(message, sizeof(message), "(errno: %d) %s", eno, buffer);
+#else
+    pbuffer = strerror_r(eno, buffer, sizeof(buffer));
+#endif
+    snprintf(message, sizeof(message), "(errno: %d) %s", eno, pbuffer);
     return message;
 }
 
-static THREAD_LOCAL std::string * MessageBuffer = NULL;
-static THREAD_LOCAL once_flag once;
-
-static void CreateMessageBuffer() {
-    MessageBuffer = new std::string;
-}
-
-static void InitMessageBuffer() {
-    call_once(once, &CreateMessageBuffer);
-    assert(MessageBuffer != NULL);
-}
 
 static void GetExceptionDetailInternal(const Yarn::YarnException & e,
                                        std::stringstream & ss, bool topLevel);
@@ -104,25 +99,25 @@ static void GetExceptionDetailInternal(const Yarn::YarnException & e,
     }
 }
 
-const char * GetExceptionDetail(const Yarn::YarnException & e) {
-    std::stringstream ss;
-    GetExceptionDetailInternal(e, ss, true);
-
-    try {
-        InitMessageBuffer();
-        *MessageBuffer = ss.str();
-    } catch (const std::bad_alloc & e) {
+const char * GetExceptionDetail(const Yarn::YarnException & e,
+                                std::string& buffer) {
+   try {
+        std::stringstream ss;
+        ss.imbue(std::locale::classic());
+        GetExceptionDetailInternal(e, ss, true);
+        buffer = ss.str();
+    } catch (const std::bad_alloc& e) {
         return "Out of memory";
     }
 
-    return MessageBuffer->c_str();
+    return buffer.c_str();
 }
 
-const char * GetExceptionDetail(const exception_ptr e) {
+const char * GetExceptionDetail(const exception_ptr e, std::string& buffer) {
     std::stringstream ss;
+    ss.imbue(std::locale::classic());
 
     try {
-        InitMessageBuffer();
         Yarn::rethrow_exception(e);
     } catch (const Yarn::YarnException & nested) {
         GetExceptionDetailInternal(nested, ss, true);
@@ -131,12 +126,12 @@ const char * GetExceptionDetail(const exception_ptr e) {
     }
 
     try {
-        *MessageBuffer = ss.str();
-    } catch (const std::bad_alloc & e) {
+        buffer = ss.str();
+    } catch (const std::bad_alloc& e) {
         return "Out of memory";
     }
 
-    return MessageBuffer->c_str();
+    return buffer.c_str();
 }
 
 static void GetExceptionMessage(const std::exception & e,
@@ -156,7 +151,7 @@ static void GetExceptionMessage(const std::exception & e,
     }
 
     try {
-    		Yarn::rethrow_if_nested(e);
+        Yarn::rethrow_if_nested(e);
     } catch (const std::exception & nested) {
         GetExceptionMessage(nested, ss, recursive + 1);
     }
@@ -164,9 +159,10 @@ static void GetExceptionMessage(const std::exception & e,
 
 const char * GetExceptionMessage(const exception_ptr e, std::string & buffer) {
     std::stringstream ss;
+    ss.imbue(std::locale::classic());
 
     try {
-    		Yarn::rethrow_exception(e);
+        Yarn::rethrow_exception(e);
     } catch (const std::bad_alloc & e) {
         return "Out of memory";
     } catch (const std::exception & e) {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/src/common/ExceptionInternal.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/common/ExceptionInternal.h b/depends/libyarn/src/common/ExceptionInternal.h
index a45346d..930ee71 100644
--- a/depends/libyarn/src/common/ExceptionInternal.h
+++ b/depends/libyarn/src/common/ExceptionInternal.h
@@ -56,10 +56,29 @@ namespace Yarn {
 using boost::exception_ptr;
 using boost::rethrow_exception;
 using boost::current_exception;
+}
+
+#else
+#include <exception>
+#include <stdexcept>
+
+namespace Yarn {
+using std::rethrow_exception;
+using std::current_exception;
+using std::make_exception_ptr;
+using std::exception_ptr;
+}
+#endif  //  include headers
 
+#if defined(NEED_BOOST) || !defined(HAVE_NESTED_EXCEPTION)  //  define nested exception
+namespace Yarn {
+#ifdef NEED_BOOST
 class nested_exception : virtual public boost::exception {
+#else
+class nested_exception : virtual public std::exception {
+#endif
 public:
-    nested_exception() : p(boost::current_exception()) {
+    nested_exception() : p(current_exception()) {
     }
 
     nested_exception(const nested_exception & other) : p(other.p) {
@@ -73,14 +92,14 @@ public:
     virtual ~nested_exception() throw() {}
 
     void rethrow_nested() const {
-        boost::rethrow_exception(p);
+        rethrow_exception(p);
     }
 
-    boost::exception_ptr nested_ptr() const {
+    exception_ptr nested_ptr() const {
         return p;
     }
 protected:
-    boost::exception_ptr p;
+    exception_ptr p;
 };
 
 template<typename BaseType>
@@ -96,7 +115,11 @@ static inline void throw_with_nested(T const & e) {
         std::terminate();
     }
 
+#ifdef NEED_BOOST
     boost::throw_exception(ExceptionWrapper<T>(static_cast < T const & >(e)));
+#else
+    throw ExceptionWrapper<T>(static_cast < T const & >(e));
+#endif
 }
 
 template<typename T>
@@ -113,6 +136,16 @@ static inline void rethrow_if_nested(const nested_exception & e) {
     e.rethrow_nested();
 }
 
+}  // namespace Yarn
+#else  //  not boost and have nested exception
+namespace Yarn {
+using std::throw_with_nested;
+using std::rethrow_if_nested;
+}  //  namespace Yarn
+#endif  //  define nested exception
+
+#ifdef NEED_BOOST
+namespace Yarn {
 namespace Internal {
 
 
@@ -150,24 +183,13 @@ void ThrowException(bool nested, const char * f, int l,
 
     throw std::logic_error("should not reach here.");
 }
-}
 
-}
+}  //  namespace Internal
+}  //  namespace Yarn
 
 #else
 
-#include <exception>
-#include <stdexcept>
-
 namespace Yarn {
-
-using std::rethrow_exception;
-using std::current_exception;
-using std::make_exception_ptr;
-using std::throw_with_nested;
-using std::rethrow_if_nested;
-using std::exception_ptr;
-
 namespace Internal {
 
 template<typename THROWABLE>
@@ -204,8 +226,8 @@ void ThrowException(bool nested, const char * f, int l,
     throw std::logic_error("should not reach here.");
 }
 
-}
-}
+}  //  namespace Internal
+}  //  namespace Yarn
 
 #endif
 
@@ -237,7 +259,8 @@ bool CheckOperationCanceled();
  * @param e The exception which detail message to be return.
  * @return The exception's detail message.
  */
-const char * GetExceptionDetail(const Yarn::YarnException & e);
+const char * GetExceptionDetail(const Yarn::YarnException & e,
+				std::string &buffer);
 
 /**
  * Get a exception's detail message.
@@ -245,7 +268,7 @@ const char * GetExceptionDetail(const Yarn::YarnException & e);
  * @param e The exception which detail message to be return.
  * @return The exception's detail message.
  */
-const char * GetExceptionDetail(const exception_ptr e);
+const char * GetExceptionDetail(const exception_ptr e, std::string &buffer);
 
 const char * GetExceptionMessage(const exception_ptr e, std::string & buffer);
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/src/platform.h.in
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/platform.h.in b/depends/libyarn/src/platform.h.in
index 86d04e5..de2d294 100644
--- a/depends/libyarn/src/platform.h.in
+++ b/depends/libyarn/src/platform.h.in
@@ -30,7 +30,8 @@
 #cmakedefine ENABLE_FRAME_POINTER
 #cmakedefine HAVE_SYMBOLIZE
 #cmakedefine NEED_BOOST
-
+#cmakedefine STRERROR_R_RETURN_INT
+#cmakedefine HAVE_NESTED_EXCEPTION
 // defined by gcc
 #if defined(__ELF__) && defined(OS_LINUX)
 # define HAVE_SYMBOLIZE

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/src/rpc/RpcChannel.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/rpc/RpcChannel.cpp b/depends/libyarn/src/rpc/RpcChannel.cpp
index 81d11d1..814541c 100644
--- a/depends/libyarn/src/rpc/RpcChannel.cpp
+++ b/depends/libyarn/src/rpc/RpcChannel.cpp
@@ -248,6 +248,7 @@ void RpcChannelImpl::connect() {
     exception_ptr lastError;
     const RpcConfig & conf = key.getConf();
     const RpcServerInfo & server = key.getServer();
+    std::string buffer;
 
     for (int i = 0; i < conf.getMaxRetryOnConnect(); ++i) {
         RpcAuth auth = key.getAuth();
@@ -294,19 +295,19 @@ void RpcChannelImpl::connect() {
             lastError = current_exception();
             LOG(LOG_ERROR,
                 "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
-                server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e));
+                server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer));
         } catch (const YarnNetworkException & e) {
             sleep = 1;
             lastError = current_exception();
             LOG(LOG_ERROR,
                 "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
-                server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e));
+                server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer));
         } catch (const YarnTimeoutException & e) {
             sleep = 1;
             lastError = current_exception();
             LOG(LOG_ERROR,
                 "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
-                server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e));
+                server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer));
         }
 
         if (i + 1 < conf.getMaxRetryOnConnect()) {
@@ -429,11 +430,12 @@ void RpcChannelImpl::invoke(const RpcCall & call) {
 
                 if (!retry && call.isIdempotent()) {
                     retry = true;
+                    std::string buffer;
                     LOG(LOG_ERROR,
                         "Failed to invoke RPC call \"%s\" on server \"%s:%s\": \n%s",
                         call.getName(), key.getServer().getHost().c_str(),
                         key.getServer().getPort().c_str(),
-                        GetExceptionDetail(lastError));
+                        GetExceptionDetail(lastError, buffer));
                     LOG(INFO,
                         "Retry idempotent RPC call \"%s\" on server \"%s:%s\"",
                         call.getName(), key.getServer().getHost().c_str(),
@@ -598,12 +600,13 @@ bool RpcChannelImpl::checkIdle() {
                 sendPing();
             }
         } catch (...) {
+            std::string buffer;
             LOG(LOG_ERROR,
                 "Failed to send ping via idle RPC channel to server \"%s:%s\": "
                 "\n%s",
                 key.getServer().getHost().c_str(),
                 key.getServer().getPort().c_str(),
-                GetExceptionDetail(current_exception()));
+                GetExceptionDetail(current_exception(), buffer));
             sock->close();
             return true;
         }
@@ -751,8 +754,8 @@ void RpcChannelImpl::readOneResponse(bool writeLock) {
     std::vector<char> buffer(128);
     hadoop::common::RpcResponseHeaderProto curRespHeader;
     hadoop::common::RpcResponseHeaderProto::RpcStatusProto status;
-    uint32_t totalen, headerSize = 0, bodySize = 0;
-    totalen = in->readBigEndianInt32(readTimeout);
+    uint32_t headerSize = 0, bodySize = 0;
+    in->readBigEndianInt32(readTimeout);
     /*
      * read response header
      */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/src/rpc/RpcClient.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/rpc/RpcClient.cpp b/depends/libyarn/src/rpc/RpcClient.cpp
index ae93edc..3dacaa0 100644
--- a/depends/libyarn/src/rpc/RpcClient.cpp
+++ b/depends/libyarn/src/rpc/RpcClient.cpp
@@ -95,8 +95,9 @@ void RpcClientImpl::clean() {
             }
         }
     } catch (const Yarn::YarnException & e) {
+        std::string buffer;
         LOG(LOG_ERROR, "RpcClientImpl's idle cleaner exit: %s",
-            GetExceptionDetail(e));
+            GetExceptionDetail(e, buffer));
     } catch (const std::exception & e) {
         LOG(LOG_ERROR, "RpcClientImpl's idle cleaner exit: %s", e.what());
     }
@@ -146,8 +147,6 @@ RpcChannel & RpcClientImpl::getChannel(const RpcAuth & auth,
             allChannels[key] = rc;
         }
 
-        rc->addRef();
-
         if (!cleaning) {
             cleaning = true;
 
@@ -157,6 +156,8 @@ RpcChannel & RpcClientImpl::getChannel(const RpcAuth & auth,
 
             CREATE_THREAD(cleaner, bind(&RpcClientImpl::clean, this));
         }
+        // increase ref count after successfully done without any exception
+        rc->addRef();
     } catch (const YarnRpcException & e) {
         throw;
     } catch (...) {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/test/function/TestLibYarnClient.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/test/function/TestLibYarnClient.cpp b/depends/libyarn/test/function/TestLibYarnClient.cpp
index 3043ea9..5260775 100644
--- a/depends/libyarn/test/function/TestLibYarnClient.cpp
+++ b/depends/libyarn/test/function/TestLibYarnClient.cpp
@@ -29,15 +29,16 @@ using namespace libyarn;
 class TestLibYarnClient: public ::testing::Test {
 public:
 	TestLibYarnClient(){
+		string user_name("postgres");
 		string rmHost("localhost");
-		string rmPort("9980");
+		string rmPort("8032");
 		string schedHost("localhost");
-		string schedPort("9981");
+		string schedPort("8030");
 		string amHost("localhost");
 		int32_t amPort = 0;
 		string am_tracking_url("url");
 		int heartbeatInterval = 1000;
-		client = new LibYarnClient(rmHost, rmPort, schedHost, schedPort, amHost, amPort, am_tracking_url,heartbeatInterval);
+		client = new LibYarnClient(user_name, rmHost, rmPort, schedHost, schedPort, amHost, amPort, am_tracking_url, heartbeatInterval);
 	}
 	~TestLibYarnClient(){
 	}
@@ -47,33 +48,21 @@ protected:
 
 TEST_F(TestLibYarnClient,TestLibYarn){
 	string jobName("libyarn");
-	string queue("sample_queue");
+	string queue("default");
 	string jobId("");
 	int result = client->createJob(jobName, queue,jobId);
 	EXPECT_EQ(result,0);
 
-	ResourceRequest resRequest;
-	string host("*");
-	resRequest.setResourceName(host);
-	Resource capability;
-	capability.setVirtualCores(1);
-	capability.setMemory(1024);
-	resRequest.setCapability(capability);
-	resRequest.setNumContainers(3);
-	resRequest.setRelaxLocality(true);
-	Priority priority;
-	priority.setPriority(1);
-	resRequest.setPriority(priority);
 	list<string> blackListAdditions;
 	list<string> blackListRemovals;
 	list<Container> allocatedResourcesArray;
-	result = client->allocateResources(jobId, resRequest, blackListAdditions, blackListRemovals,allocatedResourcesArray,5);
+	result = client->allocateResources(jobId, blackListAdditions, blackListRemovals,allocatedResourcesArray,5);
 	EXPECT_EQ(result,0);
 
 	int allocatedResourceArraySize = allocatedResourcesArray.size();
-	int activeContainerIds[allocatedResourceArraySize];
-	int releaseContainerIds[allocatedResourceArraySize];
-	int statusContainerIds[allocatedResourceArraySize];
+	int64_t activeContainerIds[allocatedResourceArraySize];
+	int64_t releaseContainerIds[allocatedResourceArraySize];
+	int64_t statusContainerIds[allocatedResourceArraySize];
 	int i = 0;
 	for (list<Container>::iterator it = allocatedResourcesArray.begin();it != allocatedResourcesArray.end();it++){
 		activeContainerIds[i] = it->getId().getId();
@@ -86,7 +75,7 @@ TEST_F(TestLibYarnClient,TestLibYarn){
 
 	sleep(1);
 
-	set<int> activeFailIds;
+	set<int64_t> activeFailIds;
 	result = client->getActiveFailContainerIds(activeFailIds);
 	EXPECT_EQ(result,0);
 	EXPECT_EQ(activeFailIds.size(),0);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/test/function/TestMockApplicationClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/test/function/TestMockApplicationClientProtocol.cpp b/depends/libyarn/test/function/TestMockApplicationClientProtocol.cpp
index 8acb039..24a110a 100644
--- a/depends/libyarn/test/function/TestMockApplicationClientProtocol.cpp
+++ b/depends/libyarn/test/function/TestMockApplicationClientProtocol.cpp
@@ -29,14 +29,14 @@ using std::string;
 class TestMockApplicationClientProtocol: public ::testing::Test {
 public:
 	TestMockApplicationClientProtocol(){
+		string user_name("postgres");
 		string rmHost("localhost");
 		string rmPort("8032");
 		string tokenService = "";
 		Yarn::Config config;
 		Yarn::Internal::SessionConfig sessionConfig(config);
 		Yarn::Internal::UserInfo user = Yarn::Internal::UserInfo::LocalUser();
-		Yarn::Internal::RpcAuth rpcAuth(user, Yarn::Internal::AuthMethod::SIMPLE);
-		protocol = new MockApplicationClientProtocol(rmHost,rmPort,tokenService, sessionConfig,rpcAuth);
+		protocol = new MockApplicationClientProtocol(user_name, rmHost,rmPort,tokenService, sessionConfig);
 }
 	~TestMockApplicationClientProtocol(){
 		delete protocol;


Mime
View raw message