hadoop-common-commits mailing list archives

From cmcc...@apache.org
Subject svn commit: r1602280 [5/6] - in /hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native: ./ common/ fs/ jni/ ndfs/ rpc/ test/ test/common/ test/common/conf/ test/fs/
Date Thu, 12 Jun 2014 19:56:25 GMT
Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/namenode-rpc-unit.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/namenode-rpc-unit.c?rev=1602280&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/namenode-rpc-unit.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/namenode-rpc-unit.c Thu Jun 12 19:56:23 2014
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/hadoop_err.h"
+#include "common/user.h"
+#include "protobuf/ClientNamenodeProtocol.call.h"
+#include "rpc/messenger.h"
+#include "rpc/proxy.h"
+#include "test/test.h"
+
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <strings.h>
+#include <uv.h>
+
+struct options {
+    struct sockaddr_in remote;
+    char *username;
+};
+
+static void options_from_env(struct options *opts)
+{
+    const char *ip_str;
+    const char *port_str;
+    const char *username;
+    int res, port;
+
+    ip_str = getenv("HDFS_IP");
+    if (!ip_str) {
+        fprintf(stderr, "You must set an ip via the HDFS_IP "
+                "environment variable.\n");
+        exit(EXIT_FAILURE);
+    }
+    port_str = getenv("HDFS_PORT");
+    if (!port_str) {
+        fprintf(stderr, "You must set a port via the HDFS_PORT "
+                "environment variable.\n");
+        exit(EXIT_FAILURE);
+    }
+    port = atoi(port_str);
+    res = uv_ip4_addr(ip_str, port, &opts->remote);
+    if (res) {
+        fprintf(stderr, "Invalid IP and port %s and %d: error %s\n",
+                ip_str, port, uv_strerror(res));
+        exit(EXIT_FAILURE);
+    }
+    username = getenv("HDFS_USERNAME");
+    if (username) {
+        opts->username = strdup(username);
+        if (!opts->username)
+            abort();
+        fprintf(stderr, "using HDFS username %s\n", username);
+    } else {
+        res = geteuid_string(&opts->username);
+        if (res) {
+            fprintf(stderr, "geteuid_string failed with error %d\n", res);
+            abort();
+        }
+    }
+}
+
+void set_replication_cb(SetReplicationResponseProto *resp,
+                        struct hadoop_err *err, void *cb_data)
+{
+    uv_sem_t *sem = cb_data;
+
+    if (err) {
+        fprintf(stderr, "set_replication_cb: got an error.  %s\n",
+                hadoop_err_msg(err));
+    } else {
+        fprintf(stderr, "set_replication_cb: resp->result = %d\n",
+                !!resp->result);
+    }
+
+    uv_sem_post(sem);
+    if (err) {
+        hadoop_err_free(err);
+    }
+    if (resp) {
+        set_replication_response_proto__free_unpacked(resp, NULL);
+    }
+}
+
+
+
+int main(void)
+{
+    struct hrpc_messenger_builder *msgr_bld;
+    struct hrpc_messenger *msgr;
+    struct hrpc_proxy proxy;
+    struct options opts;
+    uv_sem_t sem;
+
+    memset(&opts, 0, sizeof(opts));
+    options_from_env(&opts);
+    msgr_bld = hrpc_messenger_builder_alloc();
+    EXPECT_NONNULL(msgr_bld);
+    EXPECT_NO_HADOOP_ERR(hrpc_messenger_create(msgr_bld, &msgr));
+
+    hrpc_proxy_init(&proxy, msgr, &opts.remote,
+            "org.apache.hadoop.hdfs.protocol.ClientProtocol",
+            opts.username);
+    EXPECT_INT_ZERO(uv_sem_init(&sem, 0));
+    {
+        SetReplicationRequestProto req = SET_REPLICATION_REQUEST_PROTO__INIT;
+        req.src = "/foo2";
+        req.replication = 2;
+        cnn_async_set_replication(&proxy, &req, set_replication_cb, &sem);
+    }
+    uv_sem_wait(&sem);
+
+    hrpc_messenger_shutdown(msgr);
+    hrpc_messenger_free(msgr);
+    uv_sem_destroy(&sem);
+
+    free(opts.username);
+    return EXIT_SUCCESS;
+}
+
+// vim: ts=4:sw=4:tw=79:et
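
For illustration only, not part of this commit: the test above signals completion of cnn_async_set_replication through a libuv semaphore.  A minimal synchronous wrapper over the same call could look like the sketch below (it uses the same headers as the test above); the helper and context struct names are hypothetical, and the error is handed back to the caller instead of being freed in the callback.

    struct sync_set_replication_ctx {
        uv_sem_t sem;
        struct hadoop_err *err;
    };

    static void sync_set_replication_cb(SetReplicationResponseProto *resp,
                                        struct hadoop_err *err, void *cb_data)
    {
        struct sync_set_replication_ctx *ctx = cb_data;

        ctx->err = err;                    // ownership passes to the waiter
        if (resp) {
            // resp->result says whether the path existed; a full wrapper
            // would surface it.  Freed here to keep the sketch short.
            set_replication_response_proto__free_unpacked(resp, NULL);
        }
        uv_sem_post(&ctx->sem);
    }

    // Hypothetical helper: block until the NameNode answers setReplication.
    static struct hadoop_err *sync_set_replication(struct hrpc_proxy *proxy,
                                                   const char *src,
                                                   int replication)
    {
        struct sync_set_replication_ctx ctx;
        SetReplicationRequestProto req = SET_REPLICATION_REQUEST_PROTO__INIT;

        memset(&ctx, 0, sizeof(ctx));
        if (uv_sem_init(&ctx.sem, 0))
            abort();
        req.src = (char*)src;
        req.replication = replication;
        cnn_async_set_replication(proxy, &req, sync_set_replication_cb, &ctx);
        uv_sem_wait(&ctx.sem);
        uv_sem_destroy(&ctx.sem);
        return ctx.err;                    // NULL on success
    }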

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/ndfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/ndfs.c?rev=1602280&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/ndfs.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/ndfs.c Thu Jun 12 19:56:23 2014
@@ -0,0 +1,1081 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/hadoop_err.h"
+#include "common/hconf.h"
+#include "common/net.h"
+#include "common/string.h"
+#include "common/uri.h"
+#include "fs/common.h"
+#include "fs/fs.h"
+#include "protobuf/ClientNamenodeProtocol.call.h"
+#include "protobuf/hdfs.pb-c.h.s"
+#include "rpc/messenger.h"
+#include "rpc/proxy.h"
+
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <strings.h>
+#include <uriparser/Uri.h>
+#include <uv.h>
+
+#define DEFAULT_NN_PORT 8020
+
+#define CLIENT_NN_PROTOCOL "org.apache.hadoop.hdfs.protocol.ClientProtocol"
+
+struct native_fs {
+    /** Fields common to all filesystems. */
+    struct hadoop_fs_base base;
+
+    /**
+     * Address of the namenode.
+     * TODO: implement HA failover
+     * TODO: implement IPv6
+     */
+    struct sockaddr_in nn_addr;
+
+    /** The messenger used to perform RPCs. */
+    struct hrpc_messenger *msgr;
+
+    /** The default block size obtained from getServerDefaults. */
+    int64_t default_block_size;
+
+    /** User name to use for RPCs.  Immutable. */
+    char *user_name;
+
+    /**
+     * A dynamically allocated working directory which will be prepended to
+     * all relative paths.
+     */
+    UriUriA *working_uri; 
+
+    /** Lock which protects the working_uri. */
+    uv_mutex_t working_uri_lock;
+};
+
+/**
+ * Set if the file is read-only... otherwise, the file is assumed to be
+ * write-only.
+ */
+#define NDFS_FILE_FLAG_RO                       0x1
+
+/** This flag is for compatibility with some old test harnesses. */
+#define NDFS_FILE_FLAG_DISABLE_DIRECT_READ      0x2
+
+/** Base class for both read-only and write-only files. */
+struct native_file_base {
+    /** Fields common to all filesystems. */
+    struct hadoop_file_base base;
+
+    /** NDFS file flags. */
+    int flags;
+};
+
+/** A read-only file. */
+struct native_ro_file {
+    struct native_file_base base;
+    uint64_t bytes_read;
+};
+
+/** A write-only file. */
+struct native_wo_file {
+    struct native_file_base base;
+};
+
+/** Whole-filesystem stats sent back from the NameNode. */
+struct hadoop_vfs_stats {
+    int64_t capacity;
+    int64_t used;
+    int64_t remaining;
+    int64_t under_replicated;
+    int64_t corrupt_blocks;
+    int64_t missing_blocks;
+};
+
+/** Server defaults sent back from the NameNode. */
+struct ndfs_server_defaults {
+    uint64_t blocksize;
+};
+
+static hdfsFileInfo *ndfs_get_path_info(hdfsFS bfs, const char* uri);
+
+static void ndfs_nn_proxy_init(struct native_fs *fs, struct hrpc_proxy *proxy)
+{
+    hrpc_proxy_init(proxy, fs->msgr, &fs->nn_addr, CLIENT_NN_PROTOCOL,
+                    fs->user_name);
+}
+
+/**
+ * Construct a canonical path from a URI.
+ *
+ * @param fs        The filesystem.
+ * @param uri_str   The URI string.
+ * @param out       (out param) the canonical path.
+ *
+ * @return          NULL on success; the error otherwise.
+ */
+static struct hadoop_err *build_path(struct native_fs *fs, const char *uri_str,
+                                     char **out)
+{
+    char *path = NULL;
+    struct hadoop_err *err = NULL;
+    UriParserStateA uri_state;
+    UriUriA uri;
+
+    memset(&uri_state, 0, sizeof(uri_state));
+
+    uv_mutex_lock(&fs->working_uri_lock);
+    err = uri_parse(uri_str, &uri_state, &uri, fs->working_uri);
+    if (err)
+        goto done;
+    // TODO: check URI scheme and user against saved values?
+    err = uri_get_path(&uri, &path);
+    if (err)
+        goto done;
+    // As a special case, when the URI given has an empty path, we assume that
+    // we want the current working directory.  This is to allow things like
+    // hdfs://mynamenode to map to the current working directory, as they do in
+    // Hadoop.  Note that this is different than hdfs://mynamenode/ (note the
+    // trailing slash) which maps to the root directory.
+    if (!path[0]) {
+        free(path);
+        path = NULL;
+        err = uri_get_path(fs->working_uri, &path);
+        if (err) {
+            goto done;
+        }
+    }
+    err = NULL;
+
+done:
+    uv_mutex_unlock(&fs->working_uri_lock);
+    if (uri_state.uri) {
+        uriFreeUriMembersA(&uri);
+    }
+    if (err) {
+        free(path);
+        return err;
+    }
+    *out = path;
+    return NULL;
+}
+
+static int ndfs_file_is_open_for_read(hdfsFile bfile)
+{
+    struct native_file_base *file = (struct native_file_base *)bfile;
+    return !!(file->flags & NDFS_FILE_FLAG_RO);
+}
+
+static int ndfs_file_is_open_for_write(hdfsFile bfile)
+{
+    struct native_file_base *file = (struct native_file_base *)bfile;
+    return !(file->flags & NDFS_FILE_FLAG_RO);
+}
+
+static int ndfs_file_get_read_statistics(hdfsFile bfile,
+            struct hdfsReadStatistics **out)
+{
+    struct hdfsReadStatistics *stats;
+    struct native_ro_file *file = (struct native_ro_file *)bfile;
+
+    if (!(file->base.flags & NDFS_FILE_FLAG_RO)) {
+        errno = EINVAL;
+        return -1;
+    }
+    stats = calloc(1, sizeof(*stats));
+    if (!stats) {
+        errno = ENOMEM; 
+        return -1;
+    }
+    stats->totalBytesRead = file->bytes_read;
+    *out = stats;
+    return 0;
+}
+
+static struct hadoop_err *ndfs_get_server_defaults(struct native_fs *fs,
+            struct ndfs_server_defaults *defaults)
+{
+    struct hadoop_err *err = NULL;
+    GetServerDefaultsRequestProto req =
+        GET_SERVER_DEFAULTS_REQUEST_PROTO__INIT;
+    GetServerDefaultsResponseProto *resp = NULL;
+    struct hrpc_proxy proxy;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = cnn_get_server_defaults(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+    defaults->blocksize = resp->serverdefaults->blocksize;
+
+done:
+    if (resp) {
+        get_server_defaults_response_proto__free_unpacked(resp, NULL);
+    }
+    return err;
+}
+
+/**
+ * Parse an address in the form <hostname> or <hostname>:<port>.
+ *
+ * @param input         The address string.
+ * @param out           (out param) The sockaddr.
+ * @param default_port  The default port to use, if one is not found in the
+ *                          string.
+ *
+ * @return              NULL on success; the error otherwise.
+ */
+static struct hadoop_err *parse_rpc_addr(const char *input,
+            struct sockaddr_in *out, int default_port)
+{
+    struct hadoop_err *err = NULL;
+    char *host, *colon;
+    uint32_t addr;
+    int port;
+
+    fprintf(stderr, "parse_rpc_addr(input=%s, default_port=%d)\n",
+            input, default_port);
+
+    // If the URI doesn't contain a port, we use a default.
+    // This may come either from the hdfsBuilder, or from the
+    // 'default default' for HDFS.
+    // It's kind of silly that hdfsBuilder even includes this field, since this
+    // information should just be included in the URI, but this is here for
+    // compatibility.
+    port = (default_port <= 0) ? DEFAULT_NN_PORT : default_port;
+    host = strdup(input);
+    if (!host) {
+        err = hadoop_lerr_alloc(ENOMEM, "parse_rpc_addr: OOM");
+        goto done;
+    }
+    colon = index(host, ':');
+    if (colon) {
+        // If the URI has a colon, we parse the next part as a port.
+        char *port_str = colon + 1;
+        *colon = '\0';
+        port = atoi(port_str);
+        if ((port <= 0) || (port >= 65536)) {
+            err = hadoop_lerr_alloc(EINVAL, "parse_rpc_addr: invalid port "
+                                    "string %s", port_str);
+            goto done;
+        }
+    }
+    err = get_first_ipv4_addr(host, &addr);
+    if (err)
+        goto done;
+    out->sin_family = AF_INET;
+    out->sin_port = htons(port);
+    out->sin_addr.s_addr = htonl(addr);
+done:
+    free(host);
+    return err;
+}
+
+static struct hadoop_err *get_namenode_addr(const struct hdfsBuilder *hdfs_bld,
+            struct sockaddr_in *nn_addr)
+{
+    const char *nameservice_id;
+    const char *rpc_addr;
+
+    nameservice_id = hconf_get(hdfs_bld->hconf, "dfs.nameservice.id");
+    if (nameservice_id) {
+        return hadoop_lerr_alloc(ENOTSUP, "get_namenode_addr: we "
+                "don't yet support HA or federated configurations.");
+    }
+    rpc_addr = hconf_get(hdfs_bld->hconf, "dfs.namenode.rpc-address");
+    if (rpc_addr) {
+        return parse_rpc_addr(rpc_addr, nn_addr, hdfs_bld->port);
+    }
+    return parse_rpc_addr(hdfs_bld->uri_authority, nn_addr, hdfs_bld->port);
+}
+
+struct hadoop_err *ndfs_connect(struct hdfsBuilder *hdfs_bld,
+                                struct hdfs_internal **out)
+{
+    struct hadoop_err *err = NULL;
+    struct native_fs *fs = NULL;
+    struct hrpc_messenger_builder *msgr_bld;
+    struct ndfs_server_defaults defaults;
+    int working_dir_lock_created = 0;
+    char *working_dir = NULL;
+    UriParserStateA uri_state;
+
+    fs = calloc(1, sizeof(*fs));
+    if (!fs) {
+        err = hadoop_lerr_alloc(ENOMEM, "failed to allocate space "
+                                "for a native_fs structure.");
+        goto done;
+    }
+    fs->base.ty = HADOOP_FS_TY_NDFS;
+    fs->user_name = strdup(hdfs_bld->uri_user_info); 
+    if (!fs->user_name) {
+        err = hadoop_lerr_alloc(ENOMEM, "failed to allocate space "
+                                "for the user name.");
+        goto done;
+    }
+    msgr_bld = hrpc_messenger_builder_alloc();
+    if (!msgr_bld) {
+        err = hadoop_lerr_alloc(ENOMEM, "failed to allocate space "
+                                "for a messenger builder.");
+        goto done;
+    }
+    err = get_namenode_addr(hdfs_bld, &fs->nn_addr);
+    if (err)
+        goto done;
+    err = hrpc_messenger_create(msgr_bld, &fs->msgr);
+    if (err)
+        goto done;
+    // Get the default working directory
+    if (asprintf(&working_dir, "%s:///user/%s/",
+                 hdfs_bld->uri_scheme, hdfs_bld->uri_user_info) < 0) {
+        working_dir = NULL;
+        err = hadoop_lerr_alloc(ENOMEM, "ndfs_connect: OOM allocating "
+                                "working_dir");
+        goto done;
+    }
+    fs->working_uri = calloc(1, sizeof(*(fs->working_uri)));
+    if (!fs->working_uri) {
+        err = hadoop_lerr_alloc(ENOMEM, "ndfs_connect: OOM allocating "
+                                "fs->working_uri");
+        goto done;
+    }
+    err = uri_parse_abs(working_dir, &uri_state, fs->working_uri,
+                        hdfs_bld->uri_scheme);
+    if (err) {
+        free(fs->working_uri);
+        fs->working_uri = NULL;
+        goto done;
+    }
+    if (uv_mutex_init(&fs->working_uri_lock) < 0) {
+        err = hadoop_lerr_alloc(ENOMEM, "failed to create a mutex.");
+        goto done;
+    }
+    working_dir_lock_created = 1;
+
+    // Ask the NameNode about our server defaults.  We'll use this information
+    // later in ndfs_get_default_block_size, and when writing new files.  Just
+    // as important, this validates that we can talk to the NameNode with our
+    // current configuration.
+    memset(&defaults, 0, sizeof(defaults));
+    err = ndfs_get_server_defaults(fs, &defaults);
+    if (err)
+        goto done;
+    fs->default_block_size = defaults.blocksize;
+    err = NULL;
+
+done:
+    free(working_dir);
+    if (err) {
+        if (fs) {
+            free(fs->user_name);
+            if (fs->working_uri) {
+                uriFreeUriMembersA(fs->working_uri);
+                free(fs->working_uri);
+            }
+            if (working_dir_lock_created) {
+                uv_mutex_destroy(&fs->working_uri_lock);
+            }
+            free(fs);
+        }
+        return err; 
+    }
+    *out = (struct hdfs_internal *)fs;
+    return NULL; 
+}
+
+static int ndfs_disconnect(hdfsFS bfs)
+{
+    struct native_fs *fs = (struct native_fs*)bfs;
+
+    hrpc_messenger_shutdown(fs->msgr);
+    hrpc_messenger_free(fs->msgr);
+    free(fs->user_name);
+    uriFreeUriMembersA(fs->working_uri);
+    free(fs->working_uri);
+    uv_mutex_destroy(&fs->working_uri_lock);
+    free(fs);
+    return 0;
+}
+
+static struct hadoop_err *ndfs_open_file_for_read(
+        struct native_ro_file **out __attribute__((unused)),
+        struct native_fs *fs __attribute__((unused)),
+        const char *uri __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return NULL;
+}
+
+static struct hadoop_err *ndfs_open_file_for_write(
+        struct native_ro_file **out __attribute__((unused)),
+        struct native_fs *fs __attribute__((unused)),
+        const char *uri __attribute__((unused)),
+        int buffer_size __attribute__((unused)),
+        short replication __attribute__((unused)),
+        tSize block_size __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return NULL;
+}
+
+static hdfsFile ndfs_open_file(hdfsFS bfs, const char* uri, int flags, 
+                      int buffer_size, short replication, tSize block_size)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    struct native_ro_file *file = NULL;
+    struct hadoop_err *err;
+    int accmode;
+    char *path = NULL;
+
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    accmode = flags & O_ACCMODE;
+    if (accmode == O_RDONLY) {
+        err = ndfs_open_file_for_read(&file, fs, path);
+    } else if (accmode == O_WRONLY) {
+        err = ndfs_open_file_for_write(&file, fs, path,
+                        buffer_size, replication, block_size);
+    } else {
+        err = hadoop_lerr_alloc(EINVAL, "cannot open a hadoop file in "
+                "mode 0x%x\n", accmode);
+    }
+done:
+    free(path);
+    return hadoopfs_errno_and_retptr(err, file);
+}
+
+static int ndfs_close_file(hdfsFS fs __attribute__((unused)),
+                           hdfsFile bfile __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+static int ndfs_file_exists(hdfsFS bfs, const char *uri)
+{
+    hdfsFileInfo *info;
+
+    info = ndfs_get_path_info(bfs, uri);
+    if (!info) {
+        // errno will be set
+        return -1;
+    }
+    hdfsFreeFileInfo(info, 1);
+    return 0;
+}
+
+static int ndfs_seek(hdfsFS bfs __attribute__((unused)),
+                     hdfsFile bfile __attribute__((unused)),
+                     tOffset desiredPos __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+static tOffset ndfs_tell(hdfsFS bfs __attribute__((unused)),
+                         hdfsFile bfile __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+static tSize ndfs_read(hdfsFS bfs __attribute__((unused)),
+                       hdfsFile bfile __attribute__((unused)),
+                       void *buffer __attribute__((unused)),
+                       tSize length __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+static tSize ndfs_pread(hdfsFS bfs __attribute__((unused)),
+            hdfsFile bfile __attribute__((unused)),
+            tOffset position __attribute__((unused)),
+            void* buffer __attribute__((unused)),
+            tSize length __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+static tSize ndfs_write(hdfsFS bfs __attribute__((unused)),
+            hdfsFile bfile __attribute__((unused)),
+            const void* buffer __attribute__((unused)),
+            tSize length __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+static int ndfs_flush(hdfsFS bfs __attribute__((unused)),
+                      hdfsFile bfile __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+static int ndfs_hflush(hdfsFS bfs __attribute__((unused)),
+                      hdfsFile bfile __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+static int ndfs_hsync(hdfsFS bfs __attribute__((unused)),
+                      hdfsFile bfile __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+static int ndfs_available(hdfsFS bfs __attribute__((unused)),
+                          hdfsFile bfile __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+static int ndfs_copy(hdfsFS srcFS __attribute__((unused)),
+                     const char* src __attribute__((unused)),
+                     hdfsFS dstFS __attribute__((unused)),
+                     const char* dst __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+static int ndfs_move(hdfsFS srcFS __attribute__((unused)),
+                     const char* src __attribute__((unused)),
+                     hdfsFS dstFS __attribute__((unused)),
+                     const char* dst __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+static int ndfs_unlink(struct hdfs_internal *bfs,
+                const char *uri, int recursive)
+{
+    struct native_fs *fs = (struct native_fs*)bfs;
+    struct hadoop_err *err = NULL;
+    DeleteRequestProto req = DELETE_REQUEST_PROTO__INIT;
+    struct hrpc_proxy proxy;
+    DeleteResponseProto *resp = NULL;
+    char *path = NULL;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    req.src = path;
+    req.recursive = !!recursive;
+    err = cnn_delete(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+
+done:
+    free(path);
+    if (resp) {
+        delete_response_proto__free_unpacked(resp, NULL);
+    }
+    return hadoopfs_errno_and_retcode(err);
+}
+
+static int ndfs_rename(hdfsFS bfs, const char *src_uri, const char *dst_uri)
+{
+    struct native_fs *fs = (struct native_fs*)bfs;
+    struct hadoop_err *err = NULL;
+    Rename2RequestProto req = RENAME2_REQUEST_PROTO__INIT;
+    Rename2ResponseProto *resp = NULL;
+    struct hrpc_proxy proxy;
+    char *src_path = NULL, *dst_path = NULL;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = build_path(fs, src_uri, &src_path);
+    if (err) {
+        goto done;
+    }
+    err = build_path(fs, dst_uri, &dst_path);
+    if (err) {
+        goto done;
+    }
+    req.src = src_path;
+    req.dst = dst_path;
+    req.overwritedest = 0; // TODO: support overwrite
+    err = cnn_rename2(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+
+done:
+    free(src_path);
+    free(dst_path);
+    if (resp) {
+        rename2_response_proto__free_unpacked(resp, NULL);
+    }
+    return hadoopfs_errno_and_retcode(err);
+}
+
+static char* ndfs_get_working_directory(hdfsFS bfs, char *buffer,
+                                       size_t bufferSize)
+{
+    size_t len;
+    struct native_fs *fs = (struct native_fs *)bfs;
+    struct hadoop_err *err = NULL;
+    char *working_path = NULL;
+
+    uv_mutex_lock(&fs->working_uri_lock);
+    err = uri_get_path(fs->working_uri, &working_path);
+    if (err) {
+        err = hadoop_err_prepend(err, 0, "ndfs_get_working_directory: failed "
+                                 "to get the path of the working_uri.");
+        goto done;
+    }
+    len = strlen(working_path);
+    if (len + 1 > bufferSize) {
+        err = hadoop_lerr_alloc(ENAMETOOLONG, "ndfs_get_working_directory: "
+                "the buffer supplied was only %zd bytes, but we would need "
+                "%zd bytes to hold the working directory.",
+                bufferSize, len + 1);
+        goto done;
+    }
+    strcpy(buffer, working_path);
+done:
+    uv_mutex_unlock(&fs->working_uri_lock);
+    free(working_path);
+    return hadoopfs_errno_and_retptr(err, buffer);
+}
+
+static int ndfs_set_working_directory(hdfsFS bfs, const char* uri_str)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    char *path = NULL;
+    char *scheme = NULL;
+    struct hadoop_err *err = NULL;
+    UriParserStateA uri_state;
+    UriUriA *uri = NULL;
+
+    uv_mutex_lock(&fs->working_uri_lock);
+    uri = calloc(1, sizeof(*uri));
+    if (!uri) {
+        err = hadoop_lerr_alloc(ENOMEM, "ndfs_set_working_directory: OOM");
+        goto done;
+    }
+    err = uri_get_scheme(fs->working_uri, &scheme);
+    if (err) {
+        err = hadoop_err_prepend(err, ENOMEM, "ndfs_set_working_directory: "
+                            "failed to get scheme of current working_uri");
+        goto done;
+    }
+    err = build_path(fs, uri_str, &path);
+    if (err)
+        goto done;
+    err = uri_parse_abs(path, &uri_state, uri, scheme);
+    if (err)
+        goto done;
+    uriFreeUriMembersA(fs->working_uri);
+    free(fs->working_uri);
+    fs->working_uri = uri;
+    err = NULL;
+
+done:
+    if (err) {
+        free(uri);
+    }
+    uv_mutex_unlock(&fs->working_uri_lock);
+    free(scheme);
+    free(path);
+    return hadoopfs_errno_and_retcode(err);
+}
+
+static int ndfs_mkdir(hdfsFS bfs, const char* uri)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    struct hadoop_err *err = NULL;
+    MkdirsRequestProto req = MKDIRS_REQUEST_PROTO__INIT;
+    MkdirsResponseProto *resp = NULL;
+    struct hrpc_proxy proxy;
+    char *path = NULL;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    req.src = path;
+    req.createparent = 1; // TODO: add libhdfs API for non-recursive mkdir
+    err = cnn_mkdirs(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+    if (!resp->result) {
+        err = hadoop_lerr_alloc(EEXIST, "ndfs_mkdir(%s): a path "
+                "component already exists as a non-directory.", path);
+        goto done;
+    }
+    err = NULL;
+
+done:
+    free(path);
+    if (resp) {
+        mkdirs_response_proto__free_unpacked(resp, NULL);
+    }
+    return hadoopfs_errno_and_retcode(err);
+}
+
+static int ndfs_set_replication(hdfsFS bfs, const char* uri,
+                               int16_t replication)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    struct hadoop_err *err = NULL;
+    SetReplicationRequestProto req = SET_REPLICATION_REQUEST_PROTO__INIT;
+    SetReplicationResponseProto *resp = NULL;
+    struct hrpc_proxy proxy;
+    char *path = NULL;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    req.src = path;
+    req.replication = replication;
+    err = cnn_set_replication(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+    if (!resp->result) {
+        err = hadoop_lerr_alloc(EINVAL, "ndfs_set_replication(%s): path "
+                "does not exist or is not a regular file.", path);
+        goto done;
+    }
+
+done:
+    free(path);
+    if (resp) {
+        set_replication_response_proto__free_unpacked(resp, NULL);
+    }
+    return hadoopfs_errno_and_retcode(err);
+}
+
+static hdfsFileInfo* ndfs_list_directory(hdfsFS bfs __attribute__((unused)),
+                                         const char* uri __attribute__((unused)),
+                                        int *numEntries __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return NULL;
+}
+
+static hdfsFileInfo *ndfs_get_path_info(hdfsFS bfs __attribute__((unused)),
+                                        const char* uri __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return NULL;
+}
+
+char***
+ndfs_get_hosts(hdfsFS bfs __attribute__((unused)),
+               const char* path __attribute__((unused)),
+               tOffset start __attribute__((unused)),
+               tOffset length __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return NULL;
+}
+
+static tOffset ndfs_get_default_block_size(hdfsFS bfs)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    return fs->default_block_size;
+}
+
+static tOffset ndfs_get_default_block_size_at_path(hdfsFS bfs,
+                    const char *uri)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    struct hadoop_err *err = NULL;
+    GetPreferredBlockSizeRequestProto req =
+        GET_PREFERRED_BLOCK_SIZE_REQUEST_PROTO__INIT;
+    GetPreferredBlockSizeResponseProto *resp = NULL;
+    struct hrpc_proxy proxy;
+    tOffset ret = 0;
+    char *path = NULL;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    req.filename = path;
+    err = cnn_get_preferred_block_size(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+    ret = resp->bsize;
+    err = NULL;
+
+done:
+    free(path);
+    if (resp) {
+        get_preferred_block_size_response_proto__free_unpacked(resp, NULL);
+    }
+    if (err)
+        return hadoopfs_errno_and_retcode(err);
+    return ret;
+}
+
+static struct hadoop_err *ndfs_statvfs(struct hadoop_fs_base *hfs,
+        struct hadoop_vfs_stats *stats)
+{
+    struct native_fs *fs = (struct native_fs*)hfs;
+
+    GetFsStatusRequestProto req = GET_FS_STATUS_REQUEST_PROTO__INIT;
+    GetFsStatsResponseProto *resp = NULL;
+    struct hadoop_err *err = NULL;
+    struct hrpc_proxy proxy;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = cnn_get_fs_stats(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+    stats->capacity = resp->capacity;
+    stats->used = resp->used;
+    stats->remaining = resp->remaining;
+    stats->under_replicated = resp->under_replicated;
+    stats->corrupt_blocks = resp->corrupt_blocks;
+    stats->missing_blocks = resp->missing_blocks;
+
+done:
+    if (resp) {
+        get_fs_stats_response_proto__free_unpacked(resp, NULL);
+    }
+    return err;
+}
+
+static tOffset ndfs_get_capacity(hdfsFS bfs)
+{
+    struct hadoop_err *err;
+    struct hadoop_vfs_stats stats;
+
+    err = ndfs_statvfs((struct hadoop_fs_base *)bfs, &stats);
+    if (err)
+        return hadoopfs_errno_and_retcode(err);
+    return stats.capacity;
+}
+
+static tOffset ndfs_get_used(hdfsFS bfs)
+{
+    struct hadoop_err *err;
+    struct hadoop_vfs_stats stats;
+
+    err = ndfs_statvfs((struct hadoop_fs_base *)bfs, &stats);
+    if (err)
+        return hadoopfs_errno_and_retcode(err);
+    return stats.used;
+}
+
+static int ndfs_chown(hdfsFS bfs, const char* uri,
+                      const char *user, const char *group)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    struct hadoop_err *err = NULL;
+    SetOwnerRequestProto req = SET_OWNER_REQUEST_PROTO__INIT;
+    SetOwnerResponseProto *resp = NULL;
+    struct hrpc_proxy proxy;
+    char *path = NULL;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    req.src = path;
+    req.username = (char*)user;
+    req.groupname = (char*)group;
+    err = cnn_set_owner(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+
+done:
+    free(path);
+    if (resp) {
+        set_owner_response_proto__free_unpacked(resp, NULL);
+    }
+    return hadoopfs_errno_and_retcode(err);
+}
+
+static int ndfs_chmod(hdfsFS bfs, const char* uri, short mode)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    FsPermissionProto perm = FS_PERMISSION_PROTO__INIT;
+    SetPermissionRequestProto req = SET_PERMISSION_REQUEST_PROTO__INIT;
+    SetPermissionResponseProto *resp = NULL;
+    struct hadoop_err *err = NULL;
+    struct hrpc_proxy proxy;
+    char *path = NULL;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    req.src = path;
+    req.permission = &perm;
+    perm.perm = mode;
+    err = cnn_set_permission(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+
+done:
+    free(path);
+    if (resp) {
+        set_permission_response_proto__free_unpacked(resp, NULL);
+    }
+    return hadoopfs_errno_and_retcode(err);
+}
+
+static int ndfs_utime(hdfsFS bfs, const char* uri,
+                      int64_t mtime, int64_t atime)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    SetTimesRequestProto req = SET_TIMES_REQUEST_PROTO__INIT ;
+    SetTimesResponseProto *resp = NULL;
+    struct hadoop_err *err = NULL;
+    struct hrpc_proxy proxy;
+    char *path = NULL;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    req.src = path;
+    // If mtime or atime are -1, that means "no change."
+    // Otherwise, we need to multiply by 1000, to take into account the fact
+    // that libhdfs times are in seconds, and HDFS times are in milliseconds.
+    // It's unfortunate that libhdfs doesn't support the full millisecond
+    // precision.  We need to redo the API at some point.
+    if (mtime < 0) {
+        req.mtime = -1;
+    } else {
+        req.mtime = mtime;
+        req.mtime *= 1000;
+    }
+    if (atime < 0) {
+        req.atime = -1;
+    } else {
+        req.atime = atime;
+        req.atime *= 1000;
+    }
+    err = cnn_set_times(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+
+done:
+    free(path);
+    if (resp) {
+        set_times_response_proto__free_unpacked(resp, NULL);
+    }
+    return hadoopfs_errno_and_retcode(err);
+}
+
+static struct hadoopRzBuffer* ndfs_read_zero(
+            hdfsFile bfile __attribute__((unused)),
+            struct hadoopRzOptions *opts __attribute__((unused)),
+            int32_t maxLength __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return NULL;
+}
+
+static void ndfs_rz_buffer_free(hdfsFile bfile __attribute__((unused)),
+                struct hadoopRzBuffer *buffer __attribute__((unused)))
+{
+}
+
+int ndfs_file_uses_direct_read(hdfsFile bfile)
+{
+    // Report whether direct reads are in use.  The 'disable direct reads'
+    // flag exists only so that old test harnesses designed to test jniFS
+    // will run against NDFS; it doesn't change behavior, since all reads
+    // are always direct in NDFS.
+    struct native_file_base *file = (struct native_file_base *)bfile;
+    return (!(file->flags & NDFS_FILE_FLAG_DISABLE_DIRECT_READ));
+}
+
+void ndfs_file_disable_direct_read(hdfsFile bfile __attribute__((unused)))
+{
+    struct native_file_base *file = (struct native_file_base *)bfile;
+    file->flags |= NDFS_FILE_FLAG_DISABLE_DIRECT_READ;
+}
+
+const struct hadoop_fs_ops g_ndfs_ops = {
+    .name = "ndfs",
+    .file_is_open_for_read = ndfs_file_is_open_for_read,
+    .file_is_open_for_write = ndfs_file_is_open_for_write,
+    .get_read_statistics = ndfs_file_get_read_statistics,
+    .connect = ndfs_connect,
+    .disconnect = ndfs_disconnect,
+    .open = ndfs_open_file,
+    .close = ndfs_close_file,
+    .exists = ndfs_file_exists,
+    .seek = ndfs_seek,
+    .tell = ndfs_tell,
+    .read = ndfs_read,
+    .pread = ndfs_pread,
+    .write = ndfs_write,
+    .flush = ndfs_flush,
+    .hflush = ndfs_hflush,
+    .hsync = ndfs_hsync,
+    .available = ndfs_available,
+    .copy = ndfs_copy,
+    .move = ndfs_move,
+    .unlink = ndfs_unlink,
+    .rename = ndfs_rename,
+    .get_working_directory = ndfs_get_working_directory,
+    .set_working_directory = ndfs_set_working_directory,
+    .mkdir = ndfs_mkdir,
+    .set_replication = ndfs_set_replication,
+    .list_directory = ndfs_list_directory,
+    .get_path_info = ndfs_get_path_info,
+    .get_hosts = ndfs_get_hosts,
+    .get_default_block_size = ndfs_get_default_block_size,
+    .get_default_block_size_at_path = ndfs_get_default_block_size_at_path,
+    .get_capacity = ndfs_get_capacity,
+    .get_used = ndfs_get_used,
+    .chown = ndfs_chown,
+    .chmod = ndfs_chmod,
+    .utime = ndfs_utime,
+    .read_zero = ndfs_read_zero,
+    .rz_buffer_free = ndfs_rz_buffer_free,
+
+    // test
+    .file_uses_direct_read = ndfs_file_uses_direct_read,
+    .file_disable_direct_read = ndfs_file_disable_direct_read,
+};
+
+// vim: ts=4:sw=4:tw=79:et
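
A minimal sketch, for illustration only, of driving a few of the metadata RPCs implemented above through the public libhdfs API in fs/hdfs.h.  It assumes a connected hdfsFS obtained with the ndfs handler selected (as in the tests further down); the helper name and paths are hypothetical.

    #include "fs/hdfs.h"

    #include <stdio.h>

    // Hypothetical walk through some of the ndfs metadata operations.
    static int demo_metadata_ops(hdfsFS fs)
    {
        if (hdfsCreateDirectory(fs, "/demo"))          // ndfs_mkdir
            return -1;
        if (hdfsRename(fs, "/demo", "/demo2"))         // ndfs_rename
            return -1;
        if (hdfsSetReplication(fs, "/demo2/file", 2))  // ndfs_set_replication
            fprintf(stderr, "setReplication failed; the file may not exist\n");
        if (hdfsDelete(fs, "/demo2", 1))               // ndfs_unlink, recursive
            return -1;
        return 0;
    }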

Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/conn.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/conn.c?rev=1602280&r1=1602279&r2=1602280&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/conn.c (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/conn.c Thu Jun 12 19:56:23 2014
@@ -17,6 +17,7 @@
  */
 
 #include "common/hadoop_err.h"
+#include "common/net.h"
 #include "common/string.h"
 #include "protobuf/IpcConnectionContext.pb-c.h.s"
 #include "protobuf/ProtobufRpcEngine.pb-c.h.s"
@@ -348,8 +349,11 @@ struct hadoop_err *hrpc_conn_create_outb
     res = uv_tcp_connect(&conn->conn_req, &conn->stream,
             (struct sockaddr*)&conn->remote, conn_connect_cb);
     if (res) {
+        char remote_str[64] = { 0 };
         err = hadoop_uverr_alloc(res,
-                "hrpc_conn_create_outbound: uv_tcp_connect failed");
+                "hrpc_conn_create_outbound: uv_tcp_connect(%s) failed",
+                net_ipv4_name_and_port(&conn->remote, remote_str,
+                                       sizeof(remote_str)));
         goto done;
     }
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/protoc-gen-hrpc.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/protoc-gen-hrpc.cc?rev=1602280&r1=1602279&r2=1602280&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/protoc-gen-hrpc.cc (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/protoc-gen-hrpc.cc Thu Jun 12 19:56:23 2014
@@ -378,13 +378,15 @@ private:
 "        $req_ty_uscore$__get_packed_size(req),\n"
 "        (hrpc_pack_cb_t)$req_ty_uscore$__pack,\n"
 "        hrpc_proxy_sync_cb, ctx);\n"
+"    uv_sem_wait(&ctx->sem);\n"
 "    if (ctx->err) {\n"
-"        hrpc_free_sync_ctx(ctx);\n"
-"        return ctx->err;\n"
+"        err = ctx->err;\n"
+"        hrpc_release_sync_ctx(ctx);\n"
+"        return err;\n"
 "    }\n"
 "    resp = $resp_ty_uscore$__unpack(NULL, ctx->resp.pb_len,\n"
 "                                                  ctx->resp.pb_base);\n"
-"    hrpc_free_sync_ctx(ctx);\n"
+"    hrpc_release_sync_ctx(ctx);\n"
 "    if (!resp) {\n"
 "        return hadoop_lerr_alloc(EINVAL,\n"
 "           \"$sync_call$: failed to parse response from server\");\n"

Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/proxy.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/proxy.c?rev=1602280&r1=1602279&r2=1602280&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/proxy.c (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/proxy.c Thu Jun 12 19:56:23 2014
@@ -92,7 +92,7 @@ void *hrpc_proxy_alloc_userdata(struct h
 struct hrpc_sync_ctx *hrpc_proxy_alloc_sync_ctx(struct hrpc_proxy *proxy)
 {
     struct hrpc_sync_ctx *ctx = 
-        hrpc_proxy_alloc_userdata(proxy, sizeof(struct hrpc_proxy));
+        hrpc_proxy_alloc_userdata(proxy, sizeof(*ctx));
     if (!ctx) {
         return NULL;
     }
@@ -103,7 +103,7 @@ struct hrpc_sync_ctx *hrpc_proxy_alloc_s
     return ctx;
 }
 
-void hrpc_free_sync_ctx(struct hrpc_sync_ctx *ctx)
+void hrpc_release_sync_ctx(struct hrpc_sync_ctx *ctx)
 {
     free(ctx->resp.base);
     uv_sem_destroy(&ctx->sem);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/proxy.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/proxy.h?rev=1602280&r1=1602279&r2=1602280&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/proxy.h (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/proxy.h Thu Jun 12 19:56:23 2014
@@ -136,11 +136,12 @@ void *hrpc_proxy_alloc_userdata(struct h
 struct hrpc_sync_ctx *hrpc_proxy_alloc_sync_ctx(struct hrpc_proxy *proxy);
 
 /**
- * Free a sync context allocated from a proxy.
+ * Release the memory associated with a sync context.  This doesn't free the
+ * context object itself.
  *
 * @param ctx                   The sync context.
  */
-void hrpc_free_sync_ctx(struct hrpc_sync_ctx *ctx);
+void hrpc_release_sync_ctx(struct hrpc_sync_ctx *ctx);
 
 /**
  * A callback which synchronous RPCs can use.

Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/reactor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/reactor.c?rev=1602280&r1=1602279&r2=1602280&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/reactor.c (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/reactor.c Thu Jun 12 19:56:23 2014
@@ -104,9 +104,10 @@ static void reactor_async_start_outbound
 
     conn = reuse_idle_conn(reactor, &call->remote, call);
     if (conn) {
-        reactor_log_debug(reactor, "start_outbound(remote=%s) assigning to "
-                       "connection %p\n",
-            net_ipv4_name(&call->remote, remote_str, sizeof(remote_str)), conn);
+        reactor_log_debug(reactor, "reactor_async_start_outbound(remote=%s) "
+                          "assigning to connection %p\n",
+                net_ipv4_name(&call->remote, remote_str, sizeof(remote_str)),
+                conn);
         hrpc_conn_start_outbound(conn, call);
     } else {
         err = hrpc_conn_create_outbound(reactor, call, &conn);
@@ -118,9 +119,10 @@ static void reactor_async_start_outbound
             hrpc_call_deliver_err(call, err);
             return;
         }
-        reactor_log_debug(reactor, "start_outbound(remote=%s) created new "
-                       "connection %p\n",
-            net_ipv4_name(&call->remote, remote_str, sizeof(remote_str)), conn);
+        reactor_log_debug(reactor, "reactor_async_start_outbound("
+                "remote=%s) created new connection %p\n",
+                net_ipv4_name(&call->remote, remote_str, sizeof(remote_str)),
+                conn);
     }
     // Add or re-add the connection to the reactor's tree.
     RB_INSERT(hrpc_conns, &reactor->conns, conn);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/varint-unit.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/varint-unit.c?rev=1602280&r1=1602279&r2=1602280&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/varint-unit.c (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/rpc/varint-unit.c Thu Jun 12 19:56:23 2014
@@ -16,8 +16,8 @@
  * limitations under the License.
  */
 
-#include "common/test.h"
 #include "rpc/varint.h"
+#include "test/test.h"
 
 #include <errno.h>
 #include <stdint.h>

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/common/conf/core-site.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/common/conf/core-site.xml?rev=1602280&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/common/conf/core-site.xml
------------------------------------------------------------------------------
    svn:mime-type = application/xml

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/common/conf/hdfs-site.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/common/conf/hdfs-site.xml?rev=1602280&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/common/conf/hdfs-site.xml
------------------------------------------------------------------------------
    svn:mime-type = application/xml

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/common/conf/include.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/common/conf/include.xml?rev=1602280&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/common/conf/include.xml
------------------------------------------------------------------------------
    svn:mime-type = application/xml

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/fs/test_libhdfs_meta_ops.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/fs/test_libhdfs_meta_ops.c?rev=1602280&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/fs/test_libhdfs_meta_ops.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/fs/test_libhdfs_meta_ops.c Thu Jun 12 19:56:23 2014
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fs/hdfs.h"
+#include "test/native_mini_dfs.h"
+#include "test/test.h"
+
+#include <errno.h>
+#include <inttypes.h>
+#include <semaphore.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+/** Test performing metadata operations via libhdfs. */
+
+int main(void)
+{
+    struct hdfsBuilder *hdfs_bld = NULL;
+    hdfsFS fs = NULL;
+    struct NativeMiniDfsCluster* dfs_cluster = NULL;
+    struct NativeMiniDfsConf dfs_conf = {
+        .doFormat = 1,
+    };
+    const char *nn_uri;
+
+    nn_uri = getenv("NAMENODE_URI");
+    if (!nn_uri) {
+        dfs_cluster = nmdCreate(&dfs_conf);
+        EXPECT_NONNULL(dfs_cluster);
+        EXPECT_INT_ZERO(nmdWaitClusterUp(dfs_cluster));
+    }
+    hdfs_bld = hdfsNewBuilder();
+    EXPECT_NONNULL(hdfs_bld);
+    if (nn_uri) {
+        hdfsBuilderSetNameNode(hdfs_bld, nn_uri);
+        EXPECT_INT_ZERO(hdfsBuilderConfSetStr(hdfs_bld,
+                "default.native.handler", "ndfs"));
+    } else {
+        hdfsBuilderSetNameNode(hdfs_bld, "localhost");
+        hdfsBuilderSetNameNodePort(hdfs_bld, nmdGetNameNodePort(dfs_cluster));
+    }
+    fs = hdfsBuilderConnect(hdfs_bld);
+    EXPECT_NONNULL(fs);
+    EXPECT_INT_ZERO(hdfsDisconnect(fs));
+    if (dfs_cluster) {
+        EXPECT_INT_ZERO(nmdShutdown(dfs_cluster));
+        nmdFree(dfs_cluster);
+    }
+    return EXIT_SUCCESS;
+}
+
+// vim: ts=4:sw=4:tw=79:et
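
The test above forces the ndfs handler only when NAMENODE_URI points at an external cluster; with the native mini-cluster it leaves the handler at its default.  A minimal sketch, assuming the builder calls behave as used above (hdfsFreeBuilder is assumed to be available as in stock libhdfs), of a hypothetical helper that always selects the native client for an explicit NameNode URI:

    #include "fs/hdfs.h"

    #include <stddef.h>

    // Hypothetical helper: connect with the native ndfs handler forced on.
    static hdfsFS connect_ndfs(const char *nn_uri)
    {
        struct hdfsBuilder *bld = hdfsNewBuilder();

        if (!bld)
            return NULL;
        hdfsBuilderSetNameNode(bld, nn_uri);
        if (hdfsBuilderConfSetStr(bld, "default.native.handler", "ndfs")) {
            hdfsFreeBuilder(bld);
            return NULL;
        }
        return hdfsBuilderConnect(bld);    // the builder is consumed here
    }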

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/fs/test_libhdfs_threaded.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/fs/test_libhdfs_threaded.c?rev=1602280&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/fs/test_libhdfs_threaded.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/fs/test_libhdfs_threaded.c Thu Jun 12 19:56:23 2014
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fs/hdfs.h"
+#include "test/native_mini_dfs.h"
+#include "test/test.h"
+
+#include <errno.h>
+#include <inttypes.h>
+#include <semaphore.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define TO_STR_HELPER(X) #X
+#define TO_STR(X) TO_STR_HELPER(X)
+
+#define TLH_MAX_THREADS 100
+
+#define TLH_DEFAULT_BLOCK_SIZE 134217728
+
+static sem_t tlhSem;
+
+static struct NativeMiniDfsCluster* tlhCluster;
+
+struct tlhThreadInfo {
+    /** Thread index */
+    int threadIdx;
+    /** 0 = thread was successful; error code otherwise */
+    int success;
+    /** pthread identifier */
+    pthread_t thread;
+};
+
+static int hdfsSingleNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs,
+                                     const char *username)
+{
+    int ret, port;
+    hdfsFS hdfs;
+    struct hdfsBuilder *bld;
+    
+    port = nmdGetNameNodePort(cl);
+    if (port < 0) {
+        fprintf(stderr, "hdfsSingleNameNodeConnect: nmdGetNameNodePort "
+                "returned error %d\n", port);
+        return port;
+    }
+    bld = hdfsNewBuilder();
+    if (!bld)
+        return -ENOMEM;
+    hdfsBuilderSetForceNewInstance(bld);
+    hdfsBuilderSetNameNode(bld, "localhost");
+    hdfsBuilderSetNameNodePort(bld, port);
+    hdfsBuilderConfSetStr(bld, "dfs.block.size",
+                          TO_STR(TLH_DEFAULT_BLOCK_SIZE));
+    hdfsBuilderConfSetStr(bld, "dfs.blocksize",
+                          TO_STR(TLH_DEFAULT_BLOCK_SIZE));
+    if (username) {
+        hdfsBuilderSetUserName(bld, username);
+    }
+    hdfs = hdfsBuilderConnect(bld);
+    if (!hdfs) {
+        ret = -errno;
+        return ret;
+    }
+    *fs = hdfs;
+    return 0;
+}
+
+static int doTestGetDefaultBlockSize(hdfsFS fs, const char *path)
+{
+    int64_t blockSize;
+    int ret;
+
+    blockSize = hdfsGetDefaultBlockSize(fs);
+    if (blockSize < 0) {
+        ret = errno;
+        fprintf(stderr, "hdfsGetDefaultBlockSize failed with error %d\n", ret);
+        return ret;
+    } else if (blockSize != TLH_DEFAULT_BLOCK_SIZE) {
+        fprintf(stderr, "hdfsGetDefaultBlockSize got %"PRId64", but we "
+                "expected %d\n", blockSize, TLH_DEFAULT_BLOCK_SIZE);
+        return EIO;
+    }
+
+    blockSize = hdfsGetDefaultBlockSizeAtPath(fs, path);
+    if (blockSize < 0) {
+        ret = errno;
+        fprintf(stderr, "hdfsGetDefaultBlockSizeAtPath(%s) failed with "
+                "error %d\n", path, ret);
+        return ret;
+    } else if (blockSize != TLH_DEFAULT_BLOCK_SIZE) {
+        fprintf(stderr, "hdfsGetDefaultBlockSizeAtPath(%s) got "
+                "%"PRId64", but we expected %d\n", 
+                path, blockSize, TLH_DEFAULT_BLOCK_SIZE);
+        return EIO;
+    }
+    return 0;
+}
+
+struct tlhPaths {
+    char prefix[256];
+    char file1[256];
+    char file2[256];
+};
+
+static int setupPaths(const struct tlhThreadInfo *ti, struct tlhPaths *paths)
+{
+    memset(paths, 0, sizeof(*paths));
+    if ((size_t)snprintf(paths->prefix, sizeof(paths->prefix), "/tlhData%04d",
+                 ti->threadIdx) >= sizeof(paths->prefix)) {
+        return ENAMETOOLONG;
+    }
+    if ((size_t)snprintf(paths->file1, sizeof(paths->file1), "%s/file1",
+                 paths->prefix) >= sizeof(paths->file1)) {
+        return ENAMETOOLONG;
+    }
+    if ((size_t)snprintf(paths->file2, sizeof(paths->file2), "%s/file2",
+                 paths->prefix) >= sizeof(paths->file2)) {
+        return ENAMETOOLONG;
+    }
+    return 0;
+}
+
+static int doTestHdfsOperations(hdfsFS fs, const struct tlhPaths *paths)
+{
+    char tmp[4096];
+    hdfsFile file;
+    int ret, expected;
+    hdfsFileInfo *fileInfo;
+    struct hdfsReadStatistics *readStats = NULL;
+
+    if (hdfsExists(fs, paths->prefix) == 0) {
+        EXPECT_INT_ZERO(hdfsDelete(fs, paths->prefix, 1));
+    }
+    EXPECT_INT_ZERO(hdfsCreateDirectory(fs, paths->prefix));
+
+    EXPECT_INT_ZERO(doTestGetDefaultBlockSize(fs, paths->prefix));
+
+    /* There should not be any file to open for reading. */
+    EXPECT_NULL(hdfsOpenFile(fs, paths->file1, O_RDONLY, 0, 0, 0));
+
+    /* hdfsOpenFile should not accept mode = 3 */
+    EXPECT_NULL(hdfsOpenFile(fs, paths->file1, 3, 0, 0, 0));
+
+    file = hdfsOpenFile(fs, paths->file1, O_WRONLY, 0, 0, 0);
+    EXPECT_NONNULL(file);
+
+    /* TODO: implement writeFully and use it here */
+    expected = strlen(paths->prefix);
+    ret = hdfsWrite(fs, file, paths->prefix, expected);
+    if (ret < 0) {
+        ret = errno;
+        fprintf(stderr, "hdfsWrite failed and set errno %d\n", ret);
+        return ret;
+    }
+    if (ret != expected) {
+        fprintf(stderr, "hdfsWrite was supposed to write %d bytes, but "
+                "it wrote %d\n", ret, expected);
+        return EIO;
+    }
+    EXPECT_INT_ZERO(hdfsFlush(fs, file));
+    EXPECT_INT_ZERO(hdfsHSync(fs, file));
+    EXPECT_INT_ZERO(hdfsCloseFile(fs, file));
+
+    /* Let's re-open the file for reading */
+    file = hdfsOpenFile(fs, paths->file1, O_RDONLY, 0, 0, 0);
+    EXPECT_NONNULL(file);
+
+    EXPECT_INT_ZERO(hdfsFileGetReadStatistics(file, &readStats));
+    errno = 0;
+    EXPECT_INT_ZERO(readStats->totalBytesRead);
+    EXPECT_INT_ZERO(readStats->totalLocalBytesRead);
+    EXPECT_INT_ZERO(readStats->totalShortCircuitBytesRead);
+    hdfsFileFreeReadStatistics(readStats);
+    /* TODO: implement readFully and use it here */
+    ret = hdfsRead(fs, file, tmp, sizeof(tmp));
+    if (ret < 0) {
+        ret = errno;
+        fprintf(stderr, "hdfsRead failed and set errno %d\n", ret);
+        return ret;
+    }
+    if (ret != expected) {
+        fprintf(stderr, "hdfsRead was supposed to read %d bytes, but "
+                "it read %d\n", expected, ret);
+        return EIO;
+    }
+    EXPECT_INT_ZERO(hdfsFileGetReadStatistics(file, &readStats));
+    errno = 0;
+    EXPECT_INT_EQ(expected, readStats->totalBytesRead);
+    hdfsFileFreeReadStatistics(readStats);
+    EXPECT_INT_ZERO(memcmp(paths->prefix, tmp, expected));
+    EXPECT_INT_ZERO(hdfsCloseFile(fs, file));
+
+    // TODO: Non-recursive delete should fail?
+    //EXPECT_NONZERO(hdfsDelete(fs, prefix, 0));
+    EXPECT_INT_ZERO(hdfsCopy(fs, paths->file1, fs, paths->file2));
+
+    EXPECT_INT_ZERO(hdfsChown(fs, paths->file2, NULL, NULL));
+    EXPECT_INT_ZERO(hdfsChown(fs, paths->file2, NULL, "doop"));
+    fileInfo = hdfsGetPathInfo(fs, paths->file2);
+    EXPECT_NONNULL(fileInfo);
+    EXPECT_INT_ZERO(strcmp("doop", fileInfo->mGroup));
+    hdfsFreeFileInfo(fileInfo, 1);
+
+    EXPECT_INT_ZERO(hdfsChown(fs, paths->file2, "ha", "doop2"));
+    fileInfo = hdfsGetPathInfo(fs, paths->file2);
+    EXPECT_NONNULL(fileInfo);
+    EXPECT_INT_ZERO(strcmp("ha", fileInfo->mOwner));
+    EXPECT_INT_ZERO(strcmp("doop2", fileInfo->mGroup));
+    hdfsFreeFileInfo(fileInfo, 1);
+
+    EXPECT_INT_ZERO(hdfsChown(fs, paths->file2, "ha2", NULL));
+    fileInfo = hdfsGetPathInfo(fs, paths->file2);
+    EXPECT_NONNULL(fileInfo);
+    EXPECT_INT_ZERO(strcmp("ha2", fileInfo->mOwner));
+    EXPECT_INT_ZERO(strcmp("doop2", fileInfo->mGroup));
+    hdfsFreeFileInfo(fileInfo, 1);
+
+    snprintf(tmp, sizeof(tmp), "%s/nonexistent-file-name", paths->prefix);
+    EXPECT_NEGATIVE_ONE_WITH_ERRNO(hdfsChown(fs, tmp, "ha3", NULL), ENOENT);
+    return 0;
+}
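+
+/*
+ * A minimal sketch of the writeFully helper mentioned in the TODOs inside
+ * doTestHdfsOperations above.  It is illustrative only: the name and shape
+ * are assumptions, not part of this commit.  The idea is to loop on
+ * hdfsWrite until the whole buffer has been written, turning short writes
+ * into retries and failures into errno-style return codes.  A readFully
+ * helper would follow the same pattern around hdfsRead.
+ */
+#if 0   /* sketch, not compiled */
+static int writeFully(hdfsFS fs, hdfsFile file, const void *buf, int len)
+{
+    const char *b = buf;
+
+    while (len > 0) {
+        int ret = hdfsWrite(fs, file, b, len);
+        if (ret < 0) {
+            return errno;       /* hdfsWrite sets errno on failure */
+        }
+        b += ret;
+        len -= ret;
+    }
+    return 0;
+}
+#endif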
+
+static int testHdfsOperationsImpl(struct tlhThreadInfo *ti)
+{
+    hdfsFS fs = NULL;
+    struct tlhPaths paths;
+
+    fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n",
+        ti->threadIdx);
+    EXPECT_INT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL));
+    EXPECT_INT_ZERO(setupPaths(ti, &paths));
+    // test some operations
+    EXPECT_INT_ZERO(doTestHdfsOperations(fs, &paths));
+    EXPECT_INT_ZERO(hdfsDisconnect(fs));
+    // reconnect as user "foo" and verify that we get permission errors
+    EXPECT_INT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, "foo"));
+    EXPECT_NEGATIVE_ONE_WITH_ERRNO(hdfsChown(fs, paths.file1, "ha3", NULL), EACCES);
+    EXPECT_INT_ZERO(hdfsDisconnect(fs));
+    // reconnect to do the final delete.
+    EXPECT_INT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL));
+    EXPECT_INT_ZERO(hdfsDelete(fs, paths.prefix, 1));
+    EXPECT_INT_ZERO(hdfsDisconnect(fs));
+    return 0;
+}
+
+static void *testHdfsOperations(void *v)
+{
+    struct tlhThreadInfo *ti = (struct tlhThreadInfo*)v;
+    int ret = testHdfsOperationsImpl(ti);
+    ti->success = ret;
+    return NULL;
+}
+
+static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads)
+{
+    int i, threadsFailed = 0;
+    const char *sep = "";
+
+    for (i = 0; i < tlhNumThreads; i++) {
+        if (ti[i].success != 0) {
+            threadsFailed = 1;
+        }
+    }
+    if (!threadsFailed) {
+        fprintf(stderr, "testLibHdfs: all threads succeeded.  SUCCESS.\n");
+        return EXIT_SUCCESS;
+    }
+    fprintf(stderr, "testLibHdfs: some threads failed: [");
+    for (i = 0; i < tlhNumThreads; i++) {
+        if (ti[i].success != 0) {
+            fprintf(stderr, "%s%d", sep, i);
+            sep = ", "; 
+        }
+    }
+    fprintf(stderr, "].  FAILURE.\n");
+    return EXIT_FAILURE;
+}
+
+/**
+ * Test that we can write files with libhdfs and read them back, from multiple threads
+ */
+int main(void)
+{
+    int i, tlhNumThreads;
+    const char *tlhNumThreadsStr;
+    struct tlhThreadInfo ti[TLH_MAX_THREADS];
+    struct NativeMiniDfsConf conf = {
+        .doFormat = 1,
+    };
+
+    tlhNumThreadsStr = getenv("TLH_NUM_THREADS");
+    if (!tlhNumThreadsStr) {
+        tlhNumThreadsStr = "3";
+    }
+    tlhNumThreads = atoi(tlhNumThreadsStr);
+    if ((tlhNumThreads <= 0) || (tlhNumThreads > TLH_MAX_THREADS)) {
+        fprintf(stderr, "testLibHdfs: must have a number of threads "
+                "between 1 and %d inclusive, not %d\n",
+                TLH_MAX_THREADS, tlhNumThreads);
+        return EXIT_FAILURE;
+    }
+    memset(&ti[0], 0, sizeof(ti));
+    for (i = 0; i < tlhNumThreads; i++) {
+        ti[i].threadIdx = i;
+    }
+
+    EXPECT_INT_ZERO(sem_init(&tlhSem, 0, tlhNumThreads));
+    tlhCluster = nmdCreate(&conf);
+    EXPECT_NONNULL(tlhCluster);
+    EXPECT_INT_ZERO(nmdWaitClusterUp(tlhCluster));
+
+    for (i = 0; i < tlhNumThreads; i++) {
+        EXPECT_INT_ZERO(pthread_create(&ti[i].thread, NULL,
+            testHdfsOperations, &ti[i]));
+    }
+    for (i = 0; i < tlhNumThreads; i++) {
+        EXPECT_INT_ZERO(pthread_join(ti[i].thread, NULL));
+    }
+
+    EXPECT_INT_ZERO(nmdShutdown(tlhCluster));
+    nmdFree(tlhCluster);
+    EXPECT_INT_ZERO(sem_destroy(&tlhSem));
+    return checkFailures(ti, tlhNumThreads);
+}

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/fs/test_libhdfs_zerocopy.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/fs/test_libhdfs_zerocopy.c?rev=1602280&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/fs/test_libhdfs_zerocopy.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/fs/test_libhdfs_zerocopy.c Thu Jun 12 19:56:23 2014
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fs/hdfs.h"
+#include "test/native_mini_dfs.h"
+#include "test/test.h"
+
+#include <errno.h>
+#include <inttypes.h>
+#include <semaphore.h>
+#include <pthread.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+
+#define TO_STR_HELPER(X) #X
+#define TO_STR(X) TO_STR_HELPER(X)
+
+#define TEST_FILE_NAME_LENGTH 128
+#define TEST_ZEROCOPY_FULL_BLOCK_SIZE 4096
+#define TEST_ZEROCOPY_LAST_BLOCK_SIZE 3215
+#define TEST_ZEROCOPY_NUM_BLOCKS 6
+#define SMALL_READ_LEN 16
+#define TEST_ZEROCOPY_FILE_LEN \
+  (((TEST_ZEROCOPY_NUM_BLOCKS - 1) * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + \
+    TEST_ZEROCOPY_LAST_BLOCK_SIZE)
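+/* With the sizes above, the test file works out to
+ * (6 - 1) * 4096 + 3215 = 23695 bytes. */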
+
+#define ZC_BUF_LEN 32768
+
+static uint8_t *getZeroCopyBlockData(int blockIdx)
+{
+    uint8_t *buf = malloc(TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+    int i;
+    if (!buf) {
+        fprintf(stderr, "malloc(%d) failed\n", TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+        exit(1);
+    }
+    for (i = 0; i < TEST_ZEROCOPY_FULL_BLOCK_SIZE; i++) {
+      buf[i] = blockIdx + (i % 17);
+    }
+    return buf;
+}
+
+static int getZeroCopyBlockLen(int blockIdx)
+{
+    if (blockIdx >= TEST_ZEROCOPY_NUM_BLOCKS) {
+        return 0;
+    } else if (blockIdx == (TEST_ZEROCOPY_NUM_BLOCKS - 1)) {
+        return TEST_ZEROCOPY_LAST_BLOCK_SIZE;
+    } else {
+        return TEST_ZEROCOPY_FULL_BLOCK_SIZE;
+    }
+}
+
+static int expectFileStats(hdfsFile file,
+      uint64_t expectedTotalBytesRead,
+      uint64_t expectedTotalLocalBytesRead,
+      uint64_t expectedTotalShortCircuitBytesRead,
+      uint64_t expectedTotalZeroCopyBytesRead)
+{
+    struct hdfsReadStatistics *stats = NULL;
+    EXPECT_INT_ZERO(hdfsFileGetReadStatistics(file, &stats));
+    fprintf(stderr, "expectFileStats(expectedTotalBytesRead=%"PRId64", "
+            "expectedTotalLocalBytesRead=%"PRId64", "
+            "expectedTotalShortCircuitBytesRead=%"PRId64", "
+            "expectedTotalZeroCopyBytesRead=%"PRId64", "
+            "totalBytesRead=%"PRId64", "
+            "totalLocalBytesRead=%"PRId64", "
+            "totalShortCircuitBytesRead=%"PRId64", "
+            "totalZeroCopyBytesRead=%"PRId64")\n",
+            expectedTotalBytesRead,
+            expectedTotalLocalBytesRead,
+            expectedTotalShortCircuitBytesRead,
+            expectedTotalZeroCopyBytesRead,
+            stats->totalBytesRead,
+            stats->totalLocalBytesRead,
+            stats->totalShortCircuitBytesRead,
+            stats->totalZeroCopyBytesRead);
+    if (expectedTotalBytesRead != UINT64_MAX) {
+        EXPECT_INT64_EQ(expectedTotalBytesRead, stats->totalBytesRead);
+    }
+    if (expectedTotalLocalBytesRead != UINT64_MAX) {
+        EXPECT_INT64_EQ(expectedTotalLocalBytesRead,
+                      stats->totalLocalBytesRead);
+    }
+    if (expectedTotalShortCircuitBytesRead != UINT64_MAX) {
+        EXPECT_INT64_EQ(expectedTotalShortCircuitBytesRead,
+                      stats->totalShortCircuitBytesRead);
+    }
+    if (expectedTotalZeroCopyBytesRead != UINT64_MAX) {
+        EXPECT_INT64_EQ(expectedTotalZeroCopyBytesRead,
+                      stats->totalZeroCopyBytesRead);
+    }
+    hdfsFileFreeReadStatistics(stats);
+    return 0;
+}
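+
+/*
+ * Usage note (hypothetical example, not taken from the callers below): since
+ * UINT64_MAX means "don't check this statistic", a caller that only wants to
+ * verify the zero-copy counter could write:
+ *
+ *     expectFileStats(file, UINT64_MAX, UINT64_MAX, UINT64_MAX,
+ *                     TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+ */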
+
+static int doTestZeroCopyReads(hdfsFS fs, const char *fileName)
+{
+    hdfsFile file = NULL;
+    struct hadoopRzOptions *opts = NULL;
+    struct hadoopRzBuffer *buffer = NULL;
+    uint8_t *block;
+
+    file = hdfsOpenFile(fs, fileName, O_RDONLY, 0, 0, 0);
+    EXPECT_NONNULL(file);
+    opts = hadoopRzOptionsAlloc();
+    EXPECT_NONNULL(opts);
+    EXPECT_INT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 1));
+    /* haven't read anything yet */
+    EXPECT_INT_ZERO(expectFileStats(file, 0LL, 0LL, 0LL, 0LL));
+    block = getZeroCopyBlockData(0);
+    EXPECT_NONNULL(block);
+    /* first read is half of a block. */
+    buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
+    EXPECT_NONNULL(buffer);
+    EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
+          hadoopRzBufferLength(buffer));
+    EXPECT_INT_ZERO(memcmp(hadoopRzBufferGet(buffer), block,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+    hadoopRzBufferFree(file, buffer);
+    /* read the next half of the block */
+    buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
+    EXPECT_NONNULL(buffer);
+    EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
+          hadoopRzBufferLength(buffer));
+    EXPECT_INT_ZERO(memcmp(hadoopRzBufferGet(buffer),
+          block + (TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2),
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+    hadoopRzBufferFree(file, buffer);
+    free(block);
+    EXPECT_INT_ZERO(expectFileStats(file, TEST_ZEROCOPY_FULL_BLOCK_SIZE, 
+              TEST_ZEROCOPY_FULL_BLOCK_SIZE,
+              TEST_ZEROCOPY_FULL_BLOCK_SIZE,
+              TEST_ZEROCOPY_FULL_BLOCK_SIZE));
+    /* Now let's read just a few bytes. */
+    buffer = hadoopReadZero(file, opts, SMALL_READ_LEN);
+    EXPECT_NONNULL(buffer);
+    EXPECT_INT_EQ(SMALL_READ_LEN, hadoopRzBufferLength(buffer));
+    block = getZeroCopyBlockData(1);
+    EXPECT_NONNULL(block);
+    EXPECT_INT_ZERO(memcmp(block, hadoopRzBufferGet(buffer), SMALL_READ_LEN));
+    hadoopRzBufferFree(file, buffer);
+    EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+                  hdfsTell(fs, file));
+    EXPECT_INT_ZERO(expectFileStats(file,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
+
+    /* Clear 'skip checksums' and test that we can't do zero-copy reads any
+     * more.  Since there is no ByteBufferPool set, we should fail with
+     * EPROTONOSUPPORT.
+     */
+    EXPECT_INT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 0));
+    EXPECT_NULL(hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE));
+    EXPECT_INT_EQ(EPROTONOSUPPORT, errno);
+
+    /* Verify that setting a NULL ByteBufferPool class works. */
+    EXPECT_INT_ZERO(hadoopRzOptionsSetByteBufferPool(opts, NULL));
+    EXPECT_INT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 0));
+    EXPECT_NULL(hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE));
+    EXPECT_INT_EQ(EPROTONOSUPPORT, errno);
+
+    /* Now set a ByteBufferPool and try again.  It should succeed this time. */
+    EXPECT_INT_ZERO(hadoopRzOptionsSetByteBufferPool(opts,
+          ELASTIC_BYTE_BUFFER_POOL_CLASS));
+    buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+    EXPECT_NONNULL(buffer);
+    EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE, hadoopRzBufferLength(buffer));
+    EXPECT_INT_ZERO(expectFileStats(file,
+          (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
+          (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
+          (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
+    EXPECT_INT_ZERO(memcmp(block + SMALL_READ_LEN, hadoopRzBufferGet(buffer),
+        TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN));
+    free(block);
+    block = getZeroCopyBlockData(2);
+    EXPECT_NONNULL(block);
+    EXPECT_INT_ZERO(memcmp(block, hadoopRzBufferGet(buffer) +
+        (TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN), SMALL_READ_LEN));
+    hadoopRzBufferFree(file, buffer);
+
+    /* Check the result of a zero-length read. */
+    buffer = hadoopReadZero(file, opts, 0);
+    EXPECT_NONNULL(buffer);
+    EXPECT_NONNULL(hadoopRzBufferGet(buffer));
+    EXPECT_INT_EQ(0, hadoopRzBufferLength(buffer));
+    hadoopRzBufferFree(file, buffer);
+
+    /* Check the result of reading past EOF */
+    EXPECT_INT_EQ(0, hdfsSeek(fs, file, TEST_ZEROCOPY_FILE_LEN));
+    buffer = hadoopReadZero(file, opts, 1);
+    EXPECT_NONNULL(buffer);
+    EXPECT_NULL(hadoopRzBufferGet(buffer));
+    hadoopRzBufferFree(file, buffer);
+
+    /* Cleanup */
+    free(block);
+    hadoopRzOptionsFree(opts);
+    EXPECT_INT_ZERO(hdfsCloseFile(fs, file));
+    return 0;
+}
+
+static int createZeroCopyTestFile(hdfsFS fs, char *testFileName,
+                                  size_t testFileNameLen)
+{
+    int blockIdx, blockLen;
+    hdfsFile file;
+    uint8_t *data;
+
+    snprintf(testFileName, testFileNameLen, "/zeroCopyTestFile.%d.%d",
+             getpid(), rand());
+    file = hdfsOpenFile(fs, testFileName, O_WRONLY, 0, 1,
+                        TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+    EXPECT_NONNULL(file);
+    for (blockIdx = 0; blockIdx < TEST_ZEROCOPY_NUM_BLOCKS; blockIdx++) {
+        blockLen = getZeroCopyBlockLen(blockIdx);
+        data = getZeroCopyBlockData(blockIdx);
+        EXPECT_NONNULL(data);
+        EXPECT_INT_EQ(blockLen, hdfsWrite(fs, file, data, blockLen));
+        free(data);
+    }
+    EXPECT_INT_ZERO(hdfsCloseFile(fs, file));
+    return 0;
+}
+
+/**
+ * Test that we can write a file with libhdfs and then read it back via the zero-copy read API
+ */
+int main(void)
+{
+    int port;
+    struct NativeMiniDfsConf conf = {
+        .doFormat = 1,
+        .configureShortCircuit = 1,
+    };
+    char testFileName[TEST_FILE_NAME_LENGTH];
+    hdfsFS fs;
+    struct NativeMiniDfsCluster* cl;
+    struct hdfsBuilder *bld;
+
+    cl = nmdCreate(&conf);
+    EXPECT_NONNULL(cl);
+    EXPECT_INT_ZERO(nmdWaitClusterUp(cl));
+    port = nmdGetNameNodePort(cl);
+    if (port < 0) {
+        fprintf(stderr, "TEST_ERROR: test_zerocopy: "
+                "nmdGetNameNodePort returned error %d\n", port);
+        return EXIT_FAILURE;
+    }
+    bld = hdfsNewBuilder();
+    EXPECT_NONNULL(bld);
+    EXPECT_INT_ZERO(nmdConfigureHdfsBuilder(cl, bld));
+    hdfsBuilderSetForceNewInstance(bld);
+    hdfsBuilderConfSetStr(bld, "dfs.block.size",
+                          TO_STR(TEST_ZEROCOPY_FULL_BLOCK_SIZE));
+    /* ensure that we'll always get our mmaps */
+    hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit.skip.checksum",
+                          "true");
+    fs = hdfsBuilderConnect(bld);
+    EXPECT_NONNULL(fs);
+    EXPECT_INT_ZERO(createZeroCopyTestFile(fs, testFileName,
+          TEST_FILE_NAME_LENGTH));
+    EXPECT_INT_ZERO(doTestZeroCopyReads(fs, testFileName));
+    EXPECT_INT_ZERO(hdfsDisconnect(fs));
+    EXPECT_INT_ZERO(nmdShutdown(cl));
+    nmdFree(cl);
+    fprintf(stderr, "TEST_SUCCESS\n"); 
+    return EXIT_SUCCESS;
+}

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/native_mini_dfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/native_mini_dfs.c?rev=1602280&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/native_mini_dfs.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/test/native_mini_dfs.c Thu Jun 12 19:56:23 2014
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fs/hdfs.h"
+#include "fs/hdfs_test.h"
+#include "jni/exception.h"
+#include "jni/jni_helper.h"
+#include "test/native_mini_dfs.h"
+
+#include <errno.h>
+#include <jni.h>
+#include <limits.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#define MINIDFS_CLUSTER_BUILDER "org/apache/hadoop/hdfs/MiniDFSCluster$Builder"
+#define MINIDFS_CLUSTER "org/apache/hadoop/hdfs/MiniDFSCluster"
+#define HADOOP_CONF     "org/apache/hadoop/conf/Configuration"
+#define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode"
+#define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress"
+
+#define DFS_WEBHDFS_ENABLED_KEY "dfs.webhdfs.enabled"
+
+struct NativeMiniDfsCluster {
+    /**
+     * The NativeMiniDfsCluster object
+     */
+    jobject obj;
+
+    /**
+     * Path to the domain socket, or the empty string if there is none.
+     */
+    char domainSocketPath[PATH_MAX];
+};
+
+static int hdfsDisableDomainSocketSecurity(void)
+{
+    jthrowable jthr;
+    JNIEnv* env = getJNIEnv();
+    if (env == NULL) {
+      errno = EINTERNAL;
+      return -1;
+    }
+    jthr = invokeMethod(env, NULL, STATIC, NULL,
+            "org/apache/hadoop/net/unix/DomainSocket",
+            "disableBindPathValidation", "()V");
+    if (jthr) {
+        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "DomainSocket#disableBindPathValidation");
+        return -1;
+    }
+    return 0;
+}
+
+static jthrowable nmdConfigureShortCircuit(JNIEnv *env,
+              struct NativeMiniDfsCluster *cl, jobject cobj)
+{
+    jthrowable jthr;
+    char *tmpDir;
+
+    int ret = hdfsDisableDomainSocketSecurity();
+    if (ret) {
+        return newRuntimeError(env, "failed to disable hdfs domain "
+                               "socket security: error %d", ret);
+    }
+    jthr = hadoopConfSetStr(env, cobj, "dfs.client.read.shortcircuit", "true");
+    if (jthr) {
+        return jthr;
+    }
+    tmpDir = getenv("TMPDIR");
+    if (!tmpDir) {
+        tmpDir = "/tmp";
+    }
+    snprintf(cl->domainSocketPath, PATH_MAX, "%s/native_mini_dfs.sock.%d.%d",
+             tmpDir, getpid(), rand());
+    jthr = hadoopConfSetStr(env, cobj, "dfs.domain.socket.path",
+                            cl->domainSocketPath);
+    if (jthr) {
+        return jthr;
+    }
+    return NULL;
+}
+
+struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
+{
+    struct NativeMiniDfsCluster* cl = NULL;
+    jobject bld = NULL, cobj = NULL, cluster = NULL;
+    jvalue  val;
+    JNIEnv *env = getJNIEnv();
+    jthrowable jthr;
+    jstring jconfStr = NULL;
+
+    if (!env) {
+        fprintf(stderr, "nmdCreate: unable to construct JNIEnv.\n");
+        return NULL;
+    }
+    cl = calloc(1, sizeof(struct NativeMiniDfsCluster));
+    if (!cl) {
+        fprintf(stderr, "nmdCreate: OOM\n");
+        goto error;
+    }
+    jthr = constructNewObjectOfClass(env, &cobj, HADOOP_CONF, "()V");
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "nmdCreate: new Configuration");
+        goto error;
+    }
+    if (conf->webhdfsEnabled) {
+        jthr = newJavaStr(env, DFS_WEBHDFS_ENABLED_KEY, &jconfStr);
+        if (jthr) {
+            printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                                  "nmdCreate: new String");
+            goto error;
+        }
+        jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF,
+                            "setBoolean", "(Ljava/lang/String;Z)V",
+                            jconfStr, conf->webhdfsEnabled);
+        if (jthr) {
+            printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                                  "nmdCreate: Configuration::setBoolean");
+            goto error;
+        }
+    }
+    // Disable 'minimum block size' -- it's annoying in tests.
+    (*env)->DeleteLocalRef(env, jconfStr);
+    jconfStr = NULL;
+    jthr = newJavaStr(env, "dfs.namenode.fs-limits.min-block-size", &jconfStr);
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                              "nmdCreate: new String");
+        goto error;
+    }
+    jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF,
+                        "setLong", "(Ljava/lang/String;J)V", jconfStr, 0LL);
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                              "nmdCreate: Configuration::setLong");
+        goto error;
+    }
+    // Create the MiniDFSCluster builder object
+    jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER,
+                    "(L"HADOOP_CONF";)V", cobj);
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "nmdCreate: NativeMiniDfsCluster#Builder#Builder");
+        goto error;
+    }
+    if (conf->configureShortCircuit) {
+        jthr = nmdConfigureShortCircuit(env, cl, cobj);
+        if (jthr) {
+            printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "nmdCreate: nmdConfigureShortCircuit error");
+            goto error;
+        }
+    }
+    jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
+            "format", "(Z)L" MINIDFS_CLUSTER_BUILDER ";", conf->doFormat);
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: "
+                              "Builder::format");
+        goto error;
+    }
+    (*env)->DeleteLocalRef(env, val.l);
+    if (conf->webhdfsEnabled) {
+        jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
+                        "nameNodeHttpPort", "(I)L" MINIDFS_CLUSTER_BUILDER ";",
+                        conf->namenodeHttpPort);
+        if (jthr) {
+            printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: "
+                                  "Builder::nameNodeHttpPort");
+            goto error;
+        }
+        (*env)->DeleteLocalRef(env, val.l);
+    }
+    jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
+            "build", "()L" MINIDFS_CLUSTER ";");
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                              "nmdCreate: Builder#build");
+        goto error;
+    }
+    cluster = val.l;
+    cl->obj = (*env)->NewGlobalRef(env, val.l);
+    if (!cl->obj) {
+        printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+            "nmdCreate: NewGlobalRef");
+        goto error;
+    }
+    (*env)->DeleteLocalRef(env, cluster);
+    (*env)->DeleteLocalRef(env, bld);
+    (*env)->DeleteLocalRef(env, cobj);
+    (*env)->DeleteLocalRef(env, jconfStr);
+    return cl;
+
+error:
+    (*env)->DeleteLocalRef(env, cluster);
+    (*env)->DeleteLocalRef(env, bld);
+    (*env)->DeleteLocalRef(env, cobj);
+    (*env)->DeleteLocalRef(env, jconfStr);
+    free(cl);
+    return NULL;
+}
+
+void nmdFree(struct NativeMiniDfsCluster* cl)
+{
+    JNIEnv *env = getJNIEnv();
+    if (!env) {
+        fprintf(stderr, "nmdFree: getJNIEnv failed\n");
+        free(cl);
+        return;
+    }
+    (*env)->DeleteGlobalRef(env, cl->obj);
+    free(cl);
+}
+
+int nmdShutdown(struct NativeMiniDfsCluster* cl)
+{
+    JNIEnv *env = getJNIEnv();
+    jthrowable jthr;
+
+    if (!env) {
+        fprintf(stderr, "nmdShutdown: getJNIEnv failed\n");
+        return -EIO;
+    }
+    jthr = invokeMethod(env, NULL, INSTANCE, cl->obj,
+            MINIDFS_CLUSTER, "shutdown", "()V");
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "nmdShutdown: MiniDFSCluster#shutdown");
+        return -EIO;
+    }
+    return 0;
+}
+
+int nmdWaitClusterUp(struct NativeMiniDfsCluster *cl)
+{
+    jthrowable jthr;
+    JNIEnv *env = getJNIEnv();
+    if (!env) {
+        fprintf(stderr, "nmdWaitClusterUp: getJNIEnv failed\n");
+        return -EIO;
+    }
+    jthr = invokeMethod(env, NULL, INSTANCE, cl->obj,
+            MINIDFS_CLUSTER, "waitClusterUp", "()V");
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "nmdWaitClusterUp: MiniDFSCluster#waitClusterUp ");
+        return -EIO;
+    }
+    return 0;
+}
+
+int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl)
+{
+    JNIEnv *env = getJNIEnv();
+    jvalue jVal;
+    jthrowable jthr;
+
+    if (!env) {
+        fprintf(stderr, "nmdGetNameNodePort: getJNIEnv failed\n");
+        return -EIO;
+    }
+    // Note: this will have to be updated when HA nativeMiniDfs clusters are
+    // supported
+    jthr = invokeMethod(env, &jVal, INSTANCE, cl->obj,
+            MINIDFS_CLUSTER, "getNameNodePort", "()I");
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "nmdGetNameNodePort: MiniDFSCluster#getNameNodePort");
+        return -EIO;
+    }
+    return jVal.i;
+}
+
+int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
+                               int *port, const char **hostName)
+{
+    JNIEnv *env = getJNIEnv();
+    jvalue jVal;
+    jobject jNameNode, jAddress;
+    jthrowable jthr;
+    int ret = 0;
+    const char *host;
+    
+    if (!env) {
+        fprintf(stderr, "nmdGetNameNodeHttpAddress: getJNIEnv failed\n");
+        return -EIO;
+    }
+    // First get the (first) NameNode of the cluster
+    jthr = invokeMethod(env, &jVal, INSTANCE, cl->obj, MINIDFS_CLUSTER,
+                        "getNameNode", "()L" HADOOP_NAMENODE ";");
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                              "nmdGetNameNodeHttpAddress: "
+                              "MiniDFSCluster#getNameNode");
+        return -EIO;
+    }
+    jNameNode = jVal.l;
+    
+    // Then get the http address (InetSocketAddress) of the NameNode
+    jthr = invokeMethod(env, &jVal, INSTANCE, jNameNode, HADOOP_NAMENODE,
+                        "getHttpAddress", "()L" JAVA_INETSOCKETADDRESS ";");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                                    "nmdGetNameNodeHttpAddress: "
+                                    "NameNode#getHttpAddress");
+        goto error_dlr_nn;
+    }
+    jAddress = jVal.l;
+    
+    jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
+                        JAVA_INETSOCKETADDRESS, "getPort", "()I");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                                    "nmdGetNameNodeHttpAddress: "
+                                    "InetSocketAddress#getPort");
+        goto error_dlr_addr;
+    }
+    *port = jVal.i;
+    
+    jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, JAVA_INETSOCKETADDRESS,
+                        "getHostName", "()Ljava/lang/String;");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                                    "nmdGetNameNodeHttpAddress: "
+                                    "InetSocketAddress#getHostName");
+        goto error_dlr_addr;
+    }
+    host = (*env)->GetStringUTFChars(env, jVal.l, NULL);
+    *hostName = strdup(host);
+    (*env)->ReleaseStringUTFChars(env, jVal.l, host);
+    
+error_dlr_addr:
+    (*env)->DeleteLocalRef(env, jAddress);
+error_dlr_nn:
+    (*env)->DeleteLocalRef(env, jNameNode);
+    
+    return ret;
+}
+
+int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl,
+                            struct hdfsBuilder *bld)
+{
+    int port, ret;
+
+    hdfsBuilderSetNameNode(bld, "localhost");
+    port = nmdGetNameNodePort(cl);
+    if (port < 0) {
+      fprintf(stderr, "nmdGetNameNodePort failed with error %d\n", -port);
+      return EIO;
+    }
+    hdfsBuilderSetNameNodePort(bld, port);
+    if (cl->domainSocketPath[0]) {
+      ret = hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit", "true");
+      if (ret) {
+          return ret;
+      }
+      ret = hdfsBuilderConfSetStr(bld, "dfs.domain.socket.path",
+                            cl->domainSocketPath);
+      if (ret) {
+          return ret;
+      }
+    }
+    return 0;
+}


