hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shiv...@apache.org
Subject incubator-hawq git commit: HAWQ-462. Moved dfs address from fragment to segment metadata
Date Tue, 08 Mar 2016 19:49:08 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/HAWQ-462 c5d3e4a3b -> f996eb8a6


HAWQ-462. Moved dfs address from fragment to segment metadata


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/f996eb8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/f996eb8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/f996eb8a

Branch: refs/heads/HAWQ-462
Commit: f996eb8a69a565ac8f7afd202247bf27ae92c0ff
Parents: c5d3e4a
Author: Shivram Mani <shivram.mani@gmail.com>
Authored: Tue Mar 8 11:48:58 2016 -0800
Committer: Shivram Mani <shivram.mani@gmail.com>
Committed: Tue Mar 8 11:48:58 2016 -0800

----------------------------------------------------------------------
 src/backend/access/external/hd_work_mgr.c  |  9 ++++----
 src/backend/access/external/pxfuriparser.c | 30 +++++++++++++------------
 src/bin/gpfusion/gpbridgeapi.c             | 18 ++++++++-------
 src/include/access/pxfuriparser.h          |  4 +++-
 4 files changed, 33 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f996eb8a/src/backend/access/external/hd_work_mgr.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/hd_work_mgr.c b/src/backend/access/external/hd_work_mgr.c
index a80eb01..f07ea7c 100644
--- a/src/backend/access/external/hd_work_mgr.c
+++ b/src/backend/access/external/hd_work_mgr.c
@@ -98,6 +98,7 @@ typedef struct sGpHost
 
 static const char *SEGWORK_PREFIX = "segwork=";
 static const char SEGWORK_IN_PAIR_DELIM = '@';
+static const char SEGWORK_DFS_DELIM = '&';
 
 static List* free_fragment_list(List *fragments);
 static List** distribute_work_2_gp_segments(List *data_fragments_list, int num_segs, int
working_segs);
@@ -768,8 +769,11 @@ make_allocation_output_string(List *segment_fragments)
 	initStringInfo(&segwork);
 	appendStringInfoString(&segwork, SEGWORK_PREFIX);
 	
+	/* Add dfs_address from pg_filespace to the segment data. Fixes HAWQ-462 *//* dfs_address
from pg_filespace entry */
 	char* dfs_address = NULL;
 	get_hdfs_location_from_filespace(&dfs_address);
+	appendStringInfoString(&segwork, dfs_address);
+	appendStringInfoChar(&segwork, SEGWORK_DFS_DELIM);
 
 	foreach(frag_cell, segment_fragments)
 	{
@@ -786,10 +790,6 @@ make_allocation_output_string(List *segment_fragments)
 		appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM);
 		if (frag->fragment_md)
 			appendStringInfo(&fragment_str, "%s", frag->fragment_md);
-		/* Adding dfs_address from pg_filespace entry required for HAWQ-462 */
-		appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM);
-		if (dfs_address)
-			appendStringInfo(&fragment_str, "%s", dfs_address);
 		if (frag->user_data)
 		{
 			appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM);
@@ -801,7 +801,6 @@ make_allocation_output_string(List *segment_fragments)
 		appendStringInfoChar(&segwork, SEGWORK_IN_PAIR_DELIM);
 		appendStringInfoString(&segwork, fragment_str.data);
 		pfree(fragment_str.data);
-
 	}
 	pfree(dfs_address);
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f996eb8a/src/backend/access/external/pxfuriparser.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfuriparser.c b/src/backend/access/external/pxfuriparser.c
index 35ead79..24399e2 100644
--- a/src/backend/access/external/pxfuriparser.c
+++ b/src/backend/access/external/pxfuriparser.c
@@ -27,6 +27,7 @@
 
 static const char* segwork_substring = "segwork=";
 static const char segwork_separator = '@';
+static const char segwork_dfs_separator = '&';
 static const int EMPTY_VALUE_LEN = 2;
 
 static void  GPHDUri_parse_protocol(GPHDUri *uri, char **cursor);
@@ -124,6 +125,9 @@ freeGPHDUri(GPHDUri *uri)
 	if (uri->ha_nodes)
 		GPHD_HA_release_nodes(uri->ha_nodes);
 
+	if (uri->dfs_address)
+		pfree(uri->dfs_address);
+
 	pfree(uri);
 }
 
@@ -576,7 +580,6 @@ GPHDUri_debug_print_options(GPHDUri *uri)
 {
 	ListCell	*item;
 	int			count = 0;
-
 	elog(NOTICE, "options section data: ");
 	foreach(item, uri->options)
 	{
@@ -620,7 +623,7 @@ GPHDUri_debug_print_segwork(GPHDUri *uri)
 
 /*
  * GPHDUri_parse_segwork parses the segwork section of the uri.
- * ...&segwork=<size>@<ip>@<port>@<index><size>@<ip>@<port>@<index><size>...
+ * ...&segwork=dfs_address@<size>@<ip>@<port>@<index><size>@<ip>@<port>@<index><size>...
  */
 static void
 GPHDUri_parse_segwork(GPHDUri *uri, const char *uri_str)
@@ -634,9 +637,17 @@ GPHDUri_parse_segwork(GPHDUri *uri, const char *uri_str)
 	segwork = strstr(uri_str, segwork_substring);
 	if (segwork == NULL)
 		return;
-
 	segwork += strlen(segwork_substring);
 
+	/* parse dfs address */
+	size_end = strchr(segwork, segwork_dfs_separator);
+	if(size_end != NULL)
+	{
+		*size_end = '\0';
+		uri->dfs_address = pnstrdup(segwork, size_end-segwork);
+		segwork = size_end + 1;
+	}
+
 	/*
 	 * read next segment.
 	 * each segment is prefixed its size.
@@ -663,7 +674,7 @@ GPHDUri_parse_segwork(GPHDUri *uri, const char *uri_str)
 
 /*
  * Parsed a fragment string in the form:
- * <ip>@<port>@<index>@<fragment_md>@<dfs_address>[@user_data]
- 192.168.1.1@1422@1@<fragment metadata>@hdfs://nameservice/hawq_default[@user_data]
+ * <ip>@<port>@<index>@<fragment_md>[@user_data] - 192.168.1.1@1422@1@<fragment
metadata>[@user_data]
  * to authority ip:port - 192.168.1.1:1422
  * to index - 1
  * user data is optional
@@ -709,15 +720,7 @@ GPHDUri_parse_fragment(char* fragment, List* fragments)
 	*value_end = '\0';
 	fragment_data->index = pstrdup(value_start);
 	value_start = value_end + 1;
-
 	/* expect fragment metadata */
-	value_end = strchr(value_start, segwork_separator);
-	Assert(value_end != NULL);
-	*value_end = '\0';
-	fragment_data->fragment_md = pstrdup(value_start);
-	value_start = value_end + 1;
-
-	/* expect fragment dfs_address */
 	Assert(value_start);
 
 	/* check for user data */
@@ -727,7 +730,7 @@ GPHDUri_parse_fragment(char* fragment, List* fragments)
 		has_user_data = true;
 		*value_end = '\0';
 	}
-	fragment_data->dfs_address = pstrdup(value_start);
+	fragment_data->fragment_md = pstrdup(value_start);
 
 	/* read user data */
 	if (has_user_data)
@@ -753,7 +756,6 @@ GPHDUri_free_fragments(GPHDUri *uri)
 		pfree(data->index);
 		pfree(data->source_name);
 		pfree(data->fragment_md);
-		pfree(data->dfs_address);
 		if(data->user_data)
 			pfree(data->user_data);
 		pfree(data);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f996eb8a/src/bin/gpfusion/gpbridgeapi.c
----------------------------------------------------------------------
diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c
index 9c72d82..053f391 100644
--- a/src/bin/gpfusion/gpbridgeapi.c
+++ b/src/bin/gpfusion/gpbridgeapi.c
@@ -52,7 +52,7 @@ void	append_churl_header_if_exists(gphadoop_context* context,
 void    set_current_fragment_headers(gphadoop_context* context);
 void	gpbridge_import_start(PG_FUNCTION_ARGS);
 void	gpbridge_export_start(PG_FUNCTION_ARGS);
-PxfServer* get_pxf_server(gphadoop_context* context, const Relation rel);
+PxfServer* get_pxf_server(GPHDUri* gphd_uri, const Relation rel);
 size_t	gpbridge_read(PG_FUNCTION_ARGS);
 size_t	gpbridge_write(PG_FUNCTION_ARGS);
 void	parse_gphd_uri(gphadoop_context* context, bool is_import, PG_FUNCTION_ARGS);
@@ -60,7 +60,7 @@ void	build_uri_for_read(gphadoop_context* context);
 void 	build_file_name_for_write(gphadoop_context* context);
 void 	build_uri_for_write(gphadoop_context* context, PxfServer* rest_server);
 size_t	fill_buffer(gphadoop_context* context, char* start, size_t size);
-void	add_delegation_token(PxfInputData *inputData, char* dfs_address);
+void	add_delegation_token(PxfInputData *inputData);
 void	free_token_resources(PxfInputData *inputData);
 
 /* Custom protocol entry point for read
@@ -181,8 +181,8 @@ void add_querydata_to_http_header(gphadoop_context* context, PG_FUNCTION_ARGS)
 	inputData.gphduri = context->gphd_uri;
 	inputData.rel = EXTPROTOCOL_GET_RELATION(fcinfo);
 	inputData.filterstr = serializePxfFilterQuals(EXTPROTOCOL_GET_SCANQUALS(fcinfo));
-	add_delegation_token(&inputData, ((FragmentData*)lfirst(context->current_fragment))->dfs_address);
-
+	add_delegation_token(&inputData);
+	
 	build_http_header(&inputData);
 	free_token_resources(&inputData);
 }
@@ -251,7 +251,7 @@ void gpbridge_export_start(PG_FUNCTION_ARGS)
 
 	/* get rest servers list and choose one */
 	Relation rel = EXTPROTOCOL_GET_RELATION(fcinfo);
-	PxfServer* rest_server = get_pxf_server(context, rel);
+	PxfServer* rest_server = get_pxf_server(context->gphd_uri, rel);
 
 	if (!rest_server)
 		ereport(ERROR,
@@ -290,7 +290,7 @@ static void init_client_context(ClientContext *client_context)
  * if pxf_isilon is true, there are no PXF instances on the datanodes.
  * TODO: add locality
  */
-PxfServer* get_pxf_server(gphadoop_context* context, const Relation rel)
+PxfServer* get_pxf_server(GPHDUri* gphd_uri, const Relation rel)
 {
 	GPHDUri* gphd_uri = context->gphd_uri;
 	ClientContext client_context; /* holds the communication info */
@@ -314,6 +314,7 @@ PxfServer* get_pxf_server(gphadoop_context* context, const Relation rel)
 	{
 		return NULL;
 	}
+
 	/*
 	 * Enrich the curl HTTP header
 	 */
@@ -321,7 +322,7 @@ PxfServer* get_pxf_server(gphadoop_context* context, const Relation rel)
 	inputData.gphduri = gphd_uri;
 	inputData.rel = rel;
 	inputData.filterstr = NULL; /* We do not supply filter data to the HTTP header */
-	add_delegation_token(&inputData, ((FragmentData*)lfirst(context->current_fragment))->dfs_address);
+	add_delegation_token(&inputData);
 	build_http_header(&inputData);
 
 	int port = atoi(gphd_uri->port);
@@ -505,9 +506,10 @@ size_t fill_buffer(gphadoop_context* context, char* start, size_t size)
  * Both regular and HA cases are handled the same way,
  * where a nameservice is parsed by HdfsParsePath()@fd.c
  */
-void add_delegation_token(PxfInputData *inputData, char *dfs_address)
+void add_delegation_token(PxfInputData *inputData)
 {
 	PxfHdfsTokenData *token = NULL;
+	char *dfs_address = inputData->gphduri->dfs_address;
 
 	if (!enable_secure_filesystem)
 		return;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f996eb8a/src/include/access/pxfuriparser.h
----------------------------------------------------------------------
diff --git a/src/include/access/pxfuriparser.h b/src/include/access/pxfuriparser.h
index 80d2f18..b349029 100644
--- a/src/include/access/pxfuriparser.h
+++ b/src/include/access/pxfuriparser.h
@@ -45,7 +45,6 @@ typedef struct FragmentData
 	char	 *index;
 	char	 *source_name;
 	char	 *fragment_md;
-	char 	 *dfs_address;
 	char	 *user_data;
 } FragmentData;
 
@@ -77,6 +76,9 @@ typedef struct GPHDUri
 	 * NNHAConf  will also occupy <host> and <port> members
 	 */
 	NNHAConf        *ha_nodes;
+
+	/* dfs address from pg_filespace (optional) */
+	char			*dfs_address;
 } GPHDUri;
 
 GPHDUri	*parseGPHDUri(const char *uri_str);


Mime
View raw message