hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From y...@apache.org
Subject incubator-hawq git commit: HAWQ-522. Add socket connection pool feature to improve resource negotiation performance
Date Mon, 14 Mar 2016 01:58:11 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master 604856c09 -> 889292bfa


HAWQ-522. Add socket connection pool feature to improve resource negotiation performance


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/889292bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/889292bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/889292bf

Branch: refs/heads/master
Commit: 889292bfa2bf1a95e30d1737b02827459457f9c0
Parents: 604856c
Author: YI JIN <yjin@pivotal.io>
Authored: Mon Mar 14 12:56:16 2016 +1100
Committer: YI JIN <yjin@pivotal.io>
Committed: Mon Mar 14 12:56:16 2016 +1100

----------------------------------------------------------------------
 src/backend/cdb/cdbvars.c                       |   2 +
 src/backend/postmaster/postmaster.c             |   2 -
 .../communication/rmcomm_AsyncComm.c            | 311 ++++++++-------
 .../communication/rmcomm_Connect.c              |   6 +-
 .../communication/rmcomm_Message.c              |   6 +-
 .../communication/rmcomm_MessageHandler.c       |  15 +-
 .../communication/rmcomm_MessageServer.c        | 134 +++----
 .../communication/rmcomm_QD2RM.c                | 131 +++----
 .../communication/rmcomm_RM2RMSEG.c             |  18 +-
 .../communication/rmcomm_RMSEG2RM.c             |   6 +-
 .../communication/rmcomm_SyncComm.c             | 108 +-----
 src/backend/resourcemanager/conntrack.c         |  90 +++--
 .../include/communication/rmcomm_AsyncComm.h    |  36 +-
 .../include/communication/rmcomm_Connect.h      |   1 -
 .../communication/rmcomm_MessageServer.h        |  12 +-
 .../include/communication/rmcomm_SyncComm.h     |   9 -
 src/backend/resourcemanager/include/conntrack.h |  11 +-
 .../include/utils/network_utils.h               |  39 +-
 src/backend/resourcemanager/requesthandler.c    |  71 ++--
 src/backend/resourcemanager/resourcemanager.c   |  59 +--
 .../resourcemanager/resourcemanager_RMSEG.c     |   1 -
 src/backend/resourcemanager/resqueuemanager.c   |  17 +-
 .../resourcemanager/utils/network_utils.c       | 387 ++++++++++++-------
 src/backend/utils/misc/guc.c                    |  19 +
 src/include/cdb/cdbvars.h                       |   2 +
 25 files changed, 751 insertions(+), 742 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/cdb/cdbvars.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbvars.c b/src/backend/cdb/cdbvars.c
index 4db61ab..fbb4bc0 100644
--- a/src/backend/cdb/cdbvars.c
+++ b/src/backend/cdb/cdbvars.c
@@ -302,6 +302,8 @@ char  *seg_directory;
 int    rm_master_port;
 int	   rm_segment_port;
 int	   rm_master_domain_port;
+bool   rm_enable_connpool;
+int	   rm_connpool_sameaddr_buffersize;
 
 int    rm_nvseg_perquery_limit;
 int	   rm_nvseg_perquery_perseg_limit;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/postmaster/postmaster.c
----------------------------------------------------------------------
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 832dbef..b172eb4 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -5483,8 +5483,6 @@ BackendStartup(Port *port)
 		/* And run the backend */
 		int brres = BackendRun(port);
 
-		cleanupQD2RMComm();
-
 		proc_exit(brres);
 	}
 #endif   /* EXEC_BACKEND */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c b/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c
index 772adef..0084edc 100644
--- a/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c
+++ b/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c
@@ -37,12 +37,12 @@ char			RWBuffer[ASYNCCOMM_READ_WRITE_ONCE_SIZE];
 void freeCommBuffer(AsyncCommBuffer *pcommbuffer);
 
 AsyncCommBuffer createCommBuffer(int 					  fd,
-								 char					 *dmfilename,
 								 uint32_t				  actionmask,
 								 AsyncCommBufferHandlers  methods,
 								 void					 *userdata);
 
 void closeRegisteredFileDesc(AsyncCommBuffer commbuff);
+static void closeAllRegisteredFileDescs(int code, Datum arg);
 
 void initializeAsyncComm(void)
 {
@@ -53,15 +53,11 @@ void initializeAsyncComm(void)
 											 ALLOCSET_DEFAULT_INITSIZE,
 											 ALLOCSET_DEFAULT_MAXSIZE);
 	CommBufferCounter = 0;
-}
 
-bool canRegisterFileDesc(void)
-{
-	return CommBufferCounter < ASYNCCOMM_CONNECTION_MAX_CAPABILITY;
+	on_proc_exit(closeAllRegisteredFileDescs, 0);
 }
 
 int registerFileDesc(int 					  fd,
-					 char					 *dmfilename,
 					 uint32_t				  actionmask,
 					 AsyncCommBufferHandlers  methods,
 					 void 					 *userdata,
@@ -81,7 +77,6 @@ int registerFileDesc(int 					  fd,
 	}
 
 	CommBuffers[CommBufferCounter] = createCommBuffer(fd,
-													  dmfilename,
 													  actionmask,
 													  methods,
 													  userdata);
@@ -99,6 +94,31 @@ int registerFileDesc(int 					  fd,
 	return FUNC_RETURN_OK;
 }
 
+void assignFileDescClientAddressInfo(AsyncCommBuffer	 commbuffer,
+									 const char			*clienthostname,
+									 uint16_t			 serverport,
+									 struct sockaddr_in	*clientaddr,
+									 socklen_t			 clientaddrlen)
+{
+	/* Assign connection information into comm buffer. */
+	memcpy(&(commbuffer->ClientAddr), clientaddr, clientaddrlen);
+	commbuffer->ClientAddrLen = clientaddrlen;
+	strncpy(commbuffer->ClientAddrDotStr,
+			SOCKADDR(clientaddr),
+			sizeof(commbuffer->ClientAddrDotStr)-1);
+	commbuffer->ClientAddrPort = SOCKPORT(clientaddr);
+	commbuffer->ServerPort = serverport;
+
+	if ( clienthostname != NULL )
+	{
+		setSimpleStringNoLen(&(commbuffer->ClientHostname), clienthostname);
+
+		elog(DEBUG3, "Resource manager assigned hostname %s, port %d",
+					 clienthostname,
+					 commbuffer->ClientAddrPort);
+	}
+}
+
 int processAllCommFileDescs(void)
 {
 	static int FreeIndexes[ASYNCCOMM_CONNECTION_MAX_CAPABILITY];
@@ -106,8 +126,8 @@ int processAllCommFileDescs(void)
 
 	/*
 	 * This loop is to check if there are some FDs no need to check POLLOUT
-	 * event. Because, some FDs maybe usually POLLOUT ready but no data to write,
-	 * which causes CPU resource wasted.
+	 * event. Because, some FDs maybe usually POLLOUT ready but no data to
+	 * write, which causes CPU resource wasted.
 	 */
 	for ( int i = 0 ; i < CommBufferCounter ; ++i )
 	{
@@ -177,8 +197,7 @@ int processAllCommFileDescs(void)
 					CommBuffers[i]->Methods->ErrorReadyHandle(CommBuffers[i]);
 
 					/* Tell the close this connection and free the buffer. */
-					CommBuffers[i]->forcedClose = true;
-					CommBuffers[i]->toClose     = true;
+					forceCloseFileDesc(CommBuffers[i]);
 					readycount--;
 					continue;
 				}
@@ -270,9 +289,9 @@ int processAllCommFileDescs(void)
 									 CommBuffers[i]->WriteContentSize,
 									 list_length(CommBuffers[i]->WriteBuffer));
 					}
-					else if ( wrsize == -1 &&
-							  errno != EWOULDBLOCK &&
-							  errno != EAGAIN      &&
+					else if ( wrsize == -1 			&&
+							  errno != EWOULDBLOCK 	&&
+							  errno != EAGAIN      	&&
 							  errno != EINTR)
 					{
 						elog(WARNING, "FD %d failed to send message. errno %d",
@@ -283,8 +302,7 @@ int processAllCommFileDescs(void)
 						CommBuffers[i]->Methods->ErrorReadyHandle(CommBuffers[i]);
 
 						/* Not acceptable error, should actively force close. */
-						CommBuffers[i]->forcedClose = true;
-						CommBuffers[i]->toClose     = true;
+						forceCloseFileDesc(CommBuffers[i]);
 					}
 				}
 				readycount--;
@@ -300,6 +318,11 @@ int processAllCommFileDescs(void)
 					CommBuffers[i]->Methods->ReadReadyHandle(CommBuffers[i]);
 				}
 
+				elog(DEBUG3, "commbuffer action mask %d, toclose %d, forced %d",
+							 CommBuffers[i]->ActionMask,
+							 CommBuffers[i]->toClose ? 1 : 0,
+							 CommBuffers[i]->forcedClose ? 1 : 0);
+
 				/* Read ready handler might force the connection to close. */
 				if ( (CommBuffers[i]->ActionMask & ASYNCCOMM_READBYTES) &&
 					 !CommBuffers[i]->toClose &&
@@ -322,15 +345,14 @@ int processAllCommFileDescs(void)
 						 */
 						Assert(CommBuffers[i]->Methods->ReadPostHandle != NULL);
 						CommBuffers[i]->Methods->ReadPostHandle(CommBuffers[i]);
-						elog(DEBUG5, "FD %d read %d bytes. %d to handle",
+						elog(DEBUG3, "FD %d read %d bytes. %d to handle",
 									 CommBuffers[i]->FD,
 									 rdsize,
 									 getSMBContentSize(&(CommBuffers[i]->ReadBuffer)));
 					}
 					else if ( rdsize == 0 )
 					{
-						CommBuffers[i]->forcedClose = true;
-						CommBuffers[i]->toClose     = true;
+						forceCloseFileDesc(CommBuffers[i]);
 						elog(DEBUG3, "FD %d (client) is normally closed.",
 									 CommBuffers[i]->FD);
 					}
@@ -348,8 +370,11 @@ int processAllCommFileDescs(void)
 						CommBuffers[i]->Methods->ErrorReadyHandle(CommBuffers[i]);
 
 						/* Not acceptable error, should actively close. */
-						CommBuffers[i]->forcedClose = true;
-						CommBuffers[i]->toClose     = true;
+						forceCloseFileDesc(CommBuffers[i]);
+					}
+					else
+					{
+						elog(WARNING, "FD %d errno %d", CommBuffers[i]->FD, errno);
 					}
 				}
 				readycount--;
@@ -374,28 +399,35 @@ int processAllCommFileDescs(void)
 	freeidx = -1;
 	for ( int i = 0 ; i < CommBufferCounter ; ++i )
 	{
-		if ( CommBuffers[i]->toClose )
+		bool shouldfree = false;
+		if ( CommBuffers[i]->forcedClose )
 		{
-			/*
-			 * If we do not force a close request, we have to wait for cleaning
-			 * the write buffer by sending them out.
-			 */
-			int wbuffsize = list_length(CommBuffers[i]->WriteBuffer);
-			if ( !CommBuffers[i]->forcedClose && wbuffsize > 0 )
-			{
-
-				elog(DEBUG5, "FD %d has %d buffs in write buffer. Skip close.",
-							 CommBuffers[i]->FD,
-							 wbuffsize);
-				continue;
-			}
-
 			/* Call cleanup handler if necessary to do user-defined cleanup. */
 			elog(DEBUG5, "Close FD %d Index %d.", CommBuffers[i]->FD, i);
 
 			/* Close connection and free buffer */
 			closeRegisteredFileDesc(CommBuffers[i]);
+			shouldfree = true;
+		}
+		else if ( CommBuffers[i]->toClose && CommBuffers[i]->WriteBuffer == NULL )
+		{
+			if ( CommBuffers[i]->ClientHostname.Str != NULL &&
+				 CommBuffers[i]->ServerPort != 0 )
+			{
+				returnAliveConnectionRemoteByHostname(
+							&(CommBuffers[i]->FD),
+							CommBuffers[i]->ClientHostname.Str,
+							CommBuffers[i]->ServerPort);
+			}
+			else
+			{
+				closeRegisteredFileDesc(CommBuffers[i]);
+			}
+			shouldfree = true;
+		}
 
+		if ( shouldfree )
+		{
 			Assert(CommBuffers[i]->Methods->CleanUpHandle != NULL);
 			CommBuffers[i]->Methods->CleanUpHandle(CommBuffers[i]);
 			freeCommBuffer(&CommBuffers[i]); /* Now CommBuffers[i] is set NULL.*/
@@ -448,7 +480,6 @@ int processAllCommFileDescs(void)
 }
 
 AsyncCommBuffer createCommBuffer(int 					  fd,
-								 char					 *dmfilename,
 								 uint32_t				  actionmask,
 								 AsyncCommBufferHandlers  methods,
 								 void					 *userdata)
@@ -463,12 +494,12 @@ AsyncCommBuffer createCommBuffer(int 					  fd,
 	result->forcedClose 	 = false;
 	result->UserData		 = userdata;
 
-	if ( dmfilename != NULL )
-	{
-		int fstrlen = strlen(dmfilename);
-		result->DomainFileName = rm_palloc0(AsyncCommContext, fstrlen+1);
-		memcpy(result->DomainFileName, dmfilename, fstrlen+1);
-	}
+	result->ClientAddrLen	 = 0;
+	result->ClientAddrPort	 = 0;
+
+	result->ServerPort		 = 0;
+
+	initSimpleString(&(result->ClientHostname), AsyncCommContext);
 
 	initializeSelfMaintainBuffer(&(result->ReadBuffer), AsyncCommContext);
 	result->WriteBuffer 	 		 = NULL;
@@ -487,10 +518,7 @@ void freeCommBuffer(AsyncCommBuffer *pcommbuffer)
 {
 	Assert( pcommbuffer != NULL );
 
-	if ( (*pcommbuffer)->DomainFileName != NULL )
-	{
-		rm_pfree(AsyncCommContext, (*pcommbuffer)->DomainFileName);
-	}
+	freeSimpleStringContent(&((*pcommbuffer)->ClientHostname));
 
 	destroySelfMaintainBuffer(&((*pcommbuffer)->ReadBuffer));
 
@@ -509,14 +537,7 @@ void freeCommBuffer(AsyncCommBuffer *pcommbuffer)
 
 void closeRegisteredFileDesc(AsyncCommBuffer commbuff)
 {
-	if ( commbuff->DomainFileName != NULL )
-	{
-		closeConnectionDomain(&(commbuff->FD), commbuff->DomainFileName);
-	}
-	else
-	{
-		closeConnectionRemote(&(commbuff->FD));
-	}
+	closeConnectionRemote(&(commbuff->FD));
 }
 
 void closeAndRemoveAllRegisteredFileDesc(void)
@@ -538,6 +559,46 @@ void closeAndRemoveAllRegisteredFileDesc(void)
 	CommBufferCounter = 0;
 }
 
+static void closeAllRegisteredFileDescs(int code, Datum arg)
+{
+	closeAndRemoveAllRegisteredFileDesc();
+}
+void unresigsterFileDesc(int fd)
+{
+	int pos = -1;
+	for ( int i = 0 ; i < CommBufferCounter ; ++i )
+	{
+		Assert(CommBuffers[i] != NULL);
+
+		if ( CommBuffers[i]->FD == fd )
+		{
+			/* Call cleanup handler if necessary to do user-defined cleanup. */
+			CommBuffers[i]->Methods->CleanUpHandle(CommBuffers[i]);
+			elog(DEBUG5, "Unregister FD %d Index %d.", CommBuffers[i]->FD, i);
+			CommBuffers[i]->FD = -1;
+			freeCommBuffer(&CommBuffers[i]);
+			pos = i;
+			break;
+		}
+	}
+
+	/*
+	 * Shift to remove freed slot among in-use slots to shorten the length of
+	 * poll status array.
+	 */
+	if ( pos >= 0 )
+	{
+		if ( CommBufferCounter > 1 )
+		{
+			RegClients[pos].fd      = RegClients[CommBufferCounter-1].fd;
+			RegClients[pos].events  = RegClients[CommBufferCounter-1].events;
+			RegClients[pos].revents = RegClients[CommBufferCounter-1].revents;
+			CommBuffers[pos]        = CommBuffers[CommBufferCounter-1];
+		}
+		CommBufferCounter--;
+	}
+}
+
 void addMessageContentToCommBuffer(AsyncCommBuffer 		buffer,
 								   SelfMaintainBuffer 	content)
 {
@@ -580,8 +641,16 @@ void shiftOutFirstWriteBuffer(AsyncCommBuffer commbuffer)
 	commbuffer->WriteContentOriginalSize = commbuffer->WriteContentSize;
 }
 
-int registerAsyncConnectionFileDesc(const char				*sockpath,
-									const char				*address,
+void closeFileDesc(AsyncCommBuffer commbuff)
+{
+	commbuff->toClose = true;
+}
+void forceCloseFileDesc(AsyncCommBuffer commbuff)
+{
+	commbuff->toClose 	  = true;
+	commbuff->forcedClose = true;
+}
+int registerAsyncConnectionFileDesc(const char				*address,
 									uint16_t				 port,
 									uint32_t				 actionmask,
 									AsyncCommBufferHandlers  methods,
@@ -590,24 +659,43 @@ int registerAsyncConnectionFileDesc(const char				*sockpath,
 {
 	int					res				= FUNC_RETURN_OK;
 	int 				fd 				= -1;
-	bool				dconn 			= false;
-	char			   *dfilename		= NULL;
 	int					sockres			= 0;
-	struct hostent 	   *server  		= NULL;
-	struct sockaddr_un  sockaddr_domain;
-	struct sockaddr_in 	sockaddr_inet;
-	int 				len 			= 0;
+	struct sockaddr_in 	server_addr;
+
+	/* Prepare for connecting. */
+	AddressString resolvedaddr = getAddressStringByHostName(address);
+	if ( resolvedaddr == NULL )
+	{
+		write_log("Failed to get host by name %s for async connecting a remote "
+				  "socket server %s:%d",
+				  address,
+				  address,
+				  port);
+		return UTIL_NETWORK_FAIL_GETHOST;
+	}
 
-	/* Decide to use domain or remote socket connection. */
-	Assert( (sockpath == NULL && address != NULL && port > 0) ||
-			(sockpath != NULL && address == NULL && port == 0) );
-	dconn = sockpath != NULL;
+	if ( rm_enable_connpool )
+	{
+		/* Try to get an alive connection from connection pool. */
+		fd = fetchAliveSocketConnection(address, resolvedaddr, port);
+	}
+
+	if ( fd != -1 )
+	{
+		res = registerFileDesc(fd,
+							   actionmask,
+							   methods,
+							   userdata,
+							   newcommbuffer);
+		goto exit;
+	}
 
 	/* Create socket FD */
-	fd = socket(dconn ? AF_UNIX : AF_INET, SOCK_STREAM, 0);
+	fd = socket(AF_INET, SOCK_STREAM, 0);
 	if ( fd < 0 )
 	{
-		write_log("registerAsyncConnectionFileDesc open socket failed (errno %d)",
+		write_log("Failed to open socket for async connecting a remote socket "
+				  "(errno %d)",
 				  errno);
 		return UTIL_NETWORK_FAIL_CREATESOCKET;
 	}
@@ -619,62 +707,15 @@ int registerAsyncConnectionFileDesc(const char				*sockpath,
 		return UTIL_NETWORK_FAIL_SETFCNTL;
 	}
 
-	/* Prepare for connecting. */
-	if ( dconn )
-	{
-		memset( &sockaddr_domain, 0, sizeof(struct sockaddr_un) );
-		sockaddr_domain.sun_family = AF_UNIX;
-		sprintf(sockaddr_domain.sun_path, "%s.%d.%lu.%d",
-				sockpath,
-				getpid(),
-				(unsigned long)pthread_self(),
-				ASYNCCOMM_CONN_FILEINDEX);
-		len = offsetof(struct sockaddr_un, sun_path) + strlen(sockaddr_domain.sun_path);
-		unlink(sockaddr_domain.sun_path);
-		dfilename = rm_palloc0(AsyncCommContext, strlen(sockaddr_domain.sun_path) + 1);
-		strcpy(dfilename, sockaddr_domain.sun_path);
-
-		sockres = bind(fd, (struct sockaddr *)&sockaddr_domain, len);
-		if ( sockres < 0 )
-		{
-		  write_log("connectToServerDomain bind socket failed %s, fd %d (errno %d)",
-				    dfilename,
-					fd,
-					errno);
-		  closeConnectionDomain(&fd, dfilename);
-		  rm_pfree(AsyncCommContext, dfilename);
-		  return UTIL_NETWORK_FAIL_BIND;
-		}
-
-		memset( &sockaddr_domain, 0, sizeof(struct sockaddr_un) );
-		sockaddr_domain.sun_family = AF_UNIX;
-		sprintf(sockaddr_domain.sun_path, "%s", sockpath);
-		len = offsetof(struct sockaddr_un, sun_path) + strlen(sockaddr_domain.sun_path);
-	}
-	else
-	{
-		server = gethostbyname(address);
-		if ( server == NULL )
-		{
-			write_log("connectToServerRemote resove address %s failed. (herrno %d)",
-					  address,
-					  h_errno);
-			closeConnectionRemote(&fd);
-			return UTIL_NETWORK_FAIL_GETHOST;
-		}
-		bzero((char *)&sockaddr_inet, sizeof(sockaddr_inet));
-		sockaddr_inet.sin_family = AF_INET;
-		bcopy((char *)server->h_addr, (char *)&sockaddr_inet.sin_addr.s_addr, server->h_length);
-		sockaddr_inet.sin_port = htons(port);
-		len = sizeof(sockaddr_inet);
-	}
+	bzero((char *)&server_addr, sizeof(server_addr));
+	server_addr.sin_family = AF_INET;
+	memcpy((char *)&server_addr.sin_addr.s_addr,
+		   resolvedaddr->Address,
+		   resolvedaddr->Length);
+	server_addr.sin_port = htons(port);
 
 	/* Asynchronous connect. Should return value at once. */
-	sockres = connect(fd,
-					  dconn ?
-					    (struct sockaddr *)&sockaddr_domain :
-						(struct sockaddr *)&sockaddr_inet,
-					  len);
+	sockres = connect(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
 	if ( sockres == 0 )
 	{
 		/*
@@ -683,11 +724,17 @@ int registerAsyncConnectionFileDesc(const char				*sockpath,
 		 * perform content sending and receiving.
 		 */
 		res = registerFileDesc(fd,
-						 	   dfilename,
 							   actionmask,
 							   methods,
 							   userdata,
 							   newcommbuffer);
+
+		/* Assign connection address. */
+		assignFileDescClientAddressInfo(*newcommbuffer,
+										address,
+										port,
+										&server_addr,
+										sizeof(server_addr));
 		goto exit;
 	}
 	else if ( sockres < 0 && errno == EINPROGRESS )
@@ -697,19 +744,25 @@ int registerAsyncConnectionFileDesc(const char				*sockpath,
 		 * asynchronous check. Build asynchronous connection commbuffer.
 		 */
 		res = registerFileDescForAsyncConn(fd,
-										   dfilename,
 										   actionmask,
 										   methods,
 										   userdata,
 										   newcommbuffer);
+
+		/* Assign connection address. */
+		assignFileDescClientAddressInfo(*newcommbuffer,
+										address,
+										port,
+										&server_addr,
+										sizeof(server_addr));
+
 		goto exit;
 	}
 	else
 	{
 		/* Fail to build connection. */
-		 write_log("registerAsyncConnectionFileDesc connect socket failed %s, "
+		 write_log("registerAsyncConnectionFileDesc connect socket failed, "
 				   "fd %d (errno %d)",
-				   dfilename == NULL ? "" : dfilename,
 				   fd,
 				   errno);
 		 close(fd);
@@ -717,9 +770,5 @@ int registerAsyncConnectionFileDesc(const char				*sockpath,
 	}
 
 exit:
-	if( dfilename != NULL )
-	{
-		rm_pfree(AsyncCommContext, dfilename);
-	}
 	return res;
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/communication/rmcomm_Connect.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_Connect.c b/src/backend/resourcemanager/communication/rmcomm_Connect.c
index db1640e..fd916da 100644
--- a/src/backend/resourcemanager/communication/rmcomm_Connect.c
+++ b/src/backend/resourcemanager/communication/rmcomm_Connect.c
@@ -71,9 +71,7 @@ void WriteReadyHandler_Connect(AsyncCommBuffer buffer)
 	if ( shouldclose )
 	{
 		ErrorHandler_Connect(buffer);
-
-		buffer->toClose 	= true;
-		buffer->forcedClose = true;
+		forceCloseFileDesc(buffer);
 		return;
 	}
 
@@ -123,7 +121,6 @@ void CleanUpHandler_Connect(AsyncCommBuffer buffer)
 }
 
 int registerFileDescForAsyncConn(int 		 			  fd,
-								 char					 *dmfilename,
 								 uint32_t				  actionmask_afterconn,
 								 AsyncCommBufferHandlers  methods_afterconn,
 								 void				     *userdata_afterconn,
@@ -139,7 +136,6 @@ int registerFileDescForAsyncConn(int 		 			  fd,
 	elog(DEBUG3, "Created AsyncComm Conn context.");
 
 	int res = registerFileDesc(fd,
-					 	 	   dmfilename,
 							   ASYNCCOMM_WRITE,
 							   &AsyncCommBufferHandlersConn,
 							   userdata,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/communication/rmcomm_Message.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_Message.c b/src/backend/resourcemanager/communication/rmcomm_Message.c
index 663a88d..364c8a2 100644
--- a/src/backend/resourcemanager/communication/rmcomm_Message.c
+++ b/src/backend/resourcemanager/communication/rmcomm_Message.c
@@ -67,8 +67,7 @@ void ReadPostHandler_Message(AsyncCommBuffer buffer)
 				 * framework, therefore, here we force to close the connection.
 				 * No more processing is needed.
 				 */
-				buffer->forcedClose = true;
-				buffer->toClose     = true;
+				forceCloseFileDesc(buffer);
 				elog(WARNING, "AsyncComm framework received invalid message head. "
 							  "Close the connection FD %d.",
 							  buffer->FD);
@@ -143,8 +142,7 @@ void ReadPostHandler_Message(AsyncCommBuffer buffer)
 				/* We get wrong message content. */
 				elog(WARNING, "AsyncComm framework received wrong message tail "
 							  "content.");
-				buffer->forcedClose = true;
-				buffer->toClose     = true;
+				forceCloseFileDesc(buffer);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/communication/rmcomm_MessageHandler.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_MessageHandler.c b/src/backend/resourcemanager/communication/rmcomm_MessageHandler.c
index 34e2744..212ad46 100644
--- a/src/backend/resourcemanager/communication/rmcomm_MessageHandler.c
+++ b/src/backend/resourcemanager/communication/rmcomm_MessageHandler.c
@@ -320,16 +320,14 @@ void performMessageForErrorActionForWritingMessageByID(int  		   messageid,
 	case ERRINJ_DISCONNECT_BEFORE_SEND:
 		if ( beforeact )
 		{
-			buffer->toClose     = true;
-			buffer->forcedClose = true;
+			forceCloseFileDesc(buffer);
 		}
 		break;
 	case ERRINJ_DISCONNECT_AFTER_SEND:
 	case ERRINJ_DISCONNECT_AFTER_PROC:
 		if ( !beforeact )
 		{
-			buffer->toClose     = true;
-			buffer->forcedClose = true;
+			forceCloseFileDesc(buffer);
 		}
 		break;
 	case ERRINJ_PARTIAL_SEND:
@@ -351,8 +349,7 @@ void performMessageForErrorActionForWritingMessageByID(int  		   messageid,
 						  buffer->WriteContentOriginalSize,
 						  buffer->WriteContentSize);
 			}
-			buffer->toClose     = true;
-			buffer->forcedClose = false;
+			closeFileDesc(buffer);
 		}
 		break;
 	case ERRINJ_PARTIAL_SEND_FRAME_TAIL:
@@ -376,8 +373,7 @@ void performMessageForErrorActionForWritingMessageByID(int  		   messageid,
 						  buffer->WriteContentSize);
 
 			}
-			buffer->toClose     = true;
-			buffer->forcedClose = false;
+			closeFileDesc(buffer);
 		}
 		break;
 	case ERRINJ_PARTIAL_SEND_FRAME_HEAD:
@@ -397,8 +393,7 @@ void performMessageForErrorActionForWritingMessageByID(int  		   messageid,
 						  buffer->WriteContentOriginalSize,
 						  buffer->WriteContentSize);
 			}
-			buffer->toClose     = true;
-			buffer->forcedClose = false;
+			closeFileDesc(buffer);
 		}
 		break;
 	case ERRINJ_PARTIAL_RECV:

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/communication/rmcomm_MessageServer.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_MessageServer.c b/src/backend/resourcemanager/communication/rmcomm_MessageServer.c
index 1f472fe..ae404d6 100644
--- a/src/backend/resourcemanager/communication/rmcomm_MessageServer.c
+++ b/src/backend/resourcemanager/communication/rmcomm_MessageServer.c
@@ -36,7 +36,7 @@ AsyncCommBufferHandlersData AsyncCommBufferHandlersMsgServer = {
 	CleanUpHandler_MsgServer
 };
 
-AsyncCommMessageHandlerContext createConnTrackHandlerContext(ConnectionTrack newtrack);
+AsyncCommMessageHandlerContext createConnTrackHandlerContext(void);
 
 void ReadReadyHandler_MsgServer(AsyncCommBuffer buffer)
 {
@@ -44,94 +44,54 @@ void ReadReadyHandler_MsgServer(AsyncCommBuffer buffer)
 	ConnectionTrack 	newtrack 		= NULL;
 	ConnectionTrackData tmptrackdata;
 	AsyncCommBuffer     newcommbuffer   = NULL;
+	struct sockaddr_in	clientaddr;
+	socklen_t			clientaddrlen	= sizeof(clientaddr);
+	int					fd				= -1;
 
-	/* For each new client connection, we use one new connection tracker. */
-	res = useConnectionTrack(&newtrack);
-	if ( res == FUNC_RETURN_OK && canRegisterFileDesc() )
+	/* Always accept the connection. */
+	fd = accept(buffer->FD, (struct sockaddr *)&clientaddr, &clientaddrlen);
+	if ( fd == -1 )
 	{
-		/* The connection track progress must be at initial state. */
-		Assert(newtrack->Progress == CONN_PP_INFO_NOTSET);
-		/* Accept connection. */
-		newtrack->ClientAddrLen = sizeof(newtrack->ClientAddr);
-		newtrack->ClientSocket  = accept(buffer->FD,
-										 (struct sockaddr *)&(newtrack->ClientAddr),
-										 &(newtrack->ClientAddrLen));
-		if ( newtrack->ClientSocket == -1 )
-		{
-			elog(WARNING, "Resource manager socket accept error is detected. "
-						  "This connection is to be closed. errno %d",
-						  errno);
-			/* Return connection track. */
-			returnConnectionTrack(newtrack);
-		}
-		else
-		{
-			/* Set client connection address and port. */
-			strncpy(newtrack->ClientAddrDotStr,
-					SOCKADDR(&(newtrack->ClientAddr)),
-					sizeof(newtrack->ClientAddrDotStr)-1);
-			newtrack->ClientAddrPort = SOCKPORT(&(newtrack->ClientAddr));
-			newtrack->ConnectTime    = gettime_microsec();
-			newtrack->LastActTime    = newtrack->ConnectTime;
-
-			/* Create AsyncComm Message handler instance. */
-			AsyncCommMessageHandlerContext context =
-										createConnTrackHandlerContext(newtrack);
-
-			/* Add new client fd into AsyncComm manager. */
-			res = registerFileDesc(newtrack->ClientSocket,
-								   NULL,
-								   ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
-								   &AsyncCommBufferHandlersMessage,
-								   context,
-								   &newcommbuffer);
-			if ( res != FUNC_RETURN_OK )
-			{
-				elog(WARNING, "Resource manager can not track client FD %d. %d",
-							  newtrack->ClientSocket,
-							  res);
-				closeConnectionRemote(&newtrack->ClientSocket);
-				rm_pfree(AsyncCommContext, context);
-				returnConnectionTrack(newtrack);
-				return;
-			}
-
-			/* Make the connection tracker able to reference AsyncComm buffer */
-			newtrack->CommBuffer = newcommbuffer;
-			context->AsyncBuffer = newcommbuffer;
-
-			transformConnectionTrackProgress(newtrack, CONN_PP_ESTABLISHED);
-
-			elog(DEBUG3, "Resource manager accepted one client connected from "
-						 "%s:%d FD %d. connection track %lx\n",
-						 newtrack->ClientAddrDotStr,
-						 newtrack->ClientAddrPort,
-						 newtrack->ClientSocket,
-						 (unsigned long)newtrack);
-
-			/* Call callback function to initialize context for message handlers. */
-			InitHandler_Message(newcommbuffer);
-		}
+		elog(WARNING, "Resource manager socket accept error is detected. errno %d",
+					  errno);
+		return;
 	}
-	else
+
+	/* Create AsyncComm Message handler instance. */
+	AsyncCommMessageHandlerContext context = createConnTrackHandlerContext();
+
+	/* Add new client fd into AsyncComm manager. */
+	res = registerFileDesc(fd,
+						   ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
+						   &AsyncCommBufferHandlersMessage,
+						   context,
+						   &newcommbuffer);
+	if ( res != FUNC_RETURN_OK )
 	{
-		/* Accept but close the connection. */
-		tmptrackdata.ClientAddrLen = sizeof(tmptrackdata.ClientAddr);
-		tmptrackdata.ClientSocket  = accept(buffer->FD,
-											(struct sockaddr *)&(tmptrackdata.ClientAddr),
-											&(tmptrackdata.ClientAddrLen));
-		if ( tmptrackdata.ClientSocket != -1 )
-		{
-			elog(WARNING, "Resource manager cannot add more connections. Accept "
-						  "but close the connection. FD %d.",
-						  tmptrackdata.ClientSocket);
-			closeConnectionRemote(&tmptrackdata.ClientSocket);
-		}
-		if ( newtrack != NULL )
-		{
-			returnConnectionTrack(newtrack);
-		}
+		Assert(newcommbuffer == NULL);
+		/* close the connection and cleanup. */
+		closeConnectionRemote(&fd);
+		rm_pfree(AsyncCommContext, context);
+		return;
 	}
+
+	assignFileDescClientAddressInfo(newcommbuffer,
+									NULL,
+									0,
+									&clientaddr,
+									clientaddrlen);
+
+	/* Let context able to track comm buffer. */
+	context->AsyncBuffer = newcommbuffer;
+
+	/* Call callback function to initialize context for message handlers. */
+	InitHandler_Message(newcommbuffer);
+
+	elog(DEBUG3, "Resource manager accepted one client connected from "
+				 "%s:%d as FD %d.\n",
+				 newcommbuffer->ClientAddrDotStr,
+				 newcommbuffer->ClientAddrPort,
+				 newcommbuffer->FD);
 }
 
 void ErrorHandler_MsgServer(AsyncCommBuffer buffer)
@@ -147,16 +107,16 @@ void CleanUpHandler_MsgServer(AsyncCommBuffer buffer)
 }
 
 /* Register message handlers for clien FDs. */
-AsyncCommMessageHandlerContext createConnTrackHandlerContext(ConnectionTrack newtrack)
+AsyncCommMessageHandlerContext createConnTrackHandlerContext(void)
 {
 	AsyncCommMessageHandlerContext result =
 			rm_palloc0(AsyncCommContext,
 					   sizeof(AsyncCommMessageHandlerContextData));
 
 	result->inMessage				= false;
-	result->UserData  				= newtrack;
+	result->UserData  				= NULL;
 	result->MessageRecvReadyHandler = NULL;
-	result->MessageRecvedHandler 	= addNewMessageToConnTrack;
+	result->MessageRecvedHandler 	= addMessageToConnTrack;
 	result->MessageSendReadyHandler = NULL;
 	result->MessageSentHandler		= sentMessageFromConnTrack;
 	result->MessageErrorHandler 	= hasCommErrorInConnTrack;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
index 65837a2..d7d18ba 100644
--- a/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
+++ b/src/backend/resourcemanager/communication/rmcomm_QD2RM.c
@@ -193,13 +193,6 @@ DQueue buildResourceDistRowData(MCTYPE 				context,
  *------------------------------------------------------------------------------
  */
 
-#ifdef ENABLE_DOMAINSERVER
-/* Reference global configure.   */
-extern char 	   *UnixSocketDir;
-/* Unix domain socket file.      */
-char				QD2RM_SocketFile[1024];
-#endif
-
 MemoryContext		QD2RM_CommContext			  = NULL;
 QDResourceContext  *QD2RM_ResourceSets            = NULL;
 int					QD2RM_ResourceSetSize         = 0;
@@ -249,10 +242,6 @@ void initializeQD2RMComm(void)
 					"resource manager.");
     }
 
-#ifdef ENABLE_DOMAINSERVER
-    /* Get UNIX domain socket file. */
-    UNIXSOCK_PATH(QD2RM_SocketFile, rm_master_domain_port, UnixSocketDir);
-#endif
     /* Initialize global variables for maintaining a list of resource sets. */
     QD2RM_ResourceSets 	   = rm_palloc0(QD2RM_CommContext,
             							sizeof(QDResourceContext) *
@@ -342,6 +331,8 @@ void initializeQD2RMComm(void)
 
     initializeMessageHandlers();
 
+    initializeSocketConnectionPool();
+
     QD2RM_Initialized = true;
 }
 
@@ -461,6 +452,8 @@ int cleanupQD2RMComm(void)
 	int res = FUNC_RETURN_OK;
 	char errorbuf[ERRORMESSAGE_SIZE];
 
+	elog(LOG, "Clean up communication to resource manager now.");
+
 	initializeQD2RMComm();
 
 	pthread_mutex_lock(&ResourceSetsMutex);
@@ -1376,6 +1369,8 @@ void *generateResourceRefreshHeartBeat(void *arg)
 								   '\0','\0','\0','\0','\0','\0','\0','\0'};
 	static char messagetail[8]  = {'M' ,'S' ,'G' ,'E' ,'N' ,'D' ,'S' ,'!' };
 
+	int fd = -1;
+
 	HeartBeatThreadArg tharg = arg;
 	Assert(arg != NULL);
 
@@ -1423,49 +1418,54 @@ void *generateResourceRefreshHeartBeat(void *arg)
 
 		if ( sendcontent )
 		{
-			/* Connect to resource manager server. */
-			struct sockaddr_in server_addr;
-			int fd = socket(AF_INET, SOCK_STREAM, 0);
+			/* Connect to server only when necessary. */
 			if ( fd < 0 )
 			{
-				write_log("ERROR generateResourceRefreshHeartBeat failed to open "
-						  "socket (errno %d)", errno);
-				break;
-			}
-			memset(&server_addr, 0, sizeof(server_addr));
-			server_addr.sin_family = AF_INET;
-			memcpy(&(server_addr.sin_addr.s_addr),
-				   tharg->HostAddrs[0],
-				   tharg->HostAddrLength);
-			server_addr.sin_port = htons(rm_master_port);
-
-			int sockres = 0;
-			while(true)
-			{
-				sockres = connect(fd,
-								  (struct sockaddr *)&server_addr,
-								  sizeof(server_addr));
-				if (sockres < 0)
+				/* Connect to resource manager server. */
+				struct sockaddr_in server_addr;
+				fd = socket(AF_INET, SOCK_STREAM, 0);
+				if ( fd < 0 )
 				{
-					if (errno == EINTR)
-					{
-						continue;
-					}
-					else
+					write_log("ERROR generateResourceRefreshHeartBeat failed to open "
+							  "socket (errno %d)", errno);
+					break;
+				}
+				memset(&server_addr, 0, sizeof(server_addr));
+				server_addr.sin_family = AF_INET;
+				memcpy(&(server_addr.sin_addr.s_addr),
+					   tharg->HostAddrs[0],
+					   tharg->HostAddrLength);
+				server_addr.sin_port = htons(rm_master_port);
+
+				int sockres = 0;
+				while(true)
+				{
+					sockres = connect(fd,
+									  (struct sockaddr *)&server_addr,
+									  sizeof(server_addr));
+					if (sockres < 0)
 					{
-						write_log("ERROR generateResourceRefreshHeartBeat "
-								  "failed to connect to resource manager, "
-								  "fd %d (errno %d)", fd, errno);
-						close(fd);
+						if (errno == EINTR)
+						{
+							continue;
+						}
+						else
+						{
+							write_log("ERROR generateResourceRefreshHeartBeat "
+									  "failed to connect to resource manager, "
+									  "fd %d (errno %d)", fd, errno);
+							close(fd);
+						}
 					}
+					break;
+				}
+
+				if ( sockres < 0 )
+				{
+					pg_usleep(1000000L);
+					continue;
 				}
-				break;
-			}
 
-			if ( sockres < 0 )
-			{
-				pg_usleep(1000000L);
-				continue;
 			}
 
 			RMMessageHead phead = (RMMessageHead)messagehead;
@@ -1499,14 +1499,17 @@ void *generateResourceRefreshHeartBeat(void *arg)
 				{
 					write_log("ERROR generateResourceRefreshHeartBeat recv error "
 							  "(errno %d)", errno);
+					close(fd);
+					fd = -1;
 				}
 			}
 			else
 			{
 				write_log("ERROR generateResourceRefreshHeartBeat send error "
 						  "(errno %d)", errno);
+				close(fd);
+				fd = -1;
 			}
-			close(fd);
 
 			if ( log_min_messages <= DEBUG3 )
 			{
@@ -1516,6 +1519,9 @@ void *generateResourceRefreshHeartBeat(void *arg)
 		pg_usleep(rm_session_lease_heartbeat_interval * 1000000L);
 	}
 
+	close(fd);
+	fd = -1;
+
 	freeHeartBeatThreadArg(&tharg);
 	write_log("generateResourceRefreshHeartBeat exits.");
 	return 0;
@@ -2774,24 +2780,13 @@ int callSyncRPCToRM(const char 	 	   *sendbuff,
 					char			   *errorbuf,
 					int					errorbufsize)
 {
-#ifdef ENABLE_DOMAINSERVER
-		return callSyncRPCDomain(QD2RM_SocketFile,
-								 sendbuff,
-								 sendbuffsize,
-								 sendmsgid,
-								 exprecvmsgid,
-								 recvsmb,
-								 errorbuf,
-								 errorbufsize);
-#else
-		return callSyncRPCRemote(master_addr_host,
-								 rm_master_port,
-								 sendbuff,
-								 sendbuffsize,
-								 sendmsgid,
-								 exprecvmsgid,
-								 recvsmb,
-								 errorbuf,
-								 errorbufsize);
-#endif
+	return callSyncRPCRemote(master_addr_host,
+							 rm_master_port,
+							 sendbuff,
+							 sendbuffsize,
+							 sendmsgid,
+							 exprecvmsgid,
+							 recvsmb,
+							 errorbuf,
+							 errorbufsize);
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
index b8aa234..fe4a595 100644
--- a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
+++ b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
@@ -158,8 +158,7 @@ int sendRUAlive(char *seghostname)
 	context->UserData                = (void *)segres;
 
 	/* Connect to HAWQ RM server */
-	res = registerAsyncConnectionFileDesc(NULL,
-										  seghostname,
+	res = registerAsyncConnectionFileDesc(seghostname,
 										  rm_segment_port,
 										  ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
 										  &AsyncCommBufferHandlersMessage,
@@ -253,8 +252,7 @@ void receivedRUAliveResponse(AsyncCommMessageHandlerContext  context,
 	}
 
 	setSegResRUAlivePending(segres, false);
-	context->AsyncBuffer->toClose     = true;
-	context->AsyncBuffer->forcedClose = false;
+	closeFileDesc(context->AsyncBuffer);
 }
 
 void sentRUAlive(AsyncCommMessageHandlerContext context)
@@ -358,8 +356,7 @@ int increaseMemoryQuota(char *seghostname, GRMContainerSet containerset)
 
     elog(DEBUG3, "Created AsyncComm Message context for Async Conn.");
 
-	res = registerAsyncConnectionFileDesc(NULL,
-										  seghostname,
+	res = registerAsyncConnectionFileDesc(seghostname,
 										  rm_segment_port,
 										  ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
 										  &AsyncCommBufferHandlersMessage,
@@ -435,8 +432,7 @@ void recvIncreaseMemoryQuotaResponse(AsyncCommMessageHandlerContext	context,
 	}
 
     processContainersAfterIncreaseMemoryQuota(ctns, acceptedcontainer);
-    context->AsyncBuffer->toClose     = true;
-    context->AsyncBuffer->forcedClose = false;
+    closeFileDesc(context->AsyncBuffer);
 }
 
 void sentIncreaseMemoryQuota(AsyncCommMessageHandlerContext context)
@@ -523,8 +519,7 @@ int decreaseMemoryQuota(char 			*seghostname,
     context->MessageErrorHandler     = sentDecreaseMemoryQuotaError;
     context->MessageCleanUpHandler   = sentDecreaseMemoryQuotaCleanup;
 
-	res = registerAsyncConnectionFileDesc(NULL,
-										  seghostname,
+	res = registerAsyncConnectionFileDesc(seghostname,
 										  rm_segment_port,
 										  ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
 										  &AsyncCommBufferHandlersMessage,
@@ -599,8 +594,7 @@ void recvDecreaseMemoryQuotaResponse(AsyncCommMessageHandlerContext	context,
         		  rsp->Result);
     }
     processContainersAfterDecreaseMemoryQuota(ctns, kickedcontainer);
-    context->AsyncBuffer->toClose     = true;
-    context->AsyncBuffer->forcedClose = false;
+    closeFileDesc(context->AsyncBuffer);
 }
 
 void sentDecreaseMemoryQuota(AsyncCommMessageHandlerContext context)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c b/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
index 30256a4..77ea201 100644
--- a/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
+++ b/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
@@ -141,8 +141,7 @@ int sendIMAlive(int  *errorcode,
 
 	/* Connect to HAWQ RM server */
 
-	res = registerAsyncConnectionFileDesc(NULL,
-										  DRMGlobalInstance->SendToStandby?
+	res = registerAsyncConnectionFileDesc(DRMGlobalInstance->SendToStandby?
 										  standby_addr_host:
 										  master_addr_host,
 										  rm_master_port,
@@ -203,8 +202,7 @@ void receivedIMAliveResponse(AsyncCommMessageHandlerContext  context,
 				  response->Result);
 	}
 
-	context->AsyncBuffer->toClose     = true;
-	context->AsyncBuffer->forcedClose = false;
+	closeFileDesc(context->AsyncBuffer);
 }
 
 void sentIMAlive(AsyncCommMessageHandlerContext context)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/communication/rmcomm_SyncComm.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_SyncComm.c b/src/backend/resourcemanager/communication/rmcomm_SyncComm.c
index ba5e9c8..dfc89d7 100644
--- a/src/backend/resourcemanager/communication/rmcomm_SyncComm.c
+++ b/src/backend/resourcemanager/communication/rmcomm_SyncComm.c
@@ -60,96 +60,6 @@ void sentSyncRPCRequest(AsyncCommMessageHandlerContext context);
 void sentSyncRPCRequestError(AsyncCommMessageHandlerContext context);
 void sentSyncRPCRequestCleanUp(AsyncCommMessageHandlerContext context);
 
-
-
-int callSyncRPCDomain(const char     	   *sockfile,
-					  const char 	 	   *sendbuff,
-		        	  int   		  		sendbuffsize,
-					  uint16_t		  		sendmsgid,
-					  uint16_t 		  		exprecvmsgid,
-					  SelfMaintainBuffer 	recvsmb,
-					  char				   *errorbuf,
-					  int					errorbufsize)
-{
-	static char            			dfilename[DOMAINSOCKET_FILE_SIZE];
-	int 							fd 			  = -1;
-	int 							res 		  = FUNC_RETURN_OK;
-	AsyncCommBuffer					newcommbuffer = NULL;
-	AsyncCommMessageHandlerContext 	context 	  = NULL;
-	SyncRPCContextData 				userdata;
-
-	/* Connect to the server side. */
-	res = connectToServerDomain(sockfile, 0, &fd, 0, dfilename);
-	if ( res != FUNC_RETURN_OK )
-	{
-		snprintf(errorbuf, errorbufsize,
-				 "failed to connect to domain socket server %s",
-				 sockfile);
-
-		elog(WARNING, "%s", errorbuf);
-		goto exit;
-	}
-
-	initializeSyncRPContent(&userdata, recvsmb, exprecvmsgid);
-	context = createMessageHandlerContext(&userdata);
-
-	res = registerFileDesc(fd,
-						   dfilename,
-						   ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
-						   &AsyncCommBufferHandlersMessage,
-						   context,
-						   &newcommbuffer);
-	if ( res != FUNC_RETURN_OK )
-	{
-		rm_pfree(AsyncCommContext, context);
-		elog(WARNING, "Fail to register FD for synchronous communication. %d", res);
-		goto exit;
-	}
-
-	buildMessageToCommBuffer(newcommbuffer,
-							 sendbuff,
-							 sendbuffsize,
-							 sendmsgid,
-							 0,
-							 0);
-	context->AsyncBuffer = newcommbuffer;
-
-	InitHandler_Message(newcommbuffer);
-
-	/* Wait for the complete of the communication. */
-	while( true )
-	{
-		processAllCommFileDescs();
-		if ( userdata.CommStatus == SYNC_RPC_COMM_IDLE )
-		{
-			break;
-		}
-		else if ( QueryCancelPending )
-		{
-			/*
-			 * We find that this QD wants to cancel the query, we don't need
-			 * to continue the communication.
-			 */
-			res = TRANSCANCEL_INPROGRESS;
-			break;
-		}
-	}
-
-	res = res == TRANSCANCEL_INPROGRESS ? res : userdata.Result;
-
-	/* Close and cleanup */
-	closeAndRemoveAllRegisteredFileDesc();
-
-	if (res != FUNC_RETURN_OK)
-	{
-	  elog(WARNING, "Sync RPC framework (domain) finds exception raised.");
-	}
-	return res;
-exit:
-	closeConnectionDomain(&fd, dfilename);
-	return res;
-}
-
 int callSyncRPCRemote(const char     	   *hostname,
 					  uint16_t              port,
 		  	  	  	  const char 	 	   *sendbuff,
@@ -182,7 +92,6 @@ int callSyncRPCRemote(const char     	   *hostname,
 	context = createMessageHandlerContext(&userdata);
 
 	res = registerFileDesc(fd,
-						   NULL,
 						   ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
 						   &AsyncCommBufferHandlersMessage,
 						   context,
@@ -201,12 +110,7 @@ int callSyncRPCRemote(const char     	   *hostname,
 		goto exit;
 	}
 
-	buildMessageToCommBuffer(newcommbuffer,
-							 sendbuff,
-							 sendbuffsize,
-							 sendmsgid,
-							 0,
-							 0);
+	buildMessageToCommBuffer(newcommbuffer, sendbuff, sendbuffsize, sendmsgid, 0, 0);
 
 	context->AsyncBuffer = newcommbuffer;
 
@@ -234,7 +138,15 @@ int callSyncRPCRemote(const char     	   *hostname,
 	res = res == TRANSCANCEL_INPROGRESS ? res : userdata.Result;
 
 	/* Close and cleanup */
-	closeAndRemoveAllRegisteredFileDesc();
+	unresigsterFileDesc(fd);
+	if ( res == FUNC_RETURN_OK )
+	{
+		returnAliveConnectionRemoteByHostname(&fd, hostname, port);
+	}
+	else
+	{
+		closeConnectionRemote(&fd);
+	}
 
 	if (res != FUNC_RETURN_OK)
 	{

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/conntrack.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/conntrack.c b/src/backend/resourcemanager/conntrack.c
index 972aa48..43fba9f 100644
--- a/src/backend/resourcemanager/conntrack.c
+++ b/src/backend/resourcemanager/conntrack.c
@@ -22,6 +22,8 @@
 #include "communication/rmcomm_MessageHandler.h"
 #include "communication/rmcomm_QD_RM_Protocol.h"
 
+void cutReferenceOfConnTrackAndCommBuffer(AsyncCommMessageHandlerContext context);
+
 /* Initialize connection track manager. */
 void initializeConnectionTrackManager(void)
 {
@@ -94,16 +96,13 @@ void createEmptyConnectionTrack(ConnectionTrack *track)
 	/* Create new entry in connection track. */
 	(*track) = rm_palloc0(PCONTEXT, sizeof(ConnectionTrackData));
 
-	(*track)->ConnectTime	 			= 0;
+	(*track)->RequestTime	 			= 0;
 	(*track)->RegisterTime   			= 0;
 	(*track)->ResRequestTime 			= 0;
 	(*track)->ResAllocTime	 			= 0;
 	(*track)->LastActTime	 			= 0;
 	(*track)->HeadQueueTime				= 0;
 
-	(*track)->ClientAddrLen  			= 0;
-	(*track)->ClientSocket   			= 0;
-
 	(*track)->MessageSize	 	 		= 0;
 	(*track)->MessageMark1   	 		= 0;
 	(*track)->MessageMark2   	 		= 0;
@@ -269,6 +268,7 @@ int retrieveConnectionTrack(ConnectionTrack track, int32_t connid)
 	track->RegisterTime 			= oldct->RegisterTime;
 	track->ResAllocTime 			= oldct->ResAllocTime;
 	track->ResRequestTime 			= oldct->ResRequestTime;
+	track->LastActTime				= oldct->LastActTime;
 
 	/* Move old resource list to new connection tracker. */
 	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
@@ -380,14 +380,17 @@ void transformConnectionTrackProgress(ConnectionTrack track,
 	track->Progress = progress;
 }
 
-void addNewMessageToConnTrack(AsyncCommMessageHandlerContext context,
-							  uint16_t						 messageid,
-							  uint8_t						 mark1,
-							  uint8_t						 mark2,
-							  char 							*buffer,
-							  uint32_t						 buffersize)
+void addMessageToConnTrack(AsyncCommMessageHandlerContext	context,
+						   uint16_t							messageid,
+						   uint8_t							mark1,
+						   uint8_t							mark2,
+						   char 						   *buffer,
+						   uint32_t							buffersize)
 {
-	ConnectionTrack conntrack = (ConnectionTrack)(context->UserData);
+	/* Create a new connection track instance to save received message. */
+	ConnectionTrack conntrack = NULL;
+	createEmptyConnectionTrack(&conntrack);
+
 	conntrack->MessageID    = messageid;
 	conntrack->MessageMark1 = mark1;
 	conntrack->MessageMark2 = mark2;
@@ -395,8 +398,18 @@ void addNewMessageToConnTrack(AsyncCommMessageHandlerContext context,
 	resetSelfMaintainBuffer(&(conntrack->MessageBuff));
 	appendSelfMaintainBuffer(&(conntrack->MessageBuff), buffer, buffersize);
 
+	/* Let the connection track instance find the socket connection. */
+	conntrack->CommBuffer = context->AsyncBuffer;
+	/* Let the comm buffer instance find the connection track. */
+	context->UserData = conntrack;
+
+	/* Start from an established connection. */
+	transformConnectionTrackProgress(conntrack, CONN_PP_ESTABLISHED);
+
+	conntrack->RequestTime = gettime_microsec();
 	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
-	PCONTRACK->ConnHavingRequests = lappend(PCONTRACK->ConnHavingRequests, conntrack);
+	PCONTRACK->ConnHavingRequests = lappend(PCONTRACK->ConnHavingRequests,
+											conntrack);
 	MEMORY_CONTEXT_SWITCH_BACK
 }
 
@@ -404,26 +417,36 @@ void sentMessageFromConnTrack(AsyncCommMessageHandlerContext context)
 {
 	ConnectionTrack conntrack = (ConnectionTrack)(context->UserData);
 	conntrack->ResponseSent = true;
+
+	/* Clean up the connection between connection track and comm buffer. */
+	cutReferenceOfConnTrackAndCommBuffer(context);
 }
 void hasCommErrorInConnTrack(AsyncCommMessageHandlerContext context)
 {
 	/* This is a call back function, nothing to do. */
+	cutReferenceOfConnTrackAndCommBuffer(context);
 }
+
 void cleanupConnTrack(AsyncCommMessageHandlerContext context)
 {
+	cutReferenceOfConnTrackAndCommBuffer(context);
+}
+
+void cutReferenceOfConnTrackAndCommBuffer(AsyncCommMessageHandlerContext context)
+{
 	ConnectionTrack conntrack = (ConnectionTrack)(context->UserData);
 	bool returnconn = false;
 
 	if ( conntrack != NULL && conntrack->ConnID == -1 )
 	{
-		elog(DEBUG5, "Resource manager returns connection track with no conn id set.");
+		elog(DEBUG3, "Resource manager returns connection track with no conn id set.");
 		returnconn = true;
 	}
 	else if ( conntrack != NULL &&
-		      (conntrack->Progress == CONN_PP_ESTABLISHED ||
+		      (conntrack->Progress == CONN_PP_ESTABLISHED||
 		       conntrack->Progress > CONN_PP_FAILS) )
 	{
-		elog(DEBUG5, "Resource manager returns connection track due to removable "
+		elog(DEBUG3, "Resource manager returns connection track due to removable "
 					 "status. %d",
 					 conntrack->Progress);
 		returnconn = true;
@@ -432,6 +455,7 @@ void cleanupConnTrack(AsyncCommMessageHandlerContext context)
 	{
 		/* Cut the reference between connection track and rmcomm buffer. */
 		conntrack->CommBuffer = NULL;
+		context->UserData = NULL;
 	}
 
 	if ( returnconn )
@@ -586,11 +610,17 @@ void dumpConnectionTracks(const char *filename)
 		{
 			ConnectionTrack conn = (ConnectionTrack)(((PAIR)lfirst(cell))->Value);
 
-			fprintf(fp, "SOCK(client=%s:%d:time=%s),",
-						conn->ClientAddrDotStr,
-						conn->ClientAddrPort,
-						format_time_microsec(conn->ConnectTime));
-
+			if ( conn->CommBuffer != NULL )
+			{
+				fprintf(fp, "SOCK(client=%s:%d:time=%s),",
+							conn->CommBuffer->ClientAddrDotStr,
+							conn->CommBuffer->ClientAddrPort,
+							format_time_microsec(conn->RequestTime));
+			}
+			else
+			{
+				fprintf(fp, "SOCK(client=DISCONNECTED:time=NOTIME),");
+			}
 			fprintf(fp, "CONN(id=%d:user=%s:",
 						conn->ConnID,
 						conn->UserID);
@@ -678,8 +708,12 @@ void dumpConnectionTracks(const char *filename)
 						conn->MessageSize,
 						conn->MessageBuff.Cursor+1,
 						format_time_microsec(conn->MessageReceiveTime),
-						conn->ClientAddrDotStr,
-						conn->ClientAddrPort);
+						conn->CommBuffer == NULL ?
+							"UNKNOWNHOST" :
+							conn->CommBuffer->ClientAddrDotStr,
+						conn->CommBuffer == NULL ?
+							0:
+							conn->CommBuffer->ClientAddrPort);
 
 			fprintf(fp, "COMMSTAT(");
 			if ( conn->CommBuffer == NULL )
@@ -724,23 +758,13 @@ void buildResponseIntoConnTrack(ConnectionTrack      conntrack,
 void copyAllocWaitingConnectionTrack(ConnectionTrack source,
 									 ConnectionTrack target)
 {
-	target->ConnectTime					= source->ConnectTime;
+	target->RequestTime					= source->RequestTime;
 	target->RegisterTime   				= source->RegisterTime;
 	target->ResRequestTime 				= source->ResRequestTime;
 	target->ResAllocTime	 			= 0;
 	target->LastActTime	 				= source->LastActTime;
 	target->HeadQueueTime				= source->HeadQueueTime;
 
-	memcpy(&(target->ClientAddr),
-		   &(source->ClientAddr),
-		   sizeof(struct sockaddr_in));
-	target->ClientAddrLen  				= source->ClientAddrLen;
-	target->ClientSocket   				= source->ClientSocket;
-	memcpy(target->ClientAddrDotStr,
-		   source->ClientAddrDotStr,
-		   sizeof(target->ClientAddrDotStr));
-	target->ClientAddrPort				= source->ClientAddrPort;
-
 	target->MessageSize	 	 			= source->MessageSize;
 	target->MessageMark1   	 			= source->MessageMark1;
 	target->MessageMark2   	 			= source->MessageMark2;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/include/communication/rmcomm_AsyncComm.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/communication/rmcomm_AsyncComm.h b/src/backend/resourcemanager/include/communication/rmcomm_AsyncComm.h
index 21736bc..eb3cf8a 100644
--- a/src/backend/resourcemanager/include/communication/rmcomm_AsyncComm.h
+++ b/src/backend/resourcemanager/include/communication/rmcomm_AsyncComm.h
@@ -21,6 +21,7 @@
 #define RESOURCE_MAMANGER_INTER_PROCESS_COMMUNICATION_ASYNCCOMM_H
 
 #include "resourcemanager/envswitch.h"
+#include "resourcemanager/utils/simplestring.h"
 
 extern MCTYPE AsyncCommContext;
 
@@ -50,19 +51,27 @@ typedef struct AsyncCommBufferHandlersData *AsyncCommBufferHandlers;
 
 struct AsyncCommBufferData {
 	int						 FD;
-	char					*DomainFileName;
+
+	/* Socket connection information */
+	SimpString				 ClientHostname;
+	struct sockaddr_in 		 ClientAddr;
+	socklen_t				 ClientAddrLen;
+	char					 ClientAddrDotStr[16];
+	uint16_t				 ClientAddrPort;
+	uint16_t				 ServerPort;
+
 	SelfMaintainBufferData 	 ReadBuffer;
 	List 		 			*WriteBuffer;
+
 	/* Complete content size track. */
 	int						 WriteContentSize;
 	int						 WriteContentOriginalSize;
 
 	uint32_t				 ActionMask;
 
-	/* If should actively close. */
-	bool				  	 toClose;
-	/* If should close without handling left data. */
-	bool					 forcedClose;
+	bool				  	 toClose;		/* If should actively close. */
+	bool					 forcedClose;	/* If should close without handling
+											   left to write data. */
 	void				   	*UserData;
 	AsyncCommBufferHandlers	 Methods;
 
@@ -75,26 +84,28 @@ void initializeAsyncComm(void);
 
 /* Register one file descriptor for a connected socket connection. */
 int registerFileDesc(int 					  fd,
-					 char					 *dmfilename,
 					 uint32_t				  actionmask,
 					 AsyncCommBufferHandlers  methods,
 					 void 					 *userdata,
 					 AsyncCommBuffer         *newcommbuffer);
 
+void assignFileDescClientAddressInfo(AsyncCommBuffer	 commbuffer,
+									 const char			*clienthostname,
+									 uint16_t			 serverport,
+									 struct sockaddr_in	*clientaddr,
+									 socklen_t			 clientaddrlen);
+
 /* Register one comm buffer for asynchronous connection and communication. */
-int registerAsyncConnectionFileDesc(const char				*sockpath,
-									const char				*address,
+int registerAsyncConnectionFileDesc(const char				*address,
 									uint16_t				 port,
 									uint32_t				 actionmask,
 									AsyncCommBufferHandlers  methods,
 									void					*userdata,
 									AsyncCommBuffer			*newcommbuffer);
 
-/* If new fd can be registered. */
-bool canRegisterFileDesc(void);
 /* Process all registered file descriptors. */
 int processAllCommFileDescs(void);
-
+void unresigsterFileDesc(int fd);
 void closeAndRemoveAllRegisteredFileDesc(void);
 
 void addMessageContentToCommBuffer(AsyncCommBuffer 		buffer,
@@ -103,4 +114,7 @@ void addMessageContentToCommBuffer(AsyncCommBuffer 		buffer,
 SelfMaintainBuffer getFirstWriteBuffer(AsyncCommBuffer commbuffer);
 
 void shiftOutFirstWriteBuffer(AsyncCommBuffer commbuffer);
+
+void closeFileDesc(AsyncCommBuffer commbuff);
+void forceCloseFileDesc(AsyncCommBuffer commbuff);
 #endif /*RESOURCE_MAMANGER_INTER_PROCESS_COMMUNICATION_ASYNCCOMM_H*/

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/include/communication/rmcomm_Connect.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/communication/rmcomm_Connect.h b/src/backend/resourcemanager/include/communication/rmcomm_Connect.h
index fe6d830..f5212a7 100644
--- a/src/backend/resourcemanager/include/communication/rmcomm_Connect.h
+++ b/src/backend/resourcemanager/include/communication/rmcomm_Connect.h
@@ -34,7 +34,6 @@ void ErrorHandler_Connect(AsyncCommBuffer buffer);
 void CleanUpHandler_Connect(AsyncCommBuffer buffer);
 
 int registerFileDescForAsyncConn(int 		 			  fd,
-								 char					 *dmfilename,
 								 uint32_t				  actionmask_afterconn,
 								 AsyncCommBufferHandlers  methods_afterconn,
 								 void				     *userdata_afterconn,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/include/communication/rmcomm_MessageServer.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/communication/rmcomm_MessageServer.h b/src/backend/resourcemanager/include/communication/rmcomm_MessageServer.h
index 2e09d19..862ea01 100644
--- a/src/backend/resourcemanager/include/communication/rmcomm_MessageServer.h
+++ b/src/backend/resourcemanager/include/communication/rmcomm_MessageServer.h
@@ -58,12 +58,12 @@ void ErrorHandler_MsgServer(AsyncCommBuffer buffer);
 void CleanUpHandler_MsgServer(AsyncCommBuffer buffer);
 
 /* Callbacks registered in AsyncComm Message instance */
-void addNewMessageToConnTrack(AsyncCommMessageHandlerContext context,
-							  uint16_t						 messageid,
-							  uint8_t						 mark1,
-							  uint8_t						 mark2,
-							  char 							*buffer,
-							  uint32_t						 buffersize);
+void addMessageToConnTrack(AsyncCommMessageHandlerContext	context,
+						   uint16_t							messageid,
+						   uint8_t							mark1,
+						   uint8_t							mark2,
+						   char 						   *buffer,
+						   uint32_t							buffersize);
 
 void sentMessageFromConnTrack(AsyncCommMessageHandlerContext context);
 void hasCommErrorInConnTrack(AsyncCommMessageHandlerContext context);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/include/communication/rmcomm_SyncComm.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/communication/rmcomm_SyncComm.h b/src/backend/resourcemanager/include/communication/rmcomm_SyncComm.h
index d7493e3..cbbe448 100644
--- a/src/backend/resourcemanager/include/communication/rmcomm_SyncComm.h
+++ b/src/backend/resourcemanager/include/communication/rmcomm_SyncComm.h
@@ -27,15 +27,6 @@ typedef void (* UserDefinedBackgroundLogicHandler)(void);
 void initializeSyncRPCComm(void);
 void setUserDefinedBackgroundLogic(UserDefinedBackgroundLogicHandler handler);
 
-int callSyncRPCDomain(const char     	   *sockfile,
-					  const char 	 	   *sendbuff,
-		        	  int   		  		sendbuffsize,
-					  uint16_t		  		sendmsgid,
-					  uint16_t 		  		exprecvmsgid,
-					  SelfMaintainBuffer 	recvsmb,
-					  char				   *errorbuf,
-					  int					errorbufsize);
-
 int callSyncRPCRemote(const char     	   *hostname,
 					  uint16_t              port,
 		  	  	  	  const char 	 	   *sendbuff,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/include/conntrack.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/conntrack.h b/src/backend/resourcemanager/include/conntrack.h
index f1799dd..e1daf4a 100644
--- a/src/backend/resourcemanager/include/conntrack.h
+++ b/src/backend/resourcemanager/include/conntrack.h
@@ -76,7 +76,7 @@ struct DynResourceQueueTrackData;
 
 struct ConnectionTrackData
 {
-	uint64_t				ConnectTime;   /* When connection is created.	  */
+	uint64_t				RequestTime;   /* When request is accepted.	  	  */
 	uint64_t				RegisterTime;  /* When connection registered. 	  */
 	uint64_t				ResRequestTime;/* When resource allocation request
 											  is received.			  		  */
@@ -84,14 +84,6 @@ struct ConnectionTrackData
 	uint64_t				LastActTime;   /* Last action time.			      */
 	uint64_t				HeadQueueTime; /* When request is queued at head. */
 
-	/* Socket connection information */
-	struct sockaddr_in 		ClientAddr;
-	socklen_t				ClientAddrLen;
-	int						ClientSocket;
-	char					ClientAddrDotStr[16];
-	uint16_t				ClientAddrPort;
-
-	/* Input Message content */
 	uint32_t				MessageSize;
 	uint8_t					MessageMark1;
 	uint8_t					MessageMark2;
@@ -215,4 +207,5 @@ void setConnectionTrackMessageBuffer(ConnectionTrack  track,
 									 int 			  size);
 
 void freeUsedConnectionTrack(ConnectionTrack track);
+
 #endif /*DYNAMIC_RESOURCE_MANAGEMENT_CONNECTION_TRACK_H*/

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/include/utils/network_utils.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/utils/network_utils.h b/src/backend/resourcemanager/include/utils/network_utils.h
index 01a817a..4aa2cf0 100644
--- a/src/backend/resourcemanager/include/utils/network_utils.h
+++ b/src/backend/resourcemanager/include/utils/network_utils.h
@@ -108,18 +108,43 @@ int recvWithRetry(int fd, char *buff, int size, bool checktrans);
 int setConnectionNonBlocked(int fd);
 
 #define DRM_SOCKET_CONN_RETRY 5
-int  connectToServerDomain(const char 	*sockpath,
-						   uint16_t 	 port,
-						   int 			*clientfd,
-						   int			 fileidx,
-						   char			*filename);
 int  connectToServerRemote(const char *address,uint16_t port,int *clientfd);
 void closeConnectionRemote(int *clientfd);
-void closeConnectionDomain(int *clientfd, char *filename);
-
+void returnAliveConnectionRemote(int 			*clientfd,
+								 const char 	*hostname,
+								 AddressString   addrstr,
+								 uint16_t 		 port);
+void returnAliveConnectionRemoteByHostname(int 		  *clientfd,
+										   const char *hostname,
+										   uint16_t    port);
 char *format_time_microsec(uint64_t microtime);
 
 int readPipe(int fd, void *buff, int buffsize);
 int writePipe(int fd, void *buff, int buffsize);
 
+struct ConnAddressStringData {
+	uint16_t				Port;
+	uint16_t				Reserved;
+	AddressStringData		Address;
+};
+typedef struct ConnAddressStringData	 ConnAddressStringData;
+typedef struct ConnAddressStringData	*ConnAddressString;
+
+#define SIZEOFCONNADDRSTRING(connaddr) offsetof(ConnAddressStringData,Address)+\
+									   offsetof(AddressStringData,Address)+\
+									   (connaddr)->Address.Length+1
+
+#define EXPSIZEOFCONNADDRSTRING(addrstr) offsetof(ConnAddressStringData,Address)+\
+		   	   	   	   	   	   	   	     offsetof(AddressStringData,Address)+\
+										 (addrstr)->Length+1
+
+ConnAddressString createConnAddressString(AddressString address, uint16_t port);
+void freeConnAddressString(ConnAddressString connaddr);
+
+void initializeSocketConnectionPool(void);
+AddressString getAddressStringByHostName(const char *hostname);
+
+int fetchAliveSocketConnection(const char 	 *hostname,
+							   AddressString  address,
+							   uint16_t 	  port);
 #endif /* RESOURCE_MANANGER_NETWORK_UTILITIES_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/requesthandler.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler.c b/src/backend/resourcemanager/requesthandler.c
index f690c8a..ba35e2e 100644
--- a/src/backend/resourcemanager/requesthandler.c
+++ b/src/backend/resourcemanager/requesthandler.c
@@ -32,11 +32,6 @@
 
 #include "resourcemanager.h"
 
-/******************************************************************************
- * Global Variables
- ******************************************************************************/
-extern char *UnixSocketDir;		  /* Reference from global configure.         */
-
 /*
  * The MAIN ENTRY of request handler.
  * The implementation of all request handlers are :
@@ -248,20 +243,17 @@ bool handleRMRequestConnectionUnReg(void **arg)
 
 	elog(DEBUG3, "ConnID %d. Try to unregister.", request->ConnID);
 
-	if ( (*conntrack)->ConnID == INVALID_CONNID )
+	res = retrieveConnectionTrack((*conntrack), request->ConnID);
+	if ( res != FUNC_RETURN_OK )
 	{
-		res = retrieveConnectionTrack((*conntrack), request->ConnID);
-		if ( res != FUNC_RETURN_OK )
-		{
-			snprintf(errorbuf, sizeof(errorbuf),
-					 "the resource context is invalid or timed out");
-			elog(WARNING, "ConnID %d. %s", request->ConnID, errorbuf);
-			goto sendresponse;
-		}
-		elog(DEBUG3, "ConnID %d. Fetched existing connection track, progress=%d.",
-					 (*conntrack)->ConnID,
-					 (*conntrack)->Progress);
+		snprintf(errorbuf, sizeof(errorbuf),
+				 "the resource context is invalid or timed out");
+		elog(WARNING, "ConnID %d. %s", request->ConnID, errorbuf);
+		goto sendresponse;
 	}
+	elog(DEBUG3, "ConnID %d. Fetched existing connection track, progress=%d.",
+				 (*conntrack)->ConnID,
+				 (*conntrack)->Progress);
 
 	/* Get connection ID. */
 	request = SMBUFF_HEAD(RPCRequestHeadUnregisterConnectionInRM,
@@ -363,20 +355,17 @@ bool handleRMRequestAcquireResource(void **arg)
 				 request->ConnID,
 				 request->SessionID);
 
-	if ( (*conntrack)->ConnID == INVALID_CONNID )
+	res = retrieveConnectionTrack((*conntrack), request->ConnID);
+	if ( res != FUNC_RETURN_OK )
 	{
-		res = retrieveConnectionTrack((*conntrack), request->ConnID);
-		if ( res != FUNC_RETURN_OK )
-		{
-			snprintf(errorbuf, sizeof(errorbuf),
-					 "the resource context may be timed out");
-			elog(WARNING, "ConnID %d. %s", request->ConnID, errorbuf);
-			goto sendresponse;
-		}
-		elog(DEBUG3, "ConnID %d. Fetched existing connection track, progress=%d.",
-					 (*conntrack)->ConnID,
-					 (*conntrack)->Progress);
+		snprintf(errorbuf, sizeof(errorbuf),
+				 "the resource context may be timed out");
+		elog(WARNING, "ConnID %d. %s", request->ConnID, errorbuf);
+		goto sendresponse;
 	}
+	elog(DEBUG3, "ConnID %d. Fetched existing connection track, progress=%d.",
+				 (*conntrack)->ConnID,
+				 (*conntrack)->Progress);
 
 	request = SMBUFF_HEAD(RPCRequestHeadAcquireResourceFromRM,
 						  &((*conntrack)->MessageBuff));
@@ -523,20 +512,17 @@ bool handleRMRequestReturnResource(void **arg)
 
 	elog(DEBUG3, "ConnID %d. Returns query resource.", request->ConnID);
 
-	if ( (*conntrack)->ConnID == INVALID_CONNID )
+	res = retrieveConnectionTrack((*conntrack), request->ConnID);
+	if ( res != FUNC_RETURN_OK )
 	{
-		res = retrieveConnectionTrack((*conntrack), request->ConnID);
-		if ( res != FUNC_RETURN_OK )
-		{
-			snprintf(errorbuf, sizeof(errorbuf),
-					 "the resource context may be timed out");
-			elog(WARNING, "ConnID %d. %s", request->ConnID, errorbuf);
-			goto sendresponse;
-		}
-		elog(DEBUG3, "ConnID %d. Fetched existing connection track, progress=%d.",
-					 (*conntrack)->ConnID,
-					 (*conntrack)->Progress);
+		snprintf(errorbuf, sizeof(errorbuf),
+				 "the resource context may be timed out");
+		elog(WARNING, "ConnID %d. %s", request->ConnID, errorbuf);
+		goto sendresponse;
 	}
+	elog(DEBUG3, "ConnID %d. Fetched existing connection track, progress=%d.",
+				 (*conntrack)->ConnID,
+				 (*conntrack)->Progress);
 
 	/* Get connection ID. */
 	request = SMBUFF_HEAD(RPCRequestHeadReturnResource,
@@ -617,7 +603,8 @@ bool handleRMSEGRequestIMAlive(void **arg)
 	struct hostent* fts_client_host   = NULL;
 	struct in_addr 	fts_client_addr;
 
-	fts_client_ip = conntrack->ClientAddrDotStr;
+	Assert(conntrack->CommBuffer != NULL);
+	fts_client_ip = conntrack->CommBuffer->ClientAddrDotStr;
 	fts_client_ip_len = strlen(fts_client_ip);
 	inet_aton(fts_client_ip, &fts_client_addr);
 	fts_client_host = gethostbyaddr(&fts_client_addr, 4, AF_INET);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/resourcemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager.c b/src/backend/resourcemanager/resourcemanager.c
index 1d2e68d..9a4bb85 100644
--- a/src/backend/resourcemanager/resourcemanager.c
+++ b/src/backend/resourcemanager/resourcemanager.c
@@ -305,6 +305,9 @@ int ResManagerMain(int argc, char *argv[])
 	DRMGlobalInstance->ThisPID	 = getpid();
 	DRMGlobalInstance->ParentPID = getppid();
 
+	/* Initialize socket connection pool. */
+	initializeSocketConnectionPool();
+
 	elog(DEBUG5, "HAWQ RM :: starts as role %d.", DRMGlobalInstance->Role);
 
 	/*******************************************/
@@ -1008,10 +1011,6 @@ static void InitTemporaryDirs(DQueue tmpdirs_list, char *tmpdirs_string)
  */
 int  loadDynamicResourceManagerConfigure(void)
 {
-#ifdef ENABLE_DOMAINSERVER
-	elog(DEBUG3, "Resource manager loads Unix Domain Socket Port %d",
-				 rm_master_addr_domain_port);
-#endif
 	elog(DEBUG3, "Resource manager loads Socket Listening Port %d",
 				 rm_master_port);
 	elog(DEBUG3, "Resource manager loads Segment Socket Listening Port %d",
@@ -2468,43 +2467,12 @@ int	 initializeSocketServer(void)
 		RMListenSocket[i] = PGINVALID_SOCKET;
 	}
 
-#ifdef ENABLE_DOMAINSERVER
-	/* Listen local unix domain socket port. */
-	netres = StreamServerPort(AF_UNIX,
-							  NULL,
-							  rm_master_addr_domain_port,
-							  UnixSocketDir,
-							  RMListenSocket,
-							  1);	 /* Only one unix domain socket server. */
-
-	if ( netres != STATUS_OK ||
-		 /*
-		  * This condition is for double-checking the server is successfully
-		  * created.
-		  */
-		 (netres == STATUS_OK && RMListenSocket[0] == PGINVALID_SOCKET)	)
-	{
-		res = REQUESTHANDLER_FAIL_START_SOCKET_SERVER;
-		elog(LOG, "Resource manager cannot create UNIX domain socket server. Port=%d",
-				  rm_master_addr_domain_port);
-		return res;
-	}
-
-	/* Listen normal socket addresses. */
-	netres = StreamServerPort(AF_UNSPEC,
-							  allip,
-							  rm_master_port,
-							  NULL,
-							  &(RMListenSocket[1]),
-							  HAWQRM_SERVER_PORT_COUNT-1);
-#else
 	netres = StreamServerPort(AF_UNSPEC,
 			  	  	  	  	  allip,
 							  rm_master_port,
 							  NULL,
 							  RMListenSocket,
 							  HAWQRM_SERVER_PORT_COUNT);
-#endif
 	if ( netres != STATUS_OK )
 	{
 		res = REQUESTHANDLER_FAIL_START_SOCKET_SERVER;
@@ -2522,7 +2490,6 @@ int	 initializeSocketServer(void)
 		if (RMListenSocket[i] != PGINVALID_SOCKET)
 		{
 			netres = registerFileDesc(RMListenSocket[i],
-									  NULL,
 									  ASYNCCOMM_READ,
 									  &AsyncCommBufferHandlersMsgServer,
 									  NULL,
@@ -2550,21 +2517,11 @@ int	 initializeSocketServer(void)
 		return res;
 	}
 
-#ifdef ENABLE_DOMAINSERVER
-	elog(LOG, "Resource manager starts accepting resource request. "
-			  "Listening unix domain socket port %d. "
-			  "Listening normal socket port %d. "
-			  "Total listened %d FDs.",
-			  rm_master_addr_domain_port,
-			  rm_master_port,
-			  validfdcount);
-#else
 	elog(LOG, "Resource manager starts accepting resource request. "
 			  "Listening normal socket port %d. "
 			  "Total listened %d FDs.",
 			  rm_master_port,
 			  validfdcount);
-#endif
 	return res;
 }
 
@@ -2592,16 +2549,6 @@ void sendResponseToClients(void)
 									 conntrack->MessageID,
 									 conntrack->MessageMark1,
 									 conntrack->MessageMark2);
-			/* If socket connection has error, close connection but keep the
-			 * connection track. */
-			if ( conntrack->Progress >= CONN_PP_FAILS ) {
-
-				/* Tell AsyncComm framework, gracefully close the connection. */
-				conntrack->CommBuffer->forcedClose = false;
-				conntrack->CommBuffer->toClose     = true;
-
-				elog(DEBUG5, "Resource manager returns connection track.");
-			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/resourcemanager_RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager_RMSEG.c b/src/backend/resourcemanager/resourcemanager_RMSEG.c
index afc7c49..737d523 100644
--- a/src/backend/resourcemanager/resourcemanager_RMSEG.c
+++ b/src/backend/resourcemanager/resourcemanager_RMSEG.c
@@ -118,7 +118,6 @@ int  initializeSocketServer_RMSEG(void)
 	for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i ) {
 		if (RMListenSocket[i] != PGINVALID_SOCKET) {
 			netres = registerFileDesc(RMListenSocket[i],
-									  NULL,
 									  ASYNCCOMM_READ,
 									  &AsyncCommBufferHandlersMsgServer,
 									  NULL,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/resqueuemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resqueuemanager.c b/src/backend/resourcemanager/resqueuemanager.c
index 735a871..b4449cd 100644
--- a/src/backend/resourcemanager/resqueuemanager.c
+++ b/src/backend/resourcemanager/resqueuemanager.c
@@ -4885,8 +4885,7 @@ void timeoutDeadResourceAllocation(void)
 				returnConnectionToQueue(curcon, true);
 				if ( curcon->CommBuffer != NULL )
 				{
-					curcon->CommBuffer->toClose = true;
-					curcon->CommBuffer->forcedClose = true;
+					forceCloseFileDesc(curcon->CommBuffer);
 				}
 				else
 				{
@@ -4913,8 +4912,7 @@ void timeoutDeadResourceAllocation(void)
 				returnConnectionToQueue(curcon, true);
 				if ( curcon->CommBuffer != NULL )
 				{
-					curcon->CommBuffer->toClose = true;
-					curcon->CommBuffer->forcedClose = true;
+					forceCloseFileDesc(curcon->CommBuffer);
 				}
 				else
 				{
@@ -4935,8 +4933,7 @@ void timeoutDeadResourceAllocation(void)
 				returnConnectionToQueue(curcon, true);
 				if ( curcon->CommBuffer != NULL )
 				{
-					curcon->CommBuffer->toClose = true;
-					curcon->CommBuffer->forcedClose = true;
+					forceCloseFileDesc(curcon->CommBuffer);
 				}
 				else
 				{
@@ -4979,14 +4976,14 @@ void timeoutQueuedRequest(void)
 		 * 		   added into resource queue manager queues.
 		 */
 		elog(DEBUG3, "Deferred connection track is found. "
-					 " Conn Time " UINT64_FORMAT
+					 " Request Time " UINT64_FORMAT
 					 " Curr Time " UINT64_FORMAT
 					 " Delta " UINT64_FORMAT,
-					 ct->ConnectTime,
+					 ct->RequestTime,
 					 curmsec,
-					 curmsec - ct->ConnectTime);
+					 curmsec - ct->RequestTime);
 
-		if ( curmsec - ct->ConnectTime > 1000000L * rm_resource_allocation_timeout )
+		if ( curmsec - ct->RequestTime > 1000000L * rm_resource_allocation_timeout )
 		{
 			snprintf(errorbuf, sizeof(errorbuf),
 					 "resource request is timed out due to no available cluster");

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/resourcemanager/utils/network_utils.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/utils/network_utils.c b/src/backend/resourcemanager/utils/network_utils.c
index 4cfc9df..b8bbab9 100644
--- a/src/backend/resourcemanager/utils/network_utils.c
+++ b/src/backend/resourcemanager/utils/network_utils.c
@@ -30,6 +30,15 @@
 #include <unistd.h>
 #include <sys/types.h>
 
+#include "dynrm.h"
+/*
+ * Global variables for socket connection pool
+ */
+HASHTABLEData ResolvedHostnames;	/* All resolved hostname's address info.  */
+HASHTABLEData ActiveConnections;	/* All currently active connections.	  */
+
+static void cleanupSocketConnectionPool(int code, Datum arg);
+
 uint64_t gettime_microsec(void)
 {
     static struct timeval t;
@@ -326,101 +335,6 @@ int setConnectionNonBlocked(int fd)
 }
 
 /*
- * A wrapper for getting one unix domain socket connection to server.
- *
- * sockpath[in]			The domain socket file name.
- * port[in] 			The port number.
- * clientfd[out]		The fd of connection.
- *
- * Return:
- * FUNC_RETURN_OK					Succeed.
- * UTIL_NETWORK_FAIL_CREATESOCKET. 	Fail to call socket().
- * UTIL_NETWORK_FAIL_BIND. 			Fail to call bind().
- * UTIL_NETWORK_FAIL_CONNECT. 		Fail to call connect().
- **/
-int  connectToServerDomain(const char 	*sockpath,
-						   uint16_t 	 port,
-						   int 			*clientfd,
-						   int			 fileidx,
-						   char			*filename)
-{
-	struct sockaddr_un  sockaddr;
-	int					fd			= 0;
-	int					len			= 0;
-	int					sockres		= 0;
-
-	*clientfd   = -1;
-	filename[0] = '\0';
-
-	fd = socket(AF_UNIX, SOCK_STREAM, 0);
-	if ( fd < 0 )
-	{
-		write_log("Failed to open socket for connecting domain socket server "
-				  "(errno %d)",
-				  errno);
-		return UTIL_NETWORK_FAIL_CREATESOCKET;
-	}
-
-	memset( &sockaddr, 0, sizeof(struct sockaddr_un) );
-	sockaddr.sun_family = AF_UNIX;
-	sprintf(sockaddr.sun_path, "%s.%d.%lu.%d",
-			sockpath,
-			getpid(),
-			(unsigned long)pthread_self(),
-			fileidx);
-	len = offsetof(struct sockaddr_un, sun_path) + strlen(sockaddr.sun_path);
-	unlink(sockaddr.sun_path);
-	strcpy(filename, sockaddr.sun_path);
-
-	sockres = bind(fd, (struct sockaddr *)&sockaddr, len);
-	if ( sockres < 0 )
-	{
-		write_log("Failed to bind socket for connecting domain socket server "
-				  "%s (errno %d), close fd %d at once",
-				  filename,
-				  errno,
-				  fd);
-		closeConnectionDomain(&fd, filename);
-		return UTIL_NETWORK_FAIL_BIND;
-	}
-
-	memset( &sockaddr, 0, sizeof(struct sockaddr_un) );
-	sockaddr.sun_family = AF_UNIX;
-	sprintf(sockaddr.sun_path, "%s", sockpath);
-	len = offsetof(struct sockaddr_un, sun_path) + strlen(sockaddr.sun_path);
-
-	for ( int i = 0 ; i < DRM_SOCKET_CONN_RETRY ; ++i )
-	{
-		sockres = connect(fd, (struct sockaddr *)&sockaddr, len);
-		if ( sockres < 0 )
-		{
-			write_log("Failed to connect to domain socket server "
-					  "(retry %d, errno %d), fd %d",
-					  i,
-					  errno,
-					  fd);
-			pg_usleep(1000000); /* Sleep 1 seconds and retry. */
-		}
-		else
-		{
-			break;
-		}
-	}
-
-	if ( sockres < 0 )
-	{
-		write_log("Failed to connect to domain socket server after retries, "
-				  "close fd %d at once",
-				  fd);
-		closeConnectionDomain(&fd, filename);
-		return UTIL_NETWORK_FAIL_CONNECT;
-	}
-
-	*clientfd = fd;
-	return FUNC_RETURN_OK;
-}
-
-/*
  * A wrapper for getting one socket connection to server.
  *
  * address[in]			The address to connect.
@@ -435,25 +349,35 @@ int  connectToServerDomain(const char 	*sockpath,
  */
 int connectToServerRemote(const char *address, uint16_t port, int *clientfd)
 {
-	int					fd		= 0;
+	int					fd		= -1;
 	int 		    	sockres = 0;
 	struct sockaddr_in 	server_addr;
-	struct hostent 	   *server  = NULL;
 
 	*clientfd = -1;
 
-	server = gethostbyname(address);
-	if ( server == NULL )
+	AddressString resolvedaddr = getAddressStringByHostName(address);
+	if ( resolvedaddr == NULL )
 	{
 		write_log("Failed to get host by name %s for connecting to a remote "
-				  "socket server %s:%d (error %s)",
+				  "socket server %s:%d",
 				  address,
 				  address,
-				  port,
-				  hstrerror(h_errno));
+				  port);
 		return UTIL_NETWORK_FAIL_GETHOST;
 	}
 
+	if ( rm_enable_connpool )
+	{
+		/* Try to get an alive connection from connection pool. */
+		fd = fetchAliveSocketConnection(address, resolvedaddr, port);
+	}
+
+	if ( fd != -1 )
+	{
+		*clientfd = fd;
+		return FUNC_RETURN_OK;
+	}
+
 	fd = socket(AF_INET, SOCK_STREAM, 0);
 	if ( fd < 0 )
 	{
@@ -465,30 +389,28 @@ int connectToServerRemote(const char *address, uint16_t port, int *clientfd)
 
 	bzero((char *)&server_addr, sizeof(server_addr));
 	server_addr.sin_family = AF_INET;
-	bcopy((char *)server->h_addr,
-		  (char *)&server_addr.sin_addr.s_addr,
-		  server->h_length);
+	memcpy((char *)&server_addr.sin_addr.s_addr,
+		   resolvedaddr->Address,
+		   resolvedaddr->Length);
 	server_addr.sin_port = htons(port);
 
 	while(true)
 	{
-		sockres = connect(fd,
-						  (struct sockaddr *)&server_addr,
-						  sizeof(server_addr));
+		sockres = connect(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
 		if( sockres < 0)
 		{
-			write_log("Failed to connect to remove socket server (errno %d), fd %d",
+			write_log("Failed to connect to remove socket server "
+					  "(errno %d), fd %d",
 					  errno,
 					  fd);
-
 			if (errno == EINTR)
 			{
 				continue;
 			}
 			else
 			{
-				write_log("Close fd %d at once due to not recoverable error "
-						  "detected.",
+				write_log("Close fd %d at once due to not recoverable connection "
+						  "error detected.",
 						  fd);
 				closeConnectionRemote(&fd);
 				return UTIL_NETWORK_FAIL_CONNECT;
@@ -519,30 +441,6 @@ void closeConnectionRemote(int *clientfd)
 	*clientfd = -1;
 }
 
-void closeConnectionDomain(int *clientfd, char *filename)
-{
-	Assert(clientfd);
-	if (*clientfd == -1)
-	{
-		return;
-	}
-	else
-	{
-		int ret = close(*clientfd);
-		if (ret < 0)
-		{
-			write_log("Failed to close fd %d (errno %d)", *clientfd, errno);
-		}
-	}
-
-	if ( filename != NULL && filename[0] != '\0' )
-	{
-		unlink(filename);
-	}
-
-	*clientfd = -1;
-}
-
 char *format_time_microsec(uint64_t microtime)
 {
 	static char result[64];
@@ -600,3 +498,220 @@ retry:
 
 	return -1;
 }
+
+/*
+ * Look up the resolved address of hostname in the ResolvedHostnames hash table.
+ * On a miss, gethostbyname() is called and its first address is cached. Returns
+ * the cached AddressString, or NULL (after logging) if resolution fails.
+ */
+AddressString getAddressStringByHostName(const char *hostname)
+{
+	AddressString res = NULL;
+
+	/* Check HASHTABLE to see if this hostname has been successfully resolved. */
+	SimpString key;
+	setSimpleStringRef(&key, (char *)hostname, strlen(hostname));
+
+	PAIR pair = getHASHTABLENode(&ResolvedHostnames, (void *)&key);
+	if ( pair != NULL )
+	{
+		/* Cache hit: return the address object stored in the table. */
+		res = pair->Value;
+		return res;
+	}
+
+	/* Cache miss: resolve the hostname and build the object to store in HASHTABLE. */
+	struct hostent *server = gethostbyname(hostname);
+	if ( server == NULL )
+	{
+		write_log("Failed to resolve hostname %s. (herrno %d)", hostname, h_errno);
+		return NULL;	/* Failed lookups are not cached, so the next call resolves again. */
+	}
+
+	res = rm_palloc0(PCONTEXT,
+					 offsetof(AddressStringData, Address) + server->h_length + 1);	/* header + address bytes + 1 extra byte */
+	memcpy(res->Address, server->h_addr, server->h_length);	/* h_addr is the first entry of h_addr_list */
+	res->Length = server->h_length;
+	setHASHTABLENode(&ResolvedHostnames, (void *)&key, (void *)res, false);
+	return res;	/* The same object that was just stored in ResolvedHostnames. */
+}
+
+ConnAddressString createConnAddressString(AddressString address, uint16_t port)
+{	/* Allocate from PCONTEXT a new object holding port and a copy of address; release it with freeConnAddressString(). */
+	ConnAddressString res = rm_palloc0(PCONTEXT,
+									   EXPSIZEOFCONNADDRSTRING(address));	/* size is derived from address->Length */
+	res->Port = port;
+	res->Reserved = 0;
+	res->Address.Length = address->Length;
+	memcpy(res->Address.Address, address->Address, address->Length);	/* copies the bytes, so the result does not point into address */
+	return res;
+}
+
+void freeConnAddressString(ConnAddressString connaddr)
+{	/* Release an object obtained from createConnAddressString(). */
+	rm_pfree(PCONTEXT, connaddr);	/* same PCONTEXT that createConnAddressString() allocates from */
+}
+
+int fetchAliveSocketConnection(const char 	 *hostname,
+							   AddressString  address,
+							   uint16_t 	  port)
+{	/* Take one buffered FD for address and port out of ActiveConnections. Returns the FD, or -1 when none is buffered. hostname is used only in the debug log. */
+	ConnAddressString connaddr = createConnAddressString(address, port);	/* temporary lookup key, freed before every return */
+	SimpArray key;
+	setSimpleArrayRef(&key, (char *)connaddr, SIZEOFCONNADDRSTRING(connaddr));
+	PAIR pair = getHASHTABLENode(&ActiveConnections, (void *)&key);
+	if ( pair == NULL )
+	{
+		freeConnAddressString(connaddr);
+		return -1;	/* nothing buffered for this address and port */
+	}
+
+	List *list = (List *)pair->Value;
+	Assert(list != NULL);
+
+	int res = lfirst_int(list_head(list));	/* hand out the FD at the head of the list */
+	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT);
+	list = list_delete_first(list);	/* NOTE(review): pair->Value is not reassigned; presumably the list pointer is unchanged while cells remain — confirm */
+	MEMORY_CONTEXT_SWITCH_BACK
+
+	if ( list == NULL )
+	{
+		/*
+		 * That was the last buffered connection for this address and port, so
+		 * remove the hash table node.
+		 */
+		removeHASHTABLENode(&ActiveConnections, (void *)&key);
+	}
+
+	freeConnAddressString(connaddr);
+	elog(DEBUG3, "Fetched FD %d for %s:%d.", res, hostname, port);
+	return res;	/* NOTE(review): no check that the FD is still usable is visible here — confirm callers cope with a stale FD */
+}
+
+void returnAliveConnectionRemoteByHostname(int 		  *clientfd,
+										   const char *hostname,
+										   uint16_t port)
+{	/* Variant of returnAliveConnectionRemote() for callers that have only the hostname, not its resolved address. */
+	/* Resolve hostname via getAddressStringByHostName(): cached table first, gethostbyname() on a miss. */
+	AddressString addrstr = getAddressStringByHostName(hostname);
+	if ( addrstr == NULL )
+	{
+		closeConnectionRemote(clientfd);	/* no address to file the FD under, so it is closed instead of buffered */
+	}
+	else
+	{
+		returnAliveConnectionRemote(clientfd, hostname, addrstr, port);
+	}
+
+}
+
+void returnAliveConnectionRemote(int 			*clientfd,
+								 const char 	*hostname,
+								 AddressString   addrstr,
+								 uint16_t 		 port)
+{	/* Give *clientfd back: buffer it in ActiveConnections under addrstr and port, or close it when pooling is off or the per-address limit is reached. hostname is used only in debug logs. */
+
+	/* In case no need to buffer connection, we close the connection directly. */
+	if ( !rm_enable_connpool )
+	{
+		closeConnectionRemote(clientfd);
+		return;
+	}
+
+	/* Look up the list of FDs already buffered for this address and port. */
+	ConnAddressString connaddr = createConnAddressString(addrstr, port);
+	SimpArray key;
+	setSimpleArrayRef(&key, (char *)connaddr, SIZEOFCONNADDRSTRING(connaddr));
+	PAIR pair = getHASHTABLENode(&ActiveConnections, (void *)&key);
+
+	List *list = NULL;
+	if ( pair == NULL )
+	{
+		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+		list = list_make1_int(*clientfd);	/* first FD for this key: start a new one-element list */
+		MEMORY_CONTEXT_SWITCH_BACK
+		setHASHTABLENode(&ActiveConnections, (void *)&key, (void *)list, false);
+		elog(DEBUG3, "Buffered FD %d for %s:%d.", *clientfd, hostname, port);
+	}
+	else
+	{
+		list = (List *)(pair->Value);
+		if ( list_length(list) >= rm_connpool_sameaddr_buffersize )	/* per-key cap on buffered FDs */
+		{
+			elog(DEBUG3, "Drop FD %d because too many FDs buffered already for "
+						 "the same address and port.",
+						 *clientfd);
+			closeConnectionRemote(clientfd);
+		}
+		else
+		{
+			MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+			list = lappend_int(list, *clientfd);	/* append at the tail; fetchAliveSocketConnection() takes from the head */
+			MEMORY_CONTEXT_SWITCH_BACK
+			elog(DEBUG3, "Buffered FD %d for %s:%d.", *clientfd, hostname, port);
+		}
+		pair->Value = (void *)list;	/* store the list pointer back in case lappend_int() returned a different one */
+	}
+
+	*clientfd = -1;	/* buffered or closed: either way the caller's FD variable is invalidated */
+	freeConnAddressString(connaddr);	/* connaddr was only the temporary key object */
+}
+
+void initializeSocketConnectionPool(void)
+{	/* Set up the two hash tables used by the connection pool and register their cleanup to run at process exit. */
+	/* Initialize the hash table for buffering resolved hosts. */
+    initializeHASHTABLE(&ResolvedHostnames,
+    					PCONTEXT,
+						HASHTABLE_SLOT_VOLUME_DEFAULT,
+						HASHTABLE_SLOT_VOLUME_DEFAULT_MAX,
+						HASHTABLE_KEYTYPE_SIMPSTR,	/* looked up with a SimpString built from the hostname */
+						NULL);
+
+    /* Initialize the hash table for buffering alive socket connections. */
+    initializeHASHTABLE(&ActiveConnections,
+        				PCONTEXT,
+    					HASHTABLE_SLOT_VOLUME_DEFAULT,
+    					HASHTABLE_SLOT_VOLUME_DEFAULT_MAX,
+						HASHTABLE_KEYTYPE_CHARARRAY,	/* looked up with a SimpArray over a ConnAddressStringData */
+    					NULL);
+
+    on_proc_exit(cleanupSocketConnectionPool, 0);	/* cleanupSocketConnectionPool() ignores the 0 argument */
+}
+
+static void cleanupSocketConnectionPool(int code, Datum arg)
+{	/* Process-exit callback registered by initializeSocketConnectionPool(): closes every buffered FD and empties both hash tables. code and arg are not used. */
+	/* Close all buffered connections and free their lists. */
+	List 	 *connlist 	= NULL;
+	ListCell *cell 		= NULL;
+	getAllPAIRRefIntoList(&ActiveConnections, &connlist);
+	foreach(cell, connlist)
+	{
+		PAIR pair = (PAIR)lfirst(cell);
+		List *aliveconns = (List *)(pair->Value);
+
+		while( aliveconns != NULL )	/* the list pointer becomes NULL once its last cell is deleted */
+		{
+			int fd = lfirst_int(list_head(aliveconns));
+			MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+			aliveconns = list_delete_first(aliveconns);
+			MEMORY_CONTEXT_SWITCH_BACK
+			closeConnectionRemote(&fd);
+		}
+		pair->Value = NULL;	/* the list has been fully consumed above */
+	}
+
+	freePAIRRefList(&ActiveConnections, &connlist);
+	cleanHASHTABLE(&ActiveConnections);
+
+	/* Free the address objects cached by getAddressStringByHostName(). */
+	List 	 *addrlist	= NULL;
+	getAllPAIRRefIntoList(&ResolvedHostnames, &addrlist);
+	foreach(cell, addrlist)
+	{
+		AddressString addrstr = (AddressString)(((PAIR)lfirst(cell))->Value);
+		rm_pfree(PCONTEXT, addrstr);
+	}
+
+	freePAIRRefList(&ResolvedHostnames, &addrlist);
+	cleanHASHTABLE(&ResolvedHostnames);
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/backend/utils/misc/guc.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 0321b9f..5b04054 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -4413,6 +4413,15 @@ static struct config_bool ConfigureNamesBool[] =
 	},
 
 	{
+		{"hawq_rm_enable_connpool", PGC_POSTMASTER, RESOURCES_MGM,
+		 gettext_noop("enable client side socket connection pool."),	/* description text; previously misspelled "enalbe" */
+		 NULL
+		},
+		&rm_enable_connpool,	/* read by connectToServerRemote() and returnAliveConnectionRemote() */
+		true, NULL, NULL
+	},
+
+	{
 		{"hawq_rm_force_alterqueue_cancel_queued_request", PGC_POSTMASTER, RESOURCES_MGM,
 		 gettext_noop("force to cancel a query resource request when altering a resource queue."),
 		 NULL
@@ -6234,6 +6243,16 @@ static struct config_int ConfigureNamesInt[] =
     },
 
     {
+            {"hawq_rm_connpool_sameaddr_buffersize", PGC_POSTMASTER, RESOURCES_MGM,
+                    gettext_noop("buffered socket connection maximum size for "
+                    			 "one address and one port"),
+                    NULL
+            },
+            &rm_connpool_sameaddr_buffersize,
+            2, 1, 65535, NULL, NULL
+    },
+
+    {
             {"hawq_rm_master_domain_port", PGC_POSTMASTER, RESOURCES_MGM,
                     gettext_noop("resource manager master domain socket port number"),
                     NULL

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/889292bf/src/include/cdb/cdbvars.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbvars.h b/src/include/cdb/cdbvars.h
index c1227b4..fbe6251 100644
--- a/src/include/cdb/cdbvars.h
+++ b/src/include/cdb/cdbvars.h
@@ -1153,6 +1153,8 @@ extern char   *seg_directory;
 extern int	   rm_master_domain_port;
 extern int     rm_master_port;
 extern int	   rm_segment_port;
+extern bool	   rm_enable_connpool;
+extern int	   rm_connpool_sameaddr_buffersize;
 
 extern char   *rm_global_rm_type;
 



Mime
View raw message