hawq-commits mailing list archives

From iw...@apache.org
Subject incubator-hawq git commit: HAWQ-1208. Porting gpdb interconnect fix to hawq
Date Tue, 27 Dec 2016 06:29:00 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master 492e8b71c -> 2a077062d


HAWQ-1208. Porting gpdb interconnect fix to hawq

This commit includes the following commits from gpdb:
------------------------------------------
commit ce9cf07073946644b9dbc9a2fa0c1ca1179c57ee
Author: xiongg1 <gxiong@pivotal.io>
Date:   Wed Jun 10 19:31:05 2015 -0400

    [JIRA: MPP-25589] interconnect timeout bug

commit 615934aa7cf87eab9b258cb56eae7766aabab979
Author: tangp4 <ptang@pivotal.io>
Date:   Tue Jul 7 22:00:24 2015 -0400

    [JIRA: MPP-25497] Fix the coverity defect CIDs: 16672

commit ae75ad81020533e0acc69daba086581dbca68ed2
Author: TangPengzhou <ptang@pivotal.io>
Date:   Thu May 21 17:36:30 2015 +0800

    [JIRA: MPP-25497]  Shutdown interconnect thread before shmem_exit()

    Sometimes, when a FATAL-level error occurs, shmem_exit() frees contexts that
    are still being accessed by the ic thread, which causes an unexpected
    segmentation fault.

commit 2d7d0735f5b862d62a84d446f6dd2ce44b0df250
Author: Gang Xiong <gxiong@pivotal.io>
Date:   Sun Jan 3 13:37:34 2016 -0500

    [JIRA:MPP-26123] Handle corner case of interconnect ack packet

    When the ack packet is resent with UDPIC_FLAGS_STOP set, we need to make
    sure the sender is aware of that. Otherwise, the sender will hang there
    forever.

commit ad8328423661cb64e7b905df4928e48d532653b9
Author: Heikki Linnakangas <hlinnakangas@pivotal.io>
Date:   Sun Aug 30 00:11:17 2015 +0300

    [MPP-25631] Remove unnecessary #includes.

    No particular urgency to clean up just these, just something that caught my
    eye while browsing the code.

commit 65809c0bf53a665a16e467331ea6e7d65aee54c6
Author: xiongg1 <gxiong@pivotal.io>
Date:   Mon Jun 15 19:36:55 2015 -0400

    [JIRA: MPP-25590] Wrong error message when socket is exhausted on master

commit a343ed5bfd992b10137ad232f241be72857d6ea2
Author: gpadmin <gpadmin@g187>
Date:   Thu Oct 27 18:58:54 2016 +0000

    Add a GUC called gp_interconnect_tcp_listener_backlog for the tcp interconnect
    to control the backlog param of the listen call

commit eda343b5050e31764154698f25e88d6f3fa7e957
Author: Kenan Yao <kyao@pivotal.io>
Date:   Tue Oct 25 16:10:05 2016 +0800

    Fix a bug in function destroyConnHashTable, which frees a wrong pointer and
    can cause a SIGSEGV.

    Signed-off-by: Pengzhou Tang <ptang@pivotal.io>

commit 616f3c8f0ea372e01d166f4a6c52c6075a74ecd3
Author: Pengzhou Tang <ptang@pivotal.io>
Date:   Tue Apr 5 10:50:55 2016 +0800

    Fix coverity issue for 7e8f391dfdd

commit 7e8f391dfdd8945573d8b621533626813f8f7684
Author: tangpengzhou <ptang@pivotal.io>
Date:   Mon Mar 28 23:47:48 2016 +0000

    Fix tcp socket/port leak when interconnect type is udp or udpifc

    Since gpdb doesn't allow changing the interconnect type after a connection has
    started, it's unnecessary to allocate a tcp socket and port for hot switching
    from udp/udpifc to tcp.

commit 8ec73f12c997777cae321c301a079f2cae2914fc
Author: Pengzhou Tang <ptang@pivotal.io>
Date:   Thu Mar 3 15:36:06 2016 +0800

    Fix incorrect EOS warning message generated by direct-dispatch type queries

    QD should not expect end-of-stream from QEs that are not members of the
    direct dispatch, and should not report a warning message.

commit 21973ab2282701eff45f9b9448525da266843ca4
Author: Pengzhou Tang <ptang@pivotal.io>
Date:   Thu Mar 24 16:16:11 2016 +0800

    Fix ic thread waiting error in utility mode

    The interconnect thread is not created in utility mode, but
    ic_rx_thread_created is initialized to true, which causes
    WaitInterconnectQuitUDP() to wait for a non-existent ic thread.

commit ba4b8ab4bce0cb62e7b1d3124012a20849434d81
Author: xiong-gang <gxiong@pivotal.io>
Date:   Fri Nov 18 17:10:41 2016 +0800

    Correct 'extraSeq' in ack packet after stop is requested

    If the ack packet in doSendStopMessageUDPIFC() is lost, the QE will keep
    sending status packets, and the QD will ack them in handleDataPacket(). We
    need to make sure that 'extraSeq' is equal to 'seq' in the ack packet so that
    the QE can update the capacity. Otherwise, the QE will hang forever.
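
The 'extraSeq' rule described in the last commit above can be sketched in isolation. This is a minimal illustration modelled on the handleDataPacket() hunk later in this message; the `ConnState` and `AckFields` types and the `ack_for_status_query()` helper are hypothetical simplifications, not HAWQ definitions.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for the per-connection state. */
typedef struct
{
    uint32_t seq;           /* next sequence number expected from the sender */
    uint32_t extraSeq;      /* last sequence number consumed by the receiver */
    bool     stopRequested; /* receiver has asked the sender to stop */
} ConnState;

/* Hypothetical stand-in for the two sequence fields carried in an ack. */
typedef struct
{
    uint32_t seq;
    uint32_t extraSeq;
} AckFields;

/*
 * Choose the sequence fields for the ack that answers a status query.
 * Once a stop has been requested, extraSeq is forced to equal seq so the
 * sender sees everything as consumed and can update its capacity.
 */
static AckFields
ack_for_status_query(const ConnState *conn)
{
    AckFields ack;

    /* Guard against unsigned wrap-around when nothing has been received. */
    ack.seq = conn->seq > 0 ? conn->seq - 1 : 0;
    ack.extraSeq = conn->stopRequested ? ack.seq : conn->extraSeq;
    return ack;
}
```

With `seq = 10`, `extraSeq = 4` and a stop requested, the ack carries 9 in both fields; without a stop it carries 9 and 4.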


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/2a077062
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/2a077062
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/2a077062

Branch: refs/heads/master
Commit: 2a077062d30892d7c6372d1e7995234ae0aa95d5
Parents: 492e8b7
Author: Ming LI <mli@apache.org>
Authored: Fri Dec 9 19:35:12 2016 +0800
Committer: ivan <iweng@pivotal.io>
Committed: Tue Dec 27 14:26:35 2016 +0800

----------------------------------------------------------------------
 src/backend/cdb/motion/cdbmotion.c             |   7 ++
 src/backend/cdb/motion/ic_common.c             | 124 +++++++++++++++++++-
 src/backend/cdb/motion/ic_tcp.c                |   8 ++
 src/backend/cdb/motion/ic_udp.c                |  75 +++++++++++-
 src/backend/parser/analyze.c                   |   5 +-
 src/backend/storage/ipc/ipc.c                  |  40 ++++++-
 src/backend/utils/misc/faultinjector.c         |   5 +-
 src/include/cdb/ml_ipc.h                       |  11 ++
 src/include/utils/faultinjector.h              |   2 +
 src/test/regress/expected/icudp_full.out       |  17 +++
 src/test/regress/sql/icudp_full.sql            |  16 +++
 tools/bin/hawqpylib/programs/clsInjectFault.py |   1 +
 12 files changed, 295 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/backend/cdb/motion/cdbmotion.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/motion/cdbmotion.c b/src/backend/cdb/motion/cdbmotion.c
index 03a9633..a237703 100644
--- a/src/backend/cdb/motion/cdbmotion.c
+++ b/src/backend/cdb/motion/cdbmotion.c
@@ -750,6 +750,13 @@ EndMotionLayerNode(MotionLayerState *mlStates, int16 motNodeID, bool flushCommLa
 		{
 			pCSEntry = &pMNEntry->ready_tuple_lists[i];
 
+			/*
+			 * QD should not expect end-of-stream comes from QEs who is not members of
+			 * direct dispatch
+			 */
+			if (!pCSEntry->init)
+				continue;
+
 			if (pMNEntry->preserve_order &&
 				gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
 			{

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/backend/cdb/motion/ic_common.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/motion/ic_common.c b/src/backend/cdb/motion/ic_common.c
index 6484a11..0961a25 100644
--- a/src/backend/cdb/motion/ic_common.c
+++ b/src/backend/cdb/motion/ic_common.c
@@ -36,13 +36,10 @@
 
 #include "postgres.h"
 
-#include <pthread.h>
-
 #include "nodes/execnodes.h"            /* Slice, SliceTable */
 #include "nodes/pg_list.h"
 #include "nodes/print.h"
 #include "utils/memutils.h"
-#include "utils/hsearch.h"
 #include "miscadmin.h"
 #include "libpq/libpq-be.h"
 #include "libpq/ip.h"
@@ -322,8 +319,10 @@ InitMotionLayerIPC(void)
 	/*activated = false;*/
 	savedSeqServerFd = -1;
 
-	InitMotionTCP(&TCP_listenerFd, &tcp_listener);
-	InitMotionUDP(&UDP_listenerFd, &udp_listener);
+	if (Gp_interconnect_type == INTERCONNECT_TYPE_TCP)
+		InitMotionTCP(&TCP_listenerFd, &tcp_listener);
+	else if (Gp_interconnect_type == INTERCONNECT_TYPE_UDP)
+		InitMotionUDP(&UDP_listenerFd, &udp_listener);
 
 	Gp_listener_port = (udp_listener<<16) | tcp_listener;
 
@@ -1020,3 +1019,118 @@ void adjustMasterRouting(Slice *recvSlice)
 		}
 	}
 }
+
+void
+SendDummyPacket(void)
+{
+	int sockfd = -1;
+	int ret = -1;
+	struct addrinfo* addrs = NULL;
+	struct addrinfo* rp = NULL;
+	struct addrinfo hint;
+	uint16 udp_listenner;
+	char	port_str[32] = {0};
+	char* dummy_pkt = "stop it";
+	/*
+	* Get address info from interconnect udp listenner port
+	*/
+	udp_listenner = (Gp_listener_port >> 16) & 0x0ffff;
+	snprintf(port_str, sizeof(port_str), "%d", udp_listenner);
+
+	MemSet(&hint, 0, sizeof(hint));
+	hint.ai_socktype = SOCK_DGRAM;
+	hint.ai_family = AF_UNSPEC; /* Allow for IPv4 or IPv6  */
+
+#ifdef AI_NUMERICSERV
+	hint.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;  /* Never do name resolution */
+#else
+	hint.ai_flags = AI_NUMERICHOST;  /* Never do name resolution */
+#endif
+
+	ret = pg_getaddrinfo_all(NULL, port_str, &hint, &addrs);
+	if (ret || !addrs)
+	{
+		elog(LOG, "Send dummy packet failed, pg_getaddrinfo_all(): %s", strerror(errno));
+		goto send_error;
+	}
+
+	for (rp = addrs; rp != NULL; rp = rp->ai_next)
+	{
+		/* Create socket according to pg_getaddrinfo_all() */
+		sockfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+		if (sockfd < 0)
+		{
+			continue;
+		}
+
+		if (!pg_set_noblock(sockfd))
+		{
+			if (sockfd >= 0)
+				closesocket(sockfd);
+			continue;
+		}
+		break;
+	}
+
+	if (rp == NULL)
+	{
+		elog(LOG, "Send dummy packet failed, create socket failed: %s", strerror(errno));
+		goto send_error;
+	}
+
+	/*
+	* Send a dummy package to the interconnect listener, try 10 times
+	*/
+	int counter = 0;
+	while (counter < 10)
+	{
+		counter++;
+		ret = sendto(sockfd, dummy_pkt, strlen(dummy_pkt), 0, rp->ai_addr, rp->ai_addrlen);
+		if(ret < 0)
+		{
+			if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
+			{
+				continue;
+			}
+			else
+			{
+				elog(LOG, "Send dummy packet failed, sendto failed: %s", strerror(errno));
+				goto send_error;
+			}
+		}
+		break;
+	}
+
+	if (counter >= 10)
+	{
+		elog(LOG, "Send dummy packet failed, sendto failed: %s", strerror(errno));
+		goto send_error;
+	}
+
+	pg_freeaddrinfo_all(hint.ai_family, addrs);
+	close(sockfd);
+	return;
+
+send_error:
+
+	if (addrs)
+	{
+		pg_freeaddrinfo_all(hint.ai_family, addrs);
+	}
+	if (sockfd != -1)
+	{
+		close(sockfd);
+	}
+	return;
+}
+
+/*
+* WaitInterconnectQuit
+*
+* Wait for the ic thread to quit, don't clean any resource owned by ic thread
+*/
+void
+WaitInterconnectQuit(void)
+{
+	WaitInterconnectQuitUDP();
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/backend/cdb/motion/ic_tcp.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/motion/ic_tcp.c b/src/backend/cdb/motion/ic_tcp.c
index 5d3b16b..28592c7 100644
--- a/src/backend/cdb/motion/ic_tcp.c
+++ b/src/backend/cdb/motion/ic_tcp.c
@@ -312,8 +312,12 @@ setupTCPListeningSocket(int backlog, int *listenerSocketFd, uint16 *listenerPort
 			break;              /* Success */
 
 		close(fd);
+		fd = -1;
 	}
 
+	fun = "bind";
+	if (fd == -1)
+		goto error;
 
     /* Make socket non-blocking. */
     fun = "fcntl(O_NONBLOCK)";
@@ -1469,6 +1473,10 @@ SetupTCPInterconnect(EState *estate)
 
 		expectedTotalIncoming += activeNumProcs;
 	}
+	
+	if (expectedTotalIncoming > listenerBacklog)
+		ereport(WARNING, (errmsg("SetupTCPInterconnect: too many expected incoming connections(%d), Interconnect setup might possibly fail", expectedTotalIncoming),
+						  errhint("Try enlarging the gp_interconnect_tcp_listener_backlog GUC value and OS net.core.somaxconn parameter")));
 
     if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
         ereport(DEBUG1, (errmsg("SetupInterconnect will activate "
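
The `fd = -1` reset in the setupTCPListeningSocket() hunk above follows a common pattern: walk the candidate addresses, close and invalidate the descriptor on each failed bind, and check for -1 after the loop. A minimal sketch using plain POSIX calls; `bind_first_usable()` is a hypothetical name and the code is not taken from HAWQ.

```c
#include <netdb.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Return a TCP socket bound to the first usable local address for `port`,
 * or -1 if no candidate address could be bound.
 */
static int
bind_first_usable(const char *port)
{
    struct addrinfo hints;
    struct addrinfo *addrs = NULL;
    struct addrinfo *rp;
    int fd = -1;

    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_UNSPEC;     /* IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;     /* wildcard address for listening */

    if (getaddrinfo(NULL, port, &hints, &addrs) != 0)
        return -1;

    for (rp = addrs; rp != NULL; rp = rp->ai_next)
    {
        fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
        if (fd < 0)
            continue;                /* fd is already -1 here */

        if (bind(fd, rp->ai_addr, rp->ai_addrlen) == 0)
            break;                   /* success */

        close(fd);
        fd = -1;                     /* never leave a closed descriptor in fd */
    }

    freeaddrinfo(addrs);
    return fd;                       /* -1 means every candidate failed */
}
```

Calling `bind_first_usable("0")` asks the kernel for any free port; the caller is responsible for closing the returned descriptor.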

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/backend/cdb/motion/ic_udp.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/motion/ic_udp.c b/src/backend/cdb/motion/ic_udp.c
index bcc959b..0c31224 100644
--- a/src/backend/cdb/motion/ic_udp.c
+++ b/src/backend/cdb/motion/ic_udp.c
@@ -48,6 +48,7 @@
 #include "utils/atomic.h"
 #include "utils/builtins.h"
 #include "utils/debugbreak.h"
+#include "utils/faultinjector.h"
 #include "utils/pg_crc.h"
 #include "port/pg_crc32c.h"
 
@@ -1815,6 +1816,9 @@ destroyConnHashTable(ConnHashTable *ht)
 		pfree(ht->table);
 	else
 		free(ht->table);
+
+	ht->table = NULL;
+	ht->size = 0;
 }
 
 /*
@@ -4587,7 +4591,7 @@ handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntr
 					break;
 				}
 
-				if (pkt->seq <= ackConn->receivedAckSeq)
+				if (pkt->seq < ackConn->receivedAckSeq)
 				{
 					if (DEBUG1 >= log_min_messages)
 						write_log("ack with bad seq?! expected (%d, %d] got %d flags 0x%x, capacity %d consumedSeq %d", ackConn->receivedAckSeq, ackConn->sentSeq, pkt->seq, pkt->flags, ackConn->capacity, ackConn->consumedSeq);
@@ -4606,6 +4610,13 @@ handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntr
 					/* continue to deal with acks */
 				}
 
+				if (pkt->seq == ackConn->receivedAckSeq)
+				{
+					if (DEBUG1 >= log_min_messages)
+						write_log("ack with bad seq?! expected (%d, %d] got %d flags 0x%x, capacity %d consumedSeq %d", ackConn->receivedAckSeq, ackConn->sentSeq, pkt->seq, pkt->flags, ackConn->capacity, ackConn->consumedSeq);
+					break;
+				}
+
 				/* deal with a regular ack. */
 				if (pkt->flags & UDPIC_FLAGS_ACK)
 				{
@@ -5394,7 +5405,7 @@ checkExpirationCapacityFC(ChunkTransportState *transportStates, ChunkTransportSt
 	uint64 now = getCurrentTime();
 	uint64 elapsed = now - ic_control_info.lastPacketSendTime;
 
-	if (elapsed >= (timeout * 1000))
+	if (elapsed >= ((uint64)timeout * 1000))
 	{
 		ICBufferLink *bufLink = icBufferListFirst(&conn->unackQueue);
 		ICBuffer *buf = GET_ICBUFFER_FROM_PRIMARY(bufLink);
@@ -5777,6 +5788,18 @@ doSendStopMessageUDP(ChunkTransportState *transportStates, int16 motNodeID)
 				 * We will skip sending ACKs to those connections.
 				 */
 
+#ifdef FAULT_INJECTOR
+				if (FaultInjector_InjectFaultIfSet(
+												   InterconnectStopAckIsLost,
+												   DDLNotSpecified,
+												   "" /* databaseName */,
+												   "" /* tableName */) == FaultInjectorTypeSkip)
+				{
+					pthread_mutex_unlock(&ic_control_info.lock);
+					continue;
+				}
+#endif
+
 				if (conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6)
 				{
 					uint32 seq = conn->conn_info.seq > 0 ? conn->conn_info.seq - 1 : 0;
@@ -5977,7 +6000,9 @@ handleDataPacket(MotionConn *conn, icpkthdr *pkt, struct sockaddr_storage *peer,
 	#ifdef AMS_VERBOSE_LOGGING
 		logPkt("STATUS QUERY MESSAGE", pkt);
 	#endif
-		setAckSendParam(param, conn, UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_ACK | conn->conn_info.flags, conn->conn_info.seq - 1, conn->conn_info.extraSeq);
+		uint32 seq = conn->conn_info.seq > 0 ? conn->conn_info.seq - 1 : 0;
+		uint32 extraSeq = conn->stopRequested ? seq : conn->conn_info.extraSeq;
+		setAckSendParam(param, conn, UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_ACK | conn->conn_info.flags, seq, extraSeq);
 
 		return false;
 	}
@@ -6224,7 +6249,9 @@ rxThreadFunc(void *arg)
 		if (compare_and_swap_32(&ic_control_info.shutdown, 1, 0))
 		{
 			if (DEBUG1 >= log_min_messages)
+			{
 				write_log("udp-ic: rx-thread shutting down");
+			}
 			break;
 		}
 
@@ -6250,6 +6277,15 @@ rxThreadFunc(void *arg)
 
 			n = poll(&nfd, 1, RX_THREAD_POLL_TIMEOUT);
 
+			if (compare_and_swap_32(&ic_control_info.shutdown, 1, 0))
+			{
+				if (DEBUG1 >= log_min_messages)
+				{
+					write_log("udp-ic: rx-thread shutting down");
+				}
+				break;
+			}
+
 			if (n < 0)
 			{
 				if (errno == EINTR)
@@ -6285,6 +6321,15 @@ rxThreadFunc(void *arg)
 			read_count = recvfrom(UDP_listenerFd, (char *)pkt, Gp_max_packet_size, 0,
 								  (struct sockaddr *)&peer, &peerlen);
 
+			if (compare_and_swap_32(&ic_control_info.shutdown, 1, 0))
+			{
+				if (DEBUG1 >= log_min_messages)
+				{
+					write_log("udp-ic: rx-thread shutting down");
+				}
+				break;
+			}
+
 			if (DEBUG5 >= log_min_messages)
 				write_log("received inbound len %d", read_count);
 
@@ -6877,3 +6922,27 @@ dumpConnections(ChunkTransportStateEntry *pEntry, const char *fname)
 	}
     fclose(ofile);
 }
+
+void
+WaitInterconnectQuitUDP(void)
+{
+	if (Gp_role == GP_ROLE_UTILITY)
+	{
+		return;	
+	}
+
+	/*
+	 * Just in case ic thread is waiting on the locks.
+	*/
+	pthread_mutex_unlock(&ic_control_info.errorLock);
+	pthread_mutex_unlock(&ic_control_info.lock);
+
+	compare_and_swap_32(&ic_control_info.shutdown, 0, 1);
+
+	if (ic_control_info.threadCreated)
+	{
+		SendDummyPacket();
+		pthread_join(ic_control_info.threadHandle, NULL);
+	}
+	ic_control_info.threadCreated = false;
+}
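
The shutdown sequence in WaitInterconnectQuitUDP() above (raise a flag, wake the receiver with a dummy datagram, then join the thread) can be tried outside HAWQ. The sketch below is a simplified stand-in, not the HAWQ code: it uses C11 atomics instead of compare_and_swap_32(), an IPv4 loopback socket, and the hypothetical names `shutdown_requested`, `start_listener()` and `stop_listener()`; the 250 ms poll timeout is also an assumption.

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>
#include <pthread.h>
#include <stdatomic.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

static atomic_int shutdown_requested; /* stand-in for ic_control_info.shutdown */
static int listener_fd = -1;          /* stand-in for UDP_listenerFd */

/* Receiver loop: re-check the flag before and after every blocking call. */
static void *
rx_thread(void *arg)
{
    (void) arg;
    for (;;)
    {
        struct pollfd pfd = { .fd = listener_fd, .events = POLLIN };
        char buf[64];

        if (atomic_load(&shutdown_requested))
            break;
        if (poll(&pfd, 1, 250) <= 0)
            continue;                 /* timeout or error: loop and re-check */
        if (atomic_load(&shutdown_requested))
            break;                    /* woken by the dummy packet */
        recv(listener_fd, buf, sizeof(buf), 0);
    }
    return NULL;
}

/* Bind a UDP socket on loopback and start the receiver; 0 on success. */
static int
start_listener(pthread_t *tid)
{
    struct sockaddr_in addr;

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = 0;                /* let the kernel pick a port */

    listener_fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (listener_fd < 0)
        return -1;
    if (bind(listener_fd, (struct sockaddr *) &addr, sizeof(addr)) < 0)
    {
        close(listener_fd);
        listener_fd = -1;
        return -1;
    }
    return pthread_create(tid, NULL, rx_thread, NULL);
}

/* Set the flag, send a dummy datagram so poll() returns, then join. */
static int
stop_listener(pthread_t tid)
{
    struct sockaddr_in addr;
    socklen_t len = sizeof(addr);
    int fd;
    int rc;

    atomic_store(&shutdown_requested, 1);

    if (getsockname(listener_fd, (struct sockaddr *) &addr, &len) == 0 &&
        (fd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0)
    {
        /* Best effort: if this is lost, the poll timeout still ends the loop. */
        sendto(fd, "stop it", 7, 0, (struct sockaddr *) &addr, len);
        close(fd);
    }

    rc = pthread_join(tid, NULL);
    close(listener_fd);
    listener_fd = -1;
    return rc;
}
```

The flag is checked again right after poll() so that the dummy datagram is never treated as interconnect data, which mirrors the extra shutdown checks added to rxThreadFunc() in this commit.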

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/backend/parser/analyze.c
----------------------------------------------------------------------
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 6ae62b3..5024389 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -85,9 +85,8 @@
 #include "parser/parse_cte.h"
 #include "parser/parsetree.h"
 #include "rewrite/rewriteManip.h"
-#include "utils/acl.h"
 #include "utils/builtins.h"
-#include "utils/fmgroids.h"
+#include "utils/datum.h"
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
@@ -97,8 +96,6 @@
 #include "cdb/cdbhash.h"
 #include "cdb/cdbsreh.h"
 
-#include "executor/spi.h"
-
 /* temporary rule to control whether we generate RULEs or not -- for testing */
 bool        enable_partition_rules = false;
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/backend/storage/ipc/ipc.c
----------------------------------------------------------------------
diff --git a/src/backend/storage/ipc/ipc.c b/src/backend/storage/ipc/ipc.c
index 4934f80..dd7aa85 100644
--- a/src/backend/storage/ipc/ipc.c
+++ b/src/backend/storage/ipc/ipc.c
@@ -23,12 +23,13 @@
 #include <unistd.h>
 #include <sys/stat.h>
 
+#include "cdb/cdbdisp.h"
 #include "miscadmin.h"
 #ifdef PROFILE_PID_DIR
 #include "postmaster/autovacuum.h"
 #endif
 #include "storage/ipc.h"
-
+#include "libpq/pqsignal.h"
 
 /*
  * This flag is set during proc_exit() to change ereport()'s behavior,
@@ -42,8 +43,7 @@ bool		proc_exit_inprogress = false;
  * (or in the parent postmaster).
  */
 static bool atexit_callback_setup = false;
-
-
+extern void WaitInterconnectQuit(void);
 
 /* ----------------------------------------------------------------
  *						exit() handling stuff
@@ -143,6 +143,16 @@ void
 proc_exit_prepare(int code)
 {
 	/*
+	 * If we came here from any critical section, we don't have safe way to
+	 * clean up shared memory or transaction state.  Though it's not a pleasant
+	 * solution, this is better than messing up database.  This is the least
+	 * desirable bail-out, and whenever you should see this situation, you
+	 * should consider to resolve the actual programming error.
+	 */
+	if (CritSectionCount > 0)
+		elog(PANIC, "process is dying from critical section");
+
+	/*
 	 * Once we set this flag, we are committed to exit.  Any ereport() will
 	 * NOT send control back to the main loop, but right back here.
 	 */
@@ -162,6 +172,30 @@ proc_exit_prepare(int code)
 	InterruptHoldoffCount = 1;
 	CritSectionCount = 0;
 
+	/*
+	 * Also clear the error context stack, to prevent error callbacks
+	 * from being invoked by any elog/ereport calls made during proc_exit.
+	 * Whatever context they might want to offer is probably not relevant,
+	 * and in any case they are likely to fail outright after we've done
+	 * things like aborting any open transaction.  (In normal exit scenarios
+	 * the context stack should be empty anyway, but it might not be in the
+	 * case of elog(FATAL) for example.)
+	 */
+	error_context_stack = NULL;
+
+	/*
+	* Make sure interconnect thread quit before shmem_exit() in FATAL case.
+	* Otherwise, shmem_exit() may free MemoryContex of MotionConns in connHtab unexpectedly;
+	*
+	* For example: PORTAL_MULTI_QUERY strategy doesn't bind estate with portal,
+	* so when fatal occurs, MotionConns of estate don't get removed through
+	* TeardownInterconnect(), but MemoryContex of these MotionConns are freed.
+	*
+	* It's ok to shutdown Interconnect background thread here, process is dying, no
+	* necessary to receive more motion data.
+	*/
+	WaitInterconnectQuit();
+
 	/* do our shared memory exits first */
 	shmem_exit(code);
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/backend/utils/misc/faultinjector.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/faultinjector.c b/src/backend/utils/misc/faultinjector.c
index 5c47c35..192b884 100644
--- a/src/backend/utils/misc/faultinjector.c
+++ b/src/backend/utils/misc/faultinjector.c
@@ -324,7 +324,9 @@ FaultInjectorIdentifierEnumToString[] = {
 	_("opt_task_allocate_string_buffer"),
 		/* inject fault while allocating string buffer */
 	_("runaway_cleanup"),
-		/* inject fault before cleaning up a runaway query */		
+		/* inject fault before cleaning up a runaway query */	
+	_("interconnect_stop_ack_is_lost"),
+		/* inject fault in interconnect to skip sending the stop ack */	
 	_("not recognized"),
 };
 
@@ -985,6 +987,7 @@ FaultInjector_NewHashEntry(
 				
 			case FinishPreparedTransactionAbortPass1AbortingCreateNeeded:
 			case FinishPreparedTransactionAbortPass2AbortingCreateNeeded:
+			case InterconnectStopAckIsLost:
 
 			case SyncPersistentTable:
 			case XLOGInsert:

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/include/cdb/ml_ipc.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/ml_ipc.h b/src/include/cdb/ml_ipc.h
index 2b37c4f..2276b42 100644
--- a/src/include/cdb/ml_ipc.h
+++ b/src/include/cdb/ml_ipc.h
@@ -84,6 +84,16 @@ extern void InitMotionLayerIPC(void);
  */
 extern void CleanUpMotionLayerIPC(void);
 
+/*
+ * Wait interconnect thread to quit, called when proc exit.
+ */
+extern void WaitInterconnectQuit(void);
+
+/*
+* Send a dummy packet to interconnect thread to exit poll() immediately
+*/
+extern void SendDummyPacket(void);
+
 /* Returns the fd of the socket that connects to the seqserver.  This value
  * is -1 if it has not been setup.
  */
@@ -335,6 +345,7 @@ extern void markUDPConnInactive(MotionConn *conn);
 
 extern void CleanupMotionTCP(void);
 extern void CleanupMotionUDP(void);
+extern void WaitInterconnectQuitUDP(void);
 
 extern void adjustMasterRouting(Slice *recvSlice);
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/include/utils/faultinjector.h
----------------------------------------------------------------------
diff --git a/src/include/utils/faultinjector.h b/src/include/utils/faultinjector.h
index 06a3dfc..b7cf10f 100644
--- a/src/include/utils/faultinjector.h
+++ b/src/include/utils/faultinjector.h
@@ -215,6 +215,8 @@ typedef enum FaultInjectorIdentifier_e {
 
 	RunawayCleanup,
 
+	InterconnectStopAckIsLost,
+
 	/* INSERT has to be done before that line */
 	FaultInjectorIdMax,
 	

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/test/regress/expected/icudp_full.out
----------------------------------------------------------------------
diff --git a/src/test/regress/expected/icudp_full.out b/src/test/regress/expected/icudp_full.out
index c1b8d91..f2d58a3 100644
--- a/src/test/regress/expected/icudp_full.out
+++ b/src/test/regress/expected/icudp_full.out
@@ -749,3 +749,20 @@ RESET gp_log_interconnect;
 RESET log_min_messages;
 RESET search_path;
 DROP SCHEMA ic_udp_test CASCADE;
+/*
+ * If ack packet is lost in doSendStopMessageUDP(), transaction with cursor
+ * should still be able to commit.
+*/
+--start_ignore
+drop table if exists ic_test_1;
+NOTICE:  table "ic_test_1" does not exist, skipping
+--end_ignore
+create table ic_test_1 as select i as c1, i as c2 from generate_series(1, 100000) i;
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'c1' as the Greenplum Database data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
+begin;
+declare ic_test_cursor_c1 cursor for select * from ic_test_1;
+\! hawqfaultinjector -q -f interconnect_stop_ack_is_lost -y reset -s 1
+\! hawqfaultinjector -q -f interconnect_stop_ack_is_lost -y skip -s 1
+commit;
+drop table ic_test_1;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/test/regress/sql/icudp_full.sql
----------------------------------------------------------------------
diff --git a/src/test/regress/sql/icudp_full.sql b/src/test/regress/sql/icudp_full.sql
index 116f89a..0a02b27 100644
--- a/src/test/regress/sql/icudp_full.sql
+++ b/src/test/regress/sql/icudp_full.sql
@@ -372,3 +372,19 @@ RESET log_min_messages;
 
 RESET search_path;
 DROP SCHEMA ic_udp_test CASCADE;
+
+
+/*
+ * If ack packet is lost in doSendStopMessageUDP(), transaction with cursor
+ * should still be able to commit.
+*/
+--start_ignore
+drop table if exists ic_test_1;
+--end_ignore
+create table ic_test_1 as select i as c1, i as c2 from generate_series(1, 100000) i;
+begin;
+declare ic_test_cursor_c1 cursor for select * from ic_test_1;
+\! hawqfaultinjector -q -f interconnect_stop_ack_is_lost -y reset -s 1
+\! hawqfaultinjector -q -f interconnect_stop_ack_is_lost -y skip -s 1
+commit;
+drop table ic_test_1;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/tools/bin/hawqpylib/programs/clsInjectFault.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/programs/clsInjectFault.py b/tools/bin/hawqpylib/programs/clsInjectFault.py
index 7281338..0ec1dcd 100644
--- a/tools/bin/hawqpylib/programs/clsInjectFault.py
+++ b/tools/bin/hawqpylib/programs/clsInjectFault.py
@@ -420,6 +420,7 @@ class HAWQInjectFaultProgram:
 				  "fail_qe_when_do_query (inject fault when QE actually working, set error)" \
 				  "fail_qe_when_begin_parquet_scan (inject fault when begin scan parquet table, set error)"\
 				  "fail_qe_when_parquet_get_next (inject fault when get next, set error)"\
+			      "interconnect_stop_ack_is_lost (inject fault in interconnect to skip sending the stop ack), " \
 				  "all (affects all faults injected, used for 'status' and 'reset'), ")
         addTo.add_option("-c", "--ddl_statement", dest="ddlStatement", type="string",
                          metavar="ddlStatement",

