hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From y...@apache.org
Subject incubator-hawq git commit: HAWQ-603. Remove unused code for resource manager segment side
Date Wed, 30 Mar 2016 02:50:44 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master a10483753 -> 8c273195e


HAWQ-603. Remove unused code for resource manager segment side


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/8c273195
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/8c273195
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/8c273195

Branch: refs/heads/master
Commit: 8c273195e97e25e4ddc8b437441ce46de7ebaede
Parents: a104837
Author: YI JIN <yjin@pivotal.io>
Authored: Wed Mar 30 13:50:34 2016 +1100
Committer: YI JIN <yjin@pivotal.io>
Committed: Wed Mar 30 13:50:34 2016 +1100

----------------------------------------------------------------------
 src/backend/resourcemanager/Makefile            |   3 +-
 src/backend/resourcemanager/include/dynrm.h     |   2 -
 .../resourcemanager/requesthandler_RMSEG.c      | 430 -------------------
 .../requesthandler_RMSEG_CGroup.c               | 261 +++++++++++
 src/backend/resourcemanager/resourcemanager.c   |   1 -
 .../resourcemanager/resourcemanager_RMSEG.c     |  15 +-
 6 files changed, 264 insertions(+), 448 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c273195/src/backend/resourcemanager/Makefile
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/Makefile b/src/backend/resourcemanager/Makefile
index ad2d701..2b2cb05 100644
--- a/src/backend/resourcemanager/Makefile
+++ b/src/backend/resourcemanager/Makefile
@@ -34,7 +34,8 @@ OBJS = resourcepool.o requesthandler_ddl.o \
        hawqsite.o \
        requesthandler_RMSEG.o requesthandler.o \
        resourcemanager_RMSEG.o \
-       resourcemanager.o
+       resourcemanager.o \
+       requesthandler_RMSEG_CGroup.o
 
 SUBDIRS = communication utils resourcebroker resourceenforcer
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c273195/src/backend/resourcemanager/include/dynrm.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/dynrm.h b/src/backend/resourcemanager/include/dynrm.h
index 9ede38d..34f4b92 100644
--- a/src/backend/resourcemanager/include/dynrm.h
+++ b/src/backend/resourcemanager/include/dynrm.h
@@ -100,7 +100,6 @@ bool handleRMRequestAcquireResource(void **arg);
 bool handleRMRequestReturnResource(void **arg);
 bool handleRMSEGRequestIMAlive(void **arg);
 bool handleRMSEGRequestRUAlive(void **arg);
-bool handleRMSEGRequestTmpDir(void **arg);
 bool handleRMRequestAcquireResourceQuota(void **arg);
 bool handleRMRequestRefreshResource(void **arg);
 bool handleRMRequestSegmentIsDown(void **arg);
@@ -326,7 +325,6 @@ void updateStatusOfAllNodes(void);
 int  ResManagerMainSegment2ndPhase(void);
 int  initializeSocketServer_RMSEG(void);
 int  MainHandlerLoop_RMSEG(void);
-int  MainHandler_RMSEGDummyLoop(void);
 
 void checkAndBuildFailedTmpDirList(void);
 #endif //DYNAMIC_RESOURCE_MANAGEMENT_H

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c273195/src/backend/resourcemanager/requesthandler_RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler_RMSEG.c b/src/backend/resourcemanager/requesthandler_RMSEG.c
index 12bd8ac..3f27c6d 100644
--- a/src/backend/resourcemanager/requesthandler_RMSEG.c
+++ b/src/backend/resourcemanager/requesthandler_RMSEG.c
@@ -39,14 +39,6 @@
 
 #define RMSEG_INBUILDHOST SMBUFF_HEAD(SegStat, &localsegstat)
 
-/* UINT32_T.MAX_VALUE = 4294967295 */
-#define MAX_DIGITS_OF_UINT32_T 10
-
-/* Format of cgroup timestamp: YYYYMMDD_HHMMSS */
-#define LENTH_CGROUP_TIMESTAMP 15
-
-char *buildCGroupNameString(int64 masterStartTime, uint32 connId);
-
 /*
  * Refresh localhost information, this is used to build message for IMAlive.
  */
@@ -386,428 +378,6 @@ void checkLocalPostmasterStatus(void)
 	PQfinish(conn);
 }
 
-static void
-checkTmpDirTable(uint32_t session_id, bool session_exists)
-{
-    if (!session_exists)
-    {
-        // New Session
-        uint32_t *data = rm_palloc0(DRMGlobalInstance->Context, sizeof(uint32_t));
-        *data = session_id;
-        insertDQueueHeadNode(&DRMGlobalInstance->TmpDirLRUList, (void *)data);
-    }
-
-    if (hash_get_num_entries(DRMGlobalInstance->LocalTmpDirTable) >=
-    	DRMGlobalInstance->TmpDirTableCapacity)
-    {
-        // Delete LRU session
-        bool            found;
-        TmpDirKey       tmpdir_key;
-        TmpDirEntry     *tmpdir_entry;
-
-        uint32_t *data = (uint32_t *)removeDQueueTailNode(&DRMGlobalInstance->TmpDirLRUList);
-        tmpdir_key.session_id = *data;
-        rm_pfree(DRMGlobalInstance->Context, data);
-        tmpdir_entry = hash_search(DRMGlobalInstance->LocalTmpDirTable,
-                                   (void *)&tmpdir_key,
-								   HASH_REMOVE,
-								   &found);
-        if (found)
-        {
-        	hash_destroy(tmpdir_entry->qe_tmp_dirs);
-        }
-    }
-    else
-    {
-    	// Update LRU data
-    	DQueueNode node = DRMGlobalInstance->TmpDirLRUList.Head;
-    	while (node != NULL)
-    	{
-    		if (*(uint32_t *)node->Data == session_id)
-    		{
-    			break;
-    		}
-    		node = node->Next;
-    	}
-
-    	if (node && node != DRMGlobalInstance->TmpDirLRUList.Head)
-    	{
-    		insertDQueueHeadNode(&DRMGlobalInstance->TmpDirLRUList, removeDQueueNode(&DRMGlobalInstance->TmpDirLRUList, node));
-    	}
-    }
-}
-
-
-bool handleRMSEGRequestTmpDir(void **arg)
-{
-    ConnectionTrack conntrack 	= (ConnectionTrack)(*arg);
-	char			response[8];
-    uint32_t 		session_id 	= 0;
-    uint32_t 		command_id 	= 0;
-    int32_t 		qeidx 		= -1;
-    TmpDirKey       tmpdir_key;
-    TmpDirEntry    *tmpdir_entry;
-    QETmpDirKey     qe_tmpdir_key;
-    QETmpDirEntry  *qe_tmpdir_entry;
-    bool 			session_found;
-    bool 			new_command;
-    bool 			qeidx_found;
-    SelfMaintainBufferData sendbuff;
-
-    if (DRMGlobalInstance->NextLocalHostTempDirIdx < 0
-        || DRMGlobalInstance->NextLocalHostTempDirIdxForQD < 0) {
-
-        *(uint32_t *)response       = RM_STATUS_BAD_TMPDIR;
-        *(uint32_t *)(response + 4) = 0;
-
-        buildResponseIntoConnTrack(conntrack,
-					   	   	       response,
-								   sizeof(response),
-								   conntrack->MessageMark1,
-								   conntrack->MessageMark2,
-								   RESPONSE_RM_TMPDIR);
-
-        elog(LOG, "handleRMSEGRequestTmpDir, no existing tmp dirs in the "
-        			 "segment resource manager");
-    } else {
-
-        session_id = *((uint32_t *)(conntrack->MessageBuff.Buffer + 0));
-        command_id = *((uint32_t *)(conntrack->MessageBuff.Buffer + 4));
-        qeidx      = *((int32_t  *)(conntrack->MessageBuff.Buffer + 8));
-        initializeSelfMaintainBuffer(&sendbuff, PCONTEXT);
-
-        if (qeidx == -1)
-        {
-            // QE on master
-            int tmpdir_idx = 0;
-            if (session_id > 0)
-            {
-                tmpdir_idx = session_id % getDQueueLength(&DRMGlobalInstance->LocalHostTempDirectoriesForQD);
-            }
-           
-            SimpStringPtr tmpdir = (SimpStringPtr)
-                getDQueueNodeDataByIndex(&DRMGlobalInstance->LocalHostTempDirectoriesForQD, tmpdir_idx);
-
-            appendSMBSimpStr(&sendbuff, tmpdir);
-            appendSelfMaintainBufferTill64bitAligned(&sendbuff);
-           
-            char tmpdir_string[TMPDIR_MAX_LENGTH] = {0};
-            memset(tmpdir_string, 0, TMPDIR_MAX_LENGTH);
-            memcpy(tmpdir_string, tmpdir->Str, tmpdir->Len);
-
-            elog(LOG, "handleRMSEGRequestTmpDir session_id:%u command_id:%u qe_idx:%d tmpdir:%s",
-                	 session_id,
-					 command_id,
-					 qeidx,
-					 tmpdir_string);
-        }
-        else
-        {
-            // QE on segment
-            tmpdir_key.session_id = session_id;
-            qe_tmpdir_key.qeidx = qeidx;
-
-            tmpdir_entry = (TmpDirEntry *)
-                           hash_search(DRMGlobalInstance->LocalTmpDirTable,
-                                       (void *)&tmpdir_key,
-                                       HASH_ENTER,
-                                       &session_found);
-            if (!session_found)
-            {
-                // New Session
-                new_command = true;
-            }
-            else
-            {
-                // Existing Session
-                if (command_id == tmpdir_entry->command_id)
-                {
-                    // In the same command
-                    new_command = false;
-                }
-                else
-                {
-                    // New command
-                    new_command = true;
-                    hash_destroy(tmpdir_entry->qe_tmp_dirs);
-                }
-            }
-
-            if (new_command)
-            {
-                tmpdir_entry->command_id = command_id;
-                HASHCTL ctl;
-                ctl.keysize = sizeof(QETmpDirKey);
-                ctl.entrysize = sizeof(QETmpDirEntry);
-                ctl.hcxt = DRMGlobalInstance->Context;
-                tmpdir_entry->qe_tmp_dirs =
-                        hash_create("Executor session temporary directory table",
-                                    16,
-                                    &ctl,
-                                    HASH_ELEM);
-            }
-
-            qe_tmpdir_entry = (QETmpDirEntry *)hash_search(tmpdir_entry->qe_tmp_dirs,
-                                                           (void *)&qe_tmpdir_key,
-                                                           HASH_ENTER,
-                                                           &qeidx_found);
-
-            if (!qeidx_found)
-            {
-                // New QE
-                SimpStringPtr tmpdir =
-                        (SimpStringPtr)
-                        getDQueueNodeDataByIndex(&DRMGlobalInstance->LocalHostTempDirectories, DRMGlobalInstance->NextLocalHostTempDirIdx);
-                DRMGlobalInstance->NextLocalHostTempDirIdx=
-                        (DRMGlobalInstance->NextLocalHostTempDirIdx + 1) %
-                         getDQueueLength(&DRMGlobalInstance->LocalHostTempDirectories);
-                memset(qe_tmpdir_entry->tmpdir, 0, TMPDIR_MAX_LENGTH);
-                memcpy(qe_tmpdir_entry->tmpdir, tmpdir->Str, tmpdir->Len);
-            }
-
-            checkTmpDirTable(session_id, session_found);
-
-            appendSMBStr(&sendbuff, qe_tmpdir_entry->tmpdir);
-            appendSelfMaintainBufferTill64bitAligned(&sendbuff);
-
-            elog(LOG, "handleRMSEGRequestTmpDir session_id:%u command_id:%u qe_idx:%d tmpdir:%s",
-                	 session_id,
-					 command_id,
-					 qeidx,
-					 qe_tmpdir_entry->tmpdir);
-        }
-        
-        buildResponseIntoConnTrack(conntrack,
-                                   sendbuff.Buffer,
-                                   sendbuff.Cursor + 1,
-                                   conntrack->MessageMark1,
-                                   conntrack->MessageMark2,
-                                   RESPONSE_RM_TMPDIR);
-        destroySelfMaintainBuffer(&sendbuff);
-    }
-
-    conntrack->ResponseSent = false;
-	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
-	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
-	MEMORY_CONTEXT_SWITCH_BACK
-
-    return true;
-}
-
-char *buildCGroupNameString(int64 masterStartTime, uint32 connId)
-{
-	int tLength = strlen(timestamptz_to_str(masterStartTime));
-	char *tTime = (char *)rm_palloc0(PCONTEXT, tLength+1);
-	strcpy(tTime, timestamptz_to_str(masterStartTime));
-
-	char *sTime = (char *)rm_palloc0(PCONTEXT, LENTH_CGROUP_TIMESTAMP+1);
-	sprintf(sTime, "%.4s%.2s%.2s_%.2s%.2s%.2s",
-			   	   tTime,
-			   	   tTime + 5,
-			   	   tTime + 8,
-			   	   tTime + 11,
-			   	   tTime + 14,
-			   	   tTime + 17);
-
-	/* CGroup name follows the format: hawq-YYMMDD_HHMMSS-connCONNECTIONID */
-	char *cgroupName = (char*)rm_palloc0(PCONTEXT,
-									  	 sizeof("hawq-") -1 + strlen(sTime) +
-									  	 sizeof("-conn") -1 + MAX_DIGITS_OF_UINT32_T + 1);
-	sprintf(cgroupName, "hawq-%s-conn%d", sTime, connId);
-
-	rm_pfree(PCONTEXT, sTime);
-	rm_pfree(PCONTEXT, tTime);
-
-	return cgroupName;
-}
-/**
- * Handle QE MoveToGroup function call.
- */
-bool handleQEMoveToCGroup(void **arg)
-{
-	ConnectionTrack conntrack = (ConnectionTrack)(*arg);
-	Assert(conntrack != NULL);
-
-	RPCResponseMoveToCGroupData	response;
-
-	int					segmentPid;
-	TimestampTz			masterStartTime;
-	uint32_t			connId;
-	int					segId;
-
-	RPCRequestMoveToCGroup request = (RPCRequestMoveToCGroup)
-									 (conntrack->MessageBuff.Buffer);
-	masterStartTime = request->MasterStartTime;
-	connId			= request->ConnID;
-	segId			= request->SegmentID;
-	segmentPid		= request->ProcID;
-
-	elog(DEBUG1, "Resource enforcer moves QE to CGroup: "
-	             "masterStartTime = %s, connId = %d, segId = %d, procId = %d",
-	             timestamptz_to_str(masterStartTime),
-	             connId, segmentPid, segmentPid);
-
-	response.Result = FUNC_RETURN_OK;
-	response.Reserved = 0;
-
-	buildResponseIntoConnTrack(conntrack,
-	                           (char *)&response,
-	                           sizeof(response),
-	                           conntrack->MessageMark1,
-	                           conntrack->MessageMark2,
-	                           RESPONSE_QE_MOVETOCGROUP);
-
-	conntrack->ResponseSent = false;
-	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
-	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
-	MEMORY_CONTEXT_SWITCH_BACK
-
-	/* Prepare cgroupName from masterStartTime, connId and segId information */
-	char *cgroupName = buildCGroupNameString(masterStartTime, connId);
-
-	/* Move To CGroup in thread */
-	ResourceEnforcementRequest *task = (ResourceEnforcementRequest *)malloc(sizeof(ResourceEnforcementRequest));
-	task->type = MOVETO;
-	task->pid = segmentPid;
-	memset(task->cgroup_name, 0, sizeof(task->cgroup_name));
-	strncpy(task->cgroup_name, cgroupName, strlen(cgroupName)+1);
-	enqueue(g_queue_cgroup, (void *)task);
-
-	rm_pfree(PCONTEXT, cgroupName);
-
-	return true;
-}
-
-/**
- * Handle QE MoveOutGroup function call.
- */
-bool handleQEMoveOutCGroup(void **arg)
-{
-	ConnectionTrack conntrack = (ConnectionTrack)(*arg);
-	Assert(conntrack != NULL);
-
-	RPCResponseMoveOutCGroupData response;
-
-	int				segmentPid;
-	TimestampTz		masterStartTime;
-	uint32_t		connId;
-	int				segId;
-
-	RPCRequestMoveOutCGroup request = (RPCRequestMoveOutCGroup)
-									  (conntrack->MessageBuff.Buffer);
-	masterStartTime = request->MasterStartTime;
-	connId			= request->ConnID;
-	segId			= request->SegmentID;
-	segmentPid		= request->ProcID;
-
-	elog(DEBUG1, "Resource enforcer moves QE out from CGroup: "
-	             "masterStartTime = %s, connId = %d, segId = %d, procId = %d",
-	             timestamptz_to_str(masterStartTime),
-	             connId, segmentPid, segmentPid);
-
-	response.Result = FUNC_RETURN_OK;
-	response.Reserved = 0;
-
-	buildResponseIntoConnTrack(conntrack,
-	                           (char *)&response,
-	                           sizeof(response),
-	                           conntrack->MessageMark1,
-	                           conntrack->MessageMark2,
-	                           RESPONSE_QE_MOVEOUTCGROUP);
-
-	conntrack->ResponseSent = false;
-	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
-	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
-	MEMORY_CONTEXT_SWITCH_BACK
-
-	/* Prepare cgroupName from masterStartTime, connId and segId information */
-	char *cgroupName = buildCGroupNameString(masterStartTime, connId);
-
-    /* Move Out CGroup in thread */
-    ResourceEnforcementRequest *task = (ResourceEnforcementRequest *)
-    								   malloc(sizeof(ResourceEnforcementRequest));
-    task->type = MOVEOUT;
-    task->pid = segmentPid;
-    memset(task->cgroup_name, 0, sizeof(task->cgroup_name));
-    strncpy(task->cgroup_name, cgroupName, strlen(cgroupName)+1);
-    enqueue(g_queue_cgroup, (void *)task);
-
-	rm_pfree(PCONTEXT, cgroupName);
-
-	return true;
-}
-
-/**
- * Handle QE SetWeightGroup function call.
- */
-bool handleQESetWeightCGroup(void **arg)
-{
-	ConnectionTrack conntrack = (ConnectionTrack)(*arg);
-	Assert(conntrack != NULL);
-
-	TimestampTz			masterStartTime;
-	uint32_t			connId;
-	int					segId;
-	int					segmentPid;
-	double					weight;
-
-	RPCRequestSetWeightCGroup request = (RPCRequestSetWeightCGroup)
-									    (conntrack->MessageBuff.Buffer);
-	masterStartTime = request->MasterStartTime;
-	connId			= request->ConnID;
-	segId			= request->SegmentID;
-	segmentPid		= request->ProcID;
-	weight			= request->Weight;
-
-	elog(DEBUG1, "Resource enforcer sets weight for QE in CGroup: "
-	             "masterStartTime = %s, connId = %d, segId = %d, "
-	             "procId = %d, weight = %lf",
-	             timestamptz_to_str(masterStartTime),
-	             connId, segmentPid, segmentPid, weight);
-
-	/* Prepare cgroupName from masterStartTime, connId and segId information */
-	char *cgroupName = buildCGroupNameString(masterStartTime, connId);
-
-	/* Build request instance and add request into the queue. */
-	ResourceEnforcementRequest *task = (ResourceEnforcementRequest *)
-	                                   malloc(sizeof(ResourceEnforcementRequest));
-
-	if ( task == NULL ) {
-		elog(ERROR, "Resource enforcer fails to malloc "
-		            "resource enforcement request instance");
-	}
-	task->type = SETWEIGHT;
-	task->pid = segmentPid;
-	memset(task->cgroup_name, 0, sizeof(task->cgroup_name));
-	strncpy(task->cgroup_name, cgroupName, strlen(cgroupName)+1);
-	task->query_resource.vcore = weight;
-	if (enqueue(g_queue_cgroup, (void *)task) == -1 ) {
-		elog(ERROR, "Resource enforcer fails to add "
-		            "resource enforcement request into task queue");
-	}
-
-	RPCResponseSetWeightCGroupData	response;
-	response.Result   = FUNC_RETURN_OK;
-	response.Reserved = 0;
-
-	buildResponseIntoConnTrack(conntrack,
-	                           (char *)&response,
-	                           sizeof(response),
-	                           conntrack->MessageMark1,
-	                           conntrack->MessageMark2,
-	                           RESPONSE_QE_SETWEIGHTCGROUP);
-
-	conntrack->ResponseSent = false;
-	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
-	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
-	MEMORY_CONTEXT_SWITCH_BACK
-
-	rm_pfree(PCONTEXT, cgroupName);
-
-	return true;
-}
-
-
 /**
  * Handle IncreaseMemQuota request from resource manager server
  */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c273195/src/backend/resourcemanager/requesthandler_RMSEG_CGroup.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler_RMSEG_CGroup.c b/src/backend/resourcemanager/requesthandler_RMSEG_CGroup.c
new file mode 100644
index 0000000..81900e8
--- /dev/null
+++ b/src/backend/resourcemanager/requesthandler_RMSEG_CGroup.c
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "envswitch.h"
+#include "dynrm.h"
+#include "time.h"
+
+#include "resourcemanager/resourcemanager.h"
+#include "communication/rmcomm_RMSEG2RM.h"
+#include "communication/rmcomm_RM2RMSEG.h"
+#include "communication/rmcomm_MessageHandler.h"
+#include "communication/rmcomm_QE_RMSEG_Protocol.h"
+#include "communication/rmcomm_RM_RMSEG_Protocol.h"
+#include "utils/simplestring.h"
+#include "utils/linkedlist.h"
+
+#include "utils/palloc.h"
+
+#include "gp-libpq-fe.h"
+#include "gp-libpq-int.h"
+
+#include "resourceenforcer/resourceenforcer_message.h"
+
+/* UINT32_T.MAX_VALUE = 4294967295 */
+#define MAX_DIGITS_OF_UINT32_T 10
+
+/* Format of cgroup timestamp: YYYYMMDD_HHMMSS */
+#define LENTH_CGROUP_TIMESTAMP 15
+
+char *buildCGroupNameString(int64 masterStartTime, uint32 connId);
+
+
+char *buildCGroupNameString(int64 masterStartTime, uint32 connId)
+{
+	int tLength = strlen(timestamptz_to_str(masterStartTime));
+	char *tTime = (char *)rm_palloc0(PCONTEXT, tLength+1);
+	strcpy(tTime, timestamptz_to_str(masterStartTime));
+
+	char *sTime = (char *)rm_palloc0(PCONTEXT, LENTH_CGROUP_TIMESTAMP+1);
+	sprintf(sTime, "%.4s%.2s%.2s_%.2s%.2s%.2s",
+			   	   tTime,
+			   	   tTime + 5,
+			   	   tTime + 8,
+			   	   tTime + 11,
+			   	   tTime + 14,
+			   	   tTime + 17);
+
+	/* CGroup name follows the format: hawq-YYMMDD_HHMMSS-connCONNECTIONID */
+	char *cgroupName = (char*)rm_palloc0(PCONTEXT,
+									  	 sizeof("hawq-") -1 + strlen(sTime) +
+									  	 sizeof("-conn") -1 + MAX_DIGITS_OF_UINT32_T + 1);
+	sprintf(cgroupName, "hawq-%s-conn%d", sTime, connId);
+
+	rm_pfree(PCONTEXT, sTime);
+	rm_pfree(PCONTEXT, tTime);
+
+	return cgroupName;
+}
+/**
+ * Handle QE MoveToGroup function call.
+ */
+bool handleQEMoveToCGroup(void **arg)
+{
+	ConnectionTrack conntrack = (ConnectionTrack)(*arg);
+	Assert(conntrack != NULL);
+
+	RPCResponseMoveToCGroupData	response;
+
+	int					segmentPid;
+	TimestampTz			masterStartTime;
+	uint32_t			connId;
+	int					segId;
+
+	RPCRequestMoveToCGroup request = (RPCRequestMoveToCGroup)
+									 (conntrack->MessageBuff.Buffer);
+	masterStartTime = request->MasterStartTime;
+	connId			= request->ConnID;
+	segId			= request->SegmentID;
+	segmentPid		= request->ProcID;
+
+	elog(DEBUG1, "Resource enforcer moves QE to CGroup: "
+	             "masterStartTime = %s, connId = %d, segId = %d, procId = %d",
+	             timestamptz_to_str(masterStartTime),
+	             connId, segmentPid, segmentPid);
+
+	response.Result = FUNC_RETURN_OK;
+	response.Reserved = 0;
+
+	buildResponseIntoConnTrack(conntrack,
+	                           (char *)&response,
+	                           sizeof(response),
+	                           conntrack->MessageMark1,
+	                           conntrack->MessageMark2,
+	                           RESPONSE_QE_MOVETOCGROUP);
+
+	conntrack->ResponseSent = false;
+	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
+	MEMORY_CONTEXT_SWITCH_BACK
+
+	/* Prepare cgroupName from masterStartTime, connId and segId information */
+	char *cgroupName = buildCGroupNameString(masterStartTime, connId);
+
+	/* Move To CGroup in thread */
+	ResourceEnforcementRequest *task = (ResourceEnforcementRequest *)malloc(sizeof(ResourceEnforcementRequest));
+	task->type = MOVETO;
+	task->pid = segmentPid;
+	memset(task->cgroup_name, 0, sizeof(task->cgroup_name));
+	strncpy(task->cgroup_name, cgroupName, strlen(cgroupName)+1);
+	enqueue(g_queue_cgroup, (void *)task);
+
+	rm_pfree(PCONTEXT, cgroupName);
+
+	return true;
+}
+
+/**
+ * Handle QE MoveOutGroup function call.
+ */
+bool handleQEMoveOutCGroup(void **arg)
+{
+	ConnectionTrack conntrack = (ConnectionTrack)(*arg);
+	Assert(conntrack != NULL);
+
+	RPCResponseMoveOutCGroupData response;
+
+	int				segmentPid;
+	TimestampTz		masterStartTime;
+	uint32_t		connId;
+	int				segId;
+
+	RPCRequestMoveOutCGroup request = (RPCRequestMoveOutCGroup)
+									  (conntrack->MessageBuff.Buffer);
+	masterStartTime = request->MasterStartTime;
+	connId			= request->ConnID;
+	segId			= request->SegmentID;
+	segmentPid		= request->ProcID;
+
+	elog(DEBUG1, "Resource enforcer moves QE out from CGroup: "
+	             "masterStartTime = %s, connId = %d, segId = %d, procId = %d",
+	             timestamptz_to_str(masterStartTime),
+	             connId, segmentPid, segmentPid);
+
+	response.Result = FUNC_RETURN_OK;
+	response.Reserved = 0;
+
+	buildResponseIntoConnTrack(conntrack,
+	                           (char *)&response,
+	                           sizeof(response),
+	                           conntrack->MessageMark1,
+	                           conntrack->MessageMark2,
+	                           RESPONSE_QE_MOVEOUTCGROUP);
+
+	conntrack->ResponseSent = false;
+	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
+	MEMORY_CONTEXT_SWITCH_BACK
+
+	/* Prepare cgroupName from masterStartTime, connId and segId information */
+	char *cgroupName = buildCGroupNameString(masterStartTime, connId);
+
+    /* Move Out CGroup in thread */
+    ResourceEnforcementRequest *task = (ResourceEnforcementRequest *)
+    								   malloc(sizeof(ResourceEnforcementRequest));
+    task->type = MOVEOUT;
+    task->pid = segmentPid;
+    memset(task->cgroup_name, 0, sizeof(task->cgroup_name));
+    strncpy(task->cgroup_name, cgroupName, strlen(cgroupName)+1);
+    enqueue(g_queue_cgroup, (void *)task);
+
+	rm_pfree(PCONTEXT, cgroupName);
+
+	return true;
+}
+
+/**
+ * Handle QE SetWeightGroup function call.
+ */
+bool handleQESetWeightCGroup(void **arg)
+{
+	ConnectionTrack conntrack = (ConnectionTrack)(*arg);
+	Assert(conntrack != NULL);
+
+	TimestampTz			masterStartTime;
+	uint32_t			connId;
+	int					segId;
+	int					segmentPid;
+	double					weight;
+
+	RPCRequestSetWeightCGroup request = (RPCRequestSetWeightCGroup)
+									    (conntrack->MessageBuff.Buffer);
+	masterStartTime = request->MasterStartTime;
+	connId			= request->ConnID;
+	segId			= request->SegmentID;
+	segmentPid		= request->ProcID;
+	weight			= request->Weight;
+
+	elog(DEBUG1, "Resource enforcer sets weight for QE in CGroup: "
+	             "masterStartTime = %s, connId = %d, segId = %d, "
+	             "procId = %d, weight = %lf",
+	             timestamptz_to_str(masterStartTime),
+	             connId, segmentPid, segmentPid, weight);
+
+	/* Prepare cgroupName from masterStartTime, connId and segId information */
+	char *cgroupName = buildCGroupNameString(masterStartTime, connId);
+
+	/* Build request instance and add request into the queue. */
+	ResourceEnforcementRequest *task = (ResourceEnforcementRequest *)
+	                                   malloc(sizeof(ResourceEnforcementRequest));
+
+	if ( task == NULL ) {
+		elog(ERROR, "Resource enforcer fails to malloc "
+		            "resource enforcement request instance");
+	}
+	task->type = SETWEIGHT;
+	task->pid = segmentPid;
+	memset(task->cgroup_name, 0, sizeof(task->cgroup_name));
+	strncpy(task->cgroup_name, cgroupName, strlen(cgroupName)+1);
+	task->query_resource.vcore = weight;
+	if (enqueue(g_queue_cgroup, (void *)task) == -1 ) {
+		elog(ERROR, "Resource enforcer fails to add "
+		            "resource enforcement request into task queue");
+	}
+
+	RPCResponseSetWeightCGroupData	response;
+	response.Result   = FUNC_RETURN_OK;
+	response.Reserved = 0;
+
+	buildResponseIntoConnTrack(conntrack,
+	                           (char *)&response,
+	                           sizeof(response),
+	                           conntrack->MessageMark1,
+	                           conntrack->MessageMark2,
+	                           RESPONSE_QE_SETWEIGHTCGROUP);
+
+	conntrack->ResponseSent = false;
+	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
+	MEMORY_CONTEXT_SWITCH_BACK
+
+	rm_pfree(PCONTEXT, cgroupName);
+
+	return true;
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c273195/src/backend/resourcemanager/resourcemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager.c b/src/backend/resourcemanager/resourcemanager.c
index 6175cf2..8fae27b 100644
--- a/src/backend/resourcemanager/resourcemanager.c
+++ b/src/backend/resourcemanager/resourcemanager.c
@@ -453,7 +453,6 @@ int ResManagerMainServer2ndPhase(void)
 	registerMessageHandler(REQUEST_RM_IMALIVE               , handleRMSEGRequestIMAlive);
 	registerMessageHandler(REQUEST_QD_DDL_MANIPULATERESQUEUE, handleRMDDLRequestManipulateResourceQueue);
 	registerMessageHandler(REQUEST_QD_DDL_MANIPULATEROLE	, handleRMDDLRequestManipulateRole);
-	registerMessageHandler(REQUEST_RM_TMPDIR				, handleRMSEGRequestTmpDir);
 	registerMessageHandler(REQUEST_QD_ACQUIRE_RESOURCE_QUOTA, handleRMRequestAcquireResourceQuota);
 	registerMessageHandler(REQUEST_QD_REFRESH_RESOURCE      , handleRMRequestRefreshResource);
 	registerMessageHandler(REQUEST_QD_SEGMENT_ISDOWN        , handleRMRequestSegmentIsDown);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c273195/src/backend/resourcemanager/resourcemanager_RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager_RMSEG.c b/src/backend/resourcemanager/resourcemanager_RMSEG.c
index ca43f87..d95aad4 100644
--- a/src/backend/resourcemanager/resourcemanager_RMSEG.c
+++ b/src/backend/resourcemanager/resourcemanager_RMSEG.c
@@ -40,7 +40,6 @@ int ResManagerMainSegment2ndPhase(void)
 	registerMessageHandler(REQUEST_QE_SETWEIGHTCGROUP, handleQESetWeightCGroup);
 	registerMessageHandler(REQUEST_RM_INCREASE_MEMORY_QUOTA, handleRMIncreaseMemoryQuota);
 	registerMessageHandler(REQUEST_RM_DECREASE_MEMORY_QUOTA, handleRMDecreaseMemoryQuota);
-	registerMessageHandler(REQUEST_RM_TMPDIR, handleRMSEGRequestTmpDir);
 	registerMessageHandler(REQUEST_RM_RUALIVE, handleRMSEGRequestRUAlive);
 
 
@@ -49,8 +48,7 @@ int ResManagerMainSegment2ndPhase(void)
 	 **************************************************************************/
 	res = initializeSocketServer_RMSEG();
 	if ( res != FUNC_RETURN_OK ) {
-		elog(LOG, "Fail to initialize socket server. Segment sleeps for ever.");
-		MainHandler_RMSEGDummyLoop();
+		elog(FATAL, "Fail to initialize socket server.");
 	}
 
 	/*
@@ -222,17 +220,6 @@ int MainHandlerLoop_RMSEG(void)
 	return res;
 }
 
-int MainHandler_RMSEGDummyLoop(void)
-{
-	while( DRMGlobalInstance->ResManagerMainKeepRun ) {
-		sleep(1000000);
-	}
-
-	elog(RMLOG, "Dummy resource manager main event handler exits.");
-
-	return FUNC_RETURN_OK;
-}
-
 /*
  *  Check if this temporary directory is OK to read or write.
  *  If not, it's probably due to disk error.


Mime
View raw message