hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From y...@apache.org
Subject [2/2] incubator-hawq git commit: HAWQ-281. User can alter any resource queue no matter whether its busy
Date Sun, 27 Dec 2015 23:17:36 GMT
HAWQ-281. User can alter any resource queue no matter whether its busy


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/46ccd584
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/46ccd584
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/46ccd584

Branch: refs/heads/master
Commit: 46ccd584ea4ae1698711e415502babc7d3184912
Parents: fc89b45
Author: YI JIN <yjin@pivotal.io>
Authored: Mon Dec 28 10:17:19 2015 +1100
Committer: YI JIN <yjin@pivotal.io>
Committed: Mon Dec 28 10:17:19 2015 +1100

----------------------------------------------------------------------
 src/backend/cdb/cdbvars.c                       |    1 +
 src/backend/resourcemanager/conntrack.c         |  135 +-
 src/backend/resourcemanager/include/conntrack.h |   14 +-
 src/backend/resourcemanager/include/dynrm.h     |    2 -
 src/backend/resourcemanager/include/errorcode.h |    1 +
 .../resourcemanager/include/resqueuedeadlock.h  |   37 +-
 .../resourcemanager/include/resqueuemanager.h   |   49 +-
 .../resourcemanager/requesthandler_ddl.c        |  178 ++-
 src/backend/resourcemanager/resourcemanager.c   |    2 +-
 src/backend/resourcemanager/resourcepool.c      |    1 -
 src/backend/resourcemanager/resqueuedeadlock.c  |  140 +-
 src/backend/resourcemanager/resqueuemanager.c   | 1200 +++++++++++++-----
 src/backend/utils/misc/guc.c                    |    9 +
 src/include/cdb/cdbvars.h                       |    1 +
 14 files changed, 1298 insertions(+), 472 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/46ccd584/src/backend/cdb/cdbvars.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbvars.c b/src/backend/cdb/cdbvars.c
index 078957f..c2406c6 100644
--- a/src/backend/cdb/cdbvars.c
+++ b/src/backend/cdb/cdbvars.c
@@ -325,6 +325,7 @@ int		rm_stmt_nvseg;
 
 int		rm_min_resource_perseg;
 bool	rm_force_fifo_queue;
+bool	rm_force_alterqueue_cancel_queued_request;
 
 bool	rm_session_lease_heartbeat_enable;
 int     rm_session_lease_timeout; 			/* How many seconds to wait before

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/46ccd584/src/backend/resourcemanager/conntrack.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/conntrack.c b/src/backend/resourcemanager/conntrack.c
index 75756f4..972aa48 100644
--- a/src/backend/resourcemanager/conntrack.c
+++ b/src/backend/resourcemanager/conntrack.c
@@ -22,13 +22,6 @@
 #include "communication/rmcomm_MessageHandler.h"
 #include "communication/rmcomm_QD_RM_Protocol.h"
 
-void createEmptyConnectionTrack(ConnectionTrack *track);
-void freeUsedConnectionTrack(ConnectionTrack track);
-
-void setConnectionTrackMessageBuffer(ConnectionTrack  track,
-									 char 			 *content,
-									 int 			  size);
-
 /* Initialize connection track manager. */
 void initializeConnectionTrackManager(void)
 {
@@ -101,51 +94,58 @@ void createEmptyConnectionTrack(ConnectionTrack *track)
 	/* Create new entry in connection track. */
 	(*track) = rm_palloc0(PCONTEXT, sizeof(ConnectionTrackData));
 
-	(*track)->ConnID 		 			= INVALID_CONNID;
-	(*track)->SessionID					= -1;
-	(*track)->RegisterTime   			= 0;
 	(*track)->ConnectTime	 			= 0;
-	(*track)->ResAllocTime	 			= 0;
+	(*track)->RegisterTime   			= 0;
 	(*track)->ResRequestTime 			= 0;
+	(*track)->ResAllocTime	 			= 0;
 	(*track)->LastActTime	 			= 0;
 	(*track)->HeadQueueTime				= 0;
+
 	(*track)->ClientAddrLen  			= 0;
 	(*track)->ClientSocket   			= 0;
-	(*track)->MessageID	     	 		= 0;
+
+	(*track)->MessageSize	 	 		= 0;
 	(*track)->MessageMark1   	 		= 0;
 	(*track)->MessageMark2   	 		= 0;
-	(*track)->MessageSize	 	 		= 0;
+	(*track)->MessageID	     	 		= 0;
+	initializeSelfMaintainBuffer(&((*track)->MessageBuff), PCONTEXT);
 	(*track)->MessageReceiveTime 		= 0;
+
+	(*track)->ConnID 		 			= INVALID_CONNID;
+	(*track)->QueueID			 		= 0;
 	(*track)->Progress		 	 		= CONN_PP_INFO_NOTSET;
 	(*track)->ResponseSent	 	 		= false;
-	(*track)->SegCore		 	 		= -1.0;
+	(*track)->SessionID					= -1;
+
 	(*track)->SegMemoryMB	 	 		= -1;
+	(*track)->SegCore		 	 		= -1.0;
 	(*track)->SegIOBytes				= 0;
 	(*track)->SegNum			 		= -1;
 	(*track)->SegNumMin					= -1;
 	(*track)->SegNumActual				= -1;
+	(*track)->SegNumEqual				= 0;
+	(*track)->SegPreferredHostCount 	= 0;
+	(*track)->SegPreferredHostNames 	= NULL;
+	(*track)->SegPreferredScanSizeMB 	= NULL;
+	(*track)->SliceSize					= 0;
+	(*track)->IOBytes					= 0;
 	(*track)->MaxSegCountFixed			= 0;
 	(*track)->MinSegCountFixed			= 0;
 	(*track)->VSegLimitPerSeg			= -1;
 	(*track)->VSegLimit					= -1;
 	(*track)->StatVSegMemoryMB			= 0;
 	(*track)->StatNVSeg					= 0;
-	(*track)->SegNumEqual				= 0;
-	(*track)->SliceSize					= 0;
-	(*track)->IOBytes					= 0;
-	(*track)->QueueID			 		= 0;
-	(*track)->User				 		= NULL;
+	(*track)->Resource					= NULL;
+
 	(*track)->QueueTrack		 		= NULL;
-	(*track)->SegPreferredHostCount 	= 0;
-	(*track)->SegPreferredHostNames 	= NULL;
-	(*track)->SegPreferredScanSizeMB 	= NULL;
+	(*track)->User				 		= NULL;
+
 	(*track)->isOld						= false;
+
 	(*track)->troubledByFragment		= false;
 	(*track)->troubledByFragmentTimestamp = 0;
-	(*track)->CommBuffer				= NULL;
-	(*track)->Resource					= NULL;
 
-	initializeSelfMaintainBuffer(&((*track)->MessageBuff), PCONTEXT);
+	(*track)->CommBuffer				= NULL;
 }
 
 void freeUsedConnectionTrack(ConnectionTrack track)
@@ -720,3 +720,88 @@ void buildResponseIntoConnTrack(ConnectionTrack      conntrack,
 	conntrack->ResponseSent = false;
 	setConnectionTrackMessageBuffer(conntrack, content, size);
 }
+
+void copyAllocWaitingConnectionTrack(ConnectionTrack source,
+									 ConnectionTrack target)
+{
+	target->ConnectTime					= source->ConnectTime;
+	target->RegisterTime   				= source->RegisterTime;
+	target->ResRequestTime 				= source->ResRequestTime;
+	target->ResAllocTime	 			= 0;
+	target->LastActTime	 				= source->LastActTime;
+	target->HeadQueueTime				= source->HeadQueueTime;
+
+	memcpy(&(target->ClientAddr),
+		   &(source->ClientAddr),
+		   sizeof(struct sockaddr_in));
+	target->ClientAddrLen  				= source->ClientAddrLen;
+	target->ClientSocket   				= source->ClientSocket;
+	memcpy(target->ClientAddrDotStr,
+		   source->ClientAddrDotStr,
+		   sizeof(target->ClientAddrDotStr));
+	target->ClientAddrPort				= source->ClientAddrPort;
+
+	target->MessageSize	 	 			= source->MessageSize;
+	target->MessageMark1   	 			= source->MessageMark1;
+	target->MessageMark2   	 			= source->MessageMark2;
+	target->MessageID	     	 		= source->MessageID;
+	appendSelfMaintainBuffer(&(target->MessageBuff),
+							 SMBUFF_CONTENT(&(source->MessageBuff)),
+							 getSMBContentSize(&(source->MessageBuff)));
+	target->MessageReceiveTime 			= source->MessageReceiveTime;
+
+	target->ConnID 		 				= source->ConnID;
+	memcpy(target->UserID, source->UserID, sizeof(target->UserID));
+	target->QueueID			 			= source->QueueID;
+
+	target->Progress		 	 		= source->Progress;
+	target->ResponseSent	 	 		= source->ResponseSent;
+	target->SessionID					= source->SessionID;
+
+	target->SegMemoryMB	 	 			= -1;
+	target->SegCore		 	 			= -1.0;
+	target->SegIOBytes					= 0;
+	target->SegNum			 			= -1;
+	target->SegNumMin					= -1;
+	target->SegNumActual				= -1;
+	target->SegNumEqual					= 0;
+	target->SegPreferredHostCount 		= source->SegPreferredHostCount;
+	target->SegPreferredHostNames 		= NULL;
+	target->SegPreferredScanSizeMB 		= NULL;
+	target->SliceSize					= source->SliceSize;
+	target->IOBytes						= source->IOBytes;
+	target->MaxSegCountFixed			= source->MaxSegCountFixed;
+	target->MinSegCountFixed			= source->MinSegCountFixed;
+	target->VSegLimitPerSeg				= source->VSegLimitPerSeg;
+	target->VSegLimit					= source->VSegLimit;
+	target->StatVSegMemoryMB			= source->StatVSegMemoryMB;
+	target->StatNVSeg					= source->StatNVSeg;
+	target->Resource					= NULL;
+
+	target->QueueTrack		 			= source->QueueTrack;
+	target->User				 		= source->User;
+
+	target->isOld						= false;
+
+	target->troubledByFragment		= false;
+	target->troubledByFragmentTimestamp = 0;
+
+	target->CommBuffer				= NULL;
+
+	/*
+	 * We have copied the resource request content, so we can call this API to
+	 * build up preferred host list.
+	 */
+	buildSegPreferredHostInfo(target);
+}
+
+void copyResourceQuotaConnectionTrack(ConnectionTrack source,
+									  ConnectionTrack target)
+{
+	target->SegMemoryMB		= source->SegMemoryMB;
+	target->SegCore			= source->SegCore;
+	target->SegIOBytes		= source->SegIOBytes;
+	target->SegNum			= source->SegNum;
+	target->SegNumMin		= source->SegNumMin;
+	target->SegNumEqual		= source->SegNumEqual;
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/46ccd584/src/backend/resourcemanager/include/conntrack.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/conntrack.h b/src/backend/resourcemanager/include/conntrack.h
index 365272d..f1799dd 100644
--- a/src/backend/resourcemanager/include/conntrack.h
+++ b/src/backend/resourcemanager/include/conntrack.h
@@ -105,7 +105,6 @@ struct ConnectionTrackData
 
 	int						Progress;		/* The processing progress.		  */
 	bool			    	ResponseSent;
-
 	int64_t					SessionID;
 
 	int32_t					SegMemoryMB;
@@ -167,6 +166,7 @@ typedef struct ConnectionTrackManagerData  ConnectionTrackManagerData;
 /* Initialize connection track manager. */
 void initializeConnectionTrackManager(void);
 
+void createEmptyConnectionTrack(ConnectionTrack *track);
 /* Use connection id. */
 int useConnectionID(int32_t *connid);
 /* Return connection id. */
@@ -194,6 +194,12 @@ void freeSegPreferredHostInfo(ConnectionTrack track);
 
 void setAllAllocatedResourceInConnectionTracksOld(void);
 
+void copyAllocWaitingConnectionTrack(ConnectionTrack source,
+									 ConnectionTrack target);
+
+void copyResourceQuotaConnectionTrack(ConnectionTrack source,
+									  ConnectionTrack target);
+
 void dumpConnectionTracks(const char *filename);
 
 /* Build response message into Connection Track instance. */
@@ -203,4 +209,10 @@ void buildResponseIntoConnTrack(ConnectionTrack  conntrack,
 								uint8_t  		 mark1,
 								uint8_t  		 mark2,
 								uint16_t 		 messageid);
+
+void setConnectionTrackMessageBuffer(ConnectionTrack  track,
+									 char 			 *content,
+									 int 			  size);
+
+void freeUsedConnectionTrack(ConnectionTrack track);
 #endif /*DYNAMIC_RESOURCE_MANAGEMENT_CONNECTION_TRACK_H*/

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/46ccd584/src/backend/resourcemanager/include/dynrm.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/dynrm.h b/src/backend/resourcemanager/include/dynrm.h
index 3d66a2c..250cb75 100644
--- a/src/backend/resourcemanager/include/dynrm.h
+++ b/src/backend/resourcemanager/include/dynrm.h
@@ -334,6 +334,4 @@ int  initializeSocketServer_RMSEG(void);
 int  MainHandlerLoop_RMSEG(void);
 int  MainHandler_RMSEGDummyLoop(void);
 
-#define ELOG_ERRBUF_MESSAGE(level,buffer) elog((level), "%s", (buffer));
-
 #endif //DYNAMIC_RESOURCE_MANAGEMENT_H

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/46ccd584/src/backend/resourcemanager/include/errorcode.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/errorcode.h b/src/backend/resourcemanager/include/errorcode.h
index d844314..c968abc 100644
--- a/src/backend/resourcemanager/include/errorcode.h
+++ b/src/backend/resourcemanager/include/errorcode.h
@@ -94,6 +94,7 @@ enum DRM_ERROR_CODE {
 	RESQUEMGR_NOCLUSTER_TIMEOUT,
 	RESQUEMGR_NORESOURCE_TIMEOUT,
 	RESQUEMGR_WRONG_RES_QUOTA_EXP,
+	RESQUEMGR_ALTERQUEUE_CONFILICT,
 
 	REQUESTHANDLER_START_TAG = 200,
 	REQUESTHANDLER_WAIT_RESOURCE,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/46ccd584/src/backend/resourcemanager/include/resqueuedeadlock.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resqueuedeadlock.h b/src/backend/resourcemanager/include/resqueuedeadlock.h
index c59a4c0..6a7fdcc 100644
--- a/src/backend/resourcemanager/include/resqueuedeadlock.h
+++ b/src/backend/resourcemanager/include/resqueuedeadlock.h
@@ -20,9 +20,10 @@
 #ifndef DYNAMIC_RESOURCE_MANAGEMENT_RESOURCE_QUEUE_DEADLOCK_DETECTOR_H
 #define DYNAMIC_RESOURCE_MANAGEMENT_RESOURCE_QUEUE_DEADLOCK_DETECTOR_H
 #include "envswitch.h"
+#include "rmcommon.h"
 #include "utils/hashtable.h"
 
-/******************************************************************************
+/*------------------------------------------------------------------------------
  *
  * +------------------------------+                         +------------------+
  * | ResqueueDeadLockDetectorData |---hashlist ref (1:N)--->| SessionTrackData |
@@ -39,7 +40,7 @@
  * quota plus locked resource can not be more than the maximum limit of this
  * resource queue.
  *
- * -----------------------------------------------------------------------------
+ *------------------------------------------------------------------------------
  * resource negotiation actions that updates this detector.
  *
  * Register connection : No action.
@@ -74,14 +75,14 @@
  * 							   Minus resource usage in the session, the session
  * 							   maybe locked or not locked.
  *
- ******************************************************************************/
+ *------------------------------------------------------------------------------
+ */
 
 struct SessionTrackData
 {
-	int64_t			SessionID;
-	uint32_t		InUseTotalMemoryMB;
-	double      	InUseTotalCore;
-	bool			Locked;
+	int64_t				SessionID;
+	ResourceBundleData 	InUseTotal;
+	bool				Locked;
 };
 
 typedef struct SessionTrackData  SessionTrackData;
@@ -89,12 +90,10 @@ typedef struct SessionTrackData *SessionTrack;
 
 struct ResqueueDeadLockDetectorData
 {
-	HASHTABLEData	Sessions;			/* Hash of SessionTrack. */
-	uint32_t		InUseTotalMemoryMB;
-	double			InUseTotalCore;
-	uint32_t		LockedTotalMemoryMB;
-	double			LockedTotalCore;
-	void           *ResqueueTrack;
+	HASHTABLEData		Sessions;					/* Hash of SessionTrack. */
+	ResourceBundleData	InUseTotal;
+	ResourceBundleData	LockedTotal;
+	void			   *ResqueueTrack;
 };
 
 typedef struct ResqueueDeadLockDetectorData  ResqueueDeadLockDetectorData;
@@ -106,17 +105,16 @@ void initializeResqueueDeadLockDetector(ResqueueDeadLockDetector detector,
 int createSession(ResqueueDeadLockDetector detector,
 				  int64_t 				   sessionid,
 				  SessionTrack			  *sessiontrack);
-int removeSession(ResqueueDeadLockDetector detector, int64_t sessionid);
 
 int addSessionInUseResource(ResqueueDeadLockDetector detector,
 							int64_t 				 sessionid,
 							uint32_t 				 memorymb,
 							double 					 core);
 
-int minusSessionInUserResource(ResqueueDeadLockDetector detector,
-							   int64_t 					sessionid,
-							   uint32_t 				memorymb,
-							   double 					core);
+int minusSessionInUseResource(ResqueueDeadLockDetector	detector,
+							  int64_t					sessionid,
+							  uint32_t 					memorymb,
+							  double 					core);
 
 void createAndLockSessionResource(ResqueueDeadLockDetector detector,
 								  int64_t 				   sessionid);
@@ -128,4 +126,7 @@ SessionTrack findSession(ResqueueDeadLockDetector detector,
 						 int64_t 				  sessionid);
 
 void resetResourceDeadLockDetector(ResqueueDeadLockDetector detector);
+
+void copyResourceDeadLockDetectorWithoutLocking(ResqueueDeadLockDetector source,
+												ResqueueDeadLockDetector target);
 #endif /* DYNAMIC_RESOURCE_MANAGEMENT_RESOURCE_QUEUE_DEADLOCK_DETECTOR_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/46ccd584/src/backend/resourcemanager/include/resqueuemanager.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resqueuemanager.h b/src/backend/resourcemanager/include/resqueuemanager.h
index 5fe523b..fc22a37 100644
--- a/src/backend/resourcemanager/include/resqueuemanager.h
+++ b/src/backend/resourcemanager/include/resqueuemanager.h
@@ -81,7 +81,7 @@ enum RESOURCE_QUEUE_TABLE_ATTR_INDEX {
 	RSQ_TBL_ATTR_CORE_LIMIT_CLUSTER,
 	RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA,
 	RSQ_TBL_ATTR_ALLOCATION_POLICY,
-	RSQ_TBL_ATTR_RESORUCE_OVERCOMMIT_FACTOR,
+	RSQ_TBL_ATTR_RESOURCE_OVERCOMMIT_FACTOR,
 	RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT,
 	RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT,
 	RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT_PERSEG,
@@ -142,7 +142,7 @@ struct DynResourceQueueData {
     double				ClusterMemoryPer;		/* Specified percentage exp.  */
     double				ClusterVCorePer;		/* Specified percentage exp.  */
 
-    char                Name[72];				/* Expect maximum 64 bytes.   */
+    char                Name[64];				/* Expect maximum 64 bytes.   */
 };
 
 typedef struct DynResourceQueueData *DynResourceQueue;
@@ -194,6 +194,10 @@ struct DynResourceQueueTrackData {
 	int					  NumOfRunningQueries;  /* Number of running queries. */
 
 	ResqueueDeadLockDetectorData DLDetector;	/* Deadlock detector.         */
+
+	DynResourceQueueTrack	ShadowQueueTrack;	/* The shadow instance for
+												   saving temporary status when
+												   altering this queue.		  */
 };
 
 /**
@@ -366,12 +370,13 @@ int parseResourceQueueAttributes( List 			 	*attributes,
 								  char 				*errorbuf,
 								  int   			 errorbufsize);
 
-int updateResourceQueueAttributes(List 			 	*attributes,
-								  DynResourceQueue 	 queue,
-								  char 				*errorbuf,
-								  int   			 errorbufsize);
+int updateResourceQueueAttributesInShadow(List 			 		*attributes,
+								  	  	  DynResourceQueueTrack	 queue,
+										  char					*errorbuf,
+										  int					 errorbufsize);
 
-void freeDynResourceQueueTrack(DynResourceQueueTrack track);
+void shallowFreeResourceQueueTrack(DynResourceQueueTrack track);
+void deepFreeResourceQueueTrack(DynResourceQueueTrack track);
 
 int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue  queue,
 											   char				*errorbuf,
@@ -493,6 +498,36 @@ void resetAllDeadLockDetector(void);
 /* Set forced number of GRM containers to return before dispatching. */
 void setForcedReturnGRMContainerCount(void);
 
+int computeQueryQuota(ConnectionTrack conn, char *errorbuf, int errorbufsize);
+
+void adjustResourceExpectsByQueueNVSegLimits(ConnectionTrack conntrack);
+
+int addQueryResourceRequestToQueue(DynResourceQueueTrack queuetrack,
+								   ConnectionTrack		 conntrack);
+
+void buildQueueTrackShadows(DynResourceQueueTrack	toaltertrack,
+							List 				  **qhavingshadow);
+void buildQueueTrackShadow(DynResourceQueueTrack toaltertrack);
+
+void cleanupQueueTrackShadows(List **qhavingshadow);
+
+int rebuildAllResourceQueueTrackDynamicStatusInShadow(List *quehavingshadow,
+													  char *errorbuf,
+													  int	errorbufsize);
+
+int rebuildResourceQueueTrackDynamicStatusInShadow(DynResourceQueueTrack  quetrack,
+												   char 				 *errorbuf,
+												   int					  errorbufsize);
+
+int detectAndDealWithDeadLockInShadow(DynResourceQueueTrack quetrack);
+
+void applyResourceQueueTrackChangesFromShadows(List *quehavingshadow);
+
+void cancelQueryRequestToBreakDeadLockInShadow(DynResourceQueueTrack shadowtrack,
+											   DQueueNode			 iter,
+											   uint32_t				 expmemorymb,
+											   uint32_t				 availmemorymb);
+
 /* Dump resource queue status to file system. */
 void dumpResourceQueueStatus(const char *filename);
 #endif /* DYNAMIC_RESOURCE_MANAGEMENT_RESOURCE_QUEUE_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/46ccd584/src/backend/resourcemanager/requesthandler_ddl.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler_ddl.c b/src/backend/resourcemanager/requesthandler_ddl.c
index 7133378..46c05ab 100644
--- a/src/backend/resourcemanager/requesthandler_ddl.c
+++ b/src/backend/resourcemanager/requesthandler_ddl.c
@@ -35,6 +35,8 @@
 #include "gp-libpq-fe.h"
 #include "gp-libpq-int.h"
 
+#include "conntrack.h"
+
 int updateResqueueCatalog(int					 action,
 					      DynResourceQueueTrack  queuetrack,
 						  List					*rsqattr);
@@ -87,19 +89,17 @@ const char* PG_Resqueue_Column_Names[Natts_pg_resqueue] = {
  */
 bool handleRMDDLRequestManipulateResourceQueue(void **arg)
 {
+	static char 			errorbuf[ERRORMESSAGE_SIZE];
 	int      				res		 		= FUNC_RETURN_OK;
 	uint32_t				ddlres   		= FUNC_RETURN_OK;
 	ConnectionTrack        *conntrack       = (ConnectionTrack *)arg;
 	DynResourceQueueTrack 	newtrack 		= NULL;
 	DynResourceQueueTrack   todroptrack		= NULL;
-	DynResourceQueueTrack   toupdatetrack	= NULL;
 	SelfMaintainBufferData  responsebuff;
-	static char 			errorbuf[1024] 	= "";
 	bool					exist 			= false;
 	List 				   *fineattr		= NULL;
 	List 				   *rsqattr			= NULL;
 	DynResourceQueue 		newqueue 		= NULL;
-	DynResourceQueue        oldqueue        = NULL;
 
 	/* Check context and retrieve the connection track based on connection id.*/
 	RPCRequestHeadManipulateResQueue request = (RPCRequestHeadManipulateResQueue)
@@ -177,8 +177,9 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
 	foreach(cell, rsqattr)
 	{
 		KVProperty attribute = lfirst(cell);
-		elog(DEBUG3, "Resource manager received DDL Request: %s=%s",
-				     attribute->Key.Str, attribute->Val.Str);
+		elog(RMLOG, "Resource manager received DDL Request: %s=%s",
+				    attribute->Key.Str,
+					attribute->Val.Str);
 	}
 
 	/* Shallow parse the 'withlist' attributes. */
@@ -189,7 +190,7 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
 	if (res != FUNC_RETURN_OK)
 	{
 		ddlres = res;
-		elog(WARNING, "Cannot recognize DDL attribute because %s", errorbuf);
+		elog(WARNING, "Cannot recognize DDL attribute, %s", errorbuf);
 		goto senderr;
 	}
 
@@ -197,8 +198,9 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
 	foreach(cell, fineattr)
 	{
 		KVProperty attribute = lfirst(cell);
-		elog(LOG, "DDL parsed request: %s=%s", attribute->Key.Str, attribute->Val.Str);
-
+		elog(RMLOG, "DDL parsed request: %s=%s",
+				    attribute->Key.Str,
+				    attribute->Val.Str);
 	}
 
 	/* Add into resource queue hierarchy to validate the request. */
@@ -285,6 +287,41 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
 			break;
 
 		case MANIPULATE_RESQUEUE_ALTER:
+		{
+			/*------------------------------------------------------------------
+			 * The strategy of altering one resource queue is, firstly, we always
+			 * allow user to alter a resource queue no matter whether it is busy,
+			 * no matter whether it has busy descendant resource queues. This is
+			 * for the practicality of managing resource queues.
+			 *
+			 * The challenge is that, if we have one resource queue capacity
+			 * limits, or active statement limit or resource quota or NVSEG*
+			 * limits changed, the queued query resource requests' actual
+			 * resource requests should be recalculated, which may cause some
+			 * conflicts, for example, some requests require more resource than
+			 * the queue capacity limits, some requests encounter deadlock issue.
+			 *
+			 * The idea for altering a queue is :
+			 * STEP 1. Guarantee the ALTER RESOURCE QUQUE statement is valid to
+			 * 		   be processed for practical altering;
+			 * STEP 2. Making shadow instances for all resource queue track
+			 * 		   instances potentially having definition changed or
+			 * 		   queued resource requests changed;
+			 * STEP 3. Try to do all altering related updates in shadows;
+			 * STEP 4. Update catalog if the altering can be performed without
+			 * 		   logical errors;
+			 * STEP 5. Update resource queue track status based on the shadows;
+			 * STEP 6. Remove shadows to clean up the resource queue track tree.
+			 *
+			 * In case we can not complete the whole altering procedure, we will
+			 * only need to clean up the shadows to cleanup the resource queue
+			 * track tree.
+			 *------------------------------------------------------------------
+			 */
+
+			/* STEP 1. */
+			DynResourceQueueTrack	toaltertrack  = NULL;
+			List 				   *qhavingshadow = NULL;
 			newqueue = rm_palloc0(PCONTEXT, sizeof(DynResourceQueueData));
 			res = parseResourceQueueAttributes(fineattr,
 											   newqueue,
@@ -296,91 +333,113 @@ bool handleRMDDLRequestManipulateResourceQueue(void **arg)
 			{
 				rm_pfree(PCONTEXT, newqueue);
 				ddlres = res;
-				elog(WARNING, "Resource manager can not alter resource queue "
-							  "with its attributes because %s",
+				elog(WARNING, "Resource manager cannot alter resource queue %s, %s",
+							  nameattr->Val.Str,
 							  errorbuf);
 				goto senderr;
 			}
 			rm_pfree(PCONTEXT, newqueue);
+			newqueue = NULL;
 
-			toupdatetrack = getQueueTrackByQueueName((char *)(nameattr->Val.Str),
+			toaltertrack = getQueueTrackByQueueName((char *)(nameattr->Val.Str),
 					   	   	   	   	   	   	   	   	nameattr->Val.Len,
 					   	   	   	   	   	   	   	   	&exist);
-			if (!exist || toupdatetrack == NULL)
+			if (!exist || toaltertrack == NULL)
 			{
 				ddlres = RESQUEMGR_NO_QUENAME;
 				snprintf(errorbuf, sizeof(errorbuf), "the queue doesn't exist");
-				elog(WARNING, ERRORPOS_FORMAT
-					 "Resource manager can not alter resource queue %s because %s",
-				     ERRREPORTPOS,
-				     nameattr->Val.Str,
-				     errorbuf);
+				elog(WARNING, "Resource manager cannot alter resource queue %s, %s",
+							  nameattr->Val.Str,
+							  errorbuf);
 				goto senderr;
 			}
-			newqueue = toupdatetrack->QueueInfo;
-			oldqueue = (DynResourceQueue)
-					   rm_palloc0(PCONTEXT,
-								  sizeof(DynResourceQueueData));
-			memcpy(oldqueue, newqueue, sizeof(DynResourceQueueData));
-
-			res = updateResourceQueueAttributes(fineattr,
-												newqueue,
-												errorbuf,
-												sizeof(errorbuf));
+
+			/* STEP 2. Build all necessary shadow resource queue track instance. */
+			buildQueueTrackShadows(toaltertrack, &qhavingshadow);
+
+			/* STEP 3. Update resource queue attribute in shadow instance. */
+			res = updateResourceQueueAttributesInShadow(fineattr,
+														toaltertrack,
+														errorbuf,
+														sizeof(errorbuf));
 			if (res != FUNC_RETURN_OK)
 			{
 				ddlres = res;
-				elog(WARNING, ERRORPOS_FORMAT
-					 "HAWQ RM Can not alter resource queue with its attributes "
-					 "because %s",
-					 ERRREPORTPOS,
-					 errorbuf);
-				/* If fail in updating catalog table, revert previous updates */
-				memcpy(newqueue, oldqueue, sizeof(DynResourceQueueData));
-				rm_pfree(PCONTEXT, oldqueue);
+				elog(WARNING, "Resource manager cannot update resource queue with "
+							  "its attributes, %s",
+							  errorbuf);
+				cleanupQueueTrackShadows(&qhavingshadow);
 				goto senderr;
 			}
 
-			res = checkAndCompleteNewResourceQueueAttributes(newqueue,
-															 errorbuf,
-															 sizeof(errorbuf));
+			res = checkAndCompleteNewResourceQueueAttributes(
+					  toaltertrack->ShadowQueueTrack->QueueInfo,
+					  errorbuf,
+					  sizeof(errorbuf));
 			if (res != FUNC_RETURN_OK)
 			{
 				ddlres = res;
-				elog(WARNING, ERRORPOS_FORMAT
-					 "HAWQ RM Can not complete resource queue's attributes "
-					 "because %s",
-					 ERRREPORTPOS,
-					 errorbuf);
-				/* If fail in updating catalog table, revert previous updates */
-				memcpy(newqueue, oldqueue, sizeof(DynResourceQueueData));
-				rm_pfree(PCONTEXT, oldqueue);
+				elog(WARNING, "Resource manager cannot complete resource queue "
+							  "attributes, %s",
+							  errorbuf);
+				cleanupQueueTrackShadows(&qhavingshadow);
+				goto senderr;
+			}
+
+			/*
+			 * Refresh actual capacity of the resource queue, the change is
+			 * expected to be updated in the shadow instances.
+			 */
+			refreshResourceQueuePercentageCapacity();
+
+			/*------------------------------------------------------------------
+			 * Till now, we expect the input for altering a resource queue is
+			 * valid, and we have built the necessary shadows for those queues
+			 * whose dynamic status are possible to be updated, including queued
+			 * query resource requests and corresponding deadlock detector status.
+			 *
+			 * If we force resource manager to cancel queued resource request,
+			 * the queued resource requests impossible to be satisfied due to
+			 * queue capacity shrinking or deadlock detection are canceled at
+			 * once.
+			 *
+			 * If we do not, ALTER RESOURCE QUEUE statement is cancelled. res
+			 * has error code returned in this function.
+			 *------------------------------------------------------------------
+			 */
+
+			res = rebuildAllResourceQueueTrackDynamicStatusInShadow(qhavingshadow,
+																	errorbuf,
+																	sizeof(errorbuf));
+			if ( res != FUNC_RETURN_OK )
+			{
+				ddlres = res;
+				elog(WARNING, "Can not apply alter resource queue changes, %s",
+							  errorbuf);
+				cleanupQueueTrackShadows(&qhavingshadow);
 				goto senderr;
 			}
 
+			/* STEP 4. Update catalog. */
 			res = updateResqueueCatalog(request->ManipulateAction,
-										toupdatetrack,
+										toaltertrack,
 										rsqattr);
 			if (res != FUNC_RETURN_OK)
 			{
 				ddlres = res;
-				elog(WARNING, ERRORPOS_FORMAT
-					 "Cannot alter resource queue changes in pg_resqueue.",
-					 ERRREPORTPOS);
-
-				/* If fail in updating catalog table, revert previous updates */
-				memcpy(newqueue, oldqueue, sizeof(DynResourceQueueData));
-				rm_pfree(PCONTEXT, oldqueue);
+				elog(WARNING, "Cannot alter resource queue changes in pg_resqueue.");
+				cleanupQueueTrackShadows(&qhavingshadow);
 				goto senderr;
 			}
 
-			if(oldqueue)
-			{
-				rm_pfree(PCONTEXT, oldqueue);
-			}
+			/* STEP 5. Update resource queue tracks referencing corresponding
+			 * 		   shadows. */
+			applyResourceQueueTrackChangesFromShadows(qhavingshadow);
 
+			/* STEP 6. Clean up. */
+			cleanupQueueTrackShadows(&qhavingshadow);
 			break;
-
+		}
 		case MANIPULATE_RESQUEUE_DROP:
 			todroptrack = getQueueTrackByQueueName((char *)(nameattr->Val.Str),
 												   nameattr->Val.Len,
@@ -536,6 +595,7 @@ senderr:
 	}
 }
 
+
 bool handleRMDDLRequestManipulateRole(void **arg)
 {
 	RPCResponseHeadManipulateRoleData response;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/46ccd584/src/backend/resourcemanager/resourcemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager.c b/src/backend/resourcemanager/resourcemanager.c
index 9549855..d36eb0e 100644
--- a/src/backend/resourcemanager/resourcemanager.c
+++ b/src/backend/resourcemanager/resourcemanager.c
@@ -1628,7 +1628,7 @@ void notifyPostmasterResManagerStarted(SIGNAL_ARGS)
  * queues[out]		: list of DynResourceQueue
  * users[out]		: list of UserInfo
  */
-int  addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
+int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
 {
 	static char	  errorbuf[1024];
 	int			  res			= FUNC_RETURN_OK;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/46ccd584/src/backend/resourcemanager/resourcepool.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcepool.c b/src/backend/resourcemanager/resourcepool.c
index 4c74029..18bba56 100644
--- a/src/backend/resourcemanager/resourcepool.c
+++ b/src/backend/resourcemanager/resourcepool.c
@@ -4135,7 +4135,6 @@ void refreshAvailableNodeCount(void)
 	ListCell *cell 		= NULL;
 	getAllPAIRRefIntoList(&(PRESPOOL->Segments), &allsegs);
 
-	int oldcount = PRESPOOL->AvailNodeCount;
 	PRESPOOL->AvailNodeCount = 0;
 	foreach(cell, allsegs)
 	{

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/46ccd584/src/backend/resourcemanager/resqueuedeadlock.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resqueuedeadlock.c b/src/backend/resourcemanager/resqueuedeadlock.c
index 9564983..69b149a 100644
--- a/src/backend/resourcemanager/resqueuedeadlock.c
+++ b/src/backend/resourcemanager/resqueuedeadlock.c
@@ -20,6 +20,7 @@
 #include "resqueuedeadlock.h"
 #include "dynrm.h"
 #include "utils/simplestring.h"
+#include "resqueuemanager.h"
 
 void initializeResqueueDeadLockDetector(ResqueueDeadLockDetector detector,
 										void                    *queuetrack)
@@ -32,10 +33,9 @@ void initializeResqueueDeadLockDetector(ResqueueDeadLockDetector detector,
 						HASHTABLE_SLOT_VOLUME_DEFAULT_MAX,
 						HASHTABLE_KEYTYPE_CHARARRAY,
 						NULL);
-	detector->InUseTotalCore      = 0;
-	detector->InUseTotalMemoryMB  = 0;
-	detector->LockedTotalCore     = 0;
-	detector->LockedTotalMemoryMB = 0;
+
+	resetResourceBundleData(&(detector->InUseTotal), 0, 0.0, 0);
+	resetResourceBundleData(&(detector->LockedTotal), 0, 0.0, 0);
 }
 
 int addSessionInUseResource(ResqueueDeadLockDetector detector,
@@ -56,19 +56,16 @@ int addSessionInUseResource(ResqueueDeadLockDetector detector,
 	SessionTrack sessiontrack = (SessionTrack)(pair->Value);
 	Assert( sessiontrack != NULL );
 	Assert( !sessiontrack->Locked );
-	sessiontrack->InUseTotalMemoryMB += memorymb;
-	sessiontrack->InUseTotalCore     += core;
-
-	detector->InUseTotalMemoryMB += memorymb;
-	detector->InUseTotalCore     += core;
+	addResourceBundleData(&(sessiontrack->InUseTotal), memorymb, core);
+	addResourceBundleData(&(detector->InUseTotal), memorymb, core);
 
 	return FUNC_RETURN_OK;
 }
 
-int minusSessionInUserResource(ResqueueDeadLockDetector detector,
-							   int64_t 					sessionid,
-							   uint32_t 				memorymb,
-							   double 					core)
+int minusSessionInUseResource(ResqueueDeadLockDetector	detector,
+							  int64_t					sessionid,
+							  uint32_t 					memorymb,
+							  double 					core)
 {
 	/* Build key */
 	SimpArray key;
@@ -80,53 +77,29 @@ int minusSessionInUserResource(ResqueueDeadLockDetector detector,
 		return RESQUEMGR_NO_SESSIONID;
 	}
 
-	detector->InUseTotalMemoryMB -= memorymb;
-	detector->InUseTotalCore     -= core;
+	minusResourceBundleData(&(detector->InUseTotal), memorymb, core);
 
 	SessionTrack sessiontrack = (SessionTrack)(pair->Value);
 	Assert( sessiontrack != NULL );
 
-	sessiontrack->InUseTotalMemoryMB -= memorymb;
-	sessiontrack->InUseTotalCore     -= core;
+	minusResourceBundleData(&(sessiontrack->InUseTotal), memorymb, core);
 
-	Assert(detector->InUseTotalCore >= 0 && detector->InUseTotalMemoryMB >=
0);
-	Assert(sessiontrack->InUseTotalCore >= 0 && sessiontrack->InUseTotalMemoryMB
>= 0);
+	Assert(detector->InUseTotal.Core >= 0.0 &&
+		   detector->InUseTotal.MemoryMB >= 0);
+	Assert(sessiontrack->InUseTotal.Core >= 0.0 &&
+		   sessiontrack->InUseTotal.MemoryMB >= 0);
 
 	/* If the session has no resource used, remove the session tracker. */
-	if ( sessiontrack->InUseTotalMemoryMB == 0 &&
-		 sessiontrack->InUseTotalCore == 0	) {
+	if ( sessiontrack->InUseTotal.MemoryMB == 0 &&
+		 sessiontrack->InUseTotal.Core == 0.0 )
+	{
 		rm_pfree(PCONTEXT, sessiontrack);
-		/* Remove from hash table. */
 		removeHASHTABLENode(&(detector->Sessions), &key);
 	}
 
 	return FUNC_RETURN_OK;
 }
 
-int removeSession(ResqueueDeadLockDetector detector, int64_t sessionid)
-{
-	/* Build key */
-	SimpArray key;
-	setSimpleArrayRef(&key, (char *)&sessionid, sizeof(int64_t));
-
-	/* Check if the session id exists. */
-	PAIR pair = getHASHTABLENode(&(detector->Sessions), &key);
-	if ( pair == NULL ) {
-		return RESQUEMGR_NO_SESSIONID;
-	}
-
-	SessionTrack sessiontrack = (SessionTrack)(pair->Value);
-	detector->InUseTotalMemoryMB -= sessiontrack->InUseTotalMemoryMB;
-	detector->InUseTotalCore     -= sessiontrack->InUseTotalCore;
-
-	rm_pfree(PCONTEXT, sessiontrack);
-
-	/* Remove from hash table. */
-	removeHASHTABLENode(&(detector->Sessions), &key);
-
-	return FUNC_RETURN_OK;
-}
-
 void createAndLockSessionResource(ResqueueDeadLockDetector detector,
 								  int64_t 				   sessionid)
 {
@@ -138,28 +111,30 @@ void createAndLockSessionResource(ResqueueDeadLockDetector detector,
 
 	/* Check if the session id exists. */
 	PAIR pair = getHASHTABLENode(&(detector->Sessions), &key);
-	if ( pair == NULL ) {
+	if ( pair == NULL )
+	{
 		curstrack = (SessionTrack)rm_palloc0(PCONTEXT, sizeof(SessionTrackData));
-		curstrack->SessionID 		  = sessionid;
-		curstrack->InUseTotalCore     = 0;
-		curstrack->InUseTotalMemoryMB = 0;
-		curstrack->Locked			  = false;
+		curstrack->SessionID = sessionid;
+		curstrack->Locked	 = false;
+		resetResourceBundleData(&(curstrack->InUseTotal), 0, 0.0, 0);
+
 		/* Add to the detector. */
 		setHASHTABLENode(&(detector->Sessions), &key, curstrack, false);
 	}
-	else {
+	else
+	{
 		curstrack = (SessionTrack)(pair->Value);
 	}
 
 	Assert( curstrack != NULL );
 	Assert( !curstrack->Locked );
 	curstrack->Locked = true;
-	detector->LockedTotalCore     += curstrack->InUseTotalCore;
-	detector->LockedTotalMemoryMB += curstrack->InUseTotalMemoryMB;
+	addResourceBundleDataByBundle(&(detector->LockedTotal),
+								  &(curstrack->InUseTotal));
 
-	elog(DEBUG3, "Locked session "INT64_FORMAT "Left %d MB",
-				 sessionid,
-				 detector->LockedTotalMemoryMB);
+	elog(RMLOG, "Locked session "INT64_FORMAT" Locked %d MB",
+				sessionid,
+				detector->LockedTotal.MemoryMB);
 }
 
 void unlockSessionResource(ResqueueDeadLockDetector detector,
@@ -172,20 +147,22 @@ void unlockSessionResource(ResqueueDeadLockDetector detector,
 	/* Check if the session id exists. */
 	PAIR pair = getHASHTABLENode(&(detector->Sessions), &key);
 
-	if ( pair != NULL ) {
+	if ( pair != NULL )
+	{
 		SessionTrack sessiontrack = (SessionTrack)(pair->Value);
 		Assert(sessiontrack != NULL);
 		Assert(sessiontrack->Locked);
-		detector->LockedTotalCore     -= sessiontrack->InUseTotalCore;
-		detector->LockedTotalMemoryMB -= sessiontrack->InUseTotalMemoryMB;
+		minusResourceBundleDataByBundle(&(detector->LockedTotal),
+									    &(sessiontrack->InUseTotal));
 		sessiontrack->Locked = false;
 
-		elog(DEBUG3, "Unlocked session "INT64_FORMAT "Left %d MB",
+		elog(DEBUG3, "Unlocked session "INT64_FORMAT " Locked %d MB",
 					 sessionid,
-					 detector->LockedTotalMemoryMB);
+					 detector->LockedTotal.MemoryMB);
 	}
 
-	Assert(detector->LockedTotalCore >= 0 && detector->LockedTotalMemoryMB
>= 0);
+	Assert(detector->LockedTotal.Core >= 0.0 &&
+		   detector->LockedTotal.MemoryMB >= 0);
 }
 
 SessionTrack findSession(ResqueueDeadLockDetector detector,
@@ -197,7 +174,8 @@ SessionTrack findSession(ResqueueDeadLockDetector detector,
 
 	/* Check if the session id exists. */
 	PAIR pair = getHASHTABLENode(&(detector->Sessions), &key);
-	if ( pair != NULL ) {
+	if ( pair != NULL )
+	{
 		return (SessionTrack)(pair->Value);
 	}
 	return NULL;
@@ -216,8 +194,34 @@ void resetResourceDeadLockDetector(ResqueueDeadLockDetector detector)
 	freePAIRRefList(&(PCONTRACK->Connections), &allss);
 	clearHASHTABLE(&(detector->Sessions));
 
-	detector->InUseTotalMemoryMB 	= 0;
-	detector->InUseTotalCore		= 0.0;
-	detector->LockedTotalMemoryMB	= 0;
-	detector->LockedTotalCore		= 0.0;
+	resetResourceBundleData(&(detector->InUseTotal), 0, 0.0, 0);
+	resetResourceBundleData(&(detector->LockedTotal), 0, 0.0, 0);
+}
+
+void copyResourceDeadLockDetectorWithoutLocking(ResqueueDeadLockDetector source,
+												ResqueueDeadLockDetector target)
+{
+	Assert(source != NULL);
+	Assert(target != NULL);
+	resetResourceDeadLockDetector(target);
+	target->ResqueueTrack = source->ResqueueTrack;
+	addResourceBundleDataByBundle(&(target->InUseTotal), &(source->InUseTotal));
+
+	List 	 *allss = NULL;
+	ListCell *cell	= NULL;
+	getAllPAIRRefIntoList(&(source->Sessions), &allss);
+	foreach(cell, allss)
+	{
+		SessionTrack strack = (SessionTrack)(((PAIR)lfirst(cell))->Value);
+		SessionTrack newstrack = rm_palloc0(PCONTEXT, sizeof(SessionTrackData));
+		newstrack->SessionID = strack->SessionID;
+		newstrack->Locked	 = false;
+		resetResourceBundleData(&(newstrack->InUseTotal), 0, 0.0, 0);
+		addResourceBundleDataByBundle(&(newstrack->InUseTotal),
+									  &(strack->InUseTotal));
+		/* Add to the detector. */
+		SimpArray key;
+		setSimpleArrayRef(&key, (char *)&(newstrack->SessionID), sizeof(int64_t));
+		setHASHTABLENode(&(target->Sessions), &key, newstrack, false);
+	}
 }



Mime
View raw message