From: ztao1987@apache.org
To: commits@hawq.incubator.apache.org
Message-Id: <453c6f9345234d69886f888d81935380@git.apache.org>
Subject: incubator-hawq git commit: HAWQ-390. Fix memory leak in dispatcher.
Date: Fri, 5 Feb 2016 02:08:41 +0000 (UTC)

Repository: incubator-hawq
Updated Branches:
  refs/heads/master d240a4b56 -> 3426cb0e3

HAWQ-390. Fix memory leak in dispatcher.

Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/3426cb0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/3426cb0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/3426cb0e

Branch: refs/heads/master
Commit: 3426cb0e37e4340ce40df16abce271f78977f8db
Parents: d240a4b
Author: zhenglin tao
Authored: Thu Feb 4 15:37:17 2016 +0800
Committer: zhenglin tao
Committed: Fri Feb 5 10:06:23 2016 +0800

----------------------------------------------------------------------
 src/backend/cdb/dispatcher.c       | 164 +++++++++++++++++++++-----------
 src/backend/cdb/dispatcher_mgt.c   |  21 ----
 src/backend/executor/execUtils.c   |   1 +
 src/backend/executor/nodeSubplan.c |   4 +-
 src/include/cdb/dispatcher.h       |   3 +
 src/include/cdb/dispatcher_mgt.h   |  19 ++++
 6 files changed, 132 insertions(+), 80 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3426cb0e/src/backend/cdb/dispatcher.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/dispatcher.c b/src/backend/cdb/dispatcher.c
index 0312ea2..f72aa35 100644
--- a/src/backend/cdb/dispatcher.c
+++ b/src/backend/cdb/dispatcher.c
@@ -197,6 +197,7 @@ typedef struct DispatchStatement {
 /* Global states: DO NOT add global state unless you have to. */
 static MemoryContext DispatchDataContext;
+static int DispatchInitCount = -1;
 /* Static function */
@@ -368,6 +369,9 @@ fillSliceVector(SliceTable *sliceTbl, int rootIdx, sliceVec *sliceVector, int sl
 static void
 aggregateQueryResource(QueryResource *queryRes)
 {
+	MemoryContext old;
+	old = MemoryContextSwitchTo(DispatchDataContext);
+
 	if (queryRes &&
 		queryRes->segments &&
 		(list_length(queryRes->segments) > 0))
@@ -407,21 +411,26 @@ aggregateQueryResource(QueryResource *queryRes)
 			queryRes->segment_vcore_agg[i++] = nseg;
 		}
 	}
+
+	MemoryContextSwitchTo(old);
 }
 static void
 dispatch_init_env(void)
 {
-	if (DispatchDataContext)
+	++DispatchInitCount;
+	if (DispatchDataContext) {
+		if (DispatchInitCount == 0)
+			MemoryContextResetAndDeleteChildren(DispatchDataContext);
 		return;
-
+	}
 	DispatchDataContext = AllocSetContextCreate(TopMemoryContext,
 												"Dispatch Data Context",
 												ALLOCSET_DEFAULT_MINSIZE,
 												ALLOCSET_DEFAULT_INITSIZE,
 												ALLOCSET_DEFAULT_MAXSIZE);
-	executormgr_setup_env(DispatchDataContext);
+	executormgr_setup_env(TopMemoryContext);
 }
@@ -740,6 +749,9 @@ prepare_dispatch_statement_node(struct DispatchData *data,
 static void
 dispatcher_split_logical_tasks_for_query_desc(DispatchData *data)
 {
+	MemoryContext old;
+	old = MemoryContextSwitchTo(DispatchDataContext);
+
 	sliceVec *sliceVector = NULL;
 	int i;
 	int slice_num;
@@ -839,6 +851,8 @@ dispatcher_split_logical_tasks_for_query_desc(DispatchData *data)
 	}
 	pfree(sliceVector);
+
+	MemoryContextSwitchTo(old);
 }
 /*
@@ -847,6 +861,7 @@ dispatcher_split_logical_tasks_for_query_desc(DispatchData *data)
 static void
 dispatcher_split_logical_tasks_for_statemet_or_node(DispatchData *data)
 {
+	MemoryContext old;
 	int i;
 	int segment_num, segment_num_entrydb;
@@ -858,6 +873,8 @@ dispatcher_split_logical_tasks_for_statemet_or_node(DispatchData *data)
 		return;
 	}
+	old = MemoryContextSwitchTo(DispatchDataContext);
+
 	data->job.used_slices_num = segment_num_entrydb + 1;
 	data->job.all_slices_num = segment_num_entrydb + 1;
 	data->job.slices = palloc0(data->job.used_slices_num * sizeof(DispatchSlice));
@@ -899,6 +916,8 @@ dispatcher_split_logical_tasks_for_statemet_or_node(DispatchData *data)
 		task->id.init = true;
 	}
 	data->pQueryParms->primary_gang_id = -1;
+
+	MemoryContextSwitchTo(old);
 }
 /*
@@ -1266,6 +1285,74 @@ dispatch_wait(DispatchData *data)
 	}
 }
+/*
+ * free_dispatch_data
+ *	free the DispatchData allocated resource
+ */
+void free_dispatch_data(struct DispatchData *data) {
+	if (data->pQueryParms) {
+		pfree(data->pQueryParms);
+		data->pQueryParms = NULL;
+	}
+	if (data->job.slices) {
+		for (int i = 0; i < data->job.used_slices_num; ++i) {
+			if (data->job.slices[i].tasks) {
+				pfree(data->job.slices[i].tasks);
+				data->job.slices[i].tasks = NULL;
+			}
+		}
+		pfree(data->job.slices);
+		data->job.slices = NULL;
+	}
+	if (data->query_executor_team) {
+		QueryExecutorGroup *qeGrp = data->query_executor_team->query_executor_groups;
+		if (qeGrp) {
+			if (qeGrp->fds) {
+				pfree(qeGrp->fds);
+				qeGrp->fds = NULL;
+			}
+			for (int j = 0; j < qeGrp->query_executor_num; ++j) {
+				pfree(qeGrp->query_executors[j]);
+				qeGrp->query_executors[j] = NULL;
+			}
+		}
+		pfree(data->query_executor_team);
+		data->query_executor_team = NULL;
+	}
+	pfree(data);
+}
+
+/*
+ * dispatch_end_env
+ *	accompanied with dispatch_init_env
+ */
+void dispatch_end_env(struct DispatchData *data) {
+	Assert(data != NULL);
+
+	--DispatchInitCount;
+
+	cdbdisp_destroyDispatchResults(data->results);
+	data->results = NULL;
+	dispatcher_set_state_done(data);
+
+	PG_TRY();
+	{
+		dispatcher_unbind_executor(data);
+		if (data->resource_is_mine)
+		{
+			FreeResource(data->resource);
+			data->resource_is_mine = false;
+		}
+	}
+	PG_CATCH();
+	{
+		free_dispatch_data(data);
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+	free_dispatch_data(data);
+}
+
 /*
  * dispatch_cleanup
  *	Cleanup the workermgr. Report error if something happended on executors.
@@ -1273,6 +1360,10 @@ dispatch_wait(DispatchData *data)
 void
 dispatch_cleanup(DispatchData *data)
 {
+	/* to avoid duplicate dispatch cleanup */
+	if (data == NULL) return;
+	if (dispatcher_is_state_done(data)) return;
+
 	/*
 	 * We should not get here when dispatcher hit an exception. But
 	 * executors may have some troubles.
@@ -1285,24 +1376,7 @@ dispatch_cleanup(DispatchData *data)
 		return; /* should not hit */
 	}
-	if (dispatcher_is_state_done(data))
-		return;
-
-	/*
-	 * dispatch_throw_error needs to access error information, free here is safe.
-	 */
-	dispatcher_unbind_executor(data);
-	if (data->resource_is_mine)
-	{
-		FreeResource(data->resource);
-		data->resource_is_mine = false;
-	}
-
-	cdbdisp_destroyDispatchResults(data->results);
-	data->results = NULL;
-	dispatcher_set_state_done(data);
-
-	pfree(data);
+	dispatch_end_env(data);
 }
 /*
@@ -1365,28 +1439,8 @@ dispatch_catch_error(DispatchData *data)
 		{} /* nop; fall thru */
 		PG_END_TRY();
 	}
-	else
-	{
-		/*
-		 * Discard any remaining results from QEs; don't confuse matters by
-		 * throwing a new error. Any results of interest presumably should
-		 * have been examined before raising the error that the caller is
-		 * currently handling.
-		 */
-		dispatcher_set_state_done(data);
-		cdbdisp_destroyDispatchResults(data->results);
-		data->results = NULL;
-	}
-	/*
-	 * dispatch_throw_error needs access error information, so return them here.
-	 */
-	dispatcher_unbind_executor(data);
-	if (data->resource_is_mine)
-	{
-		FreeResource(data->resource);
-		data->resource_is_mine = false;
-	}
+	dispatch_end_env(data);
 }
 void
@@ -1528,13 +1582,16 @@ dispatcher_fill_query_param(const char *strCommand,
 {
 	DispatchCommandQueryParms *queryParms;
 	Segment *master = NULL;
+	MemoryContext old;
 	if (resource)
 	{
 		master = resource->master;
 	}
+	old = MemoryContextSwitchTo(DispatchDataContext);
 	queryParms = palloc0(sizeof(*queryParms));
+	MemoryContextSwitchTo(old);
 	queryParms->strCommand = strCommand;
 	queryParms->strCommandlen = strCommand ? strlen(strCommand) + 1 : 0;
@@ -1644,6 +1701,9 @@ dispatch_get_task_identity(DispatchTask *task)
 static bool
 dispatch_collect_executors_error(DispatchData *data)
 {
+	MemoryContext old;
+	old = MemoryContextSwitchTo(DispatchDataContext);
+
 	QueryExecutorIterator iterator;
 	struct QueryExecutor *executor;
 	bool ret = false;
@@ -1669,6 +1729,9 @@ dispatch_collect_executors_error(DispatchData *data)
 		pfree(errHostName[i]);
 	}
 	pfree(errHostName);
+
+	MemoryContextSwitchTo(old);
+
 	return ret;
 }
@@ -1682,26 +1745,11 @@ dispatch_throw_error(DispatchData *data)
 	initStringInfo(&buf);
 	cdbdisp_dumpDispatchResults(data->results, &buf, false);
-	cdbdisp_destroyDispatchResults(data->results);
-	data->results = NULL;
-	/* Error was consumed, mark it done to prevent error check again. */
-	dispatcher_set_state_done(data);
-
 	/* Too bad, our gang got an error. */
 	PG_TRY();
 	{
-		/*
-		 * No one needs the error information here.
-		 */
-		dispatcher_unbind_executor(data);
-		if (data->resource_is_mine)
-		{
-			FreeResource(data->resource);
-			data->resource_is_mine = false;
-		}
-
 		ereport(ERROR, (errcode(errorcode),
-                        errOmitLocation(true),
+						errOmitLocation(true),
 						errmsg("%s", buf.data)));
 	}
 	PG_CATCH();

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3426cb0e/src/backend/cdb/dispatcher_mgt.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/dispatcher_mgt.c b/src/backend/cdb/dispatcher_mgt.c
index 69d6b0a..2920249 100644
--- a/src/backend/cdb/dispatcher_mgt.c
+++ b/src/backend/cdb/dispatcher_mgt.c
@@ -47,27 +47,6 @@ typedef enum DispMgtConstant {
 	DISPMGT_POLL_TIME = 2 * 1000,
 } DispMgtConstant;
-
-/*
- * QueryExecutorTeam/QueryExecutorGroup
- */
-typedef struct QueryExecutorGroup {
-	struct QueryExecutorTeam *team; /* Reference to the parent */
-	int query_executor_num;
-
-	struct QueryExecutor **query_executors;
-	struct pollfd *fds;
-} QueryExecutorGroup;
-
-typedef struct QueryExecutorTeam {
-	/* Must same with thread_num. */
-	int query_executor_group_num;
-	QueryExecutorGroup *query_executor_groups;
-
-	/* Reference to other data structure */
-	struct DispatchData *refDispatchData;
-} QueryExecutorTeam;
-
 /*
  * QueryExecutorInGroupIterator/QueryExecutorGroupIterator
  */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3426cb0e/src/backend/executor/execUtils.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index ffd756d..eb1124d 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -2279,6 +2279,7 @@ void mppExecutorCleanup(QueryDesc *queryDesc)
 		ExecSquelchNode(queryDesc->planstate);
 		dispatch_catch_error(estate->dispatch_data);
+		estate->dispatch_data = NULL;
 	}
 	/* Clean up the interconnect. */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3426cb0e/src/backend/executor/nodeSubplan.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeSubplan.c b/src/backend/executor/nodeSubplan.c
index 292abaf..1a9b31b 100644
--- a/src/backend/executor/nodeSubplan.c
+++ b/src/backend/executor/nodeSubplan.c
@@ -1367,8 +1367,10 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext, QueryDesc *gbl_query
 		 * Wait for them to finish and clean up the dispatching structures.
 		 * Replace current error info with QE error info if more interesting.
 		 */
-		if (shouldDispatch && queryDesc && queryDesc->estate && queryDesc->estate->dispatch_data)
+		if (shouldDispatch && queryDesc && queryDesc->estate && queryDesc->estate->dispatch_data) {
 			dispatch_catch_error(queryDesc->estate->dispatch_data);
+			queryDesc->estate->dispatch_data = NULL;
+		}
 		/* teardown the sequence server */
 		TeardownSequenceServer();

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3426cb0e/src/include/cdb/dispatcher.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/dispatcher.h b/src/include/cdb/dispatcher.h
index 099f592..3175705 100644
--- a/src/include/cdb/dispatcher.h
+++ b/src/include/cdb/dispatcher.h
@@ -84,5 +84,8 @@ extern void dispatcher_print_statistics(StringInfo buf, struct DispatchData *dat
 extern ProcessIdentity *dispatch_get_task_identity(struct DispatchTask *task);
 extern struct DispatchCommandQueryParms *dispatcher_get_QueryParms(struct DispatchData *data);
+void dispatch_end_env(struct DispatchData *data);
+void free_dispatch_data(struct DispatchData *data);
+
 #endif /* DISPATCHER_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3426cb0e/src/include/cdb/dispatcher_mgt.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/dispatcher_mgt.h b/src/include/cdb/dispatcher_mgt.h
index a67db13..f5ca7b7 100644
--- a/src/include/cdb/dispatcher_mgt.h
+++ b/src/include/cdb/dispatcher_mgt.h
@@ -40,6 +40,25 @@ typedef struct QueryExecutorIterator
 	int executor_id;
 } QueryExecutorIterator;
+/*
+ * QueryExecutorTeam/QueryExecutorGroup
+ */
+typedef struct QueryExecutorGroup {
+	struct QueryExecutorTeam *team; /* Reference to the parent */
+	int query_executor_num;
+
+	struct QueryExecutor **query_executors;
+	struct pollfd *fds;
+} QueryExecutorGroup;
+
+typedef struct QueryExecutorTeam {
+	/* Must same with thread_num. */
+	int query_executor_group_num;
+	QueryExecutorGroup *query_executor_groups;
+
+	/* Reference to other data structure */
+	struct DispatchData *refDispatchData;
+} QueryExecutorTeam;
 /* Iterate all of executors in all groups. */
 extern void dispmgt_init_query_executor_iterator(struct QueryExecutorTeam *team,