hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From y...@apache.org
Subject incubator-hawq git commit: HAWQ-634. Let resource manager cancel waiting and return allocated resource when unregister connection rpc is called
Date Fri, 08 Apr 2016 11:37:34 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master 300d9149f -> 0ee50a91b


HAWQ-634. Let resource manager cancel waiting and return allocated resource when unregister connection rpc is called


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/0ee50a91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/0ee50a91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/0ee50a91

Branch: refs/heads/master
Commit: 0ee50a91b3b9304d7766fee43ce99a3d2690cc99
Parents: 300d914
Author: YI JIN <yjin@pivotal.io>
Authored: Fri Apr 8 21:37:05 2016 +1000
Committer: YI JIN <yjin@pivotal.io>
Committed: Fri Apr 8 21:37:05 2016 +1000

----------------------------------------------------------------------
 .../communication/rmcomm_AsyncComm.c            |  8 ++-
 .../communication/rmcomm_SyncComm.c             | 14 ++---
 src/backend/resourcemanager/conntrack.c         | 60 +++++++++++++++++---
 src/backend/resourcemanager/include/conntrack.h |  3 +
 .../resourcemanager/include/resqueuemanager.h   |  4 +-
 src/backend/resourcemanager/requesthandler.c    | 27 ++++++++-
 src/backend/resourcemanager/resqueuemanager.c   | 17 ++++--
 src/backend/tcop/postgres.c                     |  2 +-
 src/backend/tcop/pquery.c                       | 15 ++++-
 9 files changed, 119 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0ee50a91/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c b/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c
index 181cc86..ee005bf 100644
--- a/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c
+++ b/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c
@@ -403,7 +403,7 @@ int processAllCommFileDescs(void)
 		if ( CommBuffers[i]->forcedClose )
 		{
 			/* Call cleanup handler if necessary to do user-defined cleanup. */
-			elog(DEBUG5, "Close FD %d Index %d.", CommBuffers[i]->FD, i);
+			elog(DEBUG3, "Close FD %d Index %d.", CommBuffers[i]->FD, i);
 
 			/* Close connection and free buffer */
 			closeRegisteredFileDesc(CommBuffers[i]);
@@ -414,6 +414,7 @@ int processAllCommFileDescs(void)
 			if ( CommBuffers[i]->ClientHostname.Str != NULL &&
 				 CommBuffers[i]->ServerPort != 0 )
 			{
+				elog(DEBUG3, "Return FD %d Index %d.", CommBuffers[i]->FD, i);
 				returnAliveConnectionRemoteByHostname(
 							&(CommBuffers[i]->FD),
 							CommBuffers[i]->ClientHostname.Str,
@@ -421,6 +422,7 @@ int processAllCommFileDescs(void)
 			}
 			else
 			{
+				elog(DEBUG3, "Close FD %d Index %d normally.", CommBuffers[i]->FD, i);
 				closeRegisteredFileDesc(CommBuffers[i]);
 			}
 			shouldfree = true;
@@ -518,6 +520,8 @@ void freeCommBuffer(AsyncCommBuffer *pcommbuffer)
 {
 	Assert( pcommbuffer != NULL );
 
+	elog(DEBUG3, "Free CommBuffer for FD %d.", (*pcommbuffer)->FD);
+
 	freeSimpleStringContent(&((*pcommbuffer)->ClientHostname));
 
 	destroySelfMaintainBuffer(&((*pcommbuffer)->ReadBuffer));
@@ -574,7 +578,7 @@ void unresigsterFileDesc(int fd)
 		{
 			/* Call cleanup handler if necessary to do user-defined cleanup. */
 			CommBuffers[i]->Methods->CleanUpHandle(CommBuffers[i]);
-			elog(DEBUG5, "Unregister FD %d Index %d.", CommBuffers[i]->FD, i);
+			elog(DEBUG3, "Unregister FD %d Index %d.", CommBuffers[i]->FD, i);
 			CommBuffers[i]->FD = -1;
 			freeCommBuffer(&CommBuffers[i]);
 			pos = i;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0ee50a91/src/backend/resourcemanager/communication/rmcomm_SyncComm.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_SyncComm.c b/src/backend/resourcemanager/communication/rmcomm_SyncComm.c
index f476add..62c043e 100644
--- a/src/backend/resourcemanager/communication/rmcomm_SyncComm.c
+++ b/src/backend/resourcemanager/communication/rmcomm_SyncComm.c
@@ -120,25 +120,19 @@ int callSyncRPCRemote(const char     	   *hostname,
 	while( true )
 	{
 		processAllCommFileDescs();
+		CHECK_FOR_INTERRUPTS();
 		if ( userdata.CommStatus == SYNC_RPC_COMM_IDLE )
 		{
 			break;
 		}
-		else if ( QueryCancelPending )
-		{
-			/*
-			 * We find that this QD wants to cancel the query, we don't need
-			 * to continue the communication.
-			 */
-			res = TRANSCANCEL_INPROGRESS;
-			break;
-		}
 	}
 
-	res = res == TRANSCANCEL_INPROGRESS ? res : userdata.Result;
+	res = userdata.Result;
 
 	/* Close and cleanup */
 	unresigsterFileDesc(fd);
+	elog(DEBUG3, "Result of synchronous RPC. %d", res);
+
 	if ( res == FUNC_RETURN_OK )
 	{
 		returnAliveConnectionRemoteByHostname(&fd, hostname, port);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0ee50a91/src/backend/resourcemanager/conntrack.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/conntrack.c b/src/backend/resourcemanager/conntrack.c
index 43fba9f..2babef4 100644
--- a/src/backend/resourcemanager/conntrack.c
+++ b/src/backend/resourcemanager/conntrack.c
@@ -23,6 +23,8 @@
 #include "communication/rmcomm_QD_RM_Protocol.h"
 
 void cutReferenceOfConnTrackAndCommBuffer(AsyncCommMessageHandlerContext context);
+void removeResourceRequestInConnHavingRequestsInternal(int32_t 	 connid,
+													   List    **requests);
 
 /* Initialize connection track manager. */
 void initializeConnectionTrackManager(void)
@@ -36,6 +38,7 @@ void initializeConnectionTrackManager(void)
 
 	PCONTRACK->FreeConnIDs			= NULL;
 	PCONTRACK->ConnHavingRequests 	= NULL;
+	PCONTRACK->ConnToRetry			= NULL;
 	PCONTRACK->ConnToSend			= NULL;
 
 	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
@@ -472,7 +475,6 @@ void cutReferenceOfConnTrackAndCommBuffer(AsyncCommMessageHandlerContext context
 void processSubmittedRequests(void)
 {
 	ConnectionTrack  ct    		= NULL;
-	List			*tryagain	= NULL;
 
 	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
 	while( list_length(PCONTRACK->ConnHavingRequests) > 0 )
@@ -483,20 +485,20 @@ void processSubmittedRequests(void)
 		Assert(handler != NULL);
 		if ( !handler((void **)&ct) )
 		{
-			tryagain = lappend(tryagain, ct);
+			PCONTRACK->ConnToRetry = lappend(PCONTRACK->ConnToRetry, ct);
 		}
 	}
 
-	if ( list_length(tryagain) > 0 )
+	if ( list_length(PCONTRACK->ConnToRetry) > 0 )
 	{
 		elog(DEBUG3, "Resource manager retries %d requests in next loop.",
-				  	 list_length(tryagain));
+				  	 list_length(PCONTRACK->ConnToRetry));
 	}
 
-	while( list_length(tryagain) > 0 )
+	while( list_length(PCONTRACK->ConnToRetry) > 0 )
 	{
-		void *move = lfirst(list_head(tryagain));
-		tryagain = list_delete_first(tryagain);
+		void *move = lfirst(list_head(PCONTRACK->ConnToRetry));
+		PCONTRACK->ConnToRetry = list_delete_first(PCONTRACK->ConnToRetry);
 		PCONTRACK->ConnHavingRequests = lappend(PCONTRACK->ConnHavingRequests, move);
 	}
 	MEMORY_CONTEXT_SWITCH_BACK
@@ -829,3 +831,47 @@ void copyResourceQuotaConnectionTrack(ConnectionTrack source,
 	target->SegNumMin		= source->SegNumMin;
 	target->SegNumEqual		= source->SegNumEqual;
 }
+
+void removeResourceRequestInConnHavingReqeusts(int32_t connid)
+{
+	removeResourceRequestInConnHavingRequestsInternal(connid,
+													  &(PCONTRACK->ConnHavingRequests));
+	removeResourceRequestInConnHavingRequestsInternal(connid,
+													  &(PCONTRACK->ConnToRetry));
+}
+
+void removeResourceRequestInConnHavingRequestsInternal(int32_t 	 connid,
+													   List    **requests)
+{
+	ConnectionTrack  ct		  = NULL;
+	ListCell		*cell	  = NULL;
+	ListCell		*prevcell = NULL;
+
+	foreach(cell, (*requests))
+	{
+		ct = (ConnectionTrack)lfirst(cell);
+		elog(LOG, "Request id %d", ct->MessageID);
+		if ( ct->MessageID == REQUEST_QD_ACQUIRE_RESOURCE )
+		{
+			/* Try to get the connection id in this reqeust. */
+			RPCRequestHeadAcquireResourceFromRM request =
+				SMBUFF_HEAD(RPCRequestHeadAcquireResourceFromRM,
+							&(ct->MessageBuff));
+			if ( request->ConnID == connid )
+			{
+				elog(WARNING, "Resource manager finds ConnID %d in request pending "
+							  "list should be cancelled.",
+							  request->ConnID);
+				if ( ct->CommBuffer!= NULL )
+				{
+					forceCloseFileDesc(ct->CommBuffer);
+				}
+				MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+				(*requests) = list_delete_cell((*requests), cell, prevcell);
+				MEMORY_CONTEXT_SWITCH_BACK
+				break;
+			}
+		}
+		prevcell = cell;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0ee50a91/src/backend/resourcemanager/include/conntrack.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/conntrack.h b/src/backend/resourcemanager/include/conntrack.h
index e1daf4a..770dd36 100644
--- a/src/backend/resourcemanager/include/conntrack.h
+++ b/src/backend/resourcemanager/include/conntrack.h
@@ -149,6 +149,7 @@ struct ConnectionTrackManagerData
 	List		   *FreeConnIDs;			/* Pre-built free connection IDs. */
 
 	List		   *ConnHavingRequests;		/* Batch request processing list. */
+	List		   *ConnToRetry;			/* Batch request to process list. */
 	List		   *ConnToSend;				/* Batch response sending list.	  */
 };
 
@@ -208,4 +209,6 @@ void setConnectionTrackMessageBuffer(ConnectionTrack  track,
 
 void freeUsedConnectionTrack(ConnectionTrack track);
 
+
+void removeResourceRequestInConnHavingReqeusts(int32_t connid);
 #endif /*DYNAMIC_RESOURCE_MANAGEMENT_CONNECTION_TRACK_H*/

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0ee50a91/src/backend/resourcemanager/include/resqueuemanager.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resqueuemanager.h b/src/backend/resourcemanager/include/resqueuemanager.h
index 9c02de1..68cd7ff 100644
--- a/src/backend/resourcemanager/include/resqueuemanager.h
+++ b/src/backend/resourcemanager/include/resqueuemanager.h
@@ -470,7 +470,9 @@ void returnConnectionToQueue(ConnectionTrack conntrack, bool istimeout);
 int acquireResourceQuotaFromResQueMgr(ConnectionTrack	conntrack,
 									  char			   *errorbuf,
 									  int				errorbufsize);
-void cancelResourceAllocRequest(ConnectionTrack conntrack, char *errorbuf);
+void cancelResourceAllocRequest(ConnectionTrack  conntrack,
+								char 			*errorbuf,
+								bool			 generror);
 /*
  * APIs for operating resource detail instance.
  */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0ee50a91/src/backend/resourcemanager/requesthandler.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler.c b/src/backend/resourcemanager/requesthandler.c
index b89d23a..daef075 100644
--- a/src/backend/resourcemanager/requesthandler.c
+++ b/src/backend/resourcemanager/requesthandler.c
@@ -259,7 +259,32 @@ bool handleRMRequestConnectionUnReg(void **arg)
 	request = SMBUFF_HEAD(RPCRequestHeadUnregisterConnectionInRM,
 			  	  	  	  &((*conntrack)->MessageBuff));
 
-	if ( !canTransformConnectionTrackProgress((*conntrack), CONN_PP_ESTABLISHED) )
+	/*
+	 * If this connection is waiting for resource allocated, cancel the request
+	 * from resource queue.
+	 */
+	if ( (*conntrack)->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT )
+	{
+		cancelResourceAllocRequest((*conntrack), errorbuf, false);
+		transformConnectionTrackProgress(conntrack, CONN_PP_REGISTER_DONE);
+	}
+	/* If this connection has resource allocated, return the resource. */
+	else if ( (*conntrack)->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_DONE )
+	{
+		returnResourceToResQueMgr((*conntrack));
+	}
+	/*
+	 * If this connection has acquire resource not processed yet, we should
+	 * remove that now. In this case, this connection should have registered.
+	 */
+	else if ( (*conntrack)->Progress == CONN_PP_REGISTER_DONE )
+	{
+		elog(WARNING, "Resource manager finds possible not handled resource "
+					  "request from ConnID %d.",
+					  request->ConnID);
+		removeResourceRequestInConnHavingReqeusts(request->ConnID);
+	}
+	else if ( !canTransformConnectionTrackProgress((*conntrack), CONN_PP_ESTABLISHED) )
 	{
 		snprintf(errorbuf, sizeof(errorbuf),
 				 "wrong resource context status for unregistering, %d",

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0ee50a91/src/backend/resourcemanager/resqueuemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resqueuemanager.c b/src/backend/resourcemanager/resqueuemanager.c
index 5d43f92..630a38a 100644
--- a/src/backend/resourcemanager/resqueuemanager.c
+++ b/src/backend/resourcemanager/resqueuemanager.c
@@ -1861,7 +1861,9 @@ void returnConnectionToQueue(ConnectionTrack conntrack, bool istimeout)
 /*
  * Cancel one queued resource allocation request.
  */
-void cancelResourceAllocRequest(ConnectionTrack conntrack, char *errorbuf)
+void cancelResourceAllocRequest(ConnectionTrack  conntrack,
+								char 			*errorbuf,
+								bool			 generror)
 {
 	Assert(conntrack->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT);
 
@@ -1879,9 +1881,12 @@ void cancelResourceAllocRequest(ConnectionTrack conntrack, char *errorbuf)
 	/* Unlock session in deadlock */
 	unlockSessionResource(&(queuetrack->DLDetector), conntrack->SessionID);
 
-	buildAcquireResourceErrorResponseAndSend(conntrack,
-										 	 RESQUEMGR_NORESOURCE_TIMEOUT,
-											 errorbuf);
+	if (generror)
+	{
+		buildAcquireResourceErrorResponseAndSend(conntrack,
+												 RESQUEMGR_NORESOURCE_TIMEOUT,
+												 errorbuf);
+	}
 }
 
 /* Acquire resource from queue. */
@@ -4512,7 +4517,7 @@ void timeoutDeadResourceAllocation(void)
 						 "queued resource request is timed out due to no session "
 						 "lease heart-beat received");
 
-				cancelResourceAllocRequest(curcon, errorbuf);
+				cancelResourceAllocRequest(curcon, errorbuf, true);
 				returnConnectionToQueue(curcon, true);
 				if ( curcon->CommBuffer != NULL )
 				{
@@ -4710,7 +4715,7 @@ void timeoutQueuedRequest(void)
 
 			if ( tocancel )
 			{
-				cancelResourceAllocRequest(curcon, errorbuf);
+				cancelResourceAllocRequest(curcon, errorbuf, true);
 				returnConnectionToQueue(curcon, true);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0ee50a91/src/backend/tcop/postgres.c
----------------------------------------------------------------------
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 5910582..00a1a70 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -4612,7 +4612,7 @@ PostgresMain(int argc, char *argv[], const char *username)
 
 				if (Gp_role == GP_ROLE_DISPATCH)
 				{
-				  CleanupGlobalQueryResources();
+					CleanupGlobalQueryResources();
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0ee50a91/src/backend/tcop/pquery.c
----------------------------------------------------------------------
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 243ac75..8ef445a 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -1119,9 +1119,15 @@ CleanupActiveQueryResource(void)
 extern void
 CleanupGlobalQueryResources(void)
 {
-  ListCell *lc;
-  int ret;
-  char errorbuf[1024];
+	ListCell *lc;
+	int ret;
+	char errorbuf[1024];
+
+	elog(LOG, "In CleanupGlobalQueryResources().");
+
+	/* Force using new socket connection to return and free. */
+	bool oldval = rm_enable_connpool;
+	rm_enable_connpool = false;
 
 	foreach(lc, GlobalQueryResources)
 	{
@@ -1152,6 +1158,9 @@ CleanupGlobalQueryResources(void)
 
   list_free(GlobalQueryResources);
   GlobalQueryResources = NULL;
+
+  /* Restore using connection pool. */
+  rm_enable_connpool = oldval;
 }
 
 extern QueryResource *


Mime
View raw message