From: wlin@apache.org
To: commits@hawq.incubator.apache.org
Message-Id: <8541186181fc4e1ca0001c46e1d2bda7@git.apache.org>
Subject: incubator-hawq git commit: HAWQ-192. Update libyarn proto files and interfaces. Change container id from int32 to int64
Date: Tue, 8 Dec 2015 08:50:52 +0000 (UTC)

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 9b2421ea2 -> a9344ab62

HAWQ-192. Update libyarn proto files and interfaces. Change container id from int32 to int64

Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/a9344ab6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/a9344ab6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/a9344ab6

Branch: refs/heads/master
Commit: a9344ab62164b1665e3f5d70aa41bac3a2b5da00
Parents: 9b2421e
Author: Wen Lin
Authored: Tue Dec 8 16:56:29 2015 +0800
Committer: Wen Lin
Committed: Tue Dec 8 16:56:29 2015 +0800

---------------------------------------------------------------------- depends/libyarn/sample/c_client_main.c | 18 +- depends/libyarn/src/CMakeLists.txt | 2 +- .../src/libyarnclient/ContainerManagement.cpp | 2 +- .../libyarn/src/libyarnclient/LibYarnClient.cpp | 57 +++---- .../libyarn/src/libyarnclient/LibYarnClient.h | 12 +- .../src/libyarnclient/LibYarnClientC.cpp | 24 +-- .../libyarn/src/libyarnclient/LibYarnClientC.h | 14 +- .../proto/YARN_applicationclient_protocol.proto | 11 ++ .../libyarn/src/proto/YARN_yarn_protos.proto | 168 +++++++++++++++---- .../src/proto/YARN_yarn_service_protos.proto
| 162 +++++++++++++++++- .../protocolrecords/GetContainersRequest.cpp | 4 +- .../protocolrecords/GetContainersResponse.cpp | 6 +- depends/libyarn/src/records/ContainerId.cpp | 4 +- depends/libyarn/src/records/ContainerId.h | 6 +- depends/libyarn/src/records/ContainerReport.cpp | 32 ++-- depends/libyarn/src/records/ContainerReport.h | 4 +- .../resourcemanager/include/resourcepool.h | 10 +- .../resourcebroker/resourcebroker_API.c | 2 +- .../resourcebroker/resourcebroker_LIBYARN.c | 11 +- .../resourcebroker_LIBYARN_proc.c | 86 +++++----- src/backend/resourcemanager/resourcepool.c | 32 ++-- 21 files changed, 472 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/depends/libyarn/sample/c_client_main.c ---------------------------------------------------------------------- diff --git a/depends/libyarn/sample/c_client_main.c b/depends/libyarn/sample/c_client_main.c index bf00dab..4425426 100644 --- a/depends/libyarn/sample/c_client_main.c +++ b/depends/libyarn/sample/c_client_main.c @@ -42,7 +42,7 @@ int main() { //1. createJob char *jobName = "libyarn"; - char *queue = "sample_queue"; + char *queue = "default"; char *jobId = NULL; result = createJob(client, jobName, queue,&jobId); printf("1. createJob, jobid:%s The createJob Result Code:%d\n", jobId,result); @@ -67,13 +67,13 @@ int main() { printf("2. allocateResources, errorMessage:%s\n",errorMessage); } - int32_t activeContainerIds[allocatedResourceArraySize]; - int32_t releaseContainerIds[allocatedResourceArraySize]; - int32_t statusContainerIds[allocatedResourceArraySize]; + int64_t activeContainerIds[allocatedResourceArraySize]; + int64_t releaseContainerIds[allocatedResourceArraySize]; + int64_t statusContainerIds[allocatedResourceArraySize]; printf("2. 
allocateResources, allocatedResourceArraySize:%d\n", allocatedResourceArraySize); for (i = 0 ; i < allocatedResourceArraySize; i++) { puts("----------------------------"); - printf("allocatedResourcesArray[i].containerId:%d\n", allocatedResourcesArray[i].containerId); + printf("allocatedResourcesArray[i].containerId:%ld\n", allocatedResourcesArray[i].containerId); activeContainerIds[i] = allocatedResourcesArray[i].containerId; releaseContainerIds[i] = allocatedResourcesArray[i].containerId; statusContainerIds[i] = allocatedResourcesArray[i].containerId; @@ -94,12 +94,12 @@ int main() { sleep(10); - int *activeFailIds; + int64_t *activeFailIds; int activeFailSize; result = getActiveFailContainerIds(client,&activeFailIds,&activeFailSize); printf("Active Fail Container Size:%d\n",activeFailSize); for (i = 0;i < activeFailSize;i++){ - printf("Active Fail Container Id:%d\n",activeFailIds[i]); + printf("Active Fail Container Id:%ld\n",activeFailIds[i]); } //4. getContainerReport @@ -116,7 +116,7 @@ int main() { printf("containerReportArraySize=%d\n", containerReportArraySize); for (i = 0; i < containerReportArraySize; i++) { printf("-------------container: %d--------------------------\n", i); - printf("containerId:%d\n", containerReportArray[i].containerId); + printf("containerId:%ld\n", containerReportArray[i].containerId); printf("vCores:%d\n", containerReportArray[i].vCores); printf("memory:%d\n", containerReportArray[i].memory); printf("host:%s\n", containerReportArray[i].host); @@ -141,7 +141,7 @@ int main() { printf("containerStatusArraySize=%d\n", containerStatusArraySize); for (i = 0; i < containerStatusArraySize; i++) { printf("-------------container: %d--------------------------\n", i); - printf("containerId:%d\n", containerStatusArray[i].containerId); + printf("containerId:%ld\n", containerStatusArray[i].containerId); printf("exitStatus:%d\n", containerStatusArray[i].exitStatus); printf("state:%d\n", containerStatusArray[i].state); printf("diagnostics:%s\n", 
containerStatusArray[i].diagnostics); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/depends/libyarn/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/CMakeLists.txt b/depends/libyarn/src/CMakeLists.txt index c08ba1f..e9657cc 100644 --- a/depends/libyarn/src/CMakeLists.txt +++ b/depends/libyarn/src/CMakeLists.txt @@ -2,7 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) SET(libyarn_VERSION_MAJOR 0) SET(libyarn_VERSION_MINOR 1) -SET(libyarn_VERSION_PATCH 7) +SET(libyarn_VERSION_PATCH 9) SET(libyarn_VERSION_STRING "${libyarn_VERSION_MAJOR}.${libyarn_VERSION_MINOR}.${libyarn_VERSION_PATCH}") SET(libyarn_VERSION_API 1) SET(libyarn_ROOT_SOURCES_DIR ${CMAKE_SOURCE_DIR}/src) http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/depends/libyarn/src/libyarnclient/ContainerManagement.cpp ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/libyarnclient/ContainerManagement.cpp b/depends/libyarn/src/libyarnclient/ContainerManagement.cpp index 4ab86cc..ee4b6f0 100644 --- a/depends/libyarn/src/libyarnclient/ContainerManagement.cpp +++ b/depends/libyarn/src/libyarnclient/ContainerManagement.cpp @@ -102,7 +102,7 @@ StartContainerResponse ContainerManagement::startContainer(Container &container, scResponse.setServicesMetaData(scsResponse.getServicesMetaData()); LOG(INFO, - "ContainerManagement::startContainer, after start a container, id:%d on NM [%s:%s]", + "ContainerManagement::startContainer, after start a container, id:%ld on NM [%s:%s]", container.getId().getId(), host.c_str(), port.c_str()); //3. 
free http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/depends/libyarn/src/libyarnclient/LibYarnClient.cpp ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp index ff1dd98..3fb7e5b 100644 --- a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp +++ b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp @@ -41,7 +41,7 @@ LibYarnClient::LibYarnClient(string &user, string &rmHost, string &rmPort, int32_t amPort, string &am_tracking_url,int heartbeatInterval) : amUser(user), schedHost(schedHost), schedPort(schedPort), amHost(amHost), amPort(amPort), am_tracking_url(am_tracking_url), - heartbeatInterval(heartbeatInterval),clientJobId(""),response_id(0), + heartbeatInterval(heartbeatInterval),response_id(0),clientJobId(""), keepRun(false){ pthread_mutex_init( &(heartbeatLock), NULL ); @@ -272,20 +272,20 @@ int LibYarnClient::forceKillJob(string &jobId) { throw std::invalid_argument("The jobId is wrong, please check the jobId argument"); } - for (map::iterator it = jobIdContainers.begin(); it != jobIdContainers. end(); it++) { + for (map::iterator it = jobIdContainers.begin(); it != jobIdContainers. 
end(); it++) { ostringstream key; Container *container = it->second; key << container->getNodeId().getHost() << ":" << container->getNodeId().getPort(); Token nmToken = nmTokenCache[key.str()]; ((ContainerManagement*)nmClient)->stopContainer((*container), nmToken); - LOG(INFO,"LibYarnClient::forceKillJob, container:%d is stopped",container->getId().getId()); + LOG(INFO,"LibYarnClient::forceKillJob, container:%ld is stopped",container->getId().getId()); } ((ApplicationClient*) appClient)->forceKillApplication(clientAppId); LOG(INFO, "LibYarnClient::forceKillJob, forceKillApplication"); - for (map::iterator it = jobIdContainers.begin(); it != jobIdContainers.end(); it++) { - LOG(INFO,"LibYarnClient::forceKillJob, container:%d in jobIdContainers is deleted",it->second->getId().getId()); + for (map::iterator it = jobIdContainers.begin(); it != jobIdContainers.end(); it++) { + LOG(INFO,"LibYarnClient::forceKillJob, container:%ld in jobIdContainers is deleted",it->second->getId().getId()); delete it->second; it->second = NULL; } @@ -536,7 +536,7 @@ int LibYarnClient::allocateResources(string &jobId, usleep(TimeInterval::ALLOCATE_INTERVAL_MS); } - LOG(INFO,"LibYarnClient::allocate, ask: response_id:%d, allocated container number:%ld", + LOG(INFO,"LibYarnClient::allocate, ask: response_id:%d, allocated container number:%d", response_id, allocatedNumTotal); /* a workaround for allocate more container than request */ @@ -586,7 +586,7 @@ int LibYarnClient::allocateResources(string &jobId, /* 3. 
store allocated containers */ for(list::iterator it = allocatedContainerCache.begin();it != allocatedContainerCache.end();it++){ Container *container = new Container((*it)); - int containerId = container->getId().getId(); + int64_t containerId = container->getId().getId(); jobIdContainers[containerId] = container; } allocatedContainers = allocatedContainerCache; @@ -616,7 +616,7 @@ int LibYarnClient::allocateResources(string &jobId, } } -int LibYarnClient::releaseResources(string &jobId,int releaseContainerIds[],int releaseContainerSize) { +int LibYarnClient::releaseResources(string &jobId,int64_t releaseContainerIds[],int releaseContainerSize) { try{ pthread_mutex_lock(&heartbeatLock); if (jobId != clientJobId) { @@ -628,8 +628,8 @@ int LibYarnClient::releaseResources(string &jobId,int releaseContainerIds[],int //2) releases list releases; for (int i = 0;i < releaseContainerSize;i++){ - int containerId = releaseContainerIds[i]; - map::iterator it = jobIdContainers.find(containerId); + int64_t containerId = releaseContainerIds[i]; + map::iterator it = jobIdContainers.find(containerId); if (it != jobIdContainers.end()) { releases.push_back((it->second)->getId()); } @@ -645,17 +645,17 @@ int LibYarnClient::releaseResources(string &jobId,int releaseContainerIds[],int response_id = response.getResponseId(); //erase from the map jobIdContainers for (list::iterator it = releases.begin();it != releases.end();it++){ - LOG(INFO, "LibYarnClient::releaseResource, released ContainerId:%d",it->getId()); - map::iterator cit = jobIdContainers.find(it->getId()); + LOG(INFO, "LibYarnClient::releaseResource, released ContainerId:%ld",it->getId()); + map::iterator cit = jobIdContainers.find(it->getId()); if (cit != jobIdContainers.end()){ delete cit->second; cit->second = NULL; jobIdContainers.erase(it->getId()); } //erase the element if in activeFailContainers - set::iterator sit = activeFailContainerIds.find(it->getId()); + set::iterator sit = 
activeFailContainerIds.find(it->getId()); if(sit != activeFailContainerIds.end()){ - LOG(INFO, "LibYarnClient::releaseResource, remove %d from activeFailContainerIds",(*sit)); + LOG(INFO, "LibYarnClient::releaseResource, remove %ld from activeFailContainerIds",(*sit)); activeFailContainerIds.erase(*sit); } } @@ -700,7 +700,7 @@ message StartContainersResponseProto { repeated ContainerExceptionMapProto failed_requests = 3; } */ -int LibYarnClient::activeResources(string &jobId,int activeContainerIds[],int activeContainerSize) { +int LibYarnClient::activeResources(string &jobId,int64_t activeContainerIds[],int activeContainerSize) { try{ if (jobId != clientJobId) { throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); @@ -708,8 +708,8 @@ int LibYarnClient::activeResources(string &jobId,int activeContainerIds[],int ac LOG(INFO, "LibYarnClient::activeResources, activeResources started"); for (int i = 0; i < activeContainerSize; i++){ - int containerId = activeContainerIds[i]; - map::iterator it = jobIdContainers.find(containerId); + int64_t containerId = activeContainerIds[i]; + map::iterator it = jobIdContainers.find(containerId); if (it != jobIdContainers.end()) { try{ Container *container = it->second; @@ -726,9 +726,10 @@ int LibYarnClient::activeResources(string &jobId,int activeContainerIds[],int ac request.setContainerLaunchCtx(ctx); Token cToken = container->getContainerToken(); request.setContainerToken(cToken); + LOG(INFO, "LibYarnClient::activeResources active containerId:%ld", containerId); ((ContainerManagement*)nmClient)->startContainer((*container), request, nmToken); - }catch(std::exception& e){ - LOG(INFO, "LibYarnClient::activeResources, activeResources Failed Id:%d,exception:%s",containerId,e.what()); + } catch(std::exception& e){ + LOG(INFO, "LibYarnClient::activeResources, activeResources Failed Id:%ld,exception:%s",containerId,e.what()); activeFailContainerIds.insert(containerId); } } @@ -748,7 +749,7 @@ int 
LibYarnClient::activeResources(string &jobId,int activeContainerIds[],int ac return FR_FAILED; } } -int LibYarnClient::getActiveFailContainerIds(set &activeFailIds){ +int LibYarnClient::getActiveFailContainerIds(set &activeFailIds){ activeFailIds = activeFailContainerIds; return FR_SUCCEEDED; } @@ -777,13 +778,13 @@ int LibYarnClient::finishJob(string &jobId, FinalApplicationStatus finalStatus) } //1. we should stop all containers related with this job //ContainerManagement cmgmt; - for (map::iterator it = jobIdContainers.begin(); it != jobIdContainers. end(); it++) { + for (map::iterator it = jobIdContainers.begin(); it != jobIdContainers. end(); it++) { ostringstream key; Container *container = it->second; key << container->getNodeId().getHost() << ":" << container->getNodeId().getPort(); Token nmToken = nmTokenCache[key.str()]; ((ContainerManagement*)nmClient)->stopContainer((*container), nmToken); - LOG(INFO,"LibYarnClient::finishJob, container:%d are stopped",container->getId().getId()); + LOG(INFO,"LibYarnClient::finishJob, container:%ld is stopped",container->getId().getId()); } LOG(INFO,"LibYarnClient::finishJob, all containers for jobId:%s are stopped",jobId.c_str()); //2. 
finish AM @@ -792,8 +793,8 @@ int LibYarnClient::finishJob(string &jobId, FinalApplicationStatus finalStatus) ((ApplicationMaster*) amrmClient)->finishApplicationMaster(diagnostics, tracking_url, finalStatus); LOG(INFO, "LibYarnClient::finishJob, finish AM for jobId:%s, finalStatus:%d", jobId.c_str(), finalStatus); //free the Container* memory - for (map::iterator it = jobIdContainers.begin(); it != jobIdContainers.end(); it++) { - LOG(INFO,"LibYarnClient::finishJob, container:%d in jobIdContainers are delete",it->second->getId().getId()); + for (map::iterator it = jobIdContainers.begin(); it != jobIdContainers.end(); it++) { + LOG(INFO,"LibYarnClient::finishJob, container:%ld in jobIdContainers are delete",it->second->getId().getId()); delete it->second; it->second = NULL; } @@ -864,7 +865,7 @@ int LibYarnClient::getContainerReports(string &jobId,list &cont } } -int LibYarnClient::getContainerStatuses(string &jobId,int32_t containerIds[],int containerSize, +int LibYarnClient::getContainerStatuses(string &jobId,int64_t containerIds[],int containerSize, list &containerStatues){ try { if (jobId != clientJobId) { @@ -872,8 +873,8 @@ int LibYarnClient::getContainerStatuses(string &jobId,int32_t containerIds[],int } for (int i = 0; i < containerSize; i++) { - int containerId = containerIds[i]; - map::iterator it = jobIdContainers.find(containerId); + int64_t containerId = containerIds[i]; + map::iterator it = jobIdContainers.find(containerId); if (it != jobIdContainers.end()) { try { Container *container = it->second; @@ -886,7 +887,7 @@ int LibYarnClient::getContainerStatuses(string &jobId,int32_t containerIds[],int containerStatues.push_back(containerStatus); } } catch (std::exception& e) { - LOG(INFO,"LibYarnClient::getContainerStatuses, getContainerStatuses Failed Id:%d,exception:%s",containerId, e.what()); + LOG(INFO,"LibYarnClient::getContainerStatuses, getContainerStatuses Failed Id:%ld,exception:%s",containerId, e.what()); } } } 
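Note on the format specifiers changed in the hunks above: `%ld` expects a `long`, which matches `int64_t` only on platforms where `long` is 64 bits wide (typical LP64 Linux builds). A minimal sketch of a portable alternative follows, assuming C99 `<inttypes.h>` is available. `format_container_id` is a hypothetical helper written for illustration and is not part of libyarn.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical helper (not libyarn API): writes "containerId:<id>" into buf
 * and returns the number of characters snprintf would have written. */
int format_container_id(char *buf, size_t len, int64_t container_id)
{
    assert(buf != NULL && len > 0);
    /* PRId64 expands to the correct conversion specifier for int64_t on the
     * target platform, so the call stays correct where long is 32 bits. */
    return snprintf(buf, len, "containerId:%" PRId64, container_id);
}
```

The same macro could be used inside the `LOG(INFO, ...)` and `printf` format strings, for example `"container:%" PRId64 " is stopped"`, provided the logging macro forwards its arguments to a printf-style function.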
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/depends/libyarn/src/libyarnclient/LibYarnClient.h ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/libyarnclient/LibYarnClient.h b/depends/libyarn/src/libyarnclient/LibYarnClient.h index f2b276c..9ce51be 100644 --- a/depends/libyarn/src/libyarnclient/LibYarnClient.h +++ b/depends/libyarn/src/libyarnclient/LibYarnClient.h @@ -72,10 +72,10 @@ namespace libyarn { list &blackListAdditions, list &blackListRemovals, list &allocatedContainers, int32_t num_containers); - virtual int activeResources(string &jobId, int activeContainerIds[], + virtual int activeResources(string &jobId, int64_t activeContainerIds[], int activeContainerSize); - virtual int releaseResources(string &jobId, int releaseContainerIds[], + virtual int releaseResources(string &jobId, int64_t releaseContainerIds[], int releaseContainerSize); virtual int finishJob(string &jobId, FinalApplicationStatus finalStatus); @@ -85,7 +85,7 @@ namespace libyarn { virtual int getContainerReports(string &jobId, list &containerReports); - virtual int getContainerStatuses(string &jobId, int32_t containerIds[], + virtual int getContainerStatuses(string &jobId, int64_t containerIds[], int containerSize, list &containerStatues); virtual int getQueueInfo(string &queue, bool includeApps, bool includeChildQueues, @@ -93,7 +93,7 @@ namespace libyarn { virtual int getClusterNodes(list &states, list &nodeReports); - virtual int getActiveFailContainerIds(set &activeFailIds); + virtual int getActiveFailContainerIds(set &activeFailIds); friend void* heartbeatFunc(void* args); @@ -136,9 +136,9 @@ namespace libyarn { int32_t response_id; string clientJobId; - map jobIdContainers; + map jobIdContainers; map nmTokenCache; - set activeFailContainerIds; + set activeFailContainerIds; list askRequests; volatile bool keepRun; 
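For context on the key type of `jobIdContainers` and `activeFailContainerIds` being widened in the header above: the commit message does not state the motivation, but a 64-bit id can no longer be stored in an `int` without loss once it exceeds `INT32_MAX`. The sketch below illustrates that boundary. The `(epoch << 40) | sequence` layout is an assumption made purely for illustration, and both function names are hypothetical, not libyarn API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Returns true when a 64-bit container id cannot be represented in the
 * former int32_t field without losing information. */
bool id_overflows_int32(int64_t container_id)
{
    return container_id > INT32_MAX || container_id < INT32_MIN;
}

/* Illustrative only: builds an id from an epoch and a sequence number,
 * ASSUMING the low 40 bits hold the sequence and the bits above hold the
 * epoch. Unsigned arithmetic avoids undefined behaviour on the shift. */
int64_t compose_container_id(int64_t epoch, int64_t sequence)
{
    const uint64_t sequence_mask = (UINT64_C(1) << 40) - 1;
    assert(epoch >= 0 && sequence >= 0);
    return (int64_t)(((uint64_t)epoch << 40) |
                     ((uint64_t)sequence & sequence_mask));
}
```

Under that assumed layout, any id with a non-zero epoch is larger than `INT32_MAX`, so a lookup in a map keyed by `int` would use a truncated key. Keying by `int64_t` avoids this.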
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp b/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp index 5ea3722..51971fd 100644 --- a/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp +++ b/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp @@ -84,11 +84,11 @@ extern "C" { allocatedContainers, num_containers); } - int activeResources(string &jobId,int activeContainerIds[],int activeContainerSize) { + int activeResources(string &jobId,int64_t activeContainerIds[],int activeContainerSize) { return client->activeResources(jobId, activeContainerIds,activeContainerSize); } - int releaseResources(string &jobId, int releaseContainerIds[],int releaseContainerSize) { + int releaseResources(string &jobId, int64_t releaseContainerIds[],int releaseContainerSize) { return client->releaseResources(jobId, releaseContainerIds,releaseContainerSize); } @@ -96,7 +96,7 @@ extern "C" { return client->finishJob(jobId, finalStatus); } - int getActiveFailContainerIds(set &activeFailIds){ + int getActiveFailContainerIds(set &activeFailIds){ return client->getActiveFailContainerIds(activeFailIds); } @@ -108,7 +108,7 @@ extern "C" { return client->getContainerReports(jobId, containerReports); } - int getContainerStatuses(string &jobId, int32_t containerIds[], + int getContainerStatuses(string &jobId, int64_t containerIds[], int containerSize, list &containerStatues) { return client->getContainerStatuses(jobId, containerIds, containerSize,containerStatues); } @@ -263,7 +263,7 @@ extern "C" { goto exit_err; preferredAllocatedSize = allocatedPreferredList.size(); - preferredAllocatedArray = (LibYarnResource_t *)malloc(sizeof(LibYarnResource_t) * preferredAllocatedSize); + preferredAllocatedArray = (LibYarnResource_t *)malloc(sizeof(LibYarnResource_t) * preferredAllocatedSize);
if(preferredAllocatedArray == NULL) { setErrorMessage("LibYarnClientC::fail to allocate memory for resource array"); goto exit_err; @@ -341,7 +341,7 @@ exit_err: return FUNCTION_FAILED; } - int activeResources(LibYarnClient_t *client, char *jobId,int32_t activeContainerIds[],int activeContainerSize){ + int activeResources(LibYarnClient_t *client, char *jobId,int64_t activeContainerIds[],int activeContainerSize){ string jobIdStr(jobId); int result = client->activeResources(jobIdStr,activeContainerIds,activeContainerSize); if (result == FUNCTION_SUCCEEDED){ @@ -352,7 +352,7 @@ exit_err: } } - int releaseResources(LibYarnClient_t *client, char *jobId,int32_t releaseContainerIds[], int releaseContainerSize) { + int releaseResources(LibYarnClient_t *client, char *jobId,int64_t releaseContainerIds[], int releaseContainerSize) { string jobIdStr(jobId); int result = client->releaseResources(jobIdStr, releaseContainerIds, releaseContainerSize); if (result == FUNCTION_SUCCEEDED) { @@ -374,14 +374,14 @@ exit_err: } } - int getActiveFailContainerIds(LibYarnClient_t *client,int32_t *activeFailIds[],int *activeFailSize){ - set activeFails; + int getActiveFailContainerIds(LibYarnClient_t *client,int64_t *activeFailIds[],int *activeFailSize){ + set activeFails; int result = client->getActiveFailContainerIds(activeFails); if (result == FUNCTION_SUCCEEDED) { *activeFailSize = activeFails.size(); - *activeFailIds = (int *)malloc(sizeof(int)*(*activeFailSize)); + *activeFailIds = (int64_t *)malloc(sizeof(int64_t)*(*activeFailSize)); int i = 0; - for (set::iterator it = activeFails.begin(); it != activeFails.end(); it++) { + for (set::iterator it = activeFails.begin(); it != activeFails.end(); it++) { (*activeFailIds)[i] = (*it); i++; } @@ -445,7 +445,7 @@ exit_err: } } - int getContainerStatuses(LibYarnClient_t *client,char *jobId,int32_t containerIds[],int containerSize, + int getContainerStatuses(LibYarnClient_t *client,char *jobId,int64_t containerIds[],int containerSize, 
LibYarnContainerStatus_t **containerStatusesArray,int *containerStatusesArraySize){ string jobIdStr(jobId); list < ContainerStatus > containerStatuses; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/depends/libyarn/src/libyarnclient/LibYarnClientC.h ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/libyarnclient/LibYarnClientC.h b/depends/libyarn/src/libyarnclient/LibYarnClientC.h index 233c11c..1c9359b 100644 --- a/depends/libyarn/src/libyarnclient/LibYarnClientC.h +++ b/depends/libyarn/src/libyarnclient/LibYarnClientC.h @@ -50,7 +50,7 @@ typedef struct LibYarnResourceRequest_t { } LibYarnResourceRequest_t; typedef struct LibYarnResource_t { - int32_t containerId; + int64_t containerId; char *host; int32_t port; char *nodeHttpAddress; @@ -73,7 +73,7 @@ typedef struct LibYarnApplicationReport_t { }LibYarnApplicationReport_t; typedef struct LibYarnContainerReport_t { - int32_t containerId; + int64_t containerId; int32_t vCores; int32_t memory; char *host; @@ -86,7 +86,7 @@ typedef struct LibYarnContainerReport_t { }LibYarnContainerReport_t; typedef struct LibYarnContainerStatus_t { - int32_t containerId; + int64_t containerId; enum ContainerState state; int32_t exitStatus; char *diagnostics; @@ -170,20 +170,20 @@ int allocateResources(LibYarnClient_t *client, char *jobId, LibYarnNodeInfo_t preferredHost[], int preferredHostSize, LibYarnResource_t **allocatedResourcesArray, int *allocatedResourceArraySize); -int activeResources(LibYarnClient_t *client, char *jobId,int32_t activeContainerIds[],int activeContainerSize); +int activeResources(LibYarnClient_t *client, char *jobId,int64_t activeContainerIds[],int activeContainerSize); -int releaseResources(LibYarnClient_t *client, char *jobId, int32_t releaseContainerIds[],int releaseContainerSize); +int releaseResources(LibYarnClient_t *client, char *jobId, int64_t releaseContainerIds[],int releaseContainerSize); int finishJob(LibYarnClient_t 
*client, char *jobId, enum FinalApplicationStatus_t finalStatus); -int getActiveFailContainerIds(LibYarnClient_t *client,int32_t *activeFailIds[],int *activeFailSize); +int getActiveFailContainerIds(LibYarnClient_t *client,int64_t *activeFailIds[],int *activeFailSize); int getApplicationReport(LibYarnClient_t *client,char *jobId,LibYarnApplicationReport_t **applicationReport); int getContainerReports(LibYarnClient_t *client, char *jobId, LibYarnContainerReport_t **containerReportArray,int *containerReportArraySize); -int getContainerStatuses(LibYarnClient_t *client,char *jobId,int32_t containerIds[],int containerSize, +int getContainerStatuses(LibYarnClient_t *client,char *jobId,int64_t containerIds[],int containerSize, LibYarnContainerStatus_t **containerStatusesArray,int *containerStatusesArraySize); int getQueueInfo(LibYarnClient_t *client, char *queue, bool includeApps, http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/depends/libyarn/src/proto/YARN_applicationclient_protocol.proto ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/proto/YARN_applicationclient_protocol.proto b/depends/libyarn/src/proto/YARN_applicationclient_protocol.proto index 2c05889..a83bf76 100644 --- a/depends/libyarn/src/proto/YARN_applicationclient_protocol.proto +++ b/depends/libyarn/src/proto/YARN_applicationclient_protocol.proto @@ -44,5 +44,16 @@ service ApplicationClientProtocolService { rpc getDelegationToken(hadoop.common.GetDelegationTokenRequestProto) returns (hadoop.common.GetDelegationTokenResponseProto); rpc renewDelegationToken(hadoop.common.RenewDelegationTokenRequestProto) returns (hadoop.common.RenewDelegationTokenResponseProto); rpc cancelDelegationToken(hadoop.common.CancelDelegationTokenRequestProto) returns (hadoop.common.CancelDelegationTokenResponseProto); + rpc moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequestProto) returns (MoveApplicationAcrossQueuesResponseProto); + rpc 
getApplicationAttemptReport (GetApplicationAttemptReportRequestProto) returns (GetApplicationAttemptReportResponseProto); + rpc getApplicationAttempts (GetApplicationAttemptsRequestProto) returns (GetApplicationAttemptsResponseProto); + rpc getContainerReport (GetContainerReportRequestProto) returns (GetContainerReportResponseProto); + rpc getContainers (GetContainersRequestProto) returns (GetContainersResponseProto); + rpc submitReservation (ReservationSubmissionRequestProto) returns (ReservationSubmissionResponseProto); + rpc updateReservation (ReservationUpdateRequestProto) returns (ReservationUpdateResponseProto); + rpc deleteReservation (ReservationDeleteRequestProto) returns (ReservationDeleteResponseProto); + rpc getNodeToLabels (GetNodesToLabelsRequestProto) returns (GetNodesToLabelsResponseProto); + rpc getLabelsToNodes (GetLabelsToNodesRequestProto) returns (GetLabelsToNodesResponseProto); + rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto); } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/depends/libyarn/src/proto/YARN_yarn_protos.proto ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/proto/YARN_yarn_protos.proto b/depends/libyarn/src/proto/YARN_yarn_protos.proto index 052b30b..bae8db8 100644 --- a/depends/libyarn/src/proto/YARN_yarn_protos.proto +++ b/depends/libyarn/src/proto/YARN_yarn_protos.proto @@ -50,7 +50,7 @@ message ApplicationAttemptIdProto { message ContainerIdProto { optional ApplicationIdProto app_id = 1; optional ApplicationAttemptIdProto app_attempt_id = 2; - optional int32 id = 3; + optional int64 id = 3; } message ResourceProto { @@ -58,6 +58,16 @@ message ResourceProto { optional int32 virtual_cores = 2; } +message ResourceOptionProto { + optional ResourceProto resource = 1; + optional int32 over_commit_timeout = 2; +} + +message NodeResourceMapProto { + optional NodeIdProto node_id = 1; + optional 
ResourceOptionProto resource_option = 2; +} + message PriorityProto { optional int32 priority = 1; } @@ -68,19 +78,6 @@ enum ContainerStateProto { C_COMPLETE = 3; } -enum ContainerExitStatusProto { - SUCCESS = 0; - ABORTED = -100; - DISKS_FAILED = -101; - PREEMPTED = -102; - KILLED_EXCEEDED_VMEM = -103; - KILLED_EXCEEDED_PMEM = -104; - KILLED_BY_APPMASTER = -105; - KILLED_BY_RESOURCEMANAGER = -106; - KILLED_AFTER_APP_COMPLETION = -107; - INVALID = -1000; -} - message ContainerProto { optional ContainerIdProto id = 1; optional NodeIdProto nodeId = 2; @@ -90,6 +87,20 @@ message ContainerProto { optional hadoop.common.TokenProto container_token = 6; } +message ContainerReportProto { + optional ContainerIdProto container_id = 1; + optional ResourceProto resource = 2; + optional NodeIdProto node_id = 3; + optional PriorityProto priority = 4; + optional int64 creation_time = 5; + optional int64 finish_time = 6; + optional string diagnostics_info = 7 [default = "N/A"]; + optional string log_url = 8; + optional int32 container_exit_status = 9; + optional ContainerStateProto container_state = 10; + optional string node_http_address = 11; +} + enum YarnApplicationStateProto { NEW = 1; NEW_SAVING = 2; @@ -101,6 +112,20 @@ enum YarnApplicationStateProto { KILLED = 8; } +enum YarnApplicationAttemptStateProto { + APP_ATTEMPT_NEW = 1; + APP_ATTEMPT_SUBMITTED = 2; + APP_ATTEMPT_SCHEDULED = 3; + APP_ATTEMPT_ALLOCATED_SAVING = 4; + APP_ATTEMPT_ALLOCATED = 5; + APP_ATTEMPT_LAUNCHED = 6; + APP_ATTEMPT_FAILED = 7; + APP_ATTEMPT_RUNNING = 8; + APP_ATTEMPT_FINISHING = 9; + APP_ATTEMPT_FINISHED = 10; + APP_ATTEMPT_KILLED = 11; +} + enum FinalApplicationStatusProto { APP_UNDEFINED = 0; APP_SUCCEEDED = 1; @@ -124,7 +149,7 @@ enum LocalResourceVisibilityProto { enum LocalResourceTypeProto { ARCHIVE = 1; - FILETYPE = 2; + FILE = 2; PATTERN = 3; } @@ -135,6 +160,7 @@ message LocalResourceProto { optional LocalResourceTypeProto type = 4; optional LocalResourceVisibilityProto visibility = 5; 
optional string pattern = 6; + optional bool should_be_uploaded_to_shared_cache = 7; } message ApplicationResourceUsageReportProto { @@ -143,6 +169,8 @@ message ApplicationResourceUsageReportProto { optional ResourceProto used_resources = 3; optional ResourceProto reserved_resources = 4; optional ResourceProto needed_resources = 5; + optional int64 memory_seconds = 6; + optional int64 vcore_seconds = 7; } message ApplicationReportProto { @@ -165,6 +193,18 @@ message ApplicationReportProto { optional float progress = 17; optional string applicationType = 18; optional hadoop.common.TokenProto am_rm_token = 19; + repeated string applicationTags = 20; +} + +message ApplicationAttemptReportProto { + optional ApplicationAttemptIdProto application_attempt_id = 1; + optional string host = 2; + optional int32 rpc_port = 3; + optional string tracking_url = 4; + optional string diagnostics = 5 [default = "N/A"]; + optional YarnApplicationAttemptStateProto yarn_application_attempt_state = 6; + optional ContainerIdProto am_container_id = 7; + optional string original_tracking_url = 8; } enum NodeStateProto { @@ -191,8 +231,18 @@ message NodeReportProto { optional NodeStateProto node_state = 7; optional string health_report = 8; optional int64 last_health_report_time = 9; + repeated string node_labels = 10; } +message NodeIdToLabelsProto { + optional NodeIdProto nodeId = 1; + repeated string nodeLabels = 2; +} + +message LabelsToNodeIdsProto { + optional string nodeLabels = 1; + repeated NodeIdProto nodeId = 2; +} //////////////////////////////////////////////////////////////////////// ////// From AM_RM_Protocol ///////////////////////////////////////////// @@ -203,6 +253,7 @@ message ResourceRequestProto { optional ResourceProto capability = 3; optional int32 num_containers = 4; optional bool relax_locality = 5 [default = true]; + optional string node_label_expression = 6; } enum AMCommandProto { @@ -251,6 +302,20 @@ message ApplicationSubmissionContextProto { optional int32 
maxAppAttempts = 8 [default = 0]; optional ResourceProto resource = 9; optional string applicationType = 10 [default = "YARN"]; + optional bool keep_containers_across_application_attempts = 11 [default = false]; + repeated string applicationTags = 12; + optional int64 attempt_failures_validity_interval = 13 [default = -1]; + optional LogAggregationContextProto log_aggregation_context = 14; + optional ReservationIdProto reservation_id = 15; + optional string node_label_expression = 16; + optional ResourceRequestProto am_container_resource_request = 17; +} + +message LogAggregationContextProto { + optional string include_pattern = 1 [default = ".*"]; + optional string exclude_pattern = 2 [default = ""]; + optional string rolled_logs_include_pattern = 3 [default = ""]; + optional string rolled_logs_exclude_pattern = 4 [default = ".*"]; } enum ApplicationAccessTypeProto { @@ -272,19 +337,6 @@ enum QueueStateProto { Q_RUNNING = 2; } -message ContainerReportProto { - optional ContainerIdProto id = 1; - optional ResourceProto resource = 2; - optional NodeIdProto nodeId = 3; - optional PriorityProto priority = 4; - optional int64 startTime = 5; - optional int64 finishTime = 6; - optional ContainerExitStatusProto container_exit_status = 7; - optional ContainerStateProto state = 8; - optional string diagnostics = 9 [default = "N/A"]; - optional string logUrl = 10; -} - message QueueInfoProto { optional string queueName = 1; optional float capacity = 2; @@ -293,6 +345,8 @@ message QueueInfoProto { optional QueueStateProto state = 5; repeated QueueInfoProto childQueues = 6; repeated ApplicationReportProto applications = 7; + repeated string accessibleNodeLabels = 8; + optional string defaultNodeLabelExpression = 9; } enum QueueACLProto { @@ -306,6 +360,41 @@ message QueueUserACLInfoProto { } //////////////////////////////////////////////////////////////////////// +////// From reservation_protocol ///////////////////////////////////// 
+//////////////////////////////////////////////////////////////////////// + +message ReservationIdProto { + optional int64 id = 1; + optional int64 cluster_timestamp = 2; +} + +message ReservationRequestProto { + optional ResourceProto capability = 1; + optional int32 num_containers = 2 [default = 1]; + optional int32 concurrency = 3 [default = 1]; + optional int64 duration = 4 [default = -1]; +} + +message ReservationRequestsProto { + repeated ReservationRequestProto reservation_resources = 1; + optional ReservationRequestInterpreterProto interpreter = 2 [default = R_ALL]; +} + +message ReservationDefinitionProto { + optional ReservationRequestsProto reservation_requests = 1; + optional int64 arrival = 2; + optional int64 deadline = 3; + optional string reservation_name = 4; +} + +enum ReservationRequestInterpreterProto { + R_ANY = 0; + R_ALL = 1; + R_ORDER = 2; + R_ORDER_NO_GAP = 3; + } + +//////////////////////////////////////////////////////////////////////// ////// From container_manager ////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// @@ -325,6 +414,29 @@ message ContainerStatusProto { optional int32 exit_status = 4 [default = -1000]; } +enum ContainerExitStatusProto { + SUCCESS = 0; + INVALID = -1000; + ABORTED = -100; + DISKS_FAILED = -101; +} + +message ContainerResourceIncreaseRequestProto { + optional ContainerIdProto container_id = 1; + optional ResourceProto capability = 2; +} + +message ContainerResourceIncreaseProto { + optional ContainerIdProto container_id = 1; + optional ResourceProto capability = 2; + optional hadoop.common.TokenProto container_token = 3; +} + +message ContainerResourceDecreaseProto { + optional ContainerIdProto container_id = 1; + optional ResourceProto capability = 2; +} + //////////////////////////////////////////////////////////////////////// ////// From common////////////////////////////////////////////////////// 
//////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/depends/libyarn/src/proto/YARN_yarn_service_protos.proto ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/proto/YARN_yarn_service_protos.proto b/depends/libyarn/src/proto/YARN_yarn_service_protos.proto index 11ca9ea..d250576 100644 --- a/depends/libyarn/src/proto/YARN_yarn_service_protos.proto +++ b/depends/libyarn/src/proto/YARN_yarn_service_protos.proto @@ -44,6 +44,10 @@ message RegisterApplicationMasterResponseProto { optional ResourceProto maximumCapability = 1; optional bytes client_to_am_token_master_key = 2; repeated ApplicationACLMapProto application_ACLs = 3; + repeated ContainerProto containers_from_previous_attempts = 4; + optional string queue = 5; + repeated NMTokenProto nm_tokens_from_previous_attempts = 6; + repeated SchedulerResourceTypes scheduler_resource_types = 7; } message FinishApplicationMasterRequestProto { @@ -62,6 +66,7 @@ message AllocateRequestProto { optional ResourceBlacklistRequestProto blacklist_request = 3; optional int32 response_id = 4; optional float progress = 5; + repeated ContainerResourceIncreaseRequestProto increase_request = 6; } message NMTokenProto { @@ -79,6 +84,14 @@ message AllocateResponseProto { optional int32 num_cluster_nodes = 7; optional PreemptionMessageProto preempt = 8; repeated NMTokenProto nm_tokens = 9; + repeated ContainerResourceIncreaseProto increased_containers = 10; + repeated ContainerResourceDecreaseProto decreased_containers = 11; + optional hadoop.common.TokenProto am_rm_token = 12; +} + +enum SchedulerResourceTypes { + MEMORY = 0; + CPU = 1; } ////////////////////////////////////////////////////// @@ -93,14 +106,6 @@ message GetNewApplicationResponseProto { optional ResourceProto maximumCapability = 2; } -message GetContainersRequestProto { - optional ApplicationAttemptIdProto app_attempt_id = 1; -} - 
-message GetContainersResponseProto { - repeated ContainerReportProto containers_reports = 1; -} - message GetApplicationReportRequestProto { optional ApplicationIdProto application_id = 1; } @@ -121,6 +126,7 @@ message KillApplicationRequestProto { } message KillApplicationResponseProto { + optional bool is_kill_completed = 1 [default = false]; } message GetClusterMetricsRequestProto { @@ -130,9 +136,32 @@ message GetClusterMetricsResponseProto { optional YarnClusterMetricsProto cluster_metrics = 1; } +message MoveApplicationAcrossQueuesRequestProto { + required ApplicationIdProto application_id = 1; + required string target_queue = 2; +} + +message MoveApplicationAcrossQueuesResponseProto { +} + +enum ApplicationsRequestScopeProto { + ALL = 0; + VIEWABLE = 1; + OWN = 2; +} + message GetApplicationsRequestProto { repeated string application_types = 1; repeated YarnApplicationStateProto application_states = 2; + repeated string users = 3; + repeated string queues = 4; + optional int64 limit = 5; + optional int64 start_begin = 6; + optional int64 start_end = 7; + optional int64 finish_begin = 8; + optional int64 finish_end = 9; + repeated string applicationTags = 10; + optional ApplicationsRequestScopeProto scope = 11 [default = ALL]; } message GetApplicationsResponseProto { @@ -165,6 +194,27 @@ message GetQueueUserAclsInfoResponseProto { repeated QueueUserACLInfoProto queueUserAcls = 1; } +message GetNodesToLabelsRequestProto { +} + +message GetNodesToLabelsResponseProto { + repeated NodeIdToLabelsProto nodeToLabels = 1; +} + +message GetLabelsToNodesRequestProto { + repeated string nodeLabels = 1; +} + +message GetLabelsToNodesResponseProto { + repeated LabelsToNodeIdsProto labelsToNodes = 1; +} + +message GetClusterNodeLabelsRequestProto { +} + +message GetClusterNodeLabelsResponseProto { + repeated string nodeLabels = 1; +} ////////////////////////////////////////////////////// /////// client_NM_Protocol /////////////////////////// @@ -227,3 +277,99 @@ message 
GetContainerStatusesResponseProto { repeated ContainerStatusProto status = 1; repeated ContainerExceptionMapProto failed_requests = 2; } + +////////////////////////////////////////////////////// +/////// Application_History_Protocol ///////////////// +////////////////////////////////////////////////////// + +message GetApplicationAttemptReportRequestProto { + optional ApplicationAttemptIdProto application_attempt_id = 1; +} + +message GetApplicationAttemptReportResponseProto { + optional ApplicationAttemptReportProto application_attempt_report = 1; +} + +message GetApplicationAttemptsRequestProto { + optional ApplicationIdProto application_id = 1; +} + +message GetApplicationAttemptsResponseProto { + repeated ApplicationAttemptReportProto application_attempts = 1; +} + +message GetContainerReportRequestProto { + optional ContainerIdProto container_id = 1; +} + +message GetContainerReportResponseProto { + optional ContainerReportProto container_report = 1; +} + +message GetContainersRequestProto { + optional ApplicationAttemptIdProto application_attempt_id = 1; +} + +message GetContainersResponseProto { + repeated ContainerReportProto containers = 1; +} + +////////////////////////////////////////////////////// +/////// client_SCM_Protocol ////////////////////////// +////////////////////////////////////////////////////// + +message UseSharedCacheResourceRequestProto { + optional ApplicationIdProto applicationId = 1; + optional string resourceKey = 2; +} + +message UseSharedCacheResourceResponseProto { + optional string path = 1; +} + +message ReleaseSharedCacheResourceRequestProto { + optional ApplicationIdProto applicationId = 1; + optional string resourceKey = 2; +} + +message ReleaseSharedCacheResourceResponseProto { +} + +////////////////////////////////////////////////////// +// reservation_protocol +////////////////////////////////////////////////////// + +message ReservationSubmissionRequestProto { + optional string queue = 1; + optional 
ReservationDefinitionProto reservation_definition = 2;
+}
+
+message ReservationSubmissionResponseProto {
+  optional ReservationIdProto reservation_id = 1;
+}
+
+message ReservationUpdateRequestProto {
+  optional ReservationDefinitionProto reservation_definition = 1;
+  optional ReservationIdProto reservation_id = 2;
+}
+
+message ReservationUpdateResponseProto {
+}
+
+message ReservationDeleteRequestProto {
+  optional ReservationIdProto reservation_id = 1;
+}
+
+message ReservationDeleteResponseProto {
+}
+
+//////////////////////////////////////////////////////
+/////// SCM_Admin_Protocol //////////////////////////
+//////////////////////////////////////////////////////
+
+message RunSharedCacheCleanerTaskRequestProto {
+}
+
+message RunSharedCacheCleanerTaskResponseProto {
+  optional bool accepted = 1;
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/depends/libyarn/src/protocolrecords/GetContainersRequest.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/protocolrecords/GetContainersRequest.cpp b/depends/libyarn/src/protocolrecords/GetContainersRequest.cpp
index 128577b..4a6aa39 100644
--- a/depends/libyarn/src/protocolrecords/GetContainersRequest.cpp
+++ b/depends/libyarn/src/protocolrecords/GetContainersRequest.cpp
@@ -40,11 +40,11 @@ GetContainersRequestProto& GetContainersRequest::getProto() {
 void GetContainersRequest::setApplicationAttemptId(ApplicationAttemptId &appAttemptId) {
     ApplicationAttemptIdProto *proto = new ApplicationAttemptIdProto();
     proto->CopyFrom(appAttemptId.getProto());
-    requestProto.set_allocated_app_attempt_id(proto);
+    requestProto.set_allocated_application_attempt_id(proto);
 }
 
 ApplicationAttemptId GetContainersRequest::getApplicationAttemptId() {
-    return ApplicationAttemptId(requestProto.app_attempt_id());
+    return ApplicationAttemptId(requestProto.application_attempt_id());
 }
 
 } /* namespace libyarn */
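
Note on the int32 -> int64 container id change: the sketch below is not part of this commit. It only illustrates why a 32-bit field can lose information once a container id carries data in its upper bits, and how a 64-bit id can be formatted portably (the same concern the INT64_FORMAT changes in the elog() calls address). The helper names and the 40-bit epoch/sequence split are assumptions made for illustration, not the libyarn API.

```cpp
#include <cassert>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <string>

// Hypothetical helper (not in libyarn): pack an epoch and a sequence number
// into one 64-bit id. The 40-bit split is an illustrative assumption.
int64_t makeContainerId(int64_t epoch, int64_t sequence) {
    assert(epoch >= 0 && sequence >= 0);
    return (epoch << 40) | (sequence & 0xFFFFFFFFFFLL);
}

// What a 32-bit field would retain: only the low 32 bits of the id.
// The mask keeps the value within int32_t range, so the cast is well defined.
int32_t truncateToInt32(int64_t id) {
    return static_cast<int32_t>(id & 0x7FFFFFFF);
}

// Format a 64-bit id without assuming that "long" is 64 bits wide.
std::string formatContainerId(int64_t id) {
    char buf[32];
    std::snprintf(buf, sizeof(buf), "%" PRId64, id);
    return std::string(buf);
}
```

With these hypothetical helpers, makeContainerId(1, 7) and makeContainerId(2, 7) are distinct 64-bit values, yet both collapse to 7 after truncateToInt32(). That collision is the kind of ambiguity that 64-bit ids, struct fields, and hash keys avoid.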
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/depends/libyarn/src/protocolrecords/GetContainersResponse.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/protocolrecords/GetContainersResponse.cpp b/depends/libyarn/src/protocolrecords/GetContainersResponse.cpp
index 53881f9..b65e468 100644
--- a/depends/libyarn/src/protocolrecords/GetContainersResponse.cpp
+++ b/depends/libyarn/src/protocolrecords/GetContainersResponse.cpp
@@ -39,8 +39,8 @@ GetContainersResponseProto& GetContainersResponse::getProto() {
 
 list<ContainerReport> GetContainersResponse::getcontainersReportList() {
     list<ContainerReport> reportList;
-    for (int i = 0; i < responseProto.containers_reports_size(); i++) {
-        ContainerReportProto reportProto = responseProto.containers_reports(i);
+    for (int i = 0; i < responseProto.containers_size(); i++) {
+        ContainerReportProto reportProto = responseProto.containers(i);
         reportList.push_back(ContainerReport(reportProto));
     }
     return reportList;
@@ -50,7 +50,7 @@ void GetContainersResponse::setcontainersReportList(
         list<ContainerReport> &containersReport) {
     list<ContainerReport>::iterator it = containersReport.begin();
     for (; it != containersReport.end(); it++) {
-        ContainerReportProto* reportProto = responseProto.add_containers_reports();
+        ContainerReportProto* reportProto = responseProto.add_containers();
         reportProto->CopyFrom((*it).getProto());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/depends/libyarn/src/records/ContainerId.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/records/ContainerId.cpp b/depends/libyarn/src/records/ContainerId.cpp
index fab48cc..6fabddc 100644
--- a/depends/libyarn/src/records/ContainerId.cpp
+++ b/depends/libyarn/src/records/ContainerId.cpp
@@ -55,11 +55,11 @@ ApplicationAttemptId ContainerId::getApplicationAttemptId() {
     return ApplicationAttemptId(containerIdProto.app_attempt_id());
 }
 
-void ContainerId::setId(int32_t id) {
+void ContainerId::setId(int64_t id) {
     containerIdProto.set_id(id);
 }
 
-int32_t ContainerId::getId() {
+int64_t ContainerId::getId() {
     return containerIdProto.id();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/depends/libyarn/src/records/ContainerId.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/records/ContainerId.h b/depends/libyarn/src/records/ContainerId.h
index b92c8b3..ee89d2a 100644
--- a/depends/libyarn/src/records/ContainerId.h
+++ b/depends/libyarn/src/records/ContainerId.h
@@ -33,7 +33,7 @@ namespace libyarn {
 message ContainerIdProto {
   optional ApplicationIdProto app_id = 1;
   optional ApplicationAttemptIdProto app_attempt_id = 2;
-  optional int32 id = 3;
+  optional int64 id = 3;
 }
 */
 
@@ -51,8 +51,8 @@ public:
     void setApplicationAttemptId(ApplicationAttemptId &appAttemptId);
     ApplicationAttemptId getApplicationAttemptId();
 
-    void setId(int32_t id);
-    int32_t getId();
+    void setId(int64_t id);
+    int64_t getId();
 
 private:
     ContainerIdProto containerIdProto;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/depends/libyarn/src/records/ContainerReport.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/records/ContainerReport.cpp b/depends/libyarn/src/records/ContainerReport.cpp
index a13f038..9b4d7d2 100644
--- a/depends/libyarn/src/records/ContainerReport.cpp
+++ b/depends/libyarn/src/records/ContainerReport.cpp
@@ -38,11 +38,11 @@ ContainerReportProto& ContainerReport::getProto(){
 void ContainerReport::setId(ContainerId &id){
     ContainerIdProto *proto = new ContainerIdProto();
     proto->CopyFrom(id.getProto());
-    reportProto.set_allocated_id(proto);
+    reportProto.set_allocated_container_id(proto);
 }
 
 ContainerId ContainerReport::getId(){
-    return ContainerId(reportProto.id());
+    return ContainerId(reportProto.container_id());
 }
 
 void ContainerReport::setResource(Resource &resource){
@@ -58,10 +58,10 @@ Resource ContainerReport::getResource(){
 void ContainerReport::setNodeId(NodeId &nodeId){
     NodeIdProto *proto = new NodeIdProto();
     proto->CopyFrom(nodeId.getProto());
-    reportProto.set_allocated_nodeid(proto);
+    reportProto.set_allocated_node_id(proto);
 }
 NodeId ContainerReport::getNodeId(){
-    return NodeId(reportProto.nodeid());
+    return NodeId(reportProto.node_id());
 }
 
 void ContainerReport::setPriority(Priority &priority){
@@ -74,19 +74,19 @@ Priority ContainerReport::getPriority(){
     return Priority(reportProto.priority());
 }
-void ContainerReport::setStartTime(int64_t time){
-    reportProto.set_starttime(time);
+void ContainerReport::setCreationTime(int64_t time){
+    reportProto.set_creation_time(time);
 }
 
-int64_t ContainerReport::getStartTime(){
-    return reportProto.starttime();
+int64_t ContainerReport::getCreationTime(){
+    return reportProto.creation_time();
 }
 
 void ContainerReport::setFinishTime(int64_t time){
-    reportProto.set_finishtime(time);
+    reportProto.set_finish_time(time);
 }
 
 int64_t ContainerReport::getFinishTime(){
-    return reportProto.finishtime();
+    return reportProto.finish_time();
 }
 
 void ContainerReport::setContainerExitStatus(ContainerExitStatus container_exit_status){
@@ -98,27 +98,27 @@ ContainerExitStatus ContainerReport::getContainerExitStatus(){
 }
 
 void ContainerReport::setContaierState(ContainerState state){
-    reportProto.set_state((ContainerStateProto)state);
+    reportProto.set_container_state((ContainerStateProto)state);
 }
 
 ContainerState ContainerReport::getContainerState(){
-    return (ContainerState)reportProto.state();
+    return (ContainerState)reportProto.container_state();
 }
 
 void ContainerReport::setDiagnostics(string &diagnostics){
-    reportProto.set_diagnostics(diagnostics);
+    reportProto.set_diagnostics_info(diagnostics);
 }
 
 string ContainerReport::getDiagnostics(){
-    return reportProto.diagnostics();
+    return reportProto.diagnostics_info();
 }
 
 void ContainerReport::setLogUrl(string &url){
-    reportProto.set_logurl(url);
+    reportProto.set_log_url(url);
 }
 
 string ContainerReport::getLogUrl(){
-    return reportProto.logurl();
+    return reportProto.log_url();
 }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/depends/libyarn/src/records/ContainerReport.h
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/records/ContainerReport.h b/depends/libyarn/src/records/ContainerReport.h
index 41d0c54..d2b7bf9 100644
--- a/depends/libyarn/src/records/ContainerReport.h
+++ b/depends/libyarn/src/records/ContainerReport.h
@@ -70,8 +70,8 @@ public:
     void setPriority(Priority &priority);
     Priority getPriority();
 
-    void setStartTime(int64_t time);
-    int64_t getStartTime();
+    void setCreationTime(int64_t time);
+    int64_t getCreationTime();
 
     void setFinishTime(int64_t time);
     int64_t getFinishTime();

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/src/backend/resourcemanager/include/resourcepool.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resourcepool.h b/src/backend/resourcemanager/include/resourcepool.h
index c2cc166..b5bfb3b 100644
--- a/src/backend/resourcemanager/include/resourcepool.h
+++ b/src/backend/resourcemanager/include/resourcepool.h
@@ -206,7 +206,7 @@ typedef struct SegResourceData SegResourceData;
  */
 struct GRMContainerData
 {
-    int32_t ID;
+    int64_t ID;
     int32_t MemoryMB;
     int32_t Core;
     int32_t Life; /* Ref container life-cycle management.
*/ @@ -228,8 +228,8 @@ struct GRMContainerSetData { typedef struct GRMContainerSetData *GRMContainerSet; typedef struct GRMContainerSetData GRMContainerSetData; -GRMContainer createGRMContainer(uint32_t id, - int32_t memory, +GRMContainer createGRMContainer(int64_t id, + int32_t memory, double core, char *hostname, SegResource segres); @@ -642,10 +642,10 @@ void setAllSegResourceGRMUnavailable(void); struct RB_GRMContainerStatData { - int32_t ContainerID; + int64_t ContainerID; uint8_t isActive; uint8_t isFound; - uint8_t Reserved[2]; + uint8_t Reserved[6]; }; typedef struct RB_GRMContainerStatData RB_GRMContainerStatData; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/src/backend/resourcemanager/resourcebroker/resourcebroker_API.c ---------------------------------------------------------------------- diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_API.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_API.c index 8f1acbd..ef87629 100644 --- a/src/backend/resourcemanager/resourcebroker/resourcebroker_API.c +++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_API.c @@ -162,7 +162,7 @@ void RB_clearResource(List **ctnl) (*ctnl) = list_delete_first(*ctnl); MEMORY_CONTEXT_SWITCH_BACK - elog(LOG, "Resource broker dropped GRM container %d " + elog(LOG, "Resource broker dropped GRM container "INT64_FORMAT "(%d MB, %d CORE) on host %s", ctn->ID, ctn->MemoryMB, http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c ---------------------------------------------------------------------- diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c index 80b04a4..038a29d 100644 --- a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c +++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c @@ -388,7 +388,7 @@ 
int RB_LIBYARN_returnResource(List **ctnl) appendSMBVar(&sendBuffer, ctn->ID); - elog(LOG, "YARN mode resource broker returned resource container %d " + elog(LOG, "YARN mode resource broker returned resource container "INT64_FORMAT "(%d MB, %d CORE) to host %s", ctn->ID, ctn->MemoryMB, @@ -711,7 +711,7 @@ int handleRB2RM_ClusterReport(void) int handleRB2RM_AllocatedResource(void) { int fd = ResBrokerNotifyPipe[0]; - int32_t *containerids = NULL; + int64_t *containerids = NULL; char *buffer = NULL; int acceptedcount = 0; int piperes = 0; @@ -748,8 +748,8 @@ int handleRB2RM_AllocatedResource(void) if ( response.ContainerCount > 0 ) { /* Read container ids */ - int contidsize = __SIZE_ALIGN64(sizeof(int32_t) * response.ContainerCount); - containerids = (int32_t *)rm_palloc0(PCONTEXT, contidsize); + int contidsize = __SIZE_ALIGN64(sizeof(int64_t) * response.ContainerCount); + containerids = (int64_t *)rm_palloc0(PCONTEXT, contidsize); piperes = readPipe(fd, containerids, contidsize); if ( piperes != contidsize ) @@ -999,7 +999,8 @@ void buildToReturnNotTrackedGRMContainers(RB_GRMContainerStat ctnstats, int size ctn->Life = 0; ctn->Resource = NULL; - elog(DEBUG3, "YARN mode resource broker creates dummy GRM container %d.", + elog(DEBUG3, "YARN mode resource broker creates dummy GRM container " + INT64_FORMAT".", ctn->ID); addGRMContainerToKicked(ctn); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c ---------------------------------------------------------------------- diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c index 91b456b..11b612d 100644 --- a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c +++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c @@ -63,7 +63,7 @@ int RB2YARN_acquireResource(uint32_t memorymb, 
uint32_t preferredSize, DQueue containerids, DQueue containerhosts); -int RB2YARN_returnResource(int32_t *contids, int contcount); +int RB2YARN_returnResource(int64_t *contids, int contcount); int RB2YARN_getContainerReport(RB_GRMContainerStat *ctnstats, int *size); int RB2YARN_finishYARNApplication(void); int RB2YARN_disconnectFromYARN(void); @@ -618,13 +618,14 @@ int handleRM2RB_GetClusterReport(void) if ( ctnids.NodeCount > 0 ) { /* Return it at once. */ - int32_t *ctnidarr = rm_palloc0(PCONTEXT, - sizeof(int32_t) * ctnids.NodeCount); + int64_t *ctnidarr = rm_palloc0(PCONTEXT, + sizeof(int64_t) * ctnids.NodeCount); int idx = 0; - DQUEUE_LOOP_BEGIN(&ctnids, iter, void *, ctnid) - ctnidarr[idx] = TYPCONVERT(int32_t,ctnid); - elog(DEBUG3, "YARN mode resource broker returns container ID %d.", - TYPCONVERT(int32_t,ctnid)); + DQUEUE_LOOP_BEGIN(&ctnids, iter, int64_t *, pctnid) + ctnidarr[idx] = *pctnid; + elog(DEBUG3, "YARN mode resource broker returns container ID " + INT64_FORMAT".", + ctnidarr[idx]); idx++; DQUEUE_LOOP_END RB2YARN_freeContainersInMemory(&ctnids, &ctnhosts); @@ -890,9 +891,8 @@ int handleRM2RB_AllocateResource(void) appendSMBVar(&sendBuffer, responsehead); /* Append each container id. */ - DQUEUE_LOOP_BEGIN(&acquiredcontids, iter, void *, contid) - int32_t containerid = TYPCONVERT(int32_t, contid); - appendSMBVar(&sendBuffer, containerid); + DQUEUE_LOOP_BEGIN(&acquiredcontids, iter, int64_t *, pcontid) + appendSMBVar(&sendBuffer, *pcontid); DQUEUE_LOOP_END appendSelfMaintainBufferTill64bitAligned(&sendBuffer); @@ -990,7 +990,7 @@ int handleRM2RB_ReturnResource(void) int libyarnres = FUNCTION_SUCCEEDED; int piperes = 0; int actualsize = 0; - int32_t *containerids = NULL; + int64_t *containerids = NULL; /* Read request content. */ RPCRequestRBReturnResourceContainersHeadData request; @@ -1006,12 +1006,12 @@ int handleRM2RB_ReturnResource(void) /* Read the container id list. 
*/ actualsize = (((request.ContainerCount + 1)>>1)<<1); - containerids = rm_palloc(PCONTEXT, sizeof(int32_t) * actualsize); + containerids = rm_palloc(PCONTEXT, sizeof(int64_t) * actualsize); piperes = readPipe(ResBrokerRequestPipe[0], containerids, - sizeof(int32_t) * actualsize); - if ( piperes != sizeof(int32_t) * actualsize ) + sizeof(int64_t) * actualsize); + if ( piperes != sizeof(int64_t) * actualsize ) { elog(WARNING, "YARN mode resource broker failed to read resource return " "request message (container ids) from pipe. " @@ -1030,7 +1030,8 @@ int handleRM2RB_ReturnResource(void) for ( int i = 0 ; i < request.ContainerCount ; ++i ) { - elog(LOG, "YARN mode resource broker tries to return container of id %d", + elog(LOG, "YARN mode resource broker tries to return container of id " + INT64_FORMAT, containerids[i]); } @@ -1163,7 +1164,7 @@ int handleRM2RB_GetContainerReport(void) for( int i = 0 ; i < size ; ++i ) { - elog(LOG, "Container report ID:%d, isActive:%d", + elog(LOG, "Container report ID:"INT64_FORMAT", isActive:%d", ctnstats[i].ContainerID, ctnstats[i].isActive); } @@ -1604,11 +1605,11 @@ int RB2YARN_acquireResource(uint32_t memorymb, PCONTEXT, HASHTABLE_SLOT_VOLUME_DEFAULT, HASHTABLE_SLOT_VOLUME_DEFAULT_MAX, - HASHTABLE_KEYTYPE_UINT32, + HASHTABLE_KEYTYPE_CHARARRAY, NULL); /* Activate containers. */ - int32_t activeContainerIds[allocatedResourcesArraySize]; + int64_t activeContainerIds[allocatedResourcesArraySize]; for ( int i = 0 ; i < allocatedResourcesArraySize ; ++i ) { activeContainerIds[i] = allocatedResourcesArray[i].containerId; } @@ -1627,7 +1628,7 @@ int RB2YARN_acquireResource(uint32_t memorymb, allocatedResourcesArraySize); /* Return the containers fail to activate. 
     */
-    int *activeFailIds = NULL;
+    int64_t *activeFailIds = NULL;
     int activeFailSize = 0;
     yarnres = getActiveFailContainerIds(LIBYARNClient,
                                         &activeFailIds,
@@ -1642,14 +1643,13 @@ int RB2YARN_acquireResource(uint32_t memorymb,

     /* Build temporary failed container ids in hash table for fast retrieving.*/
     if ( activeFailSize > 0 ) {
         for (int i = 0 ; i < activeFailSize ; ++i) {
-            elog(LOG, "YARN mode resource broker failed to activate container %d",
+            elog(LOG, "YARN mode resource broker failed to activate container "INT64_FORMAT,
                       activeFailIds[i]);
-            setHASHTABLENode(&FailedIDIndex,
-                             TYPCONVERT(void *, activeFailIds[i]),
-                             TYPCONVERT(void *, activeFailIds[i]),
-                             false);
-        }
+            SimpArray key;
+            setSimpleArrayRef(&key, (void *)&(activeFailIds[i]), sizeof(int64_t));
+            setHASHTABLENode(&FailedIDIndex, &key, TYPCONVERT(void *, &activeFailIds[i]), false);
+        }

         yarnres = releaseResources(LIBYARNClient,
                                    YARNJobID,
@@ -1663,11 +1663,12 @@ int RB2YARN_acquireResource(uint32_t memorymb,
     }

     /* Build result. */
-    for ( int i = 0 ; i < allocatedResourcesArraySize ; ++i ) {
+    for ( int i = 0 ; i < allocatedResourcesArraySize ; ++i )
+    {
+        int64_t *ctnid = (int64_t *)rm_palloc0(PCONTEXT, sizeof(int64_t));
+        *ctnid = allocatedResourcesArray[i].containerId;
+        insertDQueueTailNode(containerids, ctnid);

-        insertDQueueTailNode(containerids,
-                             TYPCONVERT(void *,
-                                        allocatedResourcesArray[i].containerId));
         char *hostnamestr =
                 (char *)rm_palloc0(PCONTEXT,
                                    strlen(allocatedResourcesArray[i].host) + 1);
@@ -1675,7 +1676,7 @@ int RB2YARN_acquireResource(uint32_t memorymb,
         insertDQueueTailNode(containerhosts, hostnamestr);

         elog(LOG, "YARN mode resource broker allocated and activated container. "
-                  "ID : %d (%d MB, %d CORE) at %s.",
+                  "ID : "INT64_FORMAT"(%d MB, %d CORE) at %s.",
                   allocatedResourcesArray[i].containerId,
                   allocatedResourcesArray[i].memory,
                   allocatedResourcesArray[i].vCores,
@@ -1696,7 +1697,7 @@ exit:
     return FUNCTION_SUCCEEDED;
 }

-int RB2YARN_returnResource(int32_t *contids, int contcount)
+int RB2YARN_returnResource(int64_t *contids, int contcount)
 {
     if( contcount == 0 )
         return FUNCTION_SUCCEEDED;
@@ -1714,7 +1715,7 @@ int RB2YARN_returnResource(int32_t *contids, int contcount)
     }

     for ( int i = 0 ; i < contcount ; ++i ) {
-        elog(LOG, "YARN mode resource broker returned container of id %d",
+        elog(LOG, "YARN mode resource broker returned container of id "INT64_FORMAT,
                   contids[i]);
     }

@@ -1729,7 +1730,7 @@ int RB2YARN_getContainerReport(RB_GRMContainerStat *ctnstats, int *size)
     int arrsize = 0;
     LibYarnContainerStatus_t *ctnstatarr = NULL;
     int ctnstatsize = 0;
-    int32_t *ctnidarr = NULL;
+    int64_t *ctnidarr = NULL;

     *ctnstats = NULL;
     *size = 0;
@@ -1748,7 +1749,7 @@ int RB2YARN_getContainerReport(RB_GRMContainerStat *ctnstats, int *size)
      * container status. The work round here is to call container status API
      * to get final container statuses.
      */
-    ctnidarr = (int32_t *)rm_palloc(PCONTEXT, sizeof(int32_t) * arrsize);
+    ctnidarr = (int64_t *)rm_palloc(PCONTEXT, sizeof(int64_t) * arrsize);
     for ( int i = 0 ; i < arrsize ; ++i )
     {
         ctnidarr[i] = ctnrparr[i].containerId;
@@ -1778,8 +1779,6 @@ int RB2YARN_getContainerReport(RB_GRMContainerStat *ctnstats, int *size)
             (*ctnstats)[i].ContainerID = ctnstatarr[i].containerId;
             (*ctnstats)[i].isActive = ctnstatarr[i].state == C_RUNNING ? 1 : 0;
             (*ctnstats)[i].isFound = 0;
-            (*ctnstats)[i].Reserved[0] = 0;
-            (*ctnstats)[i].Reserved[1] = 0;
         }
     }
     freeContainerStatusArray(ctnstatarr, ctnstatsize);
@@ -1791,13 +1790,20 @@ int RB2YARN_getContainerReport(RB_GRMContainerStat *ctnstats, int *size)

 void RB2YARN_freeContainersInMemory(DQueue containerids, DQueue containerhosts)
 {
-    if ( containerids != NULL ) {
-        removeAllDQueueNodes(containerids);
+    if ( containerids != NULL )
+    {
+        while( containerids->NodeCount > 0 )
+        {
+            int64_t *pctnid = removeDQueueHeadNode(containerids);
+            rm_pfree(PCONTEXT, pctnid);
+        }
         cleanDQueue(containerids);
     }

-    if ( containerhosts != NULL ) {
-        while( containerhosts->NodeCount > 0 ) {
+    if ( containerhosts != NULL )
+    {
+        while( containerhosts->NodeCount > 0 )
+        {
             char *hostname = removeDQueueHeadNode(containerhosts);
             rm_pfree(PCONTEXT, hostname);
         }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a9344ab6/src/backend/resourcemanager/resourcepool.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcepool.c b/src/backend/resourcemanager/resourcepool.c
index 6d51dfa..931b80f 100644
--- a/src/backend/resourcemanager/resourcepool.c
+++ b/src/backend/resourcemanager/resourcepool.c
@@ -218,8 +218,8 @@ void getBufferedHostName(char *hostname, char **buffhostname)
     }
 }

-GRMContainer createGRMContainer(uint32_t id,
-                                int32_t memory,
+GRMContainer createGRMContainer(int64_t id,
+                                int32_t memory,
                                 double core,
                                 char *hostname,
                                 SegResource segres)
@@ -3009,7 +3009,7 @@ void dropAllGRMContainersFromSegment(SegResource segres)

         count++;

-        elog(LOG, "Resource manager decides to return container %d in host %s "
+        elog(LOG, "Resource manager decides to return container "INT64_FORMAT" in host %s "
                   "in order to drop all resource pool's GRM containers.",
                   ctn->ID,
                   ctn->HostName);
@@ -3389,7 +3389,7 @@ void timeoutIdleGRMResourceToRBByRatio(int ratioindex,
             addGRMContainerToToBeKicked(retcont);
             (*realretcontnum)++;

-            elog(LOG, "Resource manager decides to return container %d in host %s",
+            elog(LOG, "Resource manager decides to return container "INT64_FORMAT" in host %s",
                       retcont->ID,
                       retcont->HostName);
             validateResourcePoolStatus(false);
@@ -3519,16 +3519,15 @@ void checkGRMContainerStatus(RB_GRMContainerStat ctnstats, int size)
                         PCONTEXT,
                         HASHTABLE_SLOT_VOLUME_DEFAULT,
                         HASHTABLE_SLOT_VOLUME_DEFAULT_MAX,
-                        HASHTABLE_KEYTYPE_UINT32,
+                        HASHTABLE_KEYTYPE_CHARARRAY,
                         NULL);

     for ( int i = 0 ; i < size ; ++i )
     {
-        elog(DEBUG3, "Resource manager tracks container %d.",
+        elog(DEBUG3, "Resource manager tracks container "INT64_FORMAT".",
                      ctnstats[i].ContainerID);
-        setHASHTABLENode(&stattbl,
-                         TYPCONVERT(void *, ctnstats[i].ContainerID),
-                         &ctnstats[i],
-                         false);
+        SimpArray key;
+        setSimpleArrayRef(&key, (char *)&(ctnstats[i].ContainerID), sizeof(int64_t));
+        setHASHTABLENode(&stattbl, &key, &ctnstats[i], false);
         ctnstats[i].isFound = false;
     }
@@ -3552,8 +3551,9 @@ void checkGRMContainerStatus(RB_GRMContainerStat ctnstats, int size)
         while( cell != NULL )
         {
             GRMContainer ctn = (GRMContainer)lfirst(cell);
-            PAIR pair = (PAIR)getHASHTABLENode(&(stattbl),
-                                               TYPCONVERT(void *, ctn->ID));
+            SimpArray key;
+            setSimpleArrayRef(&key, (char *)&(ctn->ID), sizeof(int64_t));
+            PAIR pair = (PAIR)getHASHTABLENode(&(stattbl), &key);
             RB_GRMContainerStat ctnstat = pair != NULL ?
                                           (RB_GRMContainerStat)(pair->Value):
                                           NULL;
@@ -3594,8 +3594,8 @@ void checkGRMContainerStatus(RB_GRMContainerStat ctnstats, int size)
                 reorderSegResourceAvailIndex(segres, PQUEMGR->RatioReverseIndex[ridx]);
                 addGRMContainerToToBeKicked(ctn);

-                elog(LOG, "Resource manager decides to return container %d "
-                          "in host %s because %s.",
+                elog(LOG, "Resource manager decides to return container "INT64_FORMAT
+                          " in host %s because %s.",
                           ctn->ID,
                           ctn->HostName,
                           ctnstat == NULL ?
@@ -3607,7 +3607,7 @@ void checkGRMContainerStatus(RB_GRMContainerStat ctnstats, int size)
             }
             else
             {
-                elog(DEBUG3, "Resource manager set container %d found.",
+                elog(DEBUG3, "Resource manager set container "INT64_FORMAT" found.",
                              ctnstat->ContainerID);
                 /*
                  * For all containers not tracked by hawq rm, they are marked
@@ -4229,7 +4229,7 @@ void dumpResourcePoolHosts(const char *filename)
             {
                 GRMContainer ctn = (GRMContainer)lfirst(cell);
                 fprintf(fp, "\tRESOURCE_CONTAINER("
-                            "ID=%d:"
+                            "ID="INT64_FORMAT":"
                             "MemoryMB=%d:Core=%d:"
                             "Life=%d:"
                             "HostName=%s)\n",