From: yjin@apache.org
To: commits@hawq.incubator.apache.org
Reply-To: dev@hawq.incubator.apache.org
Subject: incubator-hawq git commit: HAWQ-603. Remove unused code for resource manager segment side
Date: Wed, 30 Mar 2016 02:50:44 +0000 (UTC)

Repository: incubator-hawq
Updated Branches:
  refs/heads/master a10483753 -> 8c273195e

HAWQ-603. Remove unused code for resource manager segment side

Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/8c273195
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/8c273195
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/8c273195

Branch: refs/heads/master
Commit: 8c273195e97e25e4ddc8b437441ce46de7ebaede
Parents: a104837
Author: YI JIN
Authored: Wed Mar 30 13:50:34 2016 +1100
Committer: YI JIN
Committed: Wed Mar 30 13:50:34 2016 +1100

----------------------------------------------------------------------
 src/backend/resourcemanager/Makefile            |   3 +-
 src/backend/resourcemanager/include/dynrm.h     |   2 -
 .../resourcemanager/requesthandler_RMSEG.c      | 430 -------------------
 .../requesthandler_RMSEG_CGroup.c               | 261 +++++++++++
 src/backend/resourcemanager/resourcemanager.c   |   1 -
 .../resourcemanager/resourcemanager_RMSEG.c     |  15 +-
 6 files changed, 264 insertions(+), 448 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c273195/src/backend/resourcemanager/Makefile
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/Makefile b/src/backend/resourcemanager/Makefile
index ad2d701..2b2cb05 100644
--- a/src/backend/resourcemanager/Makefile
+++ b/src/backend/resourcemanager/Makefile
@@ -34,7 +34,8 @@ OBJS = resourcepool.o requesthandler_ddl.o \
        hawqsite.o \
        requesthandler_RMSEG.o requesthandler.o \
        resourcemanager_RMSEG.o \
-       resourcemanager.o
+       resourcemanager.o \
+       requesthandler_RMSEG_CGroup.o
 
 SUBDIRS = communication utils resourcebroker resourceenforcer
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c273195/src/backend/resourcemanager/include/dynrm.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/dynrm.h b/src/backend/resourcemanager/include/dynrm.h
index 9ede38d..34f4b92 100644
--- a/src/backend/resourcemanager/include/dynrm.h
+++ b/src/backend/resourcemanager/include/dynrm.h
@@ -100,7 +100,6 @@ bool handleRMRequestAcquireResource(void **arg);
 bool handleRMRequestReturnResource(void **arg);
 bool handleRMSEGRequestIMAlive(void **arg);
 bool handleRMSEGRequestRUAlive(void **arg);
-bool handleRMSEGRequestTmpDir(void **arg);
 bool handleRMRequestAcquireResourceQuota(void **arg);
 bool handleRMRequestRefreshResource(void **arg);
 bool handleRMRequestSegmentIsDown(void **arg);
@@ -326,7 +325,6 @@ void updateStatusOfAllNodes(void);
 int ResManagerMainSegment2ndPhase(void);
 int initializeSocketServer_RMSEG(void);
 int MainHandlerLoop_RMSEG(void);
-int MainHandler_RMSEGDummyLoop(void);
 void checkAndBuildFailedTmpDirList(void);
 
 #endif //DYNAMIC_RESOURCE_MANAGEMENT_H

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c273195/src/backend/resourcemanager/requesthandler_RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler_RMSEG.c b/src/backend/resourcemanager/requesthandler_RMSEG.c
index 12bd8ac..3f27c6d 100644
--- a/src/backend/resourcemanager/requesthandler_RMSEG.c
+++ b/src/backend/resourcemanager/requesthandler_RMSEG.c
@@ -39,14 +39,6 @@
 
 #define RMSEG_INBUILDHOST SMBUFF_HEAD(SegStat, &localsegstat)
 
-/* UINT32_T.MAX_VALUE = 4294967295 */
-#define MAX_DIGITS_OF_UINT32_T 10
-
-/* Format of cgroup timestamp: YYYYMMDD_HHMMSS */
-#define LENTH_CGROUP_TIMESTAMP 15
-
-char *buildCGroupNameString(int64 masterStartTime, uint32 connId);
-
 /*
  * Refresh localhost information, this is used to build message for IMAlive.
  */
@@ -386,428 +378,6 @@ void checkLocalPostmasterStatus(void)
     PQfinish(conn);
 }
 
-static void
-checkTmpDirTable(uint32_t session_id, bool session_exists)
-{
-    if (!session_exists)
-    {
-        // New Session
-        uint32_t *data = rm_palloc0(DRMGlobalInstance->Context, sizeof(uint32_t));
-        *data = session_id;
-        insertDQueueHeadNode(&DRMGlobalInstance->TmpDirLRUList, (void *)data);
-    }
-
-    if (hash_get_num_entries(DRMGlobalInstance->LocalTmpDirTable) >=
-        DRMGlobalInstance->TmpDirTableCapacity)
-    {
-        // Delete LRU session
-        bool found;
-        TmpDirKey tmpdir_key;
-        TmpDirEntry *tmpdir_entry;
-
-        uint32_t *data = (uint32_t *)removeDQueueTailNode(&DRMGlobalInstance->TmpDirLRUList);
-        tmpdir_key.session_id = *data;
-        rm_pfree(DRMGlobalInstance->Context, data);
-        tmpdir_entry = hash_search(DRMGlobalInstance->LocalTmpDirTable,
-                                   (void *)&tmpdir_key,
-                                   HASH_REMOVE,
-                                   &found);
-        if (found)
-        {
-            hash_destroy(tmpdir_entry->qe_tmp_dirs);
-        }
-    }
-    else
-    {
-        // Update LRU data
-        DQueueNode node = DRMGlobalInstance->TmpDirLRUList.Head;
-        while (node != NULL)
-        {
-            if (*(uint32_t *)node->Data == session_id)
-            {
-                break;
-            }
-            node = node->Next;
-        }
-
-        if (node && node != DRMGlobalInstance->TmpDirLRUList.Head)
-        {
-            insertDQueueHeadNode(&DRMGlobalInstance->TmpDirLRUList, removeDQueueNode(&DRMGlobalInstance->TmpDirLRUList, node));
-        }
-    }
-}
-
-
-bool handleRMSEGRequestTmpDir(void **arg)
-{
-    ConnectionTrack conntrack = (ConnectionTrack)(*arg);
-    char response[8];
-    uint32_t session_id = 0;
-    uint32_t command_id = 0;
-    int32_t qeidx = -1;
-    TmpDirKey tmpdir_key;
-    TmpDirEntry *tmpdir_entry;
-    QETmpDirKey qe_tmpdir_key;
-    QETmpDirEntry *qe_tmpdir_entry;
-    bool session_found;
-    bool new_command;
-    bool qeidx_found;
-    SelfMaintainBufferData sendbuff;
-
-    if (DRMGlobalInstance->NextLocalHostTempDirIdx < 0
-        || DRMGlobalInstance->NextLocalHostTempDirIdxForQD < 0) {
-
-        *(uint32_t *)response = RM_STATUS_BAD_TMPDIR;
-        *(uint32_t *)(response + 4) = 0;
-
-        buildResponseIntoConnTrack(conntrack,
-                                   response,
-                                   sizeof(response),
-                                   conntrack->MessageMark1,
-                                   conntrack->MessageMark2,
-                                   RESPONSE_RM_TMPDIR);
-
-        elog(LOG, "handleRMSEGRequestTmpDir, no existing tmp dirs in the "
-                  "segment resource manager");
-    } else {
-
-        session_id = *((uint32_t *)(conntrack->MessageBuff.Buffer + 0));
-        command_id = *((uint32_t *)(conntrack->MessageBuff.Buffer + 4));
-        qeidx = *((int32_t *)(conntrack->MessageBuff.Buffer + 8));
-        initializeSelfMaintainBuffer(&sendbuff, PCONTEXT);
-
-        if (qeidx == -1)
-        {
-            // QE on master
-            int tmpdir_idx = 0;
-            if (session_id > 0)
-            {
-                tmpdir_idx = session_id % getDQueueLength(&DRMGlobalInstance->LocalHostTempDirectoriesForQD);
-            }
-
-            SimpStringPtr tmpdir = (SimpStringPtr)
-                getDQueueNodeDataByIndex(&DRMGlobalInstance->LocalHostTempDirectoriesForQD, tmpdir_idx);
-
-            appendSMBSimpStr(&sendbuff, tmpdir);
-            appendSelfMaintainBufferTill64bitAligned(&sendbuff);
-
-            char tmpdir_string[TMPDIR_MAX_LENGTH] = {0};
-            memset(tmpdir_string, 0, TMPDIR_MAX_LENGTH);
-            memcpy(tmpdir_string, tmpdir->Str, tmpdir->Len);
-
-            elog(LOG, "handleRMSEGRequestTmpDir session_id:%u command_id:%u qe_idx:%d tmpdir:%s",
-                      session_id,
-                      command_id,
-                      qeidx,
-                      tmpdir_string);
-        }
-        else
-        {
-            // QE on segment
-            tmpdir_key.session_id = session_id;
-            qe_tmpdir_key.qeidx = qeidx;
-
-            tmpdir_entry = (TmpDirEntry *)
-                           hash_search(DRMGlobalInstance->LocalTmpDirTable,
-                                       (void *)&tmpdir_key,
-                                       HASH_ENTER,
-                                       &session_found);
-            if (!session_found)
-            {
-                // New Session
-                new_command = true;
-            }
-            else
-            {
-                // Existing Session
-                if (command_id == tmpdir_entry->command_id)
-                {
-                    // In the same command
-                    new_command = false;
-                }
-                else
-                {
-                    // New command
-                    new_command = true;
-                    hash_destroy(tmpdir_entry->qe_tmp_dirs);
-                }
-            }
-
-            if (new_command)
-            {
-                tmpdir_entry->command_id = command_id;
-                HASHCTL ctl;
-                ctl.keysize = sizeof(QETmpDirKey);
-                ctl.entrysize = sizeof(QETmpDirEntry);
-                ctl.hcxt = DRMGlobalInstance->Context;
-                tmpdir_entry->qe_tmp_dirs =
-                    hash_create("Executor session temporary directory table",
-                                16,
-                                &ctl,
-                                HASH_ELEM);
-            }
-
-            qe_tmpdir_entry = (QETmpDirEntry *)hash_search(tmpdir_entry->qe_tmp_dirs,
-                                                           (void *)&qe_tmpdir_key,
-                                                           HASH_ENTER,
-                                                           &qeidx_found);
-
-            if (!qeidx_found)
-            {
-                // New QE
-                SimpStringPtr tmpdir =
-                    (SimpStringPtr)
-                    getDQueueNodeDataByIndex(&DRMGlobalInstance->LocalHostTempDirectories, DRMGlobalInstance->NextLocalHostTempDirIdx);
-                DRMGlobalInstance->NextLocalHostTempDirIdx=
-                    (DRMGlobalInstance->NextLocalHostTempDirIdx + 1) %
-                    getDQueueLength(&DRMGlobalInstance->LocalHostTempDirectories);
-                memset(qe_tmpdir_entry->tmpdir, 0, TMPDIR_MAX_LENGTH);
-                memcpy(qe_tmpdir_entry->tmpdir, tmpdir->Str, tmpdir->Len);
-            }
-
-            checkTmpDirTable(session_id, session_found);
-
-            appendSMBStr(&sendbuff, qe_tmpdir_entry->tmpdir);
-            appendSelfMaintainBufferTill64bitAligned(&sendbuff);
-
-            elog(LOG, "handleRMSEGRequestTmpDir session_id:%u command_id:%u qe_idx:%d tmpdir:%s",
-                      session_id,
-                      command_id,
-                      qeidx,
-                      qe_tmpdir_entry->tmpdir);
-        }
-
-        buildResponseIntoConnTrack(conntrack,
-                                   sendbuff.Buffer,
-                                   sendbuff.Cursor + 1,
-                                   conntrack->MessageMark1,
-                                   conntrack->MessageMark2,
-                                   RESPONSE_RM_TMPDIR);
-        destroySelfMaintainBuffer(&sendbuff);
-    }
-
-    conntrack->ResponseSent = false;
-    MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
-    PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
-    MEMORY_CONTEXT_SWITCH_BACK
-
-    return true;
-}
-
-char *buildCGroupNameString(int64 masterStartTime, uint32 connId)
-{
-    int tLength = strlen(timestamptz_to_str(masterStartTime));
-    char *tTime = (char *)rm_palloc0(PCONTEXT, tLength+1);
-    strcpy(tTime, timestamptz_to_str(masterStartTime));
-
-    char *sTime = (char *)rm_palloc0(PCONTEXT, LENTH_CGROUP_TIMESTAMP+1);
-    sprintf(sTime, "%.4s%.2s%.2s_%.2s%.2s%.2s",
-                   tTime,
-                   tTime + 5,
-                   tTime + 8,
-                   tTime + 11,
-                   tTime + 14,
-                   tTime + 17);
-
-    /* CGroup name follows the format: hawq-YYMMDD_HHMMSS-connCONNECTIONID */
-    char *cgroupName = (char*)rm_palloc0(PCONTEXT,
-                                         sizeof("hawq-") -1 + strlen(sTime) +
-                                         sizeof("-conn") -1 + MAX_DIGITS_OF_UINT32_T + 1);
-    sprintf(cgroupName, "hawq-%s-conn%d", sTime, connId);
-
-    rm_pfree(PCONTEXT, sTime);
-    rm_pfree(PCONTEXT, tTime);
-
-    return cgroupName;
-}
-/**
- * Handle QE MoveToGroup function call.
- */
-bool handleQEMoveToCGroup(void **arg)
-{
-    ConnectionTrack conntrack = (ConnectionTrack)(*arg);
-    Assert(conntrack != NULL);
-
-    RPCResponseMoveToCGroupData response;
-
-    int segmentPid;
-    TimestampTz masterStartTime;
-    uint32_t connId;
-    int segId;
-
-    RPCRequestMoveToCGroup request = (RPCRequestMoveToCGroup)
-                                     (conntrack->MessageBuff.Buffer);
-    masterStartTime = request->MasterStartTime;
-    connId = request->ConnID;
-    segId = request->SegmentID;
-    segmentPid = request->ProcID;
-
-    elog(DEBUG1, "Resource enforcer moves QE to CGroup: "
-                 "masterStartTime = %s, connId = %d, segId = %d, procId = %d",
-                 timestamptz_to_str(masterStartTime),
-                 connId, segmentPid, segmentPid);
-
-    response.Result = FUNC_RETURN_OK;
-    response.Reserved = 0;
-
-    buildResponseIntoConnTrack(conntrack,
-                               (char *)&response,
-                               sizeof(response),
-                               conntrack->MessageMark1,
-                               conntrack->MessageMark2,
-                               RESPONSE_QE_MOVETOCGROUP);
-
-    conntrack->ResponseSent = false;
-    MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
-    PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
-    MEMORY_CONTEXT_SWITCH_BACK
-
-    /* Prepare cgroupName from masterStartTime, connId and segId information */
-    char *cgroupName = buildCGroupNameString(masterStartTime, connId);
-
-    /* Move To CGroup in thread */
-    ResourceEnforcementRequest *task = (ResourceEnforcementRequest *)malloc(sizeof(ResourceEnforcementRequest));
-    task->type = MOVETO;
-    task->pid = segmentPid;
-    memset(task->cgroup_name, 0, sizeof(task->cgroup_name));
-    strncpy(task->cgroup_name, cgroupName, strlen(cgroupName)+1);
-    enqueue(g_queue_cgroup, (void *)task);
-
-    rm_pfree(PCONTEXT, cgroupName);
-
-    return true;
-}
-
-/**
- * Handle QE MoveOutGroup function call.
- */
-bool handleQEMoveOutCGroup(void **arg)
-{
-    ConnectionTrack conntrack = (ConnectionTrack)(*arg);
-    Assert(conntrack != NULL);
-
-    RPCResponseMoveOutCGroupData response;
-
-    int segmentPid;
-    TimestampTz masterStartTime;
-    uint32_t connId;
-    int segId;
-
-    RPCRequestMoveOutCGroup request = (RPCRequestMoveOutCGroup)
-                                      (conntrack->MessageBuff.Buffer);
-    masterStartTime = request->MasterStartTime;
-    connId = request->ConnID;
-    segId = request->SegmentID;
-    segmentPid = request->ProcID;
-
-    elog(DEBUG1, "Resource enforcer moves QE out from CGroup: "
-                 "masterStartTime = %s, connId = %d, segId = %d, procId = %d",
-                 timestamptz_to_str(masterStartTime),
-                 connId, segmentPid, segmentPid);
-
-    response.Result = FUNC_RETURN_OK;
-    response.Reserved = 0;
-
-    buildResponseIntoConnTrack(conntrack,
-                               (char *)&response,
-                               sizeof(response),
-                               conntrack->MessageMark1,
-                               conntrack->MessageMark2,
-                               RESPONSE_QE_MOVEOUTCGROUP);
-
-    conntrack->ResponseSent = false;
-    MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
-    PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
-    MEMORY_CONTEXT_SWITCH_BACK
-
-    /* Prepare cgroupName from masterStartTime, connId and segId information */
-    char *cgroupName = buildCGroupNameString(masterStartTime, connId);
-
-    /* Move Out CGroup in thread */
-    ResourceEnforcementRequest *task = (ResourceEnforcementRequest *)
-                                       malloc(sizeof(ResourceEnforcementRequest));
-    task->type = MOVEOUT;
-    task->pid = segmentPid;
-    memset(task->cgroup_name, 0, sizeof(task->cgroup_name));
-    strncpy(task->cgroup_name, cgroupName, strlen(cgroupName)+1);
-    enqueue(g_queue_cgroup, (void *)task);
-
-    rm_pfree(PCONTEXT, cgroupName);
-
-    return true;
-}
-
-/**
- * Handle QE SetWeightGroup function call.
- */
-bool handleQESetWeightCGroup(void **arg)
-{
-    ConnectionTrack conntrack = (ConnectionTrack)(*arg);
-    Assert(conntrack != NULL);
-
-    TimestampTz masterStartTime;
-    uint32_t connId;
-    int segId;
-    int segmentPid;
-    double weight;
-
-    RPCRequestSetWeightCGroup request = (RPCRequestSetWeightCGroup)
-                                        (conntrack->MessageBuff.Buffer);
-    masterStartTime = request->MasterStartTime;
-    connId = request->ConnID;
-    segId = request->SegmentID;
-    segmentPid = request->ProcID;
-    weight = request->Weight;
-
-    elog(DEBUG1, "Resource enforcer sets weight for QE in CGroup: "
-                 "masterStartTime = %s, connId = %d, segId = %d, "
-                 "procId = %d, weight = %lf",
-                 timestamptz_to_str(masterStartTime),
-                 connId, segmentPid, segmentPid, weight);
-
-    /* Prepare cgroupName from masterStartTime, connId and segId information */
-    char *cgroupName = buildCGroupNameString(masterStartTime, connId);
-
-    /* Build request instance and add request into the queue. */
-    ResourceEnforcementRequest *task = (ResourceEnforcementRequest *)
-                                       malloc(sizeof(ResourceEnforcementRequest));
-
-    if ( task == NULL ) {
-        elog(ERROR, "Resource enforcer fails to malloc "
-                    "resource enforcement request instance");
-    }
-    task->type = SETWEIGHT;
-    task->pid = segmentPid;
-    memset(task->cgroup_name, 0, sizeof(task->cgroup_name));
-    strncpy(task->cgroup_name, cgroupName, strlen(cgroupName)+1);
-    task->query_resource.vcore = weight;
-    if (enqueue(g_queue_cgroup, (void *)task) == -1 ) {
-        elog(ERROR, "Resource enforcer fails to add "
-                    "resource enforcement request into task queue");
-    }
-
-    RPCResponseSetWeightCGroupData response;
-    response.Result = FUNC_RETURN_OK;
-    response.Reserved = 0;
-
-    buildResponseIntoConnTrack(conntrack,
-                               (char *)&response,
-                               sizeof(response),
-                               conntrack->MessageMark1,
-                               conntrack->MessageMark2,
-                               RESPONSE_QE_SETWEIGHTCGROUP);
-
-    conntrack->ResponseSent = false;
-    MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
-    PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
-    MEMORY_CONTEXT_SWITCH_BACK
-
-    rm_pfree(PCONTEXT, cgroupName);
-
-    return true;
-}
-
-
 /**
  * Handle IncreaseMemQuota request from resource manager server
  */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c273195/src/backend/resourcemanager/requesthandler_RMSEG_CGroup.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler_RMSEG_CGroup.c b/src/backend/resourcemanager/requesthandler_RMSEG_CGroup.c
new file mode 100644
index 0000000..81900e8
--- /dev/null
+++ b/src/backend/resourcemanager/requesthandler_RMSEG_CGroup.c
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "envswitch.h"
+#include "dynrm.h"
+#include "time.h"
+
+#include "resourcemanager/resourcemanager.h"
+#include "communication/rmcomm_RMSEG2RM.h"
+#include "communication/rmcomm_RM2RMSEG.h"
+#include "communication/rmcomm_MessageHandler.h"
+#include "communication/rmcomm_QE_RMSEG_Protocol.h"
+#include "communication/rmcomm_RM_RMSEG_Protocol.h"
+#include "utils/simplestring.h"
+#include "utils/linkedlist.h"
+
+#include "utils/palloc.h"
+
+#include "gp-libpq-fe.h"
+#include "gp-libpq-int.h"
+
+#include "resourceenforcer/resourceenforcer_message.h"
+
+/* UINT32_T.MAX_VALUE = 4294967295 */
+#define MAX_DIGITS_OF_UINT32_T 10
+
+/* Format of cgroup timestamp: YYYYMMDD_HHMMSS */
+#define LENTH_CGROUP_TIMESTAMP 15
+
+char *buildCGroupNameString(int64 masterStartTime, uint32 connId);
+
+
+char *buildCGroupNameString(int64 masterStartTime, uint32 connId)
+{
+    int tLength = strlen(timestamptz_to_str(masterStartTime));
+    char *tTime = (char *)rm_palloc0(PCONTEXT, tLength+1);
+    strcpy(tTime, timestamptz_to_str(masterStartTime));
+
+    char *sTime = (char *)rm_palloc0(PCONTEXT, LENTH_CGROUP_TIMESTAMP+1);
+    sprintf(sTime, "%.4s%.2s%.2s_%.2s%.2s%.2s",
+                   tTime,
+                   tTime + 5,
+                   tTime + 8,
+                   tTime + 11,
+                   tTime + 14,
+                   tTime + 17);
+
+    /* CGroup name follows the format: hawq-YYMMDD_HHMMSS-connCONNECTIONID */
+    char *cgroupName = (char*)rm_palloc0(PCONTEXT,
+                                         sizeof("hawq-") -1 + strlen(sTime) +
+                                         sizeof("-conn") -1 + MAX_DIGITS_OF_UINT32_T + 1);
+    sprintf(cgroupName, "hawq-%s-conn%d", sTime, connId);
+
+    rm_pfree(PCONTEXT, sTime);
+    rm_pfree(PCONTEXT, tTime);
+
+    return cgroupName;
+}
+/**
+ * Handle QE MoveToGroup function call.
+ */
+bool handleQEMoveToCGroup(void **arg)
+{
+    ConnectionTrack conntrack = (ConnectionTrack)(*arg);
+    Assert(conntrack != NULL);
+
+    RPCResponseMoveToCGroupData response;
+
+    int segmentPid;
+    TimestampTz masterStartTime;
+    uint32_t connId;
+    int segId;
+
+    RPCRequestMoveToCGroup request = (RPCRequestMoveToCGroup)
+                                     (conntrack->MessageBuff.Buffer);
+    masterStartTime = request->MasterStartTime;
+    connId = request->ConnID;
+    segId = request->SegmentID;
+    segmentPid = request->ProcID;
+
+    elog(DEBUG1, "Resource enforcer moves QE to CGroup: "
+                 "masterStartTime = %s, connId = %d, segId = %d, procId = %d",
+                 timestamptz_to_str(masterStartTime),
+                 connId, segmentPid, segmentPid);
+
+    response.Result = FUNC_RETURN_OK;
+    response.Reserved = 0;
+
+    buildResponseIntoConnTrack(conntrack,
+                               (char *)&response,
+                               sizeof(response),
+                               conntrack->MessageMark1,
+                               conntrack->MessageMark2,
+                               RESPONSE_QE_MOVETOCGROUP);
+
+    conntrack->ResponseSent = false;
+    MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+    PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
+    MEMORY_CONTEXT_SWITCH_BACK
+
+    /* Prepare cgroupName from masterStartTime, connId and segId information */
+    char *cgroupName = buildCGroupNameString(masterStartTime, connId);
+
+    /* Move To CGroup in thread */
+    ResourceEnforcementRequest *task = (ResourceEnforcementRequest *)malloc(sizeof(ResourceEnforcementRequest));
+    task->type = MOVETO;
+    task->pid = segmentPid;
+    memset(task->cgroup_name, 0, sizeof(task->cgroup_name));
+    strncpy(task->cgroup_name, cgroupName, strlen(cgroupName)+1);
+    enqueue(g_queue_cgroup, (void *)task);
+
+    rm_pfree(PCONTEXT, cgroupName);
+
+    return true;
+}
+
+/**
+ * Handle QE MoveOutGroup function call.
+ */
+bool handleQEMoveOutCGroup(void **arg)
+{
+    ConnectionTrack conntrack = (ConnectionTrack)(*arg);
+    Assert(conntrack != NULL);
+
+    RPCResponseMoveOutCGroupData response;
+
+    int segmentPid;
+    TimestampTz masterStartTime;
+    uint32_t connId;
+    int segId;
+
+    RPCRequestMoveOutCGroup request = (RPCRequestMoveOutCGroup)
+                                      (conntrack->MessageBuff.Buffer);
+    masterStartTime = request->MasterStartTime;
+    connId = request->ConnID;
+    segId = request->SegmentID;
+    segmentPid = request->ProcID;
+
+    elog(DEBUG1, "Resource enforcer moves QE out from CGroup: "
+                 "masterStartTime = %s, connId = %d, segId = %d, procId = %d",
+                 timestamptz_to_str(masterStartTime),
+                 connId, segmentPid, segmentPid);
+
+    response.Result = FUNC_RETURN_OK;
+    response.Reserved = 0;
+
+    buildResponseIntoConnTrack(conntrack,
+                               (char *)&response,
+                               sizeof(response),
+                               conntrack->MessageMark1,
+                               conntrack->MessageMark2,
+                               RESPONSE_QE_MOVEOUTCGROUP);
+
+    conntrack->ResponseSent = false;
+    MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+    PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
+    MEMORY_CONTEXT_SWITCH_BACK
+
+    /* Prepare cgroupName from masterStartTime, connId and segId information */
+    char *cgroupName = buildCGroupNameString(masterStartTime, connId);
+
+    /* Move Out CGroup in thread */
+    ResourceEnforcementRequest *task = (ResourceEnforcementRequest *)
+                                       malloc(sizeof(ResourceEnforcementRequest));
+    task->type = MOVEOUT;
+    task->pid = segmentPid;
+    memset(task->cgroup_name, 0, sizeof(task->cgroup_name));
+    strncpy(task->cgroup_name, cgroupName, strlen(cgroupName)+1);
+    enqueue(g_queue_cgroup, (void *)task);
+
+    rm_pfree(PCONTEXT, cgroupName);
+
+    return true;
+}
+
+/**
+ * Handle QE SetWeightGroup function call.
+ */
+bool handleQESetWeightCGroup(void **arg)
+{
+    ConnectionTrack conntrack = (ConnectionTrack)(*arg);
+    Assert(conntrack != NULL);
+
+    TimestampTz masterStartTime;
+    uint32_t connId;
+    int segId;
+    int segmentPid;
+    double weight;
+
+    RPCRequestSetWeightCGroup request = (RPCRequestSetWeightCGroup)
+                                        (conntrack->MessageBuff.Buffer);
+    masterStartTime = request->MasterStartTime;
+    connId = request->ConnID;
+    segId = request->SegmentID;
+    segmentPid = request->ProcID;
+    weight = request->Weight;
+
+    elog(DEBUG1, "Resource enforcer sets weight for QE in CGroup: "
+                 "masterStartTime = %s, connId = %d, segId = %d, "
+                 "procId = %d, weight = %lf",
+                 timestamptz_to_str(masterStartTime),
+                 connId, segmentPid, segmentPid, weight);
+
+    /* Prepare cgroupName from masterStartTime, connId and segId information */
+    char *cgroupName = buildCGroupNameString(masterStartTime, connId);
+
+    /* Build request instance and add request into the queue. */
+    ResourceEnforcementRequest *task = (ResourceEnforcementRequest *)
+                                       malloc(sizeof(ResourceEnforcementRequest));
+
+    if ( task == NULL ) {
+        elog(ERROR, "Resource enforcer fails to malloc "
+                    "resource enforcement request instance");
+    }
+    task->type = SETWEIGHT;
+    task->pid = segmentPid;
+    memset(task->cgroup_name, 0, sizeof(task->cgroup_name));
+    strncpy(task->cgroup_name, cgroupName, strlen(cgroupName)+1);
+    task->query_resource.vcore = weight;
+    if (enqueue(g_queue_cgroup, (void *)task) == -1 ) {
+        elog(ERROR, "Resource enforcer fails to add "
+                    "resource enforcement request into task queue");
+    }
+
+    RPCResponseSetWeightCGroupData response;
+    response.Result = FUNC_RETURN_OK;
+    response.Reserved = 0;
+
+    buildResponseIntoConnTrack(conntrack,
+                               (char *)&response,
+                               sizeof(response),
+                               conntrack->MessageMark1,
+                               conntrack->MessageMark2,
+                               RESPONSE_QE_SETWEIGHTCGROUP);
+
+    conntrack->ResponseSent = false;
+    MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+    PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
+    MEMORY_CONTEXT_SWITCH_BACK
+
+    rm_pfree(PCONTEXT, cgroupName);
+
+    return true;
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c273195/src/backend/resourcemanager/resourcemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager.c b/src/backend/resourcemanager/resourcemanager.c
index 6175cf2..8fae27b 100644
--- a/src/backend/resourcemanager/resourcemanager.c
+++ b/src/backend/resourcemanager/resourcemanager.c
@@ -453,7 +453,6 @@ int ResManagerMainServer2ndPhase(void)
     registerMessageHandler(REQUEST_RM_IMALIVE , handleRMSEGRequestIMAlive);
     registerMessageHandler(REQUEST_QD_DDL_MANIPULATERESQUEUE, handleRMDDLRequestManipulateResourceQueue);
     registerMessageHandler(REQUEST_QD_DDL_MANIPULATEROLE , handleRMDDLRequestManipulateRole);
-    registerMessageHandler(REQUEST_RM_TMPDIR , handleRMSEGRequestTmpDir);
     registerMessageHandler(REQUEST_QD_ACQUIRE_RESOURCE_QUOTA, handleRMRequestAcquireResourceQuota);
     registerMessageHandler(REQUEST_QD_REFRESH_RESOURCE , handleRMRequestRefreshResource);
     registerMessageHandler(REQUEST_QD_SEGMENT_ISDOWN , handleRMRequestSegmentIsDown);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c273195/src/backend/resourcemanager/resourcemanager_RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager_RMSEG.c b/src/backend/resourcemanager/resourcemanager_RMSEG.c
index ca43f87..d95aad4 100644
--- a/src/backend/resourcemanager/resourcemanager_RMSEG.c
+++ b/src/backend/resourcemanager/resourcemanager_RMSEG.c
@@ -40,7 +40,6 @@ int ResManagerMainSegment2ndPhase(void)
     registerMessageHandler(REQUEST_QE_SETWEIGHTCGROUP, handleQESetWeightCGroup);
     registerMessageHandler(REQUEST_RM_INCREASE_MEMORY_QUOTA, handleRMIncreaseMemoryQuota);
     registerMessageHandler(REQUEST_RM_DECREASE_MEMORY_QUOTA, handleRMDecreaseMemoryQuota);
-    registerMessageHandler(REQUEST_RM_TMPDIR, handleRMSEGRequestTmpDir);
     registerMessageHandler(REQUEST_RM_RUALIVE, handleRMSEGRequestRUAlive);
 
 
@@ -49,8 +48,7 @@ int ResManagerMainSegment2ndPhase(void)
      **************************************************************************/
     res = initializeSocketServer_RMSEG();
     if ( res != FUNC_RETURN_OK ) {
-        elog(LOG, "Fail to initialize socket server. Segment sleeps for ever.");
-        MainHandler_RMSEGDummyLoop();
+        elog(FATAL, "Fail to initialize socket server.");
     }
 
     /*
@@ -222,17 +220,6 @@ int MainHandlerLoop_RMSEG(void)
     return res;
 }
 
-int MainHandler_RMSEGDummyLoop(void)
-{
-    while( DRMGlobalInstance->ResManagerMainKeepRun ) {
-        sleep(1000000);
-    }
-
-    elog(RMLOG, "Dummy resource manager main event handler exits.");
-
-    return FUNC_RETURN_OK;
-}
-
 /*
  * Check if this temporary directory is OK to read or write.
  * If not, it's probably due to disk error.