hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: paul...@apache.org
Subject: incubator-hawq git commit: HAWQ-1326. Cancel the query earlier if one of the segments for the query crashes
Date: Tue, 14 Feb 2017 09:12:42 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master 17a9dc1e5 -> 55e5ab5a3


HAWQ-1326. Cancel the query earlier if one of the segments for the query crashes


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/55e5ab5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/55e5ab5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/55e5ab5a

Branch: refs/heads/master
Commit: 55e5ab5a3eadfa9ba022db86826980cda870f1ce
Parents: 17a9dc1
Author: Paul Guo <paulguo@gmail.com>
Authored: Mon Feb 6 19:54:57 2017 +0800
Committer: Paul Guo <paulguo@gmail.com>
Committed: Tue Feb 14 17:12:27 2017 +0800

----------------------------------------------------------------------
 src/backend/cdb/dispatcher_mgt.c              | 32 +++++----
 src/backend/cdb/executormgr.c                 | 32 ++++++++-
 src/backend/resourcemanager/include/dynrm.h   |  8 +++
 src/backend/resourcemanager/resourcemanager.c | 77 ++++++++++++++++++++++
 src/backend/resourcemanager/resourcepool.c    | 30 +++++++++
 src/backend/storage/ipc/ipci.c                |  6 +-
 src/backend/tcop/pquery.c                     | 13 ++--
 src/include/cdb/cdbutil.h                     |  3 +
 src/include/cdb/executormgr.h                 |  2 +
 9 files changed, 181 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/cdb/dispatcher_mgt.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/dispatcher_mgt.c b/src/backend/cdb/dispatcher_mgt.c
index 8ed3402..31c483e 100644
--- a/src/backend/cdb/dispatcher_mgt.c
+++ b/src/backend/cdb/dispatcher_mgt.c
@@ -42,7 +42,6 @@
 #endif
 #include "cdb/cdbconn.h"		/* SOCK_ERRNO */
 
-
 typedef enum DispMgtConstant {
 	DISPMGT_POLL_TIME = 2 * 1000,
 } DispMgtConstant;
@@ -288,14 +287,16 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct WorkerMgrState
*state)
 	while ((executor = dispmgt_get_query_executor_in_group_iterator(group, &iterator)) !=
NULL)
 	{
 		if (workermgr_should_query_stop(state)) {
-			write_log("+++++++++dispmgr_thread_func_run meets should query "
-					  "stop before dispatching, entering error_cleanup");
+			write_log("+++++++++%s meets should query "
+					  "stop before dispatching, entering error_cleanup",
+					  __func__);
 			goto error_cleanup;
 		}
 
 		if (!executormgr_dispatch_and_run(data, executor)) {
-			write_log("+++++++++dispmgr_thread_func_run meets dispatch_and_run "
-					  "problem when dispatching, entering error_cleanup");
+			write_log("+++++++++%s meets dispatch_and_run "
+					  "problem when dispatching, entering error_cleanup",
+					  __func__);
 			goto error_cleanup;
 		}
 	}
@@ -309,14 +310,23 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct WorkerMgrState
*state)
 
 		/* Check global state to abort query, this let poll process easier. */
 		if (workermgr_should_query_stop(state)){
-			write_log("dispmgr_thread_func_run meets should query stop when "
-					  "polling executors, entering error_cleanup");
+			write_log("%s meets should query stop when "
+					  "polling executors, entering error_cleanup",
+					  __func__);
 			goto error_cleanup;
 		}
 		/* Skip the stopped executor make the logic easy to understand. */
 		dispmgt_init_query_executor_in_group_iterator(group, &iterator, true);
 		while ((executor = dispmgt_get_query_executor_in_group_iterator(group, &iterator))
!= NULL)
 		{
+			if (!executormgr_check_segment_status(executor))
+			{
+				write_log("Detected one segment (Global ID: %d) is down, so "
+						  "abort the query that is running or will run on it",
+						  executormgr_get_ID(executor));
+				goto error_cleanup;
+			}
+
 			/*
 			 * The fds array may shorter than executor array.
 			 * DO NOT mark executor stop!
@@ -367,7 +377,7 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct WorkerMgrState
*state)
 		dispmgt_init_query_executor_in_group_iterator(group, &iterator, true);
 		while ((executor = dispmgt_get_query_executor_in_group_iterator(group, &iterator))
!= NULL)
 		{
-			int 	sockfd;
+			int __MAYBE_UNUSED sockfd;
 
 			sockfd = executormgr_get_fd(executor);
 			/* TODO: is that safe to call Assert() in a thread ? */
@@ -376,7 +386,7 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct WorkerMgrState
*state)
 				continue;
 
 			if (!executormgr_consume(executor)) {
-				write_log("dispmgr_thread_func_run meets consume error for executor, entering error_cleanup");
+				write_log("%s meets consume error for executor, entering error_cleanup", __func__);
 				goto error_cleanup;
 			}
 		}
@@ -388,7 +398,7 @@ error_cleanup:
 	 * 1. query cancel, result error, and poll error: mark the executor stop.
 	 * 2. connection error: mark the gang error. Set by workermgr_mark_executor_error().
 	 */
-  workermgr_set_state_cancel(state);
+	workermgr_set_state_cancel(state);
 	dispmgt_init_query_executor_in_group_iterator(group, &iterator, false);
 	while ((executor = dispmgt_get_query_executor_in_group_iterator(group, &iterator)) !=
NULL)
 	{
@@ -514,8 +524,6 @@ dispmgt_concurrent_connect(List	*executors, int executors_num_per_thread)
 	}
 	PG_CATCH();
 	{
-		ListCell	*lc;
-
 		workermgr_cancel_job(state);
 		/* We have to clean up the executors. */
 		dispmgt_free_concurrent_connect_state(tasks);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/cdb/executormgr.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/executormgr.c b/src/backend/cdb/executormgr.c
index f8efb3d..85d3381 100644
--- a/src/backend/cdb/executormgr.c
+++ b/src/backend/cdb/executormgr.c
@@ -42,6 +42,8 @@
 #include "utils/guc_tables.h"	/* TODO: manipulate gucs */
 #include "portability/instr_time.h"	/* Monitor the dispatcher performance */
 
+#include "resourcemanager/dynrm.h"
+
 typedef enum ExecutorMgrConstant {
 	EXECUTORMGR_CANCEL_ERROR_BUFFER_SIZE = 256,
 } ExecutorMgrConstant;
@@ -320,6 +322,12 @@ executormgr_get_executor_result(QueryExecutor *executor)
 }
 
 int
+executormgr_get_ID(QueryExecutor *executor)
+{
+	return executor->desc->segment->ID;
+}
+
+int
 executormgr_get_fd(QueryExecutor *executor)
 {
 	return PQsocket(executor->desc->conn);
@@ -399,7 +407,7 @@ executormgr_is_dispatchable(QueryExecutor *executor)
 
 	if (!executormgr_validate_conn(conn) || PQstatus(conn) == CONNECTION_BAD)
 	{
-		write_log("function executormgr_is_dispatchable meets error, connection is bad.");
+		write_log("function %s meets error, connection is bad.", __func__);
 		executormgr_catch_error(executor);
 		return false;
 	}
@@ -471,6 +479,26 @@ error:
 	return false;
 }
 
+bool
+executormgr_check_segment_status(QueryExecutor *executor)
+{
+	/*
+	 * Cancel the query if a segment is down. QEs could hang in interconnect
+	 * until timeout when one segment is down. This will cause QD keep polling
+	 * until QE timeout.
+	 */
+	int ID = executormgr_get_ID(executor);
+
+	if (IsSegmentDown(ID))
+	{
+		cdbdisp_seterrcode(ERRCODE_GP_INTERCONNECTION_ERROR, -1,
+						   executormgr_get_executor_result(executor));
+		return false;
+	}
+
+	return true;
+}
+
 /*
  * executormgr_consume
  *	If there are data available for executor, use this interface to consume data.
@@ -547,7 +575,7 @@ executormgr_consume(QueryExecutor *executor)
 
 connection_error:
 	/* Let caller deal with connection error. */
-	write_log("function executormgr_consume meets error, connection is bad.");
+	write_log("function %s meets error, connection is bad.", __func__);
 	executormgr_catch_error(executor);
 	return false;
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/resourcemanager/include/dynrm.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/dynrm.h b/src/backend/resourcemanager/include/dynrm.h
index a6309c8..60b6c6a 100644
--- a/src/backend/resourcemanager/include/dynrm.h
+++ b/src/backend/resourcemanager/include/dynrm.h
@@ -324,4 +324,12 @@ int  MainHandlerLoop_RMSEG(void);
 void checkAndBuildFailedTmpDirList(void);
 
 void switchIMAliveTarget(void);
+
+int SegmentStatus_ShmSize(void);
+void SegmentStatusShmemReset(void);
+void SegmentStatusShmemInit(void);
+void MarkSegmentDown(int x);
+void MarkSegmentUp(int x);
+bool IsSegmentDown(int x);
+
 #endif //DYNAMIC_RESOURCE_MANAGEMENT_H

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/resourcemanager/resourcemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager.c b/src/backend/resourcemanager/resourcemanager.c
index dda4271..652c83f 100644
--- a/src/backend/resourcemanager/resourcemanager.c
+++ b/src/backend/resourcemanager/resourcemanager.c
@@ -20,6 +20,7 @@
 #include "envswitch.h"
 #include "dynrm.h"
 #include "utils/hashtable.h"
+#include "cdb/cdbfts.h"
 
 #include "resourcemanager/resourcemanager.h"
 #include "resourceenforcer/resourceenforcer.h"
@@ -67,6 +68,11 @@
 extern bool FindMyDatabase(const char *name, Oid *db_id, Oid *db_tablespace);
 static char *probeDatabase = "template1";
 
+/* Bitmap to monitor segment health info (up/down). */
+bits8 *shm_segment_status;
+void MarkSegmentUp(int x);
+void MarkSegmentDown(int x);
+
 int  loadAllQueueAndUser(void);
 int  loadHostInformationIntoResourcePool(void);
 
@@ -3014,3 +3020,74 @@ bool cleanedAllGRMContainers(void)
 	return PRESPOOL->AddPendingContainerCount == 0 &&
 		   PRESPOOL->RetPendingContainerCount == 0;
 }
+
+int SegmentStatus_ShmSize(void)
+{
+	return (FTS_MAX_DBS >> 3) + 1;
+}
+
+void SegmentStatusShmemReset(void)
+{
+	/* For each bit, 0 means an invalid node (down or non-existent). */
+	MemSet(shm_segment_status, 0, SegmentStatus_ShmSize());
+}
+
+void SegmentStatusShmemInit(void)
+{
+	bool found;
+
+	shm_segment_status = (bits8 *)ShmemInitStruct(
+		"Segment status (up/down. Monitored by RM)", SegmentStatus_ShmSize(), &found);
+
+	if (!shm_segment_status)
+		elog(FATAL, "could not initialize segment status bitmap (shared memory).");
+
+	if (found)
+		return;
+
+	SegmentStatusShmemReset();
+}
+
+void MarkSegmentDown(int x)
+{
+	int octnum, bitnum;
+
+    if (x < 0 || x >= FTS_MAX_DBS)
+        elog(ERROR, "Segment ID is out of range (%d)", x);
+
+	octnum = x >> 3;
+	bitnum = x & 0x7;
+
+	/* Clear the bit. */
+	shm_segment_status[octnum] &= ~(1 << bitnum);
+}
+
+void MarkSegmentUp(int x)
+{
+	int octnum, bitnum;
+
+    if (x < 0 || x >= FTS_MAX_DBS)
+        elog(ERROR, "Segment ID is out of range (%d)", x);
+
+	octnum = x >> 3;
+	bitnum = x & 0x7;
+
+	/* Set the bit. */
+	shm_segment_status[octnum] |= (1 << bitnum);
+}
+
+bool IsSegmentDown(int x)
+{
+	int octnum, bitnum;
+
+    if (x < 0 || x >= FTS_MAX_DBS)
+        elog(ERROR, "Segment ID is out of range (%d)", x);
+
+	octnum = x >> 3;
+	bitnum = x & 0x7;
+
+	if ((shm_segment_status[octnum] & (1 << bitnum)) == 0)
+		return true;
+
+	return false;
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/resourcemanager/resourcepool.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcepool.c b/src/backend/resourcemanager/resourcepool.c
index 63357b4..9485dbf 100644
--- a/src/backend/resourcemanager/resourcepool.c
+++ b/src/backend/resourcemanager/resourcepool.c
@@ -78,6 +78,8 @@ int __DRM_NODERESPOOL_comp_ratioFree(void *arg, void *val1, void *val2);
 int __DRM_NODERESPOOL_comp_ratioAlloc(void *arg, void *val1, void *val2);
 int __DRM_NODERESPOOL_comp_combine(void *arg, void *val1, void *val2);
 
+extern void MarkSegmentUp(int x);
+
 /*
  * The balanced BST index comparing function. The segment containing most
  * available resource is ordered at the left most, the segment not in
@@ -411,6 +413,8 @@ void cleanup_segment_config()
 	PQExpBuffer sql = NULL;
 	PGresult* result = NULL;
 
+	SegmentStatusShmemReset();
+
 	sprintf(conninfo, "options='-c gp_session_role=UTILITY -c allow_system_table_mods=dml' "
 			"dbname=template1 port=%d connect_timeout=%d", master_addr_port, CONNECT_TIMEOUT);
 	conn = PQconnectdb(conninfo);
@@ -647,6 +651,19 @@ void update_segment_status(int32_t id, char status, char* description)
 	else if (status == SEGMENT_STATUS_DOWN)
 		Assert(strlen(description) != 0);
 
+	/* For segment nodes only. */
+	if (id >= REGISTRATION_ORDER_OFFSET)
+	{
+		if (status == SEGMENT_STATUS_UP)
+			MarkSegmentUp(id - REGISTRATION_ORDER_OFFSET);
+		else if (status == SEGMENT_STATUS_DOWN)
+			MarkSegmentDown(id - REGISTRATION_ORDER_OFFSET);
+		else
+			elog(ERROR, "Unrecognized segment status character: '%c' "
+						"(Should be one of '%c' and '%c')", status,
+						SEGMENT_STATUS_UP, SEGMENT_STATUS_DOWN);
+	}
+
 	sprintf(conninfo, "options='-c gp_session_role=UTILITY -c allow_system_table_mods=dml' "
 			"dbname=template1 port=%d connect_timeout=%d", master_addr_port, CONNECT_TIMEOUT);
 	conn = PQconnectdb(conninfo);
@@ -814,6 +831,19 @@ void add_segment_config_row(int32_t id,
 	PQExpBuffer sql = NULL;
 	PGresult* result = NULL;
 
+	/* For segment nodes only. */
+	if (id >= REGISTRATION_ORDER_OFFSET)
+	{
+		if (status == SEGMENT_STATUS_UP)
+			MarkSegmentUp(id - REGISTRATION_ORDER_OFFSET);
+		else if (status == SEGMENT_STATUS_DOWN)
+			MarkSegmentDown(id - REGISTRATION_ORDER_OFFSET);
+		else
+			elog(ERROR, "Unrecognized segment status character: '%c' "
+						"(Should be one of '%c' and '%c')", status,
+						SEGMENT_STATUS_UP, SEGMENT_STATUS_DOWN);
+	}
+
 	sprintf(conninfo, "options='-c gp_session_role=UTILITY -c allow_system_table_mods=dml' "
 			"dbname=template1 port=%d connect_timeout=%d", master_addr_port, CONNECT_TIMEOUT);
 	conn = PQconnectdb(conninfo);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/storage/ipc/ipci.c
----------------------------------------------------------------------
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index ad73af1..f65f0ff 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -66,6 +66,8 @@
 #include "cdb/cdbtmpdir.h"
 #include "utils/session_state.h"
 
+#include "resourcemanager/dynrm.h"
+
 shmem_startup_hook_type shmem_startup_hook = NULL;
 
 static Size total_addin_request = 0;
@@ -168,6 +170,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 		size = add_size(size, PersistentRelfile_ShmemSize());
 		size = add_size(size, Pass2Recovery_ShmemSize());
 		size = add_size(size, FSCredShmemSize());
+		size = add_size(size, SegmentStatus_ShmSize());
 
         if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_UTILITY)
         {
@@ -363,6 +366,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 
 	FSCredShmemInit();
 
+	SegmentStatusShmemInit();
 #ifdef EXEC_BACKEND
 
 	/*
@@ -371,7 +375,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	if (!IsUnderPostmaster)
 		ShmemBackendArrayAllocation();
 #endif
-	
+
 	SPI_InitMemoryReservation();
 	
 	/*

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/tcop/pquery.c
----------------------------------------------------------------------
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index efa5bce..0a5b9d5 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -832,7 +832,8 @@ AllocateResource(QueryResourceLife   life,
 			seg->hdfsHostname 	= qdseginfo->QD_HdfsHostName == NULL ?
 								  NULL :
 								  pstrdup(qdseginfo->QD_HdfsHostName);
-			seg->segindex 	  	= i;
+			seg->segindex   	= i;
+			seg->ID 			= qdseginfo->QD_SegInfo->ID;
 			seg->master   		= qdseginfo->QD_SegInfo->master;
 			seg->port     		= qdseginfo->QD_SegInfo->port;
 			seg->standby  		= qdseginfo->QD_SegInfo->standby;
@@ -849,8 +850,8 @@ AllocateResource(QueryResourceLife   life,
 
 			elog(DEBUG3, "Get allocated segment located at : %s:%d,"
 					  "Address:%s,"
-					  "Master:%d,Standby:%d,Alive:%d,ID:%d, "
-					  "HDFS Host:%s",
+					  "Master:%d,Standby:%d,Alive:%d,internl index:%d, "
+					  "Global ID:%d, HDFS Host:%s",
 					  seg->hostname,
 					  seg->port,
 					  seg->hostip,
@@ -858,6 +859,7 @@ AllocateResource(QueryResourceLife   life,
 					  seg->standby,
 					  seg->alive,
 					  seg->segindex,
+					  seg->ID,
 					  (seg->hdfsHostname == NULL ? "NULL" : seg->hdfsHostname));
 		}
 	}
@@ -959,7 +961,6 @@ static void
 RemoveFromGlobalQueryResources(int resourceId, QueryResourceLife life)
 {
   ListCell *lc;
-  QueryResourceItem *newItem;
   MemoryContext old;
 
   if (life == QRL_NONE)
@@ -991,8 +992,6 @@ static void
 SetResourcesAllocatedSucceed(int resourceId, QueryResourceLife life)
 {
 	ListCell *lc;
-	QueryResourceItem *newItem;
-	MemoryContext old;
 
 	if (life == QRL_NONE)
 	{
@@ -1019,7 +1018,7 @@ FreeResource(QueryResource *resource)
 	ListCell	*lc;
 	int			ret;
 	char		errorbuf[1024];
-	bool found = false;
+	bool __MAYBE_UNUSED found = false;
 
 	if (!resource)
 	{

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/include/cdb/cdbutil.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbutil.h b/src/include/cdb/cdbutil.h
index 23b4acc..7962067 100644
--- a/src/include/cdb/cdbutil.h
+++ b/src/include/cdb/cdbutil.h
@@ -127,6 +127,9 @@ typedef struct Segment {
 	bool	alive;
 
 	int		segindex;
+
+	/* Global unique ID. */
+	int		ID;
 } Segment;
 
 extern Segment *GetMasterSegment(void);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/include/cdb/executormgr.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/executormgr.h b/src/include/cdb/executormgr.h
index d1c562e..7d9f3c0 100644
--- a/src/include/cdb/executormgr.h
+++ b/src/include/cdb/executormgr.h
@@ -70,9 +70,11 @@ extern void executormgr_get_executor_connection_info(struct QueryExecutor
*execu
 extern bool	executormgr_is_stop(struct QueryExecutor *executor);
 extern bool	executormgr_has_error(struct QueryExecutor *executor);
 extern int	executormgr_get_executor_slice_id(struct QueryExecutor *executor);
+extern int	executormgr_get_ID(struct QueryExecutor *executor);
 extern int	executormgr_get_fd(struct QueryExecutor *executor);
 extern bool	executormgr_cancel(struct QueryExecutor * executor);
 extern bool	executormgr_dispatch_and_run(struct DispatchData *data, struct QueryExecutor
*executor);
+extern bool	executormgr_check_segment_status(struct QueryExecutor *executor);
 extern bool	executormgr_consume(struct QueryExecutor *executor);
 extern bool	executormgr_discard(struct QueryExecutor *executor);
 extern void	executormgr_merge_error(struct QueryExecutor *exeutor);


Mime
View raw message