hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From y...@apache.org
Subject [2/3] incubator-hawq git commit: HAWQ-234. Improve HAWQ resource manager resource allocation algorithm and RPC framework
Date Thu, 17 Dec 2015 02:20:35 GMT
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3e46d3d9/src/backend/resourcemanager/requesthandler.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler.c b/src/backend/resourcemanager/requesthandler.c
index c0eae8d..f2e8c93 100644
--- a/src/backend/resourcemanager/requesthandler.c
+++ b/src/backend/resourcemanager/requesthandler.c
@@ -63,18 +63,22 @@ extern char *UnixSocketDir;		  /* Reference from global configure.         */
  */
 bool handleRMRequestConnectionReg(void **arg)
 {
-	ConnectionTrack conntrack = (ConnectionTrack )(*arg);
-	RPCResponseHeadRegisterConnectionInRMByStrData response;
+	static char 			errorbuf[ERRORMESSAGE_SIZE];
+	int						res			= FUNC_RETURN_OK;
+	ConnectionTrack 		conntrack	= (ConnectionTrack )(*arg);
+	SelfMaintainBufferData 	responsedata;
+	RPCResponseRegisterConnectionInRMByStrData response;
+
+	initializeSelfMaintainBuffer(&responsedata, PCONTEXT);
 
 	/* Parse request. */
 	strncpy(conntrack->UserID,
-			conntrack->MessageBuff.Buffer,
+			SMBUFF_CONTENT(&(conntrack->MessageBuff)),
 			sizeof(conntrack->UserID)-1);
 	conntrack->UserID[sizeof(conntrack->UserID)-1] = '\0';
 
-
 	/* Handle the request. */
-	int res = registerConnectionByUserID(conntrack);
+	res = registerConnectionByUserID(conntrack, errorbuf, sizeof(errorbuf));
 	if ( res == FUNC_RETURN_OK )
 	{
 		/* Allocate connection id and track this connection. */
@@ -82,37 +86,49 @@ bool handleRMRequestConnectionReg(void **arg)
 		if ( res == FUNC_RETURN_OK )
 		{
 			trackConnectionTrack(conntrack);
-			elog(DEBUG5, "Resource manager tracked connection, ID=%d, Progress=%d.",
-					     conntrack->ConnID,
-					     conntrack->Progress);
+			elog(LOG, "ConnID %d. Resource manager tracked connection.",
+					  conntrack->ConnID);
 			response.Result = FUNC_RETURN_OK;
 			response.ConnID = conntrack->ConnID;
 		}
 		else
 		{
+			Assert( res == CONNTRACK_CONNID_FULL );
+			snprintf(errorbuf, sizeof(errorbuf),
+					 "cannot accept more resource context instance");
+			elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
 			/* No connection id resource. Return occupation in resource queue. */
 			returnConnectionToQueue(conntrack, false);
-			elog(LOG, "Resource manager can not accept more connections.");
 			response.Result = res;
 			response.ConnID = INVALID_CONNID;
 		}
 	}
 	else
 	{
+		elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
 		response.Result = res;
 		response.ConnID = INVALID_CONNID;
 	}
 
-	buildResponseIntoConnTrack( conntrack,
-				   	   	   	    (char *)&response,
-								sizeof(response),
-								conntrack->MessageMark1,
-								conntrack->MessageMark2,
-								RESPONSE_QD_CONNECTION_REG);
+	/* Build response. */
+	appendSMBVar(&responsedata, response);
+	if ( response.Result != FUNC_RETURN_OK )
+	{
+		appendSMBStr(&responsedata, errorbuf);
+		appendSelfMaintainBufferTill64bitAligned(&responsedata);
+	}
+
+	buildResponseIntoConnTrack(conntrack,
+				   	   	   	   SMBUFF_CONTENT(&responsedata),
+							   getSMBContentSize(&responsedata),
+							   conntrack->MessageMark1,
+							   conntrack->MessageMark2,
+							   RESPONSE_QD_CONNECTION_REG);
 
-	elog(DEBUG3, "One connection is registered. ConnID=%d, result=%d\n",
-				conntrack->ConnID,
-			    response.Result);
+	destroySelfMaintainBuffer(&responsedata);
+	elog(DEBUG3, "ConnID %d. One connection register result %d.",
+				 conntrack->ConnID,
+				 response.Result);
 
 	conntrack->ResponseSent = false;
 	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
@@ -127,70 +143,87 @@ bool handleRMRequestConnectionReg(void **arg)
  */
 bool handleRMRequestConnectionRegByOID(void **arg)
 {
-	int		   		res 	  = FUNC_RETURN_OK;
-	ConnectionTrack conntrack = (ConnectionTrack )(*arg);
-	bool			exist	  = false;
+	static char 		   errorbuf[ERRORMESSAGE_SIZE];
+	int		   			   res			= FUNC_RETURN_OK;
+	ConnectionTrack 	   conntrack	= (ConnectionTrack )(*arg);
+	bool				   exist		= false;
+	SelfMaintainBufferData responsedata;
+	RPCResponseRegisterConnectionInRMByOIDData response;
+
+	initializeSelfMaintainBuffer(&responsedata, PCONTEXT);
 
-	RPCResponseHeadRegisterConnectionInRMByOIDData response;
-	RPCRequestHeadRegisterConnectionInRMByOID request =
-		(RPCRequestHeadRegisterConnectionInRMByOID)
-		(conntrack->MessageBuff.Buffer);
+	RPCRequestRegisterConnectionInRMByOID request =
+		SMBUFF_HEAD(RPCRequestRegisterConnectionInRMByOID,
+					&(conntrack->MessageBuff));
 
 	/* Get user name from oid. */
 	UserInfo reguser = getUserByUserOID(request->UseridOid, &exist);
-	if ( !exist ) {
-		elog(LOG, "User oid "INT64_FORMAT" is not found. Temporarily set to "
-				  "defaultuser.",
-				  request->UseridOid);
-		memcpy(conntrack->UserID, "defaultuser", sizeof("defaultuser"));
+	if ( !exist )
+	{
+		res = RESQUEMGR_NO_USERID;
+		snprintf(errorbuf, sizeof(errorbuf),
+				 "user oid " INT64_FORMAT "does not exist",
+				 request->UseridOid);
+		elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
+		goto exit;
 	}
-	else {
-		elog(DEBUG5, "User %s with oid "INT64_FORMAT" is found for registering.",
-				     reguser->Name,
-					 request->UseridOid);
-
+	else
+	{
 		/* Set user id string into connection track. */
 		strncpy(conntrack->UserID, reguser->Name, sizeof(conntrack->UserID)-1);
 	}
 
 	/* Handle the request. */
-	res = registerConnectionByUserID(conntrack);
+	res = registerConnectionByUserID(conntrack, errorbuf, sizeof(errorbuf));
 
-	if ( res == FUNC_RETURN_OK ) {
+	if ( res == FUNC_RETURN_OK )
+	{
 		/* Allocate connection id and track this connection. */
 		res = useConnectionID(&(conntrack->ConnID));
-		if ( res == FUNC_RETURN_OK ) {
+		if ( res == FUNC_RETURN_OK )
+		{
 			trackConnectionTrack(conntrack);
-			elog(DEBUG5, "Resource manager tracked connection, ID=%d, Progress=%d.",
-					     conntrack->ConnID,
-					     conntrack->Progress);
+			elog(LOG, "ConnID %d. Resource manager tracked connection.",
+					  conntrack->ConnID);
 			response.Result = FUNC_RETURN_OK;
 			response.ConnID = conntrack->ConnID;
 		}
 		else {
+			Assert( res == CONNTRACK_CONNID_FULL );
+			snprintf(errorbuf, sizeof(errorbuf),
+					 "cannot accept more resource context instance");
+			elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
 			/* No connection id resource. Return occupation in resource queue. */
 			returnConnectionToQueue(conntrack, false);
-			elog(LOG, "Resource manager can not accept more connections.");
 			response.Result = res;
 			response.ConnID = INVALID_CONNID;
 		}
 	}
 	else {
+		elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
 		response.Result = res;
 		response.ConnID = INVALID_CONNID;
 	}
 
+exit:
 	/* Build message saved in the connection track instance. */
-	buildResponseIntoConnTrack( conntrack,
-				   	   	   	    (char *)&response,
-								sizeof(response),
-								conntrack->MessageMark1,
-								conntrack->MessageMark2,
-								RESPONSE_QD_CONNECTION_REG_OID);
+	appendSMBVar(&responsedata, response);
+	if ( response.Result != FUNC_RETURN_OK )
+	{
+		appendSMBStr(&responsedata, errorbuf);
+		appendSelfMaintainBufferTill64bitAligned(&responsedata);
+	}
 
-	elog(DEBUG3, "One connection is registered through OID. ConnID=%d, result=%d\n",
+	buildResponseIntoConnTrack(conntrack,
+				   	   	   	   SMBUFF_CONTENT(&responsedata),
+							   getSMBContentSize(&responsedata),
+							   conntrack->MessageMark1,
+							   conntrack->MessageMark2,
+							   RESPONSE_QD_CONNECTION_REG_OID);
+	destroySelfMaintainBuffer(&responsedata);
+	elog(DEBUG3, "ConnID %d. One connection register result %d (OID).",
 				 conntrack->ConnID,
-			     response.Result);
+				 response.Result);
 
 	conntrack->ResponseSent = false;
 	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
@@ -205,62 +238,77 @@ bool handleRMRequestConnectionRegByOID(void **arg)
  */
 bool handleRMRequestConnectionUnReg(void **arg)
 {
+	static char 	 errorbuf[ERRORMESSAGE_SIZE];
 	int      		 res	 	= FUNC_RETURN_OK;
 	ConnectionTrack *conntrack 	= (ConnectionTrack *)arg;
 
 	RPCRequestHeadUnregisterConnectionInRM request =
-		(RPCRequestHeadUnregisterConnectionInRM)((*conntrack)->MessageBuff.Buffer);
-	elog(DEBUG5, "HAWQ RM :: Connection id %d try to unregister.",
-				 request->ConnID);
+		SMBUFF_HEAD(RPCRequestHeadUnregisterConnectionInRM,
+					&((*conntrack)->MessageBuff));
+
+	elog(DEBUG3, "ConnID %d. Try to unregister.", request->ConnID);
 
 	if ( (*conntrack)->ConnID == INVALID_CONNID )
 	{
 		res = retrieveConnectionTrack((*conntrack), request->ConnID);
 		if ( res != FUNC_RETURN_OK )
 		{
-			elog(LOG, "Not valid resource context with id %d.", request->ConnID);
+			snprintf(errorbuf, sizeof(errorbuf),
+					 "the resource context is invalid or timed out");
+			elog(WARNING, "ConnID %d. %s", request->ConnID, errorbuf);
 			goto sendresponse;
 		}
-		elog(DEBUG5, "HAWQ RM :: Fetched existing connection track "
-					 "ID=%d, Progress=%d.",
+		elog(DEBUG3, "ConnID %d. Fetched existing connection track, progress=%d.",
 					 (*conntrack)->ConnID,
 					 (*conntrack)->Progress);
 	}
 
 	/* Get connection ID. */
-	request = (RPCRequestHeadUnregisterConnectionInRM)
-			  ((*conntrack)->MessageBuff.Buffer);
-
-	elog(DEBUG5, "HAWQ RM :: Connection id %d unregisters connection.",
-				 request->ConnID);
+	request = SMBUFF_HEAD(RPCRequestHeadUnregisterConnectionInRM,
+			  	  	  	  &((*conntrack)->MessageBuff));
 
 	if ( !canTransformConnectionTrackProgress((*conntrack), CONN_PP_ESTABLISHED) )
 	{
-		elog(DEBUG5, "HAWQ RM :: Wrong connection status for unregistering. "
-					"Current connection status is %d.",
-					(*conntrack)->Progress);
+		snprintf(errorbuf, sizeof(errorbuf),
+				 "wrong resource context status for unregistering, %d",
+				 (*conntrack)->Progress);
+
+		elog(WARNING, "ConnID %d. %s", request->ConnID, errorbuf);
 		res = REQUESTHANDLER_WRONG_CONNSTAT;
 		goto sendresponse;
 	}
 
 	returnConnectionToQueue(*conntrack, false);
 
-	elog(DEBUG3, "One connection is unregistered. ConnID=%d", (*conntrack)->ConnID);
+	elog(LOG, "ConnID %d. Connection is unregistered.", (*conntrack)->ConnID);
 
 sendresponse:
 	{
-		RPCResponseHeadUnregisterConnectionInRMData response;
+		SelfMaintainBufferData responsedata;
+		initializeSelfMaintainBuffer(&responsedata, PCONTEXT);
+
+		RPCResponseUnregisterConnectionInRMData response;
 		response.Result   = res;
 		response.Reserved = 0;
+		appendSMBVar(&responsedata, response);
+
+		if ( response.Result != FUNC_RETURN_OK )
+		{
+			appendSMBStr(&responsedata, errorbuf);
+			appendSelfMaintainBufferTill64bitAligned(&responsedata);
+		}
+
 		/* Build message saved in the connection track instance. */
 		buildResponseIntoConnTrack((*conntrack),
-							 	   (char *)&response,
-								   sizeof(response),
+							 	   SMBUFF_CONTENT(&responsedata),
+								   getSMBContentSize(&responsedata),
 								   (*conntrack)->MessageMark1,
 								   (*conntrack)->MessageMark2,
 								   RESPONSE_QD_CONNECTION_UNREG);
+		destroySelfMaintainBuffer(&responsedata);
 
-		if ( res == CONNTRACK_NO_CONNID ) {
+		if ( res == CONNTRACK_NO_CONNID )
+		{
 			transformConnectionTrackProgress((*conntrack),
 											 CONN_PP_TRANSFORM_ERROR);
 		}
@@ -269,7 +317,6 @@ sendresponse:
 		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
 		PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, *conntrack);
 		MEMORY_CONTEXT_SWITCH_BACK
-
 	}
 
 	return true;
@@ -277,63 +324,18 @@ sendresponse:
 
 /**
  * Handle the request of ACQUIRE QUERY RESOURCE
- *
- * The previous connection status can be:
- * 		CONN_REGISTER_DONE 				: Register is done.
- *
- *		RESOURCE_QUEUE_ALLOC_WAIT		: Request is already submitted, skip
- *										  current content and keep retrieved
- *										  request.
- *
- *		RESOURCE_QUEUE_ALLOC_DONE		: Request is processed,but the response
- *										  is not sent due to some errors. Keep
- *										  retrieved request and allocation. Send
- *										  the response directly.
- *
- * Request format:
- *		uint32_t 	conn_id
- *		uint32_t 	scansizemb
- *		uint32_t 	preferred node count ( can be 0 )
- *		uint32_t 	seg count fix
- *		int64_t*n 	node scan size
- *		char	 	hostnames splited by '\0'
- *		uint8_t		pad if hostnames are not 64bit aligned.
- *
- * Response format:
- * 		No response if everything goes well. The request is only submitted to
- * 		resource queues to wait for the resource.
- *
- * 		If some error rises, this function build response as below.
- *
- * 		uint32_t error code
- * 		uint32_t reserved.
- *
- * Return:
- * 		FUNC_RETURN_OK		Succeed submitted the allocation request.
- * 		CONNTRACK_NO_CONNID Wrong connection track id.
- *
  **/
 bool handleRMRequestAcquireResource(void **arg)
 {
-	/*
-	 * If the request is received before refreshing resource queue capacity,
-	 * temporarily ignore this request and let framework through back to the
-	 * request list.
-	 */
-	if ( PQUEMGR->RootTrack != NULL &&
-		 PQUEMGR->RootTrack->ClusterSegNumberMax == 0 )
-	{
-		return false;
-	}
-
-	int      		 res		= FUNC_RETURN_OK;
-	ConnectionTrack *conntrack  = (ConnectionTrack *)arg;
+	static char		 errorbuf[ERRORMESSAGE_SIZE];
+	int				 res		= FUNC_RETURN_OK;
+	ConnectionTrack *conntrack	= (ConnectionTrack *)arg;
 
 	RPCRequestHeadAcquireResourceFromRM request =
-		(RPCRequestHeadAcquireResourceFromRM)((*conntrack)->MessageBuff.Buffer);
+		SMBUFF_HEAD(RPCRequestHeadAcquireResourceFromRM,
+					&((*conntrack)->MessageBuff));
 
-	elog(DEBUG5, "HAWQ RM :: Connection id %d acquires query resource. "
-				 "Session id "INT64_FORMAT,
+	elog(DEBUG3, "ConnID %d. Acquires query resource. Session id "INT64_FORMAT,
 				 request->ConnID,
 				 request->SessionID);
 
@@ -342,53 +344,61 @@ bool handleRMRequestAcquireResource(void **arg)
 		res = retrieveConnectionTrack((*conntrack), request->ConnID);
 		if ( res != FUNC_RETURN_OK )
 		{
-			elog(LOG, "Not valid resource context with id %d.", request->ConnID);
+			snprintf(errorbuf, sizeof(errorbuf),
+					 "the resource context may be timed out");
+			elog(WARNING, "ConnID %d. %s", request->ConnID, errorbuf);
 			goto sendresponse;
 		}
-		elog(DEBUG5, "HAWQ RM :: Fetched existing connection track "
-					 "ID=%d, Progress=%d.",
+		elog(DEBUG3, "ConnID %d. Fetched existing connection track, progress=%d.",
 					 (*conntrack)->ConnID,
 					 (*conntrack)->Progress);
 	}
 
-	request = (RPCRequestHeadAcquireResourceFromRM)((*conntrack)->MessageBuff.Buffer);
-	if ( (*conntrack)->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_DONE )
-	{
-		elog(DEBUG5, "HAWQ RM :: The connection track already has allocated "
-					 "resource. Send again. ConnID=%d",
-					 request->ConnID);
-		goto sendagain;
-	}
-	else if ( (*conntrack)->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT )
-	{
-		elog(DEBUG5, "HAWQ RM :: The connection track already accepted "
-					 "acquire resource request. Ignore. ConnID=%d",
-					 request->ConnID);
-		goto sendignore;
-	}
-	else if ( (*conntrack)->Progress != CONN_PP_REGISTER_DONE )
+	request = SMBUFF_HEAD(RPCRequestHeadAcquireResourceFromRM,
+						  &((*conntrack)->MessageBuff));
+	if ( (*conntrack)->Progress != CONN_PP_REGISTER_DONE )
 	{
-		elog(DEBUG5, "HAWQ RM :: Wrong connection status for acquiring resource. "
-					 "Current connection status is %d.",
-					 (*conntrack)->Progress);
 		res = REQUESTHANDLER_WRONG_CONNSTAT;
+		snprintf(errorbuf, sizeof(errorbuf),
+				 "the resource context status is invalid");
+		elog(WARNING, "ConnID %d. %s", (*conntrack)->ConnID, errorbuf);
 		goto sendresponse;
 	}
 
-	/* Check if HAWQ has enough alive segments. */
+	/*--------------------------------------------------------------------------
+	 * We firstly check if the cluster has too many unavailable segments, which
+	 * is measured by rm_rejectrequest_nseg_limit. The expected cluster size is
+	 * loaded from couting hosts in $GPHOME/etc/slaves. Resource manager rejects
+	 * query  resource requests at once if currently there are more than
+	 * rm_rejectrequest_nseg_limit segments unavailable.
+	 *
+	 * NOTE: If the number of hosts in $GPHOME/etc/slaves is not greater than
+	 * 		 rm_rejectrequest_nseg_limit, this guc value is treated as 0, i.e.
+	 * 		 all the segments must be available before accepting query resource
+	 * 		 requests.
+	 *--------------------------------------------------------------------------
+	 */
+	Assert(PRESPOOL->SlavesHostCount > 0);
+	int rejectlimit = PRESPOOL->SlavesHostCount <= rm_rejectrequest_nseg_limit ?
+					  0 :
+					  rm_rejectrequest_nseg_limit;
+
 	int unavailcount = PRESPOOL->SlavesHostCount - PRESPOOL->AvailNodeCount;
-	if ( unavailcount > rm_rejectrequest_nseg_limit )
+	if ( unavailcount > rejectlimit )
 	{
-		elog(WARNING, "Resource manager finds %d segments not available yet, all "
-					  "resource allocation requests are rejected.",
-					  unavailcount);
+		snprintf(errorbuf, sizeof(errorbuf),
+				 "%d of %d segments %s unavailable",
+				 unavailcount,
+				 PRESPOOL->SlavesHostCount,
+				 unavailcount == 1 ? "is" : "are");
+		elog(WARNING, "ConnID %d. %s", (*conntrack)->ConnID, errorbuf);
 		res = RESOURCEPOOL_TOO_MANY_UAVAILABLE_HOST;
 		goto sendresponse;
 	}
 
 	/* Get scan size. */
-	request = (RPCRequestHeadAcquireResourceFromRM)
-			  ((*conntrack)->MessageBuff.Buffer);
+	request = SMBUFF_HEAD(RPCRequestHeadAcquireResourceFromRM,
+			  	  	  	  &((*conntrack)->MessageBuff));
 
 	(*conntrack)->SliceSize				= request->SliceSize;
 	(*conntrack)->IOBytes				= request->IOBytes;
@@ -404,75 +414,72 @@ bool handleRMRequestAcquireResource(void **arg)
 	/* Get preferred nodes. */
 	buildSegPreferredHostInfo((*conntrack));
 
-	elog(DEBUG3, "Expect resource. ConnID=%d, Scanning "INT64_FORMAT" io bytes "
-				 "by %d slices with %d preferred segments. Each segment has "
-				 "maximum %d vseg, query has maximum %d vseg.",
-				 (*conntrack)->ConnID,
-				 (*conntrack)->IOBytes,
-				 (*conntrack)->SliceSize,
-				 (*conntrack)->SegPreferredHostCount,
-				 (*conntrack)->VSegLimitPerSeg,
-				 (*conntrack)->VSegLimit);
+	elog(RMLOG, "ConnID %d. Session ID " INT64_FORMAT
+				"Scanning "INT64_FORMAT" io bytes "
+				"by %d slices with %d preferred segments. "
+				"Expect %d vseg (MIN %d). "
+				"Each segment has maximum %d vseg. "
+				"Query has maximum %d vseg. "
+				"Statement quota %d MB x %d vseg",
+				(*conntrack)->ConnID,
+				(*conntrack)->SessionID,
+				(*conntrack)->IOBytes,
+				(*conntrack)->SliceSize,
+				(*conntrack)->SegPreferredHostCount,
+				(*conntrack)->MaxSegCountFixed,
+				(*conntrack)->MinSegCountFixed,
+				(*conntrack)->VSegLimitPerSeg,
+				(*conntrack)->VSegLimit,
+				(*conntrack)->StatVSegMemoryMB,
+				(*conntrack)->StatNVSeg);
 
 	if ( (*conntrack)->StatNVSeg > 0 )
 	{
-		elog(LOG, "Statement level resource quota is active. "
-				  "ConnID=%d, Expect resource. "
-				  "Total %d vsegs, each vseg has %d MB memory quota.",
+		elog(LOG, "ConnID %d. Statement level resource quota is active. "
+				  "Expect resource ( %d MB ) x %d.",
 				  (*conntrack)->ConnID,
-				  (*conntrack)->StatNVSeg,
-				  (*conntrack)->StatVSegMemoryMB);
+				  (*conntrack)->StatVSegMemoryMB,
+				  (*conntrack)->StatNVSeg);
 	}
 
-	res = acquireResourceFromResQueMgr((*conntrack));
+	res = acquireResourceFromResQueMgr((*conntrack), errorbuf, sizeof(errorbuf));
 	if ( res != FUNC_RETURN_OK )
 	{
 		goto sendresponse;
 	}
 	(*conntrack)->ResRequestTime = gettime_microsec();
 	(*conntrack)->LastActTime    = (*conntrack)->ResRequestTime;
-sendignore:
-	return true;  /* No need to send response now. The resource will be dispatched
-					 when dispatching resource for each resource queue. */
+
+	return true;
+
 sendresponse:
 	{
-		RPCResponseAcquireResourceFromRMERRORData errresponse;
 		/* Send error message. */
+		RPCResponseAcquireResourceFromRMERRORData errresponse;
 		errresponse.Result   = res;
 		errresponse.Reserved = 0;
 
-		buildResponseIntoConnTrack( (*conntrack),
-									(char *)&errresponse,
-									sizeof(errresponse),
-									(*conntrack)->MessageMark1,
-									(*conntrack)->MessageMark2,
-									RESPONSE_QD_ACQUIRE_RESOURCE);
+		SelfMaintainBufferData responsedata;
+		initializeSelfMaintainBuffer(&responsedata, PCONTEXT);
+		appendSMBVar(&responsedata, errresponse);
+		appendSMBStr(&responsedata, errorbuf);
+		appendSelfMaintainBufferTill64bitAligned(&responsedata);
 
-		if ( res == CONNTRACK_NO_CONNID )
-		{
-			transformConnectionTrackProgress((*conntrack), CONN_PP_TRANSFORM_ERROR);
-		}
+		buildResponseIntoConnTrack((*conntrack),
+								   SMBUFF_CONTENT(&responsedata),
+								   getSMBContentSize(&responsedata),
+								   (*conntrack)->MessageMark1,
+								   (*conntrack)->MessageMark2,
+								   RESPONSE_QD_ACQUIRE_RESOURCE);
+		destroySelfMaintainBuffer(&responsedata);
 
-		(*conntrack)->ResponseSent = false;
-		{
-			MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
-			PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, *conntrack);
-			MEMORY_CONTEXT_SWITCH_BACK
-		}
-		return true;
-	}
+		transformConnectionTrackProgress((*conntrack), CONN_PP_TRANSFORM_ERROR);
 
-sendagain:
-	/* Let resource allocation response be sent again. */
-	buildAcquireResourceResponseMessage((*conntrack));
-	(*conntrack)->ResponseSent = false;
-	{
+		(*conntrack)->ResponseSent = false;
 		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
 		PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, *conntrack);
 		MEMORY_CONTEXT_SWITCH_BACK
 	}
-	/* TODO: A problem here... The message buffer has been refreshed to a request,
-	 *  	 the response must be built again before sending out. */
 	return true;
 }
 
@@ -483,69 +490,71 @@ sendagain:
  */
 bool handleRMRequestReturnResource(void **arg)
 {
+	static char		errorbuf[ERRORMESSAGE_SIZE];
 	int      		res		    = FUNC_RETURN_OK;
 	ConnectionTrack *conntrack  = (ConnectionTrack *)arg;
 
 	RPCRequestHeadReturnResource request =
-		(RPCRequestHeadReturnResource)((*conntrack)->MessageBuff.Buffer);
+		SMBUFF_HEAD(RPCRequestHeadReturnResource, &((*conntrack)->MessageBuff));
 
-	elog(DEBUG5, "HAWQ RM :: Connection id %d returns query resource.",
-				 request->ConnID);
+	elog(DEBUG3, "ConnID %d. Returns query resource.", request->ConnID);
 
 	if ( (*conntrack)->ConnID == INVALID_CONNID )
 	{
 		res = retrieveConnectionTrack((*conntrack), request->ConnID);
 		if ( res != FUNC_RETURN_OK )
 		{
-			elog(WARNING, "Not valid resource context with id %d.", request->ConnID);
+			snprintf(errorbuf, sizeof(errorbuf),
+					 "the resource context may be timed out");
+			elog(WARNING, "ConnID %d. %s", request->ConnID, errorbuf);
 			goto sendresponse;
 		}
-		elog(DEBUG5, "HAWQ RM :: Fetched existing connection track "
-					 "ID=%d, Progress=%d.",
+		elog(DEBUG3, "ConnID %d. Fetched existing connection track, progress=%d.",
 					 (*conntrack)->ConnID,
 					 (*conntrack)->Progress);
 	}
 
 	/* Get connection ID. */
-	request = (RPCRequestHeadReturnResource)
-			  ((*conntrack)->MessageBuff.Buffer);
-	elog(DEBUG5, "HAWQ RM :: Connection id %d returns query resource.",
-				 request->ConnID);
+	request = SMBUFF_HEAD(RPCRequestHeadReturnResource,
+						  &((*conntrack)->MessageBuff));
 
-	if ( (*conntrack)->Progress == CONN_PP_REGISTER_DONE )
-	{
-		elog(DEBUG5, "HAWQ RM :: The resource has been returned or has not been "
-					 "acquired.");
-		goto sendresponse;
-	}
-	else if ( (*conntrack)->Progress != CONN_PP_RESOURCE_QUEUE_ALLOC_DONE )
+	if ( (*conntrack)->Progress != CONN_PP_RESOURCE_QUEUE_ALLOC_DONE )
 	{
-		elog(DEBUG5, "HAWQ RM :: Wrong connection status for acquiring resource. "
-					 "Current connection status is %d.",
-					 (*conntrack)->Progress);
 		res = REQUESTHANDLER_WRONG_CONNSTAT;
+		snprintf(errorbuf, sizeof(errorbuf),
+				 "the resource context status is invalid");
+		elog(WARNING, "ConnID %d. %s", (*conntrack)->ConnID, errorbuf);
 		goto sendresponse;
 	}
 
 	/* Return the resource. */
 	returnResourceToResQueMgr(*conntrack);
 
-	elog(DEBUG3, "Return resource. ConnID=%d", (*conntrack)->ConnID);
+	elog(LOG, "ConnID %d. Returned resource.", (*conntrack)->ConnID);
 
 sendresponse:
 	{
-		elog(DEBUG3, "Return resource result %d.", res);
+		SelfMaintainBufferData responsedata;
+		initializeSelfMaintainBuffer(&responsedata, PCONTEXT);
 
 		RPCResponseHeadReturnResourceData response;
 		response.Result   = res;
 		response.Reserved = 0;
-		buildResponseIntoConnTrack( (*conntrack),
-	                                (char *)&response,
-									sizeof(response),
-									(*conntrack)->MessageMark1,
-									(*conntrack)->MessageMark2,
-									RESPONSE_QD_RETURN_RESOURCE );
+		appendSMBVar(&responsedata, response);
 
+		if ( response.Result != FUNC_RETURN_OK )
+		{
+			appendSMBStr(&responsedata, errorbuf);
+			appendSelfMaintainBufferTill64bitAligned(&responsedata);
+		}
+
+		buildResponseIntoConnTrack((*conntrack),
+	                               SMBUFF_CONTENT(&responsedata),
+								   getSMBContentSize(&responsedata),
+								   (*conntrack)->MessageMark1,
+								   (*conntrack)->MessageMark2,
+								   RESPONSE_QD_RETURN_RESOURCE );
+		destroySelfMaintainBuffer(&responsedata);
 		if ( res == CONNTRACK_NO_CONNID )
 		{
 			transformConnectionTrackProgress((*conntrack), CONN_PP_TRANSFORM_ERROR);
@@ -556,7 +565,6 @@ sendresponse:
 		PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, *conntrack);
 		MEMORY_CONTEXT_SWITCH_BACK
 	}
-
 	return true;
 }
 
@@ -566,24 +574,23 @@ sendresponse:
 bool handleRMSEGRequestIMAlive(void **arg)
 {
 	ConnectionTrack conntrack = (ConnectionTrack)(*arg);
-	elog(DEBUG5, "Resource manager receives segment heart-beat information.");
+	elog(RMLOG, "Resource manager receives segment heart-beat information.");
 
 	SelfMaintainBufferData machinereport;
 	initializeSelfMaintainBuffer(&machinereport,PCONTEXT);
-	SegStat segstat = (SegStat)(conntrack->MessageBuff.Buffer +
+	SegStat segstat = (SegStat)(SMBUFF_CONTENT(&(conntrack->MessageBuff)) +
   	  	    					sizeof(RPCRequestHeadIMAliveData));
 	generateSegStatReport(segstat, &machinereport);
 
-	elog(DEBUG3, "Resource manager received segment machine information. %s",
-				 machinereport.Buffer);
-
+	elog(RMLOG, "Resource manager received segment machine information, %s",
+				SMBUFF_CONTENT(&machinereport));
 	destroySelfMaintainBuffer(&machinereport);
 
 	/* Get hostname and ip address from the connection's sockaddr */
 	char*    		fts_client_ip     = NULL;
 	uint32_t 		fts_client_ip_len = 0;
-	struct in_addr 	fts_client_addr;
 	struct hostent* fts_client_host   = NULL;
+	struct in_addr 	fts_client_addr;
 
 	fts_client_ip = conntrack->ClientAddrDotStr;
 	fts_client_ip_len = strlen(fts_client_ip);
@@ -592,7 +599,7 @@ bool handleRMSEGRequestIMAlive(void **arg)
 	Assert(fts_client_host != NULL);
 
 	/* Get the received machine id instance start address. */
-	SegInfo fts_client_seginfo = (SegInfo)(conntrack->MessageBuff.Buffer +
+	SegInfo fts_client_seginfo = (SegInfo)(SMBUFF_CONTENT(&(conntrack->MessageBuff)) +
 										   sizeof(RPCRequestHeadIMAliveData) +
 										   offsetof(SegStatData, Info));
 
@@ -602,7 +609,8 @@ bool handleRMSEGRequestIMAlive(void **arg)
 
 	/* Copy machine id header. */
 	prepareSelfMaintainBuffer(&newseginfo, sizeof(SegInfoData), false);
-	memcpy(newseginfo.Buffer, fts_client_seginfo, sizeof(SegInfoData));
+	memcpy(SMBUFF_CONTENT(&(newseginfo)),
+		   fts_client_seginfo, sizeof(SegInfoData));
 	jumpforwardSelfMaintainBuffer(&newseginfo, sizeof(SegInfoData));
 
 	/* Put client ip address's offset and attribute */
@@ -612,19 +620,19 @@ bool handleRMSEGRequestIMAlive(void **arg)
 
 	uint16_t addrattr = HOST_ADDRESS_CONTENT_STRING;
 
-	SegInfo newseginfoptr = (SegInfo)(newseginfo.Buffer);
+	SegInfo newseginfoptr = SMBUFF_HEAD(SegInfo, &newseginfo);
 	newseginfoptr->AddressAttributeOffset = sizeof(SegInfoData);
-	newseginfoptr->AddressContentOffset = addroffset;
-	newseginfoptr->HostAddrCount = fts_client_seginfo->HostAddrCount + 1;
+	newseginfoptr->AddressContentOffset   = addroffset;
+	newseginfoptr->HostAddrCount 		  = fts_client_seginfo->HostAddrCount + 1;
 
 	uint32_t addContentOffset= addroffset;
 
 	appendSMBVar(&newseginfo,addroffset);
 	appendSMBVar(&newseginfo,addrattr);
 
-	elog(DEBUG5, "Resource manager received IMAlive message, this segment's IP "
-				 "address count: %d\n",
-				 fts_client_seginfo->HostAddrCount);
+	elog(RMLOG, "Resource manager received IMAlive message, this segment's IP "
+				"address count: %d",
+				fts_client_seginfo->HostAddrCount);
 
 	/* iterate all the offset/attribute in machineIdData from client */
 	for (int i = 0; i < fts_client_seginfo->HostAddrCount; i++) {
@@ -662,9 +670,9 @@ bool handleRMSEGRequestIMAlive(void **arg)
 	appendSMBStr(&newseginfo,fts_client_ip);
 	appendSelfMaintainBufferTill64bitAligned(&newseginfo);
 
-	elog(DEBUG5, "Resource manager received IMAlive message, "
-				 "this segment's IP address: %s\n",
-				 fts_client_ip);
+	elog(RMLOG, "Resource manager received IMAlive message, "
+				"this segment's IP address: %s\n",
+				fts_client_ip);
 
 	/* Put other ip addresses' content directly. */
 	appendSelfMaintainBuffer(&newseginfo,
@@ -674,35 +682,35 @@ bool handleRMSEGRequestIMAlive(void **arg)
 							 fts_client_seginfo->AddressContentOffset);
 
 	/* fill in hostname */
-	newseginfoptr = (SegInfo)(newseginfo.Buffer);
-	newseginfoptr->HostNameOffset = newseginfo.Cursor+1;
+	newseginfoptr = SMBUFF_HEAD(SegInfo, &(newseginfo));
+	newseginfoptr->HostNameOffset = getSMBContentSize(&newseginfo);
 	appendSMBStr(&newseginfo,fts_client_host->h_name);
 	appendSelfMaintainBufferTill64bitAligned(&newseginfo);
 
-	newseginfoptr = (SegInfo)(newseginfo.Buffer);
+	newseginfoptr = SMBUFF_HEAD(SegInfo, &(newseginfo));
 	newseginfoptr->HostNameLen = strlen(fts_client_host->h_name);
-	newseginfoptr->Size      = newseginfo.Cursor+1;
+	newseginfoptr->Size      = getSMBContentSize(&newseginfo);
 	/* reported by segment, set GRM host/rack as NULL */
 	newseginfoptr->GRMHostNameLen = 0;
 	newseginfoptr->GRMHostNameOffset = 0;
 	newseginfoptr->GRMRackNameLen = 0;
 	newseginfoptr->GRMRackNameOffset = 0;
 
-	elog(DEBUG5, "Resource manager received IMAlive message, "
-				 "this segment's hostname: %s\n",
-				 fts_client_host->h_name);
+	elog(RMLOG, "Resource manager received IMAlive message, "
+				"this segment's hostname: %s\n",
+				fts_client_host->h_name);
 
 	/* build segment status information instance and add to resource pool */
 	SegStat newsegstat = (SegStat)
 						 rm_palloc0(PCONTEXT,
 								  	offsetof(SegStatData, Info) +
-									newseginfo.Cursor + 1);
+									getSMBContentSize(&newseginfo));
 	/* Copy old segment status information. */
 	memcpy((char *)newsegstat, segstat, offsetof(SegStatData, Info));
 	/* Copy new segment info information. */
 	memcpy((char *)newsegstat + offsetof(SegStatData, Info),
-		   newseginfo.Buffer,
-		   newseginfo.Cursor+1);
+		   SMBUFF_CONTENT(&newseginfo),
+		   getSMBContentSize(&newseginfo));
 
 	destroySelfMaintainBuffer(&newseginfo);
 
@@ -710,7 +718,8 @@ bool handleRMSEGRequestIMAlive(void **arg)
 	newsegstat->GRMAvailable 	= RESOURCE_SEG_STATUS_UNSET;
 	newsegstat->FTSAvailable 	= RESOURCE_SEG_STATUS_AVAILABLE;
 
-	if ( addHAWQSegWithSegStat(newsegstat) != FUNC_RETURN_OK ) {
+	if ( addHAWQSegWithSegStat(newsegstat) != FUNC_RETURN_OK )
+	{
 		/* Should be a duplciate host. */
 		rm_pfree(PCONTEXT, newsegstat);
 	}
@@ -745,45 +754,35 @@ bool handleRMSEGRequestIMAlive(void **arg)
 
 bool handleRMRequestAcquireResourceQuota(void **arg)
 {
-
-	/*
-	 * If the request is received before refreshing resource queue capacity,
-	 * temporarily ignore this request and let framework through back to the
-	 * request list.
-	 */
-	if ( PQUEMGR->RootTrack != NULL &&
-		 PQUEMGR->RootTrack->ClusterSegNumberMax == 0 ) {
-		return false;
-	}
-
+	static char		 errorbuf[ERRORMESSAGE_SIZE];
 	int      		 res		= FUNC_RETURN_OK;
 	ConnectionTrack  conntrack  = (ConnectionTrack)(*arg);
 	bool			 exist		= false;
 
 	RPCRequestHeadAcquireResourceQuotaFromRMByOID request =
-		(RPCRequestHeadAcquireResourceQuotaFromRMByOID)
-		(conntrack->MessageBuff.Buffer);
+		SMBUFF_HEAD(RPCRequestHeadAcquireResourceQuotaFromRMByOID,
+					&(conntrack->MessageBuff));
 
-	elog(DEBUG5, "HAWQ RM :: User "INT64_FORMAT" acquires query resource quota "
-				 "with %d split to process. Fixed segment number %d.",
-				 request->UseridOid,
-				 request->MaxSegCountFix,
-				 request->MinSegCountFix);
+	elog(LOG, "ConnID %d. User "INT64_FORMAT" acquires query resource quota "
+			  "with expected %d vseg (MIN %d).",
+			  conntrack->ConnID,
+			  request->UseridOid,
+			  request->MaxSegCountFix,
+			  request->MinSegCountFix);
 
 	/* Get user name from oid. */
 	UserInfo reguser = getUserByUserOID(request->UseridOid, &exist);
-	if ( !exist ) {
-		elog(LOG, "User oid "INT64_FORMAT" is not found. Temporarily set to "
-				  "defaultuser.",
-				  request->UseridOid);
-		memcpy(conntrack->UserID, "defaultuser", sizeof("defaultuser"));
+	if ( !exist )
+	{
+		res = RESQUEMGR_NO_USERID;
+		snprintf(errorbuf, sizeof(errorbuf),
+				 "user oid " INT64_FORMAT "does not exist",
+				 request->UseridOid);
+		elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
+		goto errorexit;
 	}
-	else {
-		elog(DEBUG5, "User %s with oid "INT64_FORMAT" is found for acquiring "
-					 "resource.",
-				     reguser->Name,
-					 request->UseridOid);
-
+	else
+	{
 		/* Set user id string into connection track. */
 		strncpy(conntrack->UserID, reguser->Name, sizeof(conntrack->UserID)-1);
 	}
@@ -793,14 +792,16 @@ bool handleRMRequestAcquireResourceQuota(void **arg)
 	conntrack->VSegLimitPerSeg	= request->VSegLimitPerSeg;
 	conntrack->VSegLimit		= request->VSegLimit;
 
-	res = acquireResourceQuotaFromResQueMgr(conntrack);
+	res = acquireResourceQuotaFromResQueMgr(conntrack, errorbuf, sizeof(errorbuf));
 
-	if ( res == FUNC_RETURN_OK ) {
+	if ( res == FUNC_RETURN_OK )
+	{
 		RPCResponseHeadAcquireResourceQuotaFromRMByOIDData response;
 
 		DynResourceQueueTrack queuetrack =
 			getQueueTrackByQueueOID(reguser->QueueOID, &exist);
-		if ( exist ) {
+		if ( exist )
+		{
 			memcpy(response.QueueName,
 				   queuetrack->QueueInfo->Name,
 				   sizeof(response.QueueName));
@@ -823,22 +824,36 @@ bool handleRMRequestAcquireResourceQuota(void **arg)
 									conntrack->MessageMark1,
 									conntrack->MessageMark2,
 									RESPONSE_QD_ACQUIRE_RESOURCE_QUOTA);
+		conntrack->ResponseSent = false;
+		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+		PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
+		MEMORY_CONTEXT_SWITCH_BACK
+		return true;
 	}
-	else {
-		RPCResponseHeadAcquireResourceQuotaFromRMByOIDERRORData errresponse;
-		errresponse.Result   = res;
-		errresponse.Reserved = 0;
-		buildResponseIntoConnTrack( conntrack,
-									(char *)&errresponse,
-									sizeof(errresponse),
-									conntrack->MessageMark1,
-									conntrack->MessageMark2,
-									RESPONSE_QD_ACQUIRE_RESOURCE_QUOTA);
+	else
+	{
+		elog(WARNING, "%s", errorbuf);
+	}
+errorexit:
+	{
+		SelfMaintainBufferData responsedata;
+		initializeSelfMaintainBuffer(&responsedata, PCONTEXT);
+
+		RPCResponseHeadAcquireResourceQuotaFromRMByOIDERRORData response;
+		response.Result   = res;
+		response.Reserved = 0;
+		buildResponseIntoConnTrack(conntrack,
+								   SMBUFF_CONTENT(&responsedata),
+								   getSMBContentSize(&responsedata),
+								   conntrack->MessageMark1,
+								   conntrack->MessageMark2,
+								   RESPONSE_QD_ACQUIRE_RESOURCE_QUOTA);
+		destroySelfMaintainBuffer(&responsedata);
+		conntrack->ResponseSent = false;
+		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+		PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
+		MEMORY_CONTEXT_SWITCH_BACK
 	}
-	conntrack->ResponseSent = false;
-	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
-	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
-	MEMORY_CONTEXT_SWITCH_BACK
 	return true;
 }
 
@@ -850,11 +865,11 @@ bool handleRMRequestRefreshResource(void **arg)
 	uint64_t		 curmsec    = gettime_microsec();
 
 	RPCRequestHeadRefreshResourceHeartBeat request =
-		(RPCRequestHeadRefreshResourceHeartBeat)
-		(conntrack->MessageBuff.Buffer);
+		SMBUFF_HEAD(RPCRequestHeadRefreshResourceHeartBeat,
+					&(conntrack->MessageBuff));
 
 	uint32_t *connids = (uint32_t *)
-						(conntrack->MessageBuff.Buffer +
+						(SMBUFF_CONTENT(&(conntrack->MessageBuff)) +
 						 sizeof(RPCRequestHeadRefreshResourceHeartBeatData));
 
 	elog(DEBUG3, "Resource manager refreshes %d ConnIDs.", request->ConnIDCount);
@@ -866,12 +881,13 @@ bool handleRMRequestRefreshResource(void **arg)
 		if ( res == FUNC_RETURN_OK )
 		{
 			oldct->LastActTime = curmsec;
-			elog(DEBUG3, "Refreshed resource of connection id %d", connids[i]);
+			elog(RMLOG, "Refreshed resource context connection id %d", connids[i]);
 		}
 		else
 		{
-			elog(DEBUG3, "Can not find connection id %d for resource refreshing.",
-					     connids[i]);
+			elog(WARNING, "Can not find resource context connection id %d for "
+						  "resource refreshing.",
+					      connids[i]);
 		}
 	}
 
@@ -890,7 +906,6 @@ bool handleRMRequestRefreshResource(void **arg)
 	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
 	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
 	MEMORY_CONTEXT_SWITCH_BACK
-
 	return true;
 }
 
@@ -899,67 +914,75 @@ bool handleRMRequestSegmentIsDown(void **arg)
 	int      		 res		 = FUNC_RETURN_OK;
 	ConnectionTrack  conntrack   = (ConnectionTrack)(*arg);
 	/* Get host name that is down. */
-	char 			*hostname    = conntrack->MessageBuff.Buffer;
+	char 			*hostname    = SMBUFF_CONTENT(&(conntrack->MessageBuff));
 	int				 hostnamelen = 0;
 	int32_t		 	 segid       = SEGSTAT_ID_INVALID;
 
-	while( (hostname - conntrack->MessageBuff.Buffer <= conntrack->MessageBuff.Cursor) &&
-		   *hostname != '\0' ) {
-
+	while( (hostname - SMBUFF_CONTENT(&(conntrack->MessageBuff)) <
+			getSMBContentSize(&(conntrack->MessageBuff))) &&
+		   *hostname != '\0' )
+	{
 		hostnamelen = strlen(hostname);
 		res = getSegIDByHostName(hostname, hostnamelen, &segid);
-		if ( res == FUNC_RETURN_OK )  {
-
+		if ( res == FUNC_RETURN_OK )
+		{
 			/* Get resourceinfo of the expected host. */
 			SegResource segres = getSegResource(segid);
 			Assert( segres != NULL );
 
-			if ( !IS_SEGSTAT_FTSAVAILABLE(segres->Stat) ) {
-				elog(LOG, "Resource manager does not probe the status of host %s "
-						  "because it is down already.",
-						  hostname);
+			if ( !IS_SEGSTAT_FTSAVAILABLE(segres->Stat) )
+			{
+				elog(WARNING, "Resource manager does not probe the status of "
+							  "host %s because it is down already.",
+							  hostname);
 			}
-			else if ( segres->RUAlivePending ) {
+			else if ( segres->RUAlivePending )
+			{
 				elog(LOG, "Resource manager does not probe the status of host %s "
 						  "because it is in RUAlive pending status already.",
 						  hostname);
 			}
-			else {
-
-				elog(DEBUG3, "Resource manager probes the status of host %s by "
-						 	 "sending RUAlive request.",
-							 hostname);
+			else
+			{
+				elog(RMLOG, "Resource manager probes the status of host %s by "
+						 	"sending RUAlive request.",
+							hostname);
 
 		        res = sendRUAlive(hostname);
 		        /* IN THIS CASE, the segment is considered as down. */
-		        if (res != FUNC_RETURN_OK) {
-
-		        	/*
+		        if (res != FUNC_RETURN_OK)
+		        {
+		        	/*----------------------------------------------------------
 		        	 * This call makes resource manager able to adjust queue and
 		        	 * mem/core trackers' capacity.
+		        	 *----------------------------------------------------------
 		        	 */
-		        	setSegResHAWQAvailability(segres, RESOURCE_SEG_STATUS_UNAVAILABLE);
+		        	setSegResHAWQAvailability(segres,
+		        							  RESOURCE_SEG_STATUS_UNAVAILABLE);
 
 		        	/* Make resource pool remove unused containers */
 		        	returnAllGRMResourceFromSegment(segres);
 		        	/* Set the host down in gp_segment_configuration table */
-		        	update_segment_status(segres->Stat->ID + REGISTRATION_ORDER_OFFSET, SEGMENT_STATUS_DOWN);
+		        	update_segment_status(segres->Stat->ID + REGISTRATION_ORDER_OFFSET,
+		        						  SEGMENT_STATUS_DOWN);
 		        	/* Set the host down. */
 		        	elog(LOG, "Resource manager sets host %s from up to down "
 		        			  "due to not reaching host.", hostname);
 		        }
-		        else {
-		        	elog(DEBUG3, "Resource manager triggers RUAlive request to "
-		        				 "host %s.",
-								 hostname);
+		        else
+		        {
+		        	elog(RMLOG, "Resource manager triggered RUAlive request to "
+		        				"host %s.",
+								hostname);
 		        }
 			}
 		}
 		else {
-			elog(LOG, "Resource manager cannot find host %s to check status, "
-					  "skip it.",
-					  hostname);
+			elog(WARNING, "Resource manager cannot find host %s to check status, "
+					  	  "skip it.",
+						  hostname);
 		}
+
 		hostname = hostname + strlen(hostname) + 1; /* Try next */
 	}
 
@@ -984,16 +1007,26 @@ bool handleRMRequestSegmentIsDown(void **arg)
 
 bool handleRMRequestTmpDir(void **arg)
 {
-	int      		 res		= FUNC_RETURN_OK;
-    ConnectionTrack conntrack   = (ConnectionTrack)(*arg);
+	static char errorbuf[ERRORMESSAGE_SIZE];
+    ConnectionTrack conntrack = (ConnectionTrack)(*arg);
 
     RPCResponseTmpDirForQDData response;
 
     if (DRMGlobalInstance->NextLocalHostTempDirIdx < 0) 
     {
-        response.Result       = RM_STATUS_BAD_TMPDIR;
-        response.tmpdir[0]    = '\0';
-        response.Reserved     = 0;
+    	SelfMaintainBufferData responsedata;
+    	initializeSelfMaintainBuffer(&responsedata, PCONTEXT);
+
+        response.Result	   = RM_STATUS_BAD_TMPDIR;
+        response.tmpdir[0] = '\0';
+        response.Reserved  = 0;
+
+        snprintf(errorbuf, sizeof(errorbuf),
+        		 "no available temporary directory in resource manager");
+
+        appendSMBVar(&responsedata, response);
+        appendSMBVar(&responsedata, errorbuf);
+        appendSelfMaintainBufferTill64bitAligned(&responsedata);
 
         buildResponseIntoConnTrack(conntrack,
                                    (char *)&response,
@@ -1001,17 +1034,18 @@ bool handleRMRequestTmpDir(void **arg)
                                    conntrack->MessageMark1,
                                    conntrack->MessageMark2,
                                    RESPONSE_QD_TMPDIR);
-
-        elog(LOG, "handleRMRequestTmpDir, no existing tmp dirs in the "
-                     "master resource manager");
+        elog(WARNING, "%s", errorbuf);
+        destroySelfMaintainBuffer(&responsedata);
     }
     else
     {
-        response.Result       = res;
-        response.Reserved     = 0;
+        response.Result	  = FUNC_RETURN_OK;
+        response.Reserved = 0;
         
-        SimpStringPtr tmpdir = (SimpStringPtr)
-        getDQueueNodeDataByIndex(&DRMGlobalInstance->LocalHostTempDirectoriesForQD, DRMGlobalInstance->NextLocalHostTempDirIdxForQD);
+        SimpStringPtr tmpdir =
+        	(SimpStringPtr)getDQueueNodeDataByIndex(
+        					   &DRMGlobalInstance->LocalHostTempDirectoriesForQD,
+							   DRMGlobalInstance->NextLocalHostTempDirIdxForQD);
 
         DRMGlobalInstance->NextLocalHostTempDirIdxForQD =
                     (DRMGlobalInstance->NextLocalHostTempDirIdxForQD + 1) %
@@ -1027,7 +1061,8 @@ bool handleRMRequestTmpDir(void **arg)
                                    conntrack->MessageMark2,
                                    RESPONSE_QD_TMPDIR);
         
-        elog(LOG, "handleRMRequestTmpDir, get temporary directory:%s from master resource manager", tmpdir->Str);
+        elog(LOG, "Resource manager assigned temporary directory %s",
+        		  tmpdir->Str);
     }
 
     conntrack->ResponseSent = false;
@@ -1047,7 +1082,7 @@ bool handleRMRequestDumpResQueueStatus(void **arg)
 
     response = (RPCResponseResQueueStatus)palloc(responseLen);
 
-    response->Result 	= 0;
+    response->Result 	= FUNC_RETURN_OK;
     response->queuenum 	= list_length(PQUEMGR->Queues);
 
     int i = 0;
@@ -1057,14 +1092,14 @@ bool handleRMRequestDumpResQueueStatus(void **arg)
     	DynResourceQueueTrack quetrack = lfirst(cell);
 
         sprintf(response->queuedata[i].name, "%s", quetrack->QueueInfo->Name);
-        response->queuedata[i].segmem = quetrack->QueueInfo->SegResourceQuotaMemoryMB;
-        response->queuedata[i].segcore = quetrack->QueueInfo->SegResourceQuotaVCore;
-        response->queuedata[i].segsize = quetrack->ClusterSegNumber;
+        response->queuedata[i].segmem     = quetrack->QueueInfo->SegResourceQuotaMemoryMB;
+        response->queuedata[i].segcore    = quetrack->QueueInfo->SegResourceQuotaVCore;
+        response->queuedata[i].segsize    = quetrack->ClusterSegNumber;
         response->queuedata[i].segsizemax = quetrack->ClusterSegNumberMax;
-        response->queuedata[i].inusemem = quetrack->TotalUsed.MemoryMB;
-        response->queuedata[i].inusecore = quetrack->TotalUsed.Core;
-        response->queuedata[i].holders = quetrack->NumOfRunningQueries;
-        response->queuedata[i].waiters = quetrack->QueryResRequests.NodeCount;
+        response->queuedata[i].inusemem   = quetrack->TotalUsed.MemoryMB;
+        response->queuedata[i].inusecore  = quetrack->TotalUsed.Core;
+        response->queuedata[i].holders    = quetrack->NumOfRunningQueries;
+        response->queuedata[i].waiters    = quetrack->QueryResRequests.NodeCount;
 
         /* Generate if resource queue paused dispatching resource. */
         if ( quetrack->troubledByFragment )
@@ -1090,58 +1125,70 @@ bool handleRMRequestDumpResQueueStatus(void **arg)
     }
     
     buildResponseIntoConnTrack(conntrack,
-                                   (char *)response,
-                                   responseLen,
-                                   conntrack->MessageMark1,
-                                   conntrack->MessageMark2,
-                                   RESPONSE_QD_DUMP_RESQUEUE_STATUS);
-
+                               (char *)response,
+                               responseLen,
+                               conntrack->MessageMark1,
+                               conntrack->MessageMark2,
+                               RESPONSE_QD_DUMP_RESQUEUE_STATUS);
     conntrack->ResponseSent = false;
 	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
 	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
 	MEMORY_CONTEXT_SWITCH_BACK
-
-    
     return true;
 }
 
 bool handleRMRequestDumpStatus(void **arg)
 {
+	static char errorbuf[ERRORMESSAGE_SIZE];
     ConnectionTrack conntrack   = (ConnectionTrack)(*arg);
     RPCResponseDumpStatusData response;
 
-    uint32_t type = *((uint32_t *)(conntrack->MessageBuff.Buffer + 0));
-    char *dump_file  = (char  *)(conntrack->MessageBuff.Buffer + 8);
+    RPCRequestDumpStatus request = SMBUFF_HEAD(RPCRequestDumpStatus,
+    										   &(conntrack->MessageBuff));
 
-    elog(LOG, "handleRMRequestDumpStatus type:%u dump_file:%s", type, dump_file);
+    elog(DEBUG3, "Resource manager dump type %u data to file %s",
+    			 request->type,
+				 request->dump_file);
 
-    switch (type)
+    response.Result   = FUNC_RETURN_OK;
+    response.Reserved = 0;
+    switch (request->type)
     {
     case 1:
-        dumpConnectionTracks(dump_file);
-        response.Result       = FUNC_RETURN_OK;
+        dumpConnectionTracks(request->dump_file);
         break;
     case 2:
-        dumpResourceQueueStatus(dump_file);
-        response.Result       = FUNC_RETURN_OK;
+        dumpResourceQueueStatus(request->dump_file);
         break;
     case 3:
-        dumpResourcePoolHosts(dump_file);
-        response.Result       = FUNC_RETURN_OK;
+        dumpResourcePoolHosts(request->dump_file);
         break;
     default:
-        response.Result       = RM_STATUS_BAD_DUMP_TYPE;
+        response.Result = RM_STATUS_BAD_DUMP_TYPE;
+        snprintf(errorbuf, sizeof(errorbuf),
+        		 "wrong dump type index %d",
+				 request->type);
+        elog(WARNING, "%s", errorbuf);
         break;
     }
 
-    response.Reserved     = 0;
+    SelfMaintainBufferData responsedata;
+    initializeSelfMaintainBuffer(&responsedata, PCONTEXT);
+
+    appendSMBVar(&responsedata, response);
+    if ( response.Result != FUNC_RETURN_OK )
+    {
+    	appendSMBVar(&responsedata, errorbuf);
+    	appendSelfMaintainBufferTill64bitAligned(&responsedata);
+    }
 
     buildResponseIntoConnTrack(conntrack,
-                                   (char *)&response,
-                                   sizeof(response),
-                                   conntrack->MessageMark1,
-                                   conntrack->MessageMark2,
-                                   RESPONSE_QD_DUMP_STATUS);
+                               SMBUFF_CONTENT(&responsedata),
+							   getSMBContentSize(&responsedata),
+							   conntrack->MessageMark1,
+                               conntrack->MessageMark2,
+                               RESPONSE_QD_DUMP_STATUS);
+    destroySelfMaintainBuffer(&responsedata);
 
     conntrack->ResponseSent = false;
 	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
@@ -1174,7 +1221,8 @@ bool handleRMRequestDummy(void **arg)
 bool handleRMRequestQuotaControl(void **arg)
 {
 	ConnectionTrack conntrack = (ConnectionTrack)(*arg);
-	RPCRequestQuotaControl request = (RPCRequestQuotaControl)(conntrack->MessageBuff.Buffer);
+	RPCRequestQuotaControl request =
+		SMBUFF_HEAD(RPCRequestQuotaControl, &(conntrack->MessageBuff));
 	Assert(request->Phase >= 0 && request->Phase < QUOTA_PHASE_COUNT);
 	bool oldvalue = PRESPOOL->pausePhase[request->Phase];
 	PRESPOOL->pausePhase[request->Phase] = request->Pause;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3e46d3d9/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
index 419c356..f5932ff 100644
--- a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
+++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c
@@ -678,6 +678,9 @@ int handleRB2RM_ClusterReport(void)
 	 */
 	returnAllGRMResourceFromGRMUnavailableSegments();
 
+	/* Refresh available node count. */
+	refreshAvailableNodeCount();
+
 	/* Update GRM resource queue capacity. */
 	PQUEMGR->GRMQueueCapacity	 	= response.QueueCapacity;
 	PQUEMGR->GRMQueueCurCapacity 	= response.QueueCurCapacity;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3e46d3d9/src/backend/resourcemanager/resourcemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager.c b/src/backend/resourcemanager/resourcemanager.c
index ff3748f..c1e39a7 100644
--- a/src/backend/resourcemanager/resourcemanager.c
+++ b/src/backend/resourcemanager/resourcemanager.c
@@ -504,6 +504,9 @@ int ResManagerMainServer2ndPhase(void)
 	/******* TILL NOW, resource manager starts providing services *******/
 	elog(LOG, "HAWQ RM process works now.");
 
+	/* Check slaves file firstly to ensure we have expected cluster size. */
+	checkSlavesFile();
+
     /* Start request handler to provide services. */
     res = MainHandlerLoop();
     /* res is returned to the caller. */
@@ -2802,6 +2805,7 @@ int  loadHostInformationIntoResourcePool(void)
 
 extern Datum dump_resource_manager_status(PG_FUNCTION_ARGS)
 {
+	static char errorbuf[ERRORMESSAGE_SIZE];
     int type = PG_GETARG_INT32(0);
     char message[1024] = {0};
     char dump_file[1024] = {0};
@@ -2831,7 +2835,7 @@ extern Datum dump_resource_manager_status(PG_FUNCTION_ARGS)
         PG_RETURN_TEXT_P(cstring_to_text(message));    
     }
 
-    dumpResourceManagerStatus(type, dump_file); 
+    dumpResourceManagerStatus(type, dump_file, errorbuf, sizeof(errorbuf));
 
     PG_RETURN_TEXT_P(cstring_to_text(message));    
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3e46d3d9/src/backend/resourcemanager/resourcepool.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcepool.c b/src/backend/resourcemanager/resourcepool.c
index 931b80f..28e874d 100644
--- a/src/backend/resourcemanager/resourcepool.c
+++ b/src/backend/resourcemanager/resourcepool.c
@@ -1133,7 +1133,10 @@ int setSegResHAWQAvailability( SegResource segres, uint8_t newstatus)
 							  segres->Stat->GRMTotalCore);
 
 		addNewResourceToResourceManagerByBundle(&(segres->Allocated));
-		PRESPOOL->AvailNodeCount++;
+		if ( DRMGlobalInstance->ImpType == NONE_HAWQ2 )
+		{
+			PRESPOOL->AvailNodeCount++;
+		}
 	}
 
 	for ( int i = 0 ; i < PQUEMGR->RatioCount ; ++i )
@@ -1153,7 +1156,10 @@ int setSegResHAWQAvailability( SegResource segres, uint8_t newstatus)
 
+/* Apply newstatus to this segment's status via setSegStatGLOBAvailability()
+ * and return that call's result unchanged. */
 int setSegResGLOBAvailability( SegResource segres, uint8_t newstatus)
 {
-	return setSegStatGLOBAvailability(segres->Stat, newstatus);
+	int res = setSegStatGLOBAvailability(segres->Stat, newstatus);
+	return res;
 }
 
 /* Generate HAWQ host report. */
@@ -2174,9 +2177,6 @@ int allocateResourceFromResourcePoolIOBytes2(int32_t 	 nodecount,
 	int32_t			segid		  		= SEGSTAT_ID_INVALID;
 	GRMContainerSet containerset  		= NULL;
 	int				nodecountleft 		= nodecount;
-	int				impossiblecount   	= 0;
-	bool			skipchosenmachine 	= true;
-	int 			fullcount 			= nodetree->NodeIndex->NodeCount;
 	int 			clustersize 		= PRESPOOL->AvailNodeCount;
 	/* This hash saves all selected hosts containing at least one segment.    */
 	HASHTABLEData	vsegcnttbl;
@@ -4110,6 +4110,34 @@ void refreshSlavesFileHostSize(FILE *fp)
 
 }
 
+/*
+ * Recompute PRESPOOL->AvailNodeCount from scratch: walk every segment in
+ * PRESPOOL->Segments and count those whose status is both FTS-available
+ * and GRM-available.
+ */
+void refreshAvailableNodeCount(void)
+{
+	List 	 *allsegs 	= NULL;
+	ListCell *cell 		= NULL;
+	getAllPAIRRefIntoList(&(PRESPOOL->Segments), &allsegs);
+
+	PRESPOOL->AvailNodeCount = 0;
+	foreach(cell, allsegs)
+	{
+		PAIR pair = (PAIR)lfirst(cell);
+		SegResource segres = (SegResource)(pair->Value);
+		Assert( segres != NULL );
+
+		if ( IS_SEGSTAT_FTSAVAILABLE(segres->Stat) &&
+			 IS_SEGSTAT_GRMAVAILABLE(segres->Stat) )
+		{
+			PRESPOOL->AvailNodeCount++;
+		}
+	}
+
+	freePAIRRefList(&(PRESPOOL->Segments), &allsegs);
+}
+
 void getSegResResourceCountersByMemCoreCounters(SegResource  resinfo,
 												int32_t		*allocmem,
 												double		*alloccore,


Mime
View raw message