hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject incubator-hawq git commit: HAWQ-759. Fix query cannot be terminated by pg_cancel_backend or pg_terminate_backend
Date Tue, 14 Jun 2016 02:11:36 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master abf38b723 -> 62559633a


HAWQ-759. Fix query cannot be terminated by pg_cancel_backend or pg_terminate_backend


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/62559633
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/62559633
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/62559633

Branch: refs/heads/master
Commit: 62559633a90fa427995819352c94a7d0f2513e0a
Parents: abf38b7
Author: Wen Lin <wlin@pivotal.io>
Authored: Tue Jun 14 10:10:28 2016 +0800
Committer: Wen Lin <wlin@pivotal.io>
Committed: Tue Jun 14 10:10:28 2016 +0800

----------------------------------------------------------------------
 src/backend/libpq/be-secure.c    |   8 ++
 src/backend/libpq/pqcomm.c       | 170 ++++++++++++++++++++++++++++++----
 src/backend/tcop/postgres.c      |  84 ++++++++++++++++-
 src/backend/utils/init/globals.c |   2 +
 src/include/libpq/pqcomm.h       |   4 +
 src/include/miscadmin.h          |   2 +
 src/include/tcop/tcopprot.h      |   2 +
 7 files changed, 250 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/backend/libpq/be-secure.c
----------------------------------------------------------------------
diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index 155074d..06fb8d1 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -420,7 +420,11 @@ wloop:
 	}
 	else
 #endif
+	{
+		prepare_for_client_write();
 		n = send(port->sock, ptr, len, 0);
+		client_write_ended();
+	}
 
 	return n;
 }
@@ -475,6 +479,8 @@ my_sock_write(BIO *h, const char *buf, int size)
 {
 	int			res = 0;
 
+	prepare_for_client_write();
+
 	res = send(h->num, buf, size, 0);
 	if (res <= 0)
 	{
@@ -484,6 +490,8 @@ my_sock_write(BIO *h, const char *buf, int size)
 		}
 	}
 
+	client_write_ended();
+
 	return res;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/backend/libpq/pqcomm.c
----------------------------------------------------------------------
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 3365558..dc00ee5 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -94,6 +94,7 @@
 #include "storage/ipc.h"
 #include "utils/guc.h"
 #include "cdb/cdbvars.h"
+#include "tcop/tcopprot.h"
 
 /*
  * Configuration options
@@ -133,6 +134,8 @@ static bool DoingCopyOut;
 static void pq_close(int code, Datum arg);
 static int	internal_putbytes(const char *s, size_t len);
 static int	internal_flush(void);
+static void pq_set_nonblocking(bool nonblocking);
+static bool pq_send_mutex_lock();
 
 #ifdef HAVE_UNIX_SOCKETS
 static int	Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
@@ -752,6 +755,43 @@ TouchSocketFile(void)
  * --------------------------------
  */
 
+/* --------------------------------
+ *			  pq_set_nonblocking - set socket blocking/non-blocking
+ *
+ * Sets the socket non-blocking if nonblocking is TRUE, or sets it
+ * blocking otherwise.
+ * --------------------------------
+ */
+static void
+pq_set_nonblocking(bool nonblocking)
+{
+	if (MyProcPort->noblock == nonblocking)
+		return;
+
+#ifdef WIN32
+	pgwin32_noblock = nonblocking ? 1 : 0;
+#else
+
+	/*
+	 * Use COMMERROR on failure, because ERROR would try to send the error to
+	 * the client, which might require changing the mode again, leading to
+	 * infinite recursion.
+	 */
+	if (nonblocking)
+	{
+		if (!pg_set_noblock(MyProcPort->sock))
+			ereport(COMMERROR,
+				  (errmsg("could not set socket to non-blocking mode: %m")));
+	}
+	else
+	{
+		if (!pg_set_block(MyProcPort->sock))
+			ereport(COMMERROR,
+					(errmsg("could not set socket to blocking mode: %m")));
+	}
+#endif
+	MyProcPort->noblock = nonblocking;
+}
 
 /* --------------------------------
  *		pq_recvbuf - load some bytes into the input buffer
@@ -1191,6 +1231,77 @@ pq_getmessage(StringInfo s, int maxlen)
 	return 0;
 }
 
+/*
+ * Wrapper around simple pthread locking: it polls with pthread_mutex_trylock
+ * in a loop so that waiting for the lock stays interruptible.
+ *
+ * Returns true if the lock is successfully acquired, false if the lock stays
+ * busy after SIGTERM has been received; otherwise it keeps looping on the mutex.
+ *
+ * Before returning false, we close the socket to the client; this is crucial
+ * for letting the dispatch thread exit if it is stuck sending a NOTICE to the
+ * client, and hence avoids a mutex deadlock.
+ *
+ * NOTE: do not call CHECK_FOR_INTERRUPTS or ereport in this routine, since
+ * it runs in a multi-threaded context.
+ */
+static bool
+pq_send_mutex_lock()
+{
+	int count = PQ_BUSY_TEST_COUNT_IN_EXITING;
+	int mutex_res;
+
+	do
+	{
+		mutex_res = pthread_mutex_trylock(&send_mutex);
+
+		if (mutex_res == 0)
+		{
+			return true;
+		}
+
+		if (mutex_res == EBUSY)
+		{
+			/* No need to acquire lock for TermSignalReceived, since we are in
+ 			 * a loop here */
+			if (TermSignalReceived)
+			{
+				/*
+ 				 * Try PQ_BUSY_TEST_COUNT_IN_EXITING times before closing
+ 				 * the socket, in case a genuine concurrent write is in
+ 				 * progress (as opposed to a send call stuck in secure_write).
+ 				 *
+ 				 * This cannot completely eliminate the false negative
+ 				 * cases, but given that the process is exiting, it is
+ 				 * acceptable to discard some messages rather than risk
+ 				 * being stuck indefinitely.
+ 				 */
+				if (count-- < 0)
+				{
+					/* On Red Hat and SUSE, simply closing the socket does not get
+					 * send() out of its hung state; shutdown() does (though this is
+					 * not explicitly mentioned in the manual page). However, sending
+					 * on a socket that has been shut down terminates the process with
+					 * SIGPIPE; to avoid this race condition, we mark the socket as
+					 * invalid before calling shutdown().
+					 *
+					 * On OS X, close() gets send() out of its hung state, while
+					 * shutdown() would lead to SIGPIPE. */
+					int saved_fd = MyProcPort->sock;
+					MyProcPort->sock = -1;
+					whereToSendOutput = DestNone;
+#ifndef __darwin__
+					shutdown(saved_fd, SHUT_WR);
+#endif
+					closesocket(saved_fd);
+					return false;
+				}
+			}
+		}
+		pg_usleep(1000L);
+	} while (true);
+}
+
 
 /* --------------------------------
  *		pq_putbytes		- send bytes to connection (not flushed until pq_flush)
@@ -1205,9 +1316,12 @@ pq_putbytes(const char *s, size_t len)
 
 	/* Should only be called by old-style COPY OUT */
 	Assert(DoingCopyOut);
-    pthread_mutex_lock(&send_mutex);
+	if (!pq_send_mutex_lock())
+	{
+		return EOF;
+	}
 	res = internal_putbytes(s, len);
-    pthread_mutex_unlock(&send_mutex);
+	pthread_mutex_unlock(&send_mutex);
 	return res;
 }
 
@@ -1246,7 +1360,13 @@ pq_flush(void)
 
 	/* No-op if reentrant call */
 	if ((Gp_role == GP_ROLE_DISPATCH) && IsUnderPostmaster)
-		pthread_mutex_lock(&send_mutex);
+	{
+		if (!pq_send_mutex_lock())
+		{
+			return EOF;
+		}
+	}
+	pq_set_nonblocking(false);
 	res = internal_flush();
 	if ((Gp_role == GP_ROLE_DISPATCH) && IsUnderPostmaster)
 		pthread_mutex_unlock(&send_mutex);
@@ -1287,6 +1407,7 @@ internal_flush(void)
 				
 				HOLD_INTERRUPTS();
 
+				/* we can use ereport here, since we are protected by the send mutex */
 				ereport(COMMERROR,
 						(errcode_for_socket_access(),
 						 errmsg("could not send data to client: %m")));
@@ -1340,9 +1461,7 @@ internal_flush(void)
  *
  *		We also suppress messages generated while pqcomm.c is busy.  This
  *		avoids any possibility of messages being inserted within other
- *		messages.  The only known trouble case arises if SIGQUIT occurs
- *		during a pqcomm.c routine --- quickdie() will try to send a warning
- *		message, and the most reasonable approach seems to be to drop it.
+ *		messages.
  *
  *		returns 0 if OK, EOF if trouble
  * --------------------------------
@@ -1350,21 +1469,25 @@ internal_flush(void)
 int
 pq_putmessage(char msgtype, const char *s, size_t len)
 {
-    int ret = EOF;
+
+	if (DoingCopyOut)
+	{
+		return EOF;
+	}
+
 	if ((Gp_role == GP_ROLE_DISPATCH) && IsUnderPostmaster)
-		pthread_mutex_lock(&send_mutex);
+	{
+		if (!pq_send_mutex_lock())
+		{
+			return EOF;
+		}
+	}
 
-    if (DoingCopyOut)
-    {
-        ret = 0;
-        goto fail;
-    }
-        
 	if (msgtype)
-    {
+	{
 		if (internal_putbytes(&msgtype, 1))
 			goto fail;
-    }
+	}
 
 	if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
 	{
@@ -1397,9 +1520,13 @@ fail:
 void
 pq_startcopyout(void)
 {
-    pthread_mutex_lock(&send_mutex);
+	if (!pq_send_mutex_lock())
+	{
+		/* no need to return a status, since socket has been closed in failed cases */
+		return;
+	}
 	DoingCopyOut = true;
-    pthread_mutex_unlock(&send_mutex);
+	pthread_mutex_unlock(&send_mutex);
 }
 
 /* --------------------------------
@@ -1420,9 +1547,12 @@ pq_endcopyout(bool errorAbort)
 	if (errorAbort)
 		pq_putbytes("\n\n\\.\n", 5);
 	/* in non-error case, copy.c will have emitted the terminator line */
-    pthread_mutex_lock(&send_mutex);
+	if (!pq_send_mutex_lock())
+	{
+		return;
+	}
 	DoingCopyOut = false;
-    pthread_mutex_unlock(&send_mutex);
+	pthread_mutex_unlock(&send_mutex);
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/backend/tcop/postgres.c
----------------------------------------------------------------------
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 00a1a70..5a8327e 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -214,6 +214,8 @@ static PreparedStatement *unnamed_stmt_pstmt = NULL;
 
 static bool EchoQuery = false;	/* default don't echo */
 
+static bool DoingPqReading = false; /* in the middle of recv call of secure_read */
+
 extern pthread_t main_tid;
 #ifndef _WIN32
 pthread_t main_tid = (pthread_t)0;
@@ -599,6 +601,17 @@ ReadCommand(StringInfo inBuf)
  * non-reentrant libc functions.  This restriction makes it safe for us
  * to allow interrupt service routines to execute nontrivial code while
  * we are waiting for input.
+ *
+ * When waiting in the main loop, we can process any interrupt immediately
+ * in the signal handler. In any other read from the client, like in a COPY
+ * FROM STDIN, we can't safely process a query cancel signal, because we might
+ * be in the middle of sending a message to the client, and jumping out would
+ * violate the protocol. Or rather, pqcomm.c would detect it and refuse to
+ * send any more messages to the client. But handling a SIGTERM is OK, because
+ * we're terminating the backend and don't need to send any more messages
+ * anyway. That means that we might not be able to send an error message to
+ * the client, but that seems better than waiting indefinitely, in case the
+ * client is not responding.
  */
 void
 prepare_for_client_read(void)
@@ -619,6 +632,18 @@ prepare_for_client_read(void)
 		QueryCancelPending = false;
 		CHECK_FOR_INTERRUPTS();
 	}
+	else
+	{
+		DoingPqReading = true;
+		/* Allow die interrupts to be processed while waiting */
+		ImmediateDieOK = true;
+
+		/* Process the ones that already arrived */
+		if (ProcDiePending)
+		{
+			CHECK_FOR_INTERRUPTS();
+		}
+	}
 }
 
 /*
@@ -637,8 +662,48 @@ client_read_ended(void)
 		DisableNotifyInterrupt();
 		DisableCatchupInterrupt();
 	}
+	else
+	{
+		ImmediateDieOK = false;
+		DoingPqReading = false;
+	}
+}
+
+/*
+ * prepare_for_client_write -- set up to possibly block on client output
+ *
+ * Like prepare_for_client_read, but for writing.
+ *
+ * NOTE: this routine may be called in dispatch thread;
+ */
+void
+prepare_for_client_write(void)
+{
+	/* Only enable this on main thread */
+	if (pthread_equal(main_tid, pthread_self()))
+	{
+		/* Allow die interrupts to be processed while waiting */
+		ImmediateDieOK = true;
+
+		/* And don't forget to detect one that already arrived */
+		if (ProcDiePending)
+			CHECK_FOR_INTERRUPTS();
+	}
 }
 
+/*
+ * client_write_ended -- get out of the client-output state
+ *
+ * This is called just after low-level writes.
+ */
+void
+client_write_ended(void)
+{
+	if (pthread_equal(main_tid, pthread_self()))
+	{
+		ImmediateDieOK = false;
+	}
+}
 
 /*
  * Parse a query string and pass it through the rewriter.
@@ -3295,6 +3360,7 @@ die(SIGNAL_ARGS)
 	{
 		InterruptPending = true;
 		ProcDiePending = true;
+		TermSignalReceived = true;
 
 		/* although we don't strictly need to set this to true since the
 		 * ProcDiePending will occur first.  We set this anyway since the
@@ -3307,9 +3373,22 @@ die(SIGNAL_ARGS)
 		 * If it's safe to interrupt, and we're waiting for input or a lock,
 		 * service the interrupt immediately
 		 */
-		if (ImmediateInterruptOK && InterruptHoldoffCount == 0 &&
-			CritSectionCount == 0)
+		if ((ImmediateInterruptOK || ImmediateDieOK) &&
+			InterruptHoldoffCount == 0 && CritSectionCount == 0)
 		{
+			if (ImmediateDieOK && !DoingPqReading)
+			{
+				/*
+				 * Getting here indicates that we were interrupted while a data
+				 * message was being sent to the client, so close the connection
+				 * immediately, since sending any more bytes may cause a self
+				 * deadlock (though pq_send_mutex_lock() can handle this now, it
+				 * is better to avoid the unnecessary cost).
+				 */
+				close(MyProcPort->sock);
+				whereToSendOutput = DestNone;
+			}
+
 			/* bump holdoff count to make ProcessInterrupts() a no-op */
 			/* until we are done getting ready for it */
 			InterruptHoldoffCount++;
@@ -3486,6 +3565,7 @@ ProcessInterrupts(void)
 		ProcDiePending = false;
 		QueryCancelPending = false;		/* ProcDie trumps QueryCancel */
 		ImmediateInterruptOK = false;	/* not idle anymore */
+		ImmediateDieOK = false;		/* prevent re-entry */
 		DisableNotifyInterrupt();
 		DisableCatchupInterrupt();
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/backend/utils/init/globals.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 842b2d1..a0220a6 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -33,6 +33,8 @@ volatile bool ProcDiePending = false;
 volatile bool ClientConnectionLost = false;
 volatile bool ImmediateInterruptOK = false;
 volatile bool InterruptWhenCallingPLUDF = false;
+volatile bool ImmediateDieOK = false;
+volatile bool TermSignalReceived = false;
 
 // Make these signed integers (instead of uint32) to detect garbage negative values.
 volatile int32 InterruptHoldoffCount = 0;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/include/libpq/pqcomm.h
----------------------------------------------------------------------
diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h
index 54798fb..6fae8f0 100644
--- a/src/include/libpq/pqcomm.h
+++ b/src/include/libpq/pqcomm.h
@@ -201,4 +201,8 @@ typedef struct PrimaryMirrorTransitionPacket
 	uint32 dataLength;
 } PrimaryMirrorTransitionPacket;
 
+/* the number of attempts to acquire the send mutex for the frontend
+ * connection after detecting that the process is exiting */
+#define PQ_BUSY_TEST_COUNT_IN_EXITING 5
+
 #endif   /* PQCOMM_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/include/miscadmin.h
----------------------------------------------------------------------
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 80c3a17..aabef90 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -80,6 +80,8 @@ extern volatile bool ClientConnectionLost;
 
 /* these are marked volatile because they are examined by signal handlers: */
 extern volatile bool ImmediateInterruptOK;
+extern volatile bool ImmediateDieOK;
+extern volatile bool TermSignalReceived;
 extern volatile bool InterruptWhenCallingPLUDF;
 extern PGDLLIMPORT volatile int32 InterruptHoldoffCount;
 extern PGDLLIMPORT volatile int32 CritSectionCount;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/include/tcop/tcopprot.h
----------------------------------------------------------------------
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index ad45a3a..5fc42b7 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -65,6 +65,8 @@ extern void StatementCancelHandler(SIGNAL_ARGS);
 extern void FloatExceptionHandler(SIGNAL_ARGS);
 extern void prepare_for_client_read(void);
 extern void client_read_ended(void);
+extern void prepare_for_client_write(void);
+extern void client_write_ended(void);
 extern int	PostgresMain(int argc, char *argv[], const char *username);
 extern long get_stack_depth_rlimit(void);
 extern void ResetUsage(void);


Mime
View raw message