hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1382836 [2/3] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/ src/contrib/libwebhdfs/ src/contrib/libwebhdfs/resources/ src/contrib/libwebhdfs/src/
Date Mon, 10 Sep 2012 13:43:29 GMT
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c Mon Sep 10 13:43:28 2012
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdlib.h>
+#include <string.h>
+#include <ctype.h>
+#include <jansson.h>
+#include "hdfs_json_parser.h"
+#include "exception.h"
+
+hdfsFileInfo *parseJsonGFS(json_t *jobj, hdfsFileInfo *fileStat, int *numEntries, const char *operation); //Forward Declaration
+
+/**
+ * Parse a JSON array of file-status objects into an array of hdfsFileInfo.
+ *
+ * @param jobj       JSON array to walk
+ * @param key        key the array was stored under; when NULL the call only
+ *                   updates *numEntries and returns NULL
+ * @param fileStat   heap block that is resized to one entry per array element
+ * @param numEntries out: number of elements in jobj (0 on failure)
+ * @param operation  operation name forwarded to parseJsonGFS for error reporting
+ * @return the (possibly moved) array on success; NULL on failure. When the
+ *         failure is an allocation error or an element of unexpected type,
+ *         the array has already been freed.
+ */
+static hdfsFileInfo *json_parse_array(json_t *jobj, char *key, hdfsFileInfo *fileStat, int *numEntries, const char *operation) {
+    int arraylen = json_array_size(jobj);
+    json_t *jvalue;
+    int i;
+
+    *numEntries = arraylen;
+    if (!key) {
+        return NULL;
+    }
+    if (arraylen > 0) {
+        // Keep the old pointer until realloc is known to have succeeded.
+        hdfsFileInfo *resized = realloc(fileStat, sizeof(hdfsFileInfo) * arraylen);
+        if (!resized) {
+            free(fileStat);
+            *numEntries = 0;
+            return NULL;
+        }
+        fileStat = resized;
+        // NOTE(review): assumes the incoming block held a single entry, as the
+        // callers in hdfs_web.c allocate it; entries added by realloc are
+        // zeroed so fields absent from the JSON do not hold garbage.
+        if (arraylen > 1) {
+            memset(&fileStat[1], 0, sizeof(hdfsFileInfo) * (arraylen - 1));
+        }
+    }
+    for (i = 0; i < arraylen; i++) {
+        jvalue = json_array_get(jobj, i);
+        if (json_is_array(jvalue)) {
+            // Nested array: key is NULL, so this only touches *numEntries,
+            // which is restored after the loop.
+            json_parse_array(jvalue, NULL, &fileStat[i], numEntries, operation);
+        }
+        else if (json_is_object(jvalue)) {
+            parseJsonGFS(jvalue, &fileStat[i], numEntries, operation);
+        }
+        else {
+            // Unexpected element type: release the array instead of leaking it.
+            free(fileStat);
+            *numEntries = 0;
+            return NULL;
+        }
+    }
+    *numEntries = arraylen;
+    return fileStat;
+}
+
+/**
+ * Parse a JSON object and report whether its boolean member is true.
+ *
+ * Every member of the top-level object is visited; the result reflects the
+ * last one visited.
+ *
+ * @param response NUL-terminated JSON text; NULL is treated as a failure
+ * @return 1 if the last member is JSON true; 0 otherwise, including when
+ *         the text cannot be parsed or is not an object
+ */
+int parseBoolean(char *response) {
+    json_error_t error;
+    json_t *root;
+    void *iter;
+    int result = 0;
+
+    if (!response) {
+        return 0;
+    }
+    root = json_loads(response, 0, &error);
+    if (!root) {
+        return 0;
+    }
+    for (iter = json_object_iter(root); iter; iter = json_object_iter_next(root, iter)) {
+        result = (json_typeof(json_object_iter_value(iter)) == JSON_TRUE) ? 1 : 0;
+    }
+    // Release the parsed document; nothing borrowed from it outlives this call.
+    json_decref(root);
+    return result;
+}
+
+/** Interpret the body of a MKDIR response; returns what parseBoolean returns. */
+int parseMKDIR(char *response) {
+    int created = parseBoolean(response);
+    return created;
+}
+
+/** Interpret the body of a RENAME response; returns what parseBoolean returns. */
+int parseRENAME(char *response) {
+    int renamed = parseBoolean(response);
+    return renamed;
+}
+
+/** Interpret the body of a DELETE response; returns what parseBoolean returns. */
+int parseDELETE(char *response) {
+    int deleted = parseBoolean(response);
+    return deleted;
+}
+
+/**
+ * Build an hdfs_exception_msg from the members of a JSON object.
+ *
+ * The "exception", "javaClassName" and "message" members are copied by
+ * pointer: the strings stay owned by jobj and are only valid while it lives.
+ *
+ * @param jobj JSON object describing the exception
+ * @return a calloc'd message (fields left NULL when absent), or NULL when the
+ *         allocation fails
+ */
+hdfs_exception_msg *parseJsonException(json_t *jobj) {
+    hdfs_exception_msg *msg = (hdfs_exception_msg *) calloc(1, sizeof(hdfs_exception_msg));
+    void *cursor;
+
+    if (msg == NULL) {
+        return NULL;
+    }
+
+    for (cursor = json_object_iter(jobj); cursor != NULL; cursor = json_object_iter_next(jobj, cursor)) {
+        const char *name = json_object_iter_key(cursor);
+        json_t *member = json_object_iter_value(cursor);
+
+        if (strcmp(name, "exception") == 0) {
+            msg->exception = json_string_value(member);
+        } else if (strcmp(name, "javaClassName") == 0) {
+            msg->javaClassName = json_string_value(member);
+        } else if (strcmp(name, "message") == 0) {
+            msg->message = json_string_value(member);
+        }
+    }
+
+    return msg;
+}
+
+/**
+ * Extract the "RemoteException" object from a JSON response body.
+ *
+ * @param content JSON text of the response body; NULL yields NULL
+ * @return the message built by parseJsonException for the first
+ *         "RemoteException" member that is an object, or NULL when the text
+ *         does not parse or holds no such member
+ */
+hdfs_exception_msg *parseException(const char *content) {
+    json_error_t parseError;
+    json_t *root;
+    void *cursor;
+
+    if (content == NULL) {
+        return NULL;
+    }
+
+    root = json_loads(content, 0, &parseError);
+    if (root == NULL) {
+        fprintf(stderr, "JSon parsing failed\n");
+        return NULL;
+    }
+
+    // NOTE(review): root is never released here; the message returned below
+    // borrows its strings from it.
+    for (cursor = json_object_iter(root); cursor != NULL; cursor = json_object_iter_next(root, cursor)) {
+        const char *name = json_object_iter_key(cursor);
+        json_t *member = json_object_iter_value(cursor);
+
+        if (strcmp(name, "RemoteException") == 0 && json_typeof(member) == JSON_OBJECT) {
+            return parseJsonException(member);
+        }
+    }
+    return NULL;
+}
+
+/**
+ * Fill hdfsFileInfo entries from a GETFILESTATUS / LISTSTATUS JSON object.
+ *
+ * Scalar members are stored into *fileStat. String fields are stored by
+ * pointer and remain owned by the JSON document. "FileStatus" and
+ * "FileStatuses" objects are parsed recursively, and a "FileStatus" array
+ * resizes fileStat to one entry per element.
+ *
+ * @param jobj       JSON object to walk
+ * @param fileStat   destination entry (heap block, may be resized)
+ * @param numEntries out: set when a "FileStatus" array is parsed
+ * @param operation  operation name used when reporting a RemoteException
+ * @return the (possibly moved) destination, or NULL on failure. On failure
+ *         fileStat has been freed and parsing stops at once, so later members
+ *         are never written through a freed or NULL pointer.
+ *
+ * NOTE(review): when this is called for an array element (&fileStat[i]) the
+ * failure paths still free a pointer into the middle of the array; fixing
+ * that needs an ownership flag and is left for a follow-up.
+ */
+hdfsFileInfo *parseJsonGFS(json_t *jobj, hdfsFileInfo *fileStat, int *numEntries, const char *operation) {
+    const char *tempstr;
+    const char *key;
+    json_t *value;
+    void *iter = json_object_iter(jobj);
+    while(iter)  {
+        key = json_object_iter_key(iter);
+        value = json_object_iter_value(iter);
+        
+        switch (json_typeof(value)) {
+            case JSON_INTEGER:
+                if(!strcmp(key,"accessTime")) {
+                    // the value is divided by 1000 before being stored as time_t
+                    fileStat->mLastAccess = (time_t)(json_integer_value(value)/1000);
+                } else if (!strcmp(key,"blockSize")) {
+                    fileStat->mBlockSize = (tOffset)json_integer_value(value);
+                } else if (!strcmp(key,"length")) {
+                    fileStat->mSize = (tOffset)json_integer_value(value);
+                } else if(!strcmp(key,"modificationTime")) {
+                    fileStat->mLastMod = (time_t)(json_integer_value(value)/1000);
+                } else if (!strcmp(key,"replication")) {
+                    fileStat->mReplication = (short)json_integer_value(value);
+                }
+                break;
+                
+            case JSON_STRING:
+                if(!strcmp(key,"group")) {
+                    fileStat->mGroup=(char *)json_string_value(value);
+                } else if (!strcmp(key,"owner")) {
+                    fileStat->mOwner=(char *)json_string_value(value);
+                } else if (!strcmp(key,"pathSuffix")) {
+                    fileStat->mName=(char *)json_string_value(value);
+                } else if (!strcmp(key,"permission")) {
+                    // the permission string is parsed as an octal number
+                    tempstr=(char *)json_string_value(value);
+                    fileStat->mPermissions = (short)strtol(tempstr,(char **)NULL,8);
+                } else if (!strcmp(key,"type")) {
+                    char *cvalue = (char *)json_string_value(value);
+                    if (!strcmp(cvalue, "DIRECTORY")) {
+                        fileStat->mKind = kObjectKindDirectory;
+                    } else {
+                        fileStat->mKind = kObjectKindFile;
+                    }
+                }
+                break;
+                
+            case JSON_OBJECT:
+                if(!strcmp(key,"FileStatus")) {
+                    // Keep the result: the nested parse may free or move the block.
+                    fileStat = parseJsonGFS(value, fileStat, numEntries, operation);
+                    if (!fileStat) {
+                        return NULL;
+                    }
+                } else if (!strcmp(key,"FileStatuses")) {
+                    fileStat = parseJsonGFS(value, fileStat, numEntries, operation);
+                    if (!fileStat) {
+                        return NULL;
+                    }
+                } else if (!strcmp(key,"RemoteException")) {
+                    //Besides returning NULL, we also need to print the exception information
+                    hdfs_exception_msg *exception = parseJsonException(value);
+                    if (exception) {
+                        errno = printExceptionWeb(exception, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation);
+                    }
+                    free(fileStat);
+                    return NULL;
+                }
+                break;
+                
+            case JSON_ARRAY:
+                if (!strcmp(key,"FileStatus")) {
+                    fileStat = json_parse_array(value,(char *) key,fileStat,numEntries, operation);
+                    if (!fileStat) {
+                        return NULL;
+                    }
+                }
+                break;
+                
+            default:
+                // Unexpected member type: treated as a failure, as before,
+                // but stop at once instead of continuing with fileStat == NULL.
+                free(fileStat);
+                return NULL;
+        }
+        iter = json_object_iter_next(jobj, iter);
+    }
+    return fileStat;
+}
+
+
+/**
+ * Check the HTTP response header of an operation that returns no body.
+ *
+ * The header is accepted when it starts with "HTTP/", contains "200 OK" and
+ * carries a Content-Length field whose value is 0. When "200 OK" or
+ * Content-Length is missing, the body is parsed for a RemoteException, which
+ * is reported through printExceptionWeb (its result is stored in errno).
+ *
+ * The header text is no longer modified, and a Content-Length field without
+ * a colon or without digits is rejected rather than dereferencing NULL.
+ *
+ * @param header    raw response header text; NULL is rejected
+ * @param content   response body, used only for exception reporting
+ * @param operation operation name used in the exception message
+ * @return 1 when the header is accepted, 0 otherwise
+ */
+int checkHeader(char *header, const char *content, const char *operation) {
+    const char *responseCode = "200 OK";
+    const char *lengthField;
+    const char *cursor;
+
+    if (header == NULL || strncmp(header, "HTTP/", strlen("HTTP/"))) {
+        return 0;
+    }
+    if (!(strstr(header, responseCode)) || !(lengthField = strstr(header, "Content-Length"))) {
+        hdfs_exception_msg *exc = parseException(content);
+        if (exc) {
+            errno = printExceptionWeb(exc, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation);
+        }
+        return 0;
+    }
+    cursor = strchr(lengthField, ':');
+    if (!cursor) {
+        return 0;                            // field name without a value
+    }
+    cursor++;
+    while (*cursor == ' ' || *cursor == '\t') {
+        cursor++;
+    }
+    if (!isdigit((unsigned char)*cursor)) {
+        return 0;                            // no numeric value present
+    }
+    // Content-Length should be equal to 0. The earlier strcmp against "0"
+    // was done on a token that still carried the rest of the header, so it
+    // effectively accepted any length.
+    return strtol(cursor, (char **)NULL, 10) == 0 ? 1 : 0;
+}
+
+/**
+ * Check the response of an OPEN request.
+ *
+ * Success requires the header text to contain both "307 TEMPORARY_REDIRECT"
+ * and "200 OK". Otherwise the body is parsed for a RemoteException.
+ *
+ * @param header  raw response header text; NULL is rejected
+ * @param content response body, used for exception parsing
+ * @return 1 on success; 0 when the failure is an IOException whose message
+ *         contains "out of the range" (not printed); -1 on any other failure
+ */
+int parseOPEN(const char *header, const char *content) {
+    const char *responseCode1 = "307 TEMPORARY_REDIRECT";
+    const char *responseCode2 = "200 OK";
+    if(header == NULL || strncmp(header,"HTTP/",strlen("HTTP/"))) {
+        return -1;
+    }
+    if(!(strstr(header,responseCode1) && strstr(header, responseCode2))) {
+        hdfs_exception_msg *exc = parseException(content);
+        if (exc) {
+            //if the exception is an IOException and it is because the offset is out of the range
+            //do not print out the exception
+            // Either field is NULL when the JSON lacked that member, so both
+            // are checked before being handed to the string functions.
+            if (exc->exception && exc->message
+                && !strcasecmp(exc->exception, "IOException")
+                && strstr(exc->message, "out of the range")) {
+                // The message struct is calloc'd by parseJsonException and is
+                // not passed on along this path, so release it here.
+                free(exc);
+                return 0;
+            }
+            errno = printExceptionWeb(exc, PRINT_EXC_ALL, "Calling WEBHDFS (OPEN)");
+        }
+        return -1;
+    }
+    
+    return 1;
+}
+
+/** Check the response of a CHMOD request; returns what checkHeader returns. */
+int parseCHMOD(char *header, const char *content) {
+    int accepted = checkHeader(header, content, "CHMOD");
+    return accepted;
+}
+
+
+/** Check the response of a CHOWN request; returns what checkHeader returns. */
+int parseCHOWN(char *header, const char *content) {
+    int accepted = checkHeader(header, content, "CHOWN");
+    return accepted;
+}
+
+/** Check the response of a UTIMES request; returns what checkHeader returns. */
+int parseUTIMES(char *header, const char *content) {
+    int accepted = checkHeader(header, content, "UTIMES");
+    return accepted;
+}
+
+
+/**
+ * Tell whether a response header is a temporary redirect carrying a location.
+ *
+ * When the "307 TEMPORARY_REDIRECT" status is absent, the body is parsed for
+ * a RemoteException, which is reported through printExceptionWeb.
+ *
+ * @param headerstr raw response header text
+ * @param content   response body, used only for exception reporting
+ * @param operation operation name used in the exception message
+ * @return 1 when the header starts with "HTTP/", contains the 307 status and
+ *         "Location" appears after it; 0 otherwise
+ */
+int checkIfRedirect(const char *const headerstr, const char *content, const char *operation) {
+    const char *redirectAt;
+
+    if (headerstr == NULL || strncmp(headerstr, "HTTP/", 5) != 0) {
+        return 0;
+    }
+
+    redirectAt = strstr(headerstr, "307 TEMPORARY_REDIRECT");
+    if (redirectAt == NULL) {
+        //process possible exception information
+        hdfs_exception_msg *exc = parseException(content);
+        if (exc != NULL) {
+            errno = printExceptionWeb(exc, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation);
+        }
+        return 0;
+    }
+
+    return (strstr(redirectAt, "Location") != NULL) ? 1 : 0;
+}
+
+
+/** Check the NameNode response of a write; returns what checkIfRedirect returns. */
+int parseNnWRITE(const char *header, const char *content) {
+    int redirected = checkIfRedirect(header, content, "Write(NameNode)");
+    return redirected;
+}
+
+
+/** Check the NameNode response of an append; returns what checkIfRedirect returns. */
+int parseNnAPPEND(const char *header, const char *content) {
+    int redirected = checkIfRedirect(header, content, "Append(NameNode)");
+    return redirected;
+}
+
+/**
+ * Extract the redirect URL from the "Location:" line of a response header.
+ *
+ * The input is split in place on CR/LF, so content is modified.
+ *
+ * @param content header text; overwritten by the tokenizer
+ * @return a malloc'd copy of the text starting at the first "http" on the
+ *         first line that begins with "Location:", or NULL when there is no
+ *         such line, it contains no "http", or the allocation fails. The
+ *         caller releases the result with free().
+ */
+char *parseDnLoc(char *content) {
+    static const char locationTag[] = "Location:";
+    const char *lineEnds = "\r\n";
+    char *tokState;
+    char *line;
+    char *address;
+    char *copy;
+    size_t bytes;
+
+    for (line = strtok_r(content, lineEnds, &tokState);
+         line != NULL;
+         line = strtok_r(NULL, lineEnds, &tokState)) {
+        if (strncmp(line, locationTag, sizeof(locationTag) - 1) == 0) {
+            break;
+        }
+    }
+    if (line == NULL) {
+        return NULL;
+    }
+
+    address = strstr(line, "http");
+    if (address == NULL) {
+        return NULL;
+    }
+
+    bytes = strlen(address) + 1;
+    copy = malloc(bytes);
+    if (copy == NULL) {
+        return NULL;
+    }
+    memcpy(copy, address, bytes);
+    return copy;
+}
+
+/**
+ * Check the DataNode response of a write.
+ *
+ * @param header  raw response header text; NULL is rejected before it is
+ *                printed, so a null pointer never reaches the "%s" conversion
+ * @param content response body, parsed for a RemoteException on failure
+ * @return 1 when the header starts with "HTTP/" and contains "201 Created";
+ *         0 otherwise
+ */
+int parseDnWRITE(const char *header, const char *content) {
+    const char *responseCode = "201 Created";
+    if (header == NULL) {
+        return 0;
+    }
+    fprintf(stderr, "\nheaderstr is: %s\n", header);
+    if(strncmp(header,"HTTP/",strlen("HTTP/"))) {
+        return 0;
+    }
+    if(!(strstr(header,responseCode))) {
+        hdfs_exception_msg *exc = parseException(content);
+        if (exc) {
+            errno = printExceptionWeb(exc, PRINT_EXC_ALL, "Calling WEBHDFS (WRITE(DataNode))");
+        }
+        return 0;
+    }
+    return 1;
+}
+
+/**
+ * Check the DataNode response of an append.
+ *
+ * @param header  raw response header text
+ * @param content response body, parsed for a RemoteException on failure
+ * @return 1 when the header starts with "HTTP/" and contains "200 OK";
+ *         0 otherwise
+ */
+int parseDnAPPEND(const char *header, const char *content) {
+    hdfs_exception_msg *exc;
+
+    if (header == NULL || strncmp(header, "HTTP/", strlen("HTTP/")) != 0) {
+        return 0;
+    }
+    if (strstr(header, "200 OK") != NULL) {
+        return 1;
+    }
+
+    exc = parseException(content);
+    if (exc != NULL) {
+        errno = printExceptionWeb(exc, PRINT_EXC_ALL, "Calling WEBHDFS (APPEND(DataNode))");
+    }
+    return 0;
+}
+
+/**
+ * Parse the JSON body of a GETFILESTATUS / LISTSTATUS response.
+ *
+ * @param str        JSON text of the response body
+ * @param fileStat   heap-allocated destination; may be resized or freed
+ * @param numEntries out: set when the body holds a "FileStatus" array
+ * @return the filled destination, or NULL on failure. On failure, including
+ *         a body that is NULL or not valid JSON, fileStat has been freed, so
+ *         the caller must not free it again.
+ */
+hdfsFileInfo *parseGFS(char *str, hdfsFileInfo *fileStat, int *numEntries) {
+    json_error_t error;
+    size_t flags = 0;
+    json_t *jobj;
+
+    if (!str) {
+        free(fileStat);
+        return NULL;
+    }
+    jobj = json_loads(str, flags, &error);
+    if (!jobj) {
+        // An unparsable body used to be returned as an untouched entry, which
+        // callers took for success.
+        free(fileStat);
+        return NULL;
+    }
+    // NOTE(review): jobj is not released; the string fields stored by
+    // parseJsonGFS point into it, so releasing it here would leave them dangling.
+    fileStat = parseJsonGFS(jobj, fileStat, numEntries, "GETPATHSTATUS/LISTSTATUS");
+    return fileStat;
+}
+
+/** Interpret the body of a SETREPLICATION response; returns what parseBoolean returns. */
+int parseSETREPLICATION(char *response) {
+    int applied = parseBoolean(response);
+    return applied;
+}
+

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h Mon Sep 10 13:43:28 2012
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_JSON_PARSER_H_
+#define _HDFS_JSON_PARSER_H_
+#include "webhdfs.h"
+
+/*
+ * Parsers for responses whose body is a JSON object with a boolean member.
+ * Each takes the body text and returns 1 when the member is true, 0 otherwise.
+ */
+int parseMKDIR(char *response);
+int parseRENAME(char *response);
+int parseDELETE (char *response);
+int parseSETREPLICATION(char *response);
+
+/*
+ * Checks the response of an OPEN request.
+ * Returns 1 on success, 0 for an IOException whose message contains
+ * "out of the range", and -1 on any other failure.
+ */
+int parseOPEN(const char *header, const char *content);
+
+/*
+ * Checks for write/append responses from the NameNode (Nn) and the
+ * DataNode (Dn). Each returns 1 when the header is accepted and 0 otherwise;
+ * content is only used to report a RemoteException.
+ */
+int parseNnWRITE(const char *header, const char *content);
+int parseDnWRITE(const char *header, const char *content);
+int parseNnAPPEND(const char *header, const char *content);
+int parseDnAPPEND(const char *header, const char *content);
+
+/*
+ * Returns a malloc'd copy of the URL found on the "Location:" line of
+ * content, or NULL. content is modified while it is tokenized.
+ */
+char* parseDnLoc(char *content);
+
+/*
+ * Parses a file-status JSON body into fileStat and returns the filled
+ * (possibly reallocated) block, or NULL on failure.
+ */
+hdfsFileInfo *parseGFS(char *response, hdfsFileInfo *fileStat, int *numEntries);
+
+/*
+ * Header checks for operations that return no body.
+ * Each returns nonzero when the header is accepted and 0 otherwise.
+ */
+int parseCHOWN (char *header, const char *content);
+int parseCHMOD (char *header, const char *content);
+int parseUTIMES(char *header, const char *content);
+
+#endif //_HDFS_JSON_PARSER_H_

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c Mon Sep 10 13:43:28 2012
@@ -0,0 +1,1113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <jni.h>
+#include "webhdfs.h"
+#include "hdfs_http_client.h"
+#include "hdfs_http_query.h"
+#include "hdfs_json_parser.h"
+#include "jni_helper.h"
+#include "exception.h"
+
+#define HADOOP_HDFS_CONF        "org/apache/hadoop/hdfs/HdfsConfiguration"
+#define HADOOP_NAMENODE         "org/apache/hadoop/hdfs/server/namenode/NameNode"
+#define JAVA_INETSOCKETADDRESS  "java/net/InetSocketAddress"
+
+/**
+ * Reset every field of an hdfsFileInfo to its empty value: kind "file",
+ * NULL strings, zero numbers. A NULL argument is ignored.
+ */
+static void initFileinfo(hdfsFileInfo *fileInfo) {
+    if (!fileInfo) {
+        return;
+    }
+
+    /* string fields */
+    fileInfo->mName = NULL;
+    fileInfo->mOwner = NULL;
+    fileInfo->mGroup = NULL;
+
+    /* kind and permissions */
+    fileInfo->mKind = kObjectKindFile;
+    fileInfo->mPermissions = 0;
+
+    /* sizes and replication */
+    fileInfo->mSize = 0;
+    fileInfo->mBlockSize = 0;
+    fileInfo->mReplication = 0;
+
+    /* timestamps */
+    fileInfo->mLastMod = 0;
+    fileInfo->mLastAccess = 0;
+}
+
+/**
+ * Allocate a webhdfsBuffer with empty state and initialized mutex and
+ * condition variables.
+ *
+ * @return the new buffer, or NULL (after printing a message) when the
+ *         allocation fails
+ */
+static webhdfsBuffer *initWebHdfsBuffer() {
+    webhdfsBuffer *wb = (webhdfsBuffer *) calloc(1, sizeof(webhdfsBuffer));
+
+    if (wb == NULL) {
+        fprintf(stderr, "Fail to allocate memory for webhdfsBuffer.\n");
+        return NULL;
+    }
+
+    /* no pending data */
+    wb->wbuffer = NULL;
+    wb->offset = 0;
+    wb->remaining = 0;
+
+    /* neither opened nor closed yet */
+    wb->openFlag = 0;
+    wb->closeFlag = 0;
+
+    pthread_mutex_init(&wb->writeMutex, NULL);
+    pthread_cond_init(&wb->newwrite_or_close, NULL);
+    pthread_cond_init(&wb->transfer_finish, NULL);
+    return wb;
+}
+
+/**
+ * Hand a new chunk of data to the buffer and wait until it has been consumed.
+ *
+ * Under writeMutex the chunk is installed, newwrite_or_close is signalled,
+ * and the call then waits on transfer_finish until remaining drops to 0.
+ * Nothing happens when buffer is NULL or length is 0.
+ *
+ * @param wb     buffer to update
+ * @param buffer data to publish (not copied)
+ * @param length number of bytes in buffer
+ * @return wb
+ */
+static webhdfsBuffer *resetWebhdfsBuffer(webhdfsBuffer *wb, const char *buffer, size_t length) {
+    if (!buffer || length == 0) {
+        return wb;
+    }
+
+    pthread_mutex_lock(&wb->writeMutex);
+    wb->wbuffer = buffer;
+    wb->offset = 0;
+    wb->remaining = length;
+    pthread_cond_signal(&wb->newwrite_or_close);
+    for (;;) {
+        if (wb->remaining == 0) {
+            break;
+        }
+        pthread_cond_wait(&wb->transfer_finish, &wb->writeMutex);
+    }
+    pthread_mutex_unlock(&wb->writeMutex);
+    return wb;
+}
+
+/**
+ * Destroy the synchronization objects of a webhdfsBuffer and free it.
+ *
+ * A failure to destroy a condition variable or the mutex is reported on
+ * stderr; the buffer is freed regardless. A NULL argument is ignored.
+ *
+ * The earlier version re-tested the result of destroying transfer_finish and
+ * reported it against a "close_clean" condition that does not exist; that
+ * duplicate report is removed.
+ */
+static void freeWebhdfsBuffer(webhdfsBuffer *buffer) {
+    if (buffer) {
+        int des = pthread_cond_destroy(&buffer->newwrite_or_close);
+        if (des == EBUSY) {
+            fprintf(stderr, "The condition newwrite_or_close is still referenced!\n");
+        } else if (des == EINVAL) {
+            fprintf(stderr, "The condition newwrite_or_close is invalid!\n");
+        }
+        des = pthread_cond_destroy(&buffer->transfer_finish);
+        if (des == EBUSY) {
+            fprintf(stderr, "The condition transfer_finish is still referenced!\n");
+        } else if (des == EINVAL) {
+            fprintf(stderr, "The condition transfer_finish is invalid!\n");
+        }
+        des = pthread_mutex_destroy(&buffer->writeMutex);
+        if (des == EBUSY) {
+            fprintf(stderr, "The mutex is still locked or referenced!\n");
+        }
+        free(buffer);
+    }
+}
+
+/**
+ * Release a file handle together with its upload buffer, datanode string and
+ * absolute path. A NULL argument is ignored.
+ */
+static void freeWebFileHandle(struct webhdfsFileHandle * handle) {
+    if (handle == NULL) {
+        return;
+    }
+    freeWebhdfsBuffer(handle->uploadBuffer);
+    /* free(NULL) is a no-op, so the members need no individual guards */
+    free(handle->datanode);
+    free(handle->absPath);
+    free(handle);
+}
+
+/**
+ * Allocate a zeroed builder and set its working directory to "/".
+ *
+ * @return the new builder, or NULL when the allocation fails
+ */
+struct hdfsBuilder *hdfsNewBuilder(void)
+{
+    struct hdfsBuilder *builder = calloc(1, sizeof(struct hdfsBuilder));
+
+    if (builder != NULL) {
+        hdfsSetWorkingDirectory(builder, "/");
+    }
+    return builder;
+}
+
+/** Free a builder and its working-directory string; NULL is accepted. */
+void hdfsFreeBuilder(struct hdfsBuilder *bld)
+{
+    if (bld != NULL) {
+        /* free(NULL) is a no-op, so an unset workingDir needs no check */
+        free(bld->workingDir);
+    }
+    free(bld);
+}
+
+/** Set the forceNewInstance flag of the builder; a NULL builder is ignored. */
+void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld)
+{
+    if (bld == NULL) {
+        return;
+    }
+    bld->forceNewInstance = 1;
+}
+
+/**
+ * Record the NameNode address in both the nn and nn_jni fields. The pointer
+ * is stored as given, not copied. A NULL builder is ignored.
+ */
+void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
+{
+    if (bld == NULL) {
+        return;
+    }
+    bld->nn = nn;
+    bld->nn_jni = nn;
+}
+
+/** Record the NameNode port in the builder; a NULL builder is ignored. */
+void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
+{
+    if (bld == NULL) {
+        return;
+    }
+    bld->port = port;
+}
+
+/**
+ * Record the user name in the builder. The pointer is stored as given, not
+ * copied. A NULL builder is ignored.
+ */
+void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
+{
+    if (bld == NULL) {
+        return;
+    }
+    bld->userName = userName;
+}
+
+/**
+ * Record the Kerberos ticket cache path in the builder. The pointer is stored
+ * as given, not copied. A NULL builder is ignored.
+ */
+void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
+                                       const char *kerbTicketCachePath)
+{
+    if (bld == NULL) {
+        return;
+    }
+    bld->kerbTicketCachePath = kerbTicketCachePath;
+}
+
+/**
+ * Connect to a NameNode as the given user.
+ *
+ * @return the result of hdfsBuilderConnect, or NULL when no builder could be
+ *         allocated
+ */
+hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user)
+{
+    struct hdfsBuilder *builder = hdfsNewBuilder();
+
+    if (builder == NULL) {
+        return NULL;
+    }
+    hdfsBuilderSetNameNode(builder, nn);
+    hdfsBuilderSetNameNodePort(builder, port);
+    hdfsBuilderSetUserName(builder, user);
+    return hdfsBuilderConnect(builder);
+}
+
+/** Connect to a NameNode without a user name (NULL is passed as the user). */
+hdfsFS hdfsConnect(const char* nn, tPort port)
+{
+    hdfsFS fs = hdfsConnectAsUser(nn, port, NULL);
+    return fs;
+}
+
+/**
+ * Connect through hdfsConnect and then set the forceNewInstance flag on the
+ * returned handle, which is treated as a struct hdfsBuilder.
+ *
+ * @return the connected handle, or NULL when the connection failed
+ */
+hdfsFS hdfsConnectNewInstance(const char* nn, tPort port)
+{
+    struct hdfsBuilder *connected = (struct hdfsBuilder *) hdfsConnect(nn, port);
+
+    if (connected != NULL) {
+        hdfsBuilderSetForceNewInstance(connected);
+    }
+    return connected;
+}
+
+/**
+ * Connect to a NameNode as the given user with the forceNewInstance flag set
+ * before connecting.
+ *
+ * @return the result of hdfsBuilderConnect, or NULL when no builder could be
+ *         allocated
+ */
+hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port,
+                                    const char *user)
+{
+    struct hdfsBuilder *builder = hdfsNewBuilder();
+
+    if (builder == NULL) {
+        return NULL;
+    }
+    hdfsBuilderSetNameNode(builder, host);
+    hdfsBuilderSetNameNodePort(builder, port);
+    hdfsBuilderSetUserName(builder, user);
+    hdfsBuilderSetForceNewInstance(builder);
+    return hdfsBuilderConnect(builder);
+}
+
+const char *hdfsBuilderToStr(const struct hdfsBuilder *bld,
+                             char *buf, size_t bufLen);
+
+/**
+ * Resolve the NameNode host and port of a builder and return it as the
+ * file-system handle.
+ *
+ * - A NULL host becomes "localhost".
+ * - A host of the form "name:digits" is replaced by a heap copy of "name";
+ *   the digits are dropped and a message is printed.
+ * - When the port is 0 or the host is "default" (case-insensitive), the
+ *   missing values are read through JNI from NameNode.getHttpAddress() on a
+ *   new HdfsConfiguration.
+ *
+ * Fixes over the earlier version:
+ * - the caller-supplied host string is no longer passed to free(); it is
+ *   only stored by pointer in hdfsBuilderSetNameNode and is not owned here;
+ * - a failed JNI lookup returns NULL instead of dereferencing the freed
+ *   builder in the debug print;
+ * - every failure path releases the builder with hdfsFreeBuilder, so its
+ *   working-directory string is freed as well.
+ *
+ * @param bld builder to connect; consumed on failure
+ * @return bld on success, NULL on failure (errno is set to EINTERNAL when no
+ *         JNI environment is available)
+ */
+hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld)
+{
+    if (!bld) {
+        return NULL;
+    }
+    // if the hostname is null for the namenode, set it to localhost
+    //only handle bld->nn
+    if (bld->nn == NULL) {
+        bld->nn = "localhost";
+    } else {
+        /* check whether the hostname of the namenode (nn in hdfsBuilder) has already contained the port */
+        const char *lastColon = strrchr(bld->nn, ':');
+        if (lastColon && (strspn(lastColon + 1, "0123456789") == strlen(lastColon + 1))) {
+            fprintf(stderr, "port %d was given, but URI '%s' already "
+                    "contains a port!\n", bld->port, bld->nn);
+            size_t hostLen = (size_t)(lastColon - bld->nn);
+            char *newAddr = (char *)malloc(hostLen + 1);
+            if (!newAddr) {
+                hdfsFreeBuilder(bld);
+                return NULL;
+            }
+            memcpy(newAddr, bld->nn, hostLen);
+            newAddr[hostLen] = '\0';
+            // NOTE(review): nothing visible in this file releases newAddr later.
+            bld->nn = newAddr;
+        }
+    }
+    
+    /* if the namenode is "default" and/or the port of namenode is 0, get the default namenode/port by using JNI */
+    if (bld->port == 0 || !strcasecmp("default", bld->nn)) {
+        JNIEnv *env = 0;
+        jobject jHDFSConf = NULL, jAddress = NULL;
+        jvalue jVal;
+        jthrowable jthr = NULL;
+        int ret = 0;
+        char buf[512];
+        
+        //Get the JNIEnv* corresponding to current thread
+        env = getJNIEnv();
+        if (env == NULL) {
+            errno = EINTERNAL;
+            hdfsFreeBuilder(bld);
+            return NULL;
+        }
+        
+        //  jHDFSConf = new HDFSConfiguration();
+        jthr = constructNewObjectOfClass(env, &jHDFSConf, HADOOP_HDFS_CONF, "()V");
+        if (jthr) {
+            ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                                        "hdfsBuilderConnect(%s)",
+                                        hdfsBuilderToStr(bld, buf, sizeof(buf)));
+            goto done;
+        }
+        
+        jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_NAMENODE, "getHttpAddress",
+                            "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/InetSocketAddress;",
+                            jHDFSConf);
+        if (jthr) {
+            ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                            "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf)));
+            goto done;
+        }
+        jAddress = jVal.l;
+        
+        if (bld->port == 0) {
+            jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
+                                JAVA_INETSOCKETADDRESS, "getPort", "()I");
+            if (jthr) {
+                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                                            "hdfsBuilderConnect(%s)",
+                                            hdfsBuilderToStr(bld, buf, sizeof(buf)));
+                goto done;
+            }
+            bld->port = jVal.i;
+        }
+        
+        if (!strcasecmp("default", bld->nn)) {
+            jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
+                                JAVA_INETSOCKETADDRESS, "getHostName", "()Ljava/lang/String;");
+            if (jthr) {
+                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                                            "hdfsBuilderConnect(%s)",
+                                            hdfsBuilderToStr(bld, buf, sizeof(buf)));
+                goto done;
+            }
+            // NOTE(review): the UTF chars obtained here are never released.
+            bld->nn = (const char*) ((*env)->GetStringUTFChars(env, jVal.l, NULL));
+        }
+        
+    done:
+        destroyLocalReference(env, jHDFSConf);
+        destroyLocalReference(env, jAddress);
+        if (ret) { //if there is error/exception, we free the builder and return NULL
+            hdfsFreeBuilder(bld);
+            return NULL;
+        }
+    }
+    
+    //for debug
+    fprintf(stderr, "namenode: %s:%d\n", bld->nn, bld->port);
+    return bld;
+}
+
+/**
+ * Release a file-system handle.
+ *
+ * The handle is the builder returned by hdfsBuilderConnect, so it is released
+ * with hdfsFreeBuilder; a bare free() left its working-directory string
+ * allocated.
+ *
+ * @param fs handle to release
+ * @return 0 on success; -1 with errno set to EBADF when fs is NULL
+ */
+int hdfsDisconnect(hdfsFS fs)
+{
+    if (fs == NULL) {
+        errno = EBADF;
+        return -1;
+    }
+    hdfsFreeBuilder((struct hdfsBuilder *) fs);
+    return 0;
+}
+
+/**
+ * Build a heap copy of path, prefixed with the handle's working directory
+ * when path does not start with '/' and a working directory is set.
+ *
+ * The two strings are joined as they are; no separator is inserted.
+ * NOTE(review): this presumably relies on workingDir ending in '/' — confirm
+ * against hdfsSetWorkingDirectory.
+ *
+ * @param fs   handle, treated as a struct hdfsBuilder
+ * @param path absolute or relative path
+ * @return a malloc'd string the caller frees, or NULL when an argument is
+ *         NULL or the allocation fails
+ */
+char *getAbsolutePath(hdfsFS fs, const char *path) {
+    const struct hdfsBuilder *builder = (struct hdfsBuilder *) fs;
+    const char *prefix = "";
+    size_t prefixLen;
+    size_t pathLen;
+    char *joined;
+
+    if (fs == NULL || path == NULL) {
+        return NULL;
+    }
+    if (*path != '/' && builder->workingDir) {
+        prefix = builder->workingDir;
+    }
+
+    prefixLen = strlen(prefix);
+    pathLen = strlen(path);
+    joined = (char *)malloc(prefixLen + pathLen + 1);
+    if (joined == NULL) {
+        return NULL;
+    }
+    memcpy(joined, prefix, prefixLen);
+    memcpy(joined + prefixLen, path, pathLen + 1);   /* copies the terminator too */
+    return joined;
+}
+
+/**
+ * Create a directory through the MKDIR operation.
+ *
+ * @return 0 when the request was built, launched and its body parsed as
+ *         true; -1 otherwise
+ */
+int hdfsCreateDirectory(hdfsFS fs, const char* path)
+{
+    struct hdfsBuilder *builder = (struct hdfsBuilder *) fs;
+    char *absolute;
+    char *requestUrl = NULL;
+    Response reply = NULL;
+    int status = -1;
+
+    if (fs == NULL || path == NULL) {
+        return -1;
+    }
+    absolute = getAbsolutePath(fs, path);
+    if (absolute == NULL) {
+        return -1;
+    }
+
+    /* each step runs only when the previous one produced a result */
+    requestUrl = prepareMKDIR(builder->nn, builder->port, absolute, builder->userName);
+    if (requestUrl) {
+        reply = launchMKDIR(requestUrl);
+    }
+    if (reply && parseMKDIR(reply->body->content)) {
+        status = 0;
+    }
+
+    freeResponse(reply);
+    free(requestUrl);
+    free(absolute);
+    return status;
+}
+
+/**
+ * Change the permission bits of a path through the CHMOD operation.
+ *
+ * @return 0 when the request was built, launched and its response accepted
+ *         by parseCHMOD; -1 otherwise
+ */
+int hdfsChmod(hdfsFS fs, const char* path, short mode)
+{
+    struct hdfsBuilder *builder = (struct hdfsBuilder *) fs;
+    char *absolute;
+    char *requestUrl = NULL;
+    Response reply = NULL;
+    int status = -1;
+
+    if (fs == NULL || path == NULL) {
+        return -1;
+    }
+    absolute = getAbsolutePath(fs, path);
+    if (absolute == NULL) {
+        return -1;
+    }
+
+    /* each step runs only when the previous one produced a result */
+    requestUrl = prepareCHMOD(builder->nn, builder->port, absolute, (int)mode, builder->userName);
+    if (requestUrl) {
+        reply = launchCHMOD(requestUrl);
+    }
+    if (reply && parseCHMOD(reply->header->content, reply->body->content)) {
+        status = 0;
+    }
+
+    freeResponse(reply);
+    free(absolute);
+    free(requestUrl);
+    return status;
+}
+
+/**
+ * Change the owner and group of a path through the CHOWN operation.
+ *
+ * @return 0 when the request was built, launched and its response accepted
+ *         by parseCHOWN; -1 otherwise
+ */
+int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group)
+{
+    struct hdfsBuilder *builder = (struct hdfsBuilder *) fs;
+    char *absolute;
+    char *requestUrl = NULL;
+    Response reply = NULL;
+    int status = -1;
+
+    if (fs == NULL || path == NULL) {
+        return -1;
+    }
+    absolute = getAbsolutePath(fs, path);
+    if (absolute == NULL) {
+        return -1;
+    }
+
+    /* each step runs only when the previous one produced a result */
+    requestUrl = prepareCHOWN(builder->nn, builder->port, absolute, owner, group, builder->userName);
+    if (requestUrl) {
+        reply = launchCHOWN(requestUrl);
+    }
+    if (reply && parseCHOWN(reply->header->content, reply->body->content)) {
+        status = 0;
+    }
+
+    freeResponse(reply);
+    free(absolute);
+    free(requestUrl);
+    return status;
+}
+
+/**
+ * Rename a path through the RENAME operation.
+ *
+ * Both paths are resolved with getAbsolutePath first. When resolving the
+ * second path fails, the first one is now released instead of being leaked.
+ *
+ * @param fs      file-system handle
+ * @param oldPath current path
+ * @param newPath destination path
+ * @return 0 when the request was built, launched and its body parsed as
+ *         true; -1 otherwise
+ */
+int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath)
+{
+    if (fs == NULL || oldPath == NULL || newPath == NULL) {
+        return -1;
+    }
+    
+    char *oldAbsPath = getAbsolutePath(fs, oldPath);
+    if (!oldAbsPath) {
+        return -1;
+    }
+    char *newAbsPath = getAbsolutePath(fs, newPath);
+    if (!newAbsPath) {
+        free(oldAbsPath);
+        return -1;
+    }
+    
+    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+    char *url=NULL;
+    Response resp = NULL;
+    int ret = 0;
+    
+    if(!((url = prepareRENAME(bld->nn, bld->port, oldAbsPath, newAbsPath, bld->userName))
+         && (resp = launchRENAME(url))
+         && (parseRENAME(resp->body->content)))) {
+        ret = -1;
+    }
+    
+    freeResponse(resp);
+    free(oldAbsPath);
+    free(newAbsPath);
+    free(url);
+    return ret;
+}
+
+/**
+ * Fetch the status of a single path.
+ *
+ * @return an hdfsFileInfo filled by parseGFS, or NULL when an argument is
+ *         NULL, an allocation fails, or the request or its parsing fails
+ */
+hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path)
+{
+    struct hdfsBuilder *builder = (struct hdfsBuilder *) fs;
+    char *absolute;
+    char *requestUrl = NULL;
+    Response reply = NULL;
+    hdfsFileInfo *info;
+    int entryCount = 0;
+    int succeeded = 0;
+
+    if (fs == NULL || path == NULL) {
+        return NULL;
+    }
+    absolute = getAbsolutePath(fs, path);
+    if (absolute == NULL) {
+        return NULL;
+    }
+
+    info = (hdfsFileInfo *) calloc(1, sizeof(hdfsFileInfo));
+    if (info != NULL) {
+        initFileinfo(info);
+        /* each step runs only when the previous one produced a result */
+        requestUrl = prepareGFS(builder->nn, builder->port, absolute, builder->userName);
+        if (requestUrl) {
+            reply = launchGFS(requestUrl);
+        }
+        if (reply) {
+            info = parseGFS(reply->body->content, info, &entryCount);
+            succeeded = (info != NULL);
+        }
+    }
+
+    freeResponse(reply);
+    free(absolute);
+    free(requestUrl);
+
+    if (!succeeded) {
+        free(info);
+        return NULL;
+    }
+    return info;
+}
+
+/**
+ * List the entries of a directory through webhdfs.
+ *
+ * @param fs         file system handle (used as a struct hdfsBuilder)
+ * @param path       directory to list; resolved with getAbsolutePath()
+ * @param numEntries forwarded to parseGFS(), which reports the entry count
+ * @return the pointer returned by parseGFS() on success, NULL on failure
+ */
+hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries)
+{
+    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+    hdfsFileInfo *fileInfo = NULL;
+    hdfsFileInfo *result = NULL;
+    char *absPath = NULL;
+    char *url = NULL;
+    Response resp = NULL;
+
+    if (fs == NULL || path == NULL) {
+        return NULL;
+    }
+    absPath = getAbsolutePath(fs, path);
+    if (!absPath) {
+        return NULL;
+    }
+
+    fileInfo = (hdfsFileInfo *) calloc(1, sizeof(hdfsFileInfo));
+    if (fileInfo) {
+        url = prepareLS(bld->nn, bld->port, absPath, bld->userName);
+        if (url) {
+            resp = launchLS(url);
+        }
+        if (resp) {
+            /* whatever parseGFS() returns replaces our initial record */
+            fileInfo = parseGFS(resp->body->content, fileInfo, numEntries);
+            result = fileInfo;
+        }
+    }
+
+    freeResponse(resp);
+    free(absPath);
+    free(url);
+
+    if (!result) {
+        free(fileInfo);
+    }
+    return result;
+}
+
+/**
+ * Set the replication factor of a path through webhdfs.
+ *
+ * @param fs          file system handle (used as a struct hdfsBuilder)
+ * @param path        path to modify; resolved with getAbsolutePath()
+ * @param replication value forwarded to prepareSETREPLICATION()
+ * @return 0 on success, -1 if any step fails
+ */
+int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication)
+{
+    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+    char *absPath = NULL;
+    char *url = NULL;
+    Response resp = NULL;
+    int ret = -1;
+
+    if (fs == NULL || path == NULL) {
+        return -1;
+    }
+    absPath = getAbsolutePath(fs, path);
+    if (!absPath) {
+        return -1;
+    }
+
+    url = prepareSETREPLICATION(bld->nn, bld->port, absPath, replication, bld->userName);
+    if (url) {
+        resp = launchSETREPLICATION(url);
+        if (resp && parseSETREPLICATION(resp->body->content)) {
+            ret = 0;
+        }
+    }
+
+    freeResponse(resp);
+    free(absPath);
+    free(url);
+    return ret;
+}
+
+/**
+ * Release an array of file-info records together with the strings they own.
+ *
+ * @param hdfsFileInfo array to free; NULL is accepted and ignored
+ * @param numEntries   number of records in the array whose mName, mOwner
+ *                     and mGroup strings must be released
+ */
+void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
+{
+    int i;
+
+    /* a NULL array with numEntries > 0 must not be dereferenced */
+    if (hdfsFileInfo == NULL) {
+        return;
+    }
+
+    //Free the mName, mOwner, and mGroup (free(NULL) is a no-op)
+    for (i = 0; i < numEntries; ++i) {
+        free(hdfsFileInfo[i].mName);
+        free(hdfsFileInfo[i].mOwner);
+        free(hdfsFileInfo[i].mGroup);
+    }
+
+    //Free entire block
+    free(hdfsFileInfo);
+}
+
+/**
+ * Delete a path through webhdfs.
+ *
+ * @param fs        file system handle (used as a struct hdfsBuilder)
+ * @param path      path to delete; resolved with getAbsolutePath()
+ * @param recursive flag forwarded unchanged to prepareDELETE()
+ * @return 0 on success, -1 if any step fails
+ */
+int hdfsDelete(hdfsFS fs, const char* path, int recursive)
+{
+    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+    char *absPath = NULL;
+    char *url = NULL;
+    Response resp = NULL;
+    int ret = -1;
+
+    if (fs == NULL || path == NULL) {
+        return -1;
+    }
+    absPath = getAbsolutePath(fs, path);
+    if (!absPath) {
+        return -1;
+    }
+
+    url = prepareDELETE(bld->nn, bld->port, absPath, recursive, bld->userName);
+    if (url) {
+        resp = launchDELETE(url);
+        if (resp && parseDELETE(resp->body->content)) {
+            ret = 0;
+        }
+    }
+
+    freeResponse(resp);
+    free(absPath);
+    free(url);
+    return ret;
+}
+
+/**
+ * Set the modification and access times of a path through webhdfs.
+ *
+ * @param fs    file system handle (used as a struct hdfsBuilder)
+ * @param path  path to modify; resolved with getAbsolutePath()
+ * @param mtime modification time forwarded to prepareUTIMES()
+ * @param atime access time forwarded to prepareUTIMES()
+ * @return 0 on success, -1 if any step fails
+ */
+int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime)
+{
+    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+    char *absPath = NULL;
+    char *url = NULL;
+    Response resp = NULL;
+    int ret = -1;
+
+    if (fs == NULL || path == NULL) {
+        return -1;
+    }
+    absPath = getAbsolutePath(fs, path);
+    if (!absPath) {
+        return -1;
+    }
+
+    url = prepareUTIMES(bld->nn, bld->port, absPath, mtime, atime, bld->userName);
+    if (url) {
+        resp = launchUTIMES(url);
+        if (resp && parseUTIMES(resp->header->content, resp->body->content)) {
+            ret = 0;
+        }
+    }
+
+    freeResponse(resp);
+    free(absPath);
+    free(url);
+    return ret;
+}
+
+/**
+ * Check whether a path exists by asking for its status.
+ *
+ * @return 0 when hdfsGetPathInfo() yields a record, -1 otherwise
+ */
+int hdfsExists(hdfsFS fs, const char *path)
+{
+    hdfsFileInfo *info = hdfsGetPathInfo(fs, path);
+
+    if (!info) {
+        return -1;
+    }
+    /* the record was only needed as an existence probe */
+    hdfsFreeFileInfo(info, 1);
+    return 0;
+}
+
+/**
+ * State handed to the writer thread started by hdfsOpenFile() and returned
+ * by it (via pthread_join) to hdfsCloseFile().
+ */
+typedef struct {
+    char *url;                    /* datanode url; released by freeThreadData() */
+    webhdfsBuffer *uploadBuffer;  /* shared with the file handle, not owned here */
+    int flags;                    /* open flags; O_APPEND selects append vs. write */
+    Response resp;                /* result of launchDnAPPEND()/launchDnWRITE() */
+} threadData;
+
+/**
+ * Release a threadData record: its url, its response (when present) and the
+ * record itself. The upload buffer is left alone.
+ */
+static void freeThreadData(threadData *data) {
+    if (!data) {
+        return;
+    }
+    free(data->url);
+    if (data->resp) {
+        freeResponse(data->resp);
+    }
+    //the uploadBuffer would be freed by freeWebFileHandle()
+    free(data);
+}
+
+/**
+ * Thread entry point for the upload: runs the datanode append or write
+ * request, stores its response in the threadData and returns that same
+ * threadData pointer as the thread result.
+ */
+static void *writeThreadOperation(void *v) {
+    threadData *td = (threadData *) v;
+
+    td->resp = (td->flags & O_APPEND)
+        ? launchDnAPPEND(td->url, td->uploadBuffer)
+        : launchDnWRITE(td->url, td->uploadBuffer);
+    return td;
+}
+
+/**
+ * Open a file for reading, writing or appending.
+ *
+ * For read access only the handle is built. For write/append the namenode is
+ * contacted to obtain the datanode url, and a writer thread is started that
+ * streams the handle's upload buffer to that datanode.
+ *
+ * @param fs          file system handle (used as a struct hdfsBuilder)
+ * @param path        file to open; resolved with getAbsolutePath()
+ * @param flags       open(2)-style flags; O_RDWR is rejected with ENOTSUP
+ * @param bufferSize  stored in the handle
+ * @param replication stored in the handle and sent on create
+ * @param blockSize   stored in the handle and sent on create
+ * @return a new file handle, or NULL on failure (everything allocated here
+ *         is released on every failure path)
+ */
+hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
+                      int bufferSize, short replication, tSize blockSize)
+{
+    /*
+     * the original version of libhdfs based on JNI store a fsinputstream/fsoutputstream in the hdfsFile
+     * in libwebhdfs that is based on webhdfs, we store (absolute_path, buffersize, replication, blocksize) in it
+     */
+    if (fs == NULL || path == NULL) {
+        return NULL;
+    }
+
+    int accmode = flags & O_ACCMODE;
+    if (accmode == O_RDWR) {
+        fprintf(stderr, "ERROR: cannot open an hdfs file in O_RDWR mode\n");
+        errno = ENOTSUP;
+        return NULL;
+    }
+
+    if ((flags & O_CREAT) && (flags & O_EXCL)) {
+        fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n");
+    }
+
+    /* everything the cleanup code at "done" may have to release */
+    int ret = 0;
+    char *url = NULL;
+    Response resp = NULL;
+    threadData *data = NULL;
+    struct webhdfsFileHandle *webhandle = NULL;
+
+    hdfsFile hdfsFileHandle = (hdfsFile) calloc(1, sizeof(struct hdfsFile_internal));
+    if (!hdfsFileHandle) {
+        return NULL;
+    }
+    hdfsFileHandle->flags = flags;
+    hdfsFileHandle->type = accmode == O_RDONLY ? INPUT : OUTPUT;
+    hdfsFileHandle->offset = 0;
+    webhandle = (struct webhdfsFileHandle *) calloc(1, sizeof(struct webhdfsFileHandle));
+    if (!webhandle) {
+        ret = -1;
+        goto done;
+    }
+    webhandle->bufferSize = bufferSize;
+    webhandle->replication = replication;
+    webhandle->blockSize = blockSize;
+    webhandle->absPath = getAbsolutePath(fs, path);
+    if (!webhandle->absPath) {
+        ret = -1;
+        goto done;
+    }
+    hdfsFileHandle->file = webhandle;
+
+    //for write/append, need to connect to the namenode
+    //and get the url of corresponding datanode
+    if (hdfsFileHandle->type == OUTPUT) {
+        struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+        int append = flags & O_APPEND;
+        int parseRet = 0;
+
+        webhandle->uploadBuffer = initWebHdfsBuffer();
+        if (!webhandle->uploadBuffer) {
+            ret = -1;
+            goto done;
+        }
+
+        //send the create (or append) request to NN
+        if (append) {
+            url = prepareNnAPPEND(bld->nn, bld->port, webhandle->absPath, bld->userName);
+        } else {
+            url = prepareNnWRITE(bld->nn, bld->port, webhandle->absPath, bld->userName, webhandle->replication, webhandle->blockSize);
+        }
+        if (!url) {
+            fprintf(stderr,
+                    "fail to create the url connecting to namenode for file creation/appending\n");
+            ret = -1;
+            goto done;
+        }
+
+        resp = append ? launchNnAPPEND(url) : launchNnWRITE(url);
+        if (!resp) {
+            fprintf(stderr,
+                    "fail to get the response from namenode for file creation/appending\n");
+            ret = -1;
+            goto done;
+        }
+
+        if (append) {
+            parseRet = parseNnAPPEND(resp->header->content, resp->body->content);
+        } else {
+            parseRet = parseNnWRITE(resp->header->content, resp->body->content);
+        }
+        if (!parseRet) {
+            fprintf(stderr,
+                    "fail to parse the response from namenode for file creation/appending\n");
+            ret = -1;
+            goto done;
+        }
+
+        free(url);
+        url = parseDnLoc(resp->header->content);
+        if (!url) {
+            fprintf(stderr,
+                    "fail to get the datanode url from namenode for file creation/appending\n");
+            /* go through the common cleanup so the handles are not leaked */
+            ret = -1;
+            goto done;
+        }
+        freeResponse(resp);
+        resp = NULL;
+
+        //store the datanode url in the file handle
+        webhandle->datanode = strdup(url);
+        if (!webhandle->datanode) {
+            ret = -1;
+            goto done;
+        }
+
+        //create a new thread for performing the http transferring
+        data = (threadData *) calloc(1, sizeof(threadData));
+        if (!data) {
+            ret = -1;
+            goto done;
+        }
+        /* hand the url over to the thread data instead of copying it */
+        data->url = url;
+        url = NULL;
+        data->flags = flags;
+        data->uploadBuffer = webhandle->uploadBuffer;
+        ret = pthread_create(&webhandle->connThread, NULL, writeThreadOperation, data);
+        if (ret) {
+            fprintf(stderr, "Failed to create the writing thread.\n");
+            goto done;
+        }
+        /* the writer thread owns the data now; hdfsCloseFile() frees it */
+        data = NULL;
+        webhandle->uploadBuffer->openFlag = 1;
+    }
+
+done:
+    free(url);
+    if (resp) {
+        freeResponse(resp);
+    }
+    freeThreadData(data);
+    if (ret == 0) {
+        return hdfsFileHandle;
+    }
+    if (webhandle) {
+        freeWebFileHandle(webhandle);
+    }
+    free(hdfsFileHandle);
+    return NULL;
+}
+
+/**
+ * Hand a chunk of data to the upload buffer of a file opened for writing.
+ *
+ * @param fs     file system handle (only checked for NULL)
+ * @param file   handle whose type must be OUTPUT
+ * @param buffer data to write, passed to resetWebhdfsBuffer()
+ * @param length number of bytes; 0 returns 0 before any other check
+ * @return length on success, -1 on invalid arguments or when the upload
+ *         buffer has not been opened
+ */
+tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length)
+{
+    struct webhdfsFileHandle *wfile = NULL;
+
+    if (length == 0) {
+        return 0;
+    }
+    if (fs == NULL || file == NULL || file->type != OUTPUT || length < 0) {
+        return -1;
+    }
+
+    wfile = (struct webhdfsFileHandle *) file->file;
+    if (!wfile->uploadBuffer || !wfile->uploadBuffer->openFlag) {
+        fprintf(stderr, "Error: have not opened the file %s for writing yet.\n", wfile->absPath);
+        return -1;
+    }
+
+    resetWebhdfsBuffer(wfile->uploadBuffer, buffer, length);
+    return length;
+}
+
+/**
+ * Close a file handle and release it.
+ *
+ * For a handle opened for writing, the writer thread is told to finish
+ * (closeFlag + condition signal), joined, and its datanode response parsed
+ * before the thread data is freed.
+ *
+ * @param fs   file system handle (unused)
+ * @param file handle to close; NULL is rejected with EINVAL
+ * @return 0 on success, -1 on failure; the handle is freed in both cases
+ *         unless it was NULL
+ */
+int hdfsCloseFile(hdfsFS fs, hdfsFile file)
+{
+    int ret = 0;
+
+    /* validate before the first dereference of file */
+    if (!file) {
+        errno = EINVAL;
+        return -1;
+    }
+
+    fprintf(stderr, "to close file...\n");
+    if (file->type == OUTPUT) {
+        void *respv = NULL;
+        threadData *tdata = NULL;
+        struct webhdfsFileHandle *wfile = (struct webhdfsFileHandle *) file->file;
+        pthread_mutex_lock(&(wfile->uploadBuffer->writeMutex));
+        wfile->uploadBuffer->closeFlag = 1;
+        pthread_cond_signal(&wfile->uploadBuffer->newwrite_or_close);
+        pthread_mutex_unlock(&(wfile->uploadBuffer->writeMutex));
+
+        //waiting for the writing thread to terminate
+        ret = pthread_join(wfile->connThread, &respv);
+        if (ret) {
+            /* respv is not meaningful when the join itself failed */
+            fprintf(stderr, "Error (code %d) when pthread_join.\n", ret);
+            ret = -1;
+        } else {
+            tdata = (threadData *) respv;
+            if (!tdata) {
+                fprintf(stderr, "Response from the writing thread is NULL.\n");
+                ret = -1;
+            }
+        }
+
+        //parse the response
+        if (tdata) {
+            if (!tdata->resp) {
+                /* the datanode request produced no response at all */
+                fprintf(stderr, "No response from the datanode for the write.\n");
+                ret = -1;
+            } else if (file->flags & O_APPEND) {
+                /* NOTE(review): the parse result is ignored, as before;
+                 * confirm its return convention and report failures */
+                parseDnAPPEND(tdata->resp->header->content, tdata->resp->body->content);
+            } else {
+                parseDnWRITE(tdata->resp->header->content, tdata->resp->body->content);
+            }
+        }
+        //free the threaddata (a NULL pointer is ignored)
+        freeThreadData(tdata);
+    }
+
+    fprintf(stderr, "To clean the webfilehandle...\n");
+    freeWebFileHandle(file->file);
+    free(file);
+    fprintf(stderr, "Cleaned the webfilehandle...\n");
+    return ret;
+}
+
+/** Return 1 when the handle's type is INPUT, 0 otherwise. */
+int hdfsFileIsOpenForRead(hdfsFile file)
+{
+    if (file->type == INPUT) {
+        return 1;
+    }
+    return 0;
+}
+
+/** Return 1 when the handle's type is OUTPUT, 0 otherwise. */
+int hdfsFileIsOpenForWrite(hdfsFile file)
+{
+    if (file->type == OUTPUT) {
+        return 1;
+    }
+    return 0;
+}
+
+/**
+ * Read up to length bytes from the file's current offset into buffer.
+ *
+ * The caller's buffer is installed as the body of the response object, so
+ * the body's content is never freed here.
+ *
+ * @param fs     file system handle (used as a struct hdfsBuilder)
+ * @param file   handle whose type must be INPUT
+ * @param buffer destination for the data
+ * @param length maximum number of bytes; 0 returns 0 before any other check
+ * @return the body's offset after a successful read (the file offset is
+ *         advanced by the same amount), 0 when parseOPEN() returns 0,
+ *         -1 on any failure (errno is EINVAL for bad arguments)
+ */
+tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length)
+{
+    if (length == 0) {
+        return 0;
+    }
+    if (fs == NULL || file == NULL || file->type != INPUT || buffer == NULL || length < 0) {
+        errno = EINVAL;
+        return -1;
+    }
+    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+    struct webhdfsFileHandle *webFile = (struct webhdfsFileHandle *) file->file;
+    char *url = NULL;
+    Response req = NULL;    /* the response object allocated here */
+    Response resp = NULL;   /* what launchOPEN() hands back */
+    Response owned = NULL;
+    int openResult = -1;
+    tSize ret = -1;
+
+    req = (Response) calloc(1, sizeof(*req));
+    if (!req) {
+        return -1;
+    }
+    req->header = initResponseBuffer();
+    req->body = initResponseBuffer();
+    if (req->header && req->body) {
+        req->body->content = buffer;
+        req->body->remaining = length;
+
+        url = prepareOPEN(bld->nn, bld->port, webFile->absPath, bld->userName, file->offset, length);
+        if (url) {
+            resp = launchOPEN(url, req);
+        }
+        if (resp) {
+            openResult = parseOPEN(resp->header->content, resp->body->content);
+            if (openResult > 0) {
+                size_t readSize = resp->body->offset;
+                file->offset += readSize;
+                ret = (tSize) readSize;
+            } else if (openResult == 0) {
+                ret = 0;
+            }
+        }
+    }
+
+    /* NOTE(review): assumes launchOPEN() does not free the response passed
+     * in when it fails -- confirm; on success its return value is released
+     * exactly as before. */
+    owned = resp ? resp : req;
+    if (owned->header) {
+        freeResponseBuffer(owned->header);
+    }
+    /* only the buffer descriptor is freed: its content is the caller's */
+    free(owned->body);
+    free(owned);
+    free(url);
+    return ret;
+}
+
+/**
+ * Number of bytes between the handle's current offset and the file size
+ * reported by hdfsGetPathInfo().
+ *
+ * @return mSize - offset converted to int, or -1 on any failure
+ */
+int hdfsAvailable(hdfsFS fs, hdfsFile file)
+{
+    struct webhdfsFileHandle *wf = NULL;
+    hdfsFileInfo *info = NULL;
+    int remaining = -1;
+
+    if (!file || !fs) {
+        return -1;
+    }
+    wf = (struct webhdfsFileHandle *) file->file;
+    if (!wf) {
+        return -1;
+    }
+
+    info = hdfsGetPathInfo(fs, wf->absPath);
+    if (!info) {
+        return -1;
+    }
+    remaining = (int)(info->mSize - file->offset);
+    hdfsFreeFileInfo(info, 1);
+    return remaining;
+}
+
+/**
+ * Move the handle's offset to desiredPos.
+ *
+ * The current file size is fetched with hdfsGetPathInfo(); a position
+ * beyond it is rejected with errno set to ENOTSUP.
+ *
+ * @return 0 on success, -1 on invalid arguments, lookup failure or a
+ *         position past the end of the file
+ */
+int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos)
+{
+    struct webhdfsFileHandle *wf = NULL;
+    hdfsFileInfo *info = NULL;
+    int ret = 0;
+
+    if (!fs || !file || desiredPos < 0) {
+        return -1;
+    }
+    wf = (struct webhdfsFileHandle *) file->file;
+    if (!wf) {
+        return -1;
+    }
+
+    info = hdfsGetPathInfo(fs, wf->absPath);
+    if (!info) {
+        return -1;
+    }
+
+    if (desiredPos > info->mSize) {
+        errno = ENOTSUP;
+        fprintf(stderr,
+                "hdfsSeek for %s failed since the desired position %lld is beyond the size of the file %lld\n",
+                wf->absPath, desiredPos, info->mSize);
+        ret = -1;
+    } else {
+        file->offset = desiredPos;
+    }
+    hdfsFreeFileInfo(info, 1);
+    return ret;
+}
+
+/**
+ * Positional read: sets the handle's offset to position and then delegates
+ * to hdfsRead(), so the handle's offset is modified by this call.
+ *
+ * @return the result of hdfsRead(), or -1 on invalid arguments
+ */
+tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length)
+{
+    if (fs == NULL || file == NULL || file->type != INPUT
+        || position < 0 || buffer == NULL || length < 0) {
+        return -1;
+    }
+
+    file->offset = position;
+    return hdfsRead(fs, file, buffer, length);
+}
+
+/** Return the handle's current offset, or -1 when file is NULL. */
+tOffset hdfsTell(hdfsFS fs, hdfsFile file)
+{
+    return file ? file->offset : -1;
+}
+
+/**
+ * Copy the current working directory into a caller-supplied buffer.
+ *
+ * The result is always NUL-terminated; a working directory longer than
+ * bufferSize - 1 characters is truncated. When no working directory has
+ * been set, the buffer receives an empty string.
+ *
+ * @param fs         file system handle (used as a struct hdfsBuilder)
+ * @param buffer     destination buffer
+ * @param bufferSize size of buffer in bytes; must be non-zero
+ * @return buffer, or NULL on invalid arguments
+ */
+char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize)
+{
+    if (fs == NULL || buffer == NULL || bufferSize == 0) {
+        return NULL;
+    }
+
+    struct hdfsBuilder * bld = (struct hdfsBuilder *) fs;
+    /* never hand back uninitialized memory */
+    buffer[0] = '\0';
+    if (bld->workingDir) {
+        /* strncpy does not terminate on truncation, so do it explicitly */
+        strncpy(buffer, bld->workingDir, bufferSize - 1);
+        buffer[bufferSize - 1] = '\0';
+    }
+    return buffer;
+}
+
+/**
+ * Replace the working directory stored in the file system handle with a
+ * private copy of path.
+ *
+ * The copy is made before the old value is released, so a failed
+ * allocation leaves the previous working directory in place and path may
+ * safely point at the current working directory string.
+ *
+ * @param fs   file system handle (used as a struct hdfsBuilder)
+ * @param path new working directory
+ * @return 0 on success, -1 on invalid arguments or allocation failure
+ */
+int hdfsSetWorkingDirectory(hdfsFS fs, const char* path)
+{
+    if (fs == NULL || path == NULL) {
+        return -1;
+    }
+
+    struct hdfsBuilder * bld = (struct hdfsBuilder *) fs;
+    size_t len = strlen(path) + 1;
+    char *copy = (char *) malloc(len);
+    if (!copy) {
+        return -1;
+    }
+    memcpy(copy, path, len);
+
+    free(bld->workingDir);
+    bld->workingDir = copy;
+    return 0;
+}
+
+/**
+ * Free a NULL-terminated array of NULL-terminated host-name arrays.
+ *
+ * @param blockHosts array to free; NULL is accepted and ignored (this
+ *                   library's hdfsGetHosts() currently always returns NULL)
+ */
+void hdfsFreeHosts(char ***blockHosts)
+{
+    int i, j;
+
+    if (blockHosts == NULL) {
+        return;
+    }
+    for (i=0; blockHosts[i]; i++) {
+        for (j=0; blockHosts[i][j]; j++) {
+            free(blockHosts[i][j]);
+        }
+        free(blockHosts[i]);
+    }
+    free(blockHosts);
+}
+
+/**
+ * Not meaningful for libwebhdfs: only logs a notice and reports -1.
+ * (The JNI-based library tested HDFS_FILE_SUPPORTS_DIRECT_READ here.)
+ */
+int hdfsFileUsesDirectRead(hdfsFile file)
+{
+    (void) file;
+    fputs("hdfsFileUsesDirectRead is no longer useful for libwebhdfs.\n", stderr);
+    return -1;
+}
+
+/**
+ * Not meaningful for libwebhdfs: only logs a notice.
+ * (The JNI-based library cleared HDFS_FILE_SUPPORTS_DIRECT_READ here.)
+ */
+void hdfsFileDisableDirectRead(hdfsFile file)
+{
+    (void) file;
+    fputs("hdfsFileDisableDirectRead is no longer useful for libwebhdfs.\n", stderr);
+}
+
+/** No-op for libwebhdfs; always reports success (0). */
+int hdfsHFlush(hdfsFS fs, hdfsFile file)
+{
+    (void) fs;
+    (void) file;
+    return 0;
+}
+
+/** No-op for libwebhdfs; always reports success (0). */
+int hdfsFlush(hdfsFS fs, hdfsFile file)
+{
+    (void) fs;
+    (void) file;
+    return 0;
+}
+
+/** Not implemented in libwebhdfs: logs a notice and returns NULL. */
+char*** hdfsGetHosts(hdfsFS fs, const char* path,
+                     tOffset start, tOffset length)
+{
+    (void) fs;
+    (void) path;
+    (void) start;
+    (void) length;
+    fputs("hdfsGetHosts is not but will be supported by libwebhdfs yet.\n", stderr);
+    return NULL;
+}
+
+/** Not implemented in libwebhdfs: logs a notice and returns -1. */
+tOffset hdfsGetCapacity(hdfsFS fs)
+{
+    (void) fs;
+    fputs("hdfsGetCapacity is not but will be supported by libwebhdfs.\n", stderr);
+    return -1;
+}
+
+/** Not implemented in libwebhdfs: logs a notice and returns -1. */
+tOffset hdfsGetUsed(hdfsFS fs)
+{
+    (void) fs;
+    fputs("hdfsGetUsed is not but will be supported by libwebhdfs yet.\n", stderr);
+    return -1;
+}
+

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.c?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.c (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.c Mon Sep 10 13:43:28 2012
@@ -0,0 +1,609 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//#include "config.h"
+#include "exception.h"
+#include "jni_helper.h"
+
+#include <stdio.h> 
+#include <string.h> 
+
+/** Serializes creation of and access to the class-name hash table. */
+static pthread_mutex_t hdfsHashMutex = PTHREAD_MUTEX_INITIALIZER;
+/** Must be held when calling getGlobalJNIEnv(); also guards gTlsKeyInitialized. */
+static pthread_mutex_t jvmMutex = PTHREAD_MUTEX_INITIALIZER;
+/** Set to 1 by hashTableInit() once hcreate() has succeeded. */
+static volatile int hashTableInited = 0;
+
+/* Convenience wrappers around hdfsHashMutex. */
+#define LOCK_HASH_TABLE() pthread_mutex_lock(&hdfsHashMutex)
+#define UNLOCK_HASH_TABLE() pthread_mutex_unlock(&hdfsHashMutex)
+
+
+/**
+ * The Native return types that methods could return: each value is the
+ * character invokeMethod() looks for right after the ')' of a method
+ * signature.
+ */
+#define VOID          'V'
+#define JOBJECT       'L'
+#define JARRAYOBJECT  '['
+#define JBOOLEAN      'Z'
+#define JBYTE         'B'
+#define JCHAR         'C'
+#define JSHORT        'S'
+#define JINT          'I'
+#define JLONG         'J'
+#define JFLOAT        'F'
+#define JDOUBLE       'D'
+
+
+/**
+ * MAX_HASH_TABLE_ELEM: The maximum no. of entries in the hashtable.
+ * It's set to 4096 to account for (classNames + No. of threads)
+ */
+#define MAX_HASH_TABLE_ELEM 4096
+
+/** Key that allows us to retrieve thread-local storage */
+static pthread_key_t gTlsKey;
+
+/** nonzero if we succeeded in initializing gTlsKey. Protected by the jvmMutex */
+static int gTlsKeyInitialized = 0;
+
+/** Pthreads thread-local storage for each library thread. */
+struct hdfsTls {
+    JNIEnv *env;    /* JNI environment of the owning thread */
+};
+
+/**
+ * Destructor for a thread's libhdfs thread-local data.
+ *
+ * Looks up the JavaVM behind the stored JNIEnv and detaches the current
+ * thread from it; if the lookup fails the error is logged and the pending
+ * exception described instead. The thread-local record is freed either way.
+ *
+ * @param v         The thread-local data (a struct hdfsTls)
+ */
+static void hdfsThreadDestructor(void *v)
+{
+    struct hdfsTls *tls = v;
+    JNIEnv *env = tls->env;
+    JavaVM *vm;
+    jint ret = (*env)->GetJavaVM(env, &vm);
+
+    if (ret == 0) {
+        (*vm)->DetachCurrentThread(vm);
+    } else {
+        fprintf(stderr, "hdfsThreadDestructor: GetJavaVM failed with "
+                "error %d\n", ret);
+        (*env)->ExceptionDescribe(env);
+    }
+    free(tls);
+}
+
+/** Delete a JNI local reference; a NULL reference is ignored. */
+void destroyLocalReference(JNIEnv *env, jobject jObject)
+{
+    if (jObject == NULL) {
+        return;
+    }
+    (*env)->DeleteLocalRef(env, jObject);
+}
+
+/**
+ * Check that methType is one of the two supported kinds.
+ *
+ * @return NULL for STATIC or INSTANCE, otherwise a new runtime error
+ */
+static jthrowable validateMethodType(JNIEnv *env, MethType methType)
+{
+    if (methType == STATIC || methType == INSTANCE) {
+        return NULL;
+    }
+    return newRuntimeError(env, "validateMethodType(methType=%d): "
+        "illegal method type.\n", methType);
+}
+
+/**
+ * Create a Java string from a C string.
+ *
+ * @param env JNI environment
+ * @param str C string, or NULL (which yields a NULL jstring)
+ * @param out receives the new local reference on success
+ * @return NULL on success, otherwise the exception raised by NewStringUTF
+ */
+jthrowable newJavaStr(JNIEnv *env, const char *str, jstring *out)
+{
+    jstring created = NULL;
+
+    /* NULL must not be passed to NewStringUTF: the result would be
+     * implementation-defined, so it maps straight to a NULL jstring. */
+    if (str) {
+        created = (*env)->NewStringUTF(env, str);
+        if (!created) {
+            /* A NULL result means an exception is pending (probably an
+             * OOM); collect and clear it for the caller. */
+            return getPendingExceptionAndClear(env);
+        }
+    }
+    *out = created;
+    return NULL;
+}
+
+/**
+ * Create a heap-allocated C string from a Java string.
+ *
+ * @param env  JNI environment
+ * @param jstr Java string, or NULL (which yields *out == NULL)
+ * @param out  receives the strdup'ed copy on success; the caller frees it.
+ *             Left untouched on failure.
+ * @return NULL on success, otherwise the pending JNI exception or a new
+ *         runtime error when the copy cannot be allocated
+ */
+jthrowable newCStr(JNIEnv *env, jstring jstr, char **out)
+{
+    const char *tmp;
+    char *copy;
+
+    if (!jstr) {
+        *out = NULL;
+        return NULL;
+    }
+    tmp = (*env)->GetStringUTFChars(env, jstr, NULL);
+    if (!tmp) {
+        return getPendingExceptionAndClear(env);
+    }
+    copy = strdup(tmp);
+    (*env)->ReleaseStringUTFChars(env, jstr, tmp);
+    if (!copy) {
+        /* without this, an allocation failure looked like a NULL jstring */
+        return newRuntimeError(env, "newCStr: out of memory");
+    }
+    *out = copy;
+    return NULL;
+}
+
+/**
+ * Create the process-wide hash table on first use.
+ *
+ * The flag is tested once without the lock and again with hdfsHashMutex
+ * held, so only one caller runs hcreate(). The mutex is released on every
+ * path, including hcreate() failure.
+ *
+ * @return 1 when the table exists, 0 when hcreate() failed
+ */
+static int hashTableInit(void)
+{
+    int ok = 1;
+
+    if (!hashTableInited) {
+        LOCK_HASH_TABLE();
+        if (!hashTableInited) {
+            if (hcreate(MAX_HASH_TABLE_ELEM) == 0) {
+                fprintf(stderr, "error creating hashtable, <%d>: %s\n",
+                        errno, strerror(errno));
+                ok = 0;
+            } else {
+                hashTableInited = 1;
+            }
+        }
+        UNLOCK_HASH_TABLE();
+    }
+    return ok;
+}
+
+
+/**
+ * Add a key/data pair to the hash table.
+ *
+ * @param key  entry key; stored by pointer, not copied
+ * @param data entry value
+ * @return -1 when the table cannot be initialized, 0 otherwise (including
+ *         NULL arguments and a failed insertion, which is only logged)
+ */
+static int insertEntryIntoTable(const char *key, void *data)
+{
+    ENTRY item;
+    ENTRY *stored;
+
+    if (!key || !data) {
+        return 0;
+    }
+    if (hashTableInit() == 0) {
+        return -1;
+    }
+
+    item.key = (char*)key;
+    item.data = data;
+
+    LOCK_HASH_TABLE();
+    stored = hsearch(item, ENTER);
+    UNLOCK_HASH_TABLE();
+
+    if (!stored) {
+        fprintf(stderr, "warn adding key (%s) to hash table, <%d>: %s\n",
+                key, errno, strerror(errno));
+    }
+    return 0;
+}
+
+
+
+/**
+ * Look up a key in the hash table.
+ *
+ * @param key entry key
+ * @return the stored data pointer, or NULL when the key is NULL, the table
+ *         could not be created, or no entry matches
+ */
+static void* searchEntryFromTable(const char *key)
+{
+    ENTRY e,*ep;
+    if (key == NULL) {
+        return NULL;
+    }
+    /* never call hsearch() on a table that hcreate() failed to build */
+    if (!hashTableInit()) {
+        return NULL;
+    }
+    e.key = (char*)key;
+    LOCK_HASH_TABLE();
+    ep = hsearch(e, FIND);
+    UNLOCK_HASH_TABLE();
+    if (ep != NULL) {
+        return ep->data;
+    }
+    return NULL;
+}
+
+
+
+/**
+ * Invoke a static or instance Java method and store its result.
+ *
+ * The return type is taken from the character that follows ')' in
+ * methSignature. Object/array, void, boolean, byte, char, short, int,
+ * long, float and double results are supported; for any other return
+ * character no call is made and retval is left untouched.
+ *
+ * @param env           JNI environment
+ * @param retval        receives the result; not accessed for void methods
+ * @param methType      STATIC or INSTANCE
+ * @param instObj       receiver for INSTANCE calls (unused for STATIC)
+ * @param className     class that declares the method
+ * @param methName      method name
+ * @param methSignature JNI method signature, e.g. "()V"
+ * @param ...           arguments for the Java method
+ * @return NULL on success, otherwise the exception that occurred
+ */
+jthrowable invokeMethod(JNIEnv *env, jvalue *retval, MethType methType,
+                 jobject instObj, const char *className,
+                 const char *methName, const char *methSignature, ...)
+{
+    va_list args;
+    jclass cls;
+    jmethodID mid;
+    jthrowable jthr;
+    const char *str;
+    char returnType;
+    int isStatic;
+
+    jthr = validateMethodType(env, methType);
+    if (jthr)
+        return jthr;
+    /* locate the return type without scanning past the end of the string */
+    str = methSignature ? strchr(methSignature, ')') : NULL;
+    if (!str) {
+        return newRuntimeError(env, "invokeMethod: malformed method "
+            "signature '%s'", methSignature ? methSignature : "(null)");
+    }
+    returnType = str[1];
+
+    jthr = globalClassReference(className, env, &cls);
+    if (jthr)
+        return jthr;
+    jthr = methodIdFromClass(className, methName, methSignature,
+                            methType, env, &mid);
+    if (jthr)
+        return jthr;
+
+    /* methType was validated above, so "not static" means INSTANCE */
+    isStatic = (methType == STATIC);
+    va_start(args, methSignature);
+    switch (returnType) {
+    case JOBJECT:
+    case JARRAYOBJECT:
+        retval->l = isStatic
+            ? (*env)->CallStaticObjectMethodV(env, cls, mid, args)
+            : (*env)->CallObjectMethodV(env, instObj, mid, args);
+        break;
+    case VOID:
+        if (isStatic) {
+            (*env)->CallStaticVoidMethodV(env, cls, mid, args);
+        } else {
+            (*env)->CallVoidMethodV(env, instObj, mid, args);
+        }
+        break;
+    case JBOOLEAN:
+        retval->z = isStatic
+            ? (*env)->CallStaticBooleanMethodV(env, cls, mid, args)
+            : (*env)->CallBooleanMethodV(env, instObj, mid, args);
+        break;
+    case JBYTE:
+        retval->b = isStatic
+            ? (*env)->CallStaticByteMethodV(env, cls, mid, args)
+            : (*env)->CallByteMethodV(env, instObj, mid, args);
+        break;
+    case JCHAR:
+        retval->c = isStatic
+            ? (*env)->CallStaticCharMethodV(env, cls, mid, args)
+            : (*env)->CallCharMethodV(env, instObj, mid, args);
+        break;
+    case JSHORT:
+        retval->s = isStatic
+            ? (*env)->CallStaticShortMethodV(env, cls, mid, args)
+            : (*env)->CallShortMethodV(env, instObj, mid, args);
+        break;
+    case JINT:
+        retval->i = isStatic
+            ? (*env)->CallStaticIntMethodV(env, cls, mid, args)
+            : (*env)->CallIntMethodV(env, instObj, mid, args);
+        break;
+    case JLONG:
+        retval->j = isStatic
+            ? (*env)->CallStaticLongMethodV(env, cls, mid, args)
+            : (*env)->CallLongMethodV(env, instObj, mid, args);
+        break;
+    case JFLOAT:
+        retval->f = isStatic
+            ? (*env)->CallStaticFloatMethodV(env, cls, mid, args)
+            : (*env)->CallFloatMethodV(env, instObj, mid, args);
+        break;
+    case JDOUBLE:
+        retval->d = isStatic
+            ? (*env)->CallStaticDoubleMethodV(env, cls, mid, args)
+            : (*env)->CallDoubleMethodV(env, instObj, mid, args);
+        break;
+    default:
+        /* unknown return type: nothing is invoked, as before */
+        break;
+    }
+    va_end(args);
+
+    jthr = (*env)->ExceptionOccurred(env);
+    if (jthr) {
+        (*env)->ExceptionClear(env);
+        return jthr;
+    }
+    return NULL;
+}
+
+/**
+ * Instantiate a Java object through the constructor with the given
+ * signature.
+ *
+ * @param env           JNI environment
+ * @param out           receives the new object on success
+ * @param className     class to instantiate
+ * @param ctorSignature JNI signature of the constructor
+ * @param ...           constructor arguments
+ * @return NULL on success, otherwise the exception that occurred
+ */
+jthrowable constructNewObjectOfClass(JNIEnv *env, jobject *out, const char *className, 
+                                  const char *ctorSignature, ...)
+{
+    va_list ctorArgs;
+    jclass cls;
+    jmethodID ctor;
+    jobject created;
+    jthrowable jthr = globalClassReference(className, env, &cls);
+
+    if (jthr) {
+        return jthr;
+    }
+    /* constructors are looked up as the instance method "<init>" */
+    jthr = methodIdFromClass(className, "<init>", ctorSignature,
+                            INSTANCE, env, &ctor);
+    if (jthr) {
+        return jthr;
+    }
+
+    va_start(ctorArgs, ctorSignature);
+    created = (*env)->NewObjectV(env, cls, ctor, ctorArgs);
+    va_end(ctorArgs);
+
+    if (created == NULL) {
+        return getPendingExceptionAndClear(env);
+    }
+    *out = created;
+    return NULL;
+}
+
+
+/**
+ * Resolve the method id of a static or instance method.
+ *
+ * @param className     class that declares the method
+ * @param methName      method name
+ * @param methSignature JNI method signature
+ * @param methType      STATIC or INSTANCE
+ * @param env           JNI environment
+ * @param out           receives the method id on success
+ * @return NULL on success, otherwise the exception that occurred
+ */
+jthrowable methodIdFromClass(const char *className, const char *methName, 
+                            const char *methSignature, MethType methType, 
+                            JNIEnv *env, jmethodID *out)
+{
+    jclass cls;
+    jmethodID found = 0;
+    jthrowable jthr = globalClassReference(className, env, &cls);
+
+    if (jthr) {
+        return jthr;
+    }
+    jthr = validateMethodType(env, methType);
+    if (jthr) {
+        return jthr;
+    }
+
+    /* after validation methType is either STATIC or INSTANCE */
+    found = (methType == STATIC)
+        ? (*env)->GetStaticMethodID(env, cls, methName, methSignature)
+        : (*env)->GetMethodID(env, cls, methName, methSignature);
+    if (found == NULL) {
+        fprintf(stderr, "could not find method %s from class %s with "
+            "signature %s\n", methName, className, methSignature);
+        return getPendingExceptionAndClear(env);
+    }
+    *out = found;
+    return NULL;
+}
+
+/**
+ * Return a global reference to the named class, caching it in the hash
+ * table so later lookups avoid FindClass.
+ *
+ * @param className JNI class name; also used as the cache key
+ * @param env       JNI environment
+ * @param out       receives the global class reference on success
+ * @return NULL on success, otherwise the exception that occurred
+ */
+jthrowable globalClassReference(const char *className, JNIEnv *env, jclass *out)
+{
+    jclass localRef;
+    jclass globalRef = searchEntryFromTable(className);
+
+    if (globalRef) {
+        /* cache hit */
+        *out = globalRef;
+        return NULL;
+    }
+
+    localRef = (*env)->FindClass(env,className);
+    if (localRef == NULL) {
+        return getPendingExceptionAndClear(env);
+    }
+
+    /* promote to a global reference; the local one is dropped either way */
+    globalRef = (*env)->NewGlobalRef(env, localRef);
+    (*env)->DeleteLocalRef(env, localRef);
+    if (globalRef == NULL) {
+        return getPendingExceptionAndClear(env);
+    }
+
+    insertEntryIntoTable(className, globalRef);
+    *out = globalRef;
+    return NULL;
+}
+
+/**
+ * Obtain the class name of a Java object by calling Class.getName() on
+ * its class.
+ *
+ * @param jobj object to inspect
+ * @param env  JNI environment
+ * @param name receives a strdup'ed copy of the name on success; the
+ *             caller frees it
+ * @return NULL on success, otherwise the exception that occurred
+ */
+jthrowable classNameOfObject(jobject jobj, JNIEnv *env, char **name)
+{
+    jthrowable jthr = NULL;
+    jclass objClass = NULL;
+    jclass classClass = NULL;
+    jmethodID getName;
+    jstring jname = NULL;
+    const char *utf = NULL;
+    char *copy;
+
+    objClass = (*env)->GetObjectClass(env, jobj);
+    if (!objClass) {
+        jthr = getPendingExceptionAndClear(env);
+        goto done;
+    }
+    classClass = (*env)->FindClass(env, "java/lang/Class");
+    if (!classClass) {
+        jthr = getPendingExceptionAndClear(env);
+        goto done;
+    }
+    getName = (*env)->GetMethodID(env, classClass, "getName", "()Ljava/lang/String;");
+    if (!getName) {
+        jthr = getPendingExceptionAndClear(env);
+        goto done;
+    }
+    jname = (*env)->CallObjectMethod(env, objClass, getName);
+    if (!jname) {
+        jthr = getPendingExceptionAndClear(env);
+        goto done;
+    }
+    utf = (*env)->GetStringUTFChars(env, jname, NULL);
+    if (!utf) {
+        jthr = getPendingExceptionAndClear(env);
+        goto done;
+    }
+    copy = strdup(utf);
+    if (!copy) {
+        jthr = newRuntimeError(env, "classNameOfObject: out of memory");
+        goto done;
+    }
+    *name = copy;
+    jthr = NULL;
+
+done:
+    /* release every local reference taken above, on all paths */
+    destroyLocalReference(env, objClass);
+    destroyLocalReference(env, classClass);
+    if (jname) {
+        if (utf) {
+            (*env)->ReleaseStringUTFChars(env, jname, utf);
+        }
+        (*env)->DeleteLocalRef(env, jname);
+    }
+    return jthr;
+}
+
+
+/**
+ * Get the global JNI environment.
+ *
+ * We only have to create the JVM once.  After that, we can use it in
+ * every thread.  You must be holding the jvmMutex when you call this
+ * function.
+ *
+ * If no JVM has been created yet, one is created.  Its class path is taken
+ * from the CLASSPATH environment variable and any space-separated tokens in
+ * the LIBHDFS_OPTS environment variable are passed as additional JVM options.
+ * If a JVM already exists, the calling thread is attached to it instead.
+ *
+ * @return          The JNIEnv on success; NULL on failure (a diagnostic is
+ *                  written to stderr)
+ */
+static JNIEnv* getGlobalJNIEnv(void)
+{
+    const jsize vmBufLength = 1;
+    JavaVM* vmBuf[vmBufLength];
+    JNIEnv *env;
+    jint rv = 0;
+    jint noVMs = 0;
+    jthrowable jthr;
+
+    rv = JNI_GetCreatedJavaVMs(&(vmBuf[0]), vmBufLength, &noVMs);
+    if (rv != 0) {
+        fprintf(stderr, "JNI_GetCreatedJavaVMs failed with error: %d\n", rv);
+        return NULL;
+    }
+
+    if (noVMs == 0) {
+        //Get the environment variables for initializing the JVM
+        char *hadoopClassPath = getenv("CLASSPATH");
+        if (hadoopClassPath == NULL) {
+            fprintf(stderr, "Environment variable CLASSPATH not set!\n");
+            return NULL;
+        }
+        char *hadoopClassPathVMArg = "-Djava.class.path=";
+        size_t optHadoopClassPathLen = strlen(hadoopClassPath) +
+          strlen(hadoopClassPathVMArg) + 1;
+        char *optHadoopClassPath = malloc(sizeof(char)*optHadoopClassPathLen);
+        if (optHadoopClassPath == NULL) {
+            fprintf(stderr, "getGlobalJNIEnv: out of memory allocating "
+                    "the class path option\n");
+            return NULL;
+        }
+        snprintf(optHadoopClassPath, optHadoopClassPathLen,
+                "%s%s", hadoopClassPathVMArg, hadoopClassPath);
+
+        // Read LIBHDFS_OPTS exactly once and work on private copies, so the
+        // counting pass and the populating pass always see the same text.
+        int noArgs = 1;
+        const char *hadoopJvmArgsEnv = getenv("LIBHDFS_OPTS");
+        char *hadoopJvmArgs = NULL;
+        char jvmArgDelims[] = " ";
+        char *str, *token, *savePtr;
+        if (hadoopJvmArgsEnv != NULL)  {
+          // strtok_r modifies its input, so counting needs its own copy
+          char *countCopy = strdup(hadoopJvmArgsEnv);
+          hadoopJvmArgs = strdup(hadoopJvmArgsEnv);
+          if (countCopy == NULL || hadoopJvmArgs == NULL) {
+            fprintf(stderr, "getGlobalJNIEnv: out of memory copying "
+                    "LIBHDFS_OPTS\n");
+            free(countCopy);
+            free(hadoopJvmArgs);
+            free(optHadoopClassPath);
+            return NULL;
+          }
+          // Determine the # of LIBHDFS_OPTS args
+          for (str = countCopy; ; noArgs++, str = NULL) {
+            token = strtok_r(str, jvmArgDelims, &savePtr);
+            if (NULL == token) {
+              break;
+            }
+          }
+          free(countCopy);
+        }
+
+        // Now that we know the # args, populate the options array.
+        // Slot 0 is always the class path; the tokens follow.
+        JavaVMOption options[noArgs];
+        int numOptions = 1;
+        options[0].optionString = optHadoopClassPath;
+        if (hadoopJvmArgs != NULL)  {
+          // numOptions < noArgs keeps every write inside options[]
+          for (str = hadoopJvmArgs; numOptions < noArgs;
+               numOptions++, str = NULL) {
+            token = strtok_r(str, jvmArgDelims, &savePtr);
+            if (NULL == token) {
+              break;
+            }
+            // token points into hadoopJvmArgs, which must therefore stay
+            // allocated until JNI_CreateJavaVM has returned
+            options[numOptions].optionString = token;
+          }
+        }
+
+        //Create the VM
+        JavaVMInitArgs vm_args;
+        JavaVM *vm;
+        vm_args.version = JNI_VERSION_1_2;
+        vm_args.options = options;
+        vm_args.nOptions = numOptions;
+        vm_args.ignoreUnrecognized = 1;
+
+        rv = JNI_CreateJavaVM(&vm, (void*)&env, &vm_args);
+
+        free(hadoopJvmArgs);
+        free(optHadoopClassPath);
+
+        if (rv != 0) {
+            fprintf(stderr, "Call to JNI_CreateJavaVM failed "
+                    "with error: %d\n", rv);
+            return NULL;
+        }
+        jthr = invokeMethod(env, NULL, STATIC, NULL,
+                         "org/apache/hadoop/fs/FileSystem",
+                         "loadFileSystems", "()V");
+        if (jthr) {
+            // Reported but not fatal: the freshly created env is still returned
+            printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "loadFileSystems");
+        }
+    }
+    else {
+        //Attach this thread to the VM
+        JavaVM* vm = vmBuf[0];
+        rv = (*vm)->AttachCurrentThread(vm, (void*)&env, 0);
+        if (rv != 0) {
+            fprintf(stderr, "Call to AttachCurrentThread "
+                    "failed with error: %d\n", rv);
+            return NULL;
+        }
+    }
+
+    return env;
+}
+
+/**
+ * getJNIEnv: A helper function to get the JNIEnv* for the given thread.
+ * If no JVM exists, then one will be created. JVM command line arguments
+ * are obtained from the LIBHDFS_OPTS environment variable.
+ *
+ * Implementation note: we rely on POSIX thread-local storage (tls).
+ * This allows us to associate a destructor function with each thread, that
+ * will detach the thread from the Java VM when the thread terminates.  If we
+ * fail to do this, it will cause a memory leak.
+ *
+ * However, POSIX TLS is not the most efficient way to do things.  It requires a
+ * key to be initialized before it can be used.  Since we don't know if this key
+ * is initialized at the start of this function, we have to lock a mutex first
+ * and check.  Luckily, most operating systems support the more efficient
+ * __thread construct, which is initialized by the linker.
+ *
+ * @param: None.
+ * @return The JNIEnv* corresponding to the thread, or NULL on failure (a
+ *         diagnostic is written to stderr).
+ */
+JNIEnv* getJNIEnv(void)
+{
+    JNIEnv *env;
+    struct hdfsTls *tls;
+    int ret;
+
+#ifdef HAVE_BETTER_TLS
+    /* Fast path: this thread already cached its tls, no locking needed */
+    static __thread struct hdfsTls *quickTls = NULL;
+    if (quickTls)
+        return quickTls->env;
+#endif
+    pthread_mutex_lock(&jvmMutex);
+    /* Create the TLS key on first use; jvmMutex is held while doing so */
+    if (!gTlsKeyInitialized) {
+        ret = pthread_key_create(&gTlsKey, hdfsThreadDestructor);
+        if (ret) {
+            pthread_mutex_unlock(&jvmMutex);
+            fprintf(stderr, "getJNIEnv: pthread_key_create failed with "
+                "error %d\n", ret);
+            return NULL;
+        }
+        gTlsKeyInitialized = 1;
+    }
+    tls = pthread_getspecific(gTlsKey);
+    if (tls) {
+        pthread_mutex_unlock(&jvmMutex);
+        return tls->env;
+    }
+
+    /* getGlobalJNIEnv requires jvmMutex to be held by the caller */
+    env = getGlobalJNIEnv();
+    pthread_mutex_unlock(&jvmMutex);
+    if (!env) {
+        fprintf(stderr, "getJNIEnv: getGlobalJNIEnv failed\n");
+        return NULL;
+    }
+    tls = calloc(1, sizeof(struct hdfsTls));
+    if (!tls) {
+        /* sizeof yields a size_t, which must be printed with %zu */
+        fprintf(stderr, "getJNIEnv: OOM allocating %zu bytes\n",
+                sizeof(struct hdfsTls));
+        return NULL;
+    }
+    tls->env = env;
+    ret = pthread_setspecific(gTlsKey, tls);
+    if (ret) {
+        fprintf(stderr, "getJNIEnv: pthread_setspecific failed with "
+            "error code %d\n", ret);
+        /* The key never took ownership of tls, so run the destructor by hand */
+        hdfsThreadDestructor(tls);
+        return NULL;
+    }
+#ifdef HAVE_BETTER_TLS
+    quickTls = tls;
+#endif
+    return env;
+}
+

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.h?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.h (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.h Mon Sep 10 13:43:28 2012
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBHDFS_JNI_HELPER_H
+#define LIBHDFS_JNI_HELPER_H
+
+#include <jni.h>
+#include <stdio.h>
+
+#include <stdlib.h>
+#include <stdarg.h>
+#include <search.h>
+#include <pthread.h>
+#include <errno.h>
+
+#define PATH_SEPARATOR ':'
+
+
+/** Denote the method we want to invoke as STATIC or INSTANCE */
+typedef enum {
+    STATIC,     /* invoked without an instance object */
+    INSTANCE    /* invoked on an instance object, which the caller must supply */
+} MethType;
+
+/**
+ * Create a new malloc'ed C string from a Java string.
+ *
+ * @param env       The JNI environment
+ * @param jstr      The Java string
+ * @param out       (out param) the malloc'ed C string
+ *
+ * @return          NULL on success; the exception otherwise
+ */
+jthrowable newCStr(JNIEnv *env, jstring jstr, char **out);
+
+/**
+ * Create a new Java string from a C string.
+ *
+ * @param env       The JNI environment
+ * @param str       The C string
+ * @param out       (out param) the java string
+ *
+ * @return          NULL on success; the exception otherwise
+ */
+jthrowable newJavaStr(JNIEnv *env, const char *str, jstring *out);
+
+/**
+ * Helper function to destroy a local reference of java.lang.Object
+ * @param env: The JNIEnv pointer. 
+ * @param jObject: The local reference of java.lang.Object object
+ * @return None.
+ */
+void destroyLocalReference(JNIEnv *env, jobject jObject);
+
+/** invokeMethod: Invoke a Static or Instance method.
+ * className: Name of the class where the method can be found
+ * methName: Name of the method
+ * methSignature: the signature of the method "(arg-types)ret-type"
+ * methType: The type of the method (STATIC or INSTANCE)
+ * instObj: Required if the methType is INSTANCE. The object to invoke
+   the method on.
+ * env: The JNIEnv pointer
+ * retval: The pointer to a union type which will contain the result of the
+   method invocation, e.g. if the method returns an Object, retval will be
+   set to that, if the method returns boolean, retval will be set to the
+   value (JNI_TRUE or JNI_FALSE), etc.
+ * Arguments (the method arguments) must be passed after methSignature
+ * RETURNS: NULL on success. If the method throws any exception, a reference
+   to that exception is returned instead, and the result stored at retval is
+   undefined.
+ */
+jthrowable invokeMethod(JNIEnv *env, jvalue *retval, MethType methType,
+                 jobject instObj, const char *className, const char *methName, 
+                 const char *methSignature, ...);
+
+jthrowable constructNewObjectOfClass(JNIEnv *env, jobject *out, const char *className, 
+                                  const char *ctorSignature, ...);
+
+jthrowable methodIdFromClass(const char *className, const char *methName, 
+                            const char *methSignature, MethType methType, 
+                            JNIEnv *env, jmethodID *out);
+
+jthrowable globalClassReference(const char *className, JNIEnv *env, jclass *out);
+
+/** classNameOfObject: Get an object's class name.
+ * @param jobj: The object.
+ * @param env: The JNIEnv pointer.
+ * @param name: (out param) On success, will contain a string containing the
+ * class name. This string must be freed by the caller.
+ * @return NULL on success, or the exception
+ */
+jthrowable classNameOfObject(jobject jobj, JNIEnv *env, char **name);
+
+/** getJNIEnv: A helper function to get the JNIEnv* for the given thread.
+ * If no JVM exists, then one will be created. JVM command line arguments
+ * are obtained from the LIBHDFS_OPTS environment variable.
+ * @param: None.
+ * @return The JNIEnv* corresponding to the thread.
+ * */
+JNIEnv* getJNIEnv(void);
+
+#endif /*LIBHDFS_JNI_HELPER_H*/
+
+/**
+ * vim: ts=4: sw=4: et:
+ */
+

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_multi_write.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_multi_write.c?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_multi_write.c (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_multi_write.c Mon Sep 10 13:43:28 2012
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "expect.h"
+#include "webhdfs.h"
+
+#include <errno.h>
+#include <semaphore.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include "hdfs_http_client.h"
+#include "hdfs_http_query.h"
+#include "hdfs_json_parser.h"
+#include <unistd.h>
+#include <curl/curl.h>
+
+#define TLH_MAX_THREADS 100 /* upper bound on the number of test threads */
+
+static sem_t *tlhSem; /* named semaphore opened and closed by main() */
+
+static const char *nn;   /* namenode, from the first command-line argument */
+static const char *user; /* user name, from the third command-line argument */
+static int port;         /* port, parsed from the second command-line argument */
+
+static const char *fileName = "/tmp/tlhData"; /* NOTE(review): appears unused in this file */
+
+struct tlhThreadInfo {
+    /** Index of this thread, assigned by main() starting at 0 */
+    int threadIdx;
+    /** 0 = thread was successful; error code otherwise */
+    int success;
+    /** pthread identifier, filled in by pthread_create() */
+    pthread_t thread;
+};
+
+/**
+ * Open a new filesystem connection to one namenode as the given user.
+ *
+ * @param nn        The namenode to connect to
+ * @param port      The namenode port; a negative value is treated as an error
+ * @param user      The user to connect as
+ * @param fs        (out param) receives the connection handle on success
+ *
+ * @return          0 on success; the negative port if port < 0; -errno if
+ *                  hdfsConnectAsUserNewInstance fails
+ */
+static int hdfsSingleNameNodeConnect(const char *nn, int port, const char *user, hdfsFS *fs)
+{
+    hdfsFS connection;
+
+    if (port >= 0) {
+        connection = hdfsConnectAsUserNewInstance(nn, port, user);
+        if (connection) {
+            *fs = connection;
+            return 0;
+        }
+        return -errno;
+    }
+
+    /* A negative port is handed back to the caller as the error code */
+    fprintf(stderr, "hdfsSingleNameNodeConnect: nmdGetNameNodePort "
+            "returned error %d\n", port);
+    return port;
+}
+
+/**
+ * Per-thread test body: open /tmp/thread_test.txt for writing, hold it open
+ * for one second, then close it.
+ *
+ * @param ti        The calling thread's info; threadIdx sizes the local buffer
+ * @param fs        The filesystem connection to use
+ *
+ * @return          0 on success; an error code (errno, or EIO when errno is
+ *                  not set) if the file cannot be opened or closed
+ */
+static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
+{
+    hdfsFile file;
+    int ret = 0;
+    char buffer[1024 * (ti->threadIdx + 1)];
+    memset(buffer, 'a', sizeof(buffer));
+
+    file = hdfsOpenFile(fs, "/tmp/thread_test.txt", O_WRONLY, 0, 0, 0);
+    if (!file) {
+        /* Never hand a NULL handle to hdfsCloseFile; report the failure */
+        ret = errno ? errno : EIO;
+        fprintf(stderr, "doTestHdfsOperations(threadIdx=%d): "
+                "hdfsOpenFile failed with error %d\n", ti->threadIdx, ret);
+        return ret;
+    }
+    sleep(1);
+    if (hdfsCloseFile(fs, file)) {
+        ret = errno ? errno : EIO;
+        fprintf(stderr, "doTestHdfsOperations(threadIdx=%d): "
+                "hdfsCloseFile failed with error %d\n", ti->threadIdx, ret);
+    }
+    return ret;
+}
+
+/**
+ * Thread entry point: connect to the namenode, run doTestHdfsOperations,
+ * then disconnect.  The outcome is recorded in the thread's success field.
+ *
+ * @param v         Pointer to this thread's struct tlhThreadInfo
+ *
+ * @return          Always NULL
+ */
+static void *testHdfsOperations(void *v)
+{
+    struct tlhThreadInfo *info = v;
+    hdfsFS fs = NULL;
+    int rc;
+
+    fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n",
+            info->threadIdx);
+
+    rc = hdfsSingleNameNodeConnect(nn, port, user, &fs);
+    if (rc != 0) {
+        fprintf(stderr, "testHdfsOperations(threadIdx=%d): "
+                "hdfsSingleNameNodeConnect failed with error %d.\n",
+                info->threadIdx, rc);
+        /* Any connection failure is recorded as EIO */
+        info->success = EIO;
+    } else {
+        info->success = doTestHdfsOperations(info, fs);
+        if (hdfsDisconnect(fs) != 0) {
+            /* Capture errno before fprintf has a chance to change it */
+            rc = errno;
+            fprintf(stderr, "hdfsDisconnect error %d\n", rc);
+            info->success = rc;
+        }
+    }
+    return NULL;
+}
+
+/**
+ * Summarize the per-thread results on stderr.
+ *
+ * @param ti              Array of thread info records
+ * @param tlhNumThreads   Number of valid entries in ti
+ *
+ * @return                EXIT_SUCCESS if every thread's success field is 0;
+ *                        EXIT_FAILURE otherwise, after listing the indices
+ *                        of the failed threads
+ */
+static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads)
+{
+    const char *delim = "";
+    int idx;
+    int anyFailed = 0;
+
+    for (idx = 0; idx < tlhNumThreads; idx++) {
+        if (ti[idx].success != 0) {
+            anyFailed = 1;
+            break;
+        }
+    }
+
+    if (anyFailed) {
+        fprintf(stderr, "testLibHdfs: some threads failed: [");
+        for (idx = 0; idx < tlhNumThreads; idx++) {
+            if (ti[idx].success == 0) {
+                continue;
+            }
+            fprintf(stderr, "%s%d", delim, idx);
+            delim = ", ";
+        }
+        fprintf(stderr, "].  FAILURE.\n");
+        return EXIT_FAILURE;
+    }
+
+    fprintf(stderr, "testLibHdfs: all threads succeeded.  SUCCESS.\n");
+    return EXIT_SUCCESS;
+}
+
+/**
+ * Test that several threads can each connect to the namenode and open and
+ * close a file for writing at the same time.
+ *
+ * Usage: <program> <namenode> <port> <username>
+ * The number of threads is taken from the TLH_NUM_THREADS environment
+ * variable (default 3; must be between 1 and TLH_MAX_THREADS inclusive).
+ *
+ * @return EXIT_SUCCESS if every thread succeeded, EXIT_FAILURE if any
+ *         thread failed or setup failed, -1 on a usage error
+ */
+int main(int argc, const char *args[])
+{
+    if (argc != 4) {
+        fprintf(stderr, "usage: test_libhdfs_threaded <namenode> <port> <username>\n");
+        return -1;
+    }
+    
+    nn = args[1];
+    port = atoi(args[2]);
+    user = args[3];
+    
+    int i, tlhNumThreads;
+    const char *tlhNumThreadsStr;
+    struct tlhThreadInfo ti[TLH_MAX_THREADS];
+    
+    tlhNumThreadsStr = getenv("TLH_NUM_THREADS");
+    if (!tlhNumThreadsStr) {
+        tlhNumThreadsStr = "3";
+    }
+    tlhNumThreads = atoi(tlhNumThreadsStr);
+    if ((tlhNumThreads <= 0) || (tlhNumThreads > TLH_MAX_THREADS)) {
+        fprintf(stderr, "testLibHdfs: must have a number of threads "
+                "between 1 and %d inclusive, not %d\n",
+                TLH_MAX_THREADS, tlhNumThreads);
+        return EXIT_FAILURE;
+    }
+    memset(&ti[0], 0, sizeof(ti));
+    for (i = 0; i < tlhNumThreads; i++) {
+        ti[i].threadIdx = i;
+    }
+    
+    tlhSem = sem_open("sem", O_CREAT, 0644, tlhNumThreads);
+    if (tlhSem == SEM_FAILED) {
+        /* Without this check the later sem_close would get an invalid handle */
+        fprintf(stderr, "testLibHdfs: sem_open failed with error %d\n", errno);
+        return EXIT_FAILURE;
+    }
+    
+    for (i = 0; i < tlhNumThreads; i++) {
+        fprintf(stderr, "\ncreating thread %d\n", i);
+        EXPECT_ZERO(pthread_create(&ti[i].thread, NULL,
+                                   testHdfsOperations, &ti[i]));
+    }
+    for (i = 0; i < tlhNumThreads; i++) {
+        EXPECT_ZERO(pthread_join(ti[i].thread, NULL));
+    }
+    
+    EXPECT_ZERO(sem_close(tlhSem));
+    return checkFailures(ti, tlhNumThreads);
+}



Mime
View raw message