hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lei_ch...@apache.org
Subject incubator-hawq git commit: HAWQ-667. QD resource heart-beat sleeps for too long
Date Wed, 13 Apr 2016 10:09:26 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master c71e20cee -> ae0809013


HAWQ-667. QD resource heart-beat sleeps for too long


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/ae080901
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/ae080901
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/ae080901

Branch: refs/heads/master
Commit: ae0809013932a0d8d5c56e05e574e684a131ad0e
Parents: c71e20c
Author: Lei Chang <chang.lei.cn@gmail.com>
Authored: Wed Apr 13 18:06:41 2016 +0800
Committer: Lei Chang <chang.lei.cn@gmail.com>
Committed: Wed Apr 13 18:06:41 2016 +0800

----------------------------------------------------------------------
 .../communication/rmcomm_QD2RM.c                | 270 ++++++++++---------
 1 file changed, 140 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ae080901/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
index b83edae..ad2d59b 100644
--- a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
+++ b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
@@ -1386,6 +1386,7 @@ void *generateResourceRefreshHeartBeat(void *arg)
 	static char messagetail[8]  = {'M' ,'S' ,'G' ,'E' ,'N' ,'D' ,'S' ,'!' };
 
 	int fd = -1;
+	int sleepTimes = 0;
 
 	HeartBeatThreadArg tharg = arg;
 	Assert(arg != NULL);
@@ -1402,162 +1403,171 @@ void *generateResourceRefreshHeartBeat(void *arg)
 
 	while( ResourceHeartBeatRunning )
 	{
-		resetSelfMaintainBuffer(&sendbuffer);
-		resetSelfMaintainBuffer(&contbuffer);
-		bool sendcontent = false;
-
-		/* Lock to access array of resource sets */
-		pthread_mutex_lock(&ResourceSetsMutex);
-
-		RPCRequestHeadRefreshResourceHeartBeatData request;
-		request.ConnIDCount = QD2RM_ResourceSetCount;
-		request.Reserved    = 0;
-		appendSMBVar(&contbuffer, request);
-
-		/* Get all current in-use resource set IDs and build into request. */
-	    for ( int i = 0 ; i < QD2RM_ResourceSetSize ; ++i ) {
-	        if ( QD2RM_ResourceSets[i] == NULL ||
-	        	 QD2RM_ResourceSets[i]->QD_Conn_ID == INVALID_CONNID )
-	        {
-	        	continue;
-	        }
-	        appendSMBVar(&contbuffer, QD2RM_ResourceSets[i]->QD_Conn_ID);
-	        sendcontent = true;
-	    }
-		/* Unlock */
-		pthread_mutex_unlock(&ResourceSetsMutex);
-
-		/* Build final request content and send out. */
-		appendSelfMaintainBufferTill64bitAligned(&contbuffer);
-
-		if ( sendcontent )
-		{
-			/* Connect to server only when necessary. */
-			if ( fd < 0 )
-			{
-				/* Connect to resource manager server. */
-				struct sockaddr_in server_addr;
-				fd = socket(AF_INET, SOCK_STREAM, 0);
-				if ( fd < 0 )
+#define SLEEP_TIME (0.1) // in seconds
+		// Sleep in short SLEEP_TIME slices so the thread never sleeps for too long
+		// without re-checking ResourceHeartBeatRunning.
+		if (sleepTimes * SLEEP_TIME < rm_session_lease_heartbeat_interval) {
+			pg_usleep(SLEEP_TIME * 1000000L);
+			sleepTimes++;
+		} else {
+			sleepTimes = 0;
+
+			resetSelfMaintainBuffer(&sendbuffer);
+			resetSelfMaintainBuffer(&contbuffer);
+			bool sendcontent = false;
+
+			/* Lock to access array of resource sets */
+			pthread_mutex_lock(&ResourceSetsMutex);
+
+			RPCRequestHeadRefreshResourceHeartBeatData request;
+			request.ConnIDCount = QD2RM_ResourceSetCount;
+			request.Reserved    = 0;
+			appendSMBVar(&contbuffer, request);
+
+			/* Get all current in-use resource set IDs and build into request. */
+			for ( int i = 0 ; i < QD2RM_ResourceSetSize ; ++i ) {
+				if ( QD2RM_ResourceSets[i] == NULL ||
+					 QD2RM_ResourceSets[i]->QD_Conn_ID == INVALID_CONNID )
 				{
-					write_log("ERROR generateResourceRefreshHeartBeat failed to open "
-							  "socket (errno %d)", errno);
-					break;
+					continue;
 				}
-				memset(&server_addr, 0, sizeof(server_addr));
-				server_addr.sin_family = AF_INET;
-				memcpy(&(server_addr.sin_addr.s_addr),
-					   tharg->HostAddrs[0],
-					   tharg->HostAddrLength);
-				server_addr.sin_port = htons(rm_master_port);
-
-				int sockres = 0;
-				while(true)
+				appendSMBVar(&contbuffer, QD2RM_ResourceSets[i]->QD_Conn_ID);
+				sendcontent = true;
+			}
+			/* Unlock */
+			pthread_mutex_unlock(&ResourceSetsMutex);
+
+			/* Build final request content and send out. */
+			appendSelfMaintainBufferTill64bitAligned(&contbuffer);
+
+			if ( sendcontent )
+			{
+				/* Connect to server only when necessary. */
+				if ( fd < 0 )
 				{
-					int on;
-					sockres = connect(fd,
-									  (struct sockaddr *)&server_addr,
-									  sizeof(server_addr));
-					if (sockres < 0)
+					/* Connect to resource manager server. */
+					struct sockaddr_in server_addr;
+					fd = socket(AF_INET, SOCK_STREAM, 0);
+					if ( fd < 0 )
+					{
+						write_log("ERROR generateResourceRefreshHeartBeat failed to open "
+								  "socket (errno %d)", errno);
+						break;
+					}
+					memset(&server_addr, 0, sizeof(server_addr));
+					server_addr.sin_family = AF_INET;
+					memcpy(&(server_addr.sin_addr.s_addr),
+						   tharg->HostAddrs[0],
+						   tharg->HostAddrLength);
+					server_addr.sin_port = htons(rm_master_port);
+
+					int sockres = 0;
+					while(true)
 					{
-						if (errno == EINTR)
+						int on;
+						sockres = connect(fd,
+										  (struct sockaddr *)&server_addr,
+										  sizeof(server_addr));
+						if (sockres < 0)
 						{
-							continue;
+							if (errno == EINTR)
+							{
+								continue;
+							}
+							else
+							{
+								write_log("ERROR generateResourceRefreshHeartBeat "
+										  "failed to connect to resource manager, "
+										  "fd %d (errno %d)", fd, errno);
+								close(fd);
+								fd = -1;
+							}
 						}
-						else
+	#ifdef	TCP_NODELAY
+						on = 1;
+						if (sockres == 0 &&
+							setsockopt(fd,
+									   IPPROTO_TCP, TCP_NODELAY,
+									   (char *) &on, sizeof(on)) < 0)
 						{
-							write_log("ERROR generateResourceRefreshHeartBeat "
-									  "failed to connect to resource manager, "
-									  "fd %d (errno %d)", fd, errno);
+							write_log("ERROR setsockopt(TCP_NODELAY) failed: %m");
 							close(fd);
 							fd = -1;
+							sockres = -1;
 						}
+	#endif
+						on = 1;
+						if (sockres == 0 &&
+							setsockopt(fd,
+									   SOL_SOCKET, SO_KEEPALIVE,
+									   (char *) &on, sizeof(on)) < 0)
+						{
+							write_log("ERROR setsockopt(SO_KEEPALIVE) failed: %m");
+							close(fd);
+							fd = -1;
+							sockres = -1;
+						}
+
+						break;
 					}
-#ifdef	TCP_NODELAY
-					on = 1;
-					if (sockres == 0 &&
-						setsockopt(fd,
-								   IPPROTO_TCP, TCP_NODELAY,
-								   (char *) &on, sizeof(on)) < 0)
-					{
-						write_log("ERROR setsockopt(TCP_NODELAY) failed: %m");
-						close(fd);
-						fd = -1;
-						sockres = -1;
-					}
-#endif
-					on = 1;
-					if (sockres == 0 &&
-						setsockopt(fd,
-								   SOL_SOCKET, SO_KEEPALIVE,
-								   (char *) &on, sizeof(on)) < 0)
+
+					if ( sockres < 0 )
 					{
-						write_log("ERROR setsockopt(SO_KEEPALIVE) failed: %m");
-						close(fd);
-						fd = -1;
-						sockres = -1;
+						pg_usleep(1000000L);
+						continue;
 					}
 
-					break;
 				}
 
-				if ( sockres < 0 )
+				RMMessageHead phead = (RMMessageHead)messagehead;
+				RMMessageTail ptail = (RMMessageTail)messagetail;
+				phead->Mark1       = 0;
+				phead->Mark2       = 0;
+				phead->MessageID   = REQUEST_QD_REFRESH_RESOURCE;
+				phead->MessageSize = contbuffer.Cursor + 1;
+
+				appendSelfMaintainBuffer(&sendbuffer, (char *)phead, sizeof(*phead));
+				appendSelfMaintainBuffer(&sendbuffer,
+										 SMBUFF_CONTENT(&contbuffer),
+										 getSMBContentSize(&contbuffer));
+				appendSelfMaintainBuffer(&sendbuffer, (char *)ptail, sizeof(*ptail));
+
+				if ( sendWithRetry(fd,
+								   SMBUFF_CONTENT(&sendbuffer),
+								   getSMBContentSize(&sendbuffer),
+								   false) == FUNC_RETURN_OK)
 				{
-					pg_usleep(1000000L);
-					continue;
+					RPCResponseRefreshResourceHeartBeatData response;
+					/* Do not care response at all. */
+					char recvbuf[sizeof(messagehead) +
+								 sizeof(messagetail) +
+								 sizeof(response)];
+
+					if ( recvWithRetry(fd,
+									   recvbuf,
+									   sizeof(recvbuf),
+									   false) != FUNC_RETURN_OK)
+					{
+						write_log("ERROR generateResourceRefreshHeartBeat recv error "
+								  "(errno %d)", errno);
+						close(fd);
+						fd = -1;
+					}
 				}
-
-			}
-
-			RMMessageHead phead = (RMMessageHead)messagehead;
-			RMMessageTail ptail = (RMMessageTail)messagetail;
-			phead->Mark1       = 0;
-			phead->Mark2       = 0;
-			phead->MessageID   = REQUEST_QD_REFRESH_RESOURCE;
-			phead->MessageSize = contbuffer.Cursor + 1;
-
-			appendSelfMaintainBuffer(&sendbuffer, (char *)phead, sizeof(*phead));
-			appendSelfMaintainBuffer(&sendbuffer,
-									 SMBUFF_CONTENT(&contbuffer),
-									 getSMBContentSize(&contbuffer));
-			appendSelfMaintainBuffer(&sendbuffer, (char *)ptail, sizeof(*ptail));
-
-			if ( sendWithRetry(fd,
-							   SMBUFF_CONTENT(&sendbuffer),
-							   getSMBContentSize(&sendbuffer),
-							   false) == FUNC_RETURN_OK)
-			{
-				RPCResponseRefreshResourceHeartBeatData response;
-				/* Do not care response at all. */
-				char recvbuf[sizeof(messagehead) +
-							 sizeof(messagetail) +
-							 sizeof(response)];
-
-				if ( recvWithRetry(fd,
-							       recvbuf,
-								   sizeof(recvbuf),
-								   false) != FUNC_RETURN_OK)
+				else
 				{
-					write_log("ERROR generateResourceRefreshHeartBeat recv error "
+					write_log("ERROR generateResourceRefreshHeartBeat send error "
 							  "(errno %d)", errno);
 					close(fd);
 					fd = -1;
 				}
-			}
-			else
-			{
-				write_log("ERROR generateResourceRefreshHeartBeat send error "
-						  "(errno %d)", errno);
-				close(fd);
-				fd = -1;
-			}
 
-			if ( log_min_messages <= DEBUG3 )
-			{
-				write_log("generateResourceRefreshHeartBeat sent heart-beat.");
+				if ( log_min_messages <= DEBUG3 )
+				{
+					write_log("generateResourceRefreshHeartBeat sent heart-beat.");
+				}
 			}
 		}
-		pg_usleep(rm_session_lease_heartbeat_interval * 1000000L);
 	}
 
 	destroySelfMaintainBuffer(&sendbuffer);


Mime
View raw message