hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ztao1...@apache.org
Subject incubator-hawq git commit: HAWQ-390. Fix memory leak in dispatcher.
Date Fri, 05 Feb 2016 02:08:41 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master d240a4b56 -> 3426cb0e3


HAWQ-390. Fix memory leak in dispatcher.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/3426cb0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/3426cb0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/3426cb0e

Branch: refs/heads/master
Commit: 3426cb0e37e4340ce40df16abce271f78977f8db
Parents: d240a4b
Author: zhenglin tao <zhenglin.taozl@gmail.com>
Authored: Thu Feb 4 15:37:17 2016 +0800
Committer: zhenglin tao <zhenglin.taozl@gmail.com>
Committed: Fri Feb 5 10:06:23 2016 +0800

----------------------------------------------------------------------
 src/backend/cdb/dispatcher.c       | 164 +++++++++++++++++++++-----------
 src/backend/cdb/dispatcher_mgt.c   |  21 ----
 src/backend/executor/execUtils.c   |   1 +
 src/backend/executor/nodeSubplan.c |   4 +-
 src/include/cdb/dispatcher.h       |   3 +
 src/include/cdb/dispatcher_mgt.h   |  19 ++++
 6 files changed, 132 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3426cb0e/src/backend/cdb/dispatcher.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/dispatcher.c b/src/backend/cdb/dispatcher.c
index 0312ea2..f72aa35 100644
--- a/src/backend/cdb/dispatcher.c
+++ b/src/backend/cdb/dispatcher.c
@@ -197,6 +197,7 @@ typedef struct DispatchStatement {
 
 /* Global states: DO NOT add global state unless you have to. */
 static MemoryContext	DispatchDataContext;
+static int	            DispatchInitCount = -1;
 
 
 /* Static function */
@@ -368,6 +369,9 @@ fillSliceVector(SliceTable *sliceTbl, int rootIdx, sliceVec *sliceVector,
int sl
 static void
 aggregateQueryResource(QueryResource *queryRes)
 {
+	MemoryContext	old;
+	old = MemoryContextSwitchTo(DispatchDataContext);
+
 	if (queryRes &&
 		queryRes->segments &&
 		(list_length(queryRes->segments) > 0))
@@ -407,21 +411,26 @@ aggregateQueryResource(QueryResource *queryRes)
 			queryRes->segment_vcore_agg[i++] = nseg;
 		}
 	}
+
+	MemoryContextSwitchTo(old);
 }
 
 static void
 dispatch_init_env(void)
 {
-	if (DispatchDataContext)
+	++DispatchInitCount;
+	if (DispatchDataContext) {
+		if (DispatchInitCount == 0)
+			MemoryContextResetAndDeleteChildren(DispatchDataContext);
 		return;
-
+	}
 	DispatchDataContext = AllocSetContextCreate(TopMemoryContext,
 			"Dispatch Data Context",
 			ALLOCSET_DEFAULT_MINSIZE,
 			ALLOCSET_DEFAULT_INITSIZE,
 			ALLOCSET_DEFAULT_MAXSIZE);
 
-	executormgr_setup_env(DispatchDataContext);
+	executormgr_setup_env(TopMemoryContext);
 }
 
 
@@ -740,6 +749,9 @@ prepare_dispatch_statement_node(struct DispatchData *data,
 static void
 dispatcher_split_logical_tasks_for_query_desc(DispatchData *data)
 {
+	MemoryContext	old;
+	old = MemoryContextSwitchTo(DispatchDataContext);
+
 	sliceVec	*sliceVector = NULL;
 	int			i;
 	int			slice_num;
@@ -839,6 +851,8 @@ dispatcher_split_logical_tasks_for_query_desc(DispatchData *data)
 	}
 
 	pfree(sliceVector);
+
+	MemoryContextSwitchTo(old);
 }
 
 /*
@@ -847,6 +861,7 @@ dispatcher_split_logical_tasks_for_query_desc(DispatchData *data)
 static void
 dispatcher_split_logical_tasks_for_statemet_or_node(DispatchData *data)
 {
+	MemoryContext	old;
 	int 	i;
 	int		segment_num, segment_num_entrydb;
 
@@ -858,6 +873,8 @@ dispatcher_split_logical_tasks_for_statemet_or_node(DispatchData *data)
 		return;
 	}
 
+	old = MemoryContextSwitchTo(DispatchDataContext);
+
 	data->job.used_slices_num = segment_num_entrydb + 1;
 	data->job.all_slices_num = segment_num_entrydb + 1;
 	data->job.slices = palloc0(data->job.used_slices_num * sizeof(DispatchSlice));
@@ -899,6 +916,8 @@ dispatcher_split_logical_tasks_for_statemet_or_node(DispatchData *data)
     task->id.init = true;
   }
 	data->pQueryParms->primary_gang_id = -1;
+
+	MemoryContextSwitchTo(old);
 }
 
 /*
@@ -1266,6 +1285,74 @@ dispatch_wait(DispatchData *data)
   }
 }
 
+/*
+ * free_dispatch_data
+ *	pfree the pieces hanging off `data` (query parms, used slices and their tasks, executor team), then `data` itself.
+ */
+void free_dispatch_data(struct DispatchData *data) {
+	if (data->pQueryParms) {
+		pfree(data->pQueryParms);
+		data->pQueryParms = NULL;
+	}
+	if (data->job.slices) {
+		for (int i = 0; i < data->job.used_slices_num; ++i) {	/* walks used_slices_num entries, not all_slices_num */
+			if (data->job.slices[i].tasks) {
+				pfree(data->job.slices[i].tasks);
+				data->job.slices[i].tasks = NULL;
+			}
+		}
+		pfree(data->job.slices);
+		data->job.slices = NULL;
+	}
+	if (data->query_executor_team) {
+		QueryExecutorGroup *qeGrp = data->query_executor_team->query_executor_groups;	/* refers to the first group only */
+		if (qeGrp) {
+			if (qeGrp->fds) {
+				pfree(qeGrp->fds);
+				qeGrp->fds = NULL;
+			}
+			for (int j = 0; j < qeGrp->query_executor_num; ++j) {
+				pfree(qeGrp->query_executors[j]);
+				qeGrp->query_executors[j] = NULL;
+			}
+		}	/* NOTE(review): the query_executors array, the groups array and any further groups are not pfree'd here -- confirm they are released elsewhere */
+		pfree(data->query_executor_team);
+		data->query_executor_team = NULL;
+	}
+	pfree(data);	/* `data` is invalid from here on; callers must drop their pointer */
+}
+
+/*
+ * dispatch_end_env
+ * Counterpart of dispatch_init_env: decrements DispatchInitCount, tears down results/executors/resource, then frees `data`.
+ */
+void dispatch_end_env(struct DispatchData *data) {
+	Assert(data != NULL);
+
+	--DispatchInitCount;	/* pairs with the ++DispatchInitCount in dispatch_init_env */
+
+	cdbdisp_destroyDispatchResults(data->results);
+	data->results = NULL;
+	dispatcher_set_state_done(data);
+
+	PG_TRY();
+	{
+		dispatcher_unbind_executor(data);
+		if (data->resource_is_mine)
+		{
+			FreeResource(data->resource);
+			data->resource_is_mine = false;
+		}
+	}
+	PG_CATCH();
+	{
+		free_dispatch_data(data);	/* still free `data` when the unbind/FreeResource step raises, then propagate */
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+	free_dispatch_data(data);	/* normal path: `data` is freed here, so it must not be used after this call returns */
+}
+
 /* 
  * dispatch_cleanup
  *	Cleanup the workermgr. Report error if something happended on executors.
@@ -1273,6 +1360,10 @@ dispatch_wait(DispatchData *data)
 void
 dispatch_cleanup(DispatchData *data)
 {
+	/* to avoid duplicate dispatch cleanup */
+	if (data == NULL) return;
+	if (dispatcher_is_state_done(data)) return;
+
 	/*
 	 * We should not get here when dispatcher hit an exception. But
 	 * executors may have some troubles.
@@ -1285,24 +1376,7 @@ dispatch_cleanup(DispatchData *data)
 		return;	/* should not hit */
 	}
 	
-	if (dispatcher_is_state_done(data))
-		return;
-
-	/*
-	 * dispatch_throw_error needs to access error information, free here is safe.
-	 */
-	dispatcher_unbind_executor(data);
-	if (data->resource_is_mine)
-	{
-		FreeResource(data->resource);
-		data->resource_is_mine = false;
-	}
-
-	cdbdisp_destroyDispatchResults(data->results);
-	data->results = NULL;
-	dispatcher_set_state_done(data);
-	
-	pfree(data);
+	dispatch_end_env(data);
 }
 
 /*
@@ -1365,28 +1439,8 @@ dispatch_catch_error(DispatchData *data)
 		{}						/* nop; fall thru */
 		PG_END_TRY();
 	}
-	else
-	{
-		/*
-		 * Discard any remaining results from QEs; don't confuse matters by
-		 * throwing a new error.  Any results of interest presumably should
-		 * have been examined before raising the error that the caller is
-		 * currently handling.
-		 */
-		dispatcher_set_state_done(data);
-		cdbdisp_destroyDispatchResults(data->results);
-		data->results = NULL;
-	}
 
-	/*
-	 * dispatch_throw_error needs access error information, so return them here.
-	 */
-	dispatcher_unbind_executor(data);
-	if (data->resource_is_mine)
-	{
-		FreeResource(data->resource);
-		data->resource_is_mine = false;
-	}
+	dispatch_end_env(data);
 }
 
 void
@@ -1528,13 +1582,16 @@ dispatcher_fill_query_param(const char *strCommand,
 {
 	DispatchCommandQueryParms	*queryParms;
 	Segment		*master = NULL;
+	MemoryContext	old;
 
 	if (resource)
 	{
 	  master = resource->master;
 	}
 
+	old = MemoryContextSwitchTo(DispatchDataContext);
 	queryParms = palloc0(sizeof(*queryParms));
+	MemoryContextSwitchTo(old);
 
 	queryParms->strCommand = strCommand;
 	queryParms->strCommandlen = strCommand ? strlen(strCommand) + 1 : 0;
@@ -1644,6 +1701,9 @@ dispatch_get_task_identity(DispatchTask *task)
 static bool
 dispatch_collect_executors_error(DispatchData *data)
 {
+	MemoryContext	old;
+	old = MemoryContextSwitchTo(DispatchDataContext);
+
 	QueryExecutorIterator	iterator;
 	struct QueryExecutor	*executor;
 	bool ret = false;
@@ -1669,6 +1729,9 @@ dispatch_collect_executors_error(DispatchData *data)
 	  pfree(errHostName[i]);
 	}
 	pfree(errHostName);
+
+	MemoryContextSwitchTo(old);
+
 	return ret;
 }
 
@@ -1682,26 +1745,11 @@ dispatch_throw_error(DispatchData *data)
 	initStringInfo(&buf);
 	cdbdisp_dumpDispatchResults(data->results, &buf, false);
 
-	cdbdisp_destroyDispatchResults(data->results);
-	data->results = NULL;
-	/* Error was consumed, mark it done to prevent error check again. */
-	dispatcher_set_state_done(data);
-
 	/* Too bad, our gang got an error. */
 	PG_TRY();
 	{
-		/*
-		 * No one needs the error information here.
-		 */
-		dispatcher_unbind_executor(data);
-		if (data->resource_is_mine)
-		{
-			FreeResource(data->resource);
-			data->resource_is_mine = false;
-		}
-
 		ereport(ERROR, (errcode(errorcode),
-                        errOmitLocation(true),
+						errOmitLocation(true),
 						errmsg("%s", buf.data)));
 	}
 	PG_CATCH();

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3426cb0e/src/backend/cdb/dispatcher_mgt.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/dispatcher_mgt.c b/src/backend/cdb/dispatcher_mgt.c
index 69d6b0a..2920249 100644
--- a/src/backend/cdb/dispatcher_mgt.c
+++ b/src/backend/cdb/dispatcher_mgt.c
@@ -47,27 +47,6 @@ typedef enum DispMgtConstant {
 	DISPMGT_POLL_TIME = 2 * 1000,
 } DispMgtConstant;
 
-
-/*
- * QueryExecutorTeam/QueryExecutorGroup
- */
-typedef struct QueryExecutorGroup {
-	struct QueryExecutorTeam	*team;		/* Reference to the parent */
-	int						query_executor_num;
-
-	struct QueryExecutor	**query_executors;
-	struct pollfd			*fds;
-} QueryExecutorGroup;
-
-typedef struct QueryExecutorTeam {
-	/* Must same with thread_num. */
-	int						query_executor_group_num;
-	QueryExecutorGroup		*query_executor_groups;
-
-	/* Reference to other data structure */
-	struct DispatchData 	*refDispatchData;
-} QueryExecutorTeam;
-
 /*
  * QueryExecutorInGroupIterator/QueryExecutorGroupIterator
  */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3426cb0e/src/backend/executor/execUtils.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index ffd756d..eb1124d 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -2279,6 +2279,7 @@ void mppExecutorCleanup(QueryDesc *queryDesc)
 			ExecSquelchNode(queryDesc->planstate);
 
 		dispatch_catch_error(estate->dispatch_data);
+		estate->dispatch_data = NULL;
 	}
 
 	/* Clean up the interconnect. */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3426cb0e/src/backend/executor/nodeSubplan.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeSubplan.c b/src/backend/executor/nodeSubplan.c
index 292abaf..1a9b31b 100644
--- a/src/backend/executor/nodeSubplan.c
+++ b/src/backend/executor/nodeSubplan.c
@@ -1367,8 +1367,10 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext, QueryDesc
*gbl_query
 		 * Wait for them to finish and clean up the dispatching structures.
          * Replace current error info with QE error info if more interesting.
 		 */
-        if (shouldDispatch && queryDesc && queryDesc->estate &&
queryDesc->estate->dispatch_data)
+        if (shouldDispatch && queryDesc && queryDesc->estate &&
queryDesc->estate->dispatch_data) {
 			dispatch_catch_error(queryDesc->estate->dispatch_data);
+			queryDesc->estate->dispatch_data = NULL;
+        }
 		
 		/* teardown the sequence server */
 		TeardownSequenceServer();

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3426cb0e/src/include/cdb/dispatcher.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/dispatcher.h b/src/include/cdb/dispatcher.h
index 099f592..3175705 100644
--- a/src/include/cdb/dispatcher.h
+++ b/src/include/cdb/dispatcher.h
@@ -84,5 +84,8 @@ extern void dispatcher_print_statistics(StringInfo buf, struct DispatchData
*dat
 extern ProcessIdentity *dispatch_get_task_identity(struct DispatchTask *task);
 extern struct DispatchCommandQueryParms *dispatcher_get_QueryParms(struct DispatchData *data);
 
+void dispatch_end_env(struct DispatchData *data);
+void free_dispatch_data(struct DispatchData *data);
+
 #endif	/* DISPATCHER_H */
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3426cb0e/src/include/cdb/dispatcher_mgt.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/dispatcher_mgt.h b/src/include/cdb/dispatcher_mgt.h
index a67db13..f5ca7b7 100644
--- a/src/include/cdb/dispatcher_mgt.h
+++ b/src/include/cdb/dispatcher_mgt.h
@@ -40,6 +40,25 @@ typedef struct QueryExecutorIterator
 	int		executor_id;
 } QueryExecutorIterator;
 
+/*
+ * QueryExecutorTeam/QueryExecutorGroup: a team points at its groups; each group points back at its team.
+ */
+typedef struct QueryExecutorGroup {
+	struct QueryExecutorTeam	*team;		/* Reference to the parent */
+	int						query_executor_num;	/* Number of entries in query_executors */
+
+	struct QueryExecutor	**query_executors;	/* Array of executor pointers */
+	struct pollfd			*fds;	/* NOTE(review): presumably one pollfd per executor -- confirm */
+} QueryExecutorGroup;
+
+typedef struct QueryExecutorTeam {
+	/* Must be the same as thread_num. */
+	int						query_executor_group_num;
+	QueryExecutorGroup		*query_executor_groups;	/* NOTE(review): presumably query_executor_group_num entries -- confirm */
+
+	/* Reference to other data structure */
+	struct DispatchData 	*refDispatchData;
+} QueryExecutorTeam;
 
 /* Iterate all of executors in all groups. */
 extern void dispmgt_init_query_executor_iterator(struct QueryExecutorTeam *team,


Mime
View raw message