hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1371232 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/ src/main/native/fuse-dfs/ src/main/native/fuse-dfs/test/ src/main/native/libhdfs/ src/main/native/util/
Date Thu, 09 Aug 2012 14:45:09 GMT
Author: atm
Date: Thu Aug  9 14:45:08 2012
New Revision: 1371232

URL: http://svn.apache.org/viewvc?rev=1371232&view=rev
Log:
HDFS-3634. Add self-contained, mavenized fuse_dfs test. Contributed by Colin Patrick McCabe.

Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.c
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.h
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/test_fuse_dfs.c
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.c
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.h
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1371232&r1=1371231&r2=1371232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Aug  9
14:45:08 2012
@@ -190,6 +190,9 @@ Release 2.0.1-alpha - UNRELEASED
     HDFS-3291. add test that covers HttpFS working w/ a non-HDFS Hadoop
     filesystem (tucu)
 
+    HDFS-3634. Add self-contained, mavenized fuse_dfs test. (Colin Patrick
+    McCabe via atm)
+
   OPTIMIZATIONS
 
     HDFS-2982. Startup performance suffers when there are many edit log

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt?rev=1371232&r1=1371231&r2=1371232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt Thu
Aug  9 14:45:08 2012
@@ -107,6 +107,10 @@ set(LIBHDFS_VERSION "0.0.0")
 set_target_properties(hdfs PROPERTIES
     SOVERSION ${LIBHDFS_VERSION})
 
+add_library(posix_util
+    main/native/util/posix_util.c
+)
+
 add_executable(test_libhdfs_ops
     main/native/libhdfs/test/test_libhdfs_ops.c
 )

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt?rev=1371232&r1=1371231&r2=1371232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt
(original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt
Thu Aug  9 14:45:08 2012
@@ -71,6 +71,16 @@ IF(FUSE_FOUND)
         m
         pthread
     )
+    add_executable(test_fuse_dfs
+        test/test_fuse_dfs.c
+        test/fuse_workload.c
+    )
+    target_link_libraries(test_fuse_dfs
+        ${FUSE_LIBRARIES}
+        native_mini_dfs
+        posix_util
+        pthread
+    )
 ELSE(FUSE_FOUND)
     IF(REQUIRE_FUSE)
         MESSAGE(FATAL_ERROR "Required component fuse_dfs could not be built.")

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.c?rev=1371232&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.c
(added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.c
Thu Aug  9 14:45:08 2012
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fuse-dfs/test/fuse_workload.h"
+#include "libhdfs/expect.h"
+#include "util/posix_util.h"
+
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/statvfs.h>
#include <sys/types.h>
#include <unistd.h>
#include <utime.h>
+
// Callback type used by the readdir test helpers: invoked once per directory
// entry; returns 0 to accept the entry or a negative error code to abort.
typedef int (*testReadDirFn)(const struct dirent *de, void *v);

// Per-file state for the create/write/read portion of the workload.
struct fileCtx {
  int fd;       // open file descriptor, or -1 when closed
  char *str;    // data to write to (and expect back from) the file
  int strLen;   // length of str, filled in by runFuseWorkloadImpl
  char *path;   // heap-allocated full path of the file
};

// Expected directory listings at different stages of the rename test.
static const char *DIRS_A_AND_B[] = { "a", "b", NULL };
static const char *DIRS_B_AND_C[] = { "b", "c", NULL };

#define LONG_STR_LEN 1024   // size of the large test payload (incl. NUL)
#define NUM_FILE_CTX 3      // number of files created by the workload
#define MAX_TRIES 120       // max 1-second polls waiting for file length

// TODO: implement/test access, mknod, symlink
// TODO: rmdir on non-dir, non-empty dir
// TODO: test unlink-during-writing
// TODO: test weird open flags failing
// TODO: test chown, chmod
+
+static int testReadDirImpl(DIR *dp, testReadDirFn fn, void *data)
+{
+  struct dirent *de;
+  int ret, noda = 0;
+
+  while (1) {
+    de = readdir(dp);
+    if (!de)
+      return noda;
+    if (!strcmp(de->d_name, "."))
+      continue;
+    if (!strcmp(de->d_name, ".."))
+      continue;
+    ret = fn(de, data);
+    if (ret < 0)
+      return ret;
+    ++noda;
+  }
+  return noda;
+}
+
+static int testReadDir(const char *dirName, testReadDirFn fn, void *data)
+{
+  int ret;
+  DIR *dp;
+
+  dp = opendir(dirName);
+  if (!dp) {
+    return -errno;
+  }
+  ret = testReadDirImpl(dp, fn, data);
+  closedir(dp);
+  return ret;
+}
+
/**
 * testReadDirFn callback which verifies that a directory entry's name is in
 * a NULL-terminated list of expected names.
 *
 * @param de   The directory entry to check
 * @param v    A NULL-terminated array of acceptable names (const char **)
 *
 * @return     0 if the entry name appears in the list; -ENOENT otherwise
 */
static int expectDirs(const struct dirent *de, void *v)
{
  const char **expected = v;

  while (*expected) {
    if (strcmp(de->d_name, *expected) == 0)
      return 0;
    expected++;
  }
  return -ENOENT;
}
+
/**
 * Write an entire buffer to a file descriptor, restarting after EINTR.
 *
 * @param fd    The file descriptor to write to
 * @param buf   The bytes to write
 * @param amt   Number of bytes to write
 *
 * @return      0 once every byte has been written; -errno on write failure
 */
static int safeWrite(int fd, const void *buf, size_t amt)
{
  const char *pos = buf;

  while (amt > 0) {
    int written = write(fd, pos, amt);
    if (written < 0) {
      if (errno == EINTR)
        continue;   // interrupted before any progress: just retry
      return -errno;
    }
    pos += written;
    amt -= written;
  }
  return 0;
}
+
/**
 * Read up to c bytes from a file descriptor, restarting after EINTR.
 *
 * Short reads are only reported at end-of-file.
 *
 * @param fd    The file descriptor to read from
 * @param buf   Buffer with room for at least c bytes
 * @param c     Number of bytes to read
 *
 * @return      The number of bytes read (less than c only at EOF), or
 *              -errno on read failure
 */
static int safeRead(int fd, void *buf, int c)
{
  int res;
  // Use a signed accumulator: the original used size_t, so 'amt < c'
  // compared signed with unsigned and a negative 'c' would have been
  // promoted to a huge unsigned bound.
  int amt = 0;

  while (amt < c) {
    res = read(fd, buf, c - amt);
    if (res <= 0) {
      if (res == 0)
        return amt;       // EOF: report the (possibly short) count
      if (errno != EINTR)
        return -errno;    // hard error
      continue;           // interrupted: retry
    }
    amt += res;
    buf = (char *)buf + res;
  }
  return amt;
}
+
/**
 * Run the FUSE workload against <root>/<pcomp>.
 *
 * Exercises mkdir/rmdir, readdir, rename, statvfs, utime, and file
 * create/write/read/truncate/unlink through whatever filesystem backs
 * 'root'.  Non-static so runFuseWorkload (below) can wrap it with cleanup.
 *
 * @param root   Existing directory to run the test under
 * @param pcomp  Path component for the test directory; <root>/<pcomp> must
 *               not exist yet, and is deleted again before returning 0
 * @param ctx    Array of NUM_FILE_CTX file contexts whose 'str' fields are
 *               already set by the caller; 'fd', 'strLen' and 'path' are
 *               filled in here.  On early error some fds may be left open
 *               for the caller to close.
 *
 * @return       0 on success; negative error code otherwise (the EXPECT_*
 *               macros return -1 on failure)
 */
int runFuseWorkloadImpl(const char *root, const char *pcomp,
    struct fileCtx *ctx)
{
  // NOTE(review): PATH_MAX is used but <limits.h> is not included directly
  // by this file — confirm it arrives through another header.
  char base[PATH_MAX], tmp[PATH_MAX], *tmpBuf;
  char src[PATH_MAX], dst[PATH_MAX];
  struct stat stBuf;
  int ret, i, try;
  struct utimbuf tbuf;
  struct statvfs stvBuf;

  // The root must be a directory
  EXPECT_ZERO(stat(root, &stBuf));
  EXPECT_NONZERO(S_ISDIR(stBuf.st_mode));

  // base = <root>/<pcomp>.  base must not exist yet
  snprintf(base, sizeof(base), "%s/%s", root, pcomp);
  EXPECT_NEGATIVE_ONE_WITH_ERRNO(stat(base, &stBuf), ENOENT);

  // mkdir <base>
  RETRY_ON_EINTR_GET_ERRNO(ret, mkdir(base, 0755));
  EXPECT_ZERO(ret);

  // rmdir <base>
  RETRY_ON_EINTR_GET_ERRNO(ret, rmdir(base));
  EXPECT_ZERO(ret);

  // mkdir <base> (again, after the rmdir round-trip)
  RETRY_ON_EINTR_GET_ERRNO(ret, mkdir(base, 0755));
  EXPECT_ZERO(ret);

  // stat <base>
  EXPECT_ZERO(stat(base, &stBuf));
  EXPECT_NONZERO(S_ISDIR(stBuf.st_mode));

  // mkdir <base>/a
  snprintf(tmp, sizeof(tmp), "%s/a", base);
  RETRY_ON_EINTR_GET_ERRNO(ret, mkdir(tmp, 0755));
  EXPECT_ZERO(ret);

  /* readdir: only "a" exists so far, and it is in the expected list */
  EXPECT_INT_EQ(1, testReadDir(base, expectDirs, DIRS_A_AND_B));

  // mkdir <base>/b
  snprintf(tmp, sizeof(tmp), "%s/b", base);
  RETRY_ON_EINTR_GET_ERRNO(ret, mkdir(tmp, 0755));
  EXPECT_ZERO(ret);

  // readdir a and b
  EXPECT_INT_EQ(2, testReadDir(base, expectDirs, DIRS_A_AND_B));

  // rename a -> c
  snprintf(src, sizeof(src), "%s/a", base);
  snprintf(dst, sizeof(dst), "%s/c", base);
  EXPECT_ZERO(rename(src, dst));

  // readdir now sees c and b: the A_AND_B list no longer matches ("c" is
  // not in it, so expectDirs propagates -ENOENT), but B_AND_C does.
  EXPECT_INT_EQ(-ENOENT, testReadDir(base, expectDirs, DIRS_A_AND_B));
  EXPECT_INT_EQ(2, testReadDir(base, expectDirs, DIRS_B_AND_C));

  // statvfs: just check that the call succeeds
  memset(&stvBuf, 0, sizeof(stvBuf));
  EXPECT_ZERO(statvfs(root, &stvBuf));

  // set utime on base
  memset(&tbuf, 0, sizeof(tbuf));
  tbuf.actime = 123;
  tbuf.modtime = 456;
  EXPECT_ZERO(utime(base, &tbuf));

  // stat(base) and verify the mtime we just set
  EXPECT_ZERO(stat(base, &stBuf));
  EXPECT_NONZERO(S_ISDIR(stBuf.st_mode));
  //EXPECT_INT_EQ(456, stBuf.st_atime); // hdfs doesn't store atime on directories
  EXPECT_INT_EQ(456, stBuf.st_mtime);

  // open some files and write to them; ctx[i].str was set by the caller
  for (i = 0; i < NUM_FILE_CTX; i++) {
    snprintf(tmp, sizeof(tmp), "%s/b/%d", base, i);
    ctx[i].path = strdup(tmp);
    if (!ctx[i].path) {
      fprintf(stderr, "FUSE_WORKLOAD: OOM on line %d\n", __LINE__);
      return -ENOMEM;
    }
    ctx[i].strLen = strlen(ctx[i].str);
  }
  for (i = 0; i < NUM_FILE_CTX; i++) {
    // opening for read must fail before the file has been created
    ctx[i].fd = open(ctx[i].path, O_RDONLY);
    if (ctx[i].fd >= 0) {
      fprintf(stderr, "FUSE_WORKLOAD: Error: was able to open %s for read before it "
              "was created!\n", ctx[i].path);
      return -EIO;
    }
    ctx[i].fd = creat(ctx[i].path, 0755);
    if (ctx[i].fd < 0) {
      fprintf(stderr, "FUSE_WORKLOAD: Failed to create file %s for writing!\n",
              ctx[i].path);
      return -EIO;
    }
  }
  for (i = 0; i < NUM_FILE_CTX; i++) {
    EXPECT_ZERO(safeWrite(ctx[i].fd, ctx[i].str, ctx[i].strLen));
  }
  for (i = 0; i < NUM_FILE_CTX; i++) {
    RETRY_ON_EINTR_GET_ERRNO(ret, close(ctx[i].fd));
    EXPECT_ZERO(ret);
    ctx[i].fd = -1;
  }
  for (i = 0; i < NUM_FILE_CTX; i++) {
    /* Bug: HDFS-2551.
     * When a program writes a file, closes it, and immediately re-opens it,
     * it might not appear to have the correct length.  This is because FUSE
     * invokes the release() callback asynchronously.
     *
     * To work around this, we keep retrying until the file length is what we
     * expect.
     */
    for (try = 0; try < MAX_TRIES; try++) {
      EXPECT_ZERO(stat(ctx[i].path, &stBuf));
      EXPECT_NONZERO(S_ISREG(stBuf.st_mode));
      if (ctx[i].strLen == stBuf.st_size) {
        break;
      }
      sleepNoSig(1);
    }
    if (try == MAX_TRIES) {
      fprintf(stderr, "FUSE_WORKLOAD: error: expected file %s to have length "
              "%d; instead, it had length %lld\n",
              ctx[i].path, ctx[i].strLen, (long long)stBuf.st_size);
      return -EIO;
    }
  }
  for (i = 0; i < NUM_FILE_CTX; i++) {
    ctx[i].fd = open(ctx[i].path, O_RDONLY);
    if (ctx[i].fd < 0) {
      fprintf(stderr, "FUSE_WORKLOAD: Failed to open file %s for reading!\n",
              ctx[i].path);
      return -EIO;
    }
  }
  // read every file back and compare against what we wrote
  for (i = 0; i < NUM_FILE_CTX; i++) {
    tmpBuf = calloc(1, ctx[i].strLen);
    if (!tmpBuf) {
      fprintf(stderr, "FUSE_WORKLOAD: OOM on line %d\n", __LINE__);
      return -ENOMEM;
    }
    EXPECT_INT_EQ(ctx[i].strLen, safeRead(ctx[i].fd, tmpBuf, ctx[i].strLen));
    EXPECT_ZERO(memcmp(ctx[i].str, tmpBuf, ctx[i].strLen));
    RETRY_ON_EINTR_GET_ERRNO(ret, close(ctx[i].fd));
    ctx[i].fd = -1;
    EXPECT_ZERO(ret);
    free(tmpBuf);
  }
  // truncate every file to zero length and verify
  for (i = 0; i < NUM_FILE_CTX; i++) {
    EXPECT_ZERO(truncate(ctx[i].path, 0));
  }
  for (i = 0; i < NUM_FILE_CTX; i++) {
    EXPECT_ZERO(stat(ctx[i].path, &stBuf));
    EXPECT_NONZERO(S_ISREG(stBuf.st_mode));
    EXPECT_INT_EQ(0, stBuf.st_size);
  }
  // unlink every file and verify it is gone
  for (i = 0; i < NUM_FILE_CTX; i++) {
    RETRY_ON_EINTR_GET_ERRNO(ret, unlink(ctx[i].path));
    EXPECT_ZERO(ret);
  }
  for (i = 0; i < NUM_FILE_CTX; i++) {
    EXPECT_NEGATIVE_ONE_WITH_ERRNO(stat(ctx[i].path, &stBuf), ENOENT);
  }
  for (i = 0; i < NUM_FILE_CTX; i++) {
    free(ctx[i].path);
  }
  // finally remove the whole test directory tree
  EXPECT_ZERO(recursiveDelete(base));
  return 0;
}
+
+int runFuseWorkload(const char *root, const char *pcomp)
+{
+  int ret, err;
+  size_t i;
+  char longStr[LONG_STR_LEN];
+  struct fileCtx ctx[NUM_FILE_CTX] = {
+    {
+      .fd = -1,
+      .str = "hello, world",
+    },
+    {
+      .fd = -1,
+      .str = "A",
+    },
+    {
+      .fd = -1,
+      .str = longStr,
+    },
+  };
+  for (i = 0; i < LONG_STR_LEN - 1; i++) {
+    longStr[i] = 'a' + (i % 10);
+  }
+  longStr[LONG_STR_LEN - 1] = '\0';
+
+  ret = runFuseWorkloadImpl(root, pcomp, ctx);
+  // Make sure all file descriptors are closed, or else we won't be able to
+  // unmount
+  for (i = 0; i < NUM_FILE_CTX; i++) {
+    if (ctx[i].fd >= 0) {
+      RETRY_ON_EINTR_GET_ERRNO(err, close(ctx[i].fd));
+    }
+  }
+  return ret;
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.h?rev=1371232&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.h
(added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.h
Thu Aug  9 14:45:08 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#ifndef FUSE_WORKLOAD_H
#define FUSE_WORKLOAD_H
/* Guard renamed from __FUSE_WORKLOAD_H__: identifiers beginning with a
 * double underscore are reserved for the implementation (C11 7.1.3). */

/**
 * Perform some FUSE operations.
 *
 * The operations will be performed under <root>/<pcomp>.
 * This directory should not exist prior to the test, and should not be used by
 * any other tests concurrently.
 *
 * @param root             The root directory for the testing.
 * @param pcomp            Path component to add to root
 *
 * @return                 0 on success; negative error code otherwise
 */
int runFuseWorkload(const char *root, const char *pcomp);

#endif /* FUSE_WORKLOAD_H */

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/test_fuse_dfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/test_fuse_dfs.c?rev=1371232&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/test_fuse_dfs.c
(added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/test_fuse_dfs.c
Thu Aug  9 14:45:08 2012
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fuse-dfs/test/fuse_workload.h"
+#include "libhdfs/expect.h"
+#include "libhdfs/hdfs.h"
+#include "libhdfs/native_mini_dfs.h"
+#include "util/posix_util.h"
+
#include <ctype.h>
#include <errno.h>
#include <libgen.h>
#include <limits.h>
#include <mntent.h>
#include <semaphore.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>
+
+/** Exit status to return when there is an exec error.
+ *
+ * We assume that fusermount and fuse_dfs don't normally return this error code.
+ */
+#define EXIT_STATUS_EXEC_ERROR 240
+
+/** Maximum number of times to try to unmount the fuse filesystem we created.
+ */
+#define MAX_UNMOUNT_TRIES 20
+
+/**
+ * Verify that the fuse workload works on the local FS.
+ *
+ * @return        0 on success; error code otherwise
+ */
+static int verifyFuseWorkload(void)
+{
+  char tempDir[PATH_MAX];
+
+  EXPECT_ZERO(createTempDir(tempDir, sizeof(tempDir), 0755));
+  EXPECT_ZERO(runFuseWorkload(tempDir, "test"));
+  EXPECT_ZERO(recursiveDelete(tempDir));
+
+  return 0;
+}
+
+static int fuserMount(int *procRet, ...) __attribute__((sentinel));
+
+/**
+ * Invoke the fusermount binary.
+ *
+ * @param retVal  (out param) The return value from fusermount
+ *
+ * @return        0 on success; error code if the fork fails
+ */
+static int fuserMount(int *procRet, ...)
+{
+  int ret, status;
+  size_t i = 0;
+  char *args[64], *c, *env[] = { NULL };
+  va_list ap;
+  pid_t pid, pret;
+
+  args[i++] = "fusermount";
+  va_start(ap, procRet);
+  while (1) {
+    c = va_arg(ap, char *);
+    args[i++] = c;
+    if (!c)
+      break;
+    if (i > sizeof(args)/sizeof(args[0])) {
+      va_end(ap);
+      return -EINVAL;
+    }
+  }
+  va_end(ap);
+  pid = fork();
+  if (pid < 0) {
+    ret = errno;
+    fprintf(stderr, "FUSE_TEST: failed to fork: error %d: %s\n",
+            ret, strerror(ret));
+    return -ret;
+  } else if (pid == 0) {
+    if (execvpe("fusermount", args, env)) {
+      ret = errno;
+      fprintf(stderr, "FUSE_TEST: failed to execute fusermount: "
+              "error %d: %s\n", ret, strerror(ret));
+      exit(EXIT_STATUS_EXEC_ERROR);
+    }
+  }
+  pret = waitpid(pid, &status, 0);
+  if (pret != pid) {
+    ret = errno;
+    fprintf(stderr, "FUSE_TEST: failed to wait for pid %d: returned %d "
+            "(error %d: %s)\n", pid, pret, ret, strerror(ret));
+    return -ret;
+  }
+  if (WIFEXITED(status)) {
+    *procRet = WEXITSTATUS(status);
+  } else if (WIFSIGNALED(status)) {
+    fprintf(stderr, "FUSE_TEST: fusermount exited with signal %d\n",
+            WTERMSIG(status));
+    *procRet = -1;
+  } else {
+    fprintf(stderr, "FUSE_TEST: fusermount exited with unknown exit type\n");
+    *procRet = -1;
+  }
+  return 0;
+}
+
/**
 * Check whether any filesystem is currently mounted at the given path.
 *
 * @param mntPoint  Mount point to look for in /proc/mounts
 *
 * @return          1 if mounted, 0 if not, negative error code if
 *                  /proc/mounts could not be opened
 */
static int isMounted(const char *mntPoint)
{
  struct mntent *entry;
  int found = 0;
  FILE *table = setmntent("/proc/mounts", "r");

  if (!table) {
    int err = errno;
    fprintf(stderr, "FUSE_TEST: isMounted(%s) failed to open /proc/mounts: "
            "error %d: %s", mntPoint, err, strerror(err));
    return -err;
  }
  while (!found && (entry = getmntent(table)) != NULL) {
    if (strcmp(entry->mnt_dir, mntPoint) == 0)
      found = 1;
  }
  endmntent(table);
  return found;
}
+
/**
 * Poll until the given mount point shows up in /proc/mounts.
 *
 * @param mntPoint  Mount point to wait for
 * @param retries   Number of 2-second polling attempts before giving up
 *
 * @return          0 once mounted; -ETIMEDOUT if retries are exhausted
 */
static int waitForMount(const char *mntPoint, int retries)
{
  int attempt;

  for (attempt = 0; attempt < retries; attempt++) {
    int ret = isMounted(mntPoint);
    if (ret == 1)
      return 0;
    if (ret < 0) {
      fprintf(stderr, "FUSE_TEST: waitForMount(%s, %d): isMounted returned "
              "error %d\n", mntPoint, retries, ret);
    }
    sleepNoSig(2);
  }
  return -ETIMEDOUT;
}
+
+/**
+ * Try to unmount the fuse filesystem we mounted.
+ *
+ * Normally, only one try should be sufficient to unmount the filesystem.
+ * However, if our tests needs to exit right after starting the fuse_dfs process,
+ * the fuse_dfs process might not have gotten around to mounting itself yet.  The
+ * retry loop in this function ensures that we always unmount FUSE before
+ * exiting, rather than leaving around a zombie fuse_dfs.
+ *
+ * @param mntPoint            Where the FUSE filesystem is mounted
+ * @return                    0 on success; error code otherwise
+ */
+static int cleanupFuse(const char *mntPoint)
+{
+  int ret, pret, tries = 0;
+
+  while (1) {
+    ret = fuserMount(&pret, "-u", mntPoint, NULL);
+    if (ret) {
+      fprintf(stderr, "FUSE_TEST: unmountFuse: failed to invoke fuserMount: "
+              "error %d\n", ret);
+      return ret;
+    }
+    if (pret == 0) {
+      fprintf(stderr, "FUSE_TEST: successfully unmounted FUSE filesystem.\n");
+      return 0;
+    }
+    if (tries++ > MAX_UNMOUNT_TRIES) {
+      return -EIO;
+    }
+    fprintf(stderr, "FUSE_TEST: retrying unmount in 2 seconds...\n");
+    sleepNoSig(2);
+  }
+}
+
/**
 * Create a fuse_dfs process using the miniDfsCluster we set up.
 *
 * @param argv0                 argv[0], as passed into main
 * @param cluster               The NativeMiniDfsCluster to connect to
 * @param mntPoint              The mount point
 * @param pid                   (out param) the fuse_dfs process
 *
 * @return                      0 on success; error code otherwise
 */
static int spawnFuseServer(const char *argv0,
    const struct NativeMiniDfsCluster *cluster, const char *mntPoint,
    pid_t *pid)
{
  int ret, procRet;
  char scratch[PATH_MAX], *dir, fusePath[PATH_MAX], portOpt[128];

  // dirname() may modify its argument, so work on a copy of argv0.
  // The fuse_dfs binary is expected to sit next to this test binary.
  snprintf(scratch, sizeof(scratch), "%s", argv0);
  dir = dirname(scratch);
  snprintf(fusePath, sizeof(fusePath), "%s/fuse_dfs", dir);
  if (access(fusePath, X_OK)) {
    fprintf(stderr, "FUSE_TEST: spawnFuseServer: failed to find fuse_dfs "
            "binary at %s!\n", fusePath);
    return -ENOENT;
  }
  /* Let's make sure no other FUSE filesystem is mounted at mntPoint */
  ret = fuserMount(&procRet, "-u", mntPoint, NULL);
  if (ret) {
    fprintf(stderr, "FUSE_TEST: fuserMount -u %s failed with error %d\n",
            mntPoint, ret);
    return -EIO;
  }
  if (procRet == EXIT_STATUS_EXEC_ERROR) {
    fprintf(stderr, "FUSE_TEST: fuserMount probably could not be executed\n");
    return -EIO;
  }
  /* fork and exec the fuse_dfs process */
  *pid = fork();
  if (*pid < 0) {
    ret = errno;
    fprintf(stderr, "FUSE_TEST: spawnFuseServer: failed to fork: "
            "error %d: %s\n", ret, strerror(ret));
    return -ret;
  } else if (*pid == 0) {
    // Child: run fuse_dfs in the foreground (-f) against the mini-cluster's
    // NameNode port; the parent returns immediately with *pid set.
    snprintf(portOpt, sizeof(portOpt), "-oport=%d",
             nmdGetNameNodePort(cluster));
    if (execl(fusePath, fusePath, "-obig_writes", "-oserver=hdfs://localhost",
          portOpt, "-onopermissions", "-ononempty", "-oinitchecks",
          mntPoint, "-f", NULL)) {
      ret = errno;
      fprintf(stderr, "FUSE_TEST: spawnFuseServer: failed to execv %s: "
              "error %d: %s\n", fusePath, ret, strerror(ret));
      exit(EXIT_STATUS_EXEC_ERROR);
    }
  }
  return 0;
}
+
/**
 * Test that we can start up fuse_dfs and do some stuff.
 *
 * Flow: verify the workload against the local FS, start a native mini-DFS
 * cluster, spawn fuse_dfs mounted on a (possibly temporary) mount point,
 * run the workload through the mount, then unmount and reap fuse_dfs.
 */
int main(int argc, char **argv)
{
  int ret, pret, status;
  pid_t fusePid;
  const char *mntPoint;
  char mntTmp[PATH_MAX] = "";
  struct NativeMiniDfsCluster* tlhCluster;
  struct NativeMiniDfsConf conf = {
      .doFormat = 1,
  };

  // Allow the mount point to be overridden; otherwise make a temp dir.
  mntPoint = getenv("TLH_FUSE_MNT_POINT");
  if (!mntPoint) {
    if (createTempDir(mntTmp, sizeof(mntTmp), 0755)) {
      fprintf(stderr, "FUSE_TEST: failed to create temporary directory for "
              "fuse mount point.\n");
      ret = EXIT_FAILURE;
      goto done;
    }
    fprintf(stderr, "FUSE_TEST: creating mount point at '%s'\n", mntTmp);
    mntPoint = mntTmp;
  }
  // Sanity-check the workload itself on the local FS before involving FUSE.
  if (verifyFuseWorkload()) {
    fprintf(stderr, "FUSE_TEST: failed to verify fuse workload on "
            "local FS.\n");
    ret = EXIT_FAILURE;
    goto done_rmdir;
  }
  tlhCluster = nmdCreate(&conf);
  if (!tlhCluster) {
    ret = EXIT_FAILURE;
    goto done_rmdir;
  }
  if (nmdWaitClusterUp(tlhCluster)) {
    ret = EXIT_FAILURE;
    goto done_nmd_shutdown;
  }
  ret = spawnFuseServer(argv[0], tlhCluster, mntPoint, &fusePid);
  if (ret) {
    fprintf(stderr, "FUSE_TEST: spawnFuseServer failed with error "
            "code %d\n", ret);
    ret = EXIT_FAILURE;
    goto done_nmd_shutdown;
  }
  ret = waitForMount(mntPoint, 20);
  if (ret) {
    fprintf(stderr, "FUSE_TEST: waitForMount(%s) failed with error "
            "code %d\n", mntPoint, ret);
    cleanupFuse(mntPoint);
    ret = EXIT_FAILURE;
    goto done_nmd_shutdown;
  }
  ret = runFuseWorkload(mntPoint, "test");
  if (ret) {
    fprintf(stderr, "FUSE_TEST: runFuseWorkload failed with error "
            "code %d\n", ret);
    cleanupFuse(mntPoint);
    ret = EXIT_FAILURE;
    goto done_nmd_shutdown;
  }
  if (cleanupFuse(mntPoint)) {
    // NOTE(review): 'ret' here still holds the previous call's success
    // value, not cleanupFuse's error — this message prints a stale code.
    fprintf(stderr, "FUSE_TEST: fuserMount -u %s failed with error "
            "code %d\n", mntPoint, ret);
    ret = EXIT_FAILURE;
    goto done_nmd_shutdown;
  }
  // Watchdog: if fuse_dfs never exits after the unmount, SIGALRM will
  // terminate us rather than letting waitpid hang forever.
  alarm(120);
  pret = waitpid(fusePid, &status, 0);
  if (pret != fusePid) {
    ret = errno;
    fprintf(stderr, "FUSE_TEST: failed to wait for fusePid %d: "
            "returned %d: error %d (%s)\n",
            fusePid, pret, ret, strerror(ret));
    ret = EXIT_FAILURE;
    goto done_nmd_shutdown;
  }
  if (WIFEXITED(status)) {
    ret = WEXITSTATUS(status);
    if (ret) {
      fprintf(stderr, "FUSE_TEST: fuse exited with failure status "
              "%d!\n", ret);
      ret = EXIT_FAILURE;
      goto done_nmd_shutdown;
    }
  } else if (WIFSIGNALED(status)) {
    ret = WTERMSIG(status);
    // SIGTERM is the normal way fuse_dfs dies after fusermount -u.
    if (ret != SIGTERM) {
      fprintf(stderr, "FUSE_TEST: fuse exited with unexpected "
              "signal %d!\n", ret);
      ret = EXIT_FAILURE;
      goto done_nmd_shutdown;
    }
  } else {
    // NOTE(review): this message says "fusermount" but refers to fuse_dfs.
    fprintf(stderr, "FUSE_TEST: fusermount exited with unknown exit type\n");
    ret = EXIT_FAILURE;
    goto done_nmd_shutdown;
  }
  ret = EXIT_SUCCESS;

done_nmd_shutdown:
  // NOTE(review): EXPECT_ZERO returns from main early (with -1) if the
  // cluster fails to shut down, skipping nmdFree and the mntTmp rmdir.
  EXPECT_ZERO(nmdShutdown(tlhCluster));
  nmdFree(tlhCluster);
done_rmdir:
  if (mntTmp[0]) {
    rmdir(mntTmp);
  }
done:
  if (ret == EXIT_SUCCESS) {
    fprintf(stderr, "FUSE_TEST: SUCCESS.\n");
  } else {
    fprintf(stderr, "FUSE_TEST: FAILURE!\n");
  }
  return ret;
}

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h?rev=1371232&r1=1371231&r2=1371232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
(original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
Thu Aug  9 14:45:08 2012
@@ -78,7 +78,7 @@
     do { \
         int __my_ret__ = x; \
         int __my_errno__ = errno; \
-        if (__my_ret__) { \
+        if (!__my_ret__) { \
             fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
 		    "code %d (errno: %d): got zero from %s\n", __LINE__, \
                 __my_ret__, __my_errno__, #x); \
@@ -98,4 +98,23 @@
         } \
     } while (0);
 
/**
 * Expect that an expression evaluates to the given int value.
 *
 * On mismatch, prints a diagnostic and returns -1 from the enclosing
 * function (so it may only be used inside functions returning int).
 * Fixes over the original: the 'y' argument is parenthesized so operator
 * expressions expand safely, and the trailing semicolon after while(0) is
 * dropped so the macro can be used inside if/else (callers supply ';').
 */
#define EXPECT_INT_EQ(x, y) \
    do { \
        int __my_ret__ = (y); \
        int __my_errno__ = errno; \
        if (__my_ret__ != (x)) { \
            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
              "code %d (errno: %d): expected %d\n", \
               __LINE__, __my_ret__, __my_errno__, (x)); \
            return -1; \
        } \
    } while (0)

/**
 * Evaluate expr, retrying as long as it fails with EINTR.
 *
 * Afterwards, ret is 0 on success or -errno on any other failure.
 * The 'expr' argument is parenthesized, and there is no trailing
 * semicolon after while(...), for the same reasons as above.
 */
#define RETRY_ON_EINTR_GET_ERRNO(ret, expr) do { \
    ret = (expr); \
    if (!ret) \
        break; \
    ret = -errno; \
    } while (ret == -EINTR)

+
 #endif

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c?rev=1371232&r1=1371231&r2=1371232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
(original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
Thu Aug  9 14:45:08 2012
@@ -156,7 +156,7 @@ int nmdWaitClusterUp(struct NativeMiniDf
     return 0;
 }
 
-int nmdGetNameNodePort(struct NativeMiniDfsCluster *cl)
+int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl)
 {
     JNIEnv *env = getJNIEnv();
     jvalue jVal;

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h?rev=1371232&r1=1371231&r2=1371232&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h
(original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h
Thu Aug  9 14:45:08 2012
@@ -76,6 +76,5 @@ void nmdFree(struct NativeMiniDfsCluster
  *
  * @return          the port, or a negative error code
  */
-int nmdGetNameNodePort(struct NativeMiniDfsCluster *cl);
-
+int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); 
 #endif

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.c?rev=1371232&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.c
(added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.c
Thu Aug  9 14:45:08 2012
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#include "util/posix_util.h"

#include <dirent.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
+
+static pthread_mutex_t gTempdirLock = PTHREAD_MUTEX_INITIALIZER;
+
+static int gTempdirNonce = 0;
+
+int recursiveDeleteContents(const char *path)
+{
+  int ret;
+  DIR *dp;
+  struct dirent *de;
+  char tmp[PATH_MAX];
+
+  dp = opendir(path);
+  if (!dp) {
+    ret = -errno;
+    fprintf(stderr, "recursiveDelete(%s) failed with error %d\n", path, ret);
+    return ret;
+  }
+  while (1) {
+    de = readdir(dp);
+    if (!de) {
+      ret = 0;
+      break;
+    }
+    if ((de->d_name[0] == '.') && (de->d_name[1] == '\0'))
+      continue;
+    if ((de->d_name[0] == '.') && (de->d_name[1] == '.') &&
+        (de->d_name[2] == '\0'))
+      continue;
+    snprintf(tmp, sizeof(tmp), "%s/%s", path, de->d_name);
+    ret = recursiveDelete(tmp);
+    if (ret)
+      break;
+  }
+  if (closedir(dp)) {
+    ret = -errno;
+    fprintf(stderr, "recursiveDelete(%s): closedir failed with "
+            "error %d\n", path, ret);
+    return ret;
+  }
+  return ret;
+}
+
/* Declared in util/posix_util.h; repeated here because the two recursive
 * deletion routines are mutually recursive. */
int recursiveDeleteContents(const char *path);

/**
 * Recursively delete a local path, using unlink or rmdir as appropriate.
 *
 * Simple recursive delete implementation.
 * It could be optimized, but there is no need at the moment.
 * TODO: use fstat, etc.
 *
 * @param path      path to delete
 *
 * @return          0 on success; negative errno value otherwise
 */
int recursiveDelete(const char *path)
{
  int ret;
  struct stat stBuf;

  ret = stat(path, &stBuf);
  if (ret != 0) {
    ret = -errno;
    fprintf(stderr, "recursiveDelete(%s): stat failed with "
            "error %d\n", path, ret);
    return ret;
  }
  if (S_ISDIR(stBuf.st_mode)) {
    // Directories must be emptied before rmdir can succeed.
    ret = recursiveDeleteContents(path);
    if (ret)
      return ret;
    ret = rmdir(path);
    if (ret) {
      // Negate errno so every error path returns a negative code,
      // matching the header's contract and the other branches here.
      ret = -errno;
      fprintf(stderr, "recursiveDelete(%s): rmdir failed with error %d\n",
              path, ret);
      return ret;
    }
  } else {
    ret = unlink(path);
    if (ret) {
      ret = -errno;
      fprintf(stderr, "recursiveDelete(%s): unlink failed with "
              "error %d\n", path, ret);
      return ret;
    }
  }
  return 0;
}
+
+int createTempDir(char *tempDir, int nameMax, int mode)
+{
+  char tmp[PATH_MAX];
+  int pid, nonce;
+  const char *base = getenv("TMPDIR");
+  if (!base)
+    base = "/tmp";
+  if (base[0] != '/') {
+    // canonicalize non-absolute TMPDIR
+    if (realpath(base, tmp) == NULL) {
+      return -errno;
+    }
+    base = tmp;
+  }
+  pid = getpid();
+  pthread_mutex_lock(&gTempdirLock);
+  nonce = gTempdirNonce++;
+  pthread_mutex_unlock(&gTempdirLock);
+  snprintf(tempDir, nameMax, "%s/temp.%08d.%08d", base, pid, nonce);
+  if (mkdir(tempDir, mode) == -1) {
+    int ret = errno;
+    return -ret;
+  }
+  return 0;
+}
+
/**
 * Sleep for the requested number of seconds without relying on signals.
 *
 * If nanosleep is interrupted (EINTR), the nap is resumed with the time
 * that remained, so the full duration always elapses.
 *
 * @param sec       Number of seconds to sleep
 */
void sleepNoSig(int sec)
{
  int err;
  struct timespec request, remaining;

  remaining.tv_sec = sec;
  remaining.tv_nsec = 0;
  for (;;) {
    request = remaining;
    err = nanosleep(&request, &remaining);
    if (!((err == -1) && (errno == EINTR)))
      break;
  }
  if (err == -1) {
    err = errno;
    fprintf(stderr, "nanosleep error %d (%s)\n", err, strerror(err));
  }
}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.h?rev=1371232&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.h
(added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.h
Thu Aug  9 14:45:08 2012
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __POSIX_UTIL_H__
+#define __POSIX_UTIL_H__
+
+/**
+ * Recursively delete the contents of a directory
+ *
+ * @param path      directory whose contents we should delete
+ *
+ * @return          0 on success; negative errno value otherwise
+ */
+int recursiveDeleteContents(const char *path);
+
+/**
+ * Recursively delete a local path, using unlink or rmdir as appropriate.
+ *
+ * @param path      path to delete
+ *
+ * @return          0 on success; negative errno value otherwise
+ */
+int recursiveDelete(const char *path);
+
+/**
+ * Create a temporary directory and return its path
+ *
+ * @param tempDir   (out param) path to the temporary directory
+ * @param nameMax   Length of the tempDir buffer
+ * @param mode      Mode to create with
+ *
+ * @return          0 on success; negative errno value otherwise
+ */
+int createTempDir(char *tempDir, int nameMax, int mode);
+
+/**
+ * Sleep without using signals
+ *
+ * @param sec       Number of seconds to sleep
+ */
+void sleepNoSig(int sec);
+
+#endif



Mime
View raw message