hawq-commits mailing list archives

From shiv...@apache.org
Subject [48/50] incubator-hawq git commit: HAWQ-562. Refactor bucket number of external table.
Date Mon, 21 Mar 2016 22:02:17 GMT
HAWQ-562. Refactor bucket number of external table.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/7daee400
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/7daee400
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/7daee400

Branch: refs/heads/HAWQ-459
Commit: 7daee4002780797587c3dcca265c646fd0c92e1e
Parents: 695606f
Author: hzhang2 <zhanghuan929@163.com>
Authored: Mon Mar 21 10:26:11 2016 +0800
Committer: hzhang2 <zhanghuan929@163.com>
Committed: Mon Mar 21 16:29:42 2016 +0800

----------------------------------------------------------------------
 src/backend/cdb/cdbcopy.c         |  2 +-
 src/backend/cdb/cdbdatalocality.c | 97 ++++++++++++++++++++--------------
 src/backend/cdb/cdbfilesplit.c    | 67 +++++++----------------
 src/backend/commands/tablecmds.c  | 11 ++--
 src/backend/parser/analyze.c      |  2 +-
 src/include/access/filesplit.h    |  2 +-
 src/include/cdb/cdbdatalocality.h |  1 -
 7 files changed, 88 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7daee400/src/backend/cdb/cdbcopy.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbcopy.c b/src/backend/cdb/cdbcopy.c
index 17ffa96..ee2ff80 100644
--- a/src/backend/cdb/cdbcopy.c
+++ b/src/backend/cdb/cdbcopy.c
@@ -222,7 +222,7 @@ cdbCopyStart(CdbCopy *c, char *copyCmd, Oid relid, Oid relerror, List *err_aoseg
 		List *scantable_splits = NIL;
 		prepareDispatchedCatalogRelation(q->contextdisp, relid, FALSE, NULL);
 		scantable_splits = AssignAOSegFileSplitToSegment(relid, NIL,
-														true, c->partition_num,
+														c->partition_num,
 														scantable_splits);
 		((CopyStmt *)q->utilityStmt)->scantable_splits = scantable_splits;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7daee400/src/backend/cdb/cdbdatalocality.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbdatalocality.c b/src/backend/cdb/cdbdatalocality.c
index 3f8fcb5..4c67429 100644
--- a/src/backend/cdb/cdbdatalocality.c
+++ b/src/backend/cdb/cdbdatalocality.c
@@ -34,6 +34,7 @@
 #include "access/parquetsegfiles.h"
 #include "catalog/catalog.h"
 #include "catalog/catquery.h"
+#include "catalog/pg_exttable.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_proc.h"
 #include "cdb/cdbdatalocality.h"
@@ -296,11 +297,13 @@ typedef struct split_to_segment_mapping_context {
 	int64 split_size;
 	MemoryContext old_memorycontext;
 	MemoryContext datalocality_memorycontext;
-	int externTableSegNum;  //expected virtual segment number when external table exists
+	int externTableOnClauseSegNum;  //expected virtual segment number when external table exists
+	int externTableLocationSegNum;  //expected virtual segment number when external table exists
 	int tableFuncSegNum;  //expected virtual segment number when table function exists
 	int hashSegNum;  // expected virtual segment number when there is hash table in from clause
 	int randomSegNum; // expected virtual segment number when there is random table in from clause
 	int resultRelationHashSegNum; // expected virtual segment number when hash table as a result relation
+	int minimum_segment_num; //default is 1.
 	int64 randomRelSize; //all the random relation size
 	int64 hashRelSize; //all the hash relation size
 
@@ -529,13 +532,15 @@ static void init_datalocality_context(split_to_segment_mapping_context *context)
 	context->rtc_context.range_tables = NIL;
 	context->rtc_context.full_range_tables = NIL;
 
-	context->externTableSegNum = 0;
+	context->externTableOnClauseSegNum = 0;
+	context->externTableLocationSegNum = 0;
 	context->tableFuncSegNum = 0;
 	context->hashSegNum = 0;
 	context->resultRelationHashSegNum = 0;
 	context->randomSegNum = 0;
 	context->randomRelSize = 0;
 	context->hashRelSize = 0;
+	context->minimum_segment_num = 1;
 	/*
 	 * initialize the data distribution
 	 * static context.
@@ -762,17 +767,24 @@ static void check_keep_hash_and_external_table(
 			/* targetPolicy->bucketnum is bucket number of external table,
 			 * whose default value is set to default_segment_num
 			 */
-			if (context->externTableSegNum == 0) {
-				context->externTableSegNum = targetPolicy->bucketnum;
-			} else {
-				if (context->externTableSegNum < targetPolicy->bucketnum) {
-					context->externTableSegNum = targetPolicy->bucketnum;
-					/*
-					 * In this case, two external table join but with different bucket number
-					 * we cannot allocate the right segment number.
-					 */
-					//now we just restrict that vseg num > bucket number of external table
-					//elog(ERROR, "All external tables in one query must have the same bucket number!");
+			ExtTableEntry* extEnrty = GetExtTableEntry(rel->rd_id);
+			if(extEnrty->isweb){
+				if (context->externTableOnClauseSegNum == 0) {
+					context->externTableOnClauseSegNum = targetPolicy->bucketnum;
+				} else {
+					if (context->externTableOnClauseSegNum != targetPolicy->bucketnum) {
+						/*
+						 * In this case, two external table join but with different bucket number
+						 * we cannot allocate the right segment number.
+						 */
+						elog(ERROR, "All external tables in one query must have the same bucket number!");
+					}
+				}
+			}
+			else{
+				if (context->externTableLocationSegNum < targetPolicy->bucketnum) {
+					context->externTableLocationSegNum = targetPolicy->bucketnum;
+					context->minimum_segment_num =  targetPolicy->bucketnum;
 				}
 			}
 		}
@@ -4074,17 +4086,17 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 				&(context.rtc_context.range_tables), &isTableFunctionExists,
 				context.datalocality_memorycontext);
 
-		/*Table Function VSeg Number = default_segment_number(configured in GUC) if table function exists,
+		/* set expected virtual segment number for hash table and external table*/
+		/* calculate hashSegNum, externTableSegNum, resultRelationHashSegNum */
+		check_keep_hash_and_external_table(&context, query, intoPolicy);
+
+		/*Table Function VSeg Number = default_segment_number(configured in GUC) if table function exists or gpfdist exists,
 		 *0 Otherwise.
 		 */
 		if (isTableFunctionExists) {
 			context.tableFuncSegNum = GetUserDefinedFunctionVsegNum();
 		}
 
-		/* set expected virtual segment number for hash table and external table*/
-		/* calculate hashSegNum, externTableSegNum, resultRelationHashSegNum */
-		check_keep_hash_and_external_table(&context, query, intoPolicy);
-
 		/* get block location and calculate relation size*/
 		get_block_locations_and_claculte_table_size(&context);
 
@@ -4163,13 +4175,11 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 				context.randomSegNum = expected_segment_num_with_max_filecount;
 			}
 			/* Step4 we at least use one segment*/
-			if (context.randomSegNum < minimum_segment_num) {
-				context.randomSegNum = minimum_segment_num;
+			if (context.randomSegNum < context.minimum_segment_num) {
+				context.randomSegNum = context.minimum_segment_num;
 			}
 
 			int maxExpectedNonRandomSegNum = 0;
-			if (maxExpectedNonRandomSegNum < context.externTableSegNum)
-				maxExpectedNonRandomSegNum = context.externTableSegNum;
 			if (maxExpectedNonRandomSegNum < context.tableFuncSegNum)
 				maxExpectedNonRandomSegNum = context.tableFuncSegNum;
 			if (maxExpectedNonRandomSegNum < context.hashSegNum)
@@ -4183,7 +4193,7 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 				fprintf(fpsegnum, "Result relation hash segment num : %d.\n", context.resultRelationHashSegNum);
 				fprintf(fpsegnum, "\n");
 				fprintf(fpsegnum, "Table  function      segment num : %d.\n", context.tableFuncSegNum);
-				fprintf(fpsegnum, "Extern table         segment num : %d.\n", context.externTableSegNum);
+				fprintf(fpsegnum, "Extern table         segment num : %d.\n", context.externTableOnClauseSegNum);
 				fprintf(fpsegnum, "From hash relation   segment num : %d.\n", context.hashSegNum);
 				fprintf(fpsegnum, "MaxExpectedNonRandom segment num : %d.\n", maxExpectedNonRandomSegNum);
 				fprintf(fpsegnum, "\n");
@@ -4193,29 +4203,33 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 			int maxTargetSegmentNumber = 0;
 			/* we keep resultRelationHashSegNum in the highest priority*/
 			if (context.resultRelationHashSegNum != 0) {
-				if (context.resultRelationHashSegNum < context.externTableSegNum
-						&& context.externTableSegNum != 0) {
+				if ((context.resultRelationHashSegNum != context.externTableOnClauseSegNum
+						&& context.externTableOnClauseSegNum != 0)
+						|| (context.resultRelationHashSegNum < context.externTableLocationSegNum)) {
 					cleanup_allocation_algorithm(&context);
 					elog(ERROR, "Could not allocate enough memory! "
 							"bucket number of result hash table and external table should match each other");
 				}
 				maxTargetSegmentNumber = context.resultRelationHashSegNum;
 				minTargetSegmentNumber = context.resultRelationHashSegNum;
-			} else if (maxExpectedNonRandomSegNum > 0) {
+			}
+			else if(context.externTableOnClauseSegNum > 0){
 				/* bucket number of external table must be the same with the number of virtual segments*/
-				if (maxExpectedNonRandomSegNum == context.externTableSegNum) {
-					context.externTableSegNum =
-							context.externTableSegNum < minimum_segment_num ?
-									minimum_segment_num : context.externTableSegNum;
-					maxTargetSegmentNumber = context.externTableSegNum;
-					minTargetSegmentNumber = context.externTableSegNum;
-				} else if (maxExpectedNonRandomSegNum == context.hashSegNum) {
+				if(context.externTableOnClauseSegNum < context.externTableLocationSegNum){
+					cleanup_allocation_algorithm(&context);
+					elog(ERROR, "external table bucket number should match each other");
+				}
+				maxTargetSegmentNumber = context.externTableOnClauseSegNum;
+				minTargetSegmentNumber = context.externTableOnClauseSegNum;
+			}
+			else if (maxExpectedNonRandomSegNum > 0) {
+				if (maxExpectedNonRandomSegNum == context.hashSegNum) {
 					/* in general, we keep bucket number of hash table equals to the number of virtual segments
 					 * but this rule can be broken when there is a large random table in the range tables list
 					 */
 					context.hashSegNum =
-							context.hashSegNum < minimum_segment_num ?
-							minimum_segment_num : context.hashSegNum;
+							context.hashSegNum < context.minimum_segment_num ?
+							context.minimum_segment_num : context.hashSegNum;
 					double considerRandomWhenHashExistRatio = 1.5;
 					/*if size of random table >1.5 *hash table, we consider relax the restriction of hash bucket number*/
 					if (context.randomRelSize
@@ -4224,7 +4238,7 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 							context.randomSegNum = context.hashSegNum;
 						}
 						maxTargetSegmentNumber = context.randomSegNum;
-						minTargetSegmentNumber = minimum_segment_num;
+						minTargetSegmentNumber = context.minimum_segment_num;
 					} else {
 						maxTargetSegmentNumber = context.hashSegNum;
 						minTargetSegmentNumber = context.hashSegNum;
@@ -4232,17 +4246,17 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 				} else if (maxExpectedNonRandomSegNum == context.tableFuncSegNum) {
 					/* if there is a table function, we should at least use tableFuncSegNum virtual segments*/
 					context.tableFuncSegNum =
-							context.tableFuncSegNum < minimum_segment_num ?
-									minimum_segment_num : context.tableFuncSegNum;
+							context.tableFuncSegNum < context.minimum_segment_num ?
+									context.minimum_segment_num : context.tableFuncSegNum;
 					if (context.randomSegNum < context.tableFuncSegNum) {
 						context.randomSegNum = context.tableFuncSegNum;
 					}
 					maxTargetSegmentNumber = context.randomSegNum;
-					minTargetSegmentNumber = minimum_segment_num;
+					minTargetSegmentNumber = context.minimum_segment_num;
 				}
 			} else {
 				maxTargetSegmentNumber = context.randomSegNum;
-				minTargetSegmentNumber = minimum_segment_num;
+				minTargetSegmentNumber = context.minimum_segment_num;
 			}
 
 			if (enforce_virtual_segment_number > 0) {
@@ -4254,6 +4268,9 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 				maxTargetSegmentNumber = fixedVsegNum;
 				minTargetSegmentNumber = fixedVsegNum;
 			}
+			if(maxTargetSegmentNumber < minTargetSegmentNumber){
+				maxTargetSegmentNumber = minTargetSegmentNumber;
+			}
 			uint64_t before_rm_allocate_resource = gettime_microsec();
 
 			/* cost is use by RM to balance workload between hosts. the cost is at least one block size*/

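To make the refactored rule in check_keep_hash_and_external_table and calculate_planner_segment_num above easier to follow, here is a minimal standalone sketch; it is not code from the commit, and the struct and function names below are illustrative only. The idea it demonstrates: web/EXECUTE external tables must all share one bucket number, which fixes the virtual segment count exactly, while LOCATION external tables only raise the minimum segment count.

#include <stdio.h>
#include <stdlib.h>

/* Illustrative stand-in for the three new context fields in the patch. */
typedef struct {
	int onClauseSegNum;   /* exact vseg count required by web/EXECUTE external tables */
	int locationSegNum;   /* largest bucket number seen on LOCATION external tables   */
	int minimumSegNum;    /* lower bound on the vseg count, defaults to 1             */
} ExtSegContext;

/* Record one external table's bucket number, mirroring the new branch on isweb. */
static void note_external_table(ExtSegContext *ctx, int bucketnum, int isweb)
{
	if (isweb) {
		if (ctx->onClauseSegNum == 0) {
			ctx->onClauseSegNum = bucketnum;
		} else if (ctx->onClauseSegNum != bucketnum) {
			fprintf(stderr, "All external tables in one query must have the same bucket number!\n");
			exit(1);
		}
	} else if (ctx->locationSegNum < bucketnum) {
		/* LOCATION tables only push the floor up; they never force an exact match. */
		ctx->locationSegNum = bucketnum;
		ctx->minimumSegNum = bucketnum;
	}
}

int main(void)
{
	ExtSegContext ctx = {0, 0, 1};
	note_external_table(&ctx, 6, 1);   /* web external table with 6 buckets     */
	note_external_table(&ctx, 4, 0);   /* LOCATION external table with 4 URIs   */
	printf("exact vseg num = %d, minimum vseg num = %d\n",
	       ctx.onClauseSegNum, ctx.minimumSegNum);
	return 0;
}

As the reordered checks in calculate_planner_segment_num show, the on-clause number then wins over everything except a hash-distributed result relation, which must match it exactly and be at least the LOCATION minimum.
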
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7daee400/src/backend/cdb/cdbfilesplit.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbfilesplit.c b/src/backend/cdb/cdbfilesplit.c
index 4a2ad04..128f949 100644
--- a/src/backend/cdb/cdbfilesplit.c
+++ b/src/backend/cdb/cdbfilesplit.c
@@ -44,8 +44,7 @@ static void fileSplit_free(FileSplitNode *split);
 
 static List *
 computeSplitToSegmentMaps(Oid relid, GpPolicy *targetPolicy, List *splits,
-						List *segment_infos, bool keep_hash_policy,
-						int target_segment_num);
+						List *segment_infos, int target_segment_num);
 
 static List *
 postProcessSplitsPerSegment(List *oldSplitToSegmentMaps);
@@ -54,7 +53,7 @@ static int
 fileSplitNodeCmp(const void *left, const void *right);
 
 static SegFileSplitMapNode *
-AssignSingleAOSegFileSplitToSegment(Oid relid, List *segment_infos, bool keep_hash_policy, int target_segment_num)
+AssignSingleAOSegFileSplitToSegment(Oid relid, List *segment_infos, int target_segment_num)
 {
 	SegFileSplitMapNode *result = NULL;
 	char storageChar;
@@ -108,7 +107,7 @@ AssignSingleAOSegFileSplitToSegment(Oid relid, List *segment_infos, bool keep_ha
 	 * data locality into account.
 	 */
 	splitToSegmentMaps = computeSplitToSegmentMaps(relid, targetPolicy, splits, segment_infos,
-											keep_hash_policy, target_segment_num);
+											target_segment_num);
 	Assert(splitToSegmentMaps);
 	list_free(splits);
 
@@ -118,7 +117,7 @@ AssignSingleAOSegFileSplitToSegment(Oid relid, List *segment_infos, bool keep_ha
 }
 
 List *
-AssignAOSegFileSplitToSegment(Oid relid, List *segment_infos, bool keep_hash_policy, int target_segment_num, List *existings)
+AssignAOSegFileSplitToSegment(Oid relid, List *segment_infos, int target_segment_num, List *existings)
 {
 	SegFileSplitMapNode *result = NULL;
 
@@ -130,7 +129,7 @@ AssignAOSegFileSplitToSegment(Oid relid, List *segment_infos, bool keep_hash_pol
 		foreach (child, children)
 		{
 			Oid myrelid = lfirst_oid(child);
-			result = AssignSingleAOSegFileSplitToSegment(myrelid, segment_infos, keep_hash_policy,
+			result = AssignSingleAOSegFileSplitToSegment(myrelid, segment_infos,
 					target_segment_num);
 			existings = lappend(existings, result);
 		}
@@ -140,7 +139,7 @@ AssignAOSegFileSplitToSegment(Oid relid, List *segment_infos, bool keep_hash_pol
 	}
 	else
 	{
-		result = AssignSingleAOSegFileSplitToSegment(relid, segment_infos, keep_hash_policy,
+		result = AssignSingleAOSegFileSplitToSegment(relid, segment_infos,
 													target_segment_num);
 		existings = lappend(existings, result);
 	}
@@ -149,25 +148,14 @@ AssignAOSegFileSplitToSegment(Oid relid, List *segment_infos, bool keep_hash_pol
 }
 
 /*
- * If keep_hash_policy is false, then the table is treated as randomly
- * distributed, regardless of its distribution policy.
  *
  * If segment_infos is NIL, then data locality is not needed.
  */
 static List *
 computeSplitToSegmentMaps(Oid relid, GpPolicy *targetPolicy, List *splits, List *segment_infos,
-						bool keep_hash_policy, int target_segment_num)
+						int target_segment_num)
 {
 	List *splitToSegmentMaps = NIL;
-	if (keep_hash_policy)
-	{
-		/*
-		 * If we want to keep the hash distribution policy,
-		 * we need to make sure the data partition number
-		 * equals to the number of target segments.
-		 */
-		Assert(targetPolicy->bucketnum == target_segment_num);
-	}
 
 	if (segment_infos != NIL)
 	{
@@ -184,37 +172,22 @@ computeSplitToSegmentMaps(Oid relid, GpPolicy *targetPolicy, List *splits, List
 	 */
 	if (segment_infos == NIL)
 	{
-		/*
-		 * Sub case 1: we need to keep the hash distribution policy.
-		 *
-		 * In this case, we just need consider the segment file number.
-		 * This case works exactly the same as HAWQ1.x.
-		 */
-		if (keep_hash_policy)
+		ListCell *lc;
+		int i;
+		for (i = 0; i < target_segment_num; i++)
 		{
-			ListCell *lc;
-			int i;
-			for (i = 0; i < target_segment_num; i++)
-			{
-				splitToSegmentMaps = lappend(splitToSegmentMaps, NIL);
-			}
-
-			foreach(lc, splits)
-			{
-				int assigned_seg_no;
-				ListCell *per_seg_split;
-				FileSplit split = (FileSplitNode *)lfirst(lc);
-				Assert(split);
-				assigned_seg_no = (split->segno - 1) % target_segment_num;
-				per_seg_split = list_nth_cell(splitToSegmentMaps, assigned_seg_no);
-				lfirst(per_seg_split) = lappend((List *)lfirst(per_seg_split), split);
-			}
+			splitToSegmentMaps = lappend(splitToSegmentMaps, NIL);
 		}
-		else
+
+		foreach(lc, splits)
 		{
-			ereport(ERROR,
-					(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-					errmsg("Assigning splits to segment without keeping hash distribution policy is not allowed")));
+			int assigned_seg_no;
+			ListCell *per_seg_split;
+			FileSplit split = (FileSplitNode *)lfirst(lc);
+			Assert(split);
+			assigned_seg_no = (split->segno - 1) % target_segment_num;
+			per_seg_split = list_nth_cell(splitToSegmentMaps, assigned_seg_no);
+			lfirst(per_seg_split) = lappend((List *)lfirst(per_seg_split), split);
 		}
 	}
 	else

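With the keep_hash_policy branch removed, computeSplitToSegmentMaps now always distributes segment files round-robin when no segment information is supplied. A tiny self-contained sketch of that mapping (illustrative only, not code from the patch):

#include <stdio.h>

/* A segment file numbered segno lands on target segment (segno - 1) % target_segment_num. */
static int assigned_segment(int segno, int target_segment_num)
{
	return (segno - 1) % target_segment_num;
}

int main(void)
{
	int target_segment_num = 4;
	for (int segno = 1; segno <= 8; segno++)
		printf("segfile %d -> segment %d\n", segno, assigned_segment(segno, target_segment_num));
	return 0;
}
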
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7daee400/src/backend/commands/tablecmds.c
----------------------------------------------------------------------
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index c47b828..259821b 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -968,7 +968,12 @@ DefineExternalRelation(CreateExternalStmt *createExtStmt)
 			locationUris = transformLocationUris(exttypeDesc->location_list,
 												 createExtStmt->formatOpts,
 												 isweb, iswritable);
-			
+			int locLength = list_length(exttypeDesc->location_list);
+			if (createStmt->policy && locLength > 0)
+			{
+				createStmt->policy->bucketnum = locLength;
+			}
+
 			break;
 
 		case EXTTBL_TYPE_EXECUTE:
@@ -6437,7 +6442,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap)
 										 tab->relid,
 										 false,
 										 NULL);
-		ar_tab->scantable_splits = AssignAOSegFileSplitToSegment(tab->relid, NIL, true,
+		ar_tab->scantable_splits = AssignAOSegFileSplitToSegment(tab->relid, NIL,
 		                    target_segment_num, ar_tab->scantable_splits);
 		/*
 		 * Specify the segno directly as we don't have segno mapping here.
@@ -16719,7 +16724,7 @@ ATPExecPartSplit(Relation rel,
      * Dispatch split-related metadata.
      */
     scantable_splits = AssignAOSegFileSplitToSegment((Oid)intVal((Value *)pc->partid),
-              NIL, true, target_segment_num, scantable_splits);
+              NIL, target_segment_num, scantable_splits);
 
     pc->scantable_splits = scantable_splits;
     pc->newpart_aosegnos = segment_segnos;

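The DefineExternalRelation hunk above makes the bucket number of a LOCATION external table equal to its URI count, which is what later feeds minimum_segment_num in cdbdatalocality.c. A hedged sketch of just that assignment follows; GpPolicyStub is a stand-in type for illustration, not the real catalog definition.

#include <stdio.h>

/* Stand-in policy type for illustration only. */
typedef struct { int bucketnum; } GpPolicyStub;

/* Override the default bucket number with the number of LOCATION URIs, if any. */
static void set_location_bucketnum(GpPolicyStub *policy, int location_uri_count)
{
	if (policy != NULL && location_uri_count > 0)
		policy->bucketnum = location_uri_count;
}

int main(void)
{
	GpPolicyStub policy = { 6 };        /* pretend the default was 6 buckets     */
	set_location_bucketnum(&policy, 3); /* external table declared with 3 URIs   */
	printf("bucketnum = %d\n", policy.bucketnum);
	return 0;
}
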
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7daee400/src/backend/parser/analyze.c
----------------------------------------------------------------------
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index f17c90a..1f48c36 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -2724,7 +2724,7 @@ transformETDistributedBy(ParseState *pstate, CreateStmtContext *cxt,
 										 sizeof(p->attrs[0]));
 			p->ptype = POLICYTYPE_PARTITIONED;
 			p->nattrs = 0;
-			p->bucketnum = GetRelOpt_bucket_num_fromOptions(options, GetDefaultPartitionNum());
+			p->bucketnum = GetRelOpt_bucket_num_fromOptions(options, GetExternalTablePartitionNum());
 			p->attrs[0] = 1;
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7daee400/src/include/access/filesplit.h
----------------------------------------------------------------------
diff --git a/src/include/access/filesplit.h b/src/include/access/filesplit.h
index e075b18..e627b07 100644
--- a/src/include/access/filesplit.h
+++ b/src/include/access/filesplit.h
@@ -65,7 +65,7 @@ typedef SegFileSplitMapNode *SegFileSplitMap;
  * table data.
  */
 List *
-AssignAOSegFileSplitToSegment(Oid relid, List *segment_infos, bool keep_hash_policy, int target_segment_num, List *existings);
+AssignAOSegFileSplitToSegment(Oid relid, List *segment_infos, int target_segment_num, List *existings);
 
 /*
  * Given the relid, and the segment index, return the splits assigned

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7daee400/src/include/cdb/cdbdatalocality.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbdatalocality.h b/src/include/cdb/cdbdatalocality.h
index 224cb53..21e4c68 100644
--- a/src/include/cdb/cdbdatalocality.h
+++ b/src/include/cdb/cdbdatalocality.h
@@ -33,7 +33,6 @@
 #include "nodes/parsenodes.h"
 #include "executor/execdesc.h"
 
-#define minimum_segment_num 1
 /*
  * structure containing information about data residence
  * at the host.

