hawq-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Librago <...@git.apache.org>
Subject [GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Date Wed, 08 Aug 2018 01:43:58 GMT
Github user Librago commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1384#discussion_r208434274
  
    --- Diff: contrib/exthdfs/exthdfs.c ---
    @@ -0,0 +1,472 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +
    +
    +#include "postgres.h"
    +
    +#include "common.h"
    +#include "access/extprotocol.h"
    +#include "cdb/cdbdatalocality.h"
    +#include "storage/fd.h"
    +#include "storage/filesystem.h"
    +#include "utils/uri.h"
    +
    +
    +
    +
    +PG_MODULE_MAGIC;
    +
    +PG_FUNCTION_INFO_V1(hdfsprotocol_blocklocation);
    +PG_FUNCTION_INFO_V1(hdfsprotocol_validate);
    +
    +Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS);
    +Datum hdfsprotocol_validate(PG_FUNCTION_ARGS);
    +
    +Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS)
    +{
    +
    +	// Build the result instance
    +	int nsize = 0;
    +	int numOfBlock = 0;
    +	ExtProtocolBlockLocationData *bldata =
    +		palloc0(sizeof(ExtProtocolBlockLocationData));
    +	if (bldata == NULL)
    +	{
    +		elog(ERROR, "hdfsprotocol_blocklocation : "
    +                    "cannot allocate due to no memory");
    +	}
    +	bldata->type = T_ExtProtocolBlockLocationData;
    +	fcinfo->resultinfo = bldata;
    +
    +	ExtProtocolValidatorData *pvalidator_data = (ExtProtocolValidatorData *)
    +												(fcinfo->context);
    +
    +
    +	 // Parse URI of the first location, we expect all locations uses the same
    +	 // name node server. This is checked in validation function.
    +
    +	char *first_uri_str =
    +		(char *)strVal(lfirst(list_head(pvalidator_data->url_list)));
    +	Uri *uri = ParseExternalTableUri(first_uri_str);
    +
    +	elog(DEBUG3, "hdfsprotocol_blocklocation : "
    +				 "extracted HDFS name node address %s:%d",
    +				 uri->hostname, uri->port);
    +
    +	// Create file system instance
    +	hdfsFS fs = hdfsConnect(uri->hostname, uri->port);
    +	if (fs == NULL)
    +	{
    +		elog(ERROR, "hdfsprotocol_blocklocation : "
    +					"failed to create HDFS instance to connect to %s:%d",
    +					uri->hostname, uri->port);
    +	}
    +
    +	// Clean up uri instance as we don't need it any longer
    +	pfree(uri);
    +
    +	// Check all locations to get files to fetch location.
    +	ListCell *lc = NULL;
    +	foreach(lc, pvalidator_data->url_list)
    +	{
    +		// Parse current location URI.
    +		char *url = (char *)strVal(lfirst(lc));
    +		Uri *uri = ParseExternalTableUri(url);
    +		if (uri == NULL)
    +		{
    +			elog(ERROR, "hdfsprotocol_blocklocation : "
    +						"invalid URI encountered %s", url);
    +		}
    +
    +		 //
    +		 // NOTICE: We temporarily support only directories as locations. We plan
    +		 //        to extend the logic to specifying single file as one location
    +		 //         very soon.
    +
    +
    +		// get files contained in the path.
    +		hdfsFileInfo *fiarray = hdfsListDirectory(fs, uri->path,&nsize);
    +		if (fiarray == NULL)
    +		{
    +			elog(ERROR, "hdfsprotocol_blocklocation : "
    +						"failed to get files of path %s",
    +						uri->path);
    +		}
    +
    +		int i = 0 ;
    +		// Call block location api to get data location for each file
    +		for (i = 0 ; i < nsize ; i++)
    +		{
    +			hdfsFileInfo *fi = &fiarray[i];
    +
    +			// break condition of this for loop
    +			if (fi == NULL) {break;}
    +
    +			// Build file name full path.
    +			const char *fname = fi->mName;
    +			char *fullpath = palloc0(                // slash
    +									 strlen(fname) +      // name
    +									 1);                  // \0
    +			sprintf(fullpath, "%s", fname);
    +
    +			elog(DEBUG3, "hdfsprotocol_blocklocation : "
    +						 "built full path file %s", fullpath);
    +
    +			// Get file full length.
    +			int64_t len = fi->mSize;
    +
    +			elog(DEBUG3, "hdfsprotocol_blocklocation : "
    +					     "got file %s length " INT64_FORMAT,
    +					     fullpath, len);
    +
    +			if (len == 0) {
    +				pfree(fullpath);
    +				continue;
    +			}
    +
    +			// Get block location data for this file
    +			BlockLocation *bla = hdfsGetFileBlockLocations(fs, fullpath, 0, len,&numOfBlock);
    +			if (bla == NULL)
    +			{
    +				elog(ERROR, "hdfsprotocol_blocklocation : "
    +							"failed to get block location of path %s. "
    +							"It is reported generally due to HDFS service errors or "
    +							"another session's ongoing writing.",
    +							fullpath);
    +			}
    +
    +			// Add file full path and its block number as result.
    +			blocklocation_file *blf = palloc0(sizeof(blocklocation_file));
    +			blf->file_uri = pstrdup(fullpath);
    +			blf->block_num = numOfBlock;
    +			blf->locations = palloc0(sizeof(BlockLocation) * blf->block_num);
    +
    +			elog(DEBUG3, "hdfsprotocol_blocklocation : file %s has %d blocks",
    +					  	 fullpath, blf->block_num);
    +
    +			// We don't need it any longer
    +			pfree(fullpath);
    +			int bidx = 0;
    +			// Add block information as a list.
    +			for (bidx = 0 ; bidx < blf->block_num ; bidx++)
    +			{
    +				BlockLocation *blo = &bla[bidx];
    +				BlockLocation *bl = &(blf->locations[bidx]);
    +				bl->numOfNodes = blo->numOfNodes;
    +				bl->hosts = (char **)palloc0(sizeof(char *) * bl->numOfNodes);
    +				bl->names = (char **)palloc0(sizeof(char *) * bl->numOfNodes);
    +				bl->topologyPaths = (char **)palloc0(sizeof(char *) * bl->numOfNodes);
    +				bl->offset = blo->offset;
    +				bl->length = blo->length;
    +				bl->corrupt = blo->corrupt;
    +
    +				int nidx = 0 ;
    +				for (nidx = 0 ; nidx < bl->numOfNodes ; nidx++)
    +				{
    +					bl->hosts[nidx] = pstrdup(*blo[nidx].hosts);
    +					bl->names[nidx] = pstrdup(*blo[nidx].names);
    +					bl->topologyPaths[nidx] =pstrdup(*blo[nidx].topologyPaths);
    +				}
    +			}
    +
    +			bldata->files = lappend(bldata->files, (void *)(blf));
    +
    +			// Clean up block location instances created by the lib.
    +			hdfsFreeFileBlockLocations(bla,numOfBlock);
    +		}
    +
    +		// Clean up URI instance in loop as we don't need it any longer
    +		pfree(uri);
    +
    +		// Clean up file info array created by the lib for this location.
    +		hdfsFreeFileInfo(fiarray,nsize);
    +	}
    +
    +	// destroy fs instance
    +	hdfsDisconnect(fs);
    +
    +	PG_RETURN_VOID();
    +
    +}
    +
    +Datum hdfsprotocol_validate(PG_FUNCTION_ARGS)
    +{
    +	elog(DEBUG3, "hdfsprotocol_validate() begin");
    +
    +	/* Check which action should perform. */
    +	ExtProtocolValidatorData *pvalidator_data =
    +       (ExtProtocolValidatorData *)(fcinfo->context);
    +
    +	if (pvalidator_data->forceCreateDir)
    +		Assert(pvalidator_data->url_list && pvalidator_data->url_list->length
== 1);
    +
    +	if (pvalidator_data->direction == EXT_VALIDATE_WRITE)
    +	{
    +		/* accept only one directory location */
    +		if (list_length(pvalidator_data->url_list) != 1)
    +		{
    +			ereport(ERROR,
    +					(errcode(ERRCODE_SYNTAX_ERROR),
    +					 errmsg("hdfsprotocol_validate : "
    +							"only one location url is supported for writable external hdfs")));
    +		}
    +	}
    +
    +	/* Go through first round to get formatter type */
    +	bool isCsv = false;
    +	bool isText = false;
    +	bool isOrc = false;
    +	ListCell *optcell = NULL;
    +	foreach(optcell, pvalidator_data->format_opts)
    +	{
    +		DefElem *de = (DefElem *)lfirst(optcell);
    +		if (strcasecmp(de->defname, "formatter") == 0)
    +		{
    +			char *val = strVal(de->arg);
    +			if (strcasecmp(val, "csv") == 0)
    +			{
    +				isCsv = true;
    +			}
    +			else if (strcasecmp(val, "text") == 0)
    +			{
    +				isText = true;
    +			}
    +			else if (strcasecmp(val, "orc") == 0)
    +			{
    +				isOrc = true;
    +			}
    +		}
    +	}
    +	if(1)
    +	{
    +		ereport(ERROR,
    +						(errcode(ERRCODE_SYNTAX_ERROR),
    +						 errmsg("hdfsprotocol_validate : "
    +								"no formatter is supported for external hdfs")));
    +	}
    +	if (!isCsv && !isText && !isOrc)
    +	{
    +		ereport(ERROR,
    +				(errcode(ERRCODE_SYNTAX_ERROR),
    +				 errmsg("hdfsprotocol_validate : "
    +						"only 'csv', 'text' and 'orc' formatter is supported for external hdfs")));
    +	}
    +	Assert(isCsv || isText || isOrc);
    +
    +	/* Validate formatter options */
    +	foreach(optcell, pvalidator_data->format_opts)
    +	{
    +		DefElem *de = (DefElem *)lfirst(optcell);
    +		if (strcasecmp(de->defname, "delimiter") == 0)
    +		{
    +			char *val = strVal(de->arg);
    +			/* Validation 1. User can not specify 'OFF' in delimiter */
    +			if (strcasecmp(val, "off") == 0)
    +			{
    +				ereport(ERROR,
    +						(errcode(ERRCODE_SYNTAX_ERROR),
    +						 errmsg("hdfsprotocol_validate : "
    +								"'off' value of 'delimiter' option is not supported")));
    +			}
    +			/* Validation 2. Can specify multibytes characters */
    +			if (strlen(val) < 1)
    +			{
    +				ereport(ERROR,
    +						(errcode(ERRCODE_SYNTAX_ERROR),
    +								 errmsg("hdfsprotocol_validate : "
    +										"'delimiter' option accepts multibytes characters")));
    +			}
    +		}
    +
    +		if (strcasecmp(de->defname, "escape") == 0)
    +		{
    +			char *val = strVal(de->arg);
    +			/* Validation 3. User can not specify 'OFF' in delimiter */
    +			if (strcasecmp(val, "off") == 0)
    +			{
    +				ereport(ERROR,
    +						(errcode(ERRCODE_SYNTAX_ERROR),
    +						 errmsg("hdfsprotocol_validate : "
    +								"'off' value of 'escape' option is not supported")));
    +			}
    +			/* Validation 4. Can only specify one character */
    +			if (strlen(val) != 1)
    +			{
    +				ereport(ERROR,
    +						(errcode(ERRCODE_SYNTAX_ERROR),
    +								 errmsg("hdfsprotocol_validate : "
    +										"'escape' option accepts single character")));
    +			}
    +		}
    +
    +		if (strcasecmp(de->defname, "newline") == 0)
    +		{
    +			char *val = strVal(de->arg);
    +			/* Validation 5. only accept 'lf', 'cr', 'crlf' */
    +			if (strcasecmp(val, "lf") != 0 &&
    +				strcasecmp(val, "cr") != 0 &&
    +				strcasecmp(val, "crlf") != 0)
    +			{
    +				ereport(ERROR,
    +						(errcode(ERRCODE_SYNTAX_ERROR),
    +						 errmsg("hdfsprotocol_validate : "
    +								"the value of 'newline' option can only be "
    +								"'lf', 'cr' or 'crlf'")));
    +			}
    +		}
    +
    +		if (strcasecmp(de->defname, "quote") == 0)
    +		{
    +			/* This is allowed only for csv mode formatter */
    +			if (!isCsv)
    +			{
    +				ereport(ERROR,
    +						(errcode(ERRCODE_SYNTAX_ERROR),
    +								 errmsg("hdfsprotocol_validate : "
    +										"'quote' option is only available in 'csv' formatter")));
    +			}
    +
    +			char *val = strVal(de->arg);
    +			/* Validation 5. Can only specify one character */
    +			if (strlen(val) != 1)
    +			{
    +				ereport(ERROR,
    +						(errcode(ERRCODE_SYNTAX_ERROR),
    +								 errmsg("hdfsprotocol_validate : "
    +										"'quote' option accepts single character")));
    +			}
    +		}
    +
    +		if (strcasecmp(de->defname, "force_notnull") == 0)
    +		{
    +			/* This is allowed only for csv mode formatter */
    +			if (!isCsv)
    +			{
    +				ereport(ERROR,
    +						(errcode(ERRCODE_SYNTAX_ERROR),
    +								 errmsg("hdfsprotocol_validate : "
    +										"'force_notnull' option is only available in 'csv' formatter")));
    +			}
    +		}
    +
    +		if (strcasecmp(de->defname, "force_quote") == 0)
    +		{
    +			/* This is allowed only for csv mode formatter */
    +			if (!isCsv)
    +			{
    +				ereport(ERROR,
    +						(errcode(ERRCODE_SYNTAX_ERROR),
    +								 errmsg("hdfsprotocol_validate : "
    +										"'force_quote' option is only available in 'csv' formatter")));
    +			}
    +		}
    +	}
    +
    +	/* All urls should
    +	 * 1) have the same protocol name 'hdfs',
    +	 * 2) the same hdfs namenode server address
    +	 */
    +	/* Check all locations to get files to fetch location. */
    +	char *nnaddr = NULL;
    +	int nnport = -1;
    +	ListCell *lc = NULL;
    +	foreach(lc, pvalidator_data->url_list)
    +	{
    +		/* Parse current location URI. */
    +		char *url = (char *)strVal(lfirst(lc));
    +		Uri *uri = ParseExternalTableUri(url);
    +		if (uri == NULL)
    +		{
    +			elog(ERROR, "hdfsprotocol_validate : "
    +						"invalid URI encountered %s", url);
    +		}
    +
    +		if (uri->protocol != URI_HDFS)
    +		{
    +			elog(ERROR, "hdfsprotocol_validate : "
    +						"invalid URI protocol encountered in %s, "
    +						"hdfs:// protocol is required",
    +						url);
    +		}
    +
    +		if (nnaddr == NULL)
    +		{
    +			nnaddr = pstrdup(uri->hostname);
    +			nnport = uri->port;
    +		}
    +		else
    +		{
    +			if (strcmp(nnaddr, uri->hostname) != 0)
    +			{
    +				elog(ERROR, "hdfsprotocol_validate : "
    +							"different name server addresses are detected, "
    +							"both %s and %s are found",
    +							nnaddr, uri->hostname);
    +			}
    +			if (nnport != uri->port)
    +			{
    +				elog(ERROR, "hdfsprotocol_validate : "
    +							"different name server ports are detected, "
    +							"both %d and %d are found",
    +							nnport, uri->port);
    +			}
    +		}
    +
    +		/* SHOULD ADD LOGIC HERE TO CREATE UNEXISTING PATH */
    +		if (pvalidator_data->forceCreateDir) {
    +
    +		  elog(LOG, "hdfs_validator() forced creating dir");
    +
    +		  /* Create file system instance */
    +		  	hdfsFS fs = hdfsConnect(uri->hostname, uri->port);
    +			if (fs == NULL)
    +			{
    +				elog(ERROR, "hdfsprotocol_validate : "
    +							"failed to create HDFS instance to connect to %s:%d",
    +							uri->hostname, uri->port);
    +			}
    +
    +			if (hdfsExists(fs, uri->path) == -1)
    +				elog(ERROR, "hdfsprotocol_validate : "
    +						"Location \"%s\" is not exist",
    +						uri->path);
    +
    +		 /* destroy fs instance */
    +			hdfsDisconnect(fs);
    +		}
    +
    +		/* Clean up temporarily created instances */
    +		pfree(uri);
    --- End diff --
    
    This is a memory leak: `pfree(uri)` frees only the `Uri` struct itself, not the strings it owns (hostname, path, protocol string, etc.). Use `FreeExternalTableUri(uri)` instead, which releases the whole structure. The same applies to the other `pfree(uri)` calls in this file.


---

Mime
View raw message