hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From y...@apache.org
Subject incubator-hawq git commit: HAWQ-1498. Segments keep open file descriptors for deleted files
Date Fri, 11 Aug 2017 12:15:58 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master c0cd47c3d -> 31aeb4a11


HAWQ-1498. Segments keep open file descriptors for deleted files


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/31aeb4a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/31aeb4a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/31aeb4a1

Branch: refs/heads/master
Commit: 31aeb4a11c14571051a39c2144a6eeb8d43a1606
Parents: c0cd47c
Author: Yi <yjin@apache.org>
Authored: Fri Aug 11 22:03:33 2017 +1000
Committer: Yi <yjin@apache.org>
Committed: Fri Aug 11 22:03:33 2017 +1000

----------------------------------------------------------------------
 src/backend/cdb/cdbpersistentfilesysobj.c |   3 +
 src/backend/storage/file/fd.c             | 171 +++++++++++++++++++------
 src/include/storage/fd.h                  |   1 +
 3 files changed, 139 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/31aeb4a1/src/backend/cdb/cdbpersistentfilesysobj.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbpersistentfilesysobj.c b/src/backend/cdb/cdbpersistentfilesysobj.c
index 77ae62c..8667fd6 100644
--- a/src/backend/cdb/cdbpersistentfilesysobj.c
+++ b/src/backend/cdb/cdbpersistentfilesysobj.c
@@ -2129,6 +2129,9 @@ void PersistentFileSysObj_EndXactDrop(
 							ignoreNonExistence,
 							Debug_persistent_print,
 							Persistent_DebugPrintLevel());
+
+	// clean up alive connections that are used for deleting hdfs objects
+	cleanup_hdfs_handlers_for_dropping();
 }
 
 void PersistentFileSysObj_UpdateRelationBufpoolKind(

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/31aeb4a1/src/backend/storage/file/fd.c
----------------------------------------------------------------------
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 4ec458e..2366318 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -228,6 +228,7 @@ typedef struct
  * hash table of hdfs file systems, key = hdfs:/<host>:<port>, value = hdfsFS
  */
 static HTAB * HdfsFsTable = NULL;
+static HTAB * HdfsFsTable4Drop = NULL;
 static MemoryContext HdfsGlobalContext = NULL;
 #define EXPECTED_MAX_HDFS_CONNECTIONS 10
 
@@ -298,7 +299,7 @@ static void CleanupTempFiles(bool isProcExit);
 static void RemovePgTempFilesInDir(const char *tmpdirname);
 static bool HasTempFilePrefix(char * fileName);
 
-static hdfsFS HdfsGetConnection(const char * path);
+static hdfsFS HdfsGetConnection(const char * path, bool isForDrop);
 static bool HdfsBasicOpenFile(FileName fileName, int fileFlags, int fileMode,
 							  char **hProtocol, hdfsFS *fs, hdfsFile *hFile);
 static const char * ConvertToUnixPath(const char * fileName, char * buffer,
@@ -1793,7 +1794,7 @@ AllocateDir(const char *dirname)
 			return NULL;
 		if (ConvertToUnixPath(dirname, unixpath, sizeof(unixpath)) == NULL)
 			return NULL;
-		if ((fs = HdfsGetConnection(dirname)) == NULL)
+		if ((fs = HdfsGetConnection(dirname, false)) == NULL)
 			return NULL;
 		/* TODO: add to filesystem! */
 		if ((info = hdfsListDirectory(fs, unixpath, &num)) == NULL)
@@ -2005,15 +2006,66 @@ void
 cleanup_filesystem_handler(void)
 {
 	HASH_SEQ_STATUS	status;
+	HASH_SEQ_STATUS status4drop;
 	struct FsEntry *entry;
 	char *protocol;
 
-	if (NULL == HdfsFsTable)
+	if (NULL == HdfsFsTable && NULL == HdfsFsTable4Drop)
 		return;
 
-	hash_seq_init(&status, HdfsFsTable);
+	if (NULL != HdfsFsTable) {
+		hash_seq_init(&status, HdfsFsTable);
 
-	while (NULL != (entry = hash_seq_search(&status)))
+		while (NULL != (entry = hash_seq_search(&status)))
+		{
+			if (HdfsParsePath(entry->host, &protocol, NULL, NULL, NULL) || NULL == protocol)
+			{
+				elog(WARNING, "cannot get protocol for host: %s", entry->host);
+				continue;
+			}
+
+			if (entry->fs)
+				HdfsDisconnect(protocol, entry->fs);
+			pfree(protocol);
+		}
+		hash_destroy(HdfsFsTable);
+		HdfsFsTable = NULL;
+	}
+
+	if (NULL != HdfsFsTable4Drop) {
+		hash_seq_init(&status4drop, HdfsFsTable4Drop);
+
+		while (NULL != (entry = hash_seq_search(&status4drop)))
+		{
+			if (HdfsParsePath(entry->host, &protocol, NULL, NULL, NULL) || NULL == protocol)
+			{
+				elog(WARNING, "cannot get protocol for host: %s", entry->host);
+				continue;
+			}
+
+			if (entry->fs)
+				HdfsDisconnect(protocol, entry->fs);
+			pfree(protocol);
+		}
+		hash_destroy(HdfsFsTable4Drop);
+		HdfsFsTable4Drop = NULL;
+	}
+	MemoryContextResetAndDeleteChildren(HdfsGlobalContext);
+}
+
+void
+cleanup_hdfs_handlers_for_dropping()
+{
+	HASH_SEQ_STATUS status4drop;
+	struct FsEntry *entry;
+	char *protocol;
+
+	if (NULL == HdfsFsTable4Drop)
+		return;
+
+	hash_seq_init(&status4drop, HdfsFsTable4Drop);
+
+	while (NULL != (entry = hash_seq_search(&status4drop)))
 	{
 		if (HdfsParsePath(entry->host, &protocol, NULL, NULL, NULL) || NULL == protocol)
 		{
@@ -2026,13 +2078,10 @@ cleanup_filesystem_handler(void)
 		pfree(protocol);
 	}
 
-	hash_destroy(HdfsFsTable);
-	HdfsFsTable = NULL;
-
-	MemoryContextResetAndDeleteChildren(HdfsGlobalContext);
+	hash_destroy(HdfsFsTable4Drop);
+	HdfsFsTable4Drop = NULL;
 }
 
-
 /*
  * closeAllVfds
  *
@@ -2339,10 +2388,11 @@ HasTempFilePrefix(char * fileName)
  * 		hdfs:/<host>:<port>/...
  */
 static hdfsFS
-HdfsGetConnection(const char * path)
+HdfsGetConnection(const char * path, bool isForDrop)
 {
 	struct FsEntry * entry;
 	HASHCTL hash_ctl;
+	HASHCTL hash_ctl_4drop;
 	bool found;
 
 	char *host = NULL, *location = NULL, *protocol;
@@ -2363,15 +2413,16 @@ HdfsGetConnection(const char * path)
 		else
 			sprintf(location, "%s://%s/", protocol, host);
 
+		if (NULL == HdfsGlobalContext)
+		{
+			Assert(NULL != TopMemoryContext);
+			HdfsGlobalContext = AllocSetContextCreate(TopMemoryContext,
+					"HDFS Global Context", ALLOCSET_DEFAULT_MINSIZE,
+					ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
+		}
+
 		if (NULL == HdfsFsTable)
 		{
-			if (NULL == HdfsGlobalContext)
-			{
-				Assert(NULL != TopMemoryContext);
-				HdfsGlobalContext = AllocSetContextCreate(TopMemoryContext,
-						"HDFS Global Context", ALLOCSET_DEFAULT_MINSIZE,
-						ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
-			}
 
 			MemSet(&hash_ctl, 0, sizeof(hash_ctl));
 			hash_ctl.keysize = MAXPGPATH;
@@ -2380,8 +2431,9 @@ HdfsGetConnection(const char * path)
 			hash_ctl.hcxt = HdfsGlobalContext;
 
 			HdfsFsTable = hash_create("hdfs connections hash table",
-					EXPECTED_MAX_HDFS_CONNECTIONS, &hash_ctl,
-					HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+									  EXPECTED_MAX_HDFS_CONNECTIONS,
+									  &hash_ctl,
+									  HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
 
 			if (HdfsFsTable == NULL )
 			{
@@ -2389,6 +2441,30 @@ HdfsGetConnection(const char * path)
 				errno = EIO;
 				break;
 			}
+
+			elog(LOG, "created hash table for hdfs access.");
+		}
+
+		if (NULL == HdfsFsTable4Drop)
+		{
+			MemSet(&hash_ctl_4drop, 0, sizeof(hash_ctl_4drop));
+			hash_ctl_4drop.keysize = MAXPGPATH;
+			hash_ctl_4drop.entrysize = sizeof(struct FsEntry);
+			hash_ctl_4drop.hash = string_hash;
+			hash_ctl_4drop.hcxt = HdfsGlobalContext;
+
+			HdfsFsTable4Drop = hash_create("hash connections hash table for drop",
+										   EXPECTED_MAX_HDFS_CONNECTIONS,
+										   &hash_ctl_4drop,
+										   HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+			if (HdfsFsTable4Drop == NULL )
+			{
+				elog(WARNING, "failed to create hash table for dropping table.");
+				errno = EIO;
+				break;
+			}
+
+			elog(LOG, "created hash table (4drop) for hdfs access.");
 		}
 
 		if (enable_secure_filesystem && Gp_role != GP_ROLE_EXECUTE)
@@ -2403,8 +2479,21 @@ HdfsGetConnection(const char * path)
 		    }
 		}
 
-		entry = (struct FsEntry *) hash_search(HdfsFsTable, location,
-				HASH_ENTER, &found);
+		/* If this is for normal connection, check from normal table, otherwise,
+		 * check the table for dropping. */
+		if (!isForDrop) {
+			entry = (struct FsEntry *) hash_search(HdfsFsTable,
+												   location,
+												   HASH_ENTER,
+												   &found);
+		}
+		else
+		{
+			entry = (struct FsEntry *) hash_search(HdfsFsTable4Drop,
+												   location,
+												   HASH_ENTER,
+												   &found);
+		}
 
 		if (!found)
 		{
@@ -2423,7 +2512,10 @@ HdfsGetConnection(const char * path)
 										errmsg("failed to get filesystem credential."),
 										errdetail("%s", HdfsGetLastError())));
 
-						hash_search(HdfsFsTable, location, HASH_REMOVE, &found);
+						if (!isForDrop)
+						    hash_search(HdfsFsTable, location, HASH_REMOVE, &found);
+						else
+							hash_search(HdfsFsTable4Drop, location, HASH_REMOVE, &found);
 						errno = EACCES;
 						break;
 					}
@@ -2432,7 +2524,11 @@ HdfsGetConnection(const char * path)
 				{
 					if (!login())
 					{
-						hash_search(HdfsFsTable, location, HASH_REMOVE, &found);
+						if (!isForDrop)
+							hash_search(HdfsFsTable, location, HASH_REMOVE, &found);
+						else {
+							hash_search(HdfsFsTable4Drop, location, HASH_REMOVE, &found);
+						}
 						errno = EACCES;
 						break;
 					}
@@ -2448,7 +2544,10 @@ HdfsGetConnection(const char * path)
 
 			if (NULL == entry->fs)
 			{
-				hash_search(HdfsFsTable, location, HASH_REMOVE, &found);
+				if (!isForDrop)
+					hash_search(HdfsFsTable, location, HASH_REMOVE, &found);
+				else
+					hash_search(HdfsFsTable4Drop, location, HASH_REMOVE, &found);
 				ereport(LOG,
 						(errcode(ERRCODE_IO_ERROR),
 								errmsg("fail to connect hdfs at %s, errno = %d", location, errno),
@@ -2606,7 +2705,7 @@ HdfsBasicOpenFile(FileName fileName, int fileFlags, int fileMode,
 	if (NULL == ConvertToUnixPath(fileName, path, sizeof(path)))
 		return FALSE;
 
-	tempfs = HdfsGetConnection(fileName);
+	tempfs = HdfsGetConnection(fileName, false);
 	if (tempfs == NULL)
 		return FALSE;
 
@@ -2976,7 +3075,7 @@ HdfsRemovePath(FileName fileName, int recursive)
 	if (HdfsParsePath(fileName, &protocol, NULL, NULL, NULL) || NULL == protocol)
 		return -1;
 
-	hdfsFS fs = HdfsGetConnection(fileName);
+	hdfsFS fs = HdfsGetConnection(fileName, true);
 	if (NULL == fs)
 		return -1;
 	if (NULL == ConvertToUnixPath(fileName, path, sizeof(path)))
@@ -3001,7 +3100,7 @@ HdfsMakeDirectory(const char * path, mode_t mode)
 	if (HdfsParsePath(path, &protocol, NULL, NULL, NULL) || protocol == NULL)
 		return -1;
 
-	hdfsFS fs = HdfsGetConnection(path);
+	hdfsFS fs = HdfsGetConnection(path, false);
 	if (NULL == fs)
 		return -1;
 	if (NULL == ConvertToUnixPath(path, p, sizeof(p)))
@@ -3128,7 +3227,7 @@ HdfsGetDelegationToken(const char *uri, void **fs)
 	char *token;
 	char *retval;
 
-	*fs = HdfsGetConnection(uri);
+	*fs = HdfsGetConnection(uri, false);
 	if (*fs == NULL)
 		return NULL;
 
@@ -3285,7 +3384,7 @@ HdfsPathFileTruncate(FileName fileName) {
 		return -1;
 	}
 
-	fs = HdfsGetConnection(fileName);
+	fs = HdfsGetConnection(fileName, true);
 	if (fs == NULL)
 		return -1;
 
@@ -3338,7 +3437,7 @@ HdfsPathExist(char *path)
 	if (HdfsParsePath(path, &protocol, NULL, NULL, NULL) || NULL == protocol)
 		elog(ERROR, "cannot get protocol for path: %s", path);
 
-	fs = HdfsGetConnection(path);
+	fs = HdfsGetConnection(path, false);
 
 	if (fs == NULL)
 		return false;
@@ -3363,7 +3462,7 @@ HdfsPathExistAndNonEmpty(char *path, bool *existed)
   if (HdfsParsePath(path, &protocol, NULL, NULL, NULL) || NULL == protocol)
     elog(ERROR, "cannot get protocol for path: %s", path);
 
-  fs = HdfsGetConnection(path);
+  fs = HdfsGetConnection(path, false);
 
   if (fs == NULL)
     return false;
@@ -3446,7 +3545,7 @@ HdfsPathSize(const char *path)
     return 0;
   if (ConvertToUnixPath(path, unixpath, sizeof(unixpath)) == NULL)
     return 0;
-  if ((fs = HdfsGetConnection(path)) == NULL)
+  if ((fs = HdfsGetConnection(path, false)) == NULL)
     return 0;
 
   total_size = HdfsPathSizeRecursive(fs, protocol, unixpath);
@@ -3469,7 +3568,7 @@ HdfsGetFileLength(char * path)
 	if (HdfsParsePath(path, &protocol, NULL, NULL, NULL) || NULL == protocol)
 		elog(ERROR, "cannot get protocol for path: %s", path);
 
-	fs = HdfsGetConnection(path);
+	fs = HdfsGetConnection(path, false);
 
 	if (fs == NULL)
 		return -1;
@@ -3506,7 +3605,7 @@ int HdfsIsDirOrFile(char * path)
 	if (HdfsParsePath(path, &protocol, NULL, NULL, NULL) || NULL == protocol)
 		elog(ERROR, "cannot get protocol for path: %s", path);
 
-	fs = HdfsGetConnection(path);
+	fs = HdfsGetConnection(path, false);
 
 	if (fs == NULL)
 		return -1;
@@ -3552,7 +3651,7 @@ HdfsGetFileBlockLocations2(const char *path, int64 offset, int64 length, int *bl
 		elog(ERROR, "cannot get protocol for path: %s", path);
 	}
 
-	fs = HdfsGetConnection(path);
+	fs = HdfsGetConnection(path, false);
 
 	if (NULL == fs)
 	{

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/31aeb4a1/src/include/storage/fd.h
----------------------------------------------------------------------
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 0c25264..dde0c69 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -113,6 +113,7 @@ extern char *DeserializeDelegationToken(void *binary, int size);
 
 extern void cleanup_lru_opened_files(void);
 extern void cleanup_filesystem_handler(void);
+extern void cleanup_hdfs_handlers_for_dropping(void);
 
 /* abstract file system */
 extern File FileNameOpenFile(FileName fileName, const char *temp_dir, int fileFlags, int fileMode);


Mime
View raw message