From hdfs-commits-return-4859-apmail-hadoop-hdfs-commits-archive=hadoop.apache.org@hadoop.apache.org Thu Nov 8 19:10:51 2012 Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 82F48D80D for ; Thu, 8 Nov 2012 19:10:51 +0000 (UTC) Received: (qmail 74382 invoked by uid 500); 8 Nov 2012 19:10:51 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 74332 invoked by uid 500); 8 Nov 2012 19:10:51 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 74324 invoked by uid 99); 8 Nov 2012 19:10:51 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Nov 2012 19:10:51 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Nov 2012 19:10:47 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 69A782388C2D; Thu, 8 Nov 2012 19:10:10 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1407217 [3/7] - in /hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs: ./ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/contrib/bkjournal/src/main/proto/ src/contrib/bkjournal/src/test/j... Date: Thu, 08 Nov 2012 19:10:04 -0000 To: hdfs-commits@hadoop.apache.org From: suresh@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121108191010.69A782388C2D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c?rev=1407217&r1=1407216&r2=1407217&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c Thu Nov 8 19:09:46 2012 @@ -16,6 +16,10 @@ * limitations under the License. */ +#include +#include +#include + #include "exception.h" #include "hdfs.h" #include "hdfs_http_client.h" @@ -23,15 +27,9 @@ #include "hdfs_json_parser.h" #include "jni_helper.h" -#include -#include -#include -#include -#include - -#define HADOOP_HDFS_CONF "org/apache/hadoop/hdfs/HdfsConfiguration" -#define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode" -#define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress" +#define HADOOP_HDFS_CONF "org/apache/hadoop/hdfs/HdfsConfiguration" +#define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode" +#define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress" struct hdfsBuilder { int forceNewInstance; @@ -65,30 +63,70 @@ struct hdfs_internal { */ struct hdfsFile_internal { struct webhdfsFileHandle* file; - enum hdfsStreamType type; - int flags; - tOffset offset; + enum hdfsStreamType type; /* INPUT or OUTPUT */ + int flags; /* Flag indicate read/create/append etc. */ + tOffset offset; /* Current offset position in the file */ }; -static webhdfsBuffer *initWebHdfsBuffer(void) +/** + * Create, initialize and return a webhdfsBuffer + */ +static int initWebHdfsBuffer(struct webhdfsBuffer **webhdfsBuffer) { - webhdfsBuffer *buffer = calloc(1, sizeof(*buffer)); + int ret = 0; + struct webhdfsBuffer *buffer = calloc(1, sizeof(struct webhdfsBuffer)); if (!buffer) { - fprintf(stderr, "Fail to allocate memory for webhdfsBuffer.\n"); - return NULL; + fprintf(stderr, + "ERROR: fail to allocate memory for webhdfsBuffer.\n"); + return ENOMEM; } - buffer->remaining = 0; - buffer->offset = 0; - buffer->wbuffer = NULL; - buffer->closeFlag = 0; - buffer->openFlag = 0; - pthread_mutex_init(&buffer->writeMutex, NULL); - pthread_cond_init(&buffer->newwrite_or_close, NULL); - pthread_cond_init(&buffer->transfer_finish, NULL); - return buffer; + ret = pthread_mutex_init(&buffer->writeMutex, NULL); + if (ret) { + fprintf(stderr, "ERROR: fail in pthread_mutex_init for writeMutex " + "in initWebHdfsBuffer, <%d>: %s.\n", + ret, hdfs_strerror(ret)); + goto done; + } + ret = pthread_cond_init(&buffer->newwrite_or_close, NULL); + if (ret) { + fprintf(stderr, + "ERROR: fail in pthread_cond_init for newwrite_or_close " + "in initWebHdfsBuffer, <%d>: %s.\n", + ret, hdfs_strerror(ret)); + goto done; + } + ret = pthread_cond_init(&buffer->transfer_finish, NULL); + if (ret) { + fprintf(stderr, + "ERROR: fail in pthread_cond_init for transfer_finish " + "in initWebHdfsBuffer, <%d>: %s.\n", + ret, hdfs_strerror(ret)); + goto done; + } + +done: + if (ret) { + free(buffer); + return ret; + } + *webhdfsBuffer = buffer; + return 0; } -static webhdfsBuffer *resetWebhdfsBuffer(webhdfsBuffer *wb, const char *buffer, size_t length) { +/** + * Reset the webhdfsBuffer. This is used in a block way + * when hdfsWrite is called with a new buffer to write. + * The writing thread in libcurl will be waken up to continue writing, + * and the caller of this function is blocked waiting for writing to finish. + * + * @param wb The handle of the webhdfsBuffer + * @param buffer The buffer provided by user to write + * @param length The length of bytes to write + * @return Updated webhdfsBuffer. + */ +static struct webhdfsBuffer *resetWebhdfsBuffer(struct webhdfsBuffer *wb, + const char *buffer, size_t length) +{ if (buffer && length > 0) { pthread_mutex_lock(&wb->writeMutex); wb->wbuffer = buffer; @@ -103,35 +141,49 @@ static webhdfsBuffer *resetWebhdfsBuffer return wb; } -static void freeWebhdfsBuffer(webhdfsBuffer *buffer) { +/** + * Free the webhdfsBuffer and destroy its pthread conditions/mutex + * @param buffer The webhdfsBuffer to free + */ +static void freeWebhdfsBuffer(struct webhdfsBuffer *buffer) +{ + int ret = 0; if (buffer) { - int des = pthread_cond_destroy(&buffer->newwrite_or_close); - if (des == EBUSY) { - fprintf(stderr, "The condition newwrite_or_close is still referenced!\n"); - } else if (des == EINVAL) { - fprintf(stderr, "The condition newwrite_or_close is invalid!\n"); - } - des = pthread_cond_destroy(&buffer->transfer_finish); - if (des == EBUSY) { - fprintf(stderr, "The condition transfer_finish is still referenced!\n"); - } else if (des == EINVAL) { - fprintf(stderr, "The condition transfer_finish is invalid!\n"); + ret = pthread_cond_destroy(&buffer->newwrite_or_close); + if (ret) { + fprintf(stderr, + "WARN: fail in pthread_cond_destroy for newwrite_or_close " + "in freeWebhdfsBuffer, <%d>: %s.\n", + ret, hdfs_strerror(ret)); + errno = ret; } - if (des == EBUSY) { - fprintf(stderr, "The condition close_clean is still referenced!\n"); - } else if (des == EINVAL) { - fprintf(stderr, "The condition close_clean is invalid!\n"); + ret = pthread_cond_destroy(&buffer->transfer_finish); + if (ret) { + fprintf(stderr, + "WARN: fail in pthread_cond_destroy for transfer_finish " + "in freeWebhdfsBuffer, <%d>: %s.\n", + ret, hdfs_strerror(ret)); + errno = ret; } - des = pthread_mutex_destroy(&buffer->writeMutex); - if (des == EBUSY) { - fprintf(stderr, "The mutex is still locked or referenced!\n"); + ret = pthread_mutex_destroy(&buffer->writeMutex); + if (ret) { + fprintf(stderr, + "WARN: fail in pthread_mutex_destroy for writeMutex " + "in freeWebhdfsBuffer, <%d>: %s.\n", + ret, hdfs_strerror(ret)); + errno = ret; } free(buffer); buffer = NULL; } } -static void freeWebFileHandle(struct webhdfsFileHandle * handle) { +/** + * To free the webhdfsFileHandle, which includes a webhdfsBuffer and strings + * @param handle The webhdfsFileHandle to free + */ +static void freeWebFileHandle(struct webhdfsFileHandle * handle) +{ if (!handle) return; freeWebhdfsBuffer(handle->uploadBuffer); @@ -140,11 +192,46 @@ static void freeWebFileHandle(struct web free(handle); } +static const char *maybeNull(const char *str) +{ + return str ? str : "(NULL)"; +} + +/** To print a hdfsBuilder as string */ +static const char *hdfsBuilderToStr(const struct hdfsBuilder *bld, + char *buf, size_t bufLen) +{ + int strlength = snprintf(buf, bufLen, "nn=%s, port=%d, " + "kerbTicketCachePath=%s, userName=%s", + maybeNull(bld->nn), bld->port, + maybeNull(bld->kerbTicketCachePath), maybeNull(bld->userName)); + if (strlength < 0 || strlength >= bufLen) { + fprintf(stderr, "failed to print a hdfsBuilder as string.\n"); + return NULL; + } + return buf; +} + +/** + * Free a hdfs_internal handle + * @param fs The hdfs_internal handle to free + */ +static void freeWebHdfsInternal(struct hdfs_internal *fs) +{ + if (fs) { + free(fs->nn); + free(fs->userName); + free(fs->workingDir); + } +} + struct hdfsBuilder *hdfsNewBuilder(void) { struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder)); - if (!bld) + if (!bld) { + errno = ENOMEM; return NULL; + } return bld; } @@ -206,12 +293,7 @@ hdfsFS hdfsConnect(const char* nn, tPort hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) { - struct hdfsBuilder* bld = (struct hdfsBuilder *) hdfsConnect(nn, port); - if (!bld) { - return NULL; - } - hdfsBuilderSetForceNewInstance(bld); - return hdfsBuilderConnect(bld); + return hdfsConnect(nn, port); } hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, @@ -227,30 +309,16 @@ hdfsFS hdfsConnectAsUserNewInstance(cons return hdfsBuilderConnect(bld); } -static const char *maybeNull(const char *str) -{ - return str ? str : "(NULL)"; -} - -static const char *hdfsBuilderToStr(const struct hdfsBuilder *bld, - char *buf, size_t bufLen) -{ - snprintf(buf, bufLen, "nn=%s, port=%d, " - "kerbTicketCachePath=%s, userName=%s", - maybeNull(bld->nn), bld->port, - maybeNull(bld->kerbTicketCachePath), maybeNull(bld->userName)); - return buf; -} - -static void freeWebHdfsInternal(struct hdfs_internal *fs) -{ - if (fs) { - free(fs->nn); - free(fs->userName); - free(fs->workingDir); - } -} - +/** + * To retrieve the default configuration value for NameNode's hostName and port + * TODO: This function currently is using JNI, + * we need to do this without using JNI (HDFS-3917) + * + * @param bld The hdfsBuilder handle + * @param port Used to get the default value for NameNode's port + * @param nn Used to get the default value for NameNode's hostName + * @return 0 for success and non-zero value for failure + */ static int retrieveDefaults(const struct hdfsBuilder *bld, tPort *port, char **nn) { @@ -262,13 +330,11 @@ static int retrieveDefaults(const struct int ret = 0; char buf[512]; - // TODO: can we do this without using JNI? See HDFS-3917 env = getJNIEnv(); if (!env) { return EINTERNAL; } - // jHDFSConf = new HDFSConfiguration(); jthr = constructNewObjectOfClass(env, &jHDFSConf, HADOOP_HDFS_CONF, "()V"); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, @@ -277,12 +343,14 @@ static int retrieveDefaults(const struct goto done; } - jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_NAMENODE, "getHttpAddress", - "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/InetSocketAddress;", - jHDFSConf); + jthr = invokeMethod(env, &jVal, STATIC, NULL, + HADOOP_NAMENODE, "getHttpAddress", + "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/InetSocketAddress;", + jHDFSConf); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, - "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf))); + "hdfsBuilderConnect(%s)", + hdfsBuilderToStr(bld, buf, sizeof(buf))); goto done; } jAddress = jVal.l; @@ -298,7 +366,8 @@ static int retrieveDefaults(const struct *port = jVal.i; jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, - JAVA_INETSOCKETADDRESS, "getHostName", "()Ljava/lang/String;"); + JAVA_INETSOCKETADDRESS, + "getHostName", "()Ljava/lang/String;"); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsBuilderConnect(%s)", @@ -324,7 +393,7 @@ done: hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) { struct hdfs_internal *fs = NULL; - int ret; + int ret = 0; if (!bld) { ret = EINVAL; @@ -341,8 +410,8 @@ hdfsFS hdfsBuilderConnect(struct hdfsBui ret = ENOMEM; goto done; } - /* If the namenode is "default" and/or the port of namenode is 0, get the - * default namenode/port */ + // If the namenode is "default" and/or the port of namenode is 0, + // get the default namenode/port if (bld->port == 0 || !strcasecmp("default", bld->nn)) { ret = retrieveDefaults(bld, &fs->port, &fs->nn); if (ret) @@ -369,7 +438,7 @@ hdfsFS hdfsBuilderConnect(struct hdfsBui ret = ENOMEM; goto done; } - //for debug + // For debug fprintf(stderr, "namenode: %s:%d\n", bld->nn, bld->port); done: @@ -392,47 +461,68 @@ int hdfsDisconnect(hdfsFS fs) return 0; } -static char *getAbsolutePath(hdfsFS fs, const char *path) +/** + * Based on the working directory stored in hdfsFS, + * generate the absolute path for the given path + * + * @param fs The hdfsFS handle which stores the current working directory + * @param path The given path which may not be an absolute path + * @param absPath To hold generated absolute path for the given path + * @return 0 on success, non-zero value indicating error + */ +static int getAbsolutePath(hdfsFS fs, const char *path, char **absPath) { - char *absPath = NULL; + char *tempPath = NULL; size_t absPathLen; + int strlength; if (path[0] == '/') { - // path is already absolute. - return strdup(path); + // Path is already absolute. + tempPath = strdup(path); + if (!tempPath) { + return ENOMEM; + } + *absPath = tempPath; + return 0; } - // prepend the workingDir to the path. - absPathLen = strlen(fs->workingDir) + strlen(path); - absPath = malloc(absPathLen + 1); - if (!absPath) { - return NULL; + // Prepend the workingDir to the path. + absPathLen = strlen(fs->workingDir) + strlen(path) + 1; + tempPath = malloc(absPathLen); + if (!tempPath) { + return ENOMEM; + } + strlength = snprintf(tempPath, absPathLen, "%s%s", fs->workingDir, path); + if (strlength < 0 || strlength >= absPathLen) { + free(tempPath); + return EIO; } - snprintf(absPath, absPathLen + 1, "%s%s", fs->workingDir, path); - return absPath; + *absPath = tempPath; + return 0; } int hdfsCreateDirectory(hdfsFS fs, const char* path) { char *url = NULL, *absPath = NULL; - Response resp = NULL; + struct Response *resp = NULL; int ret = 0; if (fs == NULL || path == NULL) { ret = EINVAL; goto done; } - absPath = getAbsolutePath(fs, path); - if (!absPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, path, &absPath); + if (ret) { goto done; } - if(!((url = prepareMKDIR(fs->nn, fs->port, absPath, fs->userName)) - && (resp = launchMKDIR(url)) - && (parseMKDIR(resp->body->content)))) { - ret = EIO; + ret = createUrlForMKDIR(fs->nn, fs->port, absPath, fs->userName, &url); + if (ret) { goto done; } - + ret = launchMKDIR(url, &resp); + if (ret) { + goto done; + } + ret = parseMKDIR(resp->body->content); done: freeResponse(resp); free(url); @@ -447,24 +537,27 @@ done: int hdfsChmod(hdfsFS fs, const char* path, short mode) { char *absPath = NULL, *url = NULL; - Response resp = NULL; + struct Response *resp = NULL; int ret = 0; if (fs == NULL || path == NULL) { ret = EINVAL; goto done; } - absPath = getAbsolutePath(fs, path); - if (!absPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, path, &absPath); + if (ret) { goto done; } - if(!((url = prepareCHMOD(fs->nn, fs->port, absPath, (int)mode, fs->userName)) - && (resp = launchCHMOD(url)) - && (parseCHMOD(resp->header->content, resp->body->content)))) { - ret = EIO; + ret = createUrlForCHMOD(fs->nn, fs->port, absPath, (int) mode, + fs->userName, &url); + if (ret) { goto done; } + ret = launchCHMOD(url, &resp); + if (ret) { + goto done; + } + ret = parseCHMOD(resp->header->content, resp->body->content); done: freeResponse(resp); free(absPath); @@ -480,26 +573,27 @@ int hdfsChown(hdfsFS fs, const char* pat { int ret = 0; char *absPath = NULL, *url = NULL; - Response resp = NULL; + struct Response *resp = NULL; if (fs == NULL || path == NULL) { ret = EINVAL; goto done; } - absPath = getAbsolutePath(fs, path); - if (!absPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, path, &absPath); + if (ret) { goto done; } - - if(!((url = prepareCHOWN(fs->nn, fs->port, absPath, owner, group, fs->userName)) - && (resp = launchCHOWN(url)) - && (parseCHOWN(resp->header->content, resp->body->content)))) { - ret = EIO; + ret = createUrlForCHOWN(fs->nn, fs->port, absPath, + owner, group, fs->userName, &url); + if (ret) { goto done; } - + ret = launchCHOWN(url, &resp); + if (ret) { + goto done; + } + ret = parseCHOWN(resp->header->content, resp->body->content); done: freeResponse(resp); free(absPath); @@ -515,27 +609,30 @@ int hdfsRename(hdfsFS fs, const char* ol { char *oldAbsPath = NULL, *newAbsPath = NULL, *url = NULL; int ret = 0; - Response resp = NULL; + struct Response *resp = NULL; if (fs == NULL || oldPath == NULL || newPath == NULL) { ret = EINVAL; goto done; } - oldAbsPath = getAbsolutePath(fs, oldPath); - if (!oldAbsPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, oldPath, &oldAbsPath); + if (ret) { goto done; } - newAbsPath = getAbsolutePath(fs, newPath); - if (!newAbsPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, newPath, &newAbsPath); + if (ret) { goto done; } - if(!((url = prepareRENAME(fs->nn, fs->port, oldAbsPath, newAbsPath, fs->userName)) - && (resp = launchRENAME(url)) - && (parseRENAME(resp->body->content)))) { - ret = -1; + ret = createUrlForRENAME(fs->nn, fs->port, oldAbsPath, + newAbsPath, fs->userName, &url); + if (ret) { + goto done; } + ret = launchRENAME(url, &resp); + if (ret) { + goto done; + } + ret = parseRENAME(resp->body->content); done: freeResponse(resp); free(oldAbsPath); @@ -548,12 +645,22 @@ done: return 0; } -hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) +/** + * Get the file status for a given path. + * + * @param fs hdfsFS handle containing + * NameNode hostName/port information + * @param path Path for file + * @param printError Whether or not to print out error information + * (mainly remote FileNotFoundException) + * @return File information for the given path + */ +static hdfsFileInfo *hdfsGetPathInfoImpl(hdfsFS fs, const char* path, + int printError) { char *absPath = NULL; char *url=NULL; - Response resp = NULL; - int numEntries = 0; + struct Response *resp = NULL; int ret = 0; hdfsFileInfo *fileInfo = NULL; @@ -561,9 +668,8 @@ hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, ret = EINVAL; goto done; } - absPath = getAbsolutePath(fs, path); - if (!absPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, path, &absPath); + if (ret) { goto done; } fileInfo = (hdfsFileInfo *) calloc(1, sizeof(hdfsFileInfo)); @@ -573,18 +679,21 @@ hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, } fileInfo->mKind = kObjectKindFile; - if(!((url = prepareGFS(fs->nn, fs->port, absPath, fs->userName)) - && (resp = launchGFS(url)) - && (fileInfo = parseGFS(resp->body->content, fileInfo, &numEntries)))) { - ret = EIO; + ret = createUrlForGetFileStatus(fs->nn, fs->port, absPath, + fs->userName, &url); + if (ret) { goto done; } + ret = launchGFS(url, &resp); + if (ret) { + goto done; + } + ret = parseGFS(resp->body->content, fileInfo, printError); done: freeResponse(resp); free(absPath); free(url); - if (ret == 0) { return fileInfo; } else { @@ -594,10 +703,15 @@ done: } } +hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) +{ + return hdfsGetPathInfoImpl(fs, path, 1); +} + hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) { char *url = NULL, *absPath = NULL; - Response resp = NULL; + struct Response *resp = NULL; int ret = 0; hdfsFileInfo *fileInfo = NULL; @@ -605,9 +719,8 @@ hdfsFileInfo *hdfsListDirectory(hdfsFS f ret = EINVAL; goto done; } - absPath = getAbsolutePath(fs, path); - if (!absPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, path, &absPath); + if (ret) { goto done; } fileInfo = calloc(1, sizeof(*fileInfo)); @@ -615,21 +728,24 @@ hdfsFileInfo *hdfsListDirectory(hdfsFS f ret = ENOMEM; goto done; } - if(!((url = prepareLS(fs->nn, fs->port, absPath, fs->userName)) - && (resp = launchLS(url)) - && (fileInfo = parseGFS(resp->body->content, fileInfo, numEntries)))) { - ret = EIO; + + ret = createUrlForLS(fs->nn, fs->port, absPath, fs->userName, &url); + if (ret) { + goto done; + } + ret = launchLS(url, &resp); + if (ret) { goto done; } + ret = parseLS(resp->body->content, &fileInfo, numEntries); + done: freeResponse(resp); free(absPath); free(url); - if (ret == 0) { return fileInfo; } else { - hdfsFreeFileInfo(fileInfo, 1); errno = ret; return NULL; } @@ -638,24 +754,28 @@ done: int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) { char *url = NULL, *absPath = NULL; - Response resp = NULL; + struct Response *resp = NULL; int ret = 0; if (fs == NULL || path == NULL) { ret = EINVAL; goto done; } - absPath = getAbsolutePath(fs, path); - if (!absPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, path, &absPath); + if (ret) { goto done; } - if(!((url = prepareSETREPLICATION(fs->nn, fs->port, absPath, replication, fs->userName)) - && (resp = launchSETREPLICATION(url)) - && (parseSETREPLICATION(resp->body->content)))) { - ret = EIO; + + ret = createUrlForSETREPLICATION(fs->nn, fs->port, absPath, + replication, fs->userName, &url); + if (ret) { goto done; } + ret = launchSETREPLICATION(url, &resp); + if (ret) { + goto done; + } + ret = parseSETREPLICATION(resp->body->content); done: freeResponse(resp); free(absPath); @@ -670,8 +790,7 @@ done: void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries) { int i; - - for (i=0; i < numEntries; ++i) { + for (i = 0; i < numEntries; ++i) { free(hdfsFileInfo[i].mName); free(hdfsFileInfo[i].mOwner); free(hdfsFileInfo[i].mGroup); @@ -682,25 +801,28 @@ void hdfsFreeFileInfo(hdfsFileInfo *hdfs int hdfsDelete(hdfsFS fs, const char* path, int recursive) { char *url = NULL, *absPath = NULL; - Response resp = NULL; + struct Response *resp = NULL; int ret = 0; if (fs == NULL || path == NULL) { ret = EINVAL; goto done; } - absPath = getAbsolutePath(fs, path); - if (!absPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, path, &absPath); + if (ret) { goto done; } - if(!((url = prepareDELETE(fs->nn, fs->port, absPath, recursive, fs->userName)) - && (resp = launchDELETE(url)) - && (parseDELETE(resp->body->content)))) { - ret = EIO; + + ret = createUrlForDELETE(fs->nn, fs->port, absPath, + recursive, fs->userName, &url); + if (ret) { goto done; } - + ret = launchDELETE(url, &resp); + if (ret) { + goto done; + } + ret = parseDELETE(resp->body->content); done: freeResponse(resp); free(absPath); @@ -715,26 +837,28 @@ done: int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { char *url = NULL, *absPath = NULL; - Response resp = NULL; + struct Response *resp = NULL; int ret = 0; if (fs == NULL || path == NULL) { ret = EINVAL; goto done; } - absPath = getAbsolutePath(fs, path); - if (!absPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, path, &absPath); + if (ret) { + goto done; + } + + ret = createUrlForUTIMES(fs->nn, fs->port, absPath, mtime, atime, + fs->userName, &url); + if (ret) { goto done; } - if(!((url = prepareUTIMES(fs->nn, fs->port, absPath, mtime, atime, - fs->userName)) - && (resp = launchUTIMES(url)) - && (parseUTIMES(resp->header->content, resp->body->content)))) { - ret = EIO; + ret = launchUTIMES(url, &resp); + if (ret) { goto done; } - + ret = parseUTIMES(resp->header->content, resp->body->content); done: freeResponse(resp); free(absPath); @@ -748,7 +872,7 @@ done: int hdfsExists(hdfsFS fs, const char *path) { - hdfsFileInfo *fileInfo = hdfsGetPathInfo(fs, path); + hdfsFileInfo *fileInfo = hdfsGetPathInfoImpl(fs, path, 0); if (!fileInfo) { // (errno will have been set by hdfsGetPathInfo) return -1; @@ -757,14 +881,23 @@ int hdfsExists(hdfsFS fs, const char *pa return 0; } +/** + * The information hold by the thread which writes data to hdfs through http + */ typedef struct { - char *url; - webhdfsBuffer *uploadBuffer; - int flags; - Response resp; + char *url; /* the url of the target datanode for writing*/ + struct webhdfsBuffer *uploadBuffer; /* buffer storing data to write */ + int flags; /* flag indicating writing mode: create or append */ + struct Response *resp; /* response from the target datanode */ } threadData; -static void freeThreadData(threadData *data) { +/** + * Free the threadData struct instance, + * including the response and url contained in it + * @param data The threadData instance to free + */ +static void freeThreadData(threadData *data) +{ if (data) { if (data->url) { free(data->url); @@ -772,18 +905,29 @@ static void freeThreadData(threadData *d if (data->resp) { freeResponse(data->resp); } - //the uploadBuffer would be freed by freeWebFileHandle() + // The uploadBuffer would be freed by freeWebFileHandle() free(data); data = NULL; } } -static void *writeThreadOperation(void *v) { - threadData *data = (threadData *) v; +/** + * The action of the thread that writes data to + * the target datanode for hdfsWrite. + * The writing can be either create or append, which is specified by flag + */ +static void *writeThreadOperation(void *v) +{ + int ret = 0; + threadData *data = v; if (data->flags & O_APPEND) { - data->resp = launchDnAPPEND(data->url, data->uploadBuffer); + ret = launchDnAPPEND(data->url, data->uploadBuffer, &(data->resp)); } else { - data->resp = launchDnWRITE(data->url, data->uploadBuffer); + ret = launchDnWRITE(data->url, data->uploadBuffer, &(data->resp)); + } + if (ret) { + fprintf(stderr, "Failed to write to datanode %s, <%d>: %s.\n", + data->url, ret, hdfs_strerror(ret)); } return data; } @@ -816,58 +960,58 @@ static void freeFileInternal(hdfsFile fi static int hdfsOpenOutputFileImpl(hdfsFS fs, hdfsFile file) { struct webhdfsFileHandle *webhandle = file->file; - Response resp = NULL; - int parseRet, append, ret = 0; - char *prepareUrl = NULL, *dnUrl = NULL; + struct Response *resp = NULL; + int append, ret = 0; + char *nnUrl = NULL, *dnUrl = NULL; threadData *data = NULL; - webhandle->uploadBuffer = initWebHdfsBuffer(); - if (!webhandle->uploadBuffer) { - ret = ENOMEM; + ret = initWebHdfsBuffer(&webhandle->uploadBuffer); + if (ret) { goto done; } append = file->flags & O_APPEND; if (!append) { // If we're not appending, send a create request to the NN - prepareUrl = prepareNnWRITE(fs->nn, fs->port, webhandle->absPath, - fs->userName, webhandle->replication, webhandle->blockSize); + ret = createUrlForNnWRITE(fs->nn, fs->port, webhandle->absPath, + fs->userName, webhandle->replication, + webhandle->blockSize, &nnUrl); } else { - prepareUrl = prepareNnAPPEND(fs->nn, fs->port, webhandle->absPath, - fs->userName); + ret = createUrlForNnAPPEND(fs->nn, fs->port, webhandle->absPath, + fs->userName, &nnUrl); } - if (!prepareUrl) { - fprintf(stderr, "fail to create the url connecting to namenode " - "for file creation/appending\n"); - ret = EIO; + if (ret) { + fprintf(stderr, "Failed to create the url connecting to namenode " + "for file creation/appending, <%d>: %s.\n", + ret, hdfs_strerror(ret)); goto done; } if (!append) { - resp = launchNnWRITE(prepareUrl); + ret = launchNnWRITE(nnUrl, &resp); } else { - resp = launchNnAPPEND(prepareUrl); + ret = launchNnAPPEND(nnUrl, &resp); } - if (!resp) { + if (ret) { fprintf(stderr, "fail to get the response from namenode for " - "file creation/appending\n"); - ret = EIO; + "file creation/appending, <%d>: %s.\n", + ret, hdfs_strerror(ret)); goto done; } if (!append) { - parseRet = parseNnWRITE(resp->header->content, resp->body->content); + ret = parseNnWRITE(resp->header->content, resp->body->content); } else { - parseRet = parseNnAPPEND(resp->header->content, resp->body->content); + ret = parseNnAPPEND(resp->header->content, resp->body->content); } - if (!parseRet) { + if (ret) { fprintf(stderr, "fail to parse the response from namenode for " - "file creation/appending\n"); - ret = EIO; + "file creation/appending, <%d>: %s.\n", + ret, hdfs_strerror(ret)); goto done; } - dnUrl = parseDnLoc(resp->header->content); - if (!dnUrl) { + ret = parseDnLoc(resp->header->content, &dnUrl); + if (ret) { fprintf(stderr, "fail to get the datanode url from namenode " - "for file creation/appending\n"); - ret = EIO; + "for file creation/appending, <%d>: %s.\n", + ret, hdfs_strerror(ret)); goto done; } //store the datanode url in the file handle @@ -892,18 +1036,23 @@ static int hdfsOpenOutputFileImpl(hdfsFS ret = pthread_create(&webhandle->connThread, NULL, writeThreadOperation, data); if (ret) { - fprintf(stderr, "Failed to create the writing thread.\n"); + fprintf(stderr, "ERROR: failed to create the writing thread " + "in hdfsOpenOutputFileImpl, <%d>: %s.\n", + ret, hdfs_strerror(ret)); goto done; } webhandle->uploadBuffer->openFlag = 1; done: freeResponse(resp); - free(prepareUrl); + free(nnUrl); free(dnUrl); if (ret) { - free(data->url); - free(data); + errno = ret; + if (data) { + free(data->url); + free(data); + } } return ret; } @@ -929,7 +1078,8 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const c goto done; } if ((flags & O_CREAT) && (flags & O_EXCL)) { - fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n"); + fprintf(stderr, + "WARN: hdfs does not truly support O_CREATE && O_EXCL\n"); } file = calloc(1, sizeof(struct hdfsFile_internal)); if (!file) { @@ -947,12 +1097,13 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const c webhandle->bufferSize = bufferSize; webhandle->replication = replication; webhandle->blockSize = blockSize; - webhandle->absPath = getAbsolutePath(fs, path); - if (!webhandle->absPath) { - ret = ENOMEM; + ret = getAbsolutePath(fs, path, &webhandle->absPath); + if (ret) { goto done; } file->file = webhandle; + // If open for write/append, + // open and keep the connection with the target datanode for writing if (file->type == OUTPUT) { ret = hdfsOpenOutputFileImpl(fs, file); if (ret) { @@ -988,7 +1139,9 @@ tSize hdfsWrite(hdfsFS fs, hdfsFile file resetWebhdfsBuffer(wfile->uploadBuffer, buffer, length); return length; } else { - fprintf(stderr, "Error: have not opened the file %s for writing yet.\n", wfile->absPath); + fprintf(stderr, + "Error: have not opened the file %s for writing yet.\n", + wfile->absPath); errno = EBADF; return -1; } @@ -996,42 +1149,47 @@ tSize hdfsWrite(hdfsFS fs, hdfsFile file int hdfsCloseFile(hdfsFS fs, hdfsFile file) { + void *respv = NULL; + threadData *tdata = NULL; int ret = 0; - fprintf(stderr, "to close file...\n"); + struct webhdfsFileHandle *wfile = NULL; + if (file->type == OUTPUT) { - void *respv; - threadData *tdata; - struct webhdfsFileHandle *wfile = file->file; + wfile = file->file; pthread_mutex_lock(&(wfile->uploadBuffer->writeMutex)); wfile->uploadBuffer->closeFlag = 1; pthread_cond_signal(&wfile->uploadBuffer->newwrite_or_close); pthread_mutex_unlock(&(wfile->uploadBuffer->writeMutex)); - //waiting for the writing thread to terminate + // Waiting for the writing thread to terminate ret = pthread_join(wfile->connThread, &respv); if (ret) { - fprintf(stderr, "Error (code %d) when pthread_join.\n", ret); + fprintf(stderr, "Error when pthread_join in hdfsClose, <%d>: %s.\n", + ret, hdfs_strerror(ret)); } - //parse the response - tdata = (threadData *) respv; - if (!tdata) { - fprintf(stderr, "Response from the writing thread is NULL.\n"); - ret = -1; + // Parse the response + tdata = respv; + if (!tdata || !(tdata->resp)) { + fprintf(stderr, + "ERROR: response from the writing thread is NULL.\n"); + ret = EIO; } if (file->flags & O_APPEND) { - parseDnAPPEND(tdata->resp->header->content, tdata->resp->body->content); + ret = parseDnAPPEND(tdata->resp->header->content, + tdata->resp->body->content); } else { - parseDnWRITE(tdata->resp->header->content, tdata->resp->body->content); + ret = parseDnWRITE(tdata->resp->header->content, + tdata->resp->body->content); } - //free the threaddata + // Free the threaddata freeThreadData(tdata); } freeFileInternal(file); - fprintf(stderr, "Closed the webfilehandle...\n"); if (ret) { - errno = EIO; + errno = ret; + return -1; } - return ret; + return 0; } int hdfsFileIsOpenForRead(hdfsFile file) @@ -1049,8 +1207,7 @@ static int hdfsReadImpl(hdfsFS fs, hdfsF { int ret = 0; char *url = NULL; - Response resp = NULL; - int openResult = -1; + struct Response *resp = NULL; if (fs == NULL || file == NULL || file->type != INPUT || buffer == NULL || length < 0) { @@ -1068,30 +1225,41 @@ static int hdfsReadImpl(hdfsFS fs, hdfsF ret = ENOMEM; goto done; } - resp->header = initResponseBuffer(); - resp->body = initResponseBuffer(); + ret = initResponseBuffer(&(resp->header)); + if (ret) { + goto done; + } + ret = initResponseBuffer(&(resp->body)); + if (ret) { + goto done; + } + memset(buffer, 0, length); resp->body->content = buffer; resp->body->remaining = length; - if (!((url = prepareOPEN(fs->nn, fs->port, file->file->absPath, - fs->userName, off, length)) - && (resp = launchOPEN(url, resp)) - && ((openResult = parseOPEN(resp->header->content, resp->body->content)) > 0))) { - if (openResult == 0) { - // Special case: if parseOPEN returns 0, we asked for a byte range - // with outside what the file contains. In this case, hdfsRead and - // hdfsPread return 0, meaning end-of-file. - *numRead = 0; - goto done; - } - ret = EIO; + ret = createUrlForOPEN(fs->nn, fs->port, file->file->absPath, + fs->userName, off, length, &url); + if (ret) { goto done; } - *numRead = resp->body->offset; - + ret = launchOPEN(url, resp); + if (ret) { + goto done; + } + ret = parseOPEN(resp->header->content, resp->body->content); + if (ret == -1) { + // Special case: if parseOPEN returns -1, we asked for a byte range + // with outside what the file contains. In this case, hdfsRead and + // hdfsPread return 0, meaning end-of-file. + *numRead = 0; + } else if (ret == 0) { + *numRead = (tSize) resp->body->offset; + } done: - freeResponseBuffer(resp->header); - free(resp->body); + if (resp) { + freeResponseBuffer(resp->header); + free(resp->body); + } free(resp); free(url); return ret; @@ -1099,11 +1267,12 @@ done: tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length) { - int ret; + int ret = 0; tSize numRead = 0; - ret = hdfsReadImpl(fs, file, buffer, file->offset, length, &numRead); - if (ret) { + ret = hdfsReadImpl(fs, file, buffer, (tSize) file->offset, + length, &numRead); + if (ret > 0) { // ret == -1 means end of file errno = ret; return -1; } @@ -1119,18 +1288,6 @@ int hdfsAvailable(hdfsFS fs, hdfsFile fi return 0; } -int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) -{ - errno = ENOTSUP; - return -1; -} - -int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) -{ - errno = ENOTSUP; - return -1; -} - int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) { struct webhdfsFileHandle *wf; @@ -1172,7 +1329,8 @@ done: return 0; } -tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length) +tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, + void* buffer, tSize length) { int ret; tSize numRead = 0; @@ -1181,8 +1339,8 @@ tSize hdfsPread(hdfsFS fs, hdfsFile file errno = EINVAL; return -1; } - ret = hdfsReadImpl(fs, file, buffer, position, length, &numRead); - if (ret) { + ret = hdfsReadImpl(fs, file, buffer, (tSize) position, length, &numRead); + if (ret > 0) { errno = ret; return -1; } @@ -1200,21 +1358,44 @@ tOffset hdfsTell(hdfsFS fs, hdfsFile fil char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) { + int strlength; if (fs == NULL || buffer == NULL || bufferSize <= 0) { errno = EINVAL; return NULL; } - if (snprintf(buffer, bufferSize, "%s", fs->workingDir) >= bufferSize) { + strlength = snprintf(buffer, bufferSize, "%s", fs->workingDir); + if (strlength >= bufferSize) { errno = ENAMETOOLONG; return NULL; + } else if (strlength < 0) { + errno = EIO; + return NULL; } return buffer; } +/** Replace "//" with "/" in path */ +static void normalizePath(char *path) +{ + int i = 0, j = 0, sawslash = 0; + + for (i = j = sawslash = 0; path[i] != '\0'; i++) { + if (path[i] != '/') { + sawslash = 0; + path[j++] = path[i]; + } else if (path[i] == '/' && !sawslash) { + sawslash = 1; + path[j++] = '/'; + } + } + path[j] = '\0'; +} + int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) { - char *newWorkingDir; - size_t strlenPath, newWorkingDirLen; + char *newWorkingDir = NULL; + size_t strlenPath = 0, newWorkingDirLen = 0; + int strlength; if (fs == NULL || path == NULL) { errno = EINVAL; @@ -1225,25 +1406,28 @@ int hdfsSetWorkingDirectory(hdfsFS fs, c errno = EINVAL; return -1; } - if (path[0] != '/') { - // TODO: support non-absolute paths. They should be interpreted - // relative to the current path. - errno = ENOTSUP; - return -1; - } - if (strstr(path, "//")) { - // TODO: support non-normalized paths (by normalizing them.) - errno = ENOTSUP; - return -1; - } - newWorkingDirLen = strlenPath + 2; + // the max string length of the new working dir is + // (length of old working dir) + (length of given path) + strlen("/") + 1 + newWorkingDirLen = strlen(fs->workingDir) + strlenPath + 2; newWorkingDir = malloc(newWorkingDirLen); if (!newWorkingDir) { errno = ENOMEM; return -1; } - snprintf(newWorkingDir, newWorkingDirLen, "%s%s", - path, (path[strlenPath - 1] == '/') ? "" : "/"); + strlength = snprintf(newWorkingDir, newWorkingDirLen, "%s%s%s", + (path[0] == '/') ? "" : fs->workingDir, + path, (path[strlenPath - 1] == '/') ? "" : "/"); + if (strlength < 0 || strlength >= newWorkingDirLen) { + free(newWorkingDir); + errno = EIO; + return -1; + } + + if (strstr(path, "//")) { + // normalize the path by replacing "//" with "/" + normalizePath(newWorkingDir); + } + free(fs->workingDir); fs->workingDir = newWorkingDir; return 0; @@ -1283,7 +1467,7 @@ int hdfsHFlush(hdfsFS fs, hdfsFile file) errno = EINVAL; return -1; } - // TODO: block until our write buffer is flushed + // TODO: block until our write buffer is flushed (HDFS-3952) return 0; } @@ -1293,7 +1477,7 @@ int hdfsFlush(hdfsFS fs, hdfsFile file) errno = EINVAL; return -1; } - // TODO: block until our write buffer is flushed + // TODO: block until our write buffer is flushed (HDFS-3952) return 0; } @@ -1316,3 +1500,15 @@ tOffset hdfsGetUsed(hdfsFS fs) return -1; } +int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) +{ + errno = ENOTSUP; + return -1; +} + +int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) +{ + errno = ENOTSUP; + return -1; +} + Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_ops.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_ops.c?rev=1407217&r1=1407216&r2=1407217&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_ops.c (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_ops.c Thu Nov 8 19:09:46 2012 @@ -17,6 +17,7 @@ */ #include "hdfs.h" +#include "native_mini_dfs.h" #include #include @@ -26,228 +27,254 @@ #include #include -void permission_disp(short permissions, char *rtr) { +static struct NativeMiniDfsCluster *cluster; + +void permission_disp(short permissions, char *rtr) +{ rtr[9] = '\0'; int i; - for(i=2;i>=0;i--) + short perm; + for(i = 2; i >= 0; i--) { - short permissionsId = permissions >> (i * 3) & (short)7; - char* perm; - switch(permissionsId) { - case 7: - perm = "rwx"; break; - case 6: - perm = "rw-"; break; - case 5: - perm = "r-x"; break; - case 4: - perm = "r--"; break; - case 3: - perm = "-wx"; break; - case 2: - perm = "-w-"; break; - case 1: - perm = "--x"; break; - case 0: - perm = "---"; break; - default: - perm = "???"; - } - strncpy(rtr, perm, 3); - rtr+=3; + perm = permissions >> (i * 3); + rtr[0] = perm & 4 ? 'r' : '-'; + rtr[1] = perm & 2 ? 'w' : '-'; + rtr[2] = perm & 1 ? 'x' : '-'; + rtr += 3; } } -int main(int argc, char **argv) { +int main(int argc, char **argv) +{ + char buffer[32]; + tSize num_written_bytes; + const char* slashTmp = "/tmp"; + int nnPort; + char *rwTemplate, *rwTemplate2, *newDirTemplate, + *appendTemplate, *userTemplate, *rwPath = NULL; + const char* fileContents = "Hello, World!"; + const char* nnHost = NULL; + if (argc != 2) { fprintf(stderr, "usage: test_libwebhdfs_ops \n"); - return -1; + exit(1); } - char buffer[32]; - tSize num_written_bytes; + struct NativeMiniDfsConf conf = { + .doFormat = 1, .webhdfsEnabled = 1, .namenodeHttpPort = 50070, + }; + cluster = nmdCreate(&conf); + if (!cluster) { + fprintf(stderr, "Failed to create the NativeMiniDfsCluster.\n"); + exit(1); + } + if (nmdWaitClusterUp(cluster)) { + fprintf(stderr, "Error when waiting for cluster to be ready.\n"); + exit(1); + } + if (nmdGetNameNodeHttpAddress(cluster, &nnPort, &nnHost)) { + fprintf(stderr, "Error when retrieving namenode host address.\n"); + exit(1); + } - hdfsFS fs = hdfsConnectAsUserNewInstance("default", 50070, argv[1]); + hdfsFS fs = hdfsConnectAsUserNewInstance(nnHost, nnPort, argv[1]); if(!fs) { fprintf(stderr, "Oops! Failed to connect to hdfs!\n"); exit(-1); } - const char* writePath = "/tmp/testfile.txt"; - const char* fileContents = "Hello, World!"; - { - //Write tests - - hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0); + // Write tests + rwTemplate = strdup("/tmp/helloWorldXXXXXX"); + if (!rwTemplate) { + fprintf(stderr, "Failed to create rwTemplate!\n"); + exit(1); + } + rwPath = mktemp(rwTemplate); + // hdfsOpenFile + hdfsFile writeFile = hdfsOpenFile(fs, rwPath, + O_WRONLY|O_CREAT, 0, 0, 0); + if(!writeFile) { - fprintf(stderr, "Failed to open %s for writing!\n", writePath); - exit(-1); + fprintf(stderr, "Failed to open %s for writing!\n", rwPath); + exit(1); } - fprintf(stderr, "Opened %s for writing successfully...\n", writePath); - num_written_bytes = hdfsWrite(fs, writeFile, (void*)fileContents, strlen(fileContents) + 1); + fprintf(stderr, "Opened %s for writing successfully...\n", rwPath); + // hdfsWrite + num_written_bytes = hdfsWrite(fs, writeFile, (void*)fileContents, + (int) strlen(fileContents) + 1); if (num_written_bytes != strlen(fileContents) + 1) { - fprintf(stderr, "Failed to write correct number of bytes - expected %d, got %d\n", - (int)(strlen(fileContents) + 1), (int)num_written_bytes); - exit(-1); + fprintf(stderr, "Failed to write correct number of bytes - " + "expected %d, got %d\n", + (int)(strlen(fileContents) + 1), (int) num_written_bytes); + exit(1); } fprintf(stderr, "Wrote %d bytes\n", num_written_bytes); + // hdfsTell tOffset currentPos = -1; if ((currentPos = hdfsTell(fs, writeFile)) == -1) { fprintf(stderr, - "Failed to get current file position correctly! Got %lld!\n", - currentPos); - exit(-1); - } - fprintf(stderr, "Current position: %lld\n", currentPos); - - if (hdfsFlush(fs, writeFile)) { - fprintf(stderr, "Failed to 'flush' %s\n", writePath); - exit(-1); + "Failed to get current file position correctly. Got %" + PRId64 "!\n", currentPos); + exit(1); } - fprintf(stderr, "Flushed %s successfully!\n", writePath); - - if (hdfsHFlush(fs, writeFile)) { - fprintf(stderr, "Failed to 'hflush' %s\n", writePath); - exit(-1); - } - fprintf(stderr, "HFlushed %s successfully!\n", writePath); + fprintf(stderr, "Current position: %" PRId64 "\n", currentPos); hdfsCloseFile(fs, writeFile); + // Done test write } + sleep(1); + { //Read tests - sleep(1); - const char* readPath = "/tmp/testfile.txt"; - int exists = hdfsExists(fs, readPath); + int available = 0, exists = 0; + // hdfsExists + exists = hdfsExists(fs, rwPath); if (exists) { - fprintf(stderr, "Failed to validate existence of %s\n", readPath); - exists = hdfsExists(fs, readPath); + fprintf(stderr, "Failed to validate existence of %s\n", rwPath); + exists = hdfsExists(fs, rwPath); if (exists) { - fprintf(stderr, "Still failed to validate existence of %s\n", readPath); - exit(-1); + fprintf(stderr, + "Still failed to validate existence of %s\n", rwPath); + exit(1); } } - hdfsFile readFile = hdfsOpenFile(fs, readPath, O_RDONLY, 0, 0, 0); + hdfsFile readFile = hdfsOpenFile(fs, rwPath, O_RDONLY, 0, 0, 0); if (!readFile) { - fprintf(stderr, "Failed to open %s for reading!\n", readPath); - exit(-1); + fprintf(stderr, "Failed to open %s for reading!\n", rwPath); + exit(1); } - if (!hdfsFileIsOpenForRead(readFile)) { fprintf(stderr, "hdfsFileIsOpenForRead: we just opened a file " "with O_RDONLY, and it did not show up as 'open for " "read'\n"); - exit(-1); + exit(1); } - fprintf(stderr, "hdfsAvailable: %d\n", hdfsAvailable(fs, readFile)); + available = hdfsAvailable(fs, readFile); + fprintf(stderr, "hdfsAvailable: %d\n", available); + // hdfsSeek, hdfsTell tOffset seekPos = 1; if(hdfsSeek(fs, readFile, seekPos)) { - fprintf(stderr, "Failed to seek %s for reading!\n", readPath); - exit(-1); + fprintf(stderr, "Failed to seek %s for reading!\n", rwPath); + exit(1); } tOffset currentPos = -1; if((currentPos = hdfsTell(fs, readFile)) != seekPos) { fprintf(stderr, - "Failed to get current file position correctly! Got %lld!\n", - currentPos); - exit(-1); - } - fprintf(stderr, "Current position: %lld\n", currentPos); - - if (!hdfsFileUsesDirectRead(readFile)) { - fprintf(stderr, "Direct read support incorrectly not detected " - "for HDFS filesystem\n"); - exit(-1); + "Failed to get current file position correctly! Got %" + PRId64 "!\n", currentPos); + + exit(1); } + fprintf(stderr, "Current position: %" PRId64 "\n", currentPos); - fprintf(stderr, "Direct read support detected for HDFS\n"); - - // Test the direct read path if(hdfsSeek(fs, readFile, 0)) { - fprintf(stderr, "Failed to seek %s for reading!\n", readPath); - exit(-1); + fprintf(stderr, "Failed to seek %s for reading!\n", rwPath); + exit(1); } + + // hdfsRead memset(buffer, 0, sizeof(buffer)); - tSize num_read_bytes = hdfsRead(fs, readFile, (void*)buffer, - sizeof(buffer)); + tSize num_read_bytes = hdfsRead(fs, readFile, buffer, sizeof(buffer)); if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) { - fprintf(stderr, "Failed to read (direct). Expected %s but got %s (%d bytes)\n", + fprintf(stderr, "Failed to read (direct). " + "Expected %s but got %s (%d bytes)\n", fileContents, buffer, num_read_bytes); - exit(-1); + exit(1); } fprintf(stderr, "Read following %d bytes:\n%s\n", num_read_bytes, buffer); + if (hdfsSeek(fs, readFile, 0L)) { fprintf(stderr, "Failed to seek to file start!\n"); - exit(-1); + exit(1); } - // Disable the direct read path so that we really go through the slow - // read path - hdfsFileDisableDirectRead(readFile); - - num_read_bytes = hdfsRead(fs, readFile, (void*)buffer, - sizeof(buffer)); - fprintf(stderr, "Read following %d bytes:\n%s\n", - num_read_bytes, buffer); - + // hdfsPread memset(buffer, 0, strlen(fileContents + 1)); - - num_read_bytes = hdfsPread(fs, readFile, 0, (void*)buffer, - sizeof(buffer)); + num_read_bytes = hdfsPread(fs, readFile, 0, buffer, sizeof(buffer)); fprintf(stderr, "Read following %d bytes:\n%s\n", num_read_bytes, buffer); hdfsCloseFile(fs, readFile); + // Done test read } int totalResult = 0; int result = 0; { //Generic file-system operations - - const char* srcPath = "/tmp/testfile.txt"; - const char* dstPath = "/tmp/testfile2.txt"; - const char* copyPath = "/tmp/testfile_copy.txt"; - const char* movePath = "/tmp/testfile_move.txt"; - - fprintf(stderr, "hdfsCopy: %s\n", ((result = hdfsCopy(fs, srcPath, fs, copyPath)) ? "Failed!" : "Success!")); - totalResult += result; - fprintf(stderr, "hdfsMove: %s\n", ((result = hdfsMove(fs, copyPath, fs, movePath)) ? "Failed!" : "Success!")); - totalResult += result; - - fprintf(stderr, "hdfsGetDefaultBlockSize: %lld\n", hdfsGetDefaultBlockSize(fs)); - - fprintf(stderr, "hdfsRename: %s\n", ((result = hdfsRename(fs, srcPath, dstPath)) ? "Failed!" : "Success!")); - totalResult += result; - fprintf(stderr, "hdfsRename back: %s\n", ((result = hdfsRename(fs, dstPath, srcPath)) ? "Failed!" : "Success!")); - totalResult += result; - - const char* slashTmp = "/tmp"; - const char* newDirectory = "/tmp/newdir"; - fprintf(stderr, "hdfsCreateDirectory: %s\n", ((result = hdfsCreateDirectory(fs, newDirectory)) ? "Failed!" : "Success!")); - totalResult += result; - - fprintf(stderr, "hdfsSetReplication: %s\n", ((result = hdfsSetReplication(fs, srcPath, 1)) ? "Failed!" : "Success!")); - totalResult += result; - + char *srcPath = rwPath; char buffer[256]; const char *resp; - fprintf(stderr, "hdfsGetWorkingDirectory: %s\n", ((resp = hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer))) ? buffer : "Failed!")); - totalResult += (resp ? 0 : 1); - fprintf(stderr, "hdfsSetWorkingDirectory: %s\n", ((result = hdfsSetWorkingDirectory(fs, slashTmp)) ? "Failed!" : "Success!")); + rwTemplate2 = strdup("/tmp/helloWorld2XXXXXX"); + if (!rwTemplate2) { + fprintf(stderr, "Failed to create rwTemplate2!\n"); + exit(1); + } + char *dstPath = mktemp(rwTemplate2); + newDirTemplate = strdup("/tmp/newdirXXXXXX"); + if (!newDirTemplate) { + fprintf(stderr, "Failed to create newDirTemplate!\n"); + exit(1); + } + char *newDirectory = mktemp(newDirTemplate); + + // hdfsRename + fprintf(stderr, "hdfsRename: %s\n", + ((result = hdfsRename(fs, rwPath, dstPath)) ? + "Failed!" : "Success!")); + totalResult += result; + fprintf(stderr, "hdfsRename back: %s\n", + ((result = hdfsRename(fs, dstPath, srcPath)) ? + "Failed!" : "Success!")); + totalResult += result; + + // hdfsCreateDirectory + fprintf(stderr, "hdfsCreateDirectory: %s\n", + ((result = hdfsCreateDirectory(fs, newDirectory)) ? + "Failed!" : "Success!")); + totalResult += result; + + // hdfsSetReplication + fprintf(stderr, "hdfsSetReplication: %s\n", + ((result = hdfsSetReplication(fs, srcPath, 1)) ? + "Failed!" : "Success!")); totalResult += result; - fprintf(stderr, "hdfsGetWorkingDirectory: %s\n", ((resp = hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer))) ? buffer : "Failed!")); + + // hdfsGetWorkingDirectory, hdfsSetWorkingDirectory + fprintf(stderr, "hdfsGetWorkingDirectory: %s\n", + ((resp = hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer))) ? + buffer : "Failed!")); totalResult += (resp ? 0 : 1); - + + const char* path[] = {"/foo", "/foo/bar", "foobar", "//foo/bar//foobar", + "foo//bar", "foo/bar///", "/", "////"}; + for (int i = 0; i < 8; i++) { + fprintf(stderr, "hdfsSetWorkingDirectory: %s, %s\n", + ((result = hdfsSetWorkingDirectory(fs, path[i])) ? + "Failed!" : "Success!"), + hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer))); + totalResult += result; + } + + fprintf(stderr, "hdfsSetWorkingDirectory: %s\n", + ((result = hdfsSetWorkingDirectory(fs, slashTmp)) ? + "Failed!" : "Success!")); + totalResult += result; + fprintf(stderr, "hdfsGetWorkingDirectory: %s\n", + ((resp = hdfsGetWorkingDirectory(fs, buffer, sizeof(buffer))) ? + buffer : "Failed!")); + totalResult += (resp ? 0 : 1); + + // hdfsGetPathInfo hdfsFileInfo *fileInfo = NULL; if((fileInfo = hdfsGetPathInfo(fs, slashTmp)) != NULL) { fprintf(stderr, "hdfsGetPathInfo - SUCCESS!\n"); @@ -261,13 +288,15 @@ int main(int argc, char **argv) { fprintf(stderr, "Group: %s, ", fileInfo->mGroup); char permissions[10]; permission_disp(fileInfo->mPermissions, permissions); - fprintf(stderr, "Permissions: %d (%s)\n", fileInfo->mPermissions, permissions); + fprintf(stderr, "Permissions: %d (%s)\n", + fileInfo->mPermissions, permissions); hdfsFreeFileInfo(fileInfo, 1); } else { totalResult++; - fprintf(stderr, "waah! hdfsGetPathInfo for %s - FAILED!\n", slashTmp); + fprintf(stderr, "hdfsGetPathInfo for %s - FAILED!\n", slashTmp); } + // hdfsListDirectory hdfsFileInfo *fileList = 0; int numEntries = 0; if((fileList = hdfsListDirectory(fs, slashTmp, &numEntries)) != NULL) { @@ -283,7 +312,8 @@ int main(int argc, char **argv) { fprintf(stderr, "Group: %s, ", fileList[i].mGroup); char permissions[10]; permission_disp(fileList[i].mPermissions, permissions); - fprintf(stderr, "Permissions: %d (%s)\n", fileList[i].mPermissions, permissions); + fprintf(stderr, "Permissions: %d (%s)\n", + fileList[i].mPermissions, permissions); } hdfsFreeFileInfo(fileList, numEntries); } else { @@ -295,203 +325,220 @@ int main(int argc, char **argv) { } } - // char*** hosts = hdfsGetHosts(fs, srcPath, 0, 1); - // if(hosts) { - // fprintf(stderr, "hdfsGetHosts - SUCCESS! ... \n"); - // int i=0; - // while(hosts[i]) { - // int j = 0; - // while(hosts[i][j]) { - // fprintf(stderr, - // "\thosts[%d][%d] - %s\n", i, j, hosts[i][j]); - // ++j; - // } - // ++i; - // } - // } else { - // totalResult++; - // fprintf(stderr, "waah! hdfsGetHosts - FAILED!\n"); - // } - char *newOwner = "root"; - // setting tmp dir to 777 so later when connectAsUser nobody, we can write to it + // Setting tmp dir to 777 so later when connectAsUser nobody, + // we can write to it short newPerm = 0666; - // chown write - fprintf(stderr, "hdfsChown: %s\n", ((result = hdfsChown(fs, writePath, NULL, "users")) ? "Failed!" : "Success!")); - totalResult += result; - fprintf(stderr, "hdfsChown: %s\n", ((result = hdfsChown(fs, writePath, newOwner, NULL)) ? "Failed!" : "Success!")); + // hdfsChown + fprintf(stderr, "hdfsChown: %s\n", + ((result = hdfsChown(fs, rwPath, NULL, "users")) ? + "Failed!" : "Success!")); + totalResult += result; + fprintf(stderr, "hdfsChown: %s\n", + ((result = hdfsChown(fs, rwPath, newOwner, NULL)) ? + "Failed!" : "Success!")); + totalResult += result; + // hdfsChmod + fprintf(stderr, "hdfsChmod: %s\n", + ((result = hdfsChmod(fs, rwPath, newPerm)) ? + "Failed!" : "Success!")); totalResult += result; - // chmod write - fprintf(stderr, "hdfsChmod: %s\n", ((result = hdfsChmod(fs, writePath, newPerm)) ? "Failed!" : "Success!")); - totalResult += result; - - sleep(2); tTime newMtime = time(NULL); tTime newAtime = time(NULL); // utime write - fprintf(stderr, "hdfsUtime: %s\n", ((result = hdfsUtime(fs, writePath, newMtime, newAtime)) ? "Failed!" : "Success!")); - + fprintf(stderr, "hdfsUtime: %s\n", + ((result = hdfsUtime(fs, rwPath, newMtime, newAtime)) ? + "Failed!" : "Success!")); totalResult += result; // chown/chmod/utime read - hdfsFileInfo *finfo = hdfsGetPathInfo(fs, writePath); + hdfsFileInfo *finfo = hdfsGetPathInfo(fs, rwPath); - fprintf(stderr, "hdfsChown read: %s\n", ((result = (strcmp(finfo->mOwner, newOwner) != 0)) ? "Failed!" : "Success!")); + fprintf(stderr, "hdfsChown read: %s\n", + ((result = (strcmp(finfo->mOwner, newOwner) != 0)) ? + "Failed!" : "Success!")); totalResult += result; - fprintf(stderr, "hdfsChmod read: %s\n", ((result = (finfo->mPermissions != newPerm)) ? "Failed!" : "Success!")); + fprintf(stderr, "hdfsChmod read: %s\n", + ((result = (finfo->mPermissions != newPerm)) ? + "Failed!" : "Success!")); totalResult += result; // will later use /tmp/ as a different user so enable it - fprintf(stderr, "hdfsChmod: %s\n", ((result = hdfsChmod(fs, "/tmp/", 0777)) ? "Failed!" : "Success!")); + fprintf(stderr, "hdfsChmod: %s\n", + ((result = hdfsChmod(fs, slashTmp, 0777)) ? + "Failed!" : "Success!")); totalResult += result; fprintf(stderr,"newMTime=%ld\n",newMtime); fprintf(stderr,"curMTime=%ld\n",finfo->mLastMod); - fprintf(stderr, "hdfsUtime read (mtime): %s\n", ((result = (finfo->mLastMod != newMtime / 1000)) ? "Failed!" : "Success!")); + fprintf(stderr, "hdfsUtime read (mtime): %s\n", + ((result = (finfo->mLastMod != newMtime / 1000)) ? + "Failed!" : "Success!")); totalResult += result; - hdfsFreeFileInfo(finfo, 1); - // Clean up - fprintf(stderr, "hdfsDelete: %s\n", ((result = hdfsDelete(fs, newDirectory, 1)) ? "Failed!" : "Success!")); - totalResult += result; - fprintf(stderr, "hdfsDelete: %s\n", ((result = hdfsDelete(fs, srcPath, 1)) ? "Failed!" : "Success!")); - totalResult += result; -// fprintf(stderr, "hdfsDelete: %s\n", ((result = hdfsDelete(fs, movePath, 1)) ? "Failed!" : "Success!")); -// totalResult += result; - fprintf(stderr, "hdfsExists: %s\n", ((result = hdfsExists(fs, newDirectory)) ? "Success!" : "Failed!")); + hdfsFreeFileInfo(finfo, 1); + fprintf(stderr, "hdfsDelete: %s\n", + ((result = hdfsDelete(fs, newDirectory, 1)) ? + "Failed!" : "Success!")); + totalResult += result; + fprintf(stderr, "hdfsDelete: %s\n", + ((result = hdfsDelete(fs, srcPath, 1)) ? + "Failed!" : "Success!")); + totalResult += result; + fprintf(stderr, "hdfsExists: %s\n", + ((result = hdfsExists(fs, newDirectory)) ? + "Success!" : "Failed!")); totalResult += (result ? 0 : 1); + // Done test generic operations } { - // TEST APPENDS - const char *writePath = "/tmp/appends"; + // Test Appends + appendTemplate = strdup("/tmp/appendsXXXXXX"); + if (!appendTemplate) { + fprintf(stderr, "Failed to create appendTemplate!\n"); + exit(1); + } + char *appendPath = mktemp(appendTemplate); + const char* helloBuffer = "Hello,"; + hdfsFile writeFile = NULL; - // CREATE - hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY, 0, 0, 0); + // Create + writeFile = hdfsOpenFile(fs, appendPath, O_WRONLY, 0, 0, 0); if(!writeFile) { - fprintf(stderr, "Failed to open %s for writing!\n", writePath); - exit(-1); + fprintf(stderr, "Failed to open %s for writing!\n", appendPath); + exit(1); } - fprintf(stderr, "Opened %s for writing successfully...\n", writePath); + fprintf(stderr, "Opened %s for writing successfully...\n", appendPath); - const char* buffer = "Hello,"; - tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)); + num_written_bytes = hdfsWrite(fs, writeFile, helloBuffer, + (int) strlen(helloBuffer)); fprintf(stderr, "Wrote %d bytes\n", num_written_bytes); - - if (hdfsFlush(fs, writeFile)) { - fprintf(stderr, "Failed to 'flush' %s\n", writePath); - exit(-1); - } - fprintf(stderr, "Flushed %s successfully!\n", writePath); - hdfsCloseFile(fs, writeFile); - fprintf(stderr, "hdfsSetReplication: %s\n", ((result = hdfsSetReplication(fs, writePath, 1)) ? "Failed!" : "Success!")); + fprintf(stderr, "hdfsSetReplication: %s\n", + ((result = hdfsSetReplication(fs, appendPath, 1)) ? + "Failed!" : "Success!")); totalResult += result; - // RE-OPEN - writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_APPEND, 0, 0, 0); + // Re-Open for Append + writeFile = hdfsOpenFile(fs, appendPath, O_WRONLY | O_APPEND, 0, 0, 0); if(!writeFile) { - fprintf(stderr, "Failed to open %s for writing!\n", writePath); - exit(-1); + fprintf(stderr, "Failed to open %s for writing!\n", appendPath); + exit(1); } - fprintf(stderr, "Opened %s for appending successfully...\n", writePath); + fprintf(stderr, "Opened %s for appending successfully...\n", + appendPath); - buffer = " World"; - num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer) + 1); + helloBuffer = " World"; + num_written_bytes = hdfsWrite(fs, writeFile, helloBuffer, + (int)strlen(helloBuffer) + 1); fprintf(stderr, "Wrote %d bytes\n", num_written_bytes); - if (hdfsFlush(fs, writeFile)) { - fprintf(stderr, "Failed to 'flush' %s\n", writePath); - exit(-1); - } - fprintf(stderr, "Flushed %s successfully!\n", writePath); - hdfsCloseFile(fs, writeFile); - // CHECK size - hdfsFileInfo *finfo = hdfsGetPathInfo(fs, writePath); - fprintf(stderr, "fileinfo->mSize: == total %s\n", ((result = (finfo->mSize == strlen("Hello, World") + 1)) ? "Success!" : "Failed!")); + // Check size + hdfsFileInfo *finfo = hdfsGetPathInfo(fs, appendPath); + fprintf(stderr, "fileinfo->mSize: == total %s\n", + ((result = (finfo->mSize == strlen("Hello, World") + 1)) ? + "Success!" : "Failed!")); totalResult += (result ? 0 : 1); - // READ and check data - hdfsFile readFile = hdfsOpenFile(fs, writePath, O_RDONLY, 0, 0, 0); + // Read and check data + hdfsFile readFile = hdfsOpenFile(fs, appendPath, O_RDONLY, 0, 0, 0); if (!readFile) { - fprintf(stderr, "Failed to open %s for reading!\n", writePath); - exit(-1); + fprintf(stderr, "Failed to open %s for reading!\n", appendPath); + exit(1); } - char rdbuffer[32]; - tSize num_read_bytes = hdfsRead(fs, readFile, (void*)rdbuffer, sizeof(rdbuffer)); + tSize num_read_bytes = hdfsRead(fs, readFile, buffer, sizeof(buffer)); fprintf(stderr, "Read following %d bytes:\n%s\n", - num_read_bytes, rdbuffer); - - fprintf(stderr, "read == Hello, World %s\n", (result = (strcmp(rdbuffer, "Hello, World") == 0)) ? "Success!" : "Failed!"); - + num_read_bytes, buffer); + fprintf(stderr, "read == Hello, World %s\n", + (result = (strcmp(buffer, "Hello, World") == 0)) ? + "Success!" : "Failed!"); hdfsCloseFile(fs, readFile); - // DONE test appends + // Cleanup + fprintf(stderr, "hdfsDelete: %s\n", + ((result = hdfsDelete(fs, appendPath, 1)) ? + "Failed!" : "Success!")); + totalResult += result; + // Done test appends } - totalResult += (hdfsDisconnect(fs) != 0); { // // Now test as connecting as a specific user - // This is only meant to test that we connected as that user, not to test + // This only meant to test that we connected as that user, not to test // the actual fs user capabilities. Thus just create a file and read // the owner is correct. - const char *tuser = "nobody"; - const char* writePath = "/tmp/usertestfile.txt"; + userTemplate = strdup("/tmp/usertestXXXXXX"); + if (!userTemplate) { + fprintf(stderr, "Failed to create userTemplate!\n"); + exit(1); + } + char* userWritePath = mktemp(userTemplate); + hdfsFile writeFile = NULL; fs = hdfsConnectAsUserNewInstance("default", 50070, tuser); if(!fs) { - fprintf(stderr, "Oops! Failed to connect to hdfs as user %s!\n",tuser); - exit(-1); + fprintf(stderr, + "Oops! Failed to connect to hdfs as user %s!\n",tuser); + exit(1); } - hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0); + writeFile = hdfsOpenFile(fs, userWritePath, O_WRONLY|O_CREAT, 0, 0, 0); if(!writeFile) { - fprintf(stderr, "Failed to open %s for writing!\n", writePath); - exit(-1); + fprintf(stderr, "Failed to open %s for writing!\n", userWritePath); + exit(1); } - fprintf(stderr, "Opened %s for writing successfully...\n", writePath); + fprintf(stderr, "Opened %s for writing successfully...\n", + userWritePath); - char* buffer = "Hello, World!"; - tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)+1); + num_written_bytes = hdfsWrite(fs, writeFile, fileContents, + (int)strlen(fileContents) + 1); fprintf(stderr, "Wrote %d bytes\n", num_written_bytes); - - if (hdfsFlush(fs, writeFile)) { - fprintf(stderr, "Failed to 'flush' %s\n", writePath); - exit(-1); - } - fprintf(stderr, "Flushed %s successfully!\n", writePath); - hdfsCloseFile(fs, writeFile); - hdfsFileInfo *finfo = hdfsGetPathInfo(fs, writePath); + hdfsFileInfo *finfo = hdfsGetPathInfo(fs, userWritePath); if (finfo) { - fprintf(stderr, "hdfs new file user is correct: %s\n", ((result = (strcmp(finfo->mOwner, tuser) != 0)) ? "Failed!" : "Success!")); + fprintf(stderr, "hdfs new file user is correct: %s\n", + ((result = (strcmp(finfo->mOwner, tuser) != 0)) ? + "Failed!" : "Success!")); } else { - fprintf(stderr, "hdfsFileInfo returned by hdfsGetPathInfo is NULL\n"); + fprintf(stderr, + "hdfsFileInfo returned by hdfsGetPathInfo is NULL\n"); result = -1; } totalResult += result; + + // Cleanup + fprintf(stderr, "hdfsDelete: %s\n", + ((result = hdfsDelete(fs, userWritePath, 1)) ? + "Failed!" : "Success!")); + totalResult += result; + // Done test specific user } - + totalResult += (hdfsDisconnect(fs) != 0); - fprintf(stderr, "totalResult == %d\n", totalResult); + // Shutdown the native minidfscluster + nmdShutdown(cluster); + nmdFree(cluster); + + fprintf(stderr, "totalResult == %d\n", totalResult); if (totalResult != 0) { return -1; } else { Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_read.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_read.c?rev=1407217&r1=1407216&r2=1407217&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_read.c (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_read.c Thu Nov 8 19:09:46 2012 @@ -22,43 +22,52 @@ #include int main(int argc, char **argv) { + + const char* rfile; + tSize fileTotalSize, bufferSize, curSize, totalReadSize; + hdfsFS fs; + hdfsFile readFile; + char *buffer = NULL; if (argc != 4) { - fprintf(stderr, "Usage: hdfs_read \n"); - exit(-1); + fprintf(stderr, "Usage: test_libwebhdfs_read" + " \n"); + exit(1); } - hdfsFS fs = hdfsConnect("0.0.0.0", 50070); + fs = hdfsConnect("localhost", 50070); if (!fs) { fprintf(stderr, "Oops! Failed to connect to hdfs!\n"); - exit(-1); + exit(1); } - const char* rfile = argv[1]; - tSize fileTotalSize = strtoul(argv[2], NULL, 10); - tSize bufferSize = strtoul(argv[3], NULL, 10); + rfile = argv[1]; + fileTotalSize = strtoul(argv[2], NULL, 10); + bufferSize = strtoul(argv[3], NULL, 10); - hdfsFile readFile = hdfsOpenFile(fs, rfile, O_RDONLY, bufferSize, 0, 0); + readFile = hdfsOpenFile(fs, rfile, O_RDONLY, bufferSize, 0, 0); if (!readFile) { fprintf(stderr, "Failed to open %s for writing!\n", rfile); - exit(-2); + exit(1); } // data to be written to the file - char* buffer = malloc(sizeof(char) * bufferSize); + buffer = malloc(sizeof(char) * bufferSize); if(buffer == NULL) { - return -2; + fprintf(stderr, "Failed to allocate buffer.\n"); + exit(1); } // read from the file - tSize curSize = bufferSize; - tSize totalReadSize = 0; - for (; (curSize = hdfsRead(fs, readFile, (void*)buffer, bufferSize)) == bufferSize ;) { + curSize = bufferSize; + totalReadSize = 0; + for (; (curSize = hdfsRead(fs, readFile, buffer, bufferSize)) == bufferSize; ) { totalReadSize += curSize; } totalReadSize += curSize; - fprintf(stderr, "size of the file: %d; reading size: %d\n", fileTotalSize, totalReadSize); + fprintf(stderr, "size of the file: %d; reading size: %d\n", + fileTotalSize, totalReadSize); free(buffer); hdfsCloseFile(fs, readFile); @@ -67,7 +76,3 @@ int main(int argc, char **argv) { return 0; } -/** - * vim: ts=4: sw=4: et: - */ - Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_threaded.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_threaded.c?rev=1407217&r1=1407216&r2=1407217&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_threaded.c (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_threaded.c Thu Nov 8 19:09:46 2012 @@ -18,6 +18,7 @@ #include "expect.h" #include "hdfs.h" +#include "native_mini_dfs.h" #include #include @@ -28,11 +29,9 @@ #define TLH_MAX_THREADS 100 -static sem_t *tlhSem; +static struct NativeMiniDfsCluster* cluster; -static const char *nn; static const char *user; -static int port; struct tlhThreadInfo { /** Thread index */ @@ -43,19 +42,24 @@ struct tlhThreadInfo { pthread_t thread; }; -static int hdfsSingleNameNodeConnect(const char *nn, int port, const char *user, hdfsFS *fs) +static int hdfsSingleNameNodeConnect(struct NativeMiniDfsCluster *cluster, + hdfsFS *fs) { + int nnPort; + const char *nnHost; hdfsFS hdfs; - if (port < 0) { - fprintf(stderr, "hdfsSingleNameNodeConnect: nmdGetNameNodePort " - "returned error %d\n", port); - return port; + + if (nmdGetNameNodeHttpAddress(cluster, &nnPort, &nnHost)) { + fprintf(stderr, "Error when retrieving namenode host address.\n"); + return 1; } - hdfs = hdfsConnectAsUserNewInstance(nn, port, user); - if (!hdfs) { - return -errno; + hdfs = hdfsConnectAsUser(nnHost, nnPort, user); + if(!hdfs) { + fprintf(stderr, "Oops! Failed to connect to hdfs!\n"); + return 1; } + *fs = hdfs; return 0; } @@ -65,6 +69,7 @@ static int doTestHdfsOperations(struct t char prefix[256], tmp[256]; hdfsFile file; int ret, expected; + hdfsFileInfo *fileInfo; snprintf(prefix, sizeof(prefix), "/tlhData%04d", ti->threadIdx); @@ -74,18 +79,13 @@ static int doTestHdfsOperations(struct t EXPECT_ZERO(hdfsCreateDirectory(fs, prefix)); snprintf(tmp, sizeof(tmp), "%s/file", prefix); - /* - * Although there should not be any file to open for reading, - * the right now implementation only construct a local - * information struct when opening file - */ EXPECT_NONNULL(hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0)); file = hdfsOpenFile(fs, tmp, O_WRONLY, 0, 0, 0); EXPECT_NONNULL(file); /* TODO: implement writeFully and use it here */ - expected = strlen(prefix); + expected = (int)strlen(prefix); ret = hdfsWrite(fs, file, prefix, expected); if (ret < 0) { ret = errno; @@ -118,9 +118,28 @@ static int doTestHdfsOperations(struct t } EXPECT_ZERO(memcmp(prefix, tmp, expected)); EXPECT_ZERO(hdfsCloseFile(fs, file)); - - // TODO: Non-recursive delete should fail? - //EXPECT_NONZERO(hdfsDelete(fs, prefix, 0)); + + snprintf(tmp, sizeof(tmp), "%s/file", prefix); + EXPECT_NONZERO(hdfsChown(fs, tmp, NULL, NULL)); + EXPECT_ZERO(hdfsChown(fs, tmp, NULL, "doop")); + fileInfo = hdfsGetPathInfo(fs, tmp); + EXPECT_NONNULL(fileInfo); + EXPECT_ZERO(strcmp("doop", fileInfo->mGroup)); + hdfsFreeFileInfo(fileInfo, 1); + + EXPECT_ZERO(hdfsChown(fs, tmp, "ha", "doop2")); + fileInfo = hdfsGetPathInfo(fs, tmp); + EXPECT_NONNULL(fileInfo); + EXPECT_ZERO(strcmp("ha", fileInfo->mOwner)); + EXPECT_ZERO(strcmp("doop2", fileInfo->mGroup)); + hdfsFreeFileInfo(fileInfo, 1); + + EXPECT_ZERO(hdfsChown(fs, tmp, "ha2", NULL)); + fileInfo = hdfsGetPathInfo(fs, tmp); + EXPECT_NONNULL(fileInfo); + EXPECT_ZERO(strcmp("ha2", fileInfo->mOwner)); + EXPECT_ZERO(strcmp("doop2", fileInfo->mGroup)); + hdfsFreeFileInfo(fileInfo, 1); EXPECT_ZERO(hdfsDelete(fs, prefix, 1)); return 0; @@ -134,7 +153,7 @@ static void *testHdfsOperations(void *v) fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n", ti->threadIdx); - ret = hdfsSingleNameNodeConnect(nn, port, user, &fs); + ret = hdfsSingleNameNodeConnect(cluster, &fs); if (ret) { fprintf(stderr, "testHdfsOperations(threadIdx=%d): " "hdfsSingleNameNodeConnect failed with error %d.\n", @@ -181,19 +200,23 @@ static int checkFailures(struct tlhThrea */ int main(int argc, const char *args[]) { - if (argc != 4) { - fprintf(stderr, "usage: test_libhdfs_threaded "); - return -1; - } - - nn = args[1]; - port = atoi(args[2]); - user = args[3]; - int i, tlhNumThreads; const char *tlhNumThreadsStr; struct tlhThreadInfo ti[TLH_MAX_THREADS]; + if (argc != 2) { + fprintf(stderr, "usage: test_libwebhdfs_threaded \n"); + exit(1); + } + user = args[1]; + + struct NativeMiniDfsConf conf = { + .doFormat = 1, .webhdfsEnabled = 1, .namenodeHttpPort = 50070, + }; + cluster = nmdCreate(&conf); + EXPECT_NONNULL(cluster); + EXPECT_ZERO(nmdWaitClusterUp(cluster)); + tlhNumThreadsStr = getenv("TLH_NUM_THREADS"); if (!tlhNumThreadsStr) { tlhNumThreadsStr = "3"; @@ -210,8 +233,6 @@ int main(int argc, const char *args[]) ti[i].threadIdx = i; } -// tlhSem = sem_open("sem", O_CREAT, 0644, tlhNumThreads); - for (i = 0; i < tlhNumThreads; i++) { EXPECT_ZERO(pthread_create(&ti[i].thread, NULL, testHdfsOperations, &ti[i])); @@ -220,6 +241,7 @@ int main(int argc, const char *args[]) EXPECT_ZERO(pthread_join(ti[i].thread, NULL)); } -// EXPECT_ZERO(sem_close(tlhSem)); + EXPECT_ZERO(nmdShutdown(cluster)); + nmdFree(cluster); return checkFailures(ti, tlhNumThreads); }