hawq-commits mailing list archives

From y...@apache.org
Subject incubator-hawq git commit: HAWQ-157. Add refactored implementation of allocating vsegs in resource pool
Date Fri, 13 Nov 2015 03:39:54 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master 939769288 -> 8c633bf53


HAWQ-157. Add refactored implementation of allocating vsegs in resource pool


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/8c633bf5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/8c633bf5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/8c633bf5

Branch: refs/heads/master
Commit: 8c633bf5365fa6f1b6e10aa7afbdae2b53f087fd
Parents: 9397692
Author: Yi Jin <yjin@pivotal.io>
Authored: Fri Nov 13 11:39:34 2015 +0800
Committer: Yi Jin <yjin@pivotal.io>
Committed: Fri Nov 13 11:39:34 2015 +0800

----------------------------------------------------------------------
 .../resourcemanager/include/resourcepool.h      |  15 +
 src/backend/resourcemanager/resourcepool.c      | 373 +++++++++++++++++++
 src/backend/utils/misc/guc.c                    |   2 +-
 3 files changed, 389 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
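In brief, the new allocator places virtual segments (vsegs) in two stages: first by HDFS locality, with at most one vseg per preferred host, then round-robin over the whole cluster until the requested count is reached or no host can accept more. A minimal, compilable sketch of that shape follows; every name and type in it is an illustrative assumption, not the commit's API, and it ignores the slice and per-statement vseg limits the real code enforces:

    #include <stdint.h>

    typedef struct Host {
        int      usable;    /* FTS-available and not alive-pending      */
        int      vsegs;     /* vsegs placed on this host so far         */
        uint32_t avail_mb;  /* memory left in the matching-ratio bucket */
    } Host;

    /* Stage 1: at most one vseg per HDFS-preferred host. */
    static int place_by_locality(Host *hosts, int nhosts,
                                 uint32_t mem_mb, int want)
    {
        int placed = 0;
        for (int i = 0; i < nhosts && placed < want; i++) {
            if (hosts[i].usable && hosts[i].avail_mb >= mem_mb) {
                hosts[i].vsegs++;
                hosts[i].avail_mb -= mem_mb;
                placed++;
            }
        }
        return placed;
    }

    /* Stage 2: keep cycling over all hosts until the target is met or a
     * full round makes no progress. */
    static int place_round_robin(Host *hosts, int nhosts,
                                 uint32_t mem_mb, int want)
    {
        int placed = 0, progress = 1;
        while (placed < want && progress) {
            progress = 0;
            for (int i = 0; i < nhosts && placed < want; i++) {
                if (hosts[i].usable && hosts[i].avail_mb >= mem_mb) {
                    hosts[i].vsegs++;
                    hosts[i].avail_mb -= mem_mb;
                    placed++;
                    progress = 1;
                }
            }
        }
        return placed;
    }
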


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c633bf5/src/backend/resourcemanager/include/resourcepool.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resourcepool.h b/src/backend/resourcemanager/include/resourcepool.h
index 494e3e7..e81e108 100644
--- a/src/backend/resourcemanager/include/resourcepool.h
+++ b/src/backend/resourcemanager/include/resourcepool.h
@@ -559,6 +559,21 @@ int allocateResourceFromResourcePoolIOBytes(int32_t 	nodecount,
 										    int32_t    *totalvsegcount,
 										    int64_t    *vsegiobytes);
 
+int allocateResourceFromResourcePoolIOBytes2(int32_t 	 nodecount,
+										     int32_t	 minnodecount,
+										     uint32_t 	 memory,
+										     double 	 core,
+										     int64_t	 iobytes,
+										     int32_t   	 slicesize,
+											 int32_t	 vseglimitpseg,
+										     int 		 preferredcount,
+										     char 	   **preferredhostname,
+										     int64_t    *preferredscansize,
+										     bool		 fixnodecount,
+										     List 	   **vsegcounters,
+										     int32_t    *totalvsegcount,
+										     int64_t    *vsegiobytes);
+
 int allocateResourceFromResourcePool(int32_t 	nodecount,
 									 int32_t	minnodecount,
 		 	 	 	 	 	 	 	 uint32_t 	memory,

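The header change only declares the new entry point; it is wired up through the allocateResFuncs slot table in the resourcepool.c hunk below. A sketch of how a caller presumably dispatches on the rm_allocation_policy GUC — the typedef and the assignment here are assumptions, only the array, the GUC, and the two function names come from the commit:

    /* Assumed pointer type matching the new allocator's signature. */
    typedef int (*AllocateResFunc)(int32_t nodecount, int32_t minnodecount,
                                   uint32_t memory, double core,
                                   int64_t iobytes, int32_t slicesize,
                                   int32_t vseglimitpseg, int preferredcount,
                                   char **preferredhostname,
                                   int64_t *preferredscansize,
                                   bool fixnodecount, List **vsegcounters,
                                   int32_t *totalvsegcount,
                                   int64_t *vsegiobytes);

    /* rm_allocation_policy indexes the table filled in
     * initializeResourcePoolManager():
     *   slot 0 -> allocateResourceFromResourcePoolIOBytes  (old policy)
     *   slot 1 -> allocateResourceFromResourcePoolIOBytes2 (new default) */
    AllocateResFunc alloc =
        (AllocateResFunc)PRESPOOL->allocateResFuncs[rm_allocation_policy];
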
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c633bf5/src/backend/resourcemanager/resourcepool.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcepool.c b/src/backend/resourcemanager/resourcepool.c
index 3c6bfee..d25b142 100644
--- a/src/backend/resourcemanager/resourcepool.c
+++ b/src/backend/resourcemanager/resourcepool.c
@@ -352,6 +352,8 @@ void initializeResourcePoolManager(void)
 		PRESPOOL->allocateResFuncs[i] = NULL;
 	}
 	PRESPOOL->allocateResFuncs[0] = allocateResourceFromResourcePoolIOBytes;
+	PRESPOOL->allocateResFuncs[1] = allocateResourceFromResourcePoolIOBytes2;
+
 
 	PRESPOOL->SlavesFileTimestamp = 0;
 	PRESPOOL->SlavesHostCount	  = 0;
@@ -2144,6 +2146,377 @@ int allocateResourceFromResourcePoolIOBytes(int32_t 	nodecount,
 	validateResourcePoolStatus(false);
 	return FUNC_RETURN_OK;
 }
+
+int allocateResourceFromResourcePoolIOBytes2(int32_t 	 nodecount,
+										     int32_t	 minnodecount,
+										     uint32_t 	 memory,
+										     double 	 core,
+										     int64_t	 iobytes,
+										     int32_t   	 slicesize,
+											 int32_t	 vseglimitpseg,
+										     int 		 preferredcount,
+										     char 	   **preferredhostname,
+										     int64_t    *preferredscansize,
+										     bool		 fixnodecount,
+										     List 	   **vsegcounters,
+										     int32_t    *totalvsegcount,
+										     int64_t    *vsegiobytes)
+{
+	int 			res			  		= FUNC_RETURN_OK;
+	uint32_t 		ratio 		  		= memory/core;
+	BBST			nodetree	  		= &(PRESPOOL->OrderedIOBytesWorkload);
+	BBSTNode		leftnode	  		= NULL;
+	SegResource		segresource   		= NULL;
+	List 		   *tmplist				= NULL;
+	int32_t			segid		  		= SEGSTAT_ID_INVALID;
+	GRMContainerSet containerset  		= NULL;
+	int				nodecountleft 		= nodecount;
+	int				impossiblecount   	= 0;
+	bool			skipchosenmachine 	= true;
+	int 			fullcount 			= nodetree->NodeIndex->NodeCount;
+	int 			clustersize 		= PRESPOOL->AvailNodeCount;
+	/* This hash saves all selected hosts containing at least one vseg. */
+	HASHTABLEData	vsegcnttbl;
+
+	initializeHASHTABLE(&vsegcnttbl,
+						PCONTEXT,
+						HASHTABLE_SLOT_VOLUME_DEFAULT,
+						HASHTABLE_SLOT_VOLUME_DEFAULT_MAX,
+						HASHTABLE_KEYTYPE_UINT32,
+						NULL);
+	/*
+	 *--------------------------------------------------------------------------
+	 * Stage 1: allocate based on locality; at most 1 vseg allocated per host.
+	 *--------------------------------------------------------------------------
+	 */
+	if ( nodecount < clustersize )
+	{
+		elog(RMLOG, "Resource manager tries to find hosts based on locality data.");
+
+		for ( uint32_t i = 0 ; i < preferredcount ; ++i )
+		{
+			/*
+			 * Get the machine identified by the HDFS host name. The HDFS host
+			 * name does not have to be a YARN or HAWQ FTS recognized host
+			 * name. Therefore, getSegIDByHDFSHostName() is responsible for
+			 * finding the mapped HAWQ FTS unified host.
+			 */
+			res = getSegIDByHDFSHostName(preferredhostname[i],
+										 strlen(preferredhostname[i]),
+										 &segid);
+			if ( res != FUNC_RETURN_OK )
+			{
+				/* Cannot find the machine; skip this machine. */
+				elog(LOG, "Resource manager failed to resolve HDFS host identified "
+						  "by %s. This host is skipped temporarily.",
+						  preferredhostname[i]);
+				continue;
+			}
+
+			/* Get the resource counter of this host. */
+			segresource = getSegResource(segid);
+
+			if (!IS_SEGRESOURCE_USABLE(segresource))
+			{
+				elog(RMLOG, "Segment %s has unavailable status: "
+						    "RUAlivePending: %d, Available: %d.",
+						    preferredhostname[i],
+						    segresource->RUAlivePending,
+						    segresource->Stat->FTSAvailable);
+				continue;
+			}
+
+			/* Get the allocated resource of this host with specified ratio */
+			res = getGRMContainerSet(segresource, ratio, &containerset);
+			if ( res != FUNC_RETURN_OK )
+			{
+				/* This machine does not have resource with a matching ratio. */
+				elog(RMLOG, "Segment %s does not contain expected "
+						    "resource of %d MB per core. This host is skipped.",
+						    preferredhostname[i],
+						    ratio);
+				continue;
+			}
+
+			/* Decide how many vsegs can be allocated based on locality data. */
+			int segcountact = containerset == NULL ?
+							  0 :
+							  containerset->Available.MemoryMB / memory;
+			if ( segcountact == 0 )
+			{
+				elog(RMLOG, "Segment %s has no more resource to allocate. "
+							"This segment is skipped.",
+							preferredhostname[i]);
+				continue;
+			}
+
+			/*
+			 * We expect only 1 vseg working in this preferred host. Therefore,
+			 * we check one virtual segment containing slicesize slices.
+			 *
+			 * NOTE: In this loop we only pick segments that do not break the
+			 * 		 slice limit, because we will go through all segments later
+			 * 		 if not enough segments are found in this loop.
+			 */
+			if ( segresource->SliceWorkload + slicesize > rm_nslice_perseg_limit )
+			{
+				elog(LOG, "Segment %s contains %d working slices now and "
+						  "cannot afford %d more slices.",
+						  preferredhostname[i],
+						  segresource->SliceWorkload,
+						  slicesize);
+				continue;
+			}
+
+			elog(RMLOG, "Resource manager chooses segment %s to allocate vseg.",
+						 GET_SEGRESOURCE_HOSTNAME(segresource));
+
+			/* Allocate resource from selected host. */
+			allocateResourceFromSegment(segresource,
+									 	containerset,
+										memory,
+										core,
+										slicesize);
+
+			/* Reorder the changed host. */
+			reorderSegResourceAvailIndex(segresource, ratio);
+
+			/* Track the mapping from host information to hdfs host name index.*/
+			VSegmentCounterInternal vsegcnt = createVSegmentCounter(i, segresource);
+
+			setHASHTABLENode(&vsegcnttbl,
+							 TYPCONVERT(void *, segresource->Stat->ID),
+							 TYPCONVERT(void *, vsegcnt),
+							 false);
+
+			/* Check if we have gotten expected number of segments. */
+			nodecountleft--;
+			if ( nodecountleft == 0 )
+			{
+				break;
+			}
+		}
+	}
+
+	elog(RMLOG, "After choosing vsegs based on locality, %d vsegs allocated, "
+				"%d vsegs expected.",
+				nodecount-nodecountleft,
+				nodecount);
+
+	/*
+	 *--------------------------------------------------------------------------
+	 * Stage 2: allocate based on io workload.
+	 *--------------------------------------------------------------------------
+	 */
+
+	while( nodetree->Root != NULL )
+	{
+		leftnode = getLeftMostNode(nodetree);
+		removeBBSTNode(nodetree, &leftnode);
+		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+		tmplist = lappend(tmplist, leftnode);
+		MEMORY_CONTEXT_SWITCH_BACK
+	}
+
+	int round = -1;
+	bool allocated = true;
+	while( nodecountleft > 0 && (allocated || round < 1) )
+	{
+		round++;
+		allocated = false;
+
+		ListCell *curnodec 	= NULL;
+		foreach(curnodec, tmplist)
+		{
+			BBSTNode curnode = (BBSTNode)lfirst(curnodec);
+			SegResource currresinfo = (SegResource)(curnode->Data);
+
+			elog(RMLOG, "Trying segment %s for resource allocation by round-robin.",
+						GET_SEGRESOURCE_HOSTNAME(currresinfo));
+
+			if ( !IS_SEGRESOURCE_USABLE(currresinfo) )
+			{
+				elog(RMLOG, "Segment %s is not usable, status %d, alive pending %d.",
+							GET_SEGRESOURCE_HOSTNAME(currresinfo),
+							currresinfo->Stat->FTSAvailable,
+							currresinfo->RUAlivePending?1:0);
+				continue;
+			}
+
+			VSegmentCounterInternal curhost = NULL;
+			PAIR pair = getHASHTABLENode(&vsegcnttbl,
+										 TYPCONVERT(void *,
+													currresinfo->Stat->ID));
+			if ( pair != NULL )
+			{
+				Assert(!currresinfo->RUAlivePending);
+				Assert(IS_SEGSTAT_FTSAVAILABLE(currresinfo->Stat));
+
+				curhost = (VSegmentCounterInternal)(pair->Value);
+
+				/* Host should not break vseg num limit. */
+				if ( !fixnodecount && curhost->VSegmentCount >= vseglimitpseg )
+				{
+					elog(RMLOG, "Segment %s cannot contain more vsegs for the "
+								"current statement, already allocated %d vsegs.",
+								GET_SEGRESOURCE_HOSTNAME(curhost->Resource),
+								curhost->VSegmentCount);
+					continue;
+				}
+
+				/*
+				 * In the first round, we don't choose the segments that already
+				 * have a vseg allocated.
+				 */
+				if ( round == 0 )
+				{
+					elog(RMLOG, "Segment %s is skipped temporarily.",
+								GET_SEGRESOURCE_HOSTNAME(curhost->Resource));
+					continue;
+				}
+			}
+
+			if ( currresinfo->SliceWorkload + slicesize > rm_nslice_perseg_limit )
+			{
+				elog(LOG, "Segment %s contains %d working slices now and "
+						  "cannot afford %d more slices.",
+						  GET_SEGRESOURCE_HOSTNAME(currresinfo),
+						  currresinfo->SliceWorkload,
+						  slicesize);
+				continue;
+			}
+
+			res = getGRMContainerSet(currresinfo, ratio, &containerset);
+			if ( res != FUNC_RETURN_OK )
+			{
+				/* This machine does not have resource with a matching ratio.
+				 * In fact this should never occur. */
+				elog(RMLOG, "Segment %s does not contain resource of %d MBPCORE",
+							GET_SEGRESOURCE_HOSTNAME(currresinfo),
+							ratio);
+				continue;
+			}
+
+			if ( containerset->Available.MemoryMB < memory ||
+				 containerset->Available.Core < core )
+			{
+				elog(RMLOG, "Segment %s does not contain enough resource of "
+							"%d MBPCORE",
+							GET_SEGRESOURCE_HOSTNAME(currresinfo),
+							ratio);
+				continue;
+			}
+
+			/* Try to allocate resource in the selected host. */
+			elog(RMLOG, "Resource manager chooses host %s to allocate vseg.",
+						GET_SEGRESOURCE_HOSTNAME(currresinfo));
+
+			/* Allocate resource. */
+			allocateResourceFromSegment(currresinfo,
+										containerset,
+										memory,
+										core,
+										slicesize);
+			/* Reorder the changed host. */
+			reorderSegResourceAvailIndex(currresinfo, ratio);
+
+			/*
+			 * Check if the selected host already has a vseg counter. If so,
+			 * we simply increase its count; otherwise, we create a new vseg
+			 * counter instance and resolve its HDFS host name index.
+			 */
+			if ( curhost != NULL )
+			{
+				curhost->VSegmentCount++;
+			}
+			else
+			{
+				uint32_t hdfsnameindex = preferredcount;
+				int32_t  syncid		   = SEGSTAT_ID_INVALID;
+
+				for ( uint32_t k = 0 ; k < preferredcount ; ++k )
+				{
+					res = getSegIDByHDFSHostName(preferredhostname[k],
+												 strlen(preferredhostname[k]),
+												 &syncid);
+					if ( syncid == currresinfo->Stat->ID )
+					{
+						hdfsnameindex = k;
+						break;
+					}
+				}
+
+				VSegmentCounterInternal vsegcnt = createVSegmentCounter(hdfsnameindex,
+																		currresinfo);
+				if (hdfsnameindex == preferredcount)
+				{
+					if (debug_print_split_alloc_result)
+					{
+						elog(LOG, "Segment %s matches no preferred HDFS host name.",
+								  GET_SEGRESOURCE_HOSTNAME(vsegcnt->Resource));
+					}
+				}
+
+				setHASHTABLENode(&vsegcnttbl,
+								 TYPCONVERT(void *, currresinfo->Stat->ID),
+								 TYPCONVERT(void *, vsegcnt),
+								 false);
+			}
+
+			nodecountleft--;
+			allocated = true;
+			if ( nodecountleft == 0 )
+			{
+				break;
+			}
+		}
+	}
+
+	/*
+	 * Insert all nodes saved in tmplist back into the tree to restore the
+	 * resource tree for the next allocation.
+	 */
+	while( list_length(tmplist) > 0 )
+	{
+		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+		insertBBSTNode(nodetree, (BBSTNode)(lfirst(list_head(tmplist))));
+		tmplist = list_delete_first(tmplist);
+		MEMORY_CONTEXT_SWITCH_BACK
+	}
+
+	/* Stage 3. Refresh the io bytes workload. */
+	*vsegiobytes = (nodecount - nodecountleft) > 0 ?
+					iobytes / (nodecount - nodecountleft) :
+					0;
+
+	List 	 *vsegcntlist = NULL;
+	ListCell *cell		  = NULL;
+	getAllPAIRRefIntoList(&vsegcnttbl, &vsegcntlist);
+	foreach(cell, vsegcntlist)
+	{
+		VSegmentCounterInternal vsegcounter = (VSegmentCounterInternal)
+											  ((PAIR)(lfirst(cell)))->Value;
+		vsegcounter->Resource->IOBytesWorkload +=
+									(*vsegiobytes) * vsegcounter->VSegmentCount;
+		reorderSegResourceIOBytesWorkloadIndex(vsegcounter->Resource);
+	}
+
+	/* Stage 4. Build the result. */
+	foreach(cell, vsegcntlist)
+	{
+		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+			(*vsegcounters) = lappend((*vsegcounters),
+									  ((PAIR)(lfirst(cell)))->Value);
+		MEMORY_CONTEXT_SWITCH_BACK
+	}
+	freePAIRRefList(&vsegcnttbl, &vsegcntlist);
+	cleanHASHTABLE(&vsegcnttbl);
+	*totalvsegcount = nodecount - nodecountleft;
+
+	validateResourcePoolStatus(false);
+	return FUNC_RETURN_OK;
+}
+
 /**
  * Return the resource to each original hosts.
  */

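Stage 3 of the new function is a plain even split of the statement's scan volume over the vsegs that were actually placed. With made-up numbers:

    /* Illustrative values only: 12 GB of scan data, 6 requested vsegs,
     * all of them placed (nodecountleft == 0). */
    int64_t iobytes     = 12LL * 1024 * 1024 * 1024;
    int32_t nodecount   = 6, nodecountleft = 0;
    int64_t vsegiobytes = iobytes / (nodecount - nodecountleft); /* 2 GB */

    /* A host that received 2 of the 6 vsegs is then charged
     * vsegiobytes * 2 = 4 GB of IOBytesWorkload before its position in
     * the io-workload index is reordered. */
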
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c633bf5/src/backend/utils/misc/guc.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index e11f574..5b2a360 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -6405,7 +6405,7 @@ static struct config_int ConfigureNamesInt[] =
 			GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE
 		},
 		&rm_allocation_policy,
-		0, 0, 10, NULL, NULL
+		1, 0, 10, NULL, NULL
 	},
 
     {


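The guc.c hunk flips the boot value of rm_allocation_policy from 0 to 1 (the 0..10 range and hooks are unchanged), so fresh clusters dispatch to allocateResFuncs[1], i.e. the refactored IOBytes2 allocator, by default. Assuming the GUC can be set from the server configuration, pinning the old policy would presumably look like:

    # hypothetical override; the name is from the commit, its settability
    # here is an assumption
    rm_allocation_policy = 0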