hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nh...@apache.org
Subject incubator-hawq git commit: HAWQ-317. Get delegation token for PXF from dfs_url
Date Fri, 26 Feb 2016 20:40:57 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master b8b9a0e84 -> 495bb2a2b


HAWQ-317. Get delegation token for PXF from dfs_url


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/495bb2a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/495bb2a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/495bb2a2

Branch: refs/heads/master
Commit: 495bb2a2b264da533be237d312507d5e97faec53
Parents: b8b9a0e
Author: Noa Horn <nhorn@apache.org>
Authored: Fri Feb 26 12:40:37 2016 -0800
Committer: Noa Horn <nhorn@apache.org>
Committed: Fri Feb 26 12:40:37 2016 -0800

----------------------------------------------------------------------
 .../access/appendonly/appendonlywriter.c        |  2 +-
 src/backend/access/external/hd_work_mgr.c       | 25 +++++-------
 src/backend/access/external/pxfutils.c          | 23 +++++++++++
 src/backend/cdb/cdbquerycontextdispatching.c    | 31 +++++++--------
 src/bin/gpfusion/gpbridgeapi.c                  | 42 ++++++++------------
 src/include/access/pxfutils.h                   |  3 ++
 6 files changed, 68 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/495bb2a2/src/backend/access/appendonly/appendonlywriter.c
----------------------------------------------------------------------
diff --git a/src/backend/access/appendonly/appendonlywriter.c b/src/backend/access/appendonly/appendonlywriter.c
index 834d310..40d097c 100644
--- a/src/backend/access/appendonly/appendonlywriter.c
+++ b/src/backend/access/appendonly/appendonlywriter.c
@@ -115,7 +115,7 @@ struct TspSupportTruncate
 static struct TspSupportTruncate TspSupportTruncateMap = {NULL, NULL, 0, 0};
 
 
-    static AORelHelp
+static AORelHelp
 GetAOHashTableHelpEntry(Oid relid)
 {
     int             i;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/495bb2a2/src/backend/access/external/hd_work_mgr.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/hd_work_mgr.c b/src/backend/access/external/hd_work_mgr.c
index c7c3e20..923bc81 100644
--- a/src/backend/access/external/hd_work_mgr.c
+++ b/src/backend/access/external/hd_work_mgr.c
@@ -900,36 +900,31 @@ static void init_client_context(ClientContext *client_context)
  * is created. We cannot use that token because hd_work_mgr.c code gets 
  * executed before a portal is created.
  *
- * The function uses a hdfs uri in the form of hdfs://host:port/
- * where port is 8020. In the case of HA the function uses the form
- * hdfs://nameservice/
- *
- * TODO 8020 is hard-coded. Must find the NameNode port somehow.
+ * The function uses a hdfs uri in the form of hdfs://host:port/path
+ * or hdfs://nameservice/path.
+ * This value is taken from pg_filespace_entry which is populated
+ * based on hawq-site.xml's hawq_dfs_url entry.
  */
 static void generate_delegation_token(PxfInputData *inputData)
 {
-	StringInfoData hdfs_uri;
+	char* dfs_address = NULL;
 
 	if (!enable_secure_filesystem)
 		return;
 
-	initStringInfo(&hdfs_uri);
-    if (inputData->gphduri->ha_nodes)
-        appendStringInfo(&hdfs_uri, "hdfs://%s/", inputData->gphduri->ha_nodes->nameservice);
-    else
-        appendStringInfo(&hdfs_uri, "hdfs://%s:8020/", inputData->gphduri->host);
+	get_hdfs_location_from_filespace(&dfs_address);
 
-    elog(DEBUG2, "about to acquire delegation token for %s", hdfs_uri.data);
+    elog(DEBUG2, "about to acquire delegation token for %s", dfs_address);
 
 	inputData->token = palloc0(sizeof(PxfHdfsTokenData));
 
-	inputData->token->hdfs_token = HdfsGetDelegationToken(hdfs_uri.data, 
+	inputData->token->hdfs_token = HdfsGetDelegationToken(dfs_address,
 														  &inputData->token->hdfs_handle);
 
 	if (inputData->token->hdfs_token == NULL)
-		elog(ERROR, "Failed to acquire a delegation token for uri %s", hdfs_uri.data);
+		elog(ERROR, "Failed to acquire a delegation token for uri %s", dfs_address);
 
-	pfree(hdfs_uri.data);
+	pfree(dfs_address);
 }
 
 /*

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/495bb2a2/src/backend/access/external/pxfutils.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfutils.c b/src/backend/access/external/pxfutils.c
index 25e806e..6d19ce6 100644
--- a/src/backend/access/external/pxfutils.c
+++ b/src/backend/access/external/pxfutils.c
@@ -18,6 +18,11 @@
  */
 
 #include "access/pxfutils.h"
+
+#include "catalog/catalog.h"
+#include "catalog/pg_tablespace.h"
+#include "commands/dbcommands.h"
+#include "miscadmin.h"
 #include "utils/builtins.h"
 
 /* Wrapper for libchurl */
@@ -47,6 +52,24 @@ void port_to_str(char **port, int new_port)
 }
 
 /*
+ * get_hdfs_location_from_filespace
+ *
+ * Get hdfs location from pg_filespace_entry
+ * The returned path needs to be pfreed by the caller.
+ */
+void get_hdfs_location_from_filespace(char** path)
+{
+	Assert(NULL != path);
+	Oid dtsoid = get_database_dts(MyDatabaseId);
+	GetFilespacePathForTablespace(dtsoid, path);
+
+	Assert(NULL != *path);
+	Assert(strlen(*path) < FilespaceLocationBlankPaddedWithNullTermLen);
+
+	elog(DEBUG2, "found hdfs location is %s", *path);
+}
+
+/*
  * call_rest
  *
  * Creates the REST message and sends it to the PXF service located on

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/495bb2a2/src/backend/cdb/cdbquerycontextdispatching.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbquerycontextdispatching.c b/src/backend/cdb/cdbquerycontextdispatching.c
index ca4d6b2..52f85f1 100644
--- a/src/backend/cdb/cdbquerycontextdispatching.c
+++ b/src/backend/cdb/cdbquerycontextdispatching.c
@@ -2869,34 +2869,31 @@ static char* GetExtTableFirstLocation(Datum *array)
 }
 
 /* 
- * Using host from uri, dispatch HDFS credentials to 
+ * Using HAWQ's HDFS location, dispatch HDFS credentials to
  * segments.
  *
- * The function uses a hdfs uri in the form hdfs://host:port/ where
- * port is hard-coded 8020. For HA the function uses hdfs://nameservice/
+ * The function uses a hdfs uri in the form hdfs://host:port/path
+ * or hdfs://nameservice/path where
+ * this value is taken from pg_filespace_entry which is populated
+ * based on hawq-site.xml's hawq_dfs_url entry.
  *
  * prepareDispatchedCatalogFileSystemCredential will store the token
- * using port == 0 in HA case (otherwise the supplied port)
- *
- * TODO Get HDFS port from someplace else, currently hard coded
+ * using port == 0 in HA case (otherwise the supplied port).
  */
 static void AddFileSystemCredentialForPxfTable(char *uri)
 {
-	StringInfoData hdfs_uri;
+	char* dfs_address = NULL;
 
-	initStringInfo(&hdfs_uri);
-	GPHDUri *gphd_uri = parseGPHDUri(uri);
-    if (gphd_uri->ha_nodes)
-        appendStringInfo(&hdfs_uri, "hdfs://%s/", gphd_uri->ha_nodes->nameservice);
-    else
-        appendStringInfo(&hdfs_uri, "hdfs://%s:8020/", gphd_uri->host);
+	if (!enable_secure_filesystem)
+		return;
+
+	get_hdfs_location_from_filespace(&dfs_address);
 
-    elog(DEBUG2, "about to acquire delegation token for %s", hdfs_uri.data);
+    elog(DEBUG2, "about to acquire delegation token for %s", dfs_address);
 
-	prepareDispatchedCatalogFileSystemCredential(hdfs_uri.data);
+	prepareDispatchedCatalogFileSystemCredential(dfs_address);
 
-	freeGPHDUri(gphd_uri);
-	pfree(hdfs_uri.data);
+	pfree(dfs_address);
 }
 
 List *

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/495bb2a2/src/bin/gpfusion/gpbridgeapi.c
----------------------------------------------------------------------
diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c
index f4a0656..5cdb624 100644
--- a/src/bin/gpfusion/gpbridgeapi.c
+++ b/src/bin/gpfusion/gpbridgeapi.c
@@ -473,7 +473,6 @@ void build_uri_for_write(gphadoop_context* context, PxfServer* rest_server
)
 					 rest_server->host, rest_server->port,
 					 PXF_SERVICE_PREFIX, PXF_VERSION, context->write_file_name.data);
 	elog(DEBUG2, "pxf: uri %s with file name for write: %s", context->uri.data, context->write_file_name.data);
-
 }
 
 size_t fill_buffer(gphadoop_context* context, char* start, size_t size)
@@ -496,46 +495,39 @@ size_t fill_buffer(gphadoop_context* context, char* start, size_t size)
 
 /*
  * The function will get the cached delegation token
- * for remote host and add it to inputData
- * 
- * In HA case, where only a nameservice is used, port has to 
- * be zero (stored that way by prepareDispatchedCatalogFileSystemCredential)
- * see HdfsGetConnection()@fd.c
- * 
- * TODO Get "hdfs" and port from someplace else.
+ * for remote host and add it to inputData.
+ *
+ * The function uses a hdfs uri in the form of hdfs://host:port/path
+ * or hdfs://nameservice/path.
+ * This value is taken from pg_filespace_entry which is populated
+ * based on hawq-site.xml's hawq_dfs_url entry.
+ *
+ * Both regular and HA cases are handled the same way,
+ * where a nameservice is parsed by HdfsParsePath()@fd.c
  */
 void add_delegation_token(PxfInputData *inputData)
 {
 	PxfHdfsTokenData *token = NULL;
-    int port;
-    char *host = NULL;
+	char* dfs_address = NULL;
 
 	if (!enable_secure_filesystem)
 		return;
 
 	token = palloc0(sizeof(PxfHdfsTokenData));
 
-    if (inputData->gphduri->ha_nodes)
-    {
-        host = inputData->gphduri->ha_nodes->nameservice;
-        port = 0 /* port used when using a nameservice */;
-    } else
-    {
-        host = inputData->gphduri->host;
-        port = 8020;
-    }
+	get_hdfs_location_from_filespace(&dfs_address);
 
-    elog(DEBUG2, "locating token for %s:%d", host, port);
+    elog(DEBUG2, "locating token for %s", dfs_address);
 
-	token->hdfs_token = find_filesystem_credential("hdfs", 
-												   host,
-												   port);
+	token->hdfs_token = find_filesystem_credential_with_uri(dfs_address);
 
 	if (token->hdfs_token == NULL)
-		elog(ERROR, "failed to find delegation token for %s", inputData->gphduri->host);
-	elog(DEBUG2, "Delegation token for %s found", inputData->gphduri->host);
+		elog(ERROR, "failed to find delegation token for %s", dfs_address);
+	elog(DEBUG2, "Delegation token for %s found", dfs_address);
 
 	inputData->token = token;
+
+	pfree(dfs_address);
 }
 
 void free_token_resources(PxfInputData *inputData)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/495bb2a2/src/include/access/pxfutils.h
----------------------------------------------------------------------
diff --git a/src/include/access/pxfutils.h b/src/include/access/pxfutils.h
index ae21c51..d6bc016 100644
--- a/src/include/access/pxfutils.h
+++ b/src/include/access/pxfutils.h
@@ -39,6 +39,9 @@ bool are_ips_equal(char *ip1, char *ip2);
 /* override port str with given new port int */
 void port_to_str(char** port, int new_port);
 
+/* get hdfs location from current session's filespace entry */
+void get_hdfs_location_from_filespace(char** path);
+
 /* Parse the REST message and issue the libchurl call */
 void call_rest(GPHDUri *hadoop_uri, ClientContext *client_context, char* rest_msg);
 


Mime
View raw message