hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From odiache...@apache.org
Subject [2/2] incubator-hawq git commit: HAWQ-1179. Call Bridge api with profile value read from Fragmenter call.
Date Thu, 22 Dec 2016 06:35:30 GMT
HAWQ-1179. Call Bridge api with profile value read from Fragmenter call.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/fbef55d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/fbef55d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/fbef55d4

Branch: refs/heads/master
Commit: fbef55d49ccce734d44434a6985845a2892d37cc
Parents: 9cd42ad
Author: Oleksandr Diachenko <odiachenko@pivotal.io>
Authored: Wed Dec 21 22:33:48 2016 -0800
Committer: Oleksandr Diachenko <odiachenko@pivotal.io>
Committed: Wed Dec 21 22:33:48 2016 -0800

----------------------------------------------------------------------
 src/backend/access/external/hd_work_mgr.c       |  21 +-
 src/backend/access/external/pxfheaders.c        |  14 +-
 src/backend/access/external/pxfmasterapi.c      |   9 +
 src/backend/access/external/pxfuriparser.c      | 156 +++++-
 .../access/external/test/hd_work_mgr_test.c     | 112 ++++
 .../access/external/test/pxfuriparser_test.c    | 558 ++++++++++++++++++-
 src/bin/gpfusion/gpbridgeapi.c                  |  11 +
 src/include/access/pxfmasterapi.h               |   1 +
 src/include/access/pxfuriparser.h               |   3 +
 9 files changed, 841 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fbef55d4/src/backend/access/external/hd_work_mgr.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/hd_work_mgr.c b/src/backend/access/external/hd_work_mgr.c
index 6829de5..520b5de 100644
--- a/src/backend/access/external/hd_work_mgr.c
+++ b/src/backend/access/external/hd_work_mgr.c
@@ -53,6 +53,7 @@ typedef struct sAllocatedDataFragment
 	char *source_name; /* source name */
 	char *fragment_md; /* fragment meta data */
 	char *user_data; /* additional user data */
+	char *profile; /* recommended profile to work with fragment */
 } AllocatedDataFragment;
 
 /*
@@ -713,6 +714,7 @@ create_allocated_fragment(DataFragment *fragment)
 	allocated->source_name = pstrdup(fragment->source_name);
 	allocated->fragment_md = (fragment->fragment_md) ? pstrdup(fragment->fragment_md) : NULL;
 	allocated->user_data = (fragment->user_data) ? pstrdup(fragment->user_data) : NULL;
+	allocated->profile = (fragment->profile) ? pstrdup(fragment->profile) : NULL;
 	return allocated;
 }
 
@@ -782,12 +784,22 @@ make_allocation_output_string(List *segment_fragments)
 		appendStringInfo(&fragment_str, "%d", frag->index);
 		appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM);
 		if (frag->fragment_md)
+		{
 			appendStringInfo(&fragment_str, "%s", frag->fragment_md);
+		}
+
+		appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM);
 		if (frag->user_data)
 		{
-			appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM);
 			appendStringInfo(&fragment_str, "%s", frag->user_data);
 		}
+		appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM);
+		if (frag->profile)
+		{
+			appendStringInfo(&fragment_str, "%s", frag->profile);
+		}
+		appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM);
+
 		fragment_size = strlen(fragment_str.data);
 
 		appendStringInfo(&segwork, "%d", fragment_size);
@@ -817,6 +829,8 @@ free_allocated_frags(List *segment_fragments)
 			pfree(frag->fragment_md);
 		if (frag->user_data)
 			pfree(frag->user_data);
+		if (frag->profile)
+			pfree(frag->profile);
 		pfree(frag);
 	}
 	list_free(segment_fragments);
@@ -856,6 +870,11 @@ print_fragment_list(List *fragments)
 		{
 			appendStringInfo(&log_str, "user data: %s\n", frag->user_data);
 		}
+
+		if (frag->profile)
+		{
+			appendStringInfo(&log_str, "profile: %s\n", frag->profile);
+		}
 	}
 
 	elog(FRAGDEBUG, "%s", log_str.data);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fbef55d4/src/backend/access/external/pxfheaders.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfheaders.c b/src/backend/access/external/pxfheaders.c
index fe7cd6a..2381044 100644
--- a/src/backend/access/external/pxfheaders.c
+++ b/src/backend/access/external/pxfheaders.c
@@ -27,12 +27,12 @@
 #include "catalog/pg_exttable.h"
 #include "access/pxfheaders.h"
 #include "access/pxffilters.h"
+#include "utils/formatting.h"
 #include "utils/guc.h"
 
 static void add_alignment_size_httpheader(CHURL_HEADERS headers);
 static void add_tuple_desc_httpheader(CHURL_HEADERS headers, Relation rel);
 static void add_location_options_httpheader(CHURL_HEADERS headers, GPHDUri *gphduri);
-static char* prepend_x_gp(const char* key);
 static void add_delegation_token_headers(CHURL_HEADERS headers, PxfInputData *inputData);
 static void add_remote_credentials(CHURL_HEADERS headers);
 static void add_projection_desc_httpheader(CHURL_HEADERS headers, ProjectionInfo *projInfo, List *qualsAttributes);
@@ -303,22 +303,12 @@ static void add_location_options_httpheader(CHURL_HEADERS headers, GPHDUri *gphd
 	foreach(option, gphduri->options)
 	{
 		OptionData *data = (OptionData*)lfirst(option);
-		char *x_gp_key = prepend_x_gp(data->key);
+		char *x_gp_key = normalize_key_name(data->key);
 		churl_headers_append(headers, x_gp_key, data->value);
 		pfree(x_gp_key);
 	}
 }
 
-/* Full name of the HEADER KEY expected by the PXF service */
-static char* prepend_x_gp(const char* key)
-{	
-	StringInfoData formatter;
-	initStringInfo(&formatter);
-	appendStringInfo(&formatter, "X-GP-%s", key);
-	
-	return formatter.data;
-}
-
 /*
  * The function will add delegation token headers.
  * Each token information piece will be serialized as HEX string.

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fbef55d4/src/backend/access/external/pxfmasterapi.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfmasterapi.c b/src/backend/access/external/pxfmasterapi.c
index 833218a..4ee90ff 100644
--- a/src/backend/access/external/pxfmasterapi.c
+++ b/src/backend/access/external/pxfmasterapi.c
@@ -358,6 +358,11 @@ parse_get_fragments_response(List *fragments, StringInfo rest_buf)
 		if (js_user_data)
 			fragment->user_data = pstrdup(json_object_get_string(js_user_data));
 
+		/* 5. profile - recommended profile to work with fragment */
+		struct json_object *js_profile = json_object_object_get(js_fragment, "profile");
+		if (js_profile)
+			fragment->profile = pstrdup(json_object_get_string(js_profile));
+
 		/*
 		 * HD-2547:
 		 * Ignore fragment if it doesn't contain any host locations,
@@ -400,6 +405,10 @@ void free_fragment(DataFragment *fragment)
 
 	if (fragment->user_data)
 		pfree(fragment->user_data);
+
+	if (fragment->profile)
+		pfree(fragment->profile);
+
 	pfree(fragment);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fbef55d4/src/backend/access/external/pxfuriparser.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfuriparser.c b/src/backend/access/external/pxfuriparser.c
index 9e83714..17d5803 100644
--- a/src/backend/access/external/pxfuriparser.c
+++ b/src/backend/access/external/pxfuriparser.c
@@ -33,7 +33,7 @@ static void  GPHDUri_parse_protocol(GPHDUri *uri, char **cursor);
 static void  GPHDUri_parse_authority(GPHDUri *uri, char **cursor);
 static void  GPHDUri_parse_data(GPHDUri *uri, char **cursor);
 static void  GPHDUri_parse_options(GPHDUri *uri, char **cursor);
-static List* GPHDUri_parse_option(char* pair, List* options, const char* uri);
+static List* GPHDUri_parse_option(char* pair, GPHDUri *uri);
 static void  GPHDUri_free_options(GPHDUri *uri);
 static void  GPHDUri_parse_segwork(GPHDUri *uri, const char *uri_str);
 static List* GPHDUri_parse_fragment(char* fragment, List* fragments);
@@ -41,6 +41,7 @@ static void  GPHDUri_free_fragments(GPHDUri *uri);
 static void  GPHDUri_debug_print_options(GPHDUri *uri);
 static void  GPHDUri_debug_print_segwork(GPHDUri *uri);
 static void  GPHDUri_fetch_authority_from_ha_nn(GPHDUri *uri, char *nameservice);
+char* normalize_key_name(const char* key);
 
 /* parseGPHDUri
  *
@@ -119,6 +120,8 @@ freeGPHDUri(GPHDUri *uri)
 	pfree(uri->host);
 	pfree(uri->port);
 	pfree(uri->data);
+	if (uri->profile)
+		pfree(uri->profile);
 
 	GPHDUri_free_options(uri);
 	if (uri->ha_nodes)
@@ -133,6 +136,8 @@ freeGPHDUriForMetadata(GPHDUri *uri)
 
 	pfree(uri->host);
 	pfree(uri->port);
+	if (uri->profile)
+		pfree(uri->profile);
 
 	pfree(uri);
 }
@@ -472,7 +477,7 @@ GPHDUri_parse_options(GPHDUri *uri, char **cursor)
 			pair;
 			pair = strtok_r(NULL, sep, &strtok_context))
 	{
-		uri->options = GPHDUri_parse_option(pair, uri->options, uri->uri);
+		uri->options = GPHDUri_parse_option(pair, uri);
 	}
 
 	pfree(dup);
@@ -484,7 +489,7 @@ GPHDUri_parse_options(GPHDUri *uri, char **cursor)
  * to OptionData object (key and value).
  */
 static List*
-GPHDUri_parse_option(char* pair, List* options, const char* uri)
+GPHDUri_parse_option(char* pair, GPHDUri *uri)
 {
 
 	char	*sep;
@@ -498,33 +503,40 @@ GPHDUri_parse_option(char* pair, List* options, const char* uri)
 	if (sep == NULL) {
 		ereport(ERROR,
 				(errcode(ERRCODE_SYNTAX_ERROR),
-				 errmsg("Invalid URI %s: option '%s' missing '='", uri, pair)));
+				 errmsg("Invalid URI %s: option '%s' missing '='", uri->uri, pair)));
 	}
 
 	if (strchr(sep + 1, '=') != NULL) {
 		ereport(ERROR,
 				(errcode(ERRCODE_SYNTAX_ERROR),
-				 errmsg("Invalid URI %s: option '%s' contains duplicate '='", uri, pair)));
+				 errmsg("Invalid URI %s: option '%s' contains duplicate '='", uri->uri, pair)));
 	}
 
 	key_len = sep - pair;
 	if (key_len == 0) {
 		ereport(ERROR,
 				(errcode(ERRCODE_SYNTAX_ERROR),
-				 errmsg("Invalid URI %s: option '%s' missing key before '='", uri, pair)));
+				 errmsg("Invalid URI %s: option '%s' missing key before '='", uri->uri, pair)));
 	}
 	
 	value_len = pair_len - key_len + 1;
 	if (value_len == EMPTY_VALUE_LEN) {
 		ereport(ERROR,
 				(errcode(ERRCODE_SYNTAX_ERROR),
-				 errmsg("Invalid URI %s: option '%s' missing value after '='", uri, pair)));
+				 errmsg("Invalid URI %s: option '%s' missing value after '='", uri->uri, pair)));
 	}
     
 	option_data->key = pnstrdup(pair,key_len);
 	option_data->value = pnstrdup(sep + 1, value_len);
 
-	return lappend(options, option_data);
+	char *x_gp_key = normalize_key_name(option_data->key);
+	if (strcmp(x_gp_key, "X-GP-PROFILE") == 0)
+	{
+		uri->profile = pstrdup(option_data->value);
+	}
+	pfree(x_gp_key);
+
+	return lappend(uri->options, option_data);
 }
 
 /*
@@ -562,6 +574,11 @@ GPHDUri_debug_print(GPHDUri *uri)
 		 uri->port,
 		 uri->data);
 
+	if (uri->profile)
+	{
+		elog(NOTICE, "Profile: %s", uri->profile);
+	}
+
 	GPHDUri_debug_print_options(uri);
 	GPHDUri_debug_print_segwork(uri);
 }
@@ -611,6 +628,8 @@ GPHDUri_debug_print_segwork(GPHDUri *uri)
 				 	 	 data->fragment_md ? data->fragment_md : "NULL");
 		if (data->user_data)
 			appendStringInfo(&fragment_data, ", user data : %s", data->user_data);
+		if (data->profile)
+			appendStringInfo(&fragment_data, ", profile : %s", data->profile);
 		elog(NOTICE, "%s", fragment_data.data);
 		++count;
 		resetStringInfo(&fragment_data);
@@ -671,66 +690,123 @@ GPHDUri_parse_segwork(GPHDUri *uri, const char *uri_str)
 static List*
 GPHDUri_parse_fragment(char* fragment, List* fragments)
 {
+	if (!fragment)
+	{
+		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment.fragment string is null.")));
+	}
 
 	char	*dup_frag = pstrdup(fragment);
 	char	*value_start;
 	char	*value_end;
-	bool	has_user_data = false;
 
-	StringInfoData formatter;
+	StringInfoData authority_formatter;
 	FragmentData* fragment_data;
 
 	fragment_data = palloc0(sizeof(FragmentData));
-	initStringInfo(&formatter);
+	initStringInfo(&authority_formatter);
 
 	value_start = dup_frag;
+
 	/* expect ip */
 	value_end = strchr(value_start, segwork_separator);
-	Assert(value_end != NULL);
+	if (value_end == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.")));
+	}
 	*value_end = '\0';
-	appendStringInfo(&formatter, "%s:", value_start);
+	appendStringInfo(&authority_formatter, "%s:", value_start);
 	value_start = value_end + 1;
+
 	/* expect port */
 	value_end = strchr(value_start, segwork_separator);
-	Assert(value_end != NULL);
+	if (value_end == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.")));
+	}
 	*value_end = '\0';
-	appendStringInfo(&formatter, "%s", value_start);
-	fragment_data->authority = formatter.data;
+	appendStringInfo(&authority_formatter, "%s", value_start);
+	fragment_data->authority = pstrdup(authority_formatter.data);
+	pfree(authority_formatter.data);
 	value_start = value_end + 1;
+
 	/* expect source name */
 	value_end = strchr(value_start, segwork_separator);
-	Assert(value_end != NULL);
+	if (value_end == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.")));
+	}
 	*value_end = '\0';
 	fragment_data->source_name = pstrdup(value_start);
 	value_start = value_end + 1;
+
 	/* expect index */
 	value_end = strchr(value_start, segwork_separator);
-	Assert(value_end != NULL);
+	if (value_end == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.")));
+	}
 	*value_end = '\0';
 	fragment_data->index = pstrdup(value_start);
 	value_start = value_end + 1;
+
 	/* expect fragment metadata */
 	Assert(value_start);
-
-	/* check for user data */
 	value_end = strchr(value_start, segwork_separator);
-	if (value_end != NULL)
+	if (value_end == NULL)
 	{
-		has_user_data = true;
-		*value_end = '\0';
+		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.")));
 	}
+	*value_end = '\0';
 	fragment_data->fragment_md = pstrdup(value_start);
+	value_start = value_end + 1;
+
+	/* expect user data */
+	Assert(value_start);
+	value_end = strchr(value_start, segwork_separator);
+	if (value_end == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.")));
+	}
+	*value_end = '\0';
+	fragment_data->user_data = pstrdup(value_start);
+	value_start = value_end + 1;
 
-	/* read user data */
-	if (has_user_data)
+	/* expect for profile */
+	Assert(value_start);
+	value_end = strchr(value_start, segwork_separator);
+	if (value_end == NULL)
 	{
-		fragment_data->user_data = pstrdup(value_end + 1);
+		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.")));
 	}
+	*value_end = '\0';
+	if (strlen(value_start) > 0)
+		fragment_data->profile = pstrdup(value_start);
 
 	return lappend(fragments, fragment_data);
 }
 
 /*
+ * Free fragment data
+ */
+static void
+GPHDUri_free_fragment(FragmentData *data)
+{
+	if (data->authority)
+		pfree(data->authority);
+	if (data->fragment_md)
+		pfree(data->fragment_md);
+	if (data->index)
+		pfree(data->index);
+	if (data->profile)
+		pfree(data->profile);
+	if (data->source_name)
+		pfree(data->source_name);
+	if (data->user_data)
+		pfree(data->user_data);
+	pfree(data);
+}
+
+/*
  * Free fragments list
  */
 static void 
@@ -741,10 +817,7 @@ GPHDUri_free_fragments(GPHDUri *uri)
 	foreach(fragment, uri->fragments)
 	{
 		FragmentData *data = (FragmentData*)lfirst(fragment);
-		pfree(data->authority);
-		pfree(data->index);
-		pfree(data->source_name);
-		pfree(data);
+		GPHDUri_free_fragment(data);
 	}
 	list_free(uri->fragments);
 	uri->fragments = NIL;
@@ -823,3 +896,26 @@ bool RelationIsExternalPxfReadOnly(Relation rel, StringInfo location)
 
 	return false;
 }
+
+/*
+ * Full name of the HEADER KEY expected by the PXF service
+ * Converts input string to upper case and prepends "X-GP-" string
+ *
+ */
+char* normalize_key_name(const char* key)
+{
+	if (!key || strlen(key) == 0)
+	{
+		ereport(ERROR,
+			(errcode(ERRCODE_INTERNAL_ERROR),
+			 errmsg("internal error in pxfheaders.c:normalize_key_name. Parameter key is null or empty.")));
+	}
+
+	StringInfoData formatter;
+	initStringInfo(&formatter);
+	char* upperCasedKey = str_toupper(pstrdup(key), strlen(key));
+	appendStringInfo(&formatter, "X-GP-%s", upperCasedKey);
+	pfree(upperCasedKey);
+
+	return formatter.data;
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fbef55d4/src/backend/access/external/test/hd_work_mgr_test.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/hd_work_mgr_test.c b/src/backend/access/external/test/hd_work_mgr_test.c
index c39b2ac..759b461 100644
--- a/src/backend/access/external/test/hd_work_mgr_test.c
+++ b/src/backend/access/external/test/hd_work_mgr_test.c
@@ -28,12 +28,124 @@
 #include "hd_work_mgr_allocate_fragments_to_datanodes_test.c"
 #include "hd_work_mgr_distribute_work_2_gp_segments_test.c"
 
+/*
+ * Serialize output string when all fields of AllocatedDataFragment are passed
+ */
+void
+test__make_allocation_output_string(void **state)
+{
+
+	List *segment_fragments = NIL;
+	AllocatedDataFragment* frag = palloc0(sizeof(AllocatedDataFragment));
+
+	frag->index = 42;
+	frag->host = "HOST";
+	frag->rest_port = 1312;
+	frag->source_name = "TABLE_NAME";
+	frag->fragment_md = "FRAGMENT_METADATA";
+	frag->user_data = "USER_DATA";
+	frag->profile = "PROFILE";
+
+	segment_fragments = lappend(segment_fragments, frag);
+	char *output_str = make_allocation_output_string(segment_fragments);
+
+	assert_string_equal(output_str, "segwork=60@HOST@1312@TABLE_NAME@42@FRAGMENT_METADATA@USER_DATA@PROFILE@");
+
+	pfree(frag);
+	pfree(output_str);
+	list_free(segment_fragments);
+}
+
+/*
+ * Serialize output string when profile is empty
+ */
+void
+test__make_allocation_output_string__empty_profile(void **state)
+{
+
+	List *segment_fragments = NIL;
+	AllocatedDataFragment* frag = palloc0(sizeof(AllocatedDataFragment));
+
+	frag->index = 42;
+	frag->host = "HOST";
+	frag->rest_port = 1312;
+	frag->source_name = "TABLE_NAME";
+	frag->fragment_md = "FRAGMENT_METADATA";
+	frag->user_data = "USER_DATA";
+
+	segment_fragments = lappend(segment_fragments, frag);
+	char *output_str = make_allocation_output_string(segment_fragments);
+
+	assert_string_equal(output_str, "segwork=53@HOST@1312@TABLE_NAME@42@FRAGMENT_METADATA@USER_DATA@@");
+
+	pfree(frag);
+	pfree(output_str);
+	list_free(segment_fragments);
+}
+
+/*
+ * Serialize output string when user data is empty
+ */
+void
+test__make_allocation_output_string__empty_user_data(void **state)
+{
+
+	List *segment_fragments = NIL;
+	AllocatedDataFragment* frag = palloc0(sizeof(AllocatedDataFragment));
+
+	frag->index = 42;
+	frag->host = "HOST";
+	frag->rest_port = 1312;
+	frag->source_name = "TABLE_NAME";
+	frag->fragment_md = "FRAGMENT_METADATA";
+	frag->profile = "PROFILE";
+
+	segment_fragments = lappend(segment_fragments, frag);
+	char *output_str = make_allocation_output_string(segment_fragments);
+
+	assert_string_equal(output_str, "segwork=51@HOST@1312@TABLE_NAME@42@FRAGMENT_METADATA@@PROFILE@");
+
+	pfree(frag);
+	pfree(output_str);
+	list_free(segment_fragments);
+}
+
+/*
+ * Serialize output string when profile and user data are empty
+ */
+void
+test__make_allocation_output_string__empty_user_data_profile(void **state)
+{
+
+	List *segment_fragments = NIL;
+	AllocatedDataFragment* frag = palloc0(sizeof(AllocatedDataFragment));
+
+	frag->index = 42;
+	frag->host = "HOST";
+	frag->rest_port = 1312;
+	frag->source_name = "TABLE_NAME";
+	frag->fragment_md = "FRAGMENT_METADATA";
+
+	segment_fragments = lappend(segment_fragments, frag);
+	char *output_str = make_allocation_output_string(segment_fragments);
+
+	assert_string_equal(output_str, "segwork=44@HOST@1312@TABLE_NAME@42@FRAGMENT_METADATA@@@");
+
+	pfree(frag);
+	pfree(output_str);
+	list_free(segment_fragments);
+}
+
 int 
 main(int argc, char* argv[]) 
 {
 	cmockery_parse_arguments(argc, argv);
 
 	const UnitTest tests[] = {
+			unit_test(test__make_allocation_output_string),
+			unit_test(test__make_allocation_output_string__empty_profile),
+			unit_test(test__make_allocation_output_string__empty_user_data),
+			unit_test(test__make_allocation_output_string__empty_user_data_profile),
 			unit_test(test__do_segment_clustering_by_host__10SegmentsOn3Hosts),
 			unit_test(test__get_dn_processing_load),
 			unit_test(test__create_allocated_fragment__NoUserData),

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fbef55d4/src/backend/access/external/test/pxfuriparser_test.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/pxfuriparser_test.c b/src/backend/access/external/test/pxfuriparser_test.c
index b97cef5..1912d2b 100644
--- a/src/backend/access/external/test/pxfuriparser_test.c
+++ b/src/backend/access/external/test/pxfuriparser_test.c
@@ -72,6 +72,7 @@ test__parseGPHDUri__ValidURI(void **state)
 	assert_string_equal(option->value, "SomeAnalyzer");
 
 	assert_true(parsed->fragments == NULL);
+	assert_true(parsed->profile == NULL);
 
 	freeGPHDUri(parsed);
 }
@@ -180,6 +181,7 @@ test__parseGPHDUri__NegativeTestNoProtocol(void **state)
 		assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR);
 		assert_true(edata->elevel == ERROR);
 		assert_string_equal(edata->message, "Invalid URI pxf:/1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=HdfsDataFragmenter");
+		elog_dismiss(INFO);
 		return;
 	}
 	PG_END_TRY();
@@ -210,6 +212,7 @@ test__parseGPHDUri__NegativeTestNoOptions(void **state)
 		assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR);
 		assert_true(edata->elevel == ERROR);
 		assert_string_equal(edata->message, "Invalid URI pxf://1.2.3.4:5678/some/path/and/table.tbl: missing options section");
+		elog_dismiss(INFO);
 		return;
 	}
 	PG_END_TRY();
@@ -240,6 +243,7 @@ test__parseGPHDUri__NegativeTestMissingEqual(void **state)
 		assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR);
 		assert_true(edata->elevel == ERROR);
 		assert_string_equal(edata->message, "Invalid URI pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER: option 'FRAGMENTER' missing '='");
+		elog_dismiss(INFO);
 		return;
 	}
 	PG_END_TRY();
@@ -270,6 +274,7 @@ test__parseGPHDUri__NegativeTestDuplicateEquals(void **state)
 		assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR);
 		assert_true(edata->elevel == ERROR);
 		assert_string_equal(edata->message, "Invalid URI pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=HdfsDataFragmenter=DuplicateFragmenter: option 'FRAGMENTER=HdfsDataFragmenter=DuplicateFragmenter' contains duplicate '='");
+		elog_dismiss(INFO);
 		return;
 	}
 	PG_END_TRY();
@@ -300,6 +305,7 @@ test__parseGPHDUri__NegativeTestMissingKey(void **state)
 		assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR);
 		assert_true(edata->elevel == ERROR);
 		assert_string_equal(edata->message, "Invalid URI pxf://1.2.3.4:5678/some/path/and/table.tbl?=HdfsDataFragmenter: option '=HdfsDataFragmenter' missing key before '='");
+		elog_dismiss(INFO);
 		return;
 	}
 	PG_END_TRY();
@@ -330,6 +336,7 @@ test__parseGPHDUri__NegativeTestMissingValue(void **state)
 		assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR);
 		assert_true(edata->elevel == ERROR);
 		assert_string_equal(edata->message, "Invalid URI pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=: option 'FRAGMENTER=' missing value after '='");
+		elog_dismiss(INFO);
 		return;
 	}
 	PG_END_TRY();
@@ -348,6 +355,7 @@ test__GPHDUri_verify_no_duplicate_options__ValidURI(void **state)
 	/* Setting the test -- code omitted -- */
 	GPHDUri* parsed = parseGPHDUri(valid_uri);
 	GPHDUri_verify_no_duplicate_options(parsed);
+	assert_string_equal(parsed->profile, "a");
 	freeGPHDUri(parsed);
 }
 
@@ -375,6 +383,7 @@ test__GPHDUri_verify_no_duplicate_options__NegativeTestDuplicateOpts(void **stat
 		assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR);
 		assert_true(edata->elevel == ERROR);
 		assert_string_equal(edata->message, "Invalid URI pxf://1.2.3.4:5678/some/path/and/table.tbl?Profile=a&Analyzer=b&PROFILE=c: Duplicate option(s): PROFILE");
+		elog_dismiss(INFO);
 		return;
 	}
 	PG_END_TRY();
@@ -424,6 +433,7 @@ test__GPHDUri_verify_core_options_exist__NegativeTestMissingCoreOpts(void **stat
 		assert_true(edata->elevel == ERROR);
 		assert_string_equal(edata->message, "Invalid URI pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=a: PROFILE or ACCESSOR and RESOLVER option(s) missing");
 		list_free(coreOptions);
+		elog_dismiss(INFO);
 		return;
 	}
 	PG_END_TRY();
@@ -431,9 +441,541 @@ test__GPHDUri_verify_core_options_exist__NegativeTestMissingCoreOpts(void **stat
 	assert_true(false);
 }
 
+/*
+ * Test GPHDUri_parse_fragment when fragment string is valid and all parameters are passed
+ */
+void
+test__GPHDUri_parse_fragment__ValidFragment(void **state) {
+
+	char* fragment = "HOST@REST_PORT@TABLE_NAME@INDEX@FRAGMENT_METADATA@USER_DATA@PROFILE@";
+
+	List *fragments = NIL;
+
+	fragments = GPHDUri_parse_fragment(fragment, fragments);
+
+	ListCell *fragment_cell = list_head(fragments);
+	FragmentData *fragment_data = (FragmentData*) lfirst(fragment_cell);
+
+	assert_string_equal(fragment_data->authority, "HOST:REST_PORT");
+	assert_string_equal(fragment_data->fragment_md, "FRAGMENT_METADATA");
+	assert_string_equal(fragment_data->index, "INDEX");
+	assert_string_equal(fragment_data->profile, "PROFILE");
+	assert_string_equal(fragment_data->source_name, "TABLE_NAME");
+	assert_string_equal(fragment_data->user_data, "USER_DATA");
+
+	GPHDUri_free_fragment(fragment_data);
+	list_free(fragments);
+
+}
+
+/*
+ * Test GPHDUri_parse_fragment when fragment string doesn't have profile
+ */
+void
+test__GPHDUri_parse_fragment__EmptyProfile(void **state) {
+	char* fragment = "HOST@REST_PORT@TABLE_NAME@INDEX@FRAGMENT_METADATA@USER_DATA@@";
+
+	List *fragments = NIL;
+
+	fragments = GPHDUri_parse_fragment(fragment, fragments);
+
+	ListCell *fragment_cell = list_head(fragments);
+	FragmentData *fragment_data = (FragmentData*) lfirst(fragment_cell);
+
+	assert_string_equal(fragment_data->authority, "HOST:REST_PORT");
+	assert_string_equal(fragment_data->fragment_md, "FRAGMENT_METADATA");
+	assert_string_equal(fragment_data->index, "INDEX");
+	assert_true(!fragment_data->profile);
+	assert_string_equal(fragment_data->source_name, "TABLE_NAME");
+	assert_string_equal(fragment_data->user_data, "USER_DATA");
+
+	GPHDUri_free_fragment(fragment_data);
+	list_free(fragments);
+
+}
+
+/*
+ * Test GPHDUri_parse_fragment when fragment string is null
+ */
+void
+test__GPHDUri_parse_fragment__NullFragment(void **state) {
+
+	char* fragment = NULL;
+
+	List *fragments = NIL;
+
+	StringInfo err_msg = makeStringInfo();
+	appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment.fragment string is null.");
+
+	/* Expect error */
+	PG_TRY();
+	{
+		/* This will throw a ereport(ERROR).*/
+		fragments = GPHDUri_parse_fragment(fragment, fragments);
+	}
+	PG_CATCH();
+	{
+		CurrentMemoryContext = 1;
+		ErrorData *edata = CopyErrorData();
+
+		/* Validate the type of expected error */
+		assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+		assert_true(edata->elevel == ERROR);
+		assert_string_equal(edata->message, err_msg->data);
+
+		list_free(fragments);
+		pfree(err_msg->data);
+		pfree(err_msg);
+		elog_dismiss(INFO);
+		return;
+	}
+	PG_END_TRY();
+
+	/* should not reach here*/
+	assert_true(false);
+
+}
+
+
+/*
+ * Test GPHDUri_parse_fragment when fragment string is empty
+ */
+void
+test__GPHDUri_parse_fragment__EmptyString(void **state) {
+
+	char* fragment = "";
+
+	List *fragments = NIL;
+
+	StringInfo err_msg = makeStringInfo();
+	appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
+
+	/* Expect error */
+	PG_TRY();
+	{
+		/* This will throw a ereport(ERROR).*/
+		fragments = GPHDUri_parse_fragment(fragment, fragments);
+	}
+	PG_CATCH();
+	{
+		CurrentMemoryContext = 1;
+		ErrorData *edata = CopyErrorData();
+
+		/* Validate the type of expected error */
+		assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+		assert_true(edata->elevel == ERROR);
+		assert_string_equal(edata->message, err_msg->data);
+
+		list_free(fragments);
+		pfree(err_msg->data);
+		pfree(err_msg);
+		elog_dismiss(INFO);
+		return;
+	}
+	PG_END_TRY();
+
+	/* should not reach here*/
+	assert_true(false);
+
+}
+
+/*
+ * Test GPHDUri_parse_fragment when fragment string has less tokens then expected
+ */
+void
+test__GPHDUri_parse_fragment__MissingIpHost(void **state) {
+
+	char* fragment = "@";
+
+	List *fragments = NIL;
+
+	StringInfo err_msg = makeStringInfo();
+	appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
+
+	/* Expect error */
+	PG_TRY();
+	{
+		/* This will throw a ereport(ERROR).*/
+		fragments = GPHDUri_parse_fragment(fragment, fragments);
+	}
+	PG_CATCH();
+	{
+		CurrentMemoryContext = 1;
+		ErrorData *edata = CopyErrorData();
+
+		/* Validate the type of expected error */
+		assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+		assert_true(edata->elevel == ERROR);
+		assert_string_equal(edata->message, err_msg->data);
+
+		list_free(fragments);
+		pfree(err_msg->data);
+		pfree(err_msg);
+		elog_dismiss(INFO);
+		return;
+	}
+	PG_END_TRY();
+
+	/* should not reach here*/
+	assert_true(false);
+
+}
+
+/*
+ * Test GPHDUri_parse_fragment when fragment string has less tokens then expected
+ */
+void
+test__GPHDUri_parse_fragment__MissingPort(void **state) {
+
+	char* fragment = "@HOST@";
+
+	List *fragments = NIL;
+
+	StringInfo err_msg = makeStringInfo();
+	appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
+
+	/* Expect error */
+	PG_TRY();
+	{
+		/* This will throw a ereport(ERROR).*/
+		fragments = GPHDUri_parse_fragment(fragment, fragments);
+	}
+	PG_CATCH();
+	{
+		CurrentMemoryContext = 1;
+		ErrorData *edata = CopyErrorData();
+
+		/* Validate the type of expected error */
+		assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+		assert_true(edata->elevel == ERROR);
+		assert_string_equal(edata->message, err_msg->data);
+
+		list_free(fragments);
+		pfree(err_msg->data);
+		pfree(err_msg);
+
+		elog_dismiss(INFO);
+		return;
+	}
+	PG_END_TRY();
+
+	/* should not reach here*/
+	assert_true(false);
+
+}
+
+/*
+ * Test GPHDUri_parse_fragment when fragment string has less tokens then expected
+ */
+void
+test__GPHDUri_parse_fragment__MissingSourceName(void **state) {
+
+	char* fragment = "@HOST@PORT@";
+
+	List *fragments = NIL;
+
+	StringInfo err_msg = makeStringInfo();
+	appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
+
+	/* Expect error */
+	PG_TRY();
+	{
+		/* This will throw a ereport(ERROR).*/
+		fragments = GPHDUri_parse_fragment(fragment, fragments);
+	}
+	PG_CATCH();
+	{
+		CurrentMemoryContext = 1;
+		ErrorData *edata = CopyErrorData();
+
+		/* Validate the type of expected error */
+		assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+		assert_true(edata->elevel == ERROR);
+		assert_string_equal(edata->message, err_msg->data);
+
+		list_free(fragments);
+		pfree(err_msg->data);
+		pfree(err_msg);
+		elog_dismiss(INFO);
+		return;
+	}
+	PG_END_TRY();
+
+	/* should not reach here*/
+	assert_true(false);
+
+}
+
+/*
+ * Expect GPHDUri_parse_fragment to raise ERROR/ERRCODE_INTERNAL_ERROR when the fragment string has fewer tokens than expected (the input "@HOST@PORT@SOURCE_NAME@" ends right after SOURCE_NAME)
+ */
+void
+test__GPHDUri_parse_fragment__MissingIndex(void **state) {
+
+
+	char* fragment = "@HOST@PORT@SOURCE_NAME@";
+
+	List *fragments = NIL;
+
+	StringInfo err_msg = makeStringInfo();
+	appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment
string is invalid.");
+
+	/* The call below is expected to fail; the checks live in PG_CATCH */
+	PG_TRY();
+	{
+		/* This is expected to raise ereport(ERROR). */
+		fragments = GPHDUri_parse_fragment(fragment, fragments);
+	}
+	PG_CATCH();
+	{
+		CurrentMemoryContext = 1; /* NOTE(review): presumably a placeholder non-NULL context so CopyErrorData() can be called in this unit test - confirm */
+		ErrorData *edata = CopyErrorData();
+
+		/* Validate the error code, the severity and the exact message text */
+		assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+		assert_true(edata->elevel == ERROR);
+		assert_string_equal(edata->message, err_msg->data);
+
+		list_free(fragments);
+		pfree(err_msg->data);
+		pfree(err_msg);
+		elog_dismiss(INFO);
+		return;
+	}
+	PG_END_TRY();
+
+	/* Should not reach here: the PG_CATCH block returns once the expected error was verified */
+	assert_true(false);
+
+}
+
+
+/*
+ * Expect GPHDUri_parse_fragment to raise ERROR/ERRCODE_INTERNAL_ERROR when the fragment string has fewer tokens than expected (the input "@HOST@PORT@SOURCE_NAME@42@" ends right after "42")
+ */
+void
+test__GPHDUri_parse_fragment__MissingFragmentMetadata(void **state) {
+
+	char* fragment = "@HOST@PORT@SOURCE_NAME@42@";
+
+	List *fragments = NIL;
+
+	StringInfo err_msg = makeStringInfo();
+	appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment
string is invalid.");
+
+	/* The call below is expected to fail; the checks live in PG_CATCH */
+	PG_TRY();
+	{
+		/* This is expected to raise ereport(ERROR). */
+		fragments = GPHDUri_parse_fragment(fragment, fragments);
+	}
+	PG_CATCH();
+	{
+		CurrentMemoryContext = 1; /* NOTE(review): presumably a placeholder non-NULL context so CopyErrorData() can be called in this unit test - confirm */
+		ErrorData *edata = CopyErrorData();
+
+		/* Validate the error code, the severity and the exact message text */
+		assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+		assert_true(edata->elevel == ERROR);
+		assert_string_equal(edata->message, err_msg->data);
+
+		list_free(fragments);
+		pfree(err_msg->data);
+		pfree(err_msg);
+		elog_dismiss(INFO);
+		return;
+	}
+	PG_END_TRY();
+
+	/* Should not reach here: the PG_CATCH block returns once the expected error was verified */
+	assert_true(false);
+
+}
+
+/*
+ * Expect GPHDUri_parse_fragment to raise ERROR/ERRCODE_INTERNAL_ERROR when the fragment string has fewer tokens than expected (the input ends right after FRAGMENT_METADATA)
+ */
+void
+test__GPHDUri_parse_fragment__MissingUserData(void **state) {
+
+	char* fragment = "HOST@REST_PORT@TABLE_NAME@INDEX@FRAGMENT_METADATA@";
+
+	List *fragments = NIL;
+
+	StringInfo err_msg = makeStringInfo();
+	appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment
string is invalid.");
+
+	/* The call below is expected to fail; the checks live in PG_CATCH */
+	PG_TRY();
+	{
+		/* This is expected to raise ereport(ERROR). */
+		fragments = GPHDUri_parse_fragment(fragment, fragments);
+	}
+	PG_CATCH();
+	{
+		CurrentMemoryContext = 1; /* NOTE(review): presumably a placeholder non-NULL context so CopyErrorData() can be called in this unit test - confirm */
+		ErrorData *edata = CopyErrorData();
+
+		/* Validate the error code, the severity and the exact message text */
+		assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+		assert_true(edata->elevel == ERROR);
+		assert_string_equal(edata->message, err_msg->data);
+
+		list_free(fragments);
+		pfree(err_msg->data);
+		pfree(err_msg);
+		elog_dismiss(INFO);
+		return;
+	}
+	PG_END_TRY();
+
+	/* Should not reach here: the PG_CATCH block returns once the expected error was verified */
+	assert_true(false);
+
+}
+
+
+/*
+ * Expect GPHDUri_parse_fragment to raise ERROR/ERRCODE_INTERNAL_ERROR when the fragment string has fewer tokens than expected (the input ends right after USER_METADATA)
+ */
+void
+test__GPHDUri_parse_fragment__MissingProfile(void **state) {
+
+	char* fragment = "HOST@REST_PORT@TABLE_NAME@INDEX@FRAGMENT_METADATA@USER_METADATA@";
+
+	List *fragments = NIL;
+
+	StringInfo err_msg = makeStringInfo();
+	appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment
string is invalid.");
+
+	/* The call below is expected to fail; the checks live in PG_CATCH */
+	PG_TRY();
+	{
+		/* This is expected to raise ereport(ERROR). */
+		fragments = GPHDUri_parse_fragment(fragment, fragments);
+	}
+	PG_CATCH();
+	{
+		CurrentMemoryContext = 1; /* NOTE(review): presumably a placeholder non-NULL context so CopyErrorData() can be called in this unit test - confirm */
+		ErrorData *edata = CopyErrorData();
+
+		/* Validate the error code, the severity and the exact message text */
+		assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+		assert_true(edata->elevel == ERROR);
+		assert_string_equal(edata->message, err_msg->data);
+
+		list_free(fragments);
+		pfree(err_msg->data);
+		pfree(err_msg);
+		elog_dismiss(INFO);
+		return;
+	}
+	PG_END_TRY();
+
+	/* Should not reach here: the PG_CATCH block returns once the expected error was verified */
+	assert_true(false);
+
+}
+
+
+void
+test__normalize_key_name_Positive(void **state)
+{ /* Checks that normalize_key_name() turns a mixed-case key into "X-GP-" followed by the upper-cased key */
+	char *input_key = strdup("mIxEdCaSeVaLuE"); /* mixed-case input key */
+	char *normalized_key = normalize_key_name(input_key);
+	assert_string_equal(normalized_key, "X-GP-MIXEDCASEVALUE");
+
+	pfree(input_key); /* NOTE(review): input_key was allocated with strdup() but is released with pfree() - confirm this pairing is valid in the test build */
+	pfree(normalized_key);
+}
+
+void
+test__normalize_key_name_PositiveUpperCase(void **state)
+{ /* Checks that normalize_key_name() only prepends "X-GP-" when the key is already upper case */
+	char *input_key = strdup("ALREADY_UPPER_CASE"); /* input key that is already upper case */
+	char *normalized_key = normalize_key_name(input_key);
+	assert_string_equal(normalized_key, "X-GP-ALREADY_UPPER_CASE");
+
+	pfree(input_key); /* NOTE(review): input_key was allocated with strdup() but is released with pfree() - confirm this pairing is valid in the test build */
+	pfree(normalized_key);
+}
+
+void
+test__normalize_key_name_Negative__key_is_null(void **state)
+{ /* Expects normalize_key_name(NULL) to raise ERROR/ERRCODE_INTERNAL_ERROR with the message built below */
+	char *input_key = NULL;
+
+	StringInfo err_msg = makeStringInfo();
+	appendStringInfo(err_msg, "internal error in pxfheaders.c:normalize_key_name. Parameter
key is null or empty.");
+
+	/* The call below is expected to fail; the checks live in PG_CATCH */
+	PG_TRY();
+	{
+		/* This is expected to raise ereport(ERROR); the result is never used. */
+		char *normalized_key = normalize_key_name(input_key);
+	}
+	PG_CATCH();
+	{
+		CurrentMemoryContext = 1; /* NOTE(review): presumably a placeholder non-NULL context so CopyErrorData() can be called in this unit test - confirm */
+		ErrorData *edata = CopyErrorData();
+
+		/* Validate the error code, the severity and the exact message text */
+		assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+		assert_true(edata->elevel == ERROR);
+		assert_string_equal(edata->message, err_msg->data);
+
+		pfree(err_msg->data);
+		pfree(err_msg);
+
+		return; /* NOTE(review): the GPHDUri_parse_fragment tests in this file call elog_dismiss(INFO) before returning; this test does not - confirm intended */
+	}
+	PG_END_TRY();
+
+	/* Should not reach here: the PG_CATCH block returns once the expected error was verified */
+	assert_true(false);
+
+}
+
+void
+test__normalize_key_name_Negative__key_is_empty(void **state)
+{ /* Expects normalize_key_name("") to raise ERROR/ERRCODE_INTERNAL_ERROR with the message built below */
+	char *input_key = "";
+
+	StringInfo err_msg = makeStringInfo();
+	appendStringInfo(err_msg, "internal error in pxfheaders.c:normalize_key_name. Parameter
key is null or empty.");
+
+	/* The call below is expected to fail; the checks live in PG_CATCH */
+	PG_TRY();
+	{
+		/* This is expected to raise ereport(ERROR); the result is never used. */
+		char *normalized_key = normalize_key_name(input_key);
+	}
+	PG_CATCH();
+	{
+		CurrentMemoryContext = 1; /* NOTE(review): presumably a placeholder non-NULL context so CopyErrorData() can be called in this unit test - confirm */
+		ErrorData *edata = CopyErrorData();
+
+		/* Validate the error code, the severity and the exact message text */
+		assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+		assert_true(edata->elevel == ERROR);
+		assert_string_equal(edata->message, err_msg->data);
+
+		pfree(err_msg->data);
+		pfree(err_msg);
+
+		return; /* NOTE(review): the GPHDUri_parse_fragment tests in this file call elog_dismiss(INFO) before returning; this test does not - confirm intended */
+	}
+	PG_END_TRY();
+
+	/* Should not reach here: the PG_CATCH block returns once the expected error was verified */
+	assert_true(false);
+
+}
+
 int 
 main(int argc, char* argv[]) 
 {
+
 	cmockery_parse_arguments(argc, argv);
 
 	const UnitTest tests[] = {
@@ -449,7 +991,21 @@ main(int argc, char* argv[])
 			unit_test(test__GPHDUri_verify_no_duplicate_options__ValidURI),
 			unit_test(test__GPHDUri_verify_no_duplicate_options__NegativeTestDuplicateOpts),
 			unit_test(test__GPHDUri_verify_core_options_exist__ValidURI),
-			unit_test(test__GPHDUri_verify_core_options_exist__NegativeTestMissingCoreOpts)
+			unit_test(test__GPHDUri_verify_core_options_exist__NegativeTestMissingCoreOpts),
+			unit_test(test__GPHDUri_parse_fragment__EmptyProfile),
+			unit_test(test__GPHDUri_parse_fragment__ValidFragment),
+			unit_test(test__GPHDUri_parse_fragment__EmptyString),
+			unit_test(test__GPHDUri_parse_fragment__MissingIpHost),
+			unit_test(test__GPHDUri_parse_fragment__MissingPort),
+			unit_test(test__GPHDUri_parse_fragment__MissingSourceName),
+			unit_test(test__GPHDUri_parse_fragment__MissingIndex),
+			unit_test(test__GPHDUri_parse_fragment__MissingFragmentMetadata),
+			unit_test(test__GPHDUri_parse_fragment__MissingUserData),
+			unit_test(test__GPHDUri_parse_fragment__MissingProfile),
+			unit_test(test__normalize_key_name_Positive),
+			unit_test(test__normalize_key_name_PositiveUpperCase),
+			unit_test(test__normalize_key_name_Negative__key_is_null),
+			unit_test(test__normalize_key_name_Negative__key_is_empty)
 	};
 	return run_tests(tests);
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fbef55d4/src/bin/gpfusion/gpbridgeapi.c
----------------------------------------------------------------------
diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c
index 2751b1b..b524df8 100644
--- a/src/bin/gpfusion/gpbridgeapi.c
+++ b/src/bin/gpfusion/gpbridgeapi.c
@@ -226,6 +226,17 @@ void set_current_fragment_headers(gphadoop_context* context)
 		churl_headers_remove(context->churl_headers, "X-GP-FRAGMENT-USER-DATA", true);
 	}
 
+	/* if the current fragment has an optimal profile, set it */
+	if (frag_data->profile)
+	{
+		churl_headers_override(context->churl_headers, "X-GP-PROFILE", frag_data->profile);
+	} else if (context->gphd_uri->profile)
+	{
+		/* if current fragment doesn't have any optimal profile, set to use profile from url */
+		churl_headers_override(context->churl_headers, "X-GP-PROFILE", context->gphd_uri->profile);
+	}
+	/* if there is no profile passed in url, we expect to have accessor+fragmenter+resolver so no action needed by this point */
+
 }
 
 void gpbridge_import_start(PG_FUNCTION_ARGS)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fbef55d4/src/include/access/pxfmasterapi.h
----------------------------------------------------------------------
diff --git a/src/include/access/pxfmasterapi.h b/src/include/access/pxfmasterapi.h
index 4b9ecd9..b487b41 100644
--- a/src/include/access/pxfmasterapi.h
+++ b/src/include/access/pxfmasterapi.h
@@ -56,6 +56,7 @@ typedef struct sDataFragment
 	List *replicas;
 	char *fragment_md; /* fragment meta data (start, end, length, etc.) */
 	char *user_data;
+	char *profile;
 } DataFragment;
 
 /*

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fbef55d4/src/include/access/pxfuriparser.h
----------------------------------------------------------------------
diff --git a/src/include/access/pxfuriparser.h b/src/include/access/pxfuriparser.h
index d050325..ac614c5 100644
--- a/src/include/access/pxfuriparser.h
+++ b/src/include/access/pxfuriparser.h
@@ -46,6 +46,7 @@ typedef struct FragmentData
 	char	 *source_name;
 	char	 *fragment_md;
 	char	 *user_data;
+	char	 *profile;
 } FragmentData;
 
 typedef struct OptionData
@@ -66,6 +67,7 @@ typedef struct GPHDUri
 	char			*host;		/* host name str			*/
 	char			*port;		/* port number as string	*/
 	char			*data;      /* data location (path)     */
+	char			*profile;   /* profile option			*/
 	List			*fragments; /* list of FragmentData		*/
 
 	/* options */
@@ -88,5 +90,6 @@ int		 GPHDUri_get_value_for_opt(GPHDUri *uri, char *key, char **val, bool
emit_e
 bool 	 RelationIsExternalPxfReadOnly(Relation rel, StringInfo location);
 void 	 GPHDUri_verify_no_duplicate_options(GPHDUri *uri);
 void 	 GPHDUri_verify_core_options_exist(GPHDUri *uri, List *coreOptions);
+char* normalize_key_name(const char* key);
 
 #endif	// _PXF_URIPARSER_H_



Mime
View raw message