hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nh...@apache.org
Subject incubator-hawq git commit: HAWQ-257. Use user defined port in PXF queries.
Date Tue, 22 Dec 2015 02:54:30 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master bfe15a053 -> ec8ec9309


HAWQ-257. Use user defined port in PXF queries.

pxf_service_port should only be used in HA and Isilon cases.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/ec8ec930
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/ec8ec930
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/ec8ec930

Branch: refs/heads/master
Commit: ec8ec930988d8939eb520fcb0cc359adf47e75d0
Parents: bfe15a0
Author: Noa Horn <nhorn@pivotal.io>
Authored: Mon Dec 21 18:42:54 2015 -0800
Committer: Noa Horn <nhorn@pivotal.io>
Committed: Mon Dec 21 18:42:54 2015 -0800

----------------------------------------------------------------------
 src/backend/access/external/hd_work_mgr.c       |  9 +-
 src/backend/access/external/pxfuriparser.c      | 12 ++-
 .../access/external/test/pxfuriparser_test.c    | 86 +++++++++++++++++++-
 src/backend/utils/misc/guc.c                    | 10 ---
 src/bin/gpfusion/gpbridgeapi.c                  |  8 +-
 5 files changed, 104 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ec8ec930/src/backend/access/external/hd_work_mgr.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/hd_work_mgr.c b/src/backend/access/external/hd_work_mgr.c
index 6aa0736..b5bf6b5 100644
--- a/src/backend/access/external/hd_work_mgr.c
+++ b/src/backend/access/external/hd_work_mgr.c
@@ -166,7 +166,8 @@ char** map_hddata_2gp_segments(char* uri, int total_segs, int working_segs,
Rela
 	 * 2. Get the fragments data from the PXF service
 	 */
 	data_fragments = get_data_fragment_list(hadoop_uri, &client_context);
-	assign_pxf_port_to_fragments(pxf_service_port, data_fragments);
+
+	assign_pxf_port_to_fragments(atoi(hadoop_uri->port), data_fragments);
 
 	/* debug - enable when tracing */
 	print_fragment_list(data_fragments);
@@ -174,12 +175,12 @@ char** map_hddata_2gp_segments(char* uri, int total_segs, int working_segs,
Rela
 	/*
 	 * 3. Finally, call the actual work allocation algorithm
 	 */
-	  segs_data = distribute_work_2_gp_segments(data_fragments, total_segs, working_segs);
+	segs_data = distribute_work_2_gp_segments(data_fragments, total_segs, working_segs);
 
 	/*
 	 * 4. For each segment transform the list of allocated fragments into an output string
 	 */
-	  segs_work_map = create_output_strings(segs_data, total_segs);
+	segs_work_map = create_output_strings(segs_data, total_segs);
 
 	/*
 	 * 5. Release memory
@@ -280,7 +281,7 @@ static GPHDUri* init(char* uri, ClientContext* cl_context)
 	 * 1. Cherrypick the data relevant for HADOOP from the input uri
 	 */
 	GPHDUri* hadoop_uri = parseGPHDUri(uri);
-	
+
 	/*
 	 * 2. Communication with the Hadoop back-end
 	 *    Initialize churl client context and header

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ec8ec930/src/backend/access/external/pxfuriparser.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfuriparser.c b/src/backend/access/external/pxfuriparser.c
index 47cfe82..0ce78bd 100644
--- a/src/backend/access/external/pxfuriparser.c
+++ b/src/backend/access/external/pxfuriparser.c
@@ -81,8 +81,6 @@ parseGPHDUri(const char *uri_str)
 	GPHDUri_parse_data(uri, &cursor);
 	GPHDUri_parse_options(uri, &cursor);
 
-	port_to_str(&(uri->port), pxf_service_port);
-
 	return uri;
 }
 
@@ -363,7 +361,15 @@ GPHDUri_parse_authority(GPHDUri *uri, char **cursor)
 		ereport(ERROR,
 				(errcode(ERRCODE_SYNTAX_ERROR),
 				 errmsg("Invalid port: %s for authority host %s",
-						uri->port, uri->host)));	
+						uri->port, uri->host)));
+
+	/* if pxf_isilon is true, ignore the port in the uri
+	 * and use pxf_service_port instead to access PXF.
+	 */
+	if (pxf_isilon)
+	{
+		sprintf(uri->port, "%d", pxf_service_port);
+	}
 }
 
 /*

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ec8ec930/src/backend/access/external/test/pxfuriparser_test.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/pxfuriparser_test.c b/src/backend/access/external/test/pxfuriparser_test.c
index cd63725..b97cef5 100644
--- a/src/backend/access/external/test/pxfuriparser_test.c
+++ b/src/backend/access/external/test/pxfuriparser_test.c
@@ -44,7 +44,8 @@ test__parseGPHDUri__ValidURI(void **state)
 
 	assert_string_equal(parsed->protocol, "pxf");
 	assert_string_equal(parsed->host, "1.2.3.4");
-	assert_string_not_equal(parsed->port, "5678"); /* it should be pxf_service_port */
+	assert_string_equal(parsed->port, "5678");
+	assert_true(parsed->ha_nodes == NULL);
 	assert_string_equal(parsed->data, "some/path/and/table.tbl");
 
 	options = parsed->options;
@@ -76,6 +77,87 @@ test__parseGPHDUri__ValidURI(void **state)
 }
 
 /*
+ * Test parsing of valid uri with with nameservice instead of host and port
+ * as given in LOCATION in a PXF external table.
+ */
+void
+test__parseGPHDUri__ValidURI_HA(void **state)
+{
+	char* uri = "pxf://hanameservice/some/path/and/table.tbl?FRAGMENTER=SomeFragmenter&ACCESSOR=SomeAccessor&RESOLVER=SomeResolver&ANALYZER=SomeAnalyzer";
+	List* options = NIL;
+	ListCell* cell = NULL;
+	OptionData* option = NULL;
+
+	/* mock GPHD_HA_load_nodes */
+	NNHAConf* ha_conf = (NNHAConf *)palloc0(sizeof(NNHAConf));
+	ha_conf->nameservice = "hanameservice";
+	ha_conf->numn = 2;
+	ha_conf->nodes = ((char**)palloc0(sizeof(char*) * 2));
+	ha_conf->nodes[0] = "node1";
+	ha_conf->restports = ((char**)palloc0(sizeof(char*) * 2));
+	ha_conf->restports[0] = "1001";
+	expect_string(GPHD_HA_load_nodes, nameservice, "hanameservice");
+	will_return(GPHD_HA_load_nodes, ha_conf);
+
+	/* mock GPHD_HA_release_nodes */
+	expect_value(GPHD_HA_release_nodes, conf, ha_conf);
+	will_be_called(GPHD_HA_release_nodes);
+
+	GPHDUri* parsed = parseGPHDUri(uri);
+
+	assert_true(parsed != NULL);
+	assert_string_equal(parsed->uri, uri);
+
+	assert_string_equal(parsed->protocol, "pxf");
+	assert_string_equal(parsed->host, "node1"); /* value should be taken from ha_nodes */
+	assert_string_equal(parsed->port, "1001"); /* it should be taken from ha_nodes */
+	assert_false(parsed->ha_nodes == NULL);
+	assert_string_equal(parsed->data, "some/path/and/table.tbl");
+
+	freeGPHDUri(parsed);
+
+	/* free NNHAConf */
+	if (ha_conf)
+	{
+		pfree(ha_conf->nodes);
+		pfree(ha_conf->restports);
+		pfree(ha_conf);
+	}
+}
+
+/*
+ * Test parsing of valid uri as given in LOCATION in a PXF external table,
+ * with pxf_isilon set to true.
+ */
+void
+test__parseGPHDUri__ValidURI_Isilon(void **state)
+{
+	char* uri = "pxf://servername:5000/some/path/and/table.tbl?FRAGMENTER=SomeFragmenter&ACCESSOR=SomeAccessor&RESOLVER=SomeResolver&ANALYZER=SomeAnalyzer";
+	List* options = NIL;
+	ListCell* cell = NULL;
+	OptionData* option = NULL;
+
+	/* set pxf_isilon to true */
+	pxf_isilon = true;
+
+	GPHDUri* parsed = parseGPHDUri(uri);
+
+	assert_true(parsed != NULL);
+	assert_string_equal(parsed->uri, uri);
+
+	assert_string_equal(parsed->protocol, "pxf");
+	assert_string_equal(parsed->host, "servername");
+	assert_int_equal(atoi(parsed->port), pxf_service_port); /* it should be pxf_service_port
*/
+	assert_true(parsed->ha_nodes == NULL);
+	assert_string_equal(parsed->data, "some/path/and/table.tbl");
+
+	freeGPHDUri(parsed);
+
+	/* set pxf_isilon back to false */
+	pxf_isilon = false;
+}
+
+/*
  * Negative test: parsing of uri without protocol delimiter "://"
  */
 void
@@ -356,6 +438,8 @@ main(int argc, char* argv[])
 
 	const UnitTest tests[] = {
 			unit_test(test__parseGPHDUri__ValidURI),
+			unit_test(test__parseGPHDUri__ValidURI_HA),
+			unit_test(test__parseGPHDUri__ValidURI_Isilon),
 			unit_test(test__parseGPHDUri__NegativeTestNoProtocol),
 			unit_test(test__parseGPHDUri__NegativeTestNoOptions),
 			unit_test(test__parseGPHDUri__NegativeTestMissingEqual),

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ec8ec930/src/backend/utils/misc/guc.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 4897b93..462437a 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -6178,16 +6178,6 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
-		{"pxf_service_port", PGC_POSTMASTER, EXTERNAL_TABLES,
-			gettext_noop("PXF service port"),
-			NULL,
-			GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE
-		},
-		&pxf_service_port,
-		51200, 1, 65535, NULL, NULL
-	},
-
-	{
 		{"hawq_master_address_port", PGC_POSTMASTER, PRESET_OPTIONS,
 			gettext_noop("master server address port number"),
 			NULL

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ec8ec930/src/bin/gpfusion/gpbridgeapi.c
----------------------------------------------------------------------
diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c
index 126d4e8..84b232a 100644
--- a/src/bin/gpfusion/gpbridgeapi.c
+++ b/src/bin/gpfusion/gpbridgeapi.c
@@ -268,7 +268,7 @@ static void init_client_context(ClientContext *client_context)
 /*
  * get list of data nodes' rest servers,
  * and choose one (based on modulo segment id).
- * if pxf_isilon is true, there are no PXF instances on the datanodes .
+ * if pxf_isilon is true, there are no PXF instances on the datanodes.
  * TODO: add locality
  */
 PxfServer* get_pxf_server(GPHDUri* gphd_uri, const Relation rel)
@@ -291,7 +291,9 @@ PxfServer* get_pxf_server(GPHDUri* gphd_uri, const Relation rel)
 	/* set HTTP header that guarantees response in JSON format */
 	churl_headers_append(client_context.http_headers, REST_HEADER_JSON_RESPONSE, NULL);
 	if (!client_context.http_headers)
+	{
 		return NULL;
+	}
 
 	/*
 	 * Enrich the curl HTTP header
@@ -303,7 +305,7 @@ PxfServer* get_pxf_server(GPHDUri* gphd_uri, const Relation rel)
 	add_delegation_token(&inputData);
 	build_http_header(&inputData);
 
-	int port = pxf_service_port;
+	int port = atoi(gphd_uri->port);
 
 	if (!pxf_isilon)
 	{
@@ -342,7 +344,7 @@ PxfServer* get_pxf_server(GPHDUri* gphd_uri, const Relation rel)
 	}
 	else /* Isilon */
 	{
-		ret_server->host = pstrdup("localhost");
+		ret_server->host = pstrdup("localhost"); /* TODO: should it always be localhost? */
 		ret_server->port = port;
 		elog(DEBUG2, "get_pxf_server: writing data to an Isilon target storage system");
 	}


Mime
View raw message