hadoop-hdfs-commits mailing list archives

From sur...@apache.org
Subject svn commit: r1397387 [2/5] - in /hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project: hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/ hadoop-hdfs/ hadoop-hdfs...
Date Fri, 12 Oct 2012 00:15:37 GMT
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c Fri Oct 12 00:15:22 2012
@@ -16,38 +16,63 @@
  * limitations under the License.
  */
 
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <jni.h>
-#include "webhdfs.h"
+#include "exception.h"
+#include "hdfs.h"
 #include "hdfs_http_client.h"
 #include "hdfs_http_query.h"
 #include "hdfs_json_parser.h"
 #include "jni_helper.h"
-#include "exception.h"
+
+#include <inttypes.h>
+#include <jni.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
 
 #define HADOOP_HDFS_CONF        "org/apache/hadoop/hdfs/HdfsConfiguration"
 #define HADOOP_NAMENODE         "org/apache/hadoop/hdfs/server/namenode/NameNode"
 #define JAVA_INETSOCKETADDRESS  "java/net/InetSocketAddress"
 
-static void initFileinfo(hdfsFileInfo *fileInfo) {
-    if (fileInfo) {
-        fileInfo->mKind = kObjectKindFile;
-        fileInfo->mName = NULL;
-        fileInfo->mLastMod = 0;
-        fileInfo->mSize = 0;
-        fileInfo->mReplication = 0;
-        fileInfo->mBlockSize = 0;
-        fileInfo->mOwner = NULL;
-        fileInfo->mGroup = NULL;
-        fileInfo->mPermissions = 0;
-        fileInfo->mLastAccess = 0;
-    }
-}
+struct hdfsBuilder {
+    int forceNewInstance;
+    const char *nn;
+    tPort port;
+    const char *kerbTicketCachePath;
+    const char *userName;
+};
+
+/**
+ * The information required for accessing webhdfs,
+ * including the network address of the namenode and the user name.
+ *
+ * Unlike the strings in hdfsBuilder, the strings in this structure are
+ * dynamically allocated.  This structure will not be freed until we disconnect
+ * from HDFS.
+ */
+struct hdfs_internal {
+    char *nn;
+    tPort port;
+    char *userName;
 
-static webhdfsBuffer *initWebHdfsBuffer() {
-    webhdfsBuffer *buffer = (webhdfsBuffer *) calloc(1, sizeof(webhdfsBuffer));
+    /**
+     * Working directory -- stored with a trailing slash.
+     */
+    char *workingDir;
+};
+
+/**
+ * The 'file-handle' to a file in hdfs.
+ */
+struct hdfsFile_internal {
+    struct webhdfsFileHandle* file;
+    enum hdfsStreamType type;
+    int flags;
+    tOffset offset;
+};
+
+static webhdfsBuffer *initWebHdfsBuffer(void)
+{
+    webhdfsBuffer *buffer = calloc(1, sizeof(*buffer));
     if (!buffer) {
         fprintf(stderr, "Fail to allocate memory for webhdfsBuffer.\n");
         return NULL;
@@ -107,49 +132,36 @@ static void freeWebhdfsBuffer(webhdfsBuf
 }
 
 static void freeWebFileHandle(struct webhdfsFileHandle * handle) {
-    if (handle) {
-        freeWebhdfsBuffer(handle->uploadBuffer);
-        if (handle->datanode) {
-            free(handle->datanode);
-        }
-        if (handle->absPath) {
-            free(handle->absPath);
-        }
-        free(handle);
-        handle = NULL;
-    }
+    if (!handle)
+        return;
+    freeWebhdfsBuffer(handle->uploadBuffer);
+    free(handle->datanode);
+    free(handle->absPath);
+    free(handle);
 }
 
 struct hdfsBuilder *hdfsNewBuilder(void)
 {
     struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder));
-    if (!bld) {
+    if (!bld)
         return NULL;
-    }
-    hdfsSetWorkingDirectory(bld, "/");
     return bld;
 }
 
 void hdfsFreeBuilder(struct hdfsBuilder *bld)
 {
-    if (bld && bld->workingDir) {
-        free(bld->workingDir);
-    }
     free(bld);
 }
 
 void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld)
 {
-    if (bld) {
-        bld->forceNewInstance = 1;
-    }
+    // We don't cache instances in libwebhdfs, so this is not applicable.
 }
 
 void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
 {
     if (bld) {
         bld->nn = nn;
-        bld->nn_jni = nn;
     }
 }
 
@@ -199,7 +211,7 @@ hdfsFS hdfsConnectNewInstance(const char
         return NULL;
     }
     hdfsBuilderSetForceNewInstance(bld);
-    return bld;
+    return hdfsBuilderConnect(bld);
 }
 
 hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port,
@@ -215,290 +227,356 @@ hdfsFS hdfsConnectAsUserNewInstance(cons
     return hdfsBuilderConnect(bld);
 }
 
-const char *hdfsBuilderToStr(const struct hdfsBuilder *bld,
-                             char *buf, size_t bufLen);
+static const char *maybeNull(const char *str)
+{
+    return str ? str : "(NULL)";
+}
+
+static const char *hdfsBuilderToStr(const struct hdfsBuilder *bld,
+                                    char *buf, size_t bufLen)
+{
+    snprintf(buf, bufLen, "nn=%s, port=%d, "
+             "kerbTicketCachePath=%s, userName=%s",
+             maybeNull(bld->nn), bld->port,
+             maybeNull(bld->kerbTicketCachePath), maybeNull(bld->userName));
+    return buf;
+}
+
+static void freeWebHdfsInternal(struct hdfs_internal *fs)
+{
+    if (fs) {
+        free(fs->nn);
+        free(fs->userName);
+        free(fs->workingDir);
+        free(fs);
+    }
+}
+
+static int retrieveDefaults(const struct hdfsBuilder *bld, tPort *port,
+                            char **nn)
+{
+    JNIEnv *env = 0;
+    jobject jHDFSConf = NULL, jAddress = NULL;
+    jstring jHostName = NULL;
+    jvalue jVal;
+    jthrowable jthr = NULL;
+    int ret = 0;
+    char buf[512];
+    
+    // TODO: can we do this without using JNI?  See HDFS-3917
+    env = getJNIEnv();
+    if (!env) {
+        return EINTERNAL;
+    }
+    
+    //  jHDFSConf = new HDFSConfiguration();
+    jthr = constructNewObjectOfClass(env, &jHDFSConf, HADOOP_HDFS_CONF, "()V");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                                    "hdfsBuilderConnect(%s)",
+                                    hdfsBuilderToStr(bld, buf, sizeof(buf)));
+        goto done;
+    }
+    
+    jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_NAMENODE, "getHttpAddress",
+                        "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/InetSocketAddress;",
+                        jHDFSConf);
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                        "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf)));
+        goto done;
+    }
+    jAddress = jVal.l;
+    
+    jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
+                        JAVA_INETSOCKETADDRESS, "getPort", "()I");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                                    "hdfsBuilderConnect(%s)",
+                                    hdfsBuilderToStr(bld, buf, sizeof(buf)));
+        goto done;
+    }
+    *port = jVal.i;
+    
+    jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
+                        JAVA_INETSOCKETADDRESS, "getHostName", "()Ljava/lang/String;");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                                    "hdfsBuilderConnect(%s)",
+                                    hdfsBuilderToStr(bld, buf, sizeof(buf)));
+        goto done;
+    }
+    jHostName = jVal.l;
+    jthr = newCStr(env, jHostName, nn);
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                                    "hdfsBuilderConnect(%s)",
+                                    hdfsBuilderToStr(bld, buf, sizeof(buf)));
+        goto done;
+    }
+
+done:
+    destroyLocalReference(env, jHDFSConf);
+    destroyLocalReference(env, jAddress);
+    destroyLocalReference(env, jHostName);
+    return ret;
+}
 
 hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld)
 {
+    struct hdfs_internal *fs = NULL;
+    int ret;
+
     if (!bld) {
-        return NULL;
+        ret = EINVAL;
+        goto done;
     }
-    // if the hostname is null for the namenode, set it to localhost
-    //only handle bld->nn
     if (bld->nn == NULL) {
-        bld->nn = "localhost";
-    } else {
-        /* check whether the hostname of the namenode (nn in hdfsBuilder) has already contained the port */
-        const char *lastColon = rindex(bld->nn, ':');
-        if (lastColon && (strspn(lastColon + 1, "0123456789") == strlen(lastColon + 1))) {
-            fprintf(stderr, "port %d was given, but URI '%s' already "
-                    "contains a port!\n", bld->port, bld->nn);
-            char *newAddr = (char *)malloc(strlen(bld->nn) - strlen(lastColon) + 1);
-            if (!newAddr) {
-                return NULL;
-            }
-            strncpy(newAddr, bld->nn, strlen(bld->nn) - strlen(lastColon));
-            newAddr[strlen(bld->nn) - strlen(lastColon)] = '\0';
-            free(bld->nn);
-            bld->nn = newAddr;
-        }
+        // In the JNI version of libhdfs this returns a LocalFileSystem.
+        ret = ENOTSUP;
+        goto done;
     }
     
-    /* if the namenode is "default" and/or the port of namenode is 0, get the default namenode/port by using JNI */
+    fs = calloc(1, sizeof(*fs));
+    if (!fs) {
+        ret = ENOMEM;
+        goto done;
+    }
+    /* If the namenode is "default" and/or the port of namenode is 0, get the
+     * default namenode/port */
     if (bld->port == 0 || !strcasecmp("default", bld->nn)) {
-        JNIEnv *env = 0;
-        jobject jHDFSConf = NULL, jAddress = NULL;
-        jvalue jVal;
-        jthrowable jthr = NULL;
-        int ret = 0;
-        char buf[512];
-        
-        //Get the JNIEnv* corresponding to current thread
-        env = getJNIEnv();
-        if (env == NULL) {
-            errno = EINTERNAL;
-            free(bld);
-            bld = NULL;
-            return NULL;
-        }
-        
-        //  jHDFSConf = new HDFSConfiguration();
-        jthr = constructNewObjectOfClass(env, &jHDFSConf, HADOOP_HDFS_CONF, "()V");
-        if (jthr) {
-            ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
-                                        "hdfsBuilderConnect(%s)",
-                                        hdfsBuilderToStr(bld, buf, sizeof(buf)));
+        ret = retrieveDefaults(bld, &fs->port, &fs->nn);
+        if (ret)
+            goto done;
+    } else {
+        fs->port = bld->port;
+        fs->nn = strdup(bld->nn);
+        if (!fs->nn) {
+            ret = ENOMEM;
             goto done;
         }
-        
-        jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_NAMENODE, "getHttpAddress",
-                            "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/InetSocketAddress;",
-                            jHDFSConf);
-        if (jthr) {
-            ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
-                            "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf)));
-            goto done; //free(bld), deleteReference for jHDFSConf
-        }
-        jAddress = jVal.l;
-        
-        if (bld->port == 0) {
-            jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
-                                JAVA_INETSOCKETADDRESS, "getPort", "()I");
-            if (jthr) {
-                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
-                                            "hdfsBuilderConnect(%s)",
-                                            hdfsBuilderToStr(bld, buf, sizeof(buf)));
-                goto done;
-            }
-            bld->port = jVal.i;
-        }
-        
-        if (!strcasecmp("default", bld->nn)) {
-            jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
-                                JAVA_INETSOCKETADDRESS, "getHostName", "()Ljava/lang/String;");
-            if (jthr) {
-                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
-                                            "hdfsBuilderConnect(%s)",
-                                            hdfsBuilderToStr(bld, buf, sizeof(buf)));
-                goto done;
-            }
-            bld->nn = (const char*) ((*env)->GetStringUTFChars(env, jVal.l, NULL));
-        }
-        
-    done:
-        destroyLocalReference(env, jHDFSConf);
-        destroyLocalReference(env, jAddress);
-        if (ret) { //if there is error/exception, we free the builder and return NULL
-            free(bld);
-            bld = NULL;
+    }
+    if (bld->userName) {
+        // userName is optional; copy it only when it was set
+        fs->userName = strdup(bld->userName);
+        if (!fs->userName) {
+            ret = ENOMEM;
+            goto done;
         }
     }
-    
+    // The working directory starts out as root.
+    fs->workingDir = strdup("/");
+    if (!fs->workingDir) {
+        ret = ENOMEM;
+        goto done;
+    }
     //for debug
     fprintf(stderr, "namenode: %s:%d\n", bld->nn, bld->port);
-    return bld;
+
+done:
+    free(bld);
+    if (ret) {
+        freeWebHdfsInternal(fs);
+        errno = ret;
+        return NULL;
+    }
+    return fs;
 }
 
 int hdfsDisconnect(hdfsFS fs)
 {
     if (fs == NULL) {
-        errno = EBADF;
+        errno = EINVAL;
         return -1;
-    } else {
-        free(fs);
-        fs = NULL;
     }
+    freeWebHdfsInternal(fs);
     return 0;
 }
 
-char *getAbsolutePath(hdfsFS fs, const char *path) {
-    if (fs == NULL || path == NULL) {
-        return NULL;
-    }
+static char *getAbsolutePath(hdfsFS fs, const char *path)
+{
     char *absPath = NULL;
-    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+    size_t absPathLen;
     
-    if ('/' != *path && bld->workingDir) {
-        absPath = (char *)malloc(strlen(bld->workingDir) + strlen(path) + 1);
-        if (!absPath) {
-            return NULL;
-        }
-        absPath = strcpy(absPath, bld->workingDir);
-        absPath = strcat(absPath, path);
-        return absPath;
-    } else {
-        absPath = (char *)malloc(strlen(path) + 1);
-        if (!absPath) {
-            return NULL;
-        }
-        absPath = strcpy(absPath, path);
-        return absPath;
+    if (path[0] == '/') {
+        // path is already absolute.
+        return strdup(path);
+    }
+    // prepend the workingDir to the path.
+    absPathLen = strlen(fs->workingDir) + strlen(path);
+    absPath = malloc(absPathLen + 1);
+    if (!absPath) {
+        return NULL;
     }
+    snprintf(absPath, absPathLen + 1, "%s%s", fs->workingDir, path);
+    return absPath;
 }
 
 int hdfsCreateDirectory(hdfsFS fs, const char* path)
 {
+    char *url = NULL, *absPath = NULL;
+    Response resp = NULL;
+    int ret = 0;
+
     if (fs == NULL || path == NULL) {
-        return -1;
+        ret = EINVAL;
+        goto done;
     }
-    
-    char *absPath = getAbsolutePath(fs, path);
+    absPath = getAbsolutePath(fs, path);
     if (!absPath) {
-        return -1;
+        ret = ENOMEM;
+        goto done;
     }
-    
-    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
-    char *url = NULL;
-    Response resp = NULL;
-    int ret = 0;
-    
-    if(!((url = prepareMKDIR(bld->nn, bld->port, absPath, bld->userName))
+    if(!((url = prepareMKDIR(fs->nn, fs->port, absPath, fs->userName))
          && (resp = launchMKDIR(url))
          && (parseMKDIR(resp->body->content)))) {
-        ret = -1;
+        ret = EIO;
+        goto done;
     }
     
+done:
     freeResponse(resp);
     free(url);
     free(absPath);
-    return ret;
+    if (ret) {
+        errno = ret;
+        return -1;
+    }
+    return 0;
 }
 
 int hdfsChmod(hdfsFS fs, const char* path, short mode)
 {
+    char *absPath = NULL, *url = NULL;
+    Response resp = NULL;
+    int ret = 0;
+
     if (fs == NULL || path == NULL) {
-        return -1;
+        ret = EINVAL;
+        goto done;
     }
-    
-    char *absPath = getAbsolutePath(fs, path);
+    absPath = getAbsolutePath(fs, path);
     if (!absPath) {
-        return -1;
+        ret = ENOMEM;
+        goto done;
     }
-    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
-    char *url=NULL;
-    Response resp = NULL;
-    int ret = 0;
-    
-    if(!((url = prepareCHMOD(bld->nn, bld->port, absPath, (int)mode, bld->userName))
+    if(!((url = prepareCHMOD(fs->nn, fs->port, absPath, (int)mode, fs->userName))
          && (resp = launchCHMOD(url))
          && (parseCHMOD(resp->header->content, resp->body->content)))) {
-        ret = -1;
+        ret = EIO;
+        goto done;
     }
-    
+done:
     freeResponse(resp);
     free(absPath);
     free(url);
-    return ret;
+    if (ret) {
+        errno = ret;
+        return -1;
+    }
+    return 0;
 }
 
 int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group)
 {
+    int ret = 0;
+    char *absPath = NULL, *url = NULL;
+    Response resp = NULL;
+
     if (fs == NULL || path == NULL) {
-        return -1;
+        ret = EINVAL;
+        goto done;
     }
     
-    char *absPath = getAbsolutePath(fs, path);
+    absPath = getAbsolutePath(fs, path);
     if (!absPath) {
-        return -1;
+        ret = ENOMEM;
+        goto done;
     }
-    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
-    char *url=NULL;
-    Response resp = NULL;
-    int ret = 0;
     
-    if(!((url = prepareCHOWN(bld->nn, bld->port, absPath, owner, group, bld->userName))
+    if(!((url = prepareCHOWN(fs->nn, fs->port, absPath, owner, group, fs->userName))
          && (resp = launchCHOWN(url))
          && (parseCHOWN(resp->header->content, resp->body->content)))) {
-        ret = -1;
+        ret = EIO;
+        goto done;
     }
     
+done:
     freeResponse(resp);
     free(absPath);
     free(url);
-    return ret;
+    if (ret) {
+        errno = ret;
+        return -1;
+    }
+    return 0;
 }
 
 int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath)
 {
+    char *oldAbsPath = NULL, *newAbsPath = NULL, *url = NULL;
+    int ret = 0;
+    Response resp = NULL;
+
     if (fs == NULL || oldPath == NULL || newPath == NULL) {
-        return -1;
+        ret = EINVAL;
+        goto done;
     }
-    
-    char *oldAbsPath = getAbsolutePath(fs, oldPath);
+    oldAbsPath = getAbsolutePath(fs, oldPath);
     if (!oldAbsPath) {
-        return -1;
+        ret = ENOMEM;
+        goto done;
     }
-    char *newAbsPath = getAbsolutePath(fs, newPath);
+    newAbsPath = getAbsolutePath(fs, newPath);
     if (!newAbsPath) {
-        return -1;
+        ret = ENOMEM;
+        goto done;
     }
-    
-    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
-    char *url=NULL;
-    Response resp = NULL;
-    int ret = 0;
-    
-    if(!((url = prepareRENAME(bld->nn, bld->port, oldAbsPath, newAbsPath, bld->userName))
+    if(!((url = prepareRENAME(fs->nn, fs->port, oldAbsPath, newAbsPath, fs->userName))
          && (resp = launchRENAME(url))
          && (parseRENAME(resp->body->content)))) {
         ret = -1;
     }
-    
+done:
     freeResponse(resp);
     free(oldAbsPath);
     free(newAbsPath);
     free(url);
-    return ret;
+    if (ret) {
+        errno = ret;
+        return -1;
+    }
+    return 0;
 }
 
 hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path)
 {
-    if (fs == NULL || path == NULL) {
-        return NULL;
-    }
-    
-    char *absPath = getAbsolutePath(fs, path);
-    if (!absPath) {
-        return NULL;
-    }
-    
-    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+    char *absPath = NULL;
     char *url=NULL;
     Response resp = NULL;
     int numEntries = 0;
     int ret = 0;
-    
-    hdfsFileInfo * fileInfo = (hdfsFileInfo *) calloc(1, sizeof(hdfsFileInfo));
+    hdfsFileInfo *fileInfo = NULL;
+
+    if (fs == NULL || path == NULL) {
+        ret = EINVAL;
+        goto done;
+    }
+    absPath = getAbsolutePath(fs, path);
+    if (!absPath) {
+        ret = ENOMEM;
+        goto done;
+    }
+    fileInfo = (hdfsFileInfo *) calloc(1, sizeof(hdfsFileInfo));
     if (!fileInfo) {
-        ret = -1;
+        ret = ENOMEM;
         goto done;
     }
-    initFileinfo(fileInfo);
-    
-    if(!((url = prepareGFS(bld->nn, bld->port, absPath, bld->userName))
+    fileInfo->mKind = kObjectKindFile;
+
+    if(!((url = prepareGFS(fs->nn, fs->port, absPath, fs->userName))
          && (resp = launchGFS(url))
          && (fileInfo = parseGFS(resp->body->content, fileInfo, &numEntries))))  {
-        ret = -1;
+        ret = EIO;
         goto done;
     }
     
@@ -511,163 +589,172 @@ done:
         return fileInfo;
     } else {
         free(fileInfo);
+        errno = ret;
         return NULL;
     }
 }
 
 hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries)
 {
+    char *url = NULL, *absPath = NULL;
+    Response resp = NULL;
+    int ret = 0;
+    hdfsFileInfo *fileInfo = NULL;
+
     if (fs == NULL || path == NULL) {
-        return NULL;
+        ret = EINVAL;
+        goto done;
     }
-    
-    char *absPath = getAbsolutePath(fs, path);
+    absPath = getAbsolutePath(fs, path);
     if (!absPath) {
-        return NULL;
+        ret = ENOMEM;
+        goto done;
     }
-
-    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
-    char *url = NULL;
-    Response resp = NULL;
-    int ret = 0;
-    
-    hdfsFileInfo * fileInfo = (hdfsFileInfo *) calloc(1, sizeof(hdfsFileInfo));
+    fileInfo = calloc(1, sizeof(*fileInfo));
     if (!fileInfo) {
-        ret = -1;
+        ret = ENOMEM;
         goto done;
     }
-    
-    if(!((url = prepareLS(bld->nn, bld->port, absPath, bld->userName))
+    if(!((url = prepareLS(fs->nn, fs->port, absPath, fs->userName))
          && (resp = launchLS(url))
          && (fileInfo = parseGFS(resp->body->content, fileInfo, numEntries))))  {
-        ret = -1;
+        ret = EIO;
         goto done;
     }
-    
 done:
     freeResponse(resp);
     free(absPath);
     free(url);
-    
+
     if (ret == 0) {
         return fileInfo;
     } else {
-        free(fileInfo);
+        hdfsFreeFileInfo(fileInfo, 1);
+        errno = ret;
         return NULL;
     }
 }
 
 int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication)
 {
+    char *url = NULL, *absPath = NULL;
+    Response resp = NULL;
+    int ret = 0;
+
     if (fs == NULL || path == NULL) {
-        return -1;
+        ret = EINVAL;
+        goto done;
     }
-    char *absPath = getAbsolutePath(fs, path);
+    absPath = getAbsolutePath(fs, path);
     if (!absPath) {
-        return -1;
+        ret = ENOMEM;
+        goto done;
     }
-    
-    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
-    char *url = NULL;
-    Response resp = NULL;
-    int ret = 0;
-    
-    if(!((url = prepareSETREPLICATION(bld->nn, bld->port, absPath, replication, bld->userName))
+    if(!((url = prepareSETREPLICATION(fs->nn, fs->port, absPath, replication, fs->userName))
          && (resp = launchSETREPLICATION(url))
          && (parseSETREPLICATION(resp->body->content)))) {
-        ret = -1;
+        ret = EIO;
+        goto done;
     }
-    
+done:
     freeResponse(resp);
     free(absPath);
     free(url);
-    return ret;
+    if (ret) {
+        errno = ret;
+        return -1;
+    }
+    return 0;
 }
 
 void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
 {
-    //Free the mName, mOwner, and mGroup
     int i;
+
     for (i=0; i < numEntries; ++i) {
-        if (hdfsFileInfo[i].mName) {
-            free(hdfsFileInfo[i].mName);
-        }
-        if (hdfsFileInfo[i].mOwner) {
-            free(hdfsFileInfo[i].mOwner);
-        }
-        if (hdfsFileInfo[i].mGroup) {
-            free(hdfsFileInfo[i].mGroup);
-        }
+        free(hdfsFileInfo[i].mName);
+        free(hdfsFileInfo[i].mOwner);
+        free(hdfsFileInfo[i].mGroup);
     }
-    
-    //Free entire block
     free(hdfsFileInfo);
-    hdfsFileInfo = NULL;
 }
 
 int hdfsDelete(hdfsFS fs, const char* path, int recursive)
 {
+    char *url = NULL, *absPath = NULL;
+    Response resp = NULL;
+    int ret = 0;
+
     if (fs == NULL || path == NULL) {
-        return -1;
+        ret = EINVAL;
+        goto done;
     }
-    char *absPath = getAbsolutePath(fs, path);
+    absPath = getAbsolutePath(fs, path);
     if (!absPath) {
-        return -1;
+        ret = ENOMEM;
+        goto done;
     }
-    
-    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
-    char *url = NULL;
-    Response resp = NULL;
-    int ret = 0;
-    
-    if(!((url = prepareDELETE(bld->nn, bld->port, absPath, recursive, bld->userName))
+    if(!((url = prepareDELETE(fs->nn, fs->port, absPath, recursive, fs->userName))
          && (resp = launchDELETE(url))
          && (parseDELETE(resp->body->content)))) {
-        ret = -1;
+        ret = EIO;
+        goto done;
     }
     
+done:
     freeResponse(resp);
     free(absPath);
     free(url);
-    return ret;
+    if (ret) {
+        errno = ret;
+        return -1;
+    }
+    return 0;
 }
 
 int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime)
 {
+    char *url = NULL, *absPath = NULL;
+    Response resp = NULL;
+    int ret = 0;
+
     if (fs == NULL || path == NULL) {
-        return -1;
+        ret = EINVAL;
+        goto done;
     }
-    char *absPath = getAbsolutePath(fs, path);
+    absPath = getAbsolutePath(fs, path);
     if (!absPath) {
-        return -1;
+        ret = ENOMEM;
+        goto done;
     }
-    
-    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
-    char *url = NULL;
-    Response resp = NULL;
-    int ret = 0;
-    
-    if(!((url = prepareUTIMES(bld->nn, bld->port, absPath, mtime, atime, bld->userName))
+    if(!((url = prepareUTIMES(fs->nn, fs->port, absPath, mtime, atime,
+                              fs->userName))
          && (resp = launchUTIMES(url))
          && (parseUTIMES(resp->header->content, resp->body->content)))) {
-        ret = -1;
+        ret = EIO;
+        goto done;
     }
     
+done:
     freeResponse(resp);
     free(absPath);
     free(url);
-    return ret;
+    if (ret) {
+        errno = ret;
+        return -1;
+    }
+    return 0;
 }
 
 int hdfsExists(hdfsFS fs, const char *path)
 {
     hdfsFileInfo *fileInfo = hdfsGetPathInfo(fs, path);
-    if (fileInfo) {
-        hdfsFreeFileInfo(fileInfo, 1);
-        return 0;
-    } else {
+    if (!fileInfo) {
+        // (errno will have been set by hdfsGetPathInfo)
         return -1;
     }
+    hdfsFreeFileInfo(fileInfo, 1);
+    return 0;
 }
 
 typedef struct {
@@ -701,39 +788,160 @@ static void *writeThreadOperation(void *
     return data;
 }
 
+/**
+ * Free the memory associated with a webHDFS file handle.
+ *
+ * No other resources will be freed.
+ *
+ * @param file            The webhdfs file handle
+ */
+static void freeFileInternal(hdfsFile file)
+{
+    if (!file)
+        return;
+    freeWebFileHandle(file->file);
+    free(file);
+}
+
+/**
+ * Helper function for opening a file for OUTPUT.
+ *
+ * As part of the open process for OUTPUT files, we have to connect to the
+ * NameNode and get the URL of the corresponding DataNode.
+ * We also create a background thread here for doing I/O.
+ *
+ * @param fs                     The filesystem handle
+ * @param file                   The file handle being opened
+ * @return                       0 on success; error code otherwise
+ */
+static int hdfsOpenOutputFileImpl(hdfsFS fs, hdfsFile file)
+{
+    struct webhdfsFileHandle *webhandle = file->file;
+    Response resp = NULL;
+    int parseRet, append, ret = 0;
+    char *prepareUrl = NULL, *dnUrl = NULL;
+    threadData *data = NULL;
+
+    webhandle->uploadBuffer = initWebHdfsBuffer();
+    if (!webhandle->uploadBuffer) {
+        ret = ENOMEM;
+        goto done;
+    }
+    append = file->flags & O_APPEND;
+    if (!append) {
+        // If we're not appending, send a create request to the NN
+        prepareUrl = prepareNnWRITE(fs->nn, fs->port, webhandle->absPath,
+            fs->userName, webhandle->replication, webhandle->blockSize);
+    } else {
+        prepareUrl = prepareNnAPPEND(fs->nn, fs->port, webhandle->absPath,
+                              fs->userName);
+    }
+    if (!prepareUrl) {
+        fprintf(stderr, "fail to create the url connecting to namenode "
+                "for file creation/appending\n");
+        ret = EIO;
+        goto done;
+    }
+    if (!append) {
+        resp = launchNnWRITE(prepareUrl);
+    } else {
+        resp = launchNnAPPEND(prepareUrl);
+    }
+    if (!resp) {
+        fprintf(stderr, "fail to get the response from namenode for "
+                "file creation/appending\n");
+        ret = EIO;
+        goto done;
+    }
+    if (!append) {
+        parseRet = parseNnWRITE(resp->header->content, resp->body->content);
+    } else {
+        parseRet = parseNnAPPEND(resp->header->content, resp->body->content);
+    }
+    if (!parseRet) {
+        fprintf(stderr, "fail to parse the response from namenode for "
+                "file creation/appending\n");
+        ret = EIO;
+        goto done;
+    }
+    dnUrl = parseDnLoc(resp->header->content);
+    if (!dnUrl) {
+        fprintf(stderr, "fail to get the datanode url from namenode "
+                "for file creation/appending\n");
+        ret = EIO;
+        goto done;
+    }
+    //store the datanode url in the file handle
+    webhandle->datanode = strdup(dnUrl);
+    if (!webhandle->datanode) {
+        ret = ENOMEM;
+        goto done;
+    }
+    //create a new thread for performing the http transferring
+    data = calloc(1, sizeof(*data));
+    if (!data) {
+        ret = ENOMEM;
+        goto done;
+    }
+    data->url = strdup(dnUrl);
+    if (!data->url) {
+        ret = ENOMEM;
+        goto done;
+    }
+    data->flags = file->flags;
+    data->uploadBuffer = webhandle->uploadBuffer;
+    ret = pthread_create(&webhandle->connThread, NULL,
+                         writeThreadOperation, data);
+    if (ret) {
+        fprintf(stderr, "Failed to create the writing thread.\n");
+        goto done;
+    }
+    webhandle->uploadBuffer->openFlag = 1;
+
+done:
+    freeResponse(resp);
+    free(prepareUrl);
+    free(dnUrl);
+    if (ret) {
+        if (data) {
+            free(data->url);
+            free(data);
+        }
+    }
+    return ret;
+}
+
 hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
                       int bufferSize, short replication, tSize blockSize)
 {
-    /*
-     * the original version of libhdfs based on JNI store a fsinputstream/fsoutputstream in the hdfsFile
-     * in libwebhdfs that is based on webhdfs, we store (absolute_path, buffersize, replication, blocksize) in it
-     */
+    int ret = 0;
+    int accmode = flags & O_ACCMODE;
+    struct webhdfsFileHandle *webhandle = NULL;
+    hdfsFile file = NULL;
+
     if (fs == NULL || path == NULL) {
-        return NULL;
+        ret = EINVAL;
+        goto done;
     }
-
-    int accmode = flags & O_ACCMODE;
     if (accmode == O_RDWR) {
+        // TODO: the original libhdfs has very hackish support for this; should
+        // we do the same?  It would actually be a lot easier in libwebhdfs
+        // since the protocol isn't connection-oriented. 
         fprintf(stderr, "ERROR: cannot open an hdfs file in O_RDWR mode\n");
-        errno = ENOTSUP;
-        return NULL;
+        ret = ENOTSUP;
+        goto done;
     }
-    
     if ((flags & O_CREAT) && (flags & O_EXCL)) {
         fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n");
     }
-    
-    hdfsFile hdfsFileHandle = (hdfsFile) calloc(1, sizeof(struct hdfsFile_internal));
-    if (!hdfsFileHandle) {
-        return NULL;
+    file = calloc(1, sizeof(struct hdfsFile_internal));
+    if (!file) {
+        ret = ENOMEM;
+        goto done;
     }
-    int ret = 0;
-    hdfsFileHandle->flags = flags;
-    hdfsFileHandle->type = accmode == O_RDONLY ? INPUT : OUTPUT;
-    hdfsFileHandle->offset = 0;
-    struct webhdfsFileHandle *webhandle = (struct webhdfsFileHandle *) calloc(1, sizeof(struct webhdfsFileHandle));
+    file->flags = flags;
+    file->type = accmode == O_RDONLY ? INPUT : OUTPUT;
+    file->offset = 0;
+    webhandle = calloc(1, sizeof(struct webhdfsFileHandle));
     if (!webhandle) {
-        ret = -1;
+        ret = ENOMEM;
         goto done;
     }
     webhandle->bufferSize = bufferSize;
@@ -741,105 +949,28 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const c
     webhandle->blockSize = blockSize;
     webhandle->absPath = getAbsolutePath(fs, path);
     if (!webhandle->absPath) {
-        ret = -1;
+        ret = ENOMEM;
         goto done;
     }
-    hdfsFileHandle->file = webhandle;
-    
-    //for write/append, need to connect to the namenode
-    //and get the url of corresponding datanode
-    if (hdfsFileHandle->type == OUTPUT) {
-        webhandle->uploadBuffer = initWebHdfsBuffer();
-        if (!webhandle->uploadBuffer) {
-            ret = -1;
-            goto done;
-        }
-        struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
-        char *url = NULL;
-        Response resp = NULL;
-        int append = flags & O_APPEND;
-        int create = append ? 0 : 1;
-        
-        //if create: send create request to NN
-        if (create) {
-            url = prepareNnWRITE(bld->nn, bld->port, webhandle->absPath, bld->userName, webhandle->replication, webhandle->blockSize);
-        } else if (append) {
-            url = prepareNnAPPEND(bld->nn, bld->port, webhandle->absPath, bld->userName);
-        }
-        if (!url) {
-            fprintf(stderr,
-                    "fail to create the url connecting to namenode for file creation/appending\n");
-            ret = -1;
+    file->file = webhandle;
+    if (file->type == OUTPUT) {
+        ret = hdfsOpenOutputFileImpl(fs, file);
+        if (ret) {
             goto done;
         }
+    }
 
-        if (create) {
-            resp = launchNnWRITE(url);
-        } else if (append) {
-            resp = launchNnAPPEND(url);
-        }
-        if (!resp) {
-            fprintf(stderr,
-                    "fail to get the response from namenode for file creation/appending\n");
-            free(url);
-            ret = -1;
-            goto done;
-        }
-        
-        int parseRet = 0;
-        if (create) {
-            parseRet = parseNnWRITE(resp->header->content, resp->body->content);
-        } else if (append) {
-            parseRet = parseNnAPPEND(resp->header->content, resp->body->content);
-        }
-        if (!parseRet) {
-            fprintf(stderr,
-                    "fail to parse the response from namenode for file creation/appending\n");
-            free(url);
-            freeResponse(resp);
-            ret = -1;
-            goto done;
-        }
-            
-        free(url);
-        url = parseDnLoc(resp->header->content);
-        if (!url) {
-            fprintf(stderr,
-                    "fail to get the datanode url from namenode for file creation/appending\n");
-            freeResponse(resp);
-            ret = -1;
-            return NULL;
-        }
-        freeResponse(resp);
-        //store the datanode url in the file handle
-        webhandle->datanode = strdup(url);
- 
-        //create a new thread for performing the http transferring
-        threadData *data = (threadData *) calloc(1, sizeof(threadData));
-        if (!data) {
-            ret = -1;
-            goto done;
-        }
-        data->url = strdup(url);
-        data->flags = flags;
-        data->uploadBuffer = webhandle->uploadBuffer;
-        free(url);
-        ret = pthread_create(&webhandle->connThread, NULL, writeThreadOperation, data);
-        if (ret) {
-            fprintf(stderr, "Failed to create the writing thread.\n");
+done:
+    if (ret) {
+        if (file) {
+            freeFileInternal(file); // Also frees webhandle
         } else {
-            webhandle->uploadBuffer->openFlag = 1;
+            freeWebFileHandle(webhandle);
         }
-    }
-    
-done:
-    if (ret == 0) {
-        return hdfsFileHandle;
-    } else {
-        freeWebFileHandle(webhandle);
-        free(hdfsFileHandle);
+        errno = ret;
         return NULL;
     }
+    return file;
 }
 
 tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length)
@@ -848,15 +979,17 @@ tSize hdfsWrite(hdfsFS fs, hdfsFile file
         return 0;
     }
     if (fs == NULL || file == NULL || file->type != OUTPUT || length < 0) {
+        errno = EBADF;
         return -1;
     }
     
-    struct webhdfsFileHandle *wfile = (struct webhdfsFileHandle *) file->file;
+    struct webhdfsFileHandle *wfile = file->file;
     if (wfile->uploadBuffer && wfile->uploadBuffer->openFlag) {
         resetWebhdfsBuffer(wfile->uploadBuffer, buffer, length);
         return length;
     } else {
         fprintf(stderr, "Error: have not opened the file %s for writing yet.\n", wfile->absPath);
+        errno = EBADF;
         return -1;
     }
 }
@@ -868,7 +1001,7 @@ int hdfsCloseFile(hdfsFS fs, hdfsFile fi
     if (file->type == OUTPUT) {
         void *respv;
         threadData *tdata;
-        struct webhdfsFileHandle *wfile = (struct webhdfsFileHandle *) file->file;
+        struct webhdfsFileHandle *wfile = file->file;
         pthread_mutex_lock(&(wfile->uploadBuffer->writeMutex));
         wfile->uploadBuffer->closeFlag = 1;
         pthread_cond_signal(&wfile->uploadBuffer->newwrite_or_close);
@@ -893,13 +1026,10 @@ int hdfsCloseFile(hdfsFS fs, hdfsFile fi
         //free the threaddata
         freeThreadData(tdata);
     }
-    
-    fprintf(stderr, "To clean the webfilehandle...\n");
-    if (file) {
-        freeWebFileHandle(file->file);
-        free(file);
-        file = NULL;
-        fprintf(stderr, "Cleaned the webfilehandle...\n");
+    freeFileInternal(file);
+    fprintf(stderr, "Closed the webfilehandle...\n");
+    if (ret) {
+        errno = EIO;
     }
     return ret;
 }
@@ -914,111 +1044,155 @@ int hdfsFileIsOpenForWrite(hdfsFile file
     return (file->type == OUTPUT);
 }
 
-tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length)
+static int hdfsReadImpl(hdfsFS fs, hdfsFile file, void* buffer, tSize off,
+                        tSize length, tSize *numRead)
 {
-    if (length == 0) {
-        return 0;
-    }
-    if (fs == NULL || file == NULL || file->type != INPUT || buffer == NULL || length < 0) {
-        errno = EINVAL;
-        return -1;
-    }
-    struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
-    struct webhdfsFileHandle *webFile = (struct webhdfsFileHandle *) file->file;
+    int ret = 0;
     char *url = NULL;
     Response resp = NULL;
     int openResult = -1;
-    
-    resp = (Response) calloc(1, sizeof(*resp));
+
+    if (fs == NULL || file == NULL || file->type != INPUT || buffer == NULL ||
+            length < 0) {
+        ret = EINVAL;
+        goto done;
+    }
+    if (length == 0) {
+        // Special case: the user supplied a buffer of zero length, so there is
+        // nothing to do.
+        *numRead = 0;
+        goto done;
+    }
+    resp = calloc(1, sizeof(*resp)); // resp is actually a pointer type
     if (!resp) {
-        return -1;
+        ret = ENOMEM;
+        goto done;
     }
     resp->header = initResponseBuffer();
     resp->body = initResponseBuffer();
     resp->body->content = buffer;
     resp->body->remaining = length;
     
-    if (!((url = prepareOPEN(bld->nn, bld->port, webFile->absPath, bld->userName, file->offset, length))
+    if (!((url = prepareOPEN(fs->nn, fs->port, file->file->absPath,
+                             fs->userName, off, length))
           && (resp = launchOPEN(url, resp))
           && ((openResult = parseOPEN(resp->header->content, resp->body->content)) > 0))) {
-        free(url);
-        freeResponseBuffer(resp->header);
         if (openResult == 0) {
-            return 0;
-        } else {
-            return -1;
+            // Special case: if parseOPEN returns 0, we asked for a byte range
+            // outside what the file contains.  In this case, hdfsRead and
+            // hdfsPread return 0, meaning end-of-file.
+            *numRead = 0;
+            goto done;
         }
+        ret = EIO;
+        goto done;
     }
-    
-    size_t readSize = resp->body->offset;
-    file->offset += readSize;
-    
+    *numRead = resp->body->offset;
+
+done:
     freeResponseBuffer(resp->header);
     free(resp->body);
     free(resp);
     free(url);
-    return readSize;
+    return ret;
 }
 
-int hdfsAvailable(hdfsFS fs, hdfsFile file)
+tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length)
 {
-    if (!file || !fs) {
-        return -1;
-    }
-    struct webhdfsFileHandle *wf = (struct webhdfsFileHandle *) file->file;
-    if (!wf) {
-        return -1;
-    }
-    hdfsFileInfo *fileInfo = hdfsGetPathInfo(fs, wf->absPath);
-    if (fileInfo) {
-        int available = (int)(fileInfo->mSize - file->offset);
-        hdfsFreeFileInfo(fileInfo, 1);
-        return available;
-    } else {
+    int ret;
+    tSize numRead = 0;
+
+    ret = hdfsReadImpl(fs, file, buffer, file->offset, length, &numRead);
+    if (ret) {
+        errno = ret;
         return -1;
     }
+    file->offset += numRead; 
+    return numRead;
+}
+
+int hdfsAvailable(hdfsFS fs, hdfsFile file)
+{
+    /* We actually always block when reading from webhdfs, currently.  So the
+     * number of bytes that can be read without blocking is currently 0.
+     */
+    return 0;
+}
+
+int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
+{
+    errno = ENOTSUP;
+    return -1;
 }
 
 int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos)
 {
-    if (!fs || !file || desiredPos < 0) {
-        return -1;
+    struct webhdfsFileHandle *wf;
+    hdfsFileInfo *fileInfo = NULL;
+    int ret = 0;
+
+    if (!fs || !file || (file->type == OUTPUT) || (desiredPos < 0)) {
+        ret = EINVAL;
+        goto done;
     }
-    struct webhdfsFileHandle *wf = (struct webhdfsFileHandle *) file->file;
+    wf = file->file;
     if (!wf) {
-        return -1;
+        ret = EINVAL;
+        goto done;
     }
-    hdfsFileInfo *fileInfo = hdfsGetPathInfo(fs, wf->absPath);
-    int ret = 0;
+    fileInfo = hdfsGetPathInfo(fs, wf->absPath);
+    if (!fileInfo) {
+        ret = errno;
+        goto done;
+    }
+    if (desiredPos > fileInfo->mSize) {
+        fprintf(stderr,
+                "hdfsSeek for %s failed since the desired position %" PRId64
+                " is beyond the size of the file %" PRId64 "\n",
+                wf->absPath, desiredPos, fileInfo->mSize);
+        ret = ENOTSUP;
+        goto done;
+    }
+    file->offset = desiredPos;
+
+done:
     if (fileInfo) {
-        if (fileInfo->mSize < desiredPos) {
-            errno = ENOTSUP;
-            fprintf(stderr,
-                    "hdfsSeek for %s failed since the desired position %lld is beyond the size of the file %lld\n",
-                    wf->absPath, desiredPos, fileInfo->mSize);
-            ret = -1;
-        } else {
-            file->offset = desiredPos;
-        }
         hdfsFreeFileInfo(fileInfo, 1);
-        return ret;
-    } else {
+    }
+    if (ret) {
+        errno = ret;
         return -1;
     }
+    return 0;
 }
 
 tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length)
 {
-    if (!fs || !file || file->type != INPUT || position < 0 || !buffer || length < 0) {
+    int ret;
+    tSize numRead = 0;
+
+    if (position < 0) {
+        errno = EINVAL;
+        return -1;
+    }
+    ret = hdfsReadImpl(fs, file, buffer, position, length, &numRead);
+    if (ret) {
+        errno = ret;
         return -1;
     }
-    file->offset = position;
-    return hdfsRead(fs, file, buffer, length);
+    return numRead;
 }
 
 tOffset hdfsTell(hdfsFS fs, hdfsFile file)
 {
     if (!file) {
+        errno = EINVAL;
         return -1;
     }
     return file->offset;
@@ -1027,29 +1201,51 @@ tOffset hdfsTell(hdfsFS fs, hdfsFile fil
 char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize)
 {
     if (fs == NULL || buffer == NULL ||  bufferSize <= 0) {
+        errno = EINVAL;
         return NULL;
     }
-    
-    struct hdfsBuilder * bld = (struct hdfsBuilder *) fs;
-    if (bld->workingDir) {
-        strncpy(buffer, bld->workingDir, bufferSize);
+    if (snprintf(buffer, bufferSize, "%s", fs->workingDir) >= bufferSize) {
+        errno = ENAMETOOLONG;
+        return NULL;
     }
     return buffer;
 }
 
 int hdfsSetWorkingDirectory(hdfsFS fs, const char* path)
 {
+    char *newWorkingDir;
+    size_t strlenPath, newWorkingDirLen;
+
     if (fs == NULL || path == NULL) {
+        errno = EINVAL;
         return -1;
     }
-    
-    struct hdfsBuilder * bld = (struct hdfsBuilder *) fs;
-    free(bld->workingDir);
-    bld->workingDir = (char *)malloc(strlen(path) + 1);
-    if (!(bld->workingDir)) {
+    strlenPath = strlen(path);
+    if (strlenPath < 1) {
+        errno = EINVAL;
+        return -1;
+    }
+    if (path[0] != '/') {
+        // TODO: support non-absolute paths.  They should be interpreted
+        // relative to the current path.
+        errno = ENOTSUP;
+        return -1;
+    }
+    if (strstr(path, "//")) {
+        // TODO: support non-normalized paths (by normalizing them.)
+        errno = ENOTSUP;
         return -1;
     }
-    strcpy(bld->workingDir, path);
+    newWorkingDirLen = strlenPath + 2;
+    newWorkingDir = malloc(newWorkingDirLen);
+    if (!newWorkingDir) {
+        errno = ENOMEM;
+        return -1;
+    }
+    snprintf(newWorkingDir, newWorkingDirLen, "%s%s",
+             path, (path[strlenPath - 1] == '/') ? "" : "/");
+    free(fs->workingDir);
+    fs->workingDir = newWorkingDir;
     return 0;
 }
 
@@ -1065,49 +1261,58 @@ void hdfsFreeHosts(char ***blockHosts)
     free(blockHosts);
 }
 
-/* not useful for libwebhdfs */
-int hdfsFileUsesDirectRead(hdfsFile file)
+tOffset hdfsGetDefaultBlockSize(hdfsFS fs)
 {
-    /* return !!(file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ); */
-    fprintf(stderr, "hdfsFileUsesDirectRead is no longer useful for libwebhdfs.\n");
+    errno = ENOTSUP;
     return -1;
 }
 
-/* not useful for libwebhdfs */
+int hdfsFileUsesDirectRead(hdfsFile file)
+{
+    return 0; // webhdfs never performs direct reads.
+}
+
 void hdfsFileDisableDirectRead(hdfsFile file)
 {
-    /* file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ; */
-    fprintf(stderr, "hdfsFileDisableDirectRead is no longer useful for libwebhdfs.\n");
+    // webhdfs never performs direct reads
 }
 
-/* not useful for libwebhdfs */
 int hdfsHFlush(hdfsFS fs, hdfsFile file)
 {
+    if (file->type != OUTPUT) {
+        errno = EINVAL; 
+        return -1;
+    }
+    // TODO: block until our write buffer is flushed
     return 0;
 }
 
-/* not useful for libwebhdfs */
 int hdfsFlush(hdfsFS fs, hdfsFile file)
 {
+    if (file->type != OUTPUT) {
+        errno = EINVAL; 
+        return -1;
+    }
+    // TODO: block until our write buffer is flushed
     return 0;
 }
 
 char*** hdfsGetHosts(hdfsFS fs, const char* path,
                      tOffset start, tOffset length)
 {
-    fprintf(stderr, "hdfsGetHosts is not but will be supported by libwebhdfs yet.\n");
+    errno = ENOTSUP;
     return NULL;
 }
 
 tOffset hdfsGetCapacity(hdfsFS fs)
 {
-    fprintf(stderr, "hdfsGetCapacity is not but will be supported by libwebhdfs.\n");
+    errno = ENOTSUP;
     return -1;
 }
 
 tOffset hdfsGetUsed(hdfsFS fs)
 {
-    fprintf(stderr, "hdfsGetUsed is not but will be supported by libwebhdfs yet.\n");
+    errno = ENOTSUP;
     return -1;
 }
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_multi_write.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_multi_write.c?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_multi_write.c (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_multi_write.c Fri Oct 12 00:15:22 2012
@@ -17,7 +17,7 @@
  */
 
 #include "expect.h"
-#include "webhdfs.h"
+#include "hdfs.h"
 
 #include <errno.h>
 #include <semaphore.h>

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_ops.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_ops.c?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_ops.c (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_ops.c Fri Oct 12 00:15:22 2012
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-#include "webhdfs.h"
+#include "hdfs.h"
 
 #include <inttypes.h>
 #include <jni.h>

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_read.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_read.c?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_read.c (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_read.c Fri Oct 12 00:15:22 2012
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-#include "webhdfs.h"
+#include "hdfs.h"
 
 #include <stdio.h>
 #include <stdlib.h>

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_threaded.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_threaded.c?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_threaded.c (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_threaded.c Fri Oct 12 00:15:22 2012
@@ -17,7 +17,7 @@
  */
 
 #include "expect.h"
-#include "webhdfs.h"
+#include "hdfs.h"
 
 #include <errno.h>
 #include <semaphore.h>

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_write.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_write.c?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_write.c (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_write.c Fri Oct 12 00:15:22 2012
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-#include "webhdfs.h"
+#include "hdfs.h"
 
 #include <limits.h>
 #include <stdio.h>

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_read_bm.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_read_bm.c?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_read_bm.c (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_read_bm.c Fri Oct 12 00:15:22 2012
@@ -1,8 +1,9 @@
+#include "hdfs.h"
+
 #include <time.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <sys/time.h>
-#include "webhdfs.h"
 
 #ifdef __MACH__
 #include <mach/clock.h>
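
The libwebhdfs test programs above now include hdfs.h instead of webhdfs.h,
since libwebhdfs exposes the same client API as the JNI-based libhdfs. A
minimal sketch of driving that shared API over webhdfs might look like the
following; the namenode host, HTTP port, user name, and file path are
illustrative assumptions, and error handling is trimmed to the essentials.

#include "hdfs.h"

#include <fcntl.h>
#include <stdio.h>
#include <string.h>

int main(void)
{
    /* Connection details below are placeholders, not real endpoints. */
    struct hdfsBuilder *bld = hdfsNewBuilder();
    if (!bld) {
        fprintf(stderr, "hdfsNewBuilder failed\n");
        return 1;
    }
    hdfsBuilderSetNameNode(bld, "namenode.example.com");
    hdfsBuilderSetNameNodePort(bld, 50070);      /* webhdfs HTTP port */
    hdfsBuilderSetUserName(bld, "hdfsuser");

    hdfsFS fs = hdfsBuilderConnect(bld);         /* consumes bld */
    if (!fs) {
        perror("hdfsBuilderConnect");
        return 1;
    }

    const char *msg = "hello, webhdfs\n";
    hdfsFile file = hdfsOpenFile(fs, "/tmp/libwebhdfs-demo.txt",
                                 O_WRONLY, 0, 0, 0);
    if (!file) {
        perror("hdfsOpenFile");
        hdfsDisconnect(fs);
        return 1;
    }
    if (hdfsWrite(fs, file, msg, strlen(msg)) < 0) {
        perror("hdfsWrite");
    }
    hdfsCloseFile(fs, file);
    hdfsDisconnect(fs);
    return 0;
}

As in the hdfsBuilderConnect implementation above, the builder is freed by
hdfsBuilderConnect itself, so the caller must not reuse or free it afterwards.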

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs Fri Oct 12 00:15:22 2012
@@ -17,7 +17,7 @@
 
 bin=`which $0`
 bin=`dirname ${bin}`
-bin=`cd "$bin"; pwd`
+bin=`cd "$bin" > /dev/null; pwd`
 
 DEFAULT_LIBEXEC_DIR="$bin"/../libexec
 HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
@@ -30,6 +30,7 @@ function print_usage(){
   echo "  namenode -format     format the DFS filesystem"
   echo "  secondarynamenode    run the DFS secondary namenode"
   echo "  namenode             run the DFS namenode"
+  echo "  journalnode          run the DFS journalnode"
   echo "  zkfc                 run the ZK Failover Controller daemon"
   echo "  datanode             run a DFS datanode"
   echo "  dfsadmin             run a DFS admin client"
@@ -90,6 +91,9 @@ elif [ "$COMMAND" = "datanode" ] ; then
   else
     HADOOP_OPTS="$HADOOP_OPTS -server $HADOOP_DATANODE_OPTS"
   fi
+elif [ "$COMMAND" = "journalnode" ] ; then
+  CLASS='org.apache.hadoop.hdfs.qjournal.server.JournalNode'
+  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_JOURNALNODE_OPTS"
 elif [ "$COMMAND" = "dfs" ] ; then
   CLASS=org.apache.hadoop.fs.FsShell
   HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hadoop-metrics2.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hadoop-metrics2.properties?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hadoop-metrics2.properties (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hadoop-metrics2.properties Fri Oct 12 00:15:22 2012
@@ -19,7 +19,7 @@
 # See javadoc of package-info.java for org.apache.hadoop.metrics2 for details
 
 *.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
-# default sampling period
+# default sampling period, in seconds
 *.period=10
 
 # The namenode-metrics.out will contain metrics from all context

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1363593-1396941
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1390763-1397380

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Oct 12 00:15:22 2012
@@ -180,9 +180,21 @@ public class DFSConfigKeys extends Commo
   // Whether to enable datanode's stale state detection and usage
   public static final String DFS_NAMENODE_CHECK_STALE_DATANODE_KEY = "dfs.namenode.check.stale.datanode";
   public static final boolean DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT = false;
+  // Whether to avoid choosing stale datanodes as targets for writes
+  public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
+  public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
   // The default value of the time interval for marking datanodes as stale
   public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval";
-  public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT = 30 * 1000; // 30s
+  public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 1000; // 30s
+  // The stale interval cannot be too small, or stale states would churn too frequently.
+  // This value is the multiple of the heartbeat interval that defines the minimum stale interval.
+  public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval";
+  public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s
+  
+  // When the ratio of datanodes marked as stale reaches this value,
+  // stop avoiding stale nodes for writes, to prevent creating hotspots.
+  public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio";
+  public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f;
 
   // Replication monitoring related keys
   public static final String DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION =
@@ -395,4 +407,42 @@ public class DFSConfigKeys extends Commo
   public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
   public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
   public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
+  
+  // Journal-node related configs. These are read on the JN side.
+  public static final String  DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";
+  public static final String  DFS_JOURNALNODE_EDITS_DIR_DEFAULT = "/tmp/hadoop/dfs/journalnode/";
+  public static final String  DFS_JOURNALNODE_RPC_ADDRESS_KEY = "dfs.journalnode.rpc-address";
+  public static final int     DFS_JOURNALNODE_RPC_PORT_DEFAULT = 8485;
+  public static final String  DFS_JOURNALNODE_RPC_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_RPC_PORT_DEFAULT;
+    
+  public static final String  DFS_JOURNALNODE_HTTP_ADDRESS_KEY = "dfs.journalnode.http-address";
+  public static final int     DFS_JOURNALNODE_HTTP_PORT_DEFAULT = 8480;
+  public static final String  DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_HTTP_PORT_DEFAULT;
+
+  public static final String  DFS_JOURNALNODE_KEYTAB_FILE_KEY = "dfs.journalnode.keytab.file";
+  public static final String  DFS_JOURNALNODE_USER_NAME_KEY = "dfs.journalnode.kerberos.principal";
+  public static final String  DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY = "dfs.journalnode.kerberos.internal.spnego.principal";
+
+  // Journal-node related configs for the client side.
+  public static final String  DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb";
+  public static final int     DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT = 10;
+  
+  // Quorum-journal timeouts for various operations. Unlikely to need
+  // to be tweaked, but configurable just in case.
+  public static final String  DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.start-segment.timeout.ms";
+  public static final String  DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.prepare-recovery.timeout.ms";
+  public static final String  DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.accept-recovery.timeout.ms";
+  public static final String  DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.finalize-segment.timeout.ms";
+  public static final String  DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY = "dfs.qjournal.select-input-streams.timeout.ms";
+  public static final String  DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY = "dfs.qjournal.get-journal-state.timeout.ms";
+  public static final String  DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY = "dfs.qjournal.new-epoch.timeout.ms";
+  public static final String  DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY = "dfs.qjournal.write-txns.timeout.ms";
+  public static final int     DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT = 20000;
+  public static final int     DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT = 20000;
+  public static final int     DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT = 20000;
 }
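
As a side note for readers of the new keys above, the following is a minimal sketch of how the stale-datanode settings might be read from a Configuration. Only the key and default constant names are taken from this commit; the wrapper class, its main method, and the way the values are printed are illustrative assumptions.

    // Illustrative only: read the stale-datanode settings added in this commit.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class StaleConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration(); // loads *-site.xml from the classpath
        // Interval (ms) after which a datanode with no heartbeat is considered stale.
        long staleIntervalMs = conf.getLong(
            DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
            DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
        // Whether the namenode should avoid stale datanodes when choosing write targets.
        boolean avoidStaleForWrite = conf.getBoolean(
            DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
            DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
        // Ratio of stale datanodes at which write avoidance is switched off again.
        float writeRatio = conf.getFloat(
            DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
            DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
        System.out.println(staleIntervalMs + " " + avoidStaleForWrite + " " + writeRatio);
      }
    }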

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Fri Oct 12 00:15:22 2012
@@ -75,6 +75,7 @@ import org.apache.hadoop.ipc.ProtobufRpc
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
@@ -488,6 +489,34 @@ public class DFSUtil {
   }
 
   /**
+   * @return a collection of all configured NN Kerberos principals.
+   */
+  public static Set<String> getAllNnPrincipals(Configuration conf) throws IOException {
+    Set<String> principals = new HashSet<String>();
+    for (String nsId : DFSUtil.getNameServiceIds(conf)) {
+      if (HAUtil.isHAEnabled(conf, nsId)) {
+        for (String nnId : DFSUtil.getNameNodeIds(conf, nsId)) {
+          Configuration confForNn = new Configuration(conf);
+          NameNode.initializeGenericKeys(confForNn, nsId, nnId);
+          String principal = SecurityUtil.getServerPrincipal(confForNn
+              .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
+              NameNode.getAddress(confForNn).getHostName());
+          principals.add(principal);
+        }
+      } else {
+        Configuration confForNn = new Configuration(conf);
+        NameNode.initializeGenericKeys(confForNn, nsId, null);
+        String principal = SecurityUtil.getServerPrincipal(confForNn
+            .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
+            NameNode.getAddress(confForNn).getHostName());
+        principals.add(principal);
+      }
+    }
+
+    return principals;
+  }
+
+  /**
    * Returns list of InetSocketAddress corresponding to HA NN RPC addresses from
    * the configuration.
    * 
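
The new DFSUtil.getAllNnPrincipals helper added above can be exercised as in the following sketch. Only its signature is taken from the diff; the wrapper class and the idea of printing the principals are illustrative assumptions.

    // Illustrative only: print every configured NameNode Kerberos principal.
    import java.io.IOException;
    import java.util.Set;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSUtil;

    public class PrintNnPrincipals {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration(); // picks up hdfs-site.xml and core-site.xml
        Set<String> principals = DFSUtil.getAllNnPrincipals(conf);
        for (String principal : principals) {
          System.out.println(principal);
        }
      }
    }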

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Fri Oct 12 00:15:22 2012
@@ -23,6 +23,7 @@ import org.apache.hadoop.ha.HAServicePro
 import org.apache.hadoop.ha.ZKFCProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -46,6 +47,7 @@ public class HDFSPolicyProvider extends 
     new Service("security.inter.datanode.protocol.acl", 
                 InterDatanodeProtocol.class),
     new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
+    new Service("security.qjournal.service.protocol.acl", QJournalProtocol.class),
     new Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL,
         HAServiceProtocol.class),
     new Service(CommonConfigurationKeys.SECURITY_ZKFC_PROTOCOL_ACL,

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java Fri Oct 12 00:15:22 2012
@@ -254,6 +254,9 @@ public class HftpFileSystem extends File
                   ", assuming security is disabled");
               return null;
             }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Exception getting delegation token", e);
+            }
             throw e;
           }
           for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Oct 12 00:15:22 2012
@@ -326,12 +326,15 @@ public class PBHelper {
   }
 
   public static RemoteEditLogProto convert(RemoteEditLog log) {
-    return RemoteEditLogProto.newBuilder().setEndTxId(log.getEndTxId())
-        .setStartTxId(log.getStartTxId()).build();
+    return RemoteEditLogProto.newBuilder()
+        .setStartTxId(log.getStartTxId())
+        .setEndTxId(log.getEndTxId())
+        .setIsInProgress(log.isInProgress()).build();
   }
 
   public static RemoteEditLog convert(RemoteEditLogProto l) {
-    return new RemoteEditLog(l.getStartTxId(), l.getEndTxId());
+    return new RemoteEditLog(l.getStartTxId(), l.getEndTxId(),
+        l.getIsInProgress());
   }
 
   public static RemoteEditLogManifestProto convert(
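
The PBHelper change above threads the new in-progress flag through the protobuf conversion in both directions. A minimal round-trip sketch follows; the RemoteEditLog constructor and accessors come from the diff, while the wrapper class and the assumed import locations of the generated proto classes are illustrative.

    // Illustrative only: isInProgress should survive the RemoteEditLog <-> proto round trip.
    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;
    import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;

    public class RemoteEditLogRoundTrip {
      public static void main(String[] args) {
        RemoteEditLog log = new RemoteEditLog(1L, 100L, true); // an in-progress segment
        RemoteEditLogProto proto = PBHelper.convert(log);
        RemoteEditLog back = PBHelper.convert(proto);
        // Before this commit the flag was dropped; now it round-trips.
        System.out.println("in progress after round trip: " + back.isInProgress());
      }
    }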

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Fri Oct 12 00:15:22 2012
@@ -62,6 +62,8 @@ public class BlockPlacementPolicyDefault
   protected NetworkTopology clusterMap;
   private FSClusterStats stats;
   protected long heartbeatInterval;   // interval for DataNode heartbeats
+  private long staleInterval;   // interval used to identify stale DataNodes
+  
   /**
    * A miss of that many heartbeats is tolerated for replica deletion policy.
    */
@@ -78,7 +80,8 @@ public class BlockPlacementPolicyDefault
   @Override
   public void initialize(Configuration conf,  FSClusterStats stats,
                          NetworkTopology clusterMap) {
-    this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
+    this.considerLoad = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
     this.stats = stats;
     this.clusterMap = clusterMap;
     this.heartbeatInterval = conf.getLong(
@@ -87,6 +90,9 @@ public class BlockPlacementPolicyDefault
     this.tolerateHeartbeatMultiplier = conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
         DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
+    this.staleInterval = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
   }
 
   protected ThreadLocal<StringBuilder> threadLocalBuilder =
@@ -155,9 +161,10 @@ public class BlockPlacementPolicyDefault
       writer=null;
     }
       
-    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, 
-                                                excludedNodes, blocksize, 
-                                                maxNodesPerRack, results);
+    boolean avoidStaleNodes = (stats != null
+        && stats.isAvoidingStaleDataNodesForWrite());
+    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
+        excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
     if (!returnChosenNodes) {  
       results.removeAll(chosenNodes);
     }
@@ -173,8 +180,8 @@ public class BlockPlacementPolicyDefault
                                           HashMap<Node, Node> excludedNodes,
                                           long blocksize,
                                           int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results) {
-      
+                                          List<DatanodeDescriptor> results,
+                                          final boolean avoidStaleNodes) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
       return writer;
     }
@@ -185,18 +192,21 @@ public class BlockPlacementPolicyDefault
     if (writer == null && !newBlock) {
       writer = results.get(0);
     }
-      
+
+    // Keep a copy of the original excludedNodes
+    final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ? 
+        new HashMap<Node, Node>(excludedNodes) : null;
     try {
       if (numOfResults == 0) {
-        writer = chooseLocalNode(writer, excludedNodes, 
-                                 blocksize, maxNodesPerRack, results);
+        writer = chooseLocalNode(writer, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
       if (numOfResults <= 1) {
-        chooseRemoteRack(1, results.get(0), excludedNodes, 
-                         blocksize, maxNodesPerRack, results);
+        chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
         if (--numOfReplicas == 0) {
           return writer;
         }
@@ -204,24 +214,36 @@ public class BlockPlacementPolicyDefault
       if (numOfResults <= 2) {
         if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
           chooseRemoteRack(1, results.get(0), excludedNodes,
-                           blocksize, maxNodesPerRack, results);
+                           blocksize, maxNodesPerRack, 
+                           results, avoidStaleNodes);
         } else if (newBlock){
           chooseLocalRack(results.get(1), excludedNodes, blocksize, 
-                          maxNodesPerRack, results);
+                          maxNodesPerRack, results, avoidStaleNodes);
         } else {
-          chooseLocalRack(writer, excludedNodes, blocksize,
-                          maxNodesPerRack, results);
+          chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
+              results, avoidStaleNodes);
         }
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
-      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, 
-                   blocksize, maxNodesPerRack, results);
+      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes);
     } catch (NotEnoughReplicasException e) {
       LOG.warn("Not able to place enough replicas, still in need of "
                + numOfReplicas + " to reach " + totalReplicasExpected + "\n"
                + e.getMessage());
+      if (avoidStaleNodes) {
+        // excludedNodes now contains the initial excludedNodes, the nodes that
+        // were chosen, and the nodes that were tried but rejected as write
+        // targets because they were stale, decommissioned, or otherwise
+        // unsuitable. Retry, this time without avoiding stale nodes.
+        for (Node node : results) {
+          oldExcludedNodes.put(node, node);
+        }
+        return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
+            maxNodesPerRack, results, false);
+      }
     }
     return writer;
   }
@@ -236,26 +258,27 @@ public class BlockPlacementPolicyDefault
                                              HashMap<Node, Node> excludedNodes,
                                              long blocksize,
                                              int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
+                                             List<DatanodeDescriptor> results,
+                                             boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes);
     if (preferLocalNode) {
       // otherwise try local machine first
       Node oldNode = excludedNodes.put(localMachine, localMachine);
       if (oldNode == null) { // was not in the excluded list
-        if (isGoodTarget(localMachine, blocksize,
-                         maxNodesPerRack, false, results)) {
+        if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
+            results, avoidStaleNodes)) {
           results.add(localMachine);
           return localMachine;
         }
       } 
     }      
     // try a node on local rack
-    return chooseLocalRack(localMachine, excludedNodes, 
-                           blocksize, maxNodesPerRack, results);
+    return chooseLocalRack(localMachine, excludedNodes, blocksize,
+        maxNodesPerRack, results, avoidStaleNodes);
   }
     
   /* choose one node from the rack that <i>localMachine</i> is on.
@@ -270,19 +293,19 @@ public class BlockPlacementPolicyDefault
                                              HashMap<Node, Node> excludedNodes,
                                              long blocksize,
                                              int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
+                                             List<DatanodeDescriptor> results,
+                                             boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes);
     }
       
     // choose one from the local rack
     try {
-      return chooseRandom(
-                          localMachine.getNetworkLocation(),
-                          excludedNodes, blocksize, maxNodesPerRack, results);
+      return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
+          blocksize, maxNodesPerRack, results, avoidStaleNodes);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
@@ -296,18 +319,17 @@ public class BlockPlacementPolicyDefault
       }
       if (newLocal != null) {
         try {
-          return chooseRandom(
-                              newLocal.getNetworkLocation(),
-                              excludedNodes, blocksize, maxNodesPerRack, results);
+          return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
+              blocksize, maxNodesPerRack, results, avoidStaleNodes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes,
-                              blocksize, maxNodesPerRack, results);
+          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes);
         }
       } else {
         //otherwise randomly choose one from the network
-        return chooseRandom(NodeBase.ROOT, excludedNodes,
-                            blocksize, maxNodesPerRack, results);
+        return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
       }
     }
   }
@@ -323,17 +345,19 @@ public class BlockPlacementPolicyDefault
                                 HashMap<Node, Node> excludedNodes,
                                 long blocksize,
                                 int maxReplicasPerRack,
-                                List<DatanodeDescriptor> results)
+                                List<DatanodeDescriptor> results,
+                                boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
     // randomly choose one node from remote racks
     try {
-      chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
-                   excludedNodes, blocksize, maxReplicasPerRack, results);
+      chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
+          excludedNodes, blocksize, maxReplicasPerRack, results,
+          avoidStaleNodes);
     } catch (NotEnoughReplicasException e) {
       chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
                    localMachine.getNetworkLocation(), excludedNodes, blocksize, 
-                   maxReplicasPerRack, results);
+                   maxReplicasPerRack, results, avoidStaleNodes);
     }
   }
 
@@ -345,7 +369,8 @@ public class BlockPlacementPolicyDefault
                                           HashMap<Node, Node> excludedNodes,
                                           long blocksize,
                                           int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results) 
+                                          List<DatanodeDescriptor> results,
+                                          boolean avoidStaleNodes) 
     throws NotEnoughReplicasException {
     int numOfAvailableNodes =
       clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
@@ -363,7 +388,8 @@ public class BlockPlacementPolicyDefault
       Node oldNode = excludedNodes.put(chosenNode, chosenNode);
       if (oldNode == null) { // chosenNode was not in the excluded list
         numOfAvailableNodes--;
-        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+        if (isGoodTarget(chosenNode, blocksize, 
+                maxNodesPerRack, results, avoidStaleNodes)) {
           results.add(chosenNode);
           adjustExcludedNodes(excludedNodes, chosenNode);
           return chosenNode;
@@ -390,7 +416,8 @@ public class BlockPlacementPolicyDefault
                             HashMap<Node, Node> excludedNodes,
                             long blocksize,
                             int maxNodesPerRack,
-                            List<DatanodeDescriptor> results)
+                            List<DatanodeDescriptor> results,
+                            boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
       
     int numOfAvailableNodes =
@@ -409,7 +436,8 @@ public class BlockPlacementPolicyDefault
       if (oldNode == null) {
         numOfAvailableNodes--;
 
-        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+        if (isGoodTarget(chosenNode, blocksize, 
+              maxNodesPerRack, results, avoidStaleNodes)) {
           numOfReplicas--;
           results.add(chosenNode);
           adjustExcludedNodes(excludedNodes, chosenNode);
@@ -451,9 +479,10 @@ public class BlockPlacementPolicyDefault
    */
   private boolean isGoodTarget(DatanodeDescriptor node,
                                long blockSize, int maxTargetPerRack,
-                               List<DatanodeDescriptor> results) {
-    return isGoodTarget(node, blockSize, maxTargetPerRack,
-                        this.considerLoad, results);
+                               List<DatanodeDescriptor> results, 
+                               boolean avoidStaleNodes) {
+    return isGoodTarget(node, blockSize, maxTargetPerRack, this.considerLoad,
+        results, avoidStaleNodes);
   }
   
   /**
@@ -466,7 +495,8 @@ public class BlockPlacementPolicyDefault
    *                       the cluster and total number of replicas for a block
    * @param considerLoad whether or not to consider load of the target node
    * @param results A list containing currently chosen nodes. Used to check if 
-   *                too many nodes has been chosen in the target rack. 
+   *                too many nodes have been chosen in the target rack.
+   * @param avoidStaleNodes Whether or not to avoid choosing stale nodes
    * @return Return true if <i>node</i> has enough space, 
    *         does not have too much load, 
    *         and the rack does not have too many nodes.
@@ -474,7 +504,8 @@ public class BlockPlacementPolicyDefault
   protected boolean isGoodTarget(DatanodeDescriptor node,
                                long blockSize, int maxTargetPerRack,
                                boolean considerLoad,
-                               List<DatanodeDescriptor> results) {
+                               List<DatanodeDescriptor> results,                           
+                               boolean avoidStaleNodes) {
     // check if the node is (being) decommissed
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
       if(LOG.isDebugEnabled()) {
@@ -485,6 +516,17 @@ public class BlockPlacementPolicyDefault
       return false;
     }
 
+    if (avoidStaleNodes) {
+      if (node.isStale(this.staleInterval)) {
+        if (LOG.isDebugEnabled()) {
+          threadLocalBuilder.get().append(node.toString()).append(": ")
+              .append("Node ").append(NodeBase.getPath(node))
+              .append(" is not chosen because the node is stale");
+        }
+        return false;
+      }
+    }
+    
     long remaining = node.getRemaining() - 
                      (node.getBlocksScheduled() * blockSize); 
     // check the remaining capacity of the target machine
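
The heart of the BlockPlacementPolicyDefault change is the fallback in the NotEnoughReplicasException handler: replicas are first placed while avoiding stale datanodes, and if the replication target cannot be met, the same placement is retried with avoidance disabled, excluding everything already chosen. The simplified sketch below restates that control flow outside HDFS; every name in it is hypothetical and it is not the committed implementation.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    /** Illustrative restatement of the "avoid stale nodes, then fall back" pattern. */
    public class StaleAwarePlacementSketch {
      static class NotEnoughReplicasException extends Exception {}

      /** Picks one target, optionally skipping stale nodes. */
      static String chooseOne(List<String> candidates, Set<String> excluded,
          Set<String> stale, boolean avoidStale) throws NotEnoughReplicasException {
        for (String node : candidates) {
          if (excluded.contains(node)) continue;
          if (avoidStale && stale.contains(node)) continue;
          return node;
        }
        throw new NotEnoughReplicasException();
      }

      static List<String> chooseTargets(int numReplicas, List<String> candidates,
          Set<String> excluded, Set<String> stale, boolean avoidStale) {
        // Keep a copy of the original exclusions so a retry can start from them.
        Set<String> oldExcluded = avoidStale ? new HashSet<String>(excluded) : null;
        List<String> results = new ArrayList<String>();
        try {
          while (results.size() < numReplicas) {
            String chosen = chooseOne(candidates, excluded, stale, avoidStale);
            excluded.add(chosen);
            results.add(chosen);
          }
        } catch (NotEnoughReplicasException e) {
          if (avoidStale) {
            // Exclude what was already chosen, then retry without stale avoidance.
            oldExcluded.addAll(results);
            results.addAll(chooseTargets(numReplicas - results.size(), candidates,
                oldExcluded, stale, false));
          }
        }
        return results;
      }

      public static void main(String[] args) {
        List<String> nodes = Arrays.asList("dn1", "dn2", "dn3");
        Set<String> stale = new HashSet<String>(Arrays.asList("dn1", "dn2"));
        // Two of the three nodes are stale, so asking for 3 replicas forces the fallback.
        System.out.println(chooseTargets(3, nodes, new HashSet<String>(), stale, true));
      }
    }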


