hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shiv...@apache.org
Subject incubator-hawq git commit: HAWQ-462. Adding dfs_address from pgfilespace along with fragment data sent to segments
Date Fri, 04 Mar 2016 18:49:22 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/HAWQ-462 [created] 50926e747


HAWQ-462. Adding dfs_address from pgfilespace along with fragment data sent to segments


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/50926e74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/50926e74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/50926e74

Branch: refs/heads/HAWQ-462
Commit: 50926e7470e4d5a6c6857d57063f8b8f5634d6f8
Parents: 7924d56
Author: Shivram Mani <shivram.mani@gmail.com>
Authored: Fri Mar 4 10:49:13 2016 -0800
Committer: Shivram Mani <shivram.mani@gmail.com>
Committed: Fri Mar 4 10:49:13 2016 -0800

----------------------------------------------------------------------
 src/backend/access/external/hd_work_mgr.c  |  7 +++++++
 src/backend/access/external/pxfuriparser.c | 10 +++++++++-
 src/bin/gpfusion/gpbridgeapi.c             | 26 ++++++++++---------------
 src/include/access/pxfuriparser.h          |  1 +
 4 files changed, 27 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/50926e74/src/backend/access/external/hd_work_mgr.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/hd_work_mgr.c b/src/backend/access/external/hd_work_mgr.c
index 61fd40e..49d9bea 100644
--- a/src/backend/access/external/hd_work_mgr.c
+++ b/src/backend/access/external/hd_work_mgr.c
@@ -768,6 +768,9 @@ make_allocation_output_string(List *segment_fragments)
 	initStringInfo(&segwork);
 	appendStringInfoString(&segwork, SEGWORK_PREFIX);
 	
+	char* dfs_address = NULL;
+	get_hdfs_location_from_filespace(&dfs_address);
+
 	foreach(frag_cell, segment_fragments)
 	{
 		AllocatedDataFragment *frag = (AllocatedDataFragment*)lfirst(frag_cell);
@@ -783,6 +786,10 @@ make_allocation_output_string(List *segment_fragments)
 		appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM);
 		if (frag->fragment_md)
 			appendStringInfo(&fragment_str, "%s", frag->fragment_md);
+		/* Adding dfs_address from pg_filespace entry required for HAWQ-462 */
+		appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM);
+		if (dfs_address)
+			appendStringInfo(&fragment_str, "%s", dfs_address);
 		if (frag->user_data)
 		{
 			appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/50926e74/src/backend/access/external/pxfuriparser.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfuriparser.c b/src/backend/access/external/pxfuriparser.c
index 0ce78bd..afa5d06 100644
--- a/src/backend/access/external/pxfuriparser.c
+++ b/src/backend/access/external/pxfuriparser.c
@@ -709,7 +709,15 @@ GPHDUri_parse_fragment(char* fragment, List* fragments)
 	*value_end = '\0';
 	fragment_data->index = pstrdup(value_start);
 	value_start = value_end + 1;
+
 	/* expect fragment metadata */
+	value_end = strchr(value_start, segwork_separator);
+	Assert(value_end != NULL);
+	*value_end = '\0';
+	fragment_data->fragment_md = pstrdup(value_start);
+	value_start = value_end + 1;
+
+	/* expect fragment dfs_address */
 	Assert(value_start);
 
 	/* check for user data */
@@ -719,7 +727,7 @@ GPHDUri_parse_fragment(char* fragment, List* fragments)
 		has_user_data = true;
 		*value_end = '\0';
 	}
-	fragment_data->fragment_md = pstrdup(value_start);
+	fragment_data->dfs_address = pstrdup(value_start);
 
 	/* read user data */
 	if (has_user_data)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/50926e74/src/bin/gpfusion/gpbridgeapi.c
----------------------------------------------------------------------
diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c
index 5cdb624..9c72d82 100644
--- a/src/bin/gpfusion/gpbridgeapi.c
+++ b/src/bin/gpfusion/gpbridgeapi.c
@@ -52,7 +52,7 @@ void	append_churl_header_if_exists(gphadoop_context* context,
 void    set_current_fragment_headers(gphadoop_context* context);
 void	gpbridge_import_start(PG_FUNCTION_ARGS);
 void	gpbridge_export_start(PG_FUNCTION_ARGS);
-PxfServer* get_pxf_server(GPHDUri* gphd_uri, const Relation rel);
+PxfServer* get_pxf_server(gphadoop_context* context, const Relation rel);
 size_t	gpbridge_read(PG_FUNCTION_ARGS);
 size_t	gpbridge_write(PG_FUNCTION_ARGS);
 void	parse_gphd_uri(gphadoop_context* context, bool is_import, PG_FUNCTION_ARGS);
@@ -60,7 +60,7 @@ void	build_uri_for_read(gphadoop_context* context);
 void 	build_file_name_for_write(gphadoop_context* context);
 void 	build_uri_for_write(gphadoop_context* context, PxfServer* rest_server);
 size_t	fill_buffer(gphadoop_context* context, char* start, size_t size);
-void	add_delegation_token(PxfInputData *inputData);
+void	add_delegation_token(PxfInputData *inputData, char* dfs_address);
 void	free_token_resources(PxfInputData *inputData);
 
 /* Custom protocol entry point for read
@@ -181,8 +181,8 @@ void add_querydata_to_http_header(gphadoop_context* context, PG_FUNCTION_ARGS)
 	inputData.gphduri = context->gphd_uri;
 	inputData.rel = EXTPROTOCOL_GET_RELATION(fcinfo);
 	inputData.filterstr = serializePxfFilterQuals(EXTPROTOCOL_GET_SCANQUALS(fcinfo));
-	add_delegation_token(&inputData);
-	
+	add_delegation_token(&inputData, ((FragmentData*)lfirst(context->current_fragment))->dfs_address);
+
 	build_http_header(&inputData);
 	free_token_resources(&inputData);
 }
@@ -251,7 +251,7 @@ void gpbridge_export_start(PG_FUNCTION_ARGS)
 
 	/* get rest servers list and choose one */
 	Relation rel = EXTPROTOCOL_GET_RELATION(fcinfo);
-	PxfServer* rest_server = get_pxf_server(context->gphd_uri, rel);
+	PxfServer* rest_server = get_pxf_server(context, rel);
 
 	if (!rest_server)
 		ereport(ERROR,
@@ -290,8 +290,9 @@ static void init_client_context(ClientContext *client_context)
  * if pxf_isilon is true, there are no PXF instances on the datanodes.
  * TODO: add locality
  */
-PxfServer* get_pxf_server(GPHDUri* gphd_uri, const Relation rel)
+PxfServer* get_pxf_server(gphadoop_context* context, const Relation rel)
 {
+	GPHDUri* gphd_uri = context->gphd_uri;
 	ClientContext client_context; /* holds the communication info */
 	PxfInputData inputData = {0};
 	List	 	*rest_servers = NIL;
@@ -313,7 +314,6 @@ PxfServer* get_pxf_server(GPHDUri* gphd_uri, const Relation rel)
 	{
 		return NULL;
 	}
-
 	/*
 	 * Enrich the curl HTTP header
 	 */
@@ -321,7 +321,7 @@ PxfServer* get_pxf_server(GPHDUri* gphd_uri, const Relation rel)
 	inputData.gphduri = gphd_uri;
 	inputData.rel = rel;
 	inputData.filterstr = NULL; /* We do not supply filter data to the HTTP header */
-	add_delegation_token(&inputData);
+	add_delegation_token(&inputData, ((FragmentData*)lfirst(context->current_fragment))->dfs_address);
 	build_http_header(&inputData);
 
 	int port = atoi(gphd_uri->port);
@@ -505,20 +505,16 @@ size_t fill_buffer(gphadoop_context* context, char* start, size_t size)
  * Both regular and HA cases are handled the same way,
  * where a nameservice is parsed by HdfsParsePath()@fd.c
  */
-void add_delegation_token(PxfInputData *inputData)
+void add_delegation_token(PxfInputData *inputData, char *dfs_address)
 {
 	PxfHdfsTokenData *token = NULL;
-	char* dfs_address = NULL;
 
 	if (!enable_secure_filesystem)
 		return;
 
 	token = palloc0(sizeof(PxfHdfsTokenData));
 
-	get_hdfs_location_from_filespace(&dfs_address);
-
-    elog(DEBUG2, "locating token for %s", dfs_address);
-
+	elog(DEBUG2, "locating token for %s", dfs_address);
 	token->hdfs_token = find_filesystem_credential_with_uri(dfs_address);
 
 	if (token->hdfs_token == NULL)
@@ -526,8 +522,6 @@ void add_delegation_token(PxfInputData *inputData)
 	elog(DEBUG2, "Delegation token for %s found", dfs_address);
 
 	inputData->token = token;
-
-	pfree(dfs_address);
 }
 
 void free_token_resources(PxfInputData *inputData)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/50926e74/src/include/access/pxfuriparser.h
----------------------------------------------------------------------
diff --git a/src/include/access/pxfuriparser.h b/src/include/access/pxfuriparser.h
index 2c63ea5..80d2f18 100644
--- a/src/include/access/pxfuriparser.h
+++ b/src/include/access/pxfuriparser.h
@@ -45,6 +45,7 @@ typedef struct FragmentData
 	char	 *index;
 	char	 *source_name;
 	char	 *fragment_md;
+	char 	 *dfs_address;
 	char	 *user_data;
 } FragmentData;
 


Mime
View raw message