hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shiv...@apache.org
Subject [34/50] incubator-hawq git commit: HAWQ-529. Allocate resource for udf in resource negotiator.
Date Mon, 21 Mar 2016 22:02:03 GMT
HAWQ-529. Allocate resource for udf in resource negotiator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/4d997b57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/4d997b57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/4d997b57

Branch: refs/heads/HAWQ-459
Commit: 4d997b5714bb09ed9020a56acfce13b1d6fd637c
Parents: 2c033d9
Author: hzhang2 <zhanghuan929@163.com>
Authored: Fri Mar 18 10:14:42 2016 +0800
Committer: hzhang2 <zhanghuan929@163.com>
Committed: Fri Mar 18 10:42:44 2016 +0800

----------------------------------------------------------------------
 src/backend/cdb/cdbdatalocality.c    | 213 ++++++++++++++++++++----------
 src/backend/executor/spi.c           |  71 ++++++++--
 src/backend/optimizer/plan/planner.c |  59 ++++++++-
 src/backend/optimizer/util/clauses.c |  11 +-
 src/backend/optimizer/util/walkers.c |   1 +
 src/include/cdb/cdbdatalocality.h    |  19 ++-
 src/include/executor/spi.h           |   7 +
 src/include/optimizer/planner.h      |   4 +
 8 files changed, 294 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4d997b57/src/backend/cdb/cdbdatalocality.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbdatalocality.c b/src/backend/cdb/cdbdatalocality.c
index eec87b4..3f8fcb5 100644
--- a/src/backend/cdb/cdbdatalocality.c
+++ b/src/backend/cdb/cdbdatalocality.c
@@ -33,6 +33,9 @@
 #include "access/filesplit.h"
 #include "access/parquetsegfiles.h"
 #include "catalog/catalog.h"
+#include "catalog/catquery.h"
+#include "catalog/pg_inherits.h"
+#include "catalog/pg_proc.h"
 #include "cdb/cdbdatalocality.h"
 #include "cdb/cdbutil.h"
 #include "cdb/cdbvars.h"
@@ -41,6 +44,7 @@
 #include "utils/tqual.h"
 #include "utils/memutils.h"
 #include "executor/execdesc.h"
+#include "executor/spi.h"
 #include "nodes/nodes.h"
 #include "nodes/parsenodes.h"
 #include "optimizer/walkers.h"
@@ -54,11 +58,7 @@
 #include "catalog/pg_proc.h"
 #include "postgres.h"
 #include "resourcemanager/utils/hashtable.h"
-#include "catalog/pg_inherits.h"
-//#include "utils/misc/guc.c"
 
-#define PRONAME 1
-#define PROISAGG 5
 /* We need to build a mapping from host name to host index */
 
 extern bool		optimizer; /* Enable the optimizer */
@@ -376,7 +376,7 @@ static Block_Host_Index * update_data_dist_stat(
 static HostDataVolumeInfo *search_host_in_stat_context(
 		split_to_segment_mapping_context *context, char *hostname);
 
-static bool IsAggFunction(char* funcName);
+static bool IsBuildInFunction(Oid funcOid);
 
 static bool allocate_hash_relation(Relation_Data* rel_data,
 		Assignment_Log_Context *log_context, TargetSegmentIDMap* idMap,
@@ -638,39 +638,35 @@ static void collect_range_tables(Query *query, List* full_range_table,
 /*
  *
  */
-static bool IsAggFunction(char* funcName) {
-	if (funcName == NULL) {
-		return false;
-	}
-	Relation pg_proc_rel;
-	TupleDesc pg_proc_dsc;
-	HeapTuple tuple;
-	SysScanDesc pg_proc_scan;
-
-	pg_proc_rel = heap_open(ProcedureRelationId, AccessShareLock);
-	pg_proc_dsc = RelationGetDescr(pg_proc_rel);
-	ScanKeyData skey;
+static bool IsBuildInFunction(Oid foid) {
 
-	ScanKeyInit(&skey, PRONAME, BTEqualStrategyNumber,
-	F_NAMEEQ, CStringGetDatum(funcName));
-
-	pg_proc_scan = systable_beginscan(pg_proc_rel, InvalidOid, FALSE,
-			ActiveSnapshot, 1, &skey);
-	while (HeapTupleIsValid(tuple = systable_getnext(pg_proc_scan))) {
+	cqContext  *pcqCtx;
+	HeapTuple	procedureTuple;
+	Form_pg_proc procedureStruct;
 
-		bool isAgg = DatumGetBool(fastgetattr(tuple, PROISAGG, pg_proc_dsc, NULL));
-		systable_endscan(pg_proc_scan);
-		heap_close(pg_proc_rel, AccessShareLock);
-		if (isAgg) {
-			return true;
-		} else {
-			return false;
-		}
+	/*
+	 * get the procedure tuple corresponding to the given function Oid
+	 */
+	pcqCtx = caql_beginscan(
+			NULL,
+			cql("SELECT * FROM pg_proc "
+				" WHERE oid = :1 ",
+				ObjectIdGetDatum(foid)));
+
+	procedureTuple = caql_getnext(pcqCtx);
+
+	if (!HeapTupleIsValid(procedureTuple))
+		elog(ERROR, "cache lookup failed for function %u", foid);
+	procedureStruct = (Form_pg_proc) GETSTRUCT(procedureTuple);
+	caql_endscan(pcqCtx);
+	/* we treat proc namespace = 11 to build in function.*/
+	if (procedureStruct->pronamespace == 11) {
+		return true;
+	} else {
+		return false;
 	}
-	systable_endscan(pg_proc_scan);
-	heap_close(pg_proc_rel, AccessShareLock);
-	return true;
 }
+
 /*
  *
  */
@@ -684,25 +680,6 @@ static void convert_range_tables_to_oids_and_check_table_functions(List
**range_
 	foreach(old_lc, *range_tables)
 	{
 		RangeTblEntry *entry = (RangeTblEntry *) lfirst(old_lc);
-		if (entry->rtekind == RTE_FUNCTION || entry->rtekind == RTE_TABLEFUNCTION) {
-			*isTableFunctionExists = true;
-		}
-		if (entry->rtekind == RTE_SUBQUERY) {
-			Query* subQuery = entry->subquery;
-			ListCell *lc;
-			foreach(lc, subQuery->targetList)
-			{
-				TargetEntry *te = (TargetEntry *) lfirst(lc);
-				bool isAggFunc = IsAggFunction(te->resname);
-				// if target list of subquery contains non aggregate function,
-				// then we consider the query contains and use default_segment_num guc
-				// as the number of virtual segment
-				if (!isAggFunc) {
-					*isTableFunctionExists = true;
-				}
-			}
-
-		}
 		if (entry->rtekind != RTE_RELATION) {
 			continue;
 		}
@@ -1833,7 +1810,7 @@ static int select_random_host_algorithm(Relation_Assignment_Context
*context,
 	}
 	if (debug_fake_datalocality) {
 		fprintf(fp,
-				"cur_size_of_whole_query is:"INT64_FORMAT", avg_size_of_whole_query is: %.3f",
+				"cur_size_of_whole_query is:%.0f, avg_size_of_whole_query is: %.3f",
 				context->totalvols_with_penalty[minindex] + net_disk_ratio * splitsize,
 				context->avg_size_of_whole_query);
 	}
@@ -2393,7 +2370,7 @@ static Relation_File** change_file_order_based_on_continuity(
 		Relation_Data *rel_data, TargetSegmentIDMap* idMap, int host_num,
 		int* fileCount, Relation_Assignment_Context *assignment_context) {
 
-	Relation_File** file_vector;
+	Relation_File** file_vector = NULL;
 	int* isBlockContinue = (int *) palloc(sizeof(int) * host_num);
 	for (int i = 0; i < host_num; i++) {
 		isBlockContinue[i] = 0;
@@ -3429,45 +3406,45 @@ static void print_datalocality_overall_log_information(SplitAllocResult
*result,
 			if(log_context->minSegmentNumofHost > 0 ){
 				fprintf(fpratio, "segmentnumber_perhost_max/min=%.2f\n", (double)(log_context->maxSegmentNumofHost
/ log_context->minSegmentNumofHost));
 			}else{
-				fprintf(fpratio, "segmentnumber_perhost_max/min="INT64_FORMAT"\n", INT64_MAX);
+				fprintf(fpratio, "segmentnumber_perhost_max/min=%lld\n", INT64_MAX);
 			}
 			if(log_context->avgSegmentNumofHost > 0 ){
 				fprintf(fpratio, "segmentnumber_perhost_max/avg=%.2f\n", (double)(log_context->maxSegmentNumofHost
/ log_context->avgSegmentNumofHost));
 			}else{
-				fprintf(fpratio, "segmentnumber_perhost_max/avg="INT64_FORMAT"\n", INT64_MAX);
+				fprintf(fpratio, "segmentnumber_perhost_max/avg=%lld\n", INT64_MAX);
 			}
 
 			if (log_context->minSizeSegmentOverall > 0){
 				fprintf(fpratio, "segments_size_max/min=%.5f\n", (double)log_context->maxSizeSegmentOverall
/ (double)log_context->minSizeSegmentOverall);
 			}else{
-				fprintf(fpratio, "segments_size_max/min="INT64_FORMAT"\n", INT64_MAX);
+				fprintf(fpratio, "segments_size_max/min=%lld\n", INT64_MAX);
 			}
 			if (log_context->avgSizeOverall > 0){
 				fprintf(fpratio, "segments_size_max/avg=%.5f\n", log_context->maxSizeSegmentOverall
/ log_context->avgSizeOverall);
 			}else{
-				fprintf(fpratio, "segments_size_max/avg="INT64_FORMAT"\n", INT64_MAX);
+				fprintf(fpratio, "segments_size_max/avg=%lld\n", INT64_MAX);
 			}
 
 			if (log_context->minSizeSegmentOverallPenalty > 0){
 				fprintf(fpratio, "segments_size_penalty_max/min=%.5f\n",(double)log_context->maxSizeSegmentOverallPenalty
/ (double)log_context->minSizeSegmentOverallPenalty);
 			}else{
-				fprintf(fpratio, "segments_size_penalty_max/min="INT64_FORMAT"\n", INT64_MAX);
+				fprintf(fpratio, "segments_size_penalty_max/min=%lld\n", INT64_MAX);
 			}
 			if (log_context->avgSizeOverallPenalty > 0){
 				fprintf(fpratio, "segments_size_penalty_max/avg=%.5f\n",log_context->maxSizeSegmentOverallPenalty
/ log_context->avgSizeOverallPenalty);
 			}else{
-				fprintf(fpratio, "segments_size_penalty_max/avg="INT64_FORMAT"\n", INT64_MAX);
+				fprintf(fpratio, "segments_size_penalty_max/avg=%lld\n", INT64_MAX);
 			}
 
 			if (log_context->minContinuityOverall > 0){
 				fprintf(fpratio, "continuity_max/min=%.5f\n",log_context->maxContinuityOverall / log_context->minContinuityOverall);
 			}else{
-				fprintf(fpratio, "continuity_max/min="INT64_FORMAT"\n", INT64_MAX);
+				fprintf(fpratio, "continuity_max/min=%lld\n", INT64_MAX);
 			}
 			if (log_context->avgContinuityOverall > 0){
 				fprintf(fpratio, "continuity_max/avg=%.5f\n",log_context->maxContinuityOverall / log_context->avgContinuityOverall);
 			}else{
-				fprintf(fpratio, "continuity_max/avg="INT64_FORMAT"\n", INT64_MAX);
+				fprintf(fpratio, "continuity_max/avg=%lld\n", INT64_MAX);
 			}
 			fflush(fpratio);
 			fclose(fpratio);
@@ -3976,11 +3953,54 @@ static void cleanup_allocation_algorithm(
 }
 
 /*
+ * udf_collector_walker: the routine to file udfs.
+ */
+bool udf_collector_walker(Node *node,
+		udf_collector_context *context) {
+	if (node == NULL) {
+		return false;
+	}
+
+	if (IsA(node, Query)) {
+		return query_tree_walker((Query *) node, udf_collector_walker,
+				(void *) context,
+				QTW_EXAMINE_RTES);
+	}
+
+	/*For Aggref, we don't consider it as udf.*/
+
+	if(IsA(node,FuncExpr)){
+		if(!IsBuildInFunction(((FuncExpr *) node)->funcid)){
+			context->udf_exist = true;
+		}
+		return false;
+	}
+
+	return expression_tree_walker(node, udf_collector_walker,
+			(void *) context);
+
+	return false;
+}
+
+/*
+ * find_udf: collect all udf, and store them into the udf_collector_context.
+ */
+void find_udf(Query *query, udf_collector_context *context) {
+
+	query_tree_walker(query, udf_collector_walker, (void *) context,
+	QTW_EXAMINE_RTES);
+
+	return;
+}
+
+
+/*
  * calculate_planner_segment_num
+ * fixedVsegNum is used by PBE, since all the execute should use the same number of vsegs.
  */
 SplitAllocResult *
 calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
-		List *fullRangeTable, GpPolicy *intoPolicy, int sliceNum) {
+		List *fullRangeTable, GpPolicy *intoPolicy, int sliceNum, int fixedVsegNum) {
 	SplitAllocResult *result = NULL;
 	QueryResource *resource = NULL;
 	QueryResourceParameters *resource_parameters = NULL;
@@ -3989,14 +4009,14 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 	List *alloc_result = NIL;
 	split_to_segment_mapping_context context;
 
-	int planner_segments = -1; /*virtual segments number for explain statement */
+	int planner_segments = 0; /*virtual segments number for explain statement */
 
 	result = (SplitAllocResult *) palloc(sizeof(SplitAllocResult));
 	result->resource = NULL;
 	result->resource_parameters = NULL;
 	result->alloc_results = NIL;
 	result->relsType = NIL;
-	result->planner_segments = -1;
+	result->planner_segments = 0;
 	result->datalocalityInfo = makeStringInfo();
     result->datalocalityTime = 0;
 
@@ -4014,7 +4034,7 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 		result->resource_parameters = NULL;
 		result->alloc_results = NIL;
 		result->relsType = NIL;
-		result->planner_segments = -1;
+		result->planner_segments = 0;
 		return result;
 	}
 
@@ -4039,6 +4059,11 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 		 * 5 data size of random "from" relation
 		 */
 
+		udf_collector_context udf_context;
+		udf_context.udf_exist = false;
+
+		find_udf(query, &udf_context);
+		isTableFunctionExists = udf_context.udf_exist;
 		/*convert range table list to oid list and check whether table function exists
 		 *we keep a full range table list and a range table list without result relation separately
 		 */
@@ -4065,7 +4090,15 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 
 		/*use inherit resource*/
 		if (resourceLife == QRL_INHERIT) {
-			resource = AllocateResource(resourceLife, sliceNum, 0, 0, 0, NULL, 0);
+
+			if ( SPI_IsInPrepare() && (GetActiveQueryResource() == NULL) )
+			{
+				resource = NULL;
+			}
+			else
+			{
+				resource = AllocateResource(resourceLife, sliceNum, 0, 0, 0, NULL, 0);
+			}
 
 			saveQueryResourceParameters(
 							resource_parameters,  /* resource_parameters */
@@ -4216,6 +4249,11 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 				maxTargetSegmentNumber = enforce_virtual_segment_number;
 				minTargetSegmentNumber = enforce_virtual_segment_number;
 			}
+			/* in PBE mode, the execute should use the same vseg number. */
+			if(fixedVsegNum > 0 ){
+				maxTargetSegmentNumber = fixedVsegNum;
+				minTargetSegmentNumber = fixedVsegNum;
+			}
 			uint64_t before_rm_allocate_resource = gettime_microsec();
 
 			/* cost is use by RM to balance workload between hosts. the cost is at least one block
size*/
@@ -4223,9 +4261,40 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 			mincost <<= 20;
 			int64 queryCost = context.total_size < mincost ? mincost : context.total_size;
 			if (QRL_NONE != resourceLife) {
-				resource = AllocateResource(QRL_ONCE, sliceNum, queryCost,
-						maxTargetSegmentNumber, minTargetSegmentNumber,
-						context.host_context.hostnameVolInfos, context.host_context.size);
+
+				if (SPI_IsInPrepare())
+				{
+					resource = NULL;
+					/*
+					 * prepare need to get resource quota from RM
+					 * and pass quota(planner_segments) to Orca or Planner to generate plan
+					 * the following executes(in PBE) should reallocate the same number
+					 * of resources.
+					 */
+					uint32 seg_num;
+					uint32 seg_num_min;
+					uint32 seg_memory_mb;
+					double seg_core;
+
+					GetResourceQuota(maxTargetSegmentNumber,
+					                 minTargetSegmentNumber,
+					                 &seg_num,
+					                 &seg_num_min,
+					                 &seg_memory_mb,
+					                 &seg_core);
+
+					planner_segments = seg_num;
+					minTargetSegmentNumber = planner_segments;
+					maxTargetSegmentNumber = planner_segments;
+				}
+				else
+				{
+					resource = AllocateResource(QRL_ONCE, sliceNum, queryCost,
+					                            maxTargetSegmentNumber,
+					                            minTargetSegmentNumber,
+					                            context.host_context.hostnameVolInfos,
+					                            context.host_context.size);
+				}
 
 				saveQueryResourceParameters(
 								resource_parameters,                   /* resource_parameters */
@@ -4258,7 +4327,7 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 
 			if (resource == NULL) {
 				result->resource = NULL;
-				result->resource_parameters = NULL;
+				result->resource_parameters = resource_parameters;
 				result->alloc_results = NIL;
 				result->relsType = NIL;
 				result->planner_segments = planner_segments;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4d997b57/src/backend/executor/spi.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 07812aa..d0029fc 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -46,6 +46,7 @@
 #include "cdb/memquota.h"
 #include "executor/nodeFunctionscan.h"
 #include "nodes/stack.h"
+#include "cdb/cdbdatalocality.h"
 
 extern char *savedSeqServerHost;
 extern int savedSeqServerPort;
@@ -74,6 +75,7 @@ static int	_SPI_curid = -1;
 
 static PGconn *_QD_conn = NULL; /* To call back to the QD for SQL execution */
 static char *_QD_currently_prepared_stmt = NULL;
+static int SPI_prepare_depth = 0;
 
 static void _SPI_prepare_plan(const char *src, SPIPlanPtr plan);
 
@@ -106,6 +108,32 @@ static bool _SPI_checktuples(void);
 
 /* =================== interface functions =================== */
 
+bool SPI_IsInPrepare(void)
+{
+	if (SPI_prepare_depth > 0)
+	{
+		return true;
+	}
+	else if (SPI_prepare_depth < 0)
+	{
+		elog(ERROR, "Invalid SPI_prepare_depth %d while getting SPI prepare depth",
+		            SPI_prepare_depth);
+	}
+
+	return false;
+}
+
+void SPI_IncreasePrepareDepth(void)
+{
+	SPI_prepare_depth++;
+}
+
+void SPI_DecreasePrepareDepth(void)
+{
+	SPI_prepare_depth--;
+}
+
+
 int
 SPI_connect(void)
 {
@@ -566,6 +594,8 @@ SPI_prepare(const char *src, int nargs, Oid *argtypes)
 	_SPI_plan	plan;
 	_SPI_plan  *result;
 
+	SPI_IncreasePrepareDepth();
+
 	if (src == NULL || nargs < 0 || (nargs > 0 && argtypes == NULL))
 	{
 		SPI_result = SPI_ERROR_ARGUMENT;
@@ -591,9 +621,13 @@ SPI_prepare(const char *src, int nargs, Oid *argtypes)
 
 		/* copy plan to procedure context */
 		result = _SPI_copy_plan(&plan, _SPI_CPLAN_PROCXT);
+
+		SPI_DecreasePrepareDepth();
 	}
 	PG_CATCH();
 	{
+		SPI_DecreasePrepareDepth();
+
 		_SPI_end_call(true);
 		PG_RE_THROW();
 	}
@@ -1112,6 +1146,20 @@ SPI_cursor_open(const char *name, SPIPlanPtr plan,
 	qtlist = copyObject(qtlist);
 	ptlist = copyObject(ptlist);
 
+	PlannedStmt* stmt = (PlannedStmt*)linitial(ptlist);
+
+	if ( (Gp_role == GP_ROLE_DISPATCH) &&
+			 (stmt->resource_parameters != NULL) )
+	{
+		/*
+		 * Now, we want to allocate resource.
+		 */
+		stmt->resource = AllocateResource(stmt->resource_parameters->life, stmt->resource_parameters->slice_size,
+				stmt->resource_parameters->iobytes, stmt->resource_parameters->max_target_segment_num,
+				stmt->resource_parameters->min_target_segment_num, stmt->resource_parameters->vol_info,
+				stmt->resource_parameters->vol_info_size);
+	}
+
 	/* If the plan has parameters, set them up */
 	if (spiplan->nargs > 0)
 	{
@@ -1812,22 +1860,21 @@ _SPI_execute_plan(_SPI_plan * plan, Datum *Values, const char *Nulls,
 				 * We only allocate resource for multiple executions of queries, NOT for utility commands.
 				 * SELECT/INSERT are supported at present.
 				 */
-				if( (queryTree->commandType == CMD_SELECT) ||
-				    (queryTree->commandType == CMD_INSERT) )
+				if((queryTree->commandType == CMD_SELECT) ||
+						(queryTree->commandType == CMD_INSERT))
 				{
-					if ( (Gp_role == GP_ROLE_DISPATCH) &&
-					     (stmt->resource == NULL) &&
-					     (stmt->resource_parameters != NULL) )
+					if ((Gp_role == GP_ROLE_DISPATCH) &&
+							(stmt->resource == NULL) &&
+							(stmt->resource_parameters != NULL))
 					{
 						stmt->resource = AllocateResource(stmt->resource_parameters->life,
-						                        stmt->resource_parameters->slice_size,
-						                        stmt->resource_parameters->iobytes,
-						                        stmt->resource_parameters->max_target_segment_num,
-						                        stmt->resource_parameters->min_target_segment_num,
-						                        stmt->resource_parameters->vol_info,
-						                        stmt->resource_parameters->vol_info_size);
+								stmt->resource_parameters->slice_size,
+								stmt->resource_parameters->iobytes,
+								stmt->resource_parameters->max_target_segment_num,
+								stmt->resource_parameters->min_target_segment_num,
+								stmt->resource_parameters->vol_info,
+								stmt->resource_parameters->vol_info_size);
 					}
-
 					originalStmt->resource = NULL;
 				}
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4d997b57/src/backend/optimizer/plan/planner.c
----------------------------------------------------------------------
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 6b2e13e..ebad24f 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -91,6 +91,8 @@ planner_hook_type planner_hook = NULL;
 
 ParamListInfo PlannerBoundParamList = NULL;		/* current boundParams */
 
+static int PlanningDepth = 0;		/* Planning depth */
+
 /* Expression kind codes for preprocess_expression */
 #define EXPRKIND_QUAL			0
 #define EXPRKIND_TARGET			1
@@ -165,6 +167,32 @@ static void sort_canonical_gs_list(List *gs, int *p_nsets, Bitmapset
***p_sets);
 static Plan *pushdown_preliminary_limit(Plan *plan, Node *limitCount, int64 count_est, Node
*limitOffset, int64 offset_est);
 bool is_dummy_plan(Plan *plan);
 
+
+bool is_in_planning_phase(void)
+{
+	if (PlanningDepth > 0)
+	{
+		return true;
+	}
+	else if (PlanningDepth < 0)
+	{
+		elog(ERROR, "Invalid PlanningDepth %d while getting planning phase", PlanningDepth);
+	}
+
+	return false;
+}
+
+void increase_planning_depth(void)
+{
+	PlanningDepth++;
+}
+
+void decrease_planning_depth(void)
+{
+	PlanningDepth--;
+}
+
+
 #ifdef USE_ORCA
 /**
  * Logging of optimization outcome
@@ -285,7 +313,7 @@ planner(Query *parse, int cursorOptions,
 	PlannedStmt *result = NULL;
 	instr_time	starttime, endtime;
 	ResourceNegotiatorResult *ppResult = (ResourceNegotiatorResult *) palloc(sizeof(ResourceNegotiatorResult));
-	SplitAllocResult initResult = {NULL, NULL, NIL, -1, NIL, NULL};
+	SplitAllocResult initResult = {NULL, NULL, NIL, 0, NIL, NULL};
 	ppResult->saResult = initResult;
 	ppResult->stmt = NULL;
 	static int plannerLevel = 0;
@@ -300,6 +328,8 @@ planner(Query *parse, int cursorOptions,
 	 * resource to run this query. After gaining the resource, we can perform the
 	 * actual optimization.
 	 */
+	increase_planning_depth();
+
 	plannerLevel++;
 	if (!resourceNegotiateDone)
 	{
@@ -309,6 +339,8 @@ planner(Query *parse, int cursorOptions,
       {
         resource_negotiator(parse, cursorOptions, boundParams, resourceLife, &ppResult);
 
+		decrease_planning_depth();
+
 		if(ppResult->stmt && ppResult->stmt->planTree)
 		{
 			isDispatchParallel = ppResult->stmt->planTree->dispatch == DISPATCH_PARALLEL;
@@ -318,6 +350,8 @@ planner(Query *parse, int cursorOptions,
 	  }
 	  PG_CATCH();
 	  {
+		decrease_planning_depth();
+
 		if ((ppResult != NULL))
 		{
 		  pfree(ppResult);
@@ -430,7 +464,7 @@ planner(Query *parse, int cursorOptions,
 	  }
 	  else
 	  {
-	    gp_segments_for_planner = -1;
+	    gp_segments_for_planner = 0;
 	  }
 	  SetActiveQueryResource(savedQueryResource);
 	  if ((ppResult != NULL))
@@ -450,7 +484,7 @@ planner(Query *parse, int cursorOptions,
 		}
 		else
 		{
-			gp_segments_for_planner = -1;
+			gp_segments_for_planner = 0;
 		}
 		SetActiveQueryResource(savedQueryResource);
 
@@ -471,6 +505,7 @@ planner(Query *parse, int cursorOptions,
 	return result;
 }
 
+
 /*
  * The new framework for HAWQ 2.0 query optimizer
  */
@@ -481,6 +516,8 @@ resource_negotiator(Query *parse, int cursorOptions, ParamListInfo boundParams,
   PlannedStmt *plannedstmt = NULL;
   do
   {
+  		udf_collector_context udf_context;
+  		udf_context.udf_exist = false;
     SplitAllocResult *allocResult = NULL;
     Query *my_parse = copyObject(parse);
     ParamListInfo my_boundParams = copyParamList(boundParams);
@@ -496,12 +533,26 @@ resource_negotiator(Query *parse, int cursorOptions, ParamListInfo boundParams,
        */
       allocResult = calculate_planner_segment_num(my_parse, resourceLife,
                                           plannedstmt->rtable, plannedstmt->intoPolicy,
-                                          plannedstmt->nMotionNodes + plannedstmt->nInitPlans
+ 1);
+                                          plannedstmt->nMotionNodes + plannedstmt->nInitPlans
+ 1,
+                                          -1);
 
       Assert(allocResult);
 
       (*result)->saResult = *allocResult;
       pfree(allocResult);
+    }else{
+    		find_udf(my_parse, &udf_context);
+    		if(udf_context.udf_exist){
+    			if ((resourceLife == QRL_ONCE) || (resourceLife == QRL_NONE)) {
+    				int64 mincost = min_cost_for_each_query;
+    				mincost <<= 20;
+    				int avgSliceNum = 3;
+    				(*result)->saResult.resource = AllocateResource(QRL_ONCE, avgSliceNum, mincost,
+    						GetUserDefinedFunctionVsegNum(),GetUserDefinedFunctionVsegNum(),NULL, 0);
+    			}else{
+    				(*result)->saResult.resource = AllocateResource(resourceLife, 0, 0, 0, 0, NULL,
0);
+    			}
+    		}
     }
   } while (0);
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4d997b57/src/backend/optimizer/util/clauses.c
----------------------------------------------------------------------
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index a80737f..a3d67f2 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -2831,8 +2831,15 @@ simplify_function(Oid funcid, Oid result_type, List *args,
 	if (!HeapTupleIsValid(func_tuple))
 		elog(ERROR, "cache lookup failed for function %u", funcid);
 
-	newexpr = evaluate_function(funcid, result_type, args,
-								func_tuple, context);
+	if (is_in_planning_phase())
+	{
+		newexpr = NULL;
+	}
+	else
+	{
+		newexpr = evaluate_function(funcid, result_type, args,
+		                            func_tuple, context);
+	}
 
 	if (large_const(newexpr, context->max_size))
 	{

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4d997b57/src/backend/optimizer/util/walkers.c
----------------------------------------------------------------------
diff --git a/src/backend/optimizer/util/walkers.c b/src/backend/optimizer/util/walkers.c
index 6f1ee51..a8222fb 100644
--- a/src/backend/optimizer/util/walkers.c
+++ b/src/backend/optimizer/util/walkers.c
@@ -162,6 +162,7 @@ expression_tree_walker(Node *node,
 		case T_PartBoundExpr:
 		case T_PartBoundInclusionExpr:
 		case T_PartBoundOpenExpr:
+		case T_RangeTblEntry	:
 			/* primitive node types with no expression subnodes */
 			break;
 		case T_Aggref:

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4d997b57/src/include/cdb/cdbdatalocality.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbdatalocality.h b/src/include/cdb/cdbdatalocality.h
index d28fc3e..224cb53 100644
--- a/src/include/cdb/cdbdatalocality.h
+++ b/src/include/cdb/cdbdatalocality.h
@@ -50,6 +50,13 @@ typedef struct SplitAllocResult
 } SplitAllocResult;
 
 /*
+ * structure containing all relation range table entries.
+ */
+typedef struct udf_collector_context {
+	bool udf_exist;
+} udf_collector_context;
+
+/*
  * structure containing rel and type when execution
  */
 typedef struct CurrentRelType {
@@ -87,7 +94,17 @@ void saveQueryResourceParameters(
  * we calculate the appropriate planner segment_num.
  */
 SplitAllocResult * calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
-                                                List *rtable, GpPolicy *intoPolicy, int sliceNum);
+                                                List *rtable, GpPolicy *intoPolicy, int sliceNum,
int fixedVsegNum);
+
+/*
+ * udf_collector_walker: the routine to file udfs.
+ */
+bool udf_collector_walker(Node *node,	udf_collector_context *context);
+
+/*
+ * find_udf: collect all udf, and store them into the udf_collector_context.
+ */
+void find_udf(Query *query, udf_collector_context *context);
 
 FILE *fp;
 FILE *fpaoseg;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4d997b57/src/include/executor/spi.h
----------------------------------------------------------------------
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 867daf5..7b4ba1b 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -155,4 +155,11 @@ extern uint64 SPI_GetMemoryReservation(void);
 extern void SPI_ReserveMemory(uint64 mem_reserved);
 extern bool SPI_IsMemoryReserved(void);
 
+/**
+ * Query resource related routines.
+ */
+extern bool SPI_IsInPrepare(void);
+extern void SPI_IncreasePrepareCounter(void);
+extern void SPI_DecreasePrepareCounter(void);
+
 #endif   /* SPI_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4d997b57/src/include/optimizer/planner.h
----------------------------------------------------------------------
diff --git a/src/include/optimizer/planner.h b/src/include/optimizer/planner.h
index 48de348..cc2eb7d 100644
--- a/src/include/optimizer/planner.h
+++ b/src/include/optimizer/planner.h
@@ -50,4 +50,8 @@ extern bool choose_hashed_grouping(PlannerInfo *root,
 								   double dNumGroups, 
 								   AggClauseCounts *agg_counts);
 
+extern bool is_in_planning_phase(void);
+extern void increase_planning_depth(void);
+extern void decrease_planning_depth(void);
+
 #endif   /* PLANNER_H */


Mime
View raw message