hawq-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From huor <...@git.apache.org>
Subject [GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Date Wed, 08 Aug 2018 01:49:22 GMT
Github user huor commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1384#discussion_r208434847
  
    --- Diff: src/backend/cdb/cdbdatalocality.c ---
    @@ -1579,7 +1700,242 @@ static void ParquetGetSegFileDataLocation(Relation relation,
     	return;
     }
     
    +/*
    + * DebugLogHDFSProtocolBlockLocations
    + *		Dump a block location result at DEBUG3.
    + *
    + * Each element of bls->files is read as a blocklocation_file. For every
    + * block, this logs its corrupt flag, length, offset and node count, then
    + * the host, name and topology path of each of its nodes.
    + */
    +static void
    +DebugLogHDFSProtocolBlockLocations(ExtProtocolBlockLocationData *bls)
    +{
    +	ListCell   *lc;
    +
    +	foreach(lc, bls->files)
    +	{
    +		blocklocation_file *blf = (blocklocation_file *) lfirst(lc);
    +
    +		elog(DEBUG3, "DEBUG LOCATION for %s with %d blocks",
    +			 blf->file_uri, blf->block_num);
    +		for (int i = 0; i < blf->block_num; ++i)
    +		{
    +			BlockLocation *pbl = &(blf->locations[i]);
    +
    +			elog(DEBUG3, "DEBUG LOCATION for block %d : %d, "
    +				 INT64_FORMAT ", " INT64_FORMAT ", %d",
    +				 i, pbl->corrupt, pbl->length, pbl->offset,
    +				 pbl->numOfNodes);
    +			for (int j = 0; j < pbl->numOfNodes; ++j)
    +			{
    +				elog(DEBUG3, "DEBUG LOCATION for block %d : %s, %s, %s",
    +					 i, pbl->hosts[j], pbl->names[j],
    +					 pbl->topologyPaths[j]);
    +			}
    +		}
    +	}
    +}
    +
    +/*
    + * InvokeHDFSProtocolBlockLocation
    + *		Call a protocol's block location function and collect its result.
    + *
    + * procOid        - OID of the function to call. It is invoked with zero
    + *                  arguments; its input travels in the call context as an
    + *                  ExtProtocolValidatorData with action
    + *                  EXT_VALID_ACT_GETBLKLOC.
    + * locs           - location URIs, passed to the callee as url_list.
    + * blockLocations - in/out list. Every entry of the returned files list is
    + *                  appended to it.
    + *
    + * The callee reports its result by pointing fcinfo.resultinfo at an
    + * ExtProtocolBlockLocationData. If it leaves resultinfo NULL,
    + * *blockLocations is left unchanged.
    + */
    +static void InvokeHDFSProtocolBlockLocation(Oid    procOid,
    +                                            List  *locs,
    +                                            List **blockLocations)
    +{
    +	ExtProtocolValidatorData   *validator_data;
    +	FmgrInfo				   *validator_udf;
    +	FunctionCallInfoData		fcinfo;
    +	ExtProtocolBlockLocationData *bls;
    +
    +	validator_data = (ExtProtocolValidatorData *)
    +					 palloc0(sizeof(ExtProtocolValidatorData));
    +	validator_udf = palloc(sizeof(FmgrInfo));
    +	fmgr_info(procOid, validator_udf);
    +
    +	/* Ask the protocol function for block locations of the given URLs. */
    +	validator_data->type 		= T_ExtProtocolValidatorData;
    +	validator_data->url_list 	= locs;
    +	validator_data->format_opts = NULL;
    +	validator_data->errmsg		= NULL;
    +	validator_data->direction 	= EXT_VALIDATE_READ;
    +	validator_data->action		= EXT_VALID_ACT_GETBLKLOC;
    +
    +	InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo,
    +							 /* FmgrInfo */ validator_udf,
    +							 /* nArgs */ 0,
    +							 /* Call Context */ (Node *) validator_data,
    +							 /* ResultSetInfo */ NULL);
    +
    +	/* The callee hands its result back through fcinfo.resultinfo. */
    +	FunctionCallInvoke(&fcinfo);
    +	bls = (ExtProtocolBlockLocationData *) (fcinfo.resultinfo);
    +
    +	if (bls != NULL)
    +	{
    +		/* Check the node type before anything dereferences the result. */
    +		Assert(bls->type == T_ExtProtocolBlockLocationData);
    +
    +		/* Dump first: the transfer below empties bls->files. */
    +		DebugLogHDFSProtocolBlockLocations(bls);
    +
    +		/*
    +		 * Move every entry to the caller's list. Each cell is deleted from
    +		 * bls->files as it is transferred, so bls->files ends up NIL.
    +		 */
    +		while (bls->files != NIL)
    +		{
    +			*blockLocations = lappend(*blockLocations, linitial(bls->files));
    +			bls->files = list_delete_first(bls->files);
    +		}
    +	}
     
    +	elog(DEBUG3, "after invoking get block location API");
    +
    +	pfree(validator_data);
    +	pfree(validator_udf);
    +}
    +
    +/*
    + * LookupCustomProtocolBlockLocationFunc
    + *		Find the "<protoname>_blocklocation" function for a protocol.
    + *
    + * The function is looked up by its unqualified name with zero arguments.
    + * It must exist, return void and be declared STABLE; otherwise this
    + * raises ERROR. Returns the OID of the function.
    + */
    +Oid
    +LookupCustomProtocolBlockLocationFunc(char *protoname)
    +{
    +	/* sizeof(suffix) counts the terminating NUL, so no extra slack is needed. */
    +	static const char suffix[] = "_blocklocation";
    +	size_t		namelen;
    +	char	   *new_func_name;
    +	List	   *funcname;
    +	Oid			argList[1] = {InvalidOid};	/* not read: nargs is 0 */
    +	Oid			procOid;
    +
    +	Assert(protoname != NULL);
    +
    +	/* Size the buffer from the actual suffix instead of a magic constant. */
    +	namelen = strlen(protoname) + sizeof(suffix);
    +	new_func_name = (char *) palloc(namelen);
    +	snprintf(new_func_name, namelen, "%s%s", protoname, suffix);
    +
    +	funcname = list_make1(makeString(new_func_name));
    +	procOid = LookupFuncName(funcname, 0, argList, true);
    +
    +	if (!OidIsValid(procOid))
    +		ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
    +						errmsg("protocol function %s was not found.",
    +								new_func_name),
    +						errhint("Create it with CREATE FUNCTION."),
    +						errOmitLocation(true)));
    +
    +	/* check return type matches */
    +	if (get_func_rettype(procOid) != VOIDOID)
    +		ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    +						errmsg("protocol function %s has an incorrect return type",
    +								new_func_name),
    +						errOmitLocation(true)));
    +
    +	/* check allowed volatility */
    +	if (func_volatile(procOid) != PROVOLATILE_STABLE)
    +		ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
    +						errmsg("protocol function %s is not declared STABLE.",
    +						new_func_name),
    +						errOmitLocation(true)));
    +
    +	/* Free the name node and the name string; neither is needed any more. */
    +	list_free_deep(funcname);
    +	pfree(new_func_name);
    +
    +	return procOid;
    +}
    +
    +static void ExternalGetHdfsFileDataLocation(
    +				Relation relation,
    +				split_to_segment_mapping_context *context,
    +				int64 splitsize,
    +				Relation_Data *rel_data,
    +				int* allblocks) {
    +	ExtTableEntry *ext_entry = GetExtTableEntry(rel_data->relid);
    +	Assert(ext_entry->locations != NIL);
    +	int64 total_size = 0;
    +	int segno = 1;
    +
    +	/*
    +	 * Step 1. get external HDFS location from URI.
    +	 */
    +	char* first_uri_str = (char *) strVal(lfirst(list_head(ext_entry->locations)));
    +	/* We must have at least one location. */
    +	Assert(first_uri_str != NULL);
    +	Uri* uri = ParseExternalTableUri(first_uri_str);
    +	bool isHdfs = false;
    +	if (uri != NULL && is_hdfs_protocol(uri)) {
    +		isHdfs = true;
    +	}
    +	Assert(isHdfs);  /* Currently, we accept HDFS only. */
    +
    +    /*
    +     * Step 2. Get function to call for getting location information. This work
    +     * is done by validator function registered for this external protocol.
    +     */
    +	Oid procOid = InvalidOid;
    +	if (isHdfs) {
    +		procOid = LookupCustomProtocolBlockLocationFunc("hdfs");
    +	}
    +	else
    +	{
    +		Assert(false);
    +	}
    +
    +	/*
    +	 * Step 3. Call validator to get location data.
    +	 */
    +
    +	/* Prepare function call parameter by passing into location string. This is
    +	 * only called at dispatcher side. */
    +	List *bls = NULL; /* Block locations */
    +	if (OidIsValid(procOid) && Gp_role == GP_ROLE_DISPATCH)
    +	{
    +		InvokeHDFSProtocolBlockLocation(procOid, ext_entry->locations, &bls);
    +	}
    +
    +	/*
    +	 * Step 4. Build data location info for optimization after this call.
    +	 */
    +
    +	/* Go through each files */
    +	ListCell *cbl = NULL;
    +	foreach(cbl, bls)
    +	{
    +		blocklocation_file *f = (blocklocation_file *)lfirst(cbl);
    +		BlockLocation *locations = f->locations;
    +		int block_num = f->block_num;
    +		int64 logic_len = 0;
    +		*allblocks += block_num;
    +		if ((locations != NULL) && (block_num > 0)) {
    +			// calculate length for one specific file
    +			for (int j = 0; j < block_num; ++j) {
    +				logic_len += locations[j].length;
    +		//		locations[j].lowerBoundInc = NULL;
    +		//		locations[j].upperBoundExc = NULL;
    +			}
    +			total_size += logic_len;
    +
    +			Block_Host_Index * host_index = update_data_dist_stat(context,
    +					locations, block_num);
    +
    +			Relation_File *file = (Relation_File *) palloc(sizeof(Relation_File));
    +			if (atoi(strrchr(f->file_uri, '/') + 1) > 0)
    +			  file->segno = atoi(strrchr(f->file_uri, '/') + 1);
    +			else
    +			  file->segno = segno++;
    +			file->block_num = block_num;
    +			file->locations = locations;
    +			file->hostIDs = host_index;
    +			file->logic_len = logic_len;
    +
    +			// do the split logic
    +			int realSplitNum = 0;
    +			int split_num = file->block_num;
    +			int64 offset = 0;
    +			File_Split *splits = (File_Split *) palloc(sizeof(File_Split) * split_num);
    +			while (realSplitNum < split_num) {
    +					splits[realSplitNum].host = -1;
    +					splits[realSplitNum].is_local_read = true;
    +					splits[realSplitNum].offset = offset;
    +					splits[realSplitNum].length = file->locations[realSplitNum].length;
    +					splits[realSplitNum].logiceof = logic_len;
    +					splits[realSplitNum].ext_file_uri = pstrdup(f->file_uri);
    +					splits[realSplitNum].fs_lb_len = 0;
    +					splits[realSplitNum].fs_ub_len = 0;
    +				//	file->locations[realSplitNum].lowerBoundInc = NULL;
    +					splits[realSplitNum].lower_bound_inc = NULL;
    --- End diff --
    
    Remove


---

Mime
View raw message