hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From odiache...@apache.org
Subject [12/38] incubator-hawq git commit: HAWQ-582. Handle ApplicationMasterNotRegisteredException in libyarn
Date Thu, 31 Mar 2016 00:24:05 GMT
HAWQ-582. Handle ApplicationMasterNotRegisteredException in libyarn


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/7ba95a86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/7ba95a86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/7ba95a86

Branch: refs/heads/HAWQ-546
Commit: 7ba95a86b03b530b7f9c3bdf04c970740ff9801f
Parents: 959048e
Author: Wen Lin <wlin@pivotal.io>
Authored: Thu Mar 24 14:19:43 2016 +0800
Committer: Oleksandr Diachenko <odiachenko@pivotal.io>
Committed: Wed Mar 30 17:23:28 2016 -0700

----------------------------------------------------------------------
 depends/libyarn/src/CMakeLists.txt              |   2 +-
 depends/libyarn/src/common/Exception.cpp        |   3 +
 depends/libyarn/src/common/Exception.h          |  15 +-
 .../libyarn/src/libyarnclient/LibYarnClient.cpp | 218 +++++++++++--------
 .../libyarnserver/ApplicationMasterProtocol.cpp |  12 +-
 5 files changed, 145 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7ba95a86/depends/libyarn/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/CMakeLists.txt b/depends/libyarn/src/CMakeLists.txt
index 4278d6f..ba70739 100644
--- a/depends/libyarn/src/CMakeLists.txt
+++ b/depends/libyarn/src/CMakeLists.txt
@@ -2,7 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
 
 SET(libyarn_VERSION_MAJOR 0)
 SET(libyarn_VERSION_MINOR 1)
-SET(libyarn_VERSION_PATCH 13)
+SET(libyarn_VERSION_PATCH 14)
 SET(libyarn_VERSION_STRING "${libyarn_VERSION_MAJOR}.${libyarn_VERSION_MINOR}.${libyarn_VERSION_PATCH}")
 SET(libyarn_VERSION_API 1)
 SET(libyarn_ROOT_SOURCES_DIR ${CMAKE_SOURCE_DIR}/src)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7ba95a86/depends/libyarn/src/common/Exception.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/common/Exception.cpp b/depends/libyarn/src/common/Exception.cpp
index 76699e8..09d1312 100644
--- a/depends/libyarn/src/common/Exception.cpp
+++ b/depends/libyarn/src/common/Exception.cpp
@@ -69,6 +69,9 @@ const char * YarnInvalidBlockToken::ReflexName =
 
 const char * SaslException::ReflexName = "javax.security.sasl.SaslException";
 
+const char * ApplicationMasterNotRegisteredException::ReflexName =
+    "org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException";
+
 YarnException::YarnException(const std::string & arg, const char * file,
                              int line, const char * stack) :
     std::runtime_error(arg) {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7ba95a86/depends/libyarn/src/common/Exception.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/common/Exception.h b/depends/libyarn/src/common/Exception.h
index 2f0cca9..46af910 100644
--- a/depends/libyarn/src/common/Exception.h
+++ b/depends/libyarn/src/common/Exception.h
@@ -482,7 +482,7 @@ public:
 
 class ResourceManagerStandbyException: public YarnException {
 public:
-	ResourceManagerStandbyException(const std::string & arg, const char * file,
+    ResourceManagerStandbyException(const std::string & arg, const char * file,
                              int line, const char * stack) :
         YarnException(arg, file, line, stack) {
     }
@@ -494,7 +494,18 @@ public:
     static const char * ReflexName;
 };
 
+class ApplicationMasterNotRegisteredException: public YarnException {
+public:
+    ApplicationMasterNotRegisteredException(const std::string & arg, const char * file,
+                                            int line, const char * stack) :
+        YarnException(arg, file, line, stack) {
+    }
 
-}
+    ~ApplicationMasterNotRegisteredException() throw () {
+    }
 
+public:
+    static const char * ReflexName;
+};
+}
 #endif /* _YARN_LIBYARN_COMMON_EXCEPTION_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7ba95a86/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
index 87876e4..3c76f92 100644
--- a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
+++ b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
@@ -118,8 +118,17 @@ void* heartbeatFunc(void* args) {
 		try {
 			client->dummyAllocate();
 			failcounter = 0;
-		}
-		catch(const YarnException &e) {
+		} catch (const ApplicationMasterNotRegisteredException &e) {
+			/*
+			 * In case catch this exception,
+			 * heartbeat thread should exits, and re-register AM.
+			 */
+			LOG(WARNING, "LibYarnClient::heartbeatFunc, dummy allocation "
+						 "catch ApplicationMasterNotRegisteredException. %s",
+						 e.msg());
+			client->keepRun = false;
+			break;
+		} catch (const YarnException &e) {
 			LOG(WARNING, "LibYarnClient::heartbeatFunc, dummy allocation "
 						 "is not correctly executed with exception raised. %s",
 						 e.msg());
@@ -477,20 +486,20 @@ repeated NMTokenProto nm_tokens = 9;
 }
 */
 int LibYarnClient::allocateResources(string &jobId,
-			list<string> &blackListAdditions,
-			list<string> &blackListRemovals,
+            list<string> &blackListAdditions,
+            list<string> &blackListRemovals,
             list<Container> &allocatedContainers,
             int32_t num_containers) {
     try{
-    	AllocateResponse response;
-    	int retry = 5;
-    	int allocatedNumOnce = 0;
-    	int allocatedNumTotal = 0;
+        AllocateResponse response;
+        int retry = 5;
+        int allocatedNumOnce = 0;
+        int allocatedNumTotal = 0;
 
         pthread_mutex_lock(&heartbeatLock);
-		if (jobId != clientJobId) {
-			throw std::invalid_argument("The jobId is wrong, check the jobId argument");
-		}
+        if (jobId != clientJobId) {
+            throw std::invalid_argument("The jobId is wrong, check the jobId argument");
+        }
 
         if (!keepRun && needHeartbeatAlive) {
             throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped.");
@@ -509,89 +518,89 @@ int LibYarnClient::allocateResources(string &jobId,
 
         LOG(INFO,"LibYarnClient::allocate, ask: container number:%d,", num_containers);
 
-		while (retry > 0) {
-			LOG(INFO,"LibYarnClient::allocate with response id : %d", response_id);
-			AllocateResponse response = amrmClientAlias->allocate(this->askRequests, releasesBlank,
-								blacklistRequest, response_id, progress);
-			response_id = response.getResponseId();
-			LOG(INFO,"LibYarnClient::allocate returned response id : %d", response_id);
-			list<NMToken> nmTokens =  response.getNMTokens();
-			for (list<NMToken>::iterator it = nmTokens.begin(); it != nmTokens.end(); it++)
{
-				std::ostringstream oss;
-				oss << (*it).getNodeId().getHost() << ":" << (*it).getNodeId().getPort();
-				nmTokenCache[oss.str()] = (*it).getToken();
-			}
-			this->clearAskRequests();
-			list<Container> allocatedContainerOnce = response.getAllocatedContainers();
-			allocatedNumOnce = allocatedContainerOnce.size();
-			if (allocatedNumOnce <= 0) {
-				LOG(WARNING, "LibYarnClient:: fail to allocate from YARN RM, try again");
-				retry--;
-				if(retry == 0 && allocatedNumTotal == 0) {
-					/* If failed, just return to Resource Broker to handle*/
-					pthread_mutex_unlock(&heartbeatLock);
-					LOG(WARNING,"LibYarnClient:: fail to allocate from YARN RM after retry several times");
-					return FR_SUCCEEDED;
-				}
-			} else {
-				allocatedNumTotal += allocatedNumOnce;
-				allocatedContainerCache.insert(allocatedContainerCache.end(), allocatedContainerOnce.begin(),
allocatedContainerOnce.end());
-				LOG(INFO, "LibYarnClient:: allocate %d containers from YARN RM", allocatedNumOnce);
-				if (allocatedNumTotal >= num_containers) {
-					LOG(INFO, "LibYarnClient:: allocate enough containers from YARN RM, "
-							  "expected:%d, total:%d", num_containers, allocatedNumTotal);
-					break;
-				}
+        while (retry > 0) {
+            LOG(INFO,"LibYarnClient::allocate with response id : %d", response_id);
+            AllocateResponse response = amrmClientAlias->allocate(this->askRequests,
releasesBlank,
+                                blacklistRequest, response_id, progress);
+            response_id = response.getResponseId();
+            LOG(INFO,"LibYarnClient::allocate returned response id : %d", response_id);
+            list<NMToken> nmTokens =  response.getNMTokens();
+            for (list<NMToken>::iterator it = nmTokens.begin(); it != nmTokens.end();
it++) {
+                std::ostringstream oss;
+                oss << (*it).getNodeId().getHost() << ":" << (*it).getNodeId().getPort();
+                nmTokenCache[oss.str()] = (*it).getToken();
+            }
+            this->clearAskRequests();
+            list<Container> allocatedContainerOnce = response.getAllocatedContainers();
+            allocatedNumOnce = allocatedContainerOnce.size();
+            if (allocatedNumOnce <= 0) {
+                LOG(WARNING, "LibYarnClient:: fail to allocate from YARN RM, try again");
+                retry--;
+                if(retry == 0 && allocatedNumTotal == 0) {
+                    /* If failed, just return to Resource Broker to handle*/
+                    pthread_mutex_unlock(&heartbeatLock);
+                    LOG(WARNING,"LibYarnClient:: fail to allocate from YARN RM after retry
several times");
+                    return FR_SUCCEEDED;
+                }
+            } else {
+                allocatedNumTotal += allocatedNumOnce;
+                allocatedContainerCache.insert(allocatedContainerCache.end(), allocatedContainerOnce.begin(),
allocatedContainerOnce.end());
+                LOG(INFO, "LibYarnClient:: allocate %d containers from YARN RM", allocatedNumOnce);
+                if (allocatedNumTotal >= num_containers) {
+                    LOG(INFO, "LibYarnClient:: allocate enough containers from YARN RM, "
+                              "expected:%d, total:%d", num_containers, allocatedNumTotal);
+                    break;
+                }
 
-			}
-			usleep(TimeInterval::ALLOCATE_INTERVAL_MS);
+            }
+            usleep(TimeInterval::ALLOCATE_INTERVAL_MS);
         }
 
-		LOG(INFO,"LibYarnClient::allocate, ask: response_id:%d, allocated container number:%d",
-				 response_id, allocatedNumTotal);
+        LOG(INFO,"LibYarnClient::allocate, ask: response_id:%d, allocated container number:%d",
+                 response_id, allocatedNumTotal);
 
-		/* a workaround for allocate more container than request */
+        /* a workaround for allocate more container than request */
         list<ContainerId> releases;
         list<ContainerReport> afterContainerReports;
         afterContainerReports = ((ApplicationClient*) appClient)->getContainers(clientAppAttempId);
         for (list<ContainerReport>::iterator ait=afterContainerReports.begin();
-        		ait!=afterContainerReports.end(); ait++){
-        	bool foundInPre = false;
-        	for (list<ContainerReport>::iterator pit=preContainerReports.begin();pit!=preContainerReports.end();pit++){
-				if (pit->getId().getId() == ait->getId().getId()) {
-					foundInPre = true;
-					break;
-				}
-        	}
-        	if (!foundInPre){
-        		bool foundInNewAllocated = false;
-        		for (list<Container>::iterator cit = allocatedContainerCache.begin();cit!=allocatedContainerCache.end();cit++){
-        			if(cit->getId().getId() == ait->getId().getId()){
-        				foundInNewAllocated = true;
-        				break;
-        			}
-        		}
-        		if (!foundInNewAllocated){
-        			releases.push_back((*ait).getId());
-        		}
-        	}
+                ait!=afterContainerReports.end(); ait++){
+            bool foundInPre = false;
+            for (list<ContainerReport>::iterator pit=preContainerReports.begin();pit!=preContainerReports.end();pit++){
+                if (pit->getId().getId() == ait->getId().getId()) {
+                    foundInPre = true;
+                    break;
+                }
+            }
+            if (!foundInPre){
+                bool foundInNewAllocated = false;
+                for (list<Container>::iterator cit = allocatedContainerCache.begin();cit!=allocatedContainerCache.end();cit++){
+                    if(cit->getId().getId() == ait->getId().getId()){
+                        foundInNewAllocated = true;
+                        break;
+                    }
+                }
+                if (!foundInNewAllocated){
+                    releases.push_back((*ait).getId());
+                }
+            }
         }
 
         int totalNeedRelease = allocatedContainerCache.size() - num_containers;
         LOG(INFO,"LibYarnClient::allocateResources, ask: finished: total_allocated_containers:%ld,
total_need_release:%d",
                  allocatedContainerCache.size(), totalNeedRelease);
         if(totalNeedRelease > 0) {
-			for (int i = 0; i < totalNeedRelease; i++) {
-				list<Container>::iterator it = allocatedContainerCache.begin();
-				releases.push_back((*it).getId());
-				allocatedContainerCache.erase(it);
-			}
+            for (int i = 0; i < totalNeedRelease; i++) {
+                list<Container>::iterator it = allocatedContainerCache.begin();
+                releases.push_back((*it).getId());
+                allocatedContainerCache.erase(it);
+            }
 
-			list<ResourceRequest> asksBlank;
-			ResourceBlacklistRequest blacklistRequestBlank;
-			response = amrmClientAlias->allocate(asksBlank, releases,
-					blacklistRequestBlank, response_id, progress);
-			response_id = response.getResponseId();
+            list<ResourceRequest> asksBlank;
+            ResourceBlacklistRequest blacklistRequestBlank;
+            response = amrmClientAlias->allocate(asksBlank, releases,
+                    blacklistRequestBlank, response_id, progress);
+            response_id = response.getResponseId();
         }
 
         /* 3. store allocated containers */
@@ -609,21 +618,30 @@ int LibYarnClient::allocateResources(string &jobId,
 
         return FR_SUCCEEDED;
     } catch(std::exception &e) {
+        stringstream errorMsg;
 
-    	stringstream errorMsg;
-    	errorMsg << "LibYarnClient::allocateResources, catch exception:" << e.what();
+        errorMsg << "LibYarnClient::allocateResources, catch exception:" << e.what();
+        setErrorMessage(errorMsg.str());
+
+        pthread_mutex_unlock(&heartbeatLock);
+        return FR_FAILED;
+    } catch (const ApplicationMasterNotRegisteredException &e) {
+        stringstream errorMsg;
+
+        errorMsg << "LibYarnClient::allocateResources, "
+                    "catch ApplicationMasterNotRegisteredException." << e.what();
         setErrorMessage(errorMsg.str());
 
         pthread_mutex_unlock(&heartbeatLock);
         return FR_FAILED;
     } catch (...) {
+        stringstream errorMsg;
 
-    	stringstream errorMsg;
-    	errorMsg << "LibYarnClient::allocateResources, catch unexpected exception.";
-		setErrorMessage(errorMsg.str());
+        errorMsg << "LibYarnClient::allocateResources, catch unexpected exception.";
+        setErrorMessage(errorMsg.str());
 
-		pthread_mutex_unlock(&heartbeatLock);
-		return FR_FAILED;
+        pthread_mutex_unlock(&heartbeatLock);
+        return FR_FAILED;
     }
 }
 
@@ -815,24 +833,36 @@ int LibYarnClient::finishJob(string &jobId, FinalApplicationStatus
finalStatus)
         LOG(INFO, "LibYarnClient::finishJob, finish AM for jobId:%s, finalStatus:%d", jobId.c_str(),
finalStatus);
         //free the Container* memory
         for (map<int64_t,Container*>::iterator it = jobIdContainers.begin(); it !=
jobIdContainers.end(); it++) {
-        	LOG(INFO,"LibYarnClient::finishJob, container:%ld in jobIdContainers are delete",it->second->getId().getId());
+            LOG(INFO,"LibYarnClient::finishJob, container:%ld in jobIdContainers are delete",it->second->getId().getId());
             delete it->second;
             it->second = NULL;
         }
         jobIdContainers.clear();
         activeFailContainerIds.clear();
+
         return FR_SUCCEEDED;
-    }
-    catch(std::exception& e){
+    } catch (const ApplicationMasterNotRegisteredException &e) {
+        stringstream errorMsg;
+
+        errorMsg << "LibYarnClient::finishJob, "
+                    "catch ApplicationMasterNotRegisteredException." << e.what();
+        setErrorMessage(errorMsg.str());
+
+        return FR_FAILED;
+    } catch (std::exception& e) {
         stringstream errorMsg;
-        errorMsg << "LibYarnClient::finishJob, Catch the Exception:" << e.what();
+
+        errorMsg << "LibYarnClient::finishJob, catch the Exception:" << e.what();
         setErrorMessage(errorMsg.str());
+
         return FR_FAILED;
     } catch (...) {
-    	stringstream errorMsg;
-		errorMsg << "LibYarnClient::finishJob, catch unexpected exception.";
-		setErrorMessage(errorMsg.str());
-		return FR_FAILED;
+        stringstream errorMsg;
+
+        errorMsg << "LibYarnClient::finishJob, catch unexpected exception.";
+        setErrorMessage(errorMsg.str());
+
+        return FR_FAILED;
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7ba95a86/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.cpp b/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.cpp
index 57cf1de..d8f3b58 100644
--- a/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.cpp
+++ b/depends/libyarn/src/libyarnserver/ApplicationMasterProtocol.cpp
@@ -62,9 +62,6 @@ RegisterApplicationMasterResponse ApplicationMasterProtocol::registerApplication
         return RegisterApplicationMasterResponse(responseProto);
     } catch (const YarnFailoverException & e) {
          throw;
-    } catch (const YarnRpcServerException & e) {
-        UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
-        unwrapper.unwrap(__FILE__, __LINE__);
     } catch (...) {
         THROW(YarnIOException,
               "Unexpected exception: when calling "
@@ -83,10 +80,9 @@ AllocateResponse ApplicationMasterProtocol::allocate(AllocateRequest &request)
{
     } catch (const YarnFailoverException & e) {
          throw;
     } catch (const YarnRpcServerException & e) {
-        UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+        UnWrapper<ApplicationMasterNotRegisteredException, YarnIOException> unwrapper(e);
         unwrapper.unwrap(__FILE__, __LINE__);
-    }
-    catch (...) {
+    } catch (...) {
         THROW(YarnIOException,
               "Unexpected exception: when calling "
               "ApplicationMasterProtocol::allocate in %s: %d",
@@ -104,9 +100,9 @@ FinishApplicationMasterResponse ApplicationMasterProtocol::finishApplicationMast
     } catch (const YarnFailoverException & e) {
          throw;
     } catch (const YarnRpcServerException & e) {
-        UnWrapper<UnresolvedLinkException, YarnIOException> unwrapper(e);
+        UnWrapper<ApplicationMasterNotRegisteredException, YarnIOException> unwrapper(e);
         unwrapper.unwrap(__FILE__, __LINE__);
-    }	catch (...) {
+    } catch (...) {
         THROW(YarnIOException,
               "Unexpected exception: when calling "
               "ApplicationMasterProtocol::finishApplicationMaster in %s: %d",


Mime
View raw message