hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From y...@apache.org
Subject incubator-hawq git commit: HAWQ-141. memory accessing panic in system test
Date Tue, 10 Nov 2015 07:10:32 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master 322fbf444 -> ffd8df67e


HAWQ-141. memory accessing panic in system test


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/ffd8df67
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/ffd8df67
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/ffd8df67

Branch: refs/heads/master
Commit: ffd8df67e66731b6a793278c96871ddb3e7862f1
Parents: 322fbf4
Author: Yi Jin <yjin@pivotal.io>
Authored: Tue Nov 10 15:10:19 2015 +0800
Committer: Yi Jin <yjin@pivotal.io>
Committed: Tue Nov 10 15:10:19 2015 +0800

----------------------------------------------------------------------
 .../communication/rmcomm_QD2RM.c                | 198 ++++++++++++++++---
 1 file changed, 170 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ffd8df67/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
index cffdec9..ea16c5f 100644
--- a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
+++ b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
@@ -48,6 +48,17 @@ void buildManipulateResQueueRequest(SelfMaintainBuffer sendbuffer,
 									char 			  *queuename,
 									uint16_t 		   action,
 									List    		  *options);
+
+struct HeartBeatThreadArgData
+{
+	int		HostAddrLength;
+	int		HostAddrSize;
+	char  **HostAddrs;
+};
+typedef struct HeartBeatThreadArgData  HeartBeatThreadArgData;
+typedef struct HeartBeatThreadArgData *HeartBeatThreadArg;
+
+void freeHeartBeatThreadArg(HeartBeatThreadArg *arg);
 void *generateResourceRefreshHeartBeat(void *arg);
 
 int callSyncRPCToRM(const char 	 	   *sendbuff,
@@ -234,12 +245,66 @@ void initializeQD2RMComm(void)
     /* Start resource heart-beat thread. */
     if ( rm_session_lease_heartbeat_enable )
     {
+    	/* Resolve resource manager server address here before creating thread. */
+    	struct hostent *rmserver = gethostbyname(master_addr_host);
+    	if ( rmserver == NULL )
+    	{
+    		elog(ERROR, "failed to resolve resource manager hostname %s. herror %s",
+    					master_addr_host,
+						hstrerror(h_errno));
+    	}
+
+    	HeartBeatThreadArg tharg = malloc(sizeof(HeartBeatThreadArgData));
+    	tharg->HostAddrLength = rmserver->h_length;
+    	tharg->HostAddrs      = NULL;
+    	tharg->HostAddrSize   = 0;
+
+    	/* Get total and INET address count. */
+    	int addrcnt = 0;
+    	while( rmserver->h_addr_list[addrcnt] != NULL )
+    	{
+    		addrcnt++;
+    		if ( rmserver->h_addrtype == AF_INET )
+    		{
+    			tharg->HostAddrSize++;
+    		}
+    	}
+    	elog(DEBUG3, "Resolved resource manager host %s to %d INET addresses.",
+    				 master_addr_host,
+					 tharg->HostAddrSize);
+
+    	if ( tharg->HostAddrSize <= 0 )
+    	{
+    		freeHeartBeatThreadArg(&tharg);
+    		elog(ERROR, "Resource manager host %s does not have available INET "
+    					"address.");
+    	}
+
+    	tharg->HostAddrs = malloc(sizeof(char *) * tharg->HostAddrSize);
+
+    	int ineti = 0;
+    	for ( int i = 0 ; i < addrcnt ; ++i )
+    	{
+    		if ( rmserver->h_addrtype != AF_INET )
+    		{
+    			continue;
+    		}
+    		tharg->HostAddrs[ineti] = malloc(sizeof(char) * tharg->HostAddrLength);
+    		memcpy(tharg->HostAddrs[ineti],
+    			   rmserver->h_addr_list[i],
+				   tharg->HostAddrLength);
+    		ineti++;
+    	}
+
+
+    	/* Start heart-beat thread. */
 		if ( pthread_create(&ResourceHeartBeatThreadHandle,
 							NULL,
 							generateResourceRefreshHeartBeat,
-							NULL) != 0)
+							tharg) != 0)
 		{
-			elog(ERROR, "Fail to create background thread for communication with "
+			freeHeartBeatThreadArg(&tharg);
+			elog(ERROR, "failed to create background thread for communication with "
 						"resource manager.");
 		}
     }
@@ -1219,6 +1284,9 @@ void *generateResourceRefreshHeartBeat(void *arg)
 								   '\0','\0','\0','\0','\0','\0','\0','\0'};
 	static char messagetail[8]  = {'M' ,'S' ,'G' ,'E' ,'N' ,'D' ,'S' ,'!' };
 
+	HeartBeatThreadArg tharg = arg;
+	Assert(arg != NULL);
+
 	SelfMaintainBufferData sendbuffer;
 	SelfMaintainBufferData contbuffer;
 
@@ -1231,8 +1299,8 @@ void *generateResourceRefreshHeartBeat(void *arg)
 	prepareSelfMaintainBuffer(&sendbuffer, DEFAULT_HEARTBEAT_BUFFER, true);
 	prepareSelfMaintainBuffer(&contbuffer, DEFAULT_HEARTBEAT_BUFFER, true);
 
-	while( true ) {
-
+	while( true )
+	{
 		resetSelfMaintainBuffer(&sendbuffer);
 		resetSelfMaintainBuffer(&contbuffer);
 		bool sendcontent = false;
@@ -1263,43 +1331,117 @@ void *generateResourceRefreshHeartBeat(void *arg)
 
 		if ( sendcontent )
 		{
-			int fd = -1;
-			int res = connectToServerRemote(master_addr_host, rm_master_port, &fd);
-			if ( res == FUNC_RETURN_OK )
+			/* Connect to resource manager server. */
+			struct sockaddr_in server_addr;
+			int fd = socket(AF_INET, SOCK_STREAM, 0);
+			if ( fd < 0 )
+			{
+				write_log("ERROR generateResourceRefreshHeartBeat failed to open "
+						  "socket (errno %d)", errno);
+				break;
+			}
+			memset(&server_addr, 0, sizeof(server_addr));
+			server_addr.sin_family = AF_INET;
+			memcpy(&(server_addr.sin_addr.s_addr),
+				   tharg->HostAddrs[0],
+				   tharg->HostAddrLength);
+			server_addr.sin_port = htons(rm_master_port);
+
+			int sockres = 0;
+			while(true)
 			{
-				RMMessageHead phead = (RMMessageHead)messagehead;
-				RMMessageTail ptail = (RMMessageTail)messagetail;
-				phead->Mark1       = 0;
-				phead->Mark2       = 0;
-				phead->MessageID   = REQUEST_QD_REFRESH_RESOURCE;
-				phead->MessageSize = contbuffer.Cursor + 1;
-
-				appendSelfMaintainBuffer(&sendbuffer, (char *)phead, sizeof(*phead));
-				appendSelfMaintainBuffer(&sendbuffer, contbuffer.Buffer, contbuffer.Cursor+1);
-				appendSelfMaintainBuffer(&sendbuffer, (char *)ptail, sizeof(*ptail));
-
-				if ( sendWithRetry(fd, sendbuffer.Buffer, sendbuffer.Cursor+1, false) == FUNC_RETURN_OK) {
-					RPCResponseRefreshResourceHeartBeatData response;
-					/* Do not care response at all. */
-					char recvbuf[16 + 8 + sizeof(response)];
-					if (recvWithRetry(fd, recvbuf, sizeof(recvbuf), false) != FUNC_RETURN_OK)
+				sockres = connect(fd,
+								  (struct sockaddr *)&server_addr,
+								  sizeof(server_addr));
+				if (sockres < 0)
+				{
+					if (errno == EINTR)
 					{
-					  write_log("generateResourceRefreshHeartBeat recv error (errno %d)", errno);
+						continue;
+					}
+					else
+					{
+						write_log("ERROR generateResourceRefreshHeartBeat "
+								  "failed to connect to resource manager, "
+								  "fd %d (errno %d)", fd, errno);
+						close(fd);
 					}
 				}
-				else
+				break;
+			}
+
+			if ( sockres < 0 )
+			{
+				pg_usleep(1000000L);
+				continue;
+			}
+
+			RMMessageHead phead = (RMMessageHead)messagehead;
+			RMMessageTail ptail = (RMMessageTail)messagetail;
+			phead->Mark1       = 0;
+			phead->Mark2       = 0;
+			phead->MessageID   = REQUEST_QD_REFRESH_RESOURCE;
+			phead->MessageSize = contbuffer.Cursor + 1;
+
+			appendSelfMaintainBuffer(&sendbuffer, (char *)phead, sizeof(*phead));
+			appendSelfMaintainBuffer(&sendbuffer, contbuffer.Buffer, contbuffer.Cursor+1);
+			appendSelfMaintainBuffer(&sendbuffer, (char *)ptail, sizeof(*ptail));
+
+			if ( sendWithRetry(fd,
+							   sendbuffer.Buffer,
+							   sendbuffer.Cursor+1,
+							   false) == FUNC_RETURN_OK)
+			{
+				RPCResponseRefreshResourceHeartBeatData response;
+				/* Do not care response at all. */
+				char recvbuf[sizeof(messagehead) +
+							 sizeof(messagetail) +
+							 sizeof(response)];
+
+				if ( recvWithRetry(fd,
+							       recvbuf,
+								   sizeof(recvbuf),
+								   false) != FUNC_RETURN_OK)
 				{
-				  write_log("generateResourceRefreshHeartBeat send error (errno %d)", errno);
+					write_log("ERROR generateResourceRefreshHeartBeat recv error "
+							  "(errno %d)", errno);
 				}
 			}
-			closeConnectionRemote(&fd);
+			else
+			{
+				write_log("ERROR generateResourceRefreshHeartBeat send error "
+						  "(errno %d)", errno);
+			}
+			close(fd);
+
+			if ( log_min_messages <= DEBUG3 )
+			{
+				write_log("generateResourceRefreshHeartBeat sent heart-beat.");
+			}
 		}
-		pg_usleep(rm_session_lease_heartbeat_interval * 1000000);
+		pg_usleep(rm_session_lease_heartbeat_interval * 1000000L);
 	}
 
+	freeHeartBeatThreadArg(&tharg);
+	write_log("generateResourceRefreshHeartBeat exits.");
 	return 0;
 }
 
+void freeHeartBeatThreadArg(HeartBeatThreadArg *arg)
+{
+	if ( *arg == NULL )
+	{
+		return;
+	}
+
+	for ( int i = 0 ; i < (*arg)->HostAddrSize ; ++i )
+	{
+		free((*arg)->HostAddrs[i]);
+	}
+	free(*arg);
+	*arg = NULL;
+}
+
 #define PG_RESQUEUE_STATUS_COLUMNS  10
 #define PG_RESQUEUE_STATUS_BUFSIZE  1024
 


Mime
View raw message