hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From y...@apache.org
Subject [1/3] incubator-hawq git commit: HAWQ-234. Improve HAWQ resource manager resource allocation algorithm and RPC framework
Date Thu, 17 Dec 2015 02:20:34 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master cc7ab4b29 -> 3e46d3d9b


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3e46d3d9/src/backend/resourcemanager/resqueuemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resqueuemanager.c b/src/backend/resourcemanager/resqueuemanager.c
index ad408ad..772af47 100644
--- a/src/backend/resourcemanager/resqueuemanager.c
+++ b/src/backend/resourcemanager/resqueuemanager.c
@@ -99,17 +99,16 @@ int addQueryResourceRequestToQueue(DynResourceQueueTrack queuetrack,
 typedef int  (* computeQueryQuotaByPolicy )(DynResourceQueueTrack,
 											int32_t *,
 											int32_t *,
-											int32_t);
+											int32_t,
+											char *,
+											int);
 
-int computeQueryQuota_EVEN( DynResourceQueueTrack	track,
-							int32_t			   	   *segnum,
-							int32_t			   	   *segnummin,
-							int32_t					segnumlimit);
-
-int computeQueryQuota_FIFO( DynResourceQueueTrack	track,
-							int32_t			   	   *segnum,
-							int32_t			   	   *segnummin,
-							int32_t					segnumlimit);
+int computeQueryQuota_EVEN(DynResourceQueueTrack	track,
+						   int32_t			   	   *segnum,
+						   int32_t			   	   *segnummin,
+						   int32_t					segnumlimit,
+						   char				   	   *errorbuf,
+						   int						errorbufsize);
 
 int32_t min(int32_t a, int32_t b);
 int32_t max(int32_t a, int32_t b);
@@ -117,7 +116,7 @@ computeQueryQuotaByPolicy AllocationPolicy[RSQ_ALLOCATION_POLICY_COUNT] = {
 	computeQueryQuota_EVEN
 };
 
-int computeQueryQuota( ConnectionTrack conn);
+int computeQueryQuota(ConnectionTrack conn, char *errorbuf, int errorbufsize);
 
 /*------------------------------------------
  * The resource distribution functions.
@@ -165,7 +164,8 @@ void markMemoryCoreRatioWaterMark(DQueue 		marks,
 								  double 		core);
 
 void buildTimeoutResponseForQueuedRequest(ConnectionTrack conntrack,
-										  uint32_t 		  reason);
+										  uint32_t 		  reason,
+										  char			 *errorbuf);
 
 RESOURCEPROBLEM isResourceAcceptable(ConnectionTrack conn, int segnumact);
 
@@ -2053,7 +2053,9 @@ void generateUserReport( const char   *userid,
  * NOTE: In order to facilitate test automation, currently all undefined users
  * 	 	 are assigned to 'default' queue.
  */
-int registerConnectionByUserID( ConnectionTrack			 conntrack)
+int registerConnectionByUserID(ConnectionTrack  conntrack,
+							   char			   *errorbuf,
+							   int				errorbufsize)
 {
 	int 					res 		  = FUNC_RETURN_OK;
 	UserInfo 				userinfo 	  = NULL;
@@ -2076,29 +2078,24 @@ int registerConnectionByUserID( ConnectionTrack			 conntrack)
 	}
 	else
 	{
-		elog(LOG, "No user %s defined for registering connection.",
-				  conntrack->UserID);
+		snprintf(errorbuf, errorbufsize,
+				 "role %s is not defined for registering connection",
+				 conntrack->UserID);
+		elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
 		res = RESQUEMGR_NO_USERID;
 		goto exit;
 	}
 
 	if ( queuetrack == NULL )
 	{
-		elog(LOG, "Resource manager fails to find target resource queue for user %s.",
-				  conntrack->UserID);
+		snprintf(errorbuf, errorbufsize,
+				 "no resource queue assigned for role %s",
+				 conntrack->UserID);
+		elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
 		res = RESQUEMGR_NO_ASSIGNEDQUEUE;
 		goto exit;
 	}
 
-//	/* Acquire new connection. */
-//	if (queuetrack->CurConnCounter >= queuetrack->QueueInfo->ParallelCount) {
-//		res = RESQUEMGR_PARALLEL_FULL;
-//		elog(DEBUG5, "Queue %s is full connected with %d connections.",
-//				     queuetrack->QueueInfo->Name,
-//				     queuetrack->CurConnCounter);
-//		goto exit;
-//	}
-
 	queuetrack->CurConnCounter++;
 
 	conntrack->User 		= (void *)userinfo;
@@ -2107,6 +2104,9 @@ int registerConnectionByUserID( ConnectionTrack			 conntrack)
 	conntrack->LastActTime  = conntrack->RegisterTime;
 
 	transformConnectionTrackProgress(conntrack, CONN_PP_REGISTER_DONE);
+
+	elog(LOG, "ConnID %d. Connection is registered.", conntrack->ConnID);
+
 exit:
 	if ( res != FUNC_RETURN_OK )
 	{
@@ -2145,7 +2145,7 @@ void returnConnectionToQueue(ConnectionTrack conntrack, bool istimeout)
 /*
  * Cancel one queued resource allocation request.
  */
-void cancelResourceAllocRequest(ConnectionTrack conntrack)
+void cancelResourceAllocRequest(ConnectionTrack conntrack, char *errorbuf)
 {
 	if ( conntrack->Progress != CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT )
 	{
@@ -2165,103 +2165,61 @@ void cancelResourceAllocRequest(ConnectionTrack conntrack)
 	/* Unlock session in deadlock */
 	unlockSessionResource(&(queuetrack->DLDetector), conntrack->SessionID);
 
-	buildTimeoutResponseForQueuedRequest(conntrack, RESQUEMGR_NORESOURCE_TIMEOUT);
+	buildTimeoutResponseForQueuedRequest(conntrack,
+										 RESQUEMGR_NORESOURCE_TIMEOUT,
+										 errorbuf);
 }
 
 /* Acquire resource from queue. */
-int acquireResourceFromResQueMgr(ConnectionTrack conntrack)
+int acquireResourceFromResQueMgr(ConnectionTrack  conntrack,
+								 char 			 *errorbuf,
+								 int 			  errorbufsize)
 {
-	int						res			  	= FUNC_RETURN_OK;
-
-	DynResourceQueueTrack   queuetrack	  	= conntrack->QueueTrack;
-
-	if ( queuetrack->ClusterSegNumberMax == 0 )
-	{
-		elog(LOG, "The queue %s has no resource available to run queries.",
-				  queuetrack->QueueInfo->Name);
-		return RESQUEMGR_NO_RESOURCE;
-	}
+	int						res			= FUNC_RETURN_OK;
+	DynResourceQueueTrack	queuetrack	= conntrack->QueueTrack;
 
 	/* Call quota logic to make decision of resource for current query. */
-	res = computeQueryQuota(conntrack);
+	res = computeQueryQuota(conntrack, errorbuf, errorbufsize);
 
 	if ( res == FUNC_RETURN_OK )
 	{
 		if ( conntrack->StatNVSeg == 0 )
 		{
-			int32_t Rmax  = conntrack->SegNum;
-			int32_t RmaxL = conntrack->VSegLimitPerSeg * PRESPOOL->AvailNodeCount;
-			int32_t Rmin  = conntrack->SegNumMin;
-			elog(LOG, "Original quota min seg num:%d, max seg num:%d",
-					  conntrack->SegNumMin,
-					  conntrack->SegNum);
-
-			/* Ensure quota [min,max] is between request [min,max] */
-			int32_t Gmax= conntrack->MaxSegCountFixed;
-			int32_t Gmin= conntrack->MinSegCountFixed;
-
-			if(Gmin==1)
-			{
-				/* case 1 */
-				conntrack->SegNumMin = min(min(Gmax,Rmin),RmaxL);
-				conntrack->SegNum = min(Gmax,RmaxL);
-				if(conntrack->SegNumMin > conntrack->SegNum)
-				{
-					return RESQUEMGR_NO_RESOURCE;
-				}
-			}
-			else if(Gmax == Gmin)
-			{
-				/* case 2 */
-				conntrack->SegNumMin = Gmax;
-				conntrack->SegNum = Gmax;
-				if(Rmax < Gmax)
-				{
-					return RESQUEMGR_NO_RESOURCE;
-				}
-			}
-			else
-			{
-				/* case 3 */
-				conntrack->SegNumMin = min(max(Gmin,Rmin),Gmax);
-				conntrack->SegNum = min(max(min(RmaxL,Gmax),Gmin),Rmax);
-				if(conntrack->SegNumMin > conntrack->SegNum)
-				{
-					return RESQUEMGR_NO_RESOURCE;
-				}
-			}
-
-			elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) resource.",
-					   conntrack->SegMemoryMB,
-					   conntrack->SegCore,
-					   conntrack->SegNum,
-					   conntrack->SegNumMin);
-
+			/*------------------------------------------------------------------
+			 * Adjust the number of virtual segments again based on
+			 * NVSEG_*_LIMITs and NVSEG_*_LIMIT_PERSEGs. This adjustment must
+			 * succeed.
+			 *------------------------------------------------------------------
+			 */
 			adjustResourceExpectsByQueueNVSegLimits(conntrack);
 
-			elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) "
-					  "resource after adjusting based on queue NVSEG limits.",
-					   conntrack->SegMemoryMB,
-					   conntrack->SegCore,
-					   conntrack->SegNum,
-					   conntrack->SegNumMin);
+			elog(LOG, "ConnID %d. Expect query resource (%d MB, %lf CORE) x %d "
+					  "( MIN %d ) resource after adjusting based on queue NVSEG "
+					  "limits.",
+					  conntrack->ConnID,
+					  conntrack->SegMemoryMB,
+					  conntrack->SegCore,
+					  conntrack->SegNum,
+					  conntrack->SegNumMin);
 		}
 
 		/* Add request to the resource queue and return. */
-		res = addQueryResourceRequestToQueue(queuetrack, conntrack);
-		if ( res == FUNC_RETURN_OK )
-		{
-			transformConnectionTrackProgress(conntrack,
-											 CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT);
-			return res;
-		}
+		addQueryResourceRequestToQueue(queuetrack, conntrack);
+		transformConnectionTrackProgress(conntrack,
+										 CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT);
+		/* Exit on succeeding in adding request to the queue. */
+	}
+	else
+	{
+		elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
+		transformConnectionTrackProgress(conntrack, CONN_PP_RESOURCE_ACQUIRE_FAIL);
 	}
-	elog(LOG, "Not accepted resource acquiring request.");
-	transformConnectionTrackProgress(conntrack, CONN_PP_RESOURCE_ACQUIRE_FAIL);
 	return res;
 }
 
-int acquireResourceQuotaFromResQueMgr(ConnectionTrack conntrack)
+int acquireResourceQuotaFromResQueMgr(ConnectionTrack	conntrack,
+									  char			   *errorbuf,
+									  int				errorbufsize)
 {
 	int 					res 		= FUNC_RETURN_OK;
 	DynResourceQueueTrack   queuetrack	= conntrack->QueueTrack;
@@ -2278,149 +2236,147 @@ int acquireResourceQuotaFromResQueMgr(ConnectionTrack conntrack)
 	}
 	else
 	{
-		elog(LOG, "No user %s defined for registering connection. Assign to "
-				  "default queue.",
+		elog(LOG, "ConnID %d. No user %s defined for registering connection. "
+				  "Assign to pg_default queue.",
+				  conntrack->ConnID,
 				  conntrack->UserID);
 		queuetrack = PQUEMGR->DefaultTrack;
 		userinfo = NULL;
 	}
 
-	if ( queuetrack == NULL )
-	{
-		elog(LOG, "Resource manager fails to find target resource queue for user %s.",
-				  conntrack->UserID);
-		res = RESQUEMGR_NO_ASSIGNEDQUEUE;
-		goto exit;
-	}
+	Assert( queuetrack != NULL );
 
 	conntrack->QueueTrack = queuetrack;
 	conntrack->QueueID	  = queuetrack->QueueInfo->OID;
 
-	/* Compute query quota */
-	res = computeQueryQuota(conntrack);
+	/* Call quota logic to make decision of resource for current query. */
+	res = computeQueryQuota(conntrack, errorbuf, errorbufsize);
 
 	if ( res == FUNC_RETURN_OK )
 	{
 		if ( conntrack->StatNVSeg == 0 )
 		{
-			int32_t Rmax = conntrack->SegNum;
-			int32_t RmaxL =conntrack->VSegLimitPerSeg *	PRESPOOL->AvailNodeCount;
-			int32_t Rmin = conntrack->SegNumMin;
-			elog(LOG, "Original quota min seg num:%d, max seg num:%d",
-						conntrack->SegNumMin,
-						conntrack->SegNum);
-
-			/* Ensure quota [min,max] is between request [min,max] */
-			int32_t Gmax= conntrack->MaxSegCountFixed;
-			int32_t Gmin= conntrack->MinSegCountFixed;
-
-			if(Gmin==1)
-			{
-				/* case 1 */
-				conntrack->SegNumMin = min(min(Gmax,Rmin),RmaxL);
-				conntrack->SegNum = min(Gmax,RmaxL);
-				if(conntrack->SegNumMin > conntrack->SegNum)
-				{
-					return RESQUEMGR_NO_RESOURCE;
-				}
-			}
-			else if(Gmax == Gmin)
-			{
-				/* case 2 */
-				conntrack->SegNumMin = Gmax;
-				conntrack->SegNum = Gmax;
-				if(Rmax < Gmax)
-				{
-					return RESQUEMGR_NO_RESOURCE;
-				}
-			}
-			else
-			{
-				/* case 3 */
-				conntrack->SegNumMin = min(max(Gmin,Rmin),Gmax);
-				conntrack->SegNum = min(max(min(RmaxL,Gmax),Gmin),Rmax);
-				if(conntrack->SegNumMin > conntrack->SegNum)
-				{
-					return RESQUEMGR_NO_RESOURCE;
-				}
-			}
-
-			elog(LOG, "Expect (%d MB, %lf CORE) x %d ( min %d ) resource quota.",
-					   conntrack->SegMemoryMB,
-					   conntrack->SegCore,
-					   conntrack->SegNum,
-					   conntrack->SegNumMin);
-
+			/*------------------------------------------------------------------
+			 * The following logic considers the actual resource requirement from
+			 * dispatcher based on table size, workload, etc. The requirement is
+			 * described by (MinSegCountFixed, MaxSegCountFixed). The requirement
+			 * can be satisfied only when there is a non-empty intersection between
+			 * (MinSegCountFixed, MaxSegCountFixed) and (SegNumMin, SegNum).
+			 *------------------------------------------------------------------
+			 */
+			conntrack->SegNumMin = conntrack->MaxSegCountFixed < conntrack->SegNumMin ?
+								   conntrack->MinSegCountFixed :
+								   max(conntrack->SegNumMin, conntrack->MinSegCountFixed);
+
+			conntrack->SegNum = min(conntrack->SegNum, conntrack->MaxSegCountFixed);
+
+			Assert( conntrack->SegNumMin <= conntrack->SegNum );
+			elog(LOG, "ConnID %d. Query resource quota expects (%d MB, %lf CORE) x %d "
+					  "( MIN %d ) resource after adjusting based on query characters.",
+					  conntrack->ConnID,
+					  conntrack->SegMemoryMB,
+					  conntrack->SegCore,
+					  conntrack->SegNum,
+					  conntrack->SegNumMin);
+
+			/*------------------------------------------------------------------
+			 * Adjust the number of virtual segments again based on
+			 * NVSEG_*_LIMITs and NVSEG_*_LIMIT_PERSEGs. This adjustment must
+			 * succeed.
+			 *------------------------------------------------------------------
+			 */
 			adjustResourceExpectsByQueueNVSegLimits(conntrack);
 
-			elog(LOG, "Query resource expects (%d MB, %lf CORE) x %d ( min %d ) "
-					  "resource after adjusting based on queue NVSEG limits.",
-					   conntrack->SegMemoryMB,
-					   conntrack->SegCore,
-					   conntrack->SegNum,
-					   conntrack->SegNumMin);
+			elog(LOG, "ConnID %d. Query resource quota expects (%d MB, %lf CORE) x %d "
+					  "( MIN %d ) resource after adjusting based on queue NVSEG "
+					  "limits.",
+					  conntrack->ConnID,
+					  conntrack->SegMemoryMB,
+					  conntrack->SegCore,
+					  conntrack->SegNum,
+					  conntrack->SegNumMin);
 		}
 	}
 	else
 	{
-		elog(LOG, "Not accepted resource acquiring request.");
+		elog(WARNING, "ConnID %d. Not accepted resource quota request.",
+					  conntrack->ConnID);
 	}
-exit:
 	return res;
 }
 
 void adjustResourceExpectsByQueueNVSegLimits(ConnectionTrack conntrack)
 {
 	DynResourceQueueTrack queuetrack = conntrack->QueueTrack;
+	DynResourceQueue	  queue		 = queuetrack->QueueInfo;
+	bool				  adjusted	 = false;
 
-	if ( queuetrack == NULL )
+	if ( queue->NVSegLowerLimit > MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT_N )
 	{
-		elog(WARNING, "Detected connection track without assigned queue. ConnID %d",
-					  conntrack->ConnID);
-		return;
+		if ( conntrack->SegNum >= queue->NVSegLowerLimit &&
+			 conntrack->SegNumMin < queue->NVSegLowerLimit )
+		{
+			conntrack->SegNumMin = queue->NVSegLowerLimit;
+			adjusted =true;
+			elog(RMLOG, "ConnID %d. Minimum vseg number adjusted to %d",
+						conntrack->ConnID,
+						conntrack->SegNumMin);
+		}
 	}
-
-	if ( queuetrack->QueueInfo->NVSegLowerLimit > MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT_N ||
-		 queuetrack->QueueInfo->NVSegUpperLimit > MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT_N )
+	else if ( queue->NVSegLowerLimitPerSeg >
+			  MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N )
 	{
-		if ( queuetrack->QueueInfo->NVSegLowerLimit > MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT_N  &&
-			 queuetrack->QueueInfo->NVSegLowerLimit > conntrack->SegNumMin &&
-			 queuetrack->QueueInfo->NVSegLowerLimit <= conntrack->SegNum )
+		int minnvseg = ceil(queuetrack->QueueInfo->NVSegLowerLimitPerSeg *
+							PRESPOOL->AvailNodeCount);
+		if ( conntrack->SegNum >= minnvseg && conntrack->SegNumMin < minnvseg )
 		{
-			conntrack->SegNumMin = queuetrack->QueueInfo->NVSegLowerLimit;
+			conntrack->SegNumMin = minnvseg;
+			adjusted =true;
+			elog(RMLOG, "ConnID %d. Minimum vseg number adjusted to %d",
+						conntrack->ConnID,
+						conntrack->SegNumMin);
 		}
+	}
 
-		if ( queuetrack->QueueInfo->NVSegUpperLimit > MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT_N  &&
-			 queuetrack->QueueInfo->NVSegUpperLimit >= conntrack->SegNumMin &&
-			 queuetrack->QueueInfo->NVSegUpperLimit < conntrack->SegNum )
+	if ( queue->NVSegUpperLimit > MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT_N )
+	{
+		if ( conntrack->SegNum > queue->NVSegUpperLimit )
 		{
-			conntrack->SegNum = queuetrack->QueueInfo->NVSegUpperLimit;
+			conntrack->SegNum = queue->NVSegUpperLimit;
+			adjusted =true;
+			elog(RMLOG, "ConnID %d. Maximum vseg number adjusted to %d",
+						conntrack->ConnID,
+						conntrack->SegNum);
 		}
 	}
-	else if ( queuetrack->QueueInfo->NVSegLowerLimitPerSeg > 0 ||
-			  queuetrack->QueueInfo->NVSegUpperLimitPerSeg > 0 )
+	else if ( queue->NVSegUpperLimitPerSeg >
+			  MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N )
 	{
-		if ( queuetrack->QueueInfo->NVSegLowerLimitPerSeg >
-				 MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N  )
+		int maxnvseg = ceil(queuetrack->QueueInfo->NVSegUpperLimitPerSeg *
+							PRESPOOL->AvailNodeCount);
+		if ( conntrack->SegNum > maxnvseg )
 		{
-			int minnvseg = ceil(queuetrack->QueueInfo->NVSegLowerLimitPerSeg *
-							    PRESPOOL->AvailNodeCount);
-			if ( minnvseg > conntrack->SegNumMin && minnvseg <= conntrack->SegNum)
-			{
-				conntrack->SegNumMin = minnvseg;
-			}
+			conntrack->SegNum = maxnvseg;
+			adjusted =true;
+			elog(RMLOG, "ConnID %d. Maximum vseg number adjusted to %d",
+						conntrack->ConnID,
+						conntrack->SegNum);
 		}
+	}
 
-		if ( queuetrack->QueueInfo->NVSegUpperLimitPerSeg >
-				 MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N  )
-		{
-			int maxnvseg = ceil(queuetrack->QueueInfo->NVSegUpperLimitPerSeg *
-							    PRESPOOL->AvailNodeCount);
-			if ( maxnvseg >= conntrack->SegNumMin && maxnvseg < conntrack->SegNum)
-			{
-				conntrack->SegNum = maxnvseg;
-			}
-		}
+	/*--------------------------------------------------------------------------
+	 * Finally, we must ensure that upper limits limit the minimum resource
+	 * quota. This means, the resource limits from NVSEG upper limits are always
+	 * respected.
+	 *--------------------------------------------------------------------------
+	 */
+	if ( conntrack->SegNumMin > conntrack->SegNum )
+	{
+		conntrack->SegNumMin = conntrack->SegNum;
+		elog(RMLOG, "ConnID %d. Minimum vseg number is forced to be equal to "
+					"maximum vseg number %d",
+					conntrack->ConnID,
+					conntrack->SegNumMin);
 	}
 }
 
@@ -3367,7 +3323,7 @@ void minusResourceBundleDataByBundle(ResourceBundle detail, ResourceBundle sourc
 /**
  * Compute the query quota.
  */
-int computeQueryQuota( ConnectionTrack conn)
+int computeQueryQuota(ConnectionTrack conn, char *errorbuf, int errorbufsize)
 {
 	Assert( conn != NULL );
 	Assert( conn->QueueTrack != NULL );
@@ -3379,8 +3335,7 @@ int computeQueryQuota( ConnectionTrack conn)
 	policy = track->QueueInfo->AllocatePolicy;
 	Assert( policy >= 0 && policy < RSQ_ALLOCATION_POLICY_COUNT );
 
-	/*
-	 *--------------------------------------------------------------------------
+	/*--------------------------------------------------------------------------
 	 * Get one segment resource quota. If statement level resource quota is not
 	 * specified, the queue vseg resource quota is derived, otherwise, statement
 	 * level resource quota. The resource memory/core ratio is not changed, thus
@@ -3404,10 +3359,15 @@ int computeQueryQuota( ConnectionTrack conn)
 		if ( conn->SegNumEqual > track->ClusterSegNumberMax )
 		{
 			res = RESQUEMGR_TOO_MANY_FIXED_SEGNUM;
-			elog(WARNING, "ConnID %d expects too many virtual segments %d that is"
-						  "set by hawq_rm_stmt_nvseg.",
-						  conn->ConnID,
-						  conn->SegNum);
+			snprintf(errorbuf, errorbufsize,
+					 "statement resource quota %d MB x %d vseg exceeds resource "
+					 "queue maximum capacity %d MB",
+					 conn->StatVSegMemoryMB,
+					 conn->StatNVSeg,
+					 track->ClusterSegNumberMax *
+					 track->QueueInfo->SegResourceQuotaMemoryMB);
+
+			elog(WARNING, "ConnID %d. %s", conn->ConnID, errorbuf);
 			return res;
 		}
 	}
@@ -3415,72 +3375,158 @@ int computeQueryQuota( ConnectionTrack conn)
 	{
 		conn->SegMemoryMB = track->QueueInfo->SegResourceQuotaMemoryMB;
 		conn->SegCore 	  = track->QueueInfo->SegResourceQuotaVCore;
-	}
 
-	/* Decide vseg number and minimum runnable vseg number. */
-	if ( conn->SegNumMin > conn->VSegLimit )
-	{
-		res = RESQUEMGR_TOO_MANY_FIXED_SEGNUM;
-		elog(WARNING, "ConnID %d expects too many virtual segments %d, "
-					  "cannot be more than %d",
-					  conn->ConnID,
-					  conn->SegNumMin,
-					  conn->VSegLimit);
-		return res;
-	}
-
-	if ( conn->SegNum > conn->VSegLimit )
-	{
-		conn->SegNum = conn->VSegLimit;
-	}
+		int vseglimit = 0;
+		/*----------------------------------------------------------------------
+		 * The limit of vseg number per segment is valid only when query does
+		 * not have fixed vseg number to request.
+		 *----------------------------------------------------------------------
+		 */
+		if ( conn->MinSegCountFixed != conn->MaxSegCountFixed )
+		{
+			vseglimit = conn->VSegLimit ?
+						conn->VSegLimit :
+						conn->VSegLimitPerSeg * PRESPOOL->AvailNodeCount;
+		}
+		else
+		{
+			vseglimit = conn->VSegLimit;
+		}
 
-	if ( conn->StatNVSeg <= 0 )
-	{
-		/* Compute total resource quota. */
+		/*----------------------------------------------------------------------
+		 * Compute total resource quota. This calculation already considers the
+		 * query vseg limit and vseg perseg limit.
+		 *----------------------------------------------------------------------
+		 */
 		res = AllocationPolicy[policy] (track,
 										&(conn->SegNum),
 										&(conn->SegNumMin),
-										conn->VSegLimit);
+										vseglimit ,
+										errorbuf,
+										errorbufsize);
+		if ( res != FUNC_RETURN_OK )
+		{
+			/* Do not set the error buffer here. We expect it is already set. */
+			return res;
+		}
 
-		/*
-		 * If fixed vseg count range is lower than estimated vseg count range
-		 * based on one allocation policy, we always respect the fixed range.
-		 */
 		if ( conn->SegNum < conn->MinSegCountFixed )
 		{
 			res = RESQUEMGR_TOO_MANY_FIXED_SEGNUM;
-			elog(WARNING, "Expect too many virtual segments %d, cannot be more "
-						  "than %d",
-						  conn->MinSegCountFixed,
-						  conn->SegNum);
+			snprintf(errorbuf, errorbufsize,
+					 "minimum expected number of virtual segment %d is more than "
+					 "maximum possible number %d in queue %s",
+					 conn->MinSegCountFixed,
+					 conn->SegNum,
+					 track->QueueInfo->Name);
+
+			elog(WARNING, "ConnID %d. %s", conn->ConnID, errorbuf);
 			return res;
 		}
 
-		conn->SegNumMin = conn->MinSegCountFixed;
-		conn->SegNum = conn->SegNum < conn->MaxSegCountFixed ?
-					   conn->SegNum :
-					   conn->MaxSegCountFixed;
+		elog(LOG, "ConnID %d. Expect query resource (%d MB, %lf CORE) x %d "
+				  "(MIN %d) after checking queue capacity.",
+				  conn->ConnID,
+				  conn->SegMemoryMB,
+				  conn->SegCore,
+				  conn->SegNum,
+				  conn->SegNumMin);
+
+		/*------------------------------------------------------------------
+		 * The following logic considers the actual resource requirement from
+		 * dispatcher based on table size, workload, etc. The requirement is
+		 * described by (MinSegCountFixed, MaxSegCountFixed). The requirement
+		 * can be satisfied only when there is a non-empty intersection between
+		 * (MinSegCountFixed, MaxSegCountFixed) and (SegNumMin, SegNum).
+		 *------------------------------------------------------------------
+		 */
+		if ( conn->MinSegCountFixed < conn->MaxSegCountFixed )
+		{
+			conn->SegNumMin = conn->MaxSegCountFixed < conn->SegNumMin ?
+							  conn->MinSegCountFixed :
+							  max(conn->SegNumMin, conn->MinSegCountFixed);
+			conn->SegNum = min(conn->SegNum, conn->MaxSegCountFixed);
+		}
+		else
+		{
+			Assert(conn->SegNum >= conn->MaxSegCountFixed);
+			conn->SegNumMin = conn->MinSegCountFixed;
+			conn->SegNum	= conn->MaxSegCountFixed;
+		}
+
+		elog(LOG, "ConnID %d. Expect query resource (%d MB, %lf CORE) x %d "
+				  "(MIN %d) after checking query expectation %d (MIN %d).",
+				  conn->ConnID,
+				  conn->SegMemoryMB,
+				  conn->SegCore,
+				  conn->SegNum,
+				  conn->SegNumMin,
+				  conn->MaxSegCountFixed,
+				  conn->MinSegCountFixed);
+
+	}
+
+	/*--------------------------------------------------------------------------
+	 * Decide vseg number and minimum runnable vseg number. User may set guc
+	 * rm_nvseg_perquery_limit at session level, this must be followed. Even
+	 * in case the vseg number is set by statement level resource quota.
+	 *
+	 * Another guc rm_nvseg_perquery_perseg_limit can also limit the number of
+	 * vseg for one statement execution.
+	 *--------------------------------------------------------------------------
+	 */
+	if ( conn->SegNumMin > conn->VSegLimit )
+	{
+		res = RESQUEMGR_TOO_MANY_FIXED_SEGNUM;
+
+		snprintf(errorbuf, errorbufsize,
+				 "expected minimum number of virtual segments %d exceeds the "
+				 "limit of number of virtual segments per query %d",
+				 conn->SegNumMin,
+				 conn->VSegLimit);
+
+		elog(WARNING, "ConnID %d. %s", conn->ConnID, errorbuf);
+		return res;
 	}
 
-	elog(DEBUG3, "Expect cluster resource (%d MB, %lf CORE) x %d "
-				 "minimum runnable %d segment(s).",
-			     conn->SegMemoryMB,
-			     conn->SegCore,
-				 conn->SegNum,
-				 conn->SegNumMin);
+	/*--------------------------------------------------------------------------
+	 * The vseg number per segment limit is valid only when required vseg num
+	 * is a range containing more than one valid value. Generally, in case
+	 * querying one hash distributed table, hash bucket number of vseg is
+	 * required.
+	 *--------------------------------------------------------------------------
+	 */
+	if (conn->StatNVSeg == 0 &&
+		conn->MinSegCountFixed != conn->MaxSegCountFixed &&
+		conn->SegNumMin > conn->VSegLimitPerSeg * PRESPOOL->AvailNodeCount )
+	{
+		res = RESQUEMGR_TOO_MANY_FIXED_SEGNUM;
 
+		snprintf(errorbuf, errorbufsize,
+				 "expected minimum number of virtual segments %d exceeds the "
+				 "limit of number of virtual segments per query per segment %d "
+				 "in cluster having %d available segments",
+				 conn->SegNumMin,
+				 conn->VSegLimitPerSeg,
+				 PRESPOOL->AvailNodeCount);
+
+		elog(WARNING, "ConnID %d. %s", conn->ConnID, errorbuf);
+		return res;
+	}
 	return FUNC_RETURN_OK;
 }
 
-/* Implementation of homogeneous resource allocation. */
+/* Implementation of even resource allocation. */
 int computeQueryQuota_EVEN(DynResourceQueueTrack	track,
 						   int32_t			   	   *segnum,
 						   int32_t			   	   *segnummin,
-						   int32_t					segnumlimit)
+						   int32_t					segnumlimit,
+						   char				   	   *errorbuf,
+						   int						errorbufsize)
 {
 	DynResourceQueue queue = track->QueueInfo;
 
-	/* Decide one connection should have how many segments reserved. */
+	/* Decide how many virtual segments one connection should have reserved. */
 	int reservsegnum = trunc(track->ClusterSegNumber / queue->ParallelCount);
 	reservsegnum = reservsegnum <= 0 ? 1 : reservsegnum;
 
@@ -3494,31 +3540,6 @@ int computeQueryQuota_EVEN(DynResourceQueueTrack	track,
 	return FUNC_RETURN_OK;
 }
 
-int computeQueryQuota_FIFO(DynResourceQueueTrack	 track,
-						   int32_t			   		*segnum,
-						   int32_t			   		*segnummin,
-						   int32_t					 segnumlimit)
-{
-	DynResourceQueue queue = track->QueueInfo;
-
-	/* Decide one connection should have how many segments reserved. */
-	int reservsegnum = trunc(track->ClusterSegNumber / queue->ParallelCount);
-	reservsegnum = reservsegnum <= 0 ? 1 : reservsegnum;
-
-	/*
-	 * FIFO allocation policy does not guarantee the concurrency specified in
-	 * active_statements. Always give as more resource as possible.
-	 */
-	*segnum    = track->ClusterSegNumberMax;
-	*segnummin = track->ClusterSegNumber;
-	*segnum    = segnumlimit < *segnum ?
-				 segnumlimit :
-				 *segnum;
-
-	Assert( *segnummin > 0 && *segnummin <= *segnum );
-	return FUNC_RETURN_OK;
-}
-
 int addQueryResourceRequestToQueue(DynResourceQueueTrack queuetrack,
 								   ConnectionTrack		 conntrack)
 {
@@ -4337,6 +4358,7 @@ void buildAcquireResourceResponseMessage(ConnectionTrack conn)
 
 void detectAndDealWithDeadLock(DynResourceQueueTrack track)
 {
+	static char errorbuf[ERRORMESSAGE_SIZE];
 	uint32_t availmemorymb = track->ClusterMemoryMaxMB -
 						     track->DLDetector.LockedTotalMemoryMB;
 	double   availcore     = track->ClusterVCoreMax -
@@ -4373,7 +4395,13 @@ void detectAndDealWithDeadLock(DynResourceQueueTrack track)
 		}
 		if ( tail != NULL ) {
 			ConnectionTrack canceltrack = (ConnectionTrack)
-										  removeDQueueNode(&(track->QueryResRequests), tail);
+										  removeDQueueNode(&(track->QueryResRequests),
+												  	  	   tail);
+
+			snprintf(errorbuf, sizeof(errorbuf),
+					 "session "INT64_FORMAT" deadlock is detected",
+					 canceltrack->SessionID);
+
 			Assert(canceltrack != NULL);
 			availmemorymb += strack->InUseTotalMemoryMB;
 			availcore     += strack->InUseTotalCore;
@@ -4383,16 +4411,23 @@ void detectAndDealWithDeadLock(DynResourceQueueTrack track)
 
 			/* Cancel this request. */
 			RPCResponseAcquireResourceFromRMERRORData errresponse;
-			/* Send error message. */
 			errresponse.Result   = RESQUEMGR_DEADLOCK_DETECTED;
 			errresponse.Reserved = 0;
 
-			buildResponseIntoConnTrack( canceltrack,
-										(char *)&errresponse,
-										sizeof(errresponse),
-										canceltrack->MessageMark1,
-										canceltrack->MessageMark2,
-										RESPONSE_QD_ACQUIRE_RESOURCE);
+			SelfMaintainBufferData responsedata;
+			initializeSelfMaintainBuffer(&responsedata, PCONTEXT);
+			appendSMBVar(&responsedata, errresponse);
+			appendSMBStr(&responsedata, errorbuf);
+			appendSelfMaintainBufferTill64bitAligned(&responsedata);
+
+			buildResponseIntoConnTrack(canceltrack,
+									   SMBUFF_CONTENT(&responsedata),
+									   getSMBContentSize(&responsedata),
+									   canceltrack->MessageMark1,
+									   canceltrack->MessageMark2,
+									   RESPONSE_QD_ACQUIRE_RESOURCE);
+			destroySelfMaintainBuffer(&responsedata);
+
 			transformConnectionTrackProgress(canceltrack,
 											 CONN_PP_RESOURCE_QUEUE_ALLOC_FAIL);
 
@@ -4407,6 +4442,7 @@ void detectAndDealWithDeadLock(DynResourceQueueTrack track)
 
 void timeoutDeadResourceAllocation(void)
 {
+	static char errorbuf[ERRORMESSAGE_SIZE];
 	uint64_t curmsec = gettime_microsec();
 
 	if ( curmsec - PQUEMGR->LastCheckingDeadAllocationTime <
@@ -4437,8 +4473,7 @@ void timeoutDeadResourceAllocation(void)
 			if ( curmsec - curcon->LastActTime >
 				 1000000L * rm_session_lease_timeout )
 			{
-				elog(LOG, "The allocated resource timeout is detected. "
-						  "ConnID %d",
+				elog(LOG, "ConnID %d. The allocated resource timeout is detected.",
 						  curcon->ConnID);
 				returnResourceToResQueMgr(curcon);
 				returnConnectionToQueue(curcon, true);
@@ -4447,6 +4482,10 @@ void timeoutDeadResourceAllocation(void)
 					curcon->CommBuffer->toClose = true;
 					curcon->CommBuffer->forcedClose = true;
 				}
+				else
+				{
+					returnConnectionTrack(curcon);
+				}
 			}
 			break;
 		}
@@ -4456,16 +4495,25 @@ void timeoutDeadResourceAllocation(void)
 			if ( curmsec - curcon->LastActTime >
 				 1000000L * rm_session_lease_timeout )
 			{
-				elog(LOG, "The queued resource request timeout is detected. "
-						  "ConnID %d",
+				elog(LOG, "ConnID %d. The queued resource request timeout is "
+						  "detected.",
 						  curcon->ConnID);
-				cancelResourceAllocRequest(curcon);
+
+				snprintf(errorbuf, sizeof(errorbuf),
+						 "queued resource request is timed out due to no session "
+						 "lease heart-beat received");
+
+				cancelResourceAllocRequest(curcon, errorbuf);
 				returnConnectionToQueue(curcon, true);
 				if ( curcon->CommBuffer != NULL )
 				{
 					curcon->CommBuffer->toClose = true;
 					curcon->CommBuffer->forcedClose = true;
 				}
+				else
+				{
+					returnConnectionTrack(curcon);
+				}
 			}
 			break;
 		}
@@ -4484,6 +4532,10 @@ void timeoutDeadResourceAllocation(void)
 					curcon->CommBuffer->toClose = true;
 					curcon->CommBuffer->forcedClose = true;
 				}
+				else
+				{
+					returnConnectionTrack(curcon);
+				}
 			}
 			break;
 		}
@@ -4497,6 +4549,7 @@ void timeoutDeadResourceAllocation(void)
 
 void timeoutQueuedRequest(void)
 {
+	static char errorbuf[ERRORMESSAGE_SIZE];
 	uint64_t curmsec = gettime_microsec();
 
 	if ( curmsec - PQUEMGR->LastCheckingQueuedTimeoutTime <
@@ -4529,10 +4582,14 @@ void timeoutQueuedRequest(void)
 
 		if ( curmsec - ct->ConnectTime > 1000000L * rm_resource_allocation_timeout )
 		{
-			elog(WARNING, "Waiting request timeout is detected due to no "
-						  "available cluster.");
+			snprintf(errorbuf, sizeof(errorbuf),
+					 "resource request is timed out due to no available cluster");
+
+			elog(WARNING, "ConnID %d. %s", ct->ConnID, errorbuf);
 			/* Build timeout response. */
-			buildTimeoutResponseForQueuedRequest(ct, RESQUEMGR_NOCLUSTER_TIMEOUT);
+			buildTimeoutResponseForQueuedRequest(ct,
+												 RESQUEMGR_NOCLUSTER_TIMEOUT,
+												 errorbuf);
 			transformConnectionTrackProgress(ct, CONN_PP_TIMEOUT_FAIL);
 		}
 		else
@@ -4611,8 +4668,8 @@ void timeoutQueuedRequest(void)
 				   (curmsec - curcon->HeadQueueTime >
 				 	 	1000000L * rm_resource_allocation_timeout) ) )
 			{
-				elog(LOG, "The queued resource request no resource timeout is "
-						  "detected. ConnID %d, the waiting time in head of the"
+				elog(LOG, "ConnID %d. The queued resource request no resource "
+						  "timeout is detected, the waiting time in head of the "
 						  "queue is "UINT64_FORMAT " global resource pending "
 						  "start time is "UINT64_FORMAT
 						  ", current time is "UINT64_FORMAT".",
@@ -4620,6 +4677,9 @@ void timeoutQueuedRequest(void)
 						  curmsec - curcon->HeadQueueTime,
 						  PQUEMGR->RatioTrackers[index]->TotalPendingStartTime,
 						  curmsec);
+
+				snprintf(errorbuf, sizeof(errorbuf),
+						 "queued resource request is timed out due to no resource");
 				tocancel = true;
 			}
 
@@ -4629,9 +4689,13 @@ void timeoutQueuedRequest(void)
 					 1000000L * rm_resource_allocation_timeout &&
 				 ((DynResourceQueueTrack)(curcon->QueueTrack))->NumOfRunningQueries == 0 )
 			{
-				elog(LOG, "The queued resource request timeout is detected due to "
-						  "resource fragment problem. ConnID %d",
+				elog(LOG, "ConnID %d. The queued resource request timeout is "
+						  "detected due to resource fragment problem.",
 						  curcon->ConnID);
+
+				snprintf(errorbuf, sizeof(errorbuf),
+						 "queued resource request is timed out due to resource "
+						 "fragment problem");
 				tocancel = true;
 			}
 
@@ -4650,12 +4714,16 @@ void timeoutQueuedRequest(void)
 				elog(LOG, "The queued resource request timeout is detected due to "
 						  "no enough cluster resource. ConnID %d",
 						  curcon->ConnID);
+
+				snprintf(errorbuf, sizeof(errorbuf),
+						 "queued resource request is timed out due to not enough "
+						 "cluster resource capacity");
 				tocancel = true;
 			}
 
 			if ( tocancel )
 			{
-				cancelResourceAllocRequest(curcon);
+				cancelResourceAllocRequest(curcon, errorbuf);
 				returnConnectionToQueue(curcon, true);
 			}
 		}
@@ -4665,17 +4733,28 @@ void timeoutQueuedRequest(void)
 	MEMORY_CONTEXT_SWITCH_BACK
 }
 
-void buildTimeoutResponseForQueuedRequest(ConnectionTrack conntrack, uint32_t reason)
+void buildTimeoutResponseForQueuedRequest(ConnectionTrack conntrack,
+										  uint32_t 		  reason,
+										  char			 *errorbuf)
 {
-	RPCResponseAcquireResourceFromRMERRORData errresponse;
-	errresponse.Result   = reason;
-	errresponse.Reserved = 0;
+	SelfMaintainBufferData responsedata;
+	initializeSelfMaintainBuffer(&responsedata, PCONTEXT);
+
+	RPCResponseAcquireResourceFromRMERRORData response;
+	response.Result   = reason;
+	response.Reserved = 0;
+
+	appendSMBVar(&responsedata, response);
+	appendSMBStr(&responsedata, errorbuf);
+	appendSelfMaintainBufferTill64bitAligned(&responsedata);
+
 	buildResponseIntoConnTrack( conntrack,
-								(char *)&errresponse,
-								sizeof(errresponse),
+								SMBUFF_CONTENT(&responsedata),
+								getSMBContentSize(&responsedata),
 								conntrack->MessageMark1,
 								conntrack->MessageMark2,
 								RESPONSE_QD_ACQUIRE_RESOURCE);
+	destroySelfMaintainBuffer(&responsedata);
 	conntrack->ResponseSent = false;
 	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
 	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3e46d3d9/src/backend/resourcemanager/utils/network_utils.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/utils/network_utils.c b/src/backend/resourcemanager/utils/network_utils.c
index 8c2a8ae..8ffcb57 100644
--- a/src/backend/resourcemanager/utils/network_utils.c
+++ b/src/backend/resourcemanager/utils/network_utils.c
@@ -98,10 +98,9 @@ int getHostIPV4AddressesByHostNameAsString(MCTYPE 	 		context,
 		}
 		else if ( h_errno != TRY_AGAIN )
 		{
-			write_log("Fail to call gethostbyname() to get host %s, %s, herrno %d",
+			write_log("Failed to call gethostbyname() to get host %s, %s",
 					  hostname,
-					  hstrerror(h_errno),
-					  h_errno);
+					  hstrerror(h_errno));
 			break;
 		}
 		pg_usleep(NETWORK_RETRY_SLEEP_US);
@@ -109,7 +108,7 @@ int getHostIPV4AddressesByHostNameAsString(MCTYPE 	 		context,
 
 	if ( hent == NULL )
 	{
-		write_log("WARNING. Fail to resolve host %s.", hostname);
+		write_log("Failed to resolve host %s.", hostname);
 		return SYSTEM_CALL_ERROR;
 	}
 
@@ -211,7 +210,7 @@ int getLocalHostAllIPAddressesAsStrings(DQueue addresslist)
 
 	if ( getifaddrs(&ifaddr) == -1 )
 	{
-		elog(ERROR, "Fail to get interface addresses when calling getifaddrs(), "
+		elog(ERROR, "Failed to get interface addresses when calling getifaddrs(), "
 					"errno %d",
 					errno);
 	}
@@ -243,8 +242,8 @@ int getLocalHostAllIPAddressesAsStrings(DQueue addresslist)
 																  host);
 			insertDQueueTailNode(addresslist, newaddr);
 
-			elog(DEBUG3, "Resource manager discovered local host IPv4 address %s",
-						 ((AddressString)(newaddr->Address))->Address);
+			elog(LOG, "Resource manager discovered local host IPv4 address %s",
+					  ((AddressString)(newaddr->Address))->Address);
 		}
 	}
 
@@ -383,20 +382,15 @@ int setConnectionNonBlocked(int fd)
 	int flags = fcntl(fd, F_GETFL, 0);
 	if (flags == -1)
 	{
-	  elog(WARNING, "setConnectionNonBlocked() fcntl GETFL failed, fd %d (errno %d)",
-			  		fd,
-					errno);
-	  return SYSTEM_CALL_ERROR;
+		write_log("Failed to call fcntl GETFL, fd %d (errno %d)", fd, errno);
+		return SYSTEM_CALL_ERROR;
 	}
 	flags = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
 	if (flags == -1)
 	{
-	  elog(WARNING, "setConnectionNonBlocked() fcntl SETFL failed, fd %d (errno %d)",
-			  	    fd,
-					errno);
-	  return SYSTEM_CALL_ERROR;
+		write_log("Failed to call fcntl SETFL, fd %d (errno %d)", fd, errno);
+		return SYSTEM_CALL_ERROR;
 	}
-
 	return FUNC_RETURN_OK;
 }
 
@@ -420,16 +414,19 @@ int  connectToServerDomain(const char 	*sockpath,
 						   char			*filename)
 {
 	struct sockaddr_un  sockaddr;
-	int					fd;
-	int					len;
-	int					sockres;
+	int					fd			= 0;
+	int					len			= 0;
+	int					sockres		= 0;
 
-	*clientfd = -1;
+	*clientfd   = -1;
 	filename[0] = '\0';
 
 	fd = socket(AF_UNIX, SOCK_STREAM, 0);
-	if ( fd < 0 ) {
-	  write_log("connectToServerDomain open socket failed (errno %d)", errno);
+	if ( fd < 0 )
+	{
+		write_log("Failed to open socket for connecting domain socket server "
+				  "(errno %d)",
+				  errno);
 		return UTIL_NETWORK_FAIL_CREATESOCKET;
 	}
 
@@ -445,10 +442,15 @@ int  connectToServerDomain(const char 	*sockpath,
 	strcpy(filename, sockaddr.sun_path);
 
 	sockres = bind(fd, (struct sockaddr *)&sockaddr, len);
-	if ( sockres < 0 ) {
-	  write_log("connectToServerDomain bind socket failed %s, fd %d (errno %d)", filename, fd, errno);
-	  closeConnectionDomain(&fd, filename);
-	  return UTIL_NETWORK_FAIL_BIND;
+	if ( sockres < 0 )
+	{
+		write_log("Failed to bind socket for connecting domain socket server "
+				  "%s (errno %d), close fd %d at once",
+				  filename,
+				  errno,
+				  fd);
+		closeConnectionDomain(&fd, filename);
+		return UTIL_NETWORK_FAIL_BIND;
 	}
 
 	memset( &sockaddr, 0, sizeof(struct sockaddr_un) );
@@ -456,21 +458,31 @@ int  connectToServerDomain(const char 	*sockpath,
 	sprintf(sockaddr.sun_path, "%s", sockpath);
 	len = offsetof(struct sockaddr_un, sun_path) + strlen(sockaddr.sun_path);
 
-	for ( int i = 0 ; i < DRM_SOCKET_CONN_RETRY ; ++i ) {
+	for ( int i = 0 ; i < DRM_SOCKET_CONN_RETRY ; ++i )
+	{
 		sockres = connect(fd, (struct sockaddr *)&sockaddr, len);
-		if ( sockres < 0 ) {
-		  write_log("connectToServerDomain connect failed, fd %d (errno %d)", fd, errno);
-		  pg_usleep(1000000); /* Sleep 2 seconds and retry. */
+		if ( sockres < 0 )
+		{
+			write_log("Failed to connect to domain socket server "
+					  "(retry %d, errno %d), fd %d",
+					  i,
+					  errno,
+					  fd);
+			pg_usleep(1000000); /* Sleep 1 second and retry. */
 		}
-		else {
-		  break;
+		else
+		{
+			break;
 		}
 	}
 
-	if ( sockres < 0 ) {
-	  write_log("connectToServerDomain connect failed after retry, fd %d (errno %d)", fd, errno);
-	  closeConnectionDomain(&fd, filename);
-	  return UTIL_NETWORK_FAIL_CONNECT;
+	if ( sockres < 0 )
+	{
+		write_log("Failed to connect to domain socket server after retries, "
+				  "close fd %d at once",
+				  fd);
+		closeConnectionDomain(&fd, filename);
+		return UTIL_NETWORK_FAIL_CONNECT;
 	}
 
 	*clientfd = fd;
@@ -495,17 +507,28 @@ int connectToServerRemote(const char *address, uint16_t port, int *clientfd)
 	int					fd		= 0;
 	int 		    	sockres = 0;
 	struct sockaddr_in 	server_addr;
-	struct hostent 	   *server  = gethostbyname(address);
+	struct hostent 	   *server  = NULL;
 
 	*clientfd = -1;
 
-	if ( server == NULL ) {
+	server = gethostbyname(address);
+	if ( server == NULL )
+	{
+		write_log("Failed to get host by name %s for connecting to a remote "
+				  "socket server %s:%d (error %s)",
+				  address,
+				  address,
+				  port,
+				  hstrerror(h_errno));
 		return UTIL_NETWORK_FAIL_GETHOST;
 	}
 
 	fd = socket(AF_INET, SOCK_STREAM, 0);
-	if ( fd < 0 ) {
-	  write_log("connectToServerRemote open socket failed (errno %d)", errno);
+	if ( fd < 0 )
+	{
+		write_log("Failed to open socket for connecting remote socket server "
+				  "(errno %d)",
+				  errno);
 		return UTIL_NETWORK_FAIL_CREATESOCKET;
 	}
 
@@ -518,21 +541,29 @@ int connectToServerRemote(const char *address, uint16_t port, int *clientfd)
 
 	while(true)
 	{
-	  sockres = connect(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
-	  if( sockres < 0)
-	  {
-	    if (errno == EINTR)
-	    {
-	      continue;
-	    }
-	    else
-	    {
-	      write_log("connectToServerRemote connect failed, fd %d (errno %d)", fd, errno);
-	      closeConnectionRemote(&fd);
-	      return UTIL_NETWORK_FAIL_CONNECT;
-	    }
-	  }
-	  break;
+		sockres = connect(fd,
+						  (struct sockaddr *)&server_addr,
+						  sizeof(server_addr));
+		if( sockres < 0)
+		{
+			write_log("Failed to connect to remote socket server (errno %d), fd %d",
+					  errno,
+					  fd);
+
+			if (errno == EINTR)
+			{
+				continue;
+			}
+			else
+			{
+				write_log("Close fd %d at once due to not recoverable error "
+						  "detected.",
+						  fd);
+				closeConnectionRemote(&fd);
+				return UTIL_NETWORK_FAIL_CONNECT;
+			}
+		}
+		break;
 	}
 
 	*clientfd = fd;
@@ -552,9 +583,7 @@ void closeConnectionRemote(int *clientfd)
 	int ret = close(*clientfd);
 	if (ret < 0)
 	{
-		write_log("ERROR closeConnectionRemote() close FD %d failed, (errno %d)",
-				  *clientfd,
-				  errno);
+		write_log("Failed to close fd %d (errno %d)", *clientfd, errno);
 	}
 	*clientfd = -1;
 }
@@ -571,7 +600,7 @@ void closeConnectionDomain(int *clientfd, char *filename)
 		int ret = close(*clientfd);
 		if (ret < 0)
 		{
-			write_log("closeConnectionDomain close fd failed, fd %d (errno %d)", *clientfd, errno);
+			write_log("Failed to close fd %d (errno %d)", *clientfd, errno);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3e46d3d9/src/backend/resourcemanager/utils/simplestring.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/utils/simplestring.c b/src/backend/resourcemanager/utils/simplestring.c
index 3725359..fcbb538 100644
--- a/src/backend/resourcemanager/utils/simplestring.c
+++ b/src/backend/resourcemanager/utils/simplestring.c
@@ -221,7 +221,7 @@ int SimpleStringToPercentage(SimpStringPtr str, int8_t *value)
 
 }
 
-int  SimpleStringToStorageSizeMB(SimpStringPtr str, int32_t *value)
+int  SimpleStringToStorageSizeMB(SimpStringPtr str, uint32_t *value)
 {
 	int 	tail 	= strlen(str->Str) - 1;
 	int 	scanres = -1;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3e46d3d9/src/backend/tcop/postgres.c
----------------------------------------------------------------------
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 0c5b028..f6f0c54 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -1094,40 +1094,44 @@ exec_mpp_query(const char *query_string,
     	if ( isCGroupEnabled("cpu") && isCGroupSetup("cpu") )
     	{
     		int res = FUNC_RETURN_OK;
-    		if ( !has_been_moved_to_cgroup )
-		{
-			res = MoveToCGroupForQE(
-					resource->master_start_time,
-					gp_session_id,
-					0,
-					getpid());
+    		char errorbuf[ERRORMESSAGE_SIZE];
 
-			if ( res != FUNC_RETURN_OK )
-			{
-				elog(ERROR, "Resource enforcer fails to move QE to CGroup "
-				            "with error %d",
-				            res);
-			}
-			has_been_moved_to_cgroup = true;
-			master_start_time = resource->master_start_time;
+    		if ( !has_been_moved_to_cgroup )
+    		{
+				res = MoveToCGroupForQE(resource->master_start_time,
+										gp_session_id,
+										0,
+										getpid(),
+										errorbuf,
+										sizeof(errorbuf));
+				if ( res != FUNC_RETURN_OK )
+				{
+					elog(ERROR, "Resource enforcer fails to move QE to CGroup, "
+								"%s",
+								errorbuf);
+				}
+				has_been_moved_to_cgroup = true;
+				master_start_time = resource->master_start_time;
     		}
 
     		int mySegIdx = GetQEIndex();
     		if (IsWriter() && resource->segment_vcore_writer[mySegIdx] == mySegIdx)
     		{
-			resource->segment_vcore *= resource->segment_vcore_agg[mySegIdx];
-			res = SetWeightCGroupForQE(master_start_time,
-						   gp_session_id,
-						   0,
-						   resource,
-						   getpid());
+				resource->segment_vcore *= resource->segment_vcore_agg[mySegIdx];
+				res = SetWeightCGroupForQE(master_start_time,
+										   gp_session_id,
+										   0,
+										   resource,
+										   getpid(),
+										   errorbuf,
+										   sizeof(errorbuf));
+	    		if ( res != FUNC_RETURN_OK )
+				{
+					elog(ERROR, "Resource enforcer fails to set CGroup weight "
+								"for QE, %s",
+					            errorbuf);
+				}
     		}
-    		if ( res != FUNC_RETURN_OK )
-			{
-				elog(ERROR, "Resource enforcer fails to set CGroup weight for QE "
-				            "with error %d",
-				            res);
-			}
     	}
     }
 
@@ -5372,6 +5376,7 @@ log_disconnections(int code, Datum arg __attribute__((unused)))
 
 void OnMoveOutCGroupForQE(void)
 {
+	char errorbuf[ERRORMESSAGE_SIZE];
 	if (Gp_role == GP_ROLE_EXECUTE)
 	{
 		if (isCGroupEnabled("cpu") && isCGroupSetup("cpu"))
@@ -5379,7 +5384,9 @@ void OnMoveOutCGroupForQE(void)
 			MoveOutCGroupForQE(master_start_time,
 							   gp_session_id,
 							   0,
-							   getpid());
+							   getpid(),
+							   errorbuf,
+							   sizeof(errorbuf));
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3e46d3d9/src/backend/tcop/pquery.c
----------------------------------------------------------------------
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 78446aa..125dca7 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -694,31 +694,21 @@ void GetResourceQuota(int		max_target_segment_num,
 					  uint32   *seg_memory_mb,
 					  double   *seg_core)
 {
-	char   errorbuf[1024];
-	int    errorcode = FUNC_RETURN_OK;
+	char   errorbuf[ERRORMESSAGE_SIZE];
 	uint64 useroid 	 = GetUserId();
 
 	int ret = acquireResourceQuotaFromRM(useroid,
 										 max_target_segment_num,
 										 min_target_segment_num,
-										 &errorcode,
 										 errorbuf,
 										 sizeof(errorbuf),
 										 seg_num,
 										 seg_num_min,
 										 seg_memory_mb,
 										 seg_core);
-	if ( ret != FUNC_RETURN_OK ) {
-		elog(ERROR, "The resource quota acquiring request is not accepted by "
-					"HAWQ RM. error code=%d, message =%s",
-					ret,
-					errorbuf);
-	}
-	else if ( errorcode != FUNC_RETURN_OK ) {
-		elog(ERROR, "The resource quota acquiring request fails. "
-					"error code=%d, message =%s",
-					errorcode,
-					errorbuf);
+	if ( ret != FUNC_RETURN_OK )
+	{
+		elog(ERROR, "%s", errorbuf);
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3e46d3d9/src/backend/utils/init/postinit.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 0b8c85e..e4d0752 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -432,7 +432,12 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
     {
         if (get_tmpdir_from_rm)
         {
-            getLocalTmpDirFromMasterRM();
+        	char errorbuf[ERRORMESSAGE_SIZE] = "";
+            int res = getLocalTmpDirFromMasterRM(errorbuf, sizeof(errorbuf));
+            if ( res != FUNC_RETURN_OK )
+            {
+            	elog(ERROR, "%s", errorbuf);
+            }
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3e46d3d9/src/include/catalog/pg_resqueue.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_resqueue.h b/src/include/catalog/pg_resqueue.h
index 6547a36..5e6ec3e 100644
--- a/src/include/catalog/pg_resqueue.h
+++ b/src/include/catalog/pg_resqueue.h
@@ -142,8 +142,8 @@ typedef FormData_pg_resqueue *Form_pg_resqueue;
 
 
 /* Create initial default resource queue */
-DATA(insert OID = 9800 ( "pg_root"    0      "-1"    "100%" "100%" 2 "even" _null_      0 0 0 1 _null_ _null_ "branch"));
-DATA(insert OID = 6055 ( "pg_default" 9800   "20"    "50%"  "50%"  2 "even" "mem:128mb" 0 0 0 1 _null_ _null_ _null_));
+DATA(insert OID = 9800 ( "pg_root"    0      "-1"    "100%" "100%" 2 "even" _null_      0 0 0 0 _null_ _null_ "branch"));
+DATA(insert OID = 6055 ( "pg_default" 9800   "20"    "50%"  "50%"  2 "even" "mem:128mb" 0 0 0 0 _null_ _null_ _null_));
 
 /*
  * The possible resource allocation policies.
@@ -159,7 +159,7 @@ enum RESOURCE_QUEUE_ALLOCATION_POLICY_INDEX {
 #define DEFAULT_RESQUEUE_NVSEG_UPPER_LIMIT				"0"
 #define DEFAULT_RESQUEUE_NVSEG_LOWER_LIMIT				"0"
 #define DEFAULT_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT		"0"
-#define DEFAULT_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT		"1"
+#define DEFAULT_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT		"0"
 
 #define DEFAULT_RESQUEUE_ALLOCPOLICY             		"even"
 #define DEFAULT_RESQUEUE_VSEGRESOURCEQUOTA		    	"mem:128mb"
@@ -169,7 +169,7 @@ enum RESOURCE_QUEUE_ALLOCATION_POLICY_INDEX {
 #define DEFAULT_RESQUEUE_NVSEG_UPPER_LIMIT_N			0
 #define DEFAULT_RESQUEUE_NVSEG_LOWER_LIMIT_N			0
 #define DEFAULT_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N		0.0
-#define DEFAULT_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N		1.0
+#define DEFAULT_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N		0.0
 
 #define DEFAULT_RESQUEUE_ALLOCPOLICY_N					RSQ_ALLOCATION_POLICY_EVEN
 #define DEFAULT_RESQUEUE_VSEGRESOURCEQUOTA_N			128


Mime
View raw message