hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhr...@apache.org
Subject svn commit: r706358 - in /hadoop/core/branches/branch-0.18: ./ src/contrib/fuse-dfs/ src/contrib/fuse-dfs/src/ src/contrib/fuse-dfs/src/test/
Date Mon, 20 Oct 2008 18:23:48 GMT
Author: dhruba
Date: Mon Oct 20 11:23:48 2008
New Revision: 706358

URL: http://svn.apache.org/viewvc?rev=706358&view=rev
Log:
HADOOP-4399. Make fuse-dfs multi-thread access safe.
(Pete Wyckoff via dhruba)


Added:
    hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/src/test/
    hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/src/test/TestFuseDFS.java
Modified:
    hadoop/core/branches/branch-0.18/CHANGES.txt
    hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/README
    hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/build.xml
    hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/src/Makefile.am
    hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/src/fuse_dfs.c

Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=706358&r1=706357&r2=706358&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Mon Oct 20 11:23:48 2008
@@ -28,6 +28,9 @@
 
     HADOOP-4292. Do not support append() for LocalFileSystem. (hairong)
 
+    HADOOP-4399. Make fuse-dfs multi-thread access safe.
+    (Pete Wyckoff via dhruba)
+
   NEW FEATURES
 
     HADOOP-2421.  Add jdiff output to documentation, listing all API

Modified: hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/README
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/README?rev=706358&r1=706357&r2=706358&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/README (original)
+++ hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/README Mon Oct 20 11:23:48 2008
@@ -34,7 +34,7 @@
 
 To build:
 
-   1. in HADOOP_HOME: ant compile-contrib -Dcompile.c++=1 -Dfusedfs=1
+   1. in HADOOP_HOME: ant compile-contrib -Dcompile.c++=1 -Dfusedfs=1 -Dlibhdfs.noperms=1
 
 
 NOTE: for amd64 architecture, libhdfs will not compile unless you edit

Modified: hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/build.xml?rev=706358&r1=706357&r2=706358&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/build.xml (original)
+++ hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/build.xml Mon Oct 20 11:23:48 2008
@@ -30,22 +30,48 @@
     </condition>
   </target>
 
+
+  <target name="check-libhdfs-exists" if="fusedfs">
+  <property name="libhdfs.lib" value="${hadoop.root}/build/libhdfs/libhdfs.so"/>
+        <available file="${libhdfs.lib}" property="libhdfs-exists"/>
+    <fail message="libhdfs.so does not exist: ${libhdfs.lib}. Please check flags -Dlibhdfs=1 -Dfusedfs=1 are set or first try ant compile-libhdfs -Dlibhdfs=1">
+         <condition>
+            <not><isset property="libhdfs-exists"/></not>
+          </condition>
+   </fail>
+   </target>
+
   <!-- override compile target !-->
-  <target name="compile" depends="check-libhdfs-fuse" if="libhdfs-fuse">
+  <target name="compile" depends="check-libhdfs-fuse,check-libhdfs-exists" if="libhdfs-fuse">
     <echo message="contrib: ${name}"/>
 
+    <condition property="perms" value="1" else="0">
+    <not>
+      <isset property="libhdfs.noperms"/>
+    </not>
+    </condition>
+
     <exec executable="/bin/sh" failonerror="true">
       <arg value="${root}/bootstrap.sh"/>
     </exec>
-
     <exec executable="make" failonerror="true">
       <env key="OS_NAME" value="${os.name}"/>
       <env key="OS_ARCH" value="${os.arch}"/>
       <env key="HADOOP_HOME" value="${hadoop.root}"/>
-      <env key="PROTECTED_PATHS" value="/,/Trash,/user"/>
       <env key="PACKAGE_VERSION" value="0.1.0"/>
-      <env key="FUSE_HOME" value="/usr/local"/>
+
+      <env key="PERMS" value="${perms}"/>
+    </exec>
+    <mkdir dir="${build.dir}"/>
+    <mkdir dir="${build.dir}/test"/>
+    <exec executable="cp" failonerror="true">
+    <arg line="${root}/src/fuse_dfs ${build.dir}"/>
+    </exec>
+    <mkdir dir="${build.dir}/test"/>
+    <exec executable="cp" failonerror="true">
+    <arg line="${root}/src/fuse_dfs_wrapper.sh ${build.dir}"/>
     </exec>
+
   </target>
 
   <!-- override jar target !-->
@@ -65,6 +91,12 @@
     </exec>
   </target>
 
+  <target name="test" if="fusedfs">
+    <echo message="testing FuseDFS ..."/>
+   <antcall target="hadoopbuildcontrib.test"> 
+   </antcall>
+  </target>  
+
   <!-- override clean target !-->
   <target name="clean" depends="check-libhdfs-fuse" if="libhdfs-fuse">
     <echo message="contrib: ${name}"/>

Modified: hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/src/Makefile.am
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/src/Makefile.am?rev=706358&r1=706357&r2=706358&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/src/Makefile.am (original)
+++ hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/src/Makefile.am Mon Oct 20 11:23:48 2008
@@ -15,6 +15,6 @@
 #
 bin_PROGRAMS = fuse_dfs
 fuse_dfs_SOURCES = fuse_dfs.c
-AM_CPPFLAGS= -D_FILE_OFFSET_BITS=64 -I$(JAVA_HOME)/include -I$(HADOOP_HOME)/src/c++/libhdfs/ -I$(JAVA_HOME)/include/linux/ -D_FUSE_DFS_VERSION=\"$(PACKAGE_VERSION)\" -DPROTECTED_PATHS=\"$(PROTECTED_PATHS)\"
+AM_CPPFLAGS= -DPERMS=$(PERMS) -D_FILE_OFFSET_BITS=64 -I$(JAVA_HOME)/include -I$(HADOOP_HOME)/src/c++/libhdfs/ -I$(JAVA_HOME)/include/linux/ -D_FUSE_DFS_VERSION=\"$(PACKAGE_VERSION)\" -DPROTECTED_PATHS=\"$(PROTECTED_PATHS)\" -I$(FUSE_HOME)/include
 AM_LDFLAGS= -L$(HADOOP_HOME)/build/libhdfs -lhdfs -L$(FUSE_HOME)/lib -lfuse -L$(JAVA_HOME)/jre/lib/$(OS_ARCH)/server -ljvm
 

Modified: hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/src/fuse_dfs.c
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/src/fuse_dfs.c?rev=706358&r1=706357&r2=706358&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/src/fuse_dfs.c (original)
+++ hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/src/fuse_dfs.c Mon Oct 20 11:23:48 2008
@@ -46,63 +46,133 @@
 #include <strings.h>
 
 #include <hdfs.h>
+#include <stddef.h>
+#include <sys/types.h>
+#include <grp.h>
+#include <pwd.h>
+#include <pthread.h>
 
 // Constants
 //
 static const int default_id       = 99; // nobody  - not configurable since soon uids in dfs, yeah!
-static const size_t rd_buf_size   = 128 * 1024;
 static const int blksize = 512;
-static const size_t rd_cache_buf_size = 10*1024*1024;//how much of reads to buffer here
+static const char *const TrashPrefixDir = "/user/root/.Trash";
+static const char *const TrashDir = "/user/root/.Trash/Current";
+static const char *program;
+
 
 /** options for fuse_opt.h */
 struct options {
+  char* protected;
   char* server;
   int port;
   int debug;
-  int nowrites;
-  int no_trash;
-}options;
+  int read_only;
+  int initchecks;
+  int no_permissions;
+  int usetrash;
+  int entry_timeout;
+  int attribute_timeout;
+  int private;
+  size_t rdbuffer_size;
+  int direct_io;
+} options;
+
+void print_options() {
+  fprintf(stderr,"options:\n");
+  fprintf(stderr, "\tprotected=%s\n",options.protected);
+  fprintf(stderr, "\tserver=%s\n",options.server);
+  fprintf(stderr, "\tport=%d\n",options.port);
+  fprintf(stderr, "\tdebug=%d\n",options.debug);
+  fprintf(stderr, "\tread_only=%d\n",options.read_only);
+  fprintf(stderr, "\tusetrash=%d\n",options.usetrash);
+  fprintf(stderr, "\tentry_timeout=%d\n",options.entry_timeout);
+  fprintf(stderr, "\tattribute_timeout=%d\n",options.attribute_timeout);
+  fprintf(stderr, "\tprivate=%d\n",options.private);
+  fprintf(stderr, "\trdbuffer_size=%d (KBs)\n",(int)options.rdbuffer_size/1024);
+}
+
+//#define DOTRACE
+#ifdef DOTRACE
+#define TRACE(x) \
+  syslog(LOG_ERR, "fuse_dfs TRACE - %s\n", x);  \
+  fprintf(stderr, "fuse_dfs TRACE - %s\n", x);
+
+#define TRACE1(x,y)                              \
+  syslog(LOG_ERR, "fuse_dfs TRACE - %s %s\n", x,y);  \
+  fprintf(stderr, "fuse_dfs TRACE - %s %s\n", x,y);
+#else
+#define TRACE(x) ; 
+#define TRACE1(x,y) ; 
+#endif
 
+/**
+ *
+ * dfs_fh_struct is passed around for open files. Fuse provides a hook (the context) 
+ * for storing file specific data.
+ *
+ * 2 Types of information:
+ * a) a read buffer for performance reasons since fuse is typically called on 4K chunks only
+ * b) the hdfs fs handle 
+ *
+ */
 
 typedef struct dfs_fh_struct {
   hdfsFile hdfsFH;
   char *buf;
-  tSize sizeBuffer;  //what is the size of the buffer we have
-  off_t startOffset; //where the buffer starts in the file
+  tSize bufferSize;  //what is the size of the buffer we have
+  off_t buffersStartOffset; //where the buffer starts in the file
+  hdfsFS fs; // for reads/writes need to access as the real user
+  pthread_mutex_t mutex;
 } dfs_fh;
 
-#include <stddef.h>
 
 /** macro to define options */
 #define DFSFS_OPT_KEY(t, p, v) { t, offsetof(struct options, p), v }
 
-/** keys for FUSE_OPT_ options */
 static void print_usage(const char *pname)
 {
-  fprintf(stdout,"USAGE: %s [--debug] [--help] [--version] [--nowrites] [--notrash] --server=<hadoop_servername> --port=<hadoop_port> <mntpoint> [fuse options]\n",pname);
-  fprintf(stdout,"NOTE: a useful fuse option is -o allow_others and -o default_permissions\n");
-  fprintf(stdout,"NOTE: optimizations include -o entry_timeout=500 -o attr_timeout=500\n");
+  fprintf(stdout,"USAGE: %s [debug] [--help] [--version] [-oprotected=<colon_seped_list_of_paths>] [rw] [-onotrash] [-ousetrash] [-obig_writes] [-oprivate (single user)] [ro] [-oserver=<hadoop_servername>] [-oport=<hadoop_port>] [-oentry_timeout=<secs>] [-oattribute_timeout=<secs>] [-odirect_io] [-onopermissions] [-o<other fuse option>] <mntpoint> [fuse options]\n",pname);
   fprintf(stdout,"NOTE: debugging option for fuse is -debug\n");
 }
 
 
-#define OPTIMIZED_READS 1
-
-
+/** keys for FUSE_OPT_ options */
 enum
   {
     KEY_VERSION,
     KEY_HELP,
+    KEY_USETRASH,
+    KEY_NOTRASH,
+    KEY_RO,
+    KEY_RW,
+    KEY_PRIVATE,
+    KEY_BIGWRITES,
+    KEY_DEBUG,
+    KEY_INITCHECKS,
+    KEY_NOPERMISSIONS,
+    KEY_DIRECTIO,
   };
 
 static struct fuse_opt dfs_opts[] =
   {
-    DFSFS_OPT_KEY("--server=%s", server, 0),
-    DFSFS_OPT_KEY("--port=%d", port, 0),
-    DFSFS_OPT_KEY("--debug", debug, 1),
-    DFSFS_OPT_KEY("--nowrites", nowrites, 1),
-    DFSFS_OPT_KEY("--notrash", no_trash, 1),
-
+    DFSFS_OPT_KEY("server=%s", server, 0),
+    DFSFS_OPT_KEY("entry_timeout=%d", entry_timeout, 0),
+    DFSFS_OPT_KEY("attribute_timeout=%d", attribute_timeout, 0),
+    DFSFS_OPT_KEY("protected=%s", protected, 0),
+    DFSFS_OPT_KEY("port=%d", port, 0),
+    DFSFS_OPT_KEY("rdbuffer=%d", rdbuffer_size,0),
+
+    FUSE_OPT_KEY("private", KEY_PRIVATE),
+    FUSE_OPT_KEY("ro", KEY_RO),
+    FUSE_OPT_KEY("debug", KEY_DEBUG),
+    FUSE_OPT_KEY("initchecks", KEY_INITCHECKS),
+    FUSE_OPT_KEY("nopermissions", KEY_NOPERMISSIONS),
+    FUSE_OPT_KEY("big_writes", KEY_BIGWRITES),
+    FUSE_OPT_KEY("rw", KEY_RW),
+    FUSE_OPT_KEY("usetrash", KEY_USETRASH),
+    FUSE_OPT_KEY("notrash", KEY_NOTRASH),
+    FUSE_OPT_KEY("direct_io", KEY_DIRECTIO),
     FUSE_OPT_KEY("-v",             KEY_VERSION),
     FUSE_OPT_KEY("--version",      KEY_VERSION),
     FUSE_OPT_KEY("-h",             KEY_HELP),
@@ -110,30 +180,75 @@
     FUSE_OPT_END
   };
 
-static const char *program;
-
 int dfs_options(void *data, const char *arg, int key,  struct fuse_args *outargs)
 {
+  (void) data;
 
-  if (key == KEY_VERSION) {
+  switch (key) {
+  case FUSE_OPT_KEY_OPT:
+    fprintf(stderr,"fuse-dfs ignoring option %s\n",arg);
+    return 1; // fuse_opt convention: 1 keeps the argument in outargs
+  case  KEY_VERSION:
     fprintf(stdout,"%s %s\n",program,_FUSE_DFS_VERSION);
     exit(0);
-  } else if (key == KEY_HELP) {
+  case KEY_HELP:
     print_usage(program);
     exit(0);
-  } else {
+  case KEY_USETRASH:
+    options.usetrash = 1;
+    break;
+  case KEY_NOTRASH:
+    options.usetrash = 0; // "notrash" must switch the trash off, not on
+    break;
+  case KEY_RO:
+    options.read_only = 1;
+    break;
+  case KEY_RW:
+    options.read_only = 0;
+    break;
+  case KEY_PRIVATE:
+    options.private = 1;
+    break;
+  case KEY_DEBUG:
+    fuse_opt_add_arg(outargs, "-d");
+    options.debug = 1;
+    break;
+  case KEY_INITCHECKS:
+    options.initchecks = 1;
+    break;
+  case KEY_NOPERMISSIONS:
+    options.no_permissions = 1;
+    break;
+  case KEY_DIRECTIO:
+    options.direct_io = 1;
+    break;
+  case KEY_BIGWRITES:
+#ifdef FUSE_CAP_BIG_WRITES
+    fuse_opt_add_arg(outargs, "-obig_writes");
+#endif
+    break;
+  default: {
     // try and see if the arg is a URI for DFS
     int tmp_port;
     char tmp_server[1024];
 
     if (!sscanf(arg,"dfs://%1024[a-zA-Z0-9_.-]:%d",tmp_server,&tmp_port)) {
-      printf("didn't recognize %s\n",arg);
-      fuse_opt_add_arg(outargs,arg);
+      if (strcmp(arg,"ro") == 0) {
+        options.read_only = 1;
+      } else if (strcmp(arg,"rw") == 0) {
+        options.read_only = 0;
+      } else {
+        fprintf(stderr,"fuse-dfs didn't recognize %s,%d\n",arg,key);
+        fuse_opt_add_arg(outargs,arg);
+        return 0;
+      }
     } else {
       options.port = tmp_port;
       options.server = strdup(tmp_server);
+      fprintf(stderr, "port=%d,server=%s\n", options.port, options.server);
     }
   }
+  }
   return 0;
 }
 
@@ -149,9 +264,11 @@
   char *nn_hostname;
   int nn_port;
   hdfsFS fs;
-  int nowrites;
-  int no_trash;
-
+  int read_only;
+  int usetrash;
+  int direct_io;
+  char **protectedpaths;
+  size_t rdbuffer_size;
   // todo:
   // total hack city - use this to strip off the dfs url from the filenames
   // that the dfs API is now providing in 0.14.5
@@ -161,6 +278,388 @@
   int dfs_uri_len;
 } dfs_context;
 
+#define TRASH_RENAME_TRIES  100
+
+//
+// Some forward declarations
+//
+static int dfs_mkdir(const char *path, mode_t mode);
+static int dfs_rename(const char *from, const char *to);
+
+
+//
+// NOTE: this function is a c implementation of org.apache.hadoop.fs.Trash.moveToTrash(Path path).
+//
+
+int move_to_trash(const char *item, hdfsFS userFS) {
+  // Renames item to TrashDir + item's path; returns dfs_rename's status, a failed dfs_mkdir's status, or -EIO if a path buffer is too small.
+  // retrieve dfs specific data
+  dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+
+  // check params and the context var
+  assert(item);
+  assert(dfs);
+  assert('/' == *item);
+  assert(rindex(item,'/') != NULL); // rindex returns NULL, never a negative value, when '/' is absent
+
+
+  char fname[4096]; // or last element of the directory path
+  char parent_directory[4096]; // the directory the fname resides in
+
+  if (strlen(item) > sizeof(fname) - strlen(TrashDir)) {
+    syslog(LOG_ERR, "ERROR: internal buffer too small to accomodate path of length %d %s:%d\n", (int)strlen(item), __FILE__, __LINE__);
+    return -EIO;
+  }
+
+  // separate the file name and the parent directory of the item to be deleted
+  {
+    int length_of_parent_dir = rindex(item, '/') - item ;
+    int length_of_fname = strlen(item) - length_of_parent_dir - 1; // the '/'
+
+    // note - the below strncpys should be safe from overflow because of the check on item's string length above.
+    strncpy(parent_directory, item, length_of_parent_dir);
+    parent_directory[length_of_parent_dir ] = 0;
+    strncpy(fname, item + length_of_parent_dir + 1, length_of_fname); // copy exactly the name, no padding
+    fname[length_of_fname] = 0; // terminate directly after the last character of the name
+  }
+
+  // create the target trash directory
+  char trash_dir[4096];
+  if (snprintf(trash_dir, sizeof(trash_dir), "%s%s",TrashDir,parent_directory) >= sizeof trash_dir) {
+    syslog(LOG_ERR, "move_to_trash error target is not big enough to hold new name for %s %s:%d\n",item, __FILE__, __LINE__);
+    return -EIO;
+  }
+
+  // create the target trash directory in trash (if needed)
+  if ( hdfsExists(userFS, trash_dir)) {
+    int status;
+    // make the directory to put it in in the Trash - NOTE
+    // dfs_mkdir also creates parents, so Current will be created if it does not exist.
+    if ((status = dfs_mkdir(trash_dir,0777)) != 0) {
+      return status;
+    }
+  }
+
+  //
+  // if the target path in Trash already exists, then append with
+  // a number. Start from 1.
+  //
+  char target[4096];
+  int j ;
+  if ( snprintf(target, sizeof target,"%s/%s",trash_dir, fname) >= sizeof target) {
+    syslog(LOG_ERR, "move_to_trash error target is not big enough to hold new name for %s %s:%d\n",item, __FILE__, __LINE__);
+    return -EIO;
+  }
+
+  // NOTE: this loop differs from the java version by capping the #of tries
+  for (j = 1; ! hdfsExists(userFS, target) && j < TRASH_RENAME_TRIES ; j++) {
+    if (snprintf(target, sizeof target,"%s/%s.%d",trash_dir, fname, j) >= sizeof target) {
+      syslog(LOG_ERR, "move_to_trash error target is not big enough to hold new name for %s %s:%d\n",item, __FILE__, __LINE__);
+      return -EIO;
+    }
+  }
+  return dfs_rename(item,target);
+} 
+ 
+
+/**
+ * getpwuid and getgrgid return static structs so we safeguard the contents
+ * while retrieving fields using the 2 structs below.
+ * NOTE: if using both, always get the passwd struct first!
+ */
+static pthread_mutex_t passwdstruct_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t groupstruct_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+/**
+ * Converts from a hdfs hdfsFileInfo to a POSIX stat struct
+ *
+ */
+int fill_stat_structure(hdfsFileInfo *info, struct stat *st) 
+{
+  assert(st);
+  assert(info);
+
+  // initialize the stat structure
+  memset(st, 0, sizeof(struct stat));
+
+  // by default: set to 0 to indicate not supported for directory because we cannot (efficiently) get this info for every subdirectory
+  st->st_nlink = (info->mKind == kObjectKindDirectory) ? 0 : 1;
+
+  uid_t owner_id = default_id;
+#if PERMS
+  if (info->mOwner != NULL) {
+    //
+    // Critical section - protect from concurrent calls in different threads since
+    // the struct below is static.
+    // (no returns until end)
+    //
+    pthread_mutex_lock(&passwdstruct_mutex);
+
+    struct passwd *passwd_info = getpwnam(info->mOwner);
+    owner_id = passwd_info == NULL ? default_id : passwd_info->pw_uid;
+
+    //
+    // End critical section 
+    // 
+    pthread_mutex_unlock(&passwdstruct_mutex);
+
+  } 
+#endif
+  gid_t group_id = default_id;
+#if PERMS
+  if (info->mGroup != NULL) { // only look up a group name that is actually present
+    //
+    // Critical section - protect from concurrent calls in different threads since
+    // the struct below is static.
+    // (no returns until end)
+    //
+    pthread_mutex_lock(&groupstruct_mutex);
+
+    struct group *grp = getgrnam(info->mGroup);
+    group_id = grp == NULL ? default_id : grp->gr_gid;
+
+    //
+    // End critical section 
+    // 
+    pthread_mutex_unlock(&groupstruct_mutex);
+
+  }
+#endif
+
+  mode_t perm = (info->mKind == kObjectKindDirectory) ? (S_IFDIR | 0777) :  (S_IFREG | 0666); // mode_t: S_IFREG (0100000) does not fit a 16-bit signed short
+#if PERMS
+  if (info->mPermissions > 0) {
+    perm = (info->mKind == kObjectKindDirectory) ? S_IFDIR:  S_IFREG ;
+    perm |= info->mPermissions;
+  }
+#endif
+
+  // set stat metadata
+  st->st_size     = (info->mKind == kObjectKindDirectory) ? 4096 : info->mSize;
+  st->st_blksize  = blksize;
+  st->st_blocks   = (st->st_size + st->st_blksize - 1) / st->st_blksize; // round up; ceil() of an integer quotient was a no-op
+  st->st_mode     = perm;
+  st->st_uid      = owner_id;
+  st->st_gid      = group_id;
+#if PERMS
+  st->st_atime    = info->mLastAccess;
+#else
+  st->st_atime    = info->mLastMod;
+#endif
+  st->st_mtime    = info->mLastMod;
+  st->st_ctime    = info->mLastMod;
+
+  return 0;
+}
+
+
+#if PERMS
+
+/**
+ * Utility for getting the user making the fuse call in char * form
+ * NOTE: if non-null return, the return must be freed by the caller.
+ */
+static char *getUsername(uid_t uid)
+{
+  //
+  // Critical section - protect from concurrent calls in different threads.
+  // since the struct below is static.
+  // (no returns until end)
+  //
+
+  pthread_mutex_lock(&passwdstruct_mutex);
+
+  struct passwd *userinfo = getpwuid(uid);
+  char * ret = userinfo && userinfo->pw_name ? strdup(userinfo->pw_name) : NULL;
+
+  pthread_mutex_unlock(&passwdstruct_mutex);
+
+  //
+  // End critical section 
+  // 
+  return ret;
+}
+
+/**
+ * Cleans up a char ** group pointer
+ */
+
+static void freeGroups(char **groups, int numgroups) {
+  if (groups == NULL) {
+    return;
+  }
+  int i ;
+  for (i = 0; i < numgroups; i++) {
+    free(groups[i]);
+  }
+  free(groups);
+}
+
+#define GROUPBUF_SIZE 5
+
+static char *getGroup(gid_t gid) {
+  //
+  // Critical section - protect from concurrent calls in different threads.
+  // since the struct below is static.
+  // (no returns until end)
+  //
+
+  pthread_mutex_lock(&groupstruct_mutex);
+
+  struct group* grp = getgrgid(gid);
+  char * ret = grp && grp->gr_name ? strdup(grp->gr_name) : NULL;
+
+  //
+  // End critical section 
+  // 
+  pthread_mutex_unlock(&groupstruct_mutex);
+
+  return ret;
+}
+
+
+/**
+ * Utility for getting the group from the uid
+ * NOTE: if non-null return, the return must be freed by the caller.
+ */
+char *getGroupUid(uid_t uid) {
+  //
+  // Critical section - protect from concurrent calls in different threads
+  // since the structs below are static.
+  // (no returns until end)
+  //
+
+  pthread_mutex_lock(&passwdstruct_mutex);
+  pthread_mutex_lock(&groupstruct_mutex);
+
+  char *ret = NULL;
+  struct passwd *userinfo = getpwuid(uid);
+  if (NULL != userinfo) {
+    struct group* grp = getgrgid( userinfo->pw_gid);
+    ret = grp && grp->gr_name ? strdup(grp->gr_name) : NULL;
+  }
+
+  //
+  // End critical section 
+  // 
+  pthread_mutex_unlock(&groupstruct_mutex);
+  pthread_mutex_unlock(&passwdstruct_mutex);
+
+  return ret;
+}
+
+
+/**
+ * lookup the gid based on the uid
+ */
+gid_t getGidUid(uid_t uid) {
+  //
+  // Critical section - protect from concurrent calls in different threads
+  // since the struct below is static.
+  // (no returns until end)
+  //
+
+  pthread_mutex_lock(&passwdstruct_mutex);
+
+  struct passwd *userinfo = getpwuid(uid);
+  gid_t gid = userinfo == NULL ? 0 : userinfo->pw_gid;
+
+  //
+  // End critical section 
+  // 
+  pthread_mutex_unlock(&passwdstruct_mutex);
+
+  return gid;
+}
+
+/**
+ * Utility for getting the groups for the user making the fuse call in char * form
+ */
+static char ** getGroups(uid_t uid, int *num_groups)
+{
+  char *user = getUsername(uid);
+  // NOTE: on success the returned array takes ownership of `user`; release array and strings with freeGroups()
+  if (user == NULL)
+    return NULL;
+
+  char **groupnames = NULL;
+
+  // see http://www.openldap.org/lists/openldap-devel/199903/msg00023.html
+
+  //#define GETGROUPS_T 1 
+#ifdef GETGROUPS_T
+  *num_groups = GROUPBUF_SIZE;
+
+  gid_t* grouplist = malloc(GROUPBUF_SIZE * sizeof(gid_t)); 
+  assert(grouplist != NULL);
+  gid_t* tmp_grouplist; 
+  int rtr;
+
+  gid_t gid = getGidUid(uid);
+
+  if ((rtr = getgrouplist(user, gid, grouplist, num_groups)) == -1) {
+    // the buffer we passed in is < *num_groups
+    if ((tmp_grouplist = realloc(grouplist, *num_groups * sizeof(gid_t))) != NULL) {
+      grouplist = tmp_grouplist;
+      getgrouplist(user, gid, grouplist, num_groups);
+    }
+  }
+
+  groupnames = (char**)malloc(sizeof(char*) * (*num_groups + 1)); // one extra slot (not one extra byte) for the user name
+  assert(groupnames);
+  int i;
+  for (i=0; i < *num_groups; i++)  {
+    groupnames[i] = getGroup(grouplist[i]);
+    if (groupnames[i] == NULL) {
+      fprintf(stderr, "error could not lookup group %d\n",(int)grouplist[i]);
+    }
+  } 
+  free(grouplist);
+  groupnames[i] = user;   // fills the extra slot allocated above
+  *num_groups = i + 1;    // count it, as the #else branch does, so freeGroups() releases it
+
+#else
+
+  // room for the user name plus (when it resolves) the user's primary group
+  groupnames = (char**)malloc(2 * sizeof(char*));
+  assert(groupnames != NULL);
+  int i = 0;
+  groupnames[i++] = user;
+  groupnames[i] = getGroupUid(uid);
+  if (groupnames[i]) {
+    i++;
+  }
+
+  *num_groups = i;
+
+#endif
+  return groupnames;
+}
+
+
+/**
+ * Connects to the NN as the current user/group according to FUSE
+ *
+ */
+static hdfsFS doConnectAsUser(const char *hostname, int port) {
+  uid_t uid = fuse_get_context()->uid;
+
+  char *user = getUsername(uid);
+  if (NULL == user)
+    return NULL;
+  int numgroups = 0;
+  char **groups = getGroups(uid, &numgroups);
+  hdfsFS fs = hdfsConnectAsUser(hostname, port, user, (const char **)groups, numgroups);
+  freeGroups(groups, numgroups);
+  if (user) 
+    free(user);
+  return fs;
+}
+#else
+static hdfsFS doConnectAsUser(const char *hostname, int port) {
+  return hdfsConnect(hostname, port);
+}
+#endif
 
 //
 // Start of read-only functions
@@ -168,6 +667,8 @@
 
 static int dfs_getattr(const char *path, struct stat *st)
 {
+  TRACE1("getattr", path)
+
   // retrieve dfs specific data
   dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
 
@@ -178,7 +679,7 @@
 
   // if not connected, try to connect and fail out if we can't.
   if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
-    syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
+    syslog(LOG_ERR, "ERROR: could not connect to %s:%d %s:%d\n", dfs->nn_hostname, dfs->nn_port,__FILE__, __LINE__);
     return -EIO;
   }
 
@@ -189,8 +690,7 @@
     return -ENOENT;
   }
 
-  // initialize the stat structure
-  memset(st, 0, sizeof(struct stat));
+  fill_stat_structure(&info[0], st);
 
   // setup hard link info - for a file it is 1 else num entries in a dir + 2 (for . and ..)
   if (info[0].mKind == kObjectKindDirectory) {
@@ -206,17 +706,6 @@
     st->st_nlink = 1;
   }
 
-  // set stat metadata
-  st->st_size     = (info[0].mKind == kObjectKindDirectory) ? 4096 : info[0].mSize;
-  st->st_blksize  = blksize;
-  st->st_blocks   =  ceil(st->st_size/st->st_blksize);
-  st->st_mode     = (info[0].mKind == kObjectKindDirectory) ? (S_IFDIR | 0777) :  (S_IFREG | 0666);
-  st->st_uid      = default_id;
-  st->st_gid      = default_id;
-  st->st_atime    = info[0].mLastMod;
-  st->st_mtime    = info[0].mLastMod;
-  st->st_ctime    = info[0].mLastMod;
-
   // free the info pointer
   hdfsFreeFileInfo(info,1);
 
@@ -226,6 +715,8 @@
 static int dfs_readdir(const char *path, void *buf, fuse_fill_dir_t filler,
                        off_t offset, struct fuse_file_info *fi)
 {
+  TRACE1("readdir",path)
+
   (void) offset;
   (void) fi;
 
@@ -237,17 +728,19 @@
   assert(path);
   assert(buf);
 
+  int path_len = strlen(path);
+
+  hdfsFS userFS;
   // if not connected, try to connect and fail out if we can't.
-  if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
+  if ((userFS = doConnectAsUser(dfs->nn_hostname,dfs->nn_port))== NULL) {
     syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
     return -EIO;
   }
 
-  int path_len = strlen(path);
-
   // call dfs to read the dir
   int numEntries = 0;
-  hdfsFileInfo *info = hdfsListDirectory(dfs->fs,path,&numEntries);
+  hdfsFileInfo *info = hdfsListDirectory(userFS,path,&numEntries);
+  userFS = NULL;
 
   // NULL means either the directory doesn't exist or maybe IO error.
   if (NULL == info) {
@@ -264,21 +757,7 @@
     }
 
     struct stat st;
-    memset(&st, 0, sizeof(struct stat));
-
-    // set to 0 to indicate not supported for directory because we cannot (efficiently) get this info for every subdirectory
-    st.st_nlink = (info[i].mKind == kObjectKindDirectory) ? 0 : 1;
-
-    // setup stat size and acl meta data
-    st.st_size    = info[i].mSize;
-    st.st_blksize = 512;
-    st.st_blocks  =  ceil(st.st_size/st.st_blksize);
-    st.st_mode    = (info[i].mKind == kObjectKindDirectory) ? (S_IFDIR | 0777) :  (S_IFREG | 0666);
-    st.st_uid     = default_id;
-    st.st_gid     = default_id;
-    st.st_atime   = info[i].mLastMod;
-    st.st_mtime   = info[i].mLastMod;
-    st.st_ctime   = info[i].mLastMod;
+    fill_stat_structure(&info[i], &st);
 
     // hack city: todo fix the below to something nicer and more maintainable but
     // with good performance
@@ -291,7 +770,6 @@
     if ((res = filler(buf,str,&st,0)) != 0) {
       syslog(LOG_ERR, "ERROR: readdir filling the buffer %d %s:%d\n",res, __FILE__, __LINE__);
     }
-
   }
 
   // insert '.' and '..'
@@ -324,16 +802,27 @@
         syslog(LOG_ERR, "ERROR: readdir filling the buffer %d %s:%d", res, __FILE__, __LINE__);
       }
     }
-
   // free the info pointers
   hdfsFreeFileInfo(info,numEntries);
-
   return 0;
 }
 
+static size_t min(const size_t x, const size_t y) {
+  return x < y ? x : y;
+}
+
+/**
+ * dfs_read
+ *
+ * Reads from dfs or the open file's buffer.  Note that fuse requires that
+ * either the entire read be satisfied or the EOF is hit or direct_io is enabled
+ *
+ */
 static int dfs_read(const char *path, char *buf, size_t size, off_t offset,
                     struct fuse_file_info *fi)
 {
+  TRACE1("read",path)
+  
   // retrieve dfs specific data
   dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
 
@@ -341,72 +830,106 @@
   assert(dfs);
   assert(path);
   assert(buf);
+  assert(offset >= 0);
+  assert(size >= 0);
+  assert(fi);
 
-  // if not connected, try to connect and fail out if we can't.
-  if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
-    syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
-    return -EIO;
+  dfs_fh *fh = (dfs_fh*)fi->fh;
+
+  assert(fh != NULL);
+  assert(fh->fs != NULL);
+  assert(fh->hdfsFH != NULL);
+
+  if (size >= dfs->rdbuffer_size) {
+    int num_read;
+    int total_read = 0;
+    while (size - total_read > 0 && (num_read = hdfsPread(fh->fs, fh->hdfsFH, offset + total_read, buf + total_read, size - total_read)) > 0) {
+      total_read += num_read;
+    }
+    return total_read;
   }
 
+  assert(fh->bufferSize >= 0);
 
-#ifdef OPTIMIZED_READS
-  dfs_fh *fh = (dfs_fh*)fi->fh;
-  //fprintf(stderr, "Cache bounds for %s: %llu -> %llu (%d bytes). Check for offset %llu\n", path, fh->startOffset, fh->startOffset + fh->sizeBuffer, fh->sizeBuffer, offset);
-  if (fh->sizeBuffer == 0  || offset < fh->startOffset || offset > (fh->startOffset + fh->sizeBuffer)  )
-  {
-    // do the actual read
-    //fprintf (stderr,"Reading %s from HDFS, offset %llu, amount %d\n", path, offset, rd_cache_buf_size);
-    const tSize num_read = hdfsPread(dfs->fs, fh->hdfsFH, offset, fh->buf, rd_cache_buf_size);
-    if (num_read < 0) {
-      syslog(LOG_ERR, "Read error - pread failed for %s with return code %d %s:%d", path, num_read, __FILE__, __LINE__);
-      hdfsDisconnect(dfs->fs);
-      dfs->fs = NULL;
-      return -EIO;
+  //
+  // Critical section - protect from multiple reads in different threads accessing the read buffer
+  // (no returns until end)
+  //
+
+  pthread_mutex_lock(&fh->mutex);
+
+  // used only to check the postcondition of this function - namely that we satisfy
+  // the entire read or EOF is hit.
+  int isEOF = 0;
+  int ret = 0;
+
+  // check if the buffer is empty or
+  // the read starts before the buffer starts or
+  // the read ends after the buffer ends
+
+  if (fh->bufferSize == 0  || 
+      offset < fh->buffersStartOffset || 
+      offset + size > fh->buffersStartOffset + fh->bufferSize) 
+    {
+      // Read into the buffer from DFS
+      size_t num_read;
+      size_t total_read = 0;
+
+      while (dfs->rdbuffer_size  - total_read > 0 && 
+             (num_read = hdfsPread(fh->fs, fh->hdfsFH, offset + total_read, fh->buf + total_read, dfs->rdbuffer_size - total_read)) > 0) {
+        total_read += num_read;
+      }
+
+      if (num_read < 0) {
+        // invalidate the buffer 
+        fh->bufferSize = 0; 
+        syslog(LOG_ERR, "Read error - pread failed for %s with return code %d %s:%d", path, (int)num_read, __FILE__, __LINE__);
+        ret = -EIO;
+      } else {
+        fh->bufferSize = total_read;
+        fh->buffersStartOffset = offset;
+
+        if (dfs->rdbuffer_size - total_read > 0) {
+          isEOF = 1;
+        }
+      }
     }
-    fh->sizeBuffer = num_read;
-    fh->startOffset = offset;
-    //fprintf (stderr,"Read %d bytes of %s from HDFS\n", num_read, path);
-  }
-
-  char* local_buf = fh->buf;
-  const tSize cacheLookupOffset = offset - fh->startOffset;
-  local_buf += cacheLookupOffset;
-  //fprintf(stderr,"FUSE requested %d bytes of %s for offset %d in file\n", size, path, offset);
-  const tSize amount = cacheLookupOffset + size > fh->sizeBuffer
-    ?  fh->sizeBuffer - cacheLookupOffset
-    : size;
-  //fprintf(stderr,"Reading %s from cache, %d bytes from position %d\n", path, amount, cacheLookupOffset);
-  //fprintf(stderr,"Cache status for %s: %d bytes cached from offset %llu\n", path, fh->sizeBuffer, fh->startOffset);
-  memcpy(buf, local_buf, amount);
-  //fprintf(stderr,"Read %s from cache, %d bytes from position %d\n", path, amount, cacheLookupOffset);
-  //fprintf(stderr,"Cache status for %s: %d bytes cached from offset %llu\n", path, fh->sizeBuffer, fh->startOffset);
-  return amount;
+  if (ret == 0) {
 
-#else
-  // NULL means either file doesn't exist or maybe IO error - i.e., the dfs_open must have failed
-  if (NULL == (void*)fi->fh) {
-    // should never happen
-    return  -EIO;
-  }
-  syslog(LOG_DEBUG,"buffer size=%d\n",(int)size);
+    assert(offset >= fh->buffersStartOffset);
+    assert(fh->buf);
 
-  // do the actual read
-  const tSize num_read = hdfsPread(dfs->fs, (hdfsFile)fi->fh, offset, buf, size);
+    const size_t bufferReadIndex = offset - fh->buffersStartOffset;
+    assert(bufferReadIndex >= 0 && bufferReadIndex < fh->bufferSize);
 
-  // handle errors
-  if (num_read < 0) {
-    syslog(LOG_ERR, "Read error - pread failed for %s with return code %d %s:%d", path, num_read, __FILE__, __LINE__);
-    hdfsDisconnect(dfs->fs);
-    dfs->fs = NULL;
-    return -EIO;
+    const size_t amount = min(fh->buffersStartOffset + fh->bufferSize - offset, size);
+    assert(amount >= 0 && amount <= fh->bufferSize);
+
+    const char *offsetPtr = fh->buf + bufferReadIndex;
+    assert(offsetPtr >= fh->buf);
+    assert(offsetPtr + amount <= fh->buf + fh->bufferSize);
+    
+    memcpy(buf, offsetPtr, amount);
+
+    // fuse requires the below and the code should guarantee this assertion
+    assert(amount == size || isEOF);
+    ret = amount;
   }
-  return num_read;
-#endif
 
+  //
+  // Critical section end 
+  //
+  pthread_mutex_unlock(&fh->mutex);
+
+  return ret;
 }
 
+
+
 static int dfs_statfs(const char *path, struct statvfs *st)
 {
+  TRACE1("statfs",path)
+
   // retrieve dfs specific data
   dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
 
@@ -418,15 +941,16 @@
   // init the stat structure
   memset(st,0,sizeof(struct statvfs));
 
+  hdfsFS userFS;
   // if not connected, try to connect and fail out if we can't.
-  if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
+  if ((userFS = doConnectAsUser(dfs->nn_hostname,dfs->nn_port))== NULL) {
     syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
     return -EIO;
   }
 
-  const long cap   = hdfsGetCapacity(dfs->fs);
-  const long used  = hdfsGetUsed(dfs->fs);
-  const long bsize = hdfsGetDefaultBlockSize(dfs->fs);
+  const long cap   = hdfsGetCapacity(userFS);
+  const long used  = hdfsGetUsed(userFS);
+  const long bsize = hdfsGetDefaultBlockSize(userFS);
 
   // fill in the statvfs structure
 
@@ -447,10 +971,13 @@
   */
 
   st->f_bsize   =  bsize;
-  st->f_frsize  =  st->f_bsize;
-  st->f_blocks  =  cap/st->f_bsize;
-  st->f_bfree   =  (cap-used)/st->f_bsize;
-  st->f_bavail  =  st->f_bfree;
+  st->f_frsize  =  bsize;
+
+  st->f_blocks  =  cap/bsize;
+
+  st->f_bfree   =  (cap-used)/bsize;
+  st->f_bavail  =  cap/bsize;
+
   st->f_files   =  1000;
   st->f_ffree   =  500;
   st->f_favail  =  500;
@@ -461,47 +988,57 @@
   return 0;
 }
 
-static int dfs_access(const char *path, int mask)
-{
-  // no permissions on dfs, always a success
+
+static int is_protected(const char *path) {
+
+  dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+  assert(dfs != NULL);
+  assert(dfs->protectedpaths);
+
+  int i ;
+  for (i = 0; dfs->protectedpaths[i]; i++) {
+    if (strcmp(path, dfs->protectedpaths[i]) == 0) {
+      return 1;
+    }
+  }
   return 0;
 }
 
 //
-// The remainder are write functionality and therefore not implemented right now
+// Start of write functions
 //
 
 
-static char **protectedpaths;
-
-
 static int dfs_mkdir(const char *path, mode_t mode)
 {
+  TRACE1("mkdir", path)
+
   // retrieve dfs specific data
   dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
 
   // check params and the context var
   assert(path);
   assert(dfs);
+  assert('/' == *path);
+
+  if (is_protected(path)) {
+    syslog(LOG_ERR,"ERROR: hdfs trying to create the directory: %s", path);
+    return -EACCES;
+  }
 
+  if (dfs->read_only) {
+    syslog(LOG_ERR,"ERROR: hdfs is configured as read-only, cannot create the directory %s\n",path);
+    return -EACCES;
+  }
+  
+  hdfsFS userFS;
   // if not connected, try to connect and fail out if we can't.
-  if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
+  if ((userFS = doConnectAsUser(dfs->nn_hostname,dfs->nn_port))== NULL) {
     syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
     return -EIO;
   }
 
-  assert('/' == *path);
-
-  int i ;
-  for (i = 0; protectedpaths[i]; i++) {
-    if (strcmp(path, protectedpaths[i]) == 0) {
-      syslog(LOG_ERR,"ERROR: hdfs trying to create the directory: %s", path);
-      return -EACCES;
-    }
-  }
-
-
-  if (dfs->nowrites || hdfsCreateDirectory(dfs->fs, path)) {
+  if (hdfsCreateDirectory(userFS, path)) {
     syslog(LOG_ERR,"ERROR: hdfs trying to create directory %s",path);
     return -EIO;
   }
@@ -512,7 +1049,9 @@
 
 static int dfs_rename(const char *from, const char *to)
 {
-  // retrieve dfs specific data
+  TRACE1("rename", from) 
+
+ // retrieve dfs specific data
   dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
 
   // check params and the context var
@@ -520,31 +1059,31 @@
   assert(to);
   assert(dfs);
 
+  assert('/' == *from);
+  assert('/' == *to);
+
+  if (is_protected(from) || is_protected(to)) {
+    syslog(LOG_ERR,"ERROR: hdfs trying to rename: %s %s", from, to);
+    return -EACCES;
+  }
+
+  if (dfs->read_only) {
+    syslog(LOG_ERR,"ERROR: hdfs is configured as read-only, cannot rename the directory %s\n",from);
+    return -EACCES;
+  }
+
+  hdfsFS userFS;
   // if not connected, try to connect and fail out if we can't.
-  if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
+  if ((userFS = doConnectAsUser(dfs->nn_hostname,dfs->nn_port))== NULL) {
     syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
     return -EIO;
   }
 
-  assert('/' == *from);
-  assert('/' == *to);
-
-  int i ;
-  for (i = 0; protectedpaths[i] != NULL; i++) {
-    if (strcmp(from, protectedpaths[i]) == 0) {
-      syslog(LOG_ERR,"ERROR: hdfs trying to rename directories %s to %s",from,to);
-      return -EACCES;
-    }
-    if (strcmp(to, protectedpaths[i]) == 0) {
-      syslog(LOG_ERR,"ERROR: hdfs trying to rename directories %s to %s",from,to);
-      return -EACCES;
-    }
-  }
-
-  if (dfs->nowrites ||  hdfsRename(dfs->fs, from, to)) {
+  if (hdfsRename(userFS, from, to)) {
     syslog(LOG_ERR,"ERROR: hdfs trying to rename %s to %s",from, to);
     return -EIO;
   }
+
   return 0;
 
 }
@@ -552,31 +1091,35 @@
 
 static int dfs_rmdir(const char *path)
 {
+  TRACE1("rmdir", path)
+
   // retrieve dfs specific data
   dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
 
   // check params and the context var
   assert(path);
   assert(dfs);
+  assert('/' == *path);
 
-  // if not connected, try to connect and fail out if we can't.
-  if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
-    syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
-    return -EIO;
+  if (is_protected(path)) {
+    syslog(LOG_ERR,"ERROR: hdfs trying to delete a protected directory: %s ",path);
+    return -EACCES;
   }
 
-  assert('/' == *path);
+  if (dfs->read_only) {
+    syslog(LOG_ERR,"ERROR: hdfs is configured as read-only, cannot delete the directory %s\n",path);
+    return -EACCES;
+  }
 
-  int i ;
-  for (i = 0; protectedpaths[i]; i++) {
-    if (strcmp(path, protectedpaths[i]) == 0) {
-      syslog(LOG_ERR,"ERROR: hdfs trying to delete the directory: %s ",path);
-      return -EACCES;
-    }
+  hdfsFS userFS;
+  // if not connected, try to connect and fail out if we can't.
+  if ((userFS = doConnectAsUser(dfs->nn_hostname,dfs->nn_port))== NULL) {
+    syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
+    return -EIO;
   }
 
   int numEntries = 0;
-  hdfsFileInfo *info = hdfsListDirectory(dfs->fs,path,&numEntries);
+  hdfsFileInfo *info = hdfsListDirectory(userFS,path,&numEntries);
 
   // free the info pointers
   hdfsFreeFileInfo(info,numEntries);
@@ -585,165 +1128,190 @@
     return -ENOTEMPTY;
   }
 
+  if (dfs->usetrash && strncmp(path, TrashPrefixDir, strlen(TrashPrefixDir)) != 0) {
+    fprintf(stderr, "moving to trash %s\n", path);
+    int ret= move_to_trash(path, userFS);
+    return ret;
+  }
 
 
-  // since these commands go through the programmatic hadoop API, there is no
-  // trash feature. So, force it here.
-  // But make sure the person isn't deleting from Trash itself :)
-  // NOTE: /Trash is in protectedpaths so they cannot delete all of trash
-  if (!dfs->no_trash && strncmp(path, "/Trash", strlen("/Trash")) != 0) {
+  if (hdfsDelete(userFS, path)) {
+    syslog(LOG_ERR,"ERROR: hdfs error trying to delete the directory %s\n",path);
+    return -EIO;
+  }
 
-    char target[4096];
-    char dir[4096];
-    int status;
+  return 0;
+}
 
-    {
-      // find the directory and full targets in Trash
 
-      sprintf(target, "/Trash/Current%s",path);
+static int dfs_unlink(const char *path)
+{
+  TRACE1("unlink", path)
 
-      // strip off the actual file or directory name from the fullpath
-      char *name = rindex(path, '/');
-      assert(name);
-      *name = 0;
+  // retrieve dfs specific data
+  dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
 
-      // use that path to ensure the directory exists in the Trash dir
-      // prepend Trash to the directory
-      sprintf(dir,"/Trash/Current%s/",path);
+  // check params and the context var
+  assert(path);
+  assert(dfs);
+  assert('/' == *path);
 
-      // repair the path not used again but in case the caller expects it.
-      *name = '/';
-    }
+  if (is_protected(path)) {
+    syslog(LOG_ERR,"ERROR: hdfs trying to delete a protected directory: %s ",path);
+    return -EACCES;
+  }
 
-    // if the directory doesn't already exist in the Trash
-    // then we go through with the rename
-    if ( hdfsExists(dfs->fs, target) != 0) { // 0 means it exists. weird
-      // make the directory to put it in in the Trash
-      if ((status = dfs_mkdir(dir,0)) != 0) {
-        return status;
-      }
+  if (dfs->read_only) {
+    syslog(LOG_ERR,"ERROR: hdfs is configured as read-only, cannot create the directory %s\n",path);
+    return -EACCES;
+  }
 
-      // do the rename
-      return dfs_rename(path,target);
+  hdfsFS userFS;
+  // if not connected, try to connect and fail out if we can't.
+  if ((userFS = doConnectAsUser(dfs->nn_hostname,dfs->nn_port))== NULL) {
+    syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
+    return -EIO;
+  }
 
-    }
-    // if the directory exists in the Trash, then we don't bother doing the rename
-    // and just delete the existing one by falling though.
+  // move the file to the trash if this is enabled and its not actually in the trash.
+  if (dfs->usetrash && strncmp(path, TrashPrefixDir, strlen(TrashPrefixDir)) != 0) {
+    int ret= move_to_trash(path, userFS);
+    return ret;
   }
 
-  if (dfs->nowrites ||  hdfsDelete(dfs->fs, path)) {
-    syslog(LOG_ERR,"ERROR: hdfs trying to delete the directory %s",path);
+  if (hdfsDelete(userFS, path)) {
+    syslog(LOG_ERR,"ERROR: hdfs trying to delete the file %s",path);
     return -EIO;
   }
+
   return 0;
-}
 
+}
 
-static int dfs_unlink(const char *path)
+static int dfs_utimens(const char *path, const struct timespec ts[2])
 {
+  TRACE1("utimens", path)
+#if PERMS
   // retrieve dfs specific data
   dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
 
   // check params and the context var
   assert(path);
   assert(dfs);
+  assert('/' == *path);
 
+  time_t aTime = ts[0].tv_sec;
+  time_t mTime = ts[1].tv_sec;
+
+  hdfsFS userFS;
   // if not connected, try to connect and fail out if we can't.
-  if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
+  if ((userFS = doConnectAsUser(dfs->nn_hostname,dfs->nn_port))== NULL) {
     syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
     return -EIO;
   }
 
-  assert('/' == *path);
-
-  int i ;
-  for (i = 0; protectedpaths[i]; i++) {
-    if (strcmp(path, protectedpaths[i]) == 0) {
-      syslog(LOG_ERR,"ERROR: hdfs trying to delete the directory: %s ",path);
-      return -EACCES;
-    }
+  if (hdfsUtime(userFS, path, mTime, aTime)) {
+    syslog(LOG_ERR,"ERROR: hdfs trying to utime %s to %ld/%ld",path, (long)mTime, (long)aTime);
+    fprintf(stderr,"ERROR: could not set utime for path %s\n",path);
+    return -EIO;
   }
+#endif  
+  return 0;
+}
 
 
+static int dfs_chmod(const char *path, mode_t mode)
+{
+  TRACE1("chmod", path)
 
-  // since these commands go through the programmatic hadoop API, there is no
-  // trash feature. So, force it here.
-  // But make sure the person isn't deleting from Trash itself :)
-  // NOTE: /Trash is in protectedpaths so they cannot delete all of trash
-  if (!dfs->no_trash && strncmp(path, "/Trash", strlen("/Trash")) != 0) {
+#if PERMS
+  // retrieve dfs specific data
+  dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
 
-    char target[4096];
-    char dir[4096];
-    int status;
+  // check params and the context var
+  assert(path);
+  assert(dfs);
+  assert('/' == *path);
 
-    {
-      // find the directory and full targets in Trash
+  hdfsFS userFS;
+  // if not connected, try to connect and fail out if we can't.
+  if ((userFS = doConnectAsUser(dfs->nn_hostname,dfs->nn_port))== NULL) {
+    syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
+    return -EIO;
+  }
 
-      sprintf(target, "/Trash/Current%s",path);
+  if (hdfsChmod(userFS, path, (short)mode)) {
+    syslog(LOG_ERR,"ERROR: hdfs trying to chmod %s to %d",path, (int)mode);
+    return -EIO;
+  }
+#endif
+  return 0;
+}
 
-      // strip off the actual file or directory name from the fullpath
-      char *name = rindex(path, '/');
-      assert(name);
-      *name = 0;
+static int dfs_chown(const char *path, uid_t uid, gid_t gid)
+{
+  TRACE1("chown", path)
 
-      // use that path to ensure the directory exists in the Trash dir
-      // prepend Trash to the directory
-      sprintf(dir,"/Trash/Current%s/",path);
+  int ret = 0;
 
-      // repair the path not used again but in case the caller expects it.
-      *name = '/';
-    }
+#if PERMS
+  char *user = NULL;
+  char *group = NULL;
 
-    // if this is a file and it's already got a copy in the Trash, to be conservative, we
-    // don't do the delete.
-    if (hdfsExists(dfs->fs, target) == 0) {
-      syslog(LOG_ERR,"ERROR: hdfs trying to delete a file that was already deleted so cannot back it to Trash: %s",target);
-      return -EIO;
-    }
+  // retrieve dfs specific data
+  dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
 
-    // make the directory to put it in in the Trash
-    if ((status = dfs_mkdir(dir,0)) != 0) {
-      return status;
-    }
+  // check params and the context var
+  assert(path);
+  assert(dfs);
+  assert('/' == *path);
 
-    // do the rename
-    return dfs_rename(path,target);
+  user = getUsername(uid);
+  if (NULL == user) {
+    syslog(LOG_ERR,"Could not lookup the user id string %d\n",(int)uid); 
+    fprintf(stderr, "could not lookup userid %d\n", (int)uid); 
+    ret = -EIO;
   }
 
-  if (dfs->nowrites ||  hdfsDelete(dfs->fs, path)) {
-    syslog(LOG_ERR,"ERROR: hdfs trying to delete the file %s",path);
-    return -EIO;
+  if (0 == ret) {
+    group = getGroup(gid);
+    if (group == NULL) {
+      syslog(LOG_ERR,"Could not lookup the group id string %d\n",(int)gid); 
+      fprintf(stderr, "could not lookup group %d\n", (int)gid); 
+      ret = -EIO;
+    } 
   }
-  return 0;
-
-}
 
-static int dfs_chmod(const char *path, mode_t mode)
-{
-  (void)path;
-  (void)mode;
-  return -ENOTSUP;
-}
+  hdfsFS userFS = NULL;
+  if (0 == ret) {
+    // if not connected, try to connect and fail out if we can't.
+    if ((userFS = doConnectAsUser(dfs->nn_hostname,dfs->nn_port))== NULL) {
+      syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
+      ret = -EIO;
+    }
+  }
 
-static int dfs_chown(const char *path, uid_t uid, gid_t gid)
-{
-  (void)path;
-  (void)uid;
-  (void)gid;
-  return -ENOTSUP;
-}
+  if (0 == ret) {
+    //  fprintf(stderr, "DEBUG: chown %s %d->%s %d->%s\n", path, (int)uid, user, (int)gid, group);
+    if (hdfsChown(userFS, path, user, group)) {
+      syslog(LOG_ERR,"ERROR: hdfs trying to chown %s to %d/%d",path, (int)uid, gid);
+      ret = -EIO;
+    }
+  }
+  if (user) 
+    free(user);
+  if (group)
+    free(group);
+#endif
+  return ret;
 
-static int dfs_truncate(const char *path, off_t size)
-{
-  (void)path;
-  (void)size;
-  return -ENOTSUP;
 }
 
-long tempfh = 0;
 
 static int dfs_open(const char *path, struct fuse_file_info *fi)
 {
+  TRACE1("open", path)
+
   dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
 
   // check params and the context var
@@ -753,40 +1321,49 @@
 
   int ret = 0;
 
-  // if not connected, try to connect and fail out if we can't.
-  if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
-    syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
-    return -EIO;
-  }
-
   // 0x8000 is always passed in and hadoop doesn't like it, so killing it here
   // bugbug figure out what this flag is and report problem to Hadoop JIRA
   int flags = (fi->flags & 0x7FFF);
 
-#ifdef OPTIMIZED_READS
   // retrieve dfs specific data
   dfs_fh *fh = (dfs_fh*)malloc(sizeof (dfs_fh));
-  fi->fh = (uint64_t)fh;
-  fh->hdfsFH = (hdfsFile)hdfsOpenFile(dfs->fs, path, flags,  0, 3, 0);
-  fh->buf = (char*)malloc(rd_cache_buf_size*sizeof (char));
-  fh->startOffset = 0;
-  fh->sizeBuffer = 0;
+  assert(fh != NULL);
 
-  if (0 == fh->hdfsFH) {
-    syslog(LOG_ERR, "ERROR: could not open file %s dfs %s:%d\n", path,__FILE__, __LINE__);
-    ret = -EIO;
+  if ((fh->fs = doConnectAsUser(dfs->nn_hostname,dfs->nn_port)) == NULL) {
+    syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
+    return -EIO;
   }
-#else
 
-  // retrieve dfs specific data
-  fi->fh = (uint64_t)hdfsOpenFile(dfs->fs, path, flags,  0, 3, 0);
+  if ((fh->hdfsFH = (hdfsFile)hdfsOpenFile(fh->fs, path, flags,  0, 3, 0)) == NULL) {
+    syslog(LOG_ERR, "ERROR: could not connect open file %s:%d\n", __FILE__, __LINE__);
+    return -EIO;
+  }
+
+  if (fi->flags & O_WRONLY || fi->flags & O_CREAT) {
+    // write specific initialization
+
+    fh->buf = NULL;
+  } else  {
+    // read specific initialization
+
+    assert(dfs->rdbuffer_size > 0);
+
+    if (NULL == (fh->buf = (char*)malloc(dfs->rdbuffer_size*sizeof (char)))) {
+      syslog(LOG_ERR, "ERROR: could not allocate memory for file buffer for a read for file %s dfs %s:%d\n", path,__FILE__, __LINE__);
+      ret = -EIO;
+    }
+
+    fh->buffersStartOffset = 0;
+    fh->bufferSize = 0;
 
-  if (0 == fi->fh) {
-    syslog(LOG_ERR, "ERROR: could not open file %s dfs %s:%d\n", path,__FILE__, __LINE__);
-    ret = -EIO;
   }
 
-#endif
+  // 
+  // mutex needed for reads/writes
+  //
+  pthread_mutex_init(&fh->mutex, NULL);
+
+  fi->fh = (uint64_t)fh;
 
   return ret;
 }
@@ -794,55 +1371,69 @@
 static int dfs_write(const char *path, const char *buf, size_t size,
                      off_t offset, struct fuse_file_info *fi)
 {
+  TRACE1("write", path)
+
   // retrieve dfs specific data
   dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+  int ret = 0;
 
   // check params and the context var
   assert(path);
   assert(dfs);
   assert('/' == *path);
-
-  // if not connected, try to connect and fail out if we can't.
-  if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
-    syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
-    return -EIO;
-  }
-#ifdef OPTIMIZED_READS
+  assert(fi);
 
   dfs_fh *fh = (dfs_fh*)fi->fh;
-  hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
+  assert(fh);
 
-#else
-  hdfsFile file_handle = (hdfsFile)fi->fh;
+  hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
+  assert(file_handle);
 
-  if (NULL == file_handle) {
-    syslog(LOG_ERR, "ERROR: fuse problem - no file_handle for %s %s:%d\n",path, __FILE__, __LINE__);
-    return -EIO;
-  }
-#endif
+  //
+  // Critical section - make the sanity check (tell to see the writes are sequential) and the actual write 
+  // (no returns until end)
+  //
+  pthread_mutex_lock(&fh->mutex);
 
-  //  syslog(LOG_DEBUG,"hdfsTell(dfs,%ld)\n",(long)file_handle);
-//  tOffset cur_offset = hdfsTell(dfs->fs, file_handle);
+  tSize length = 0;
+  assert(fh->fs);
 
- // if (cur_offset != offset) {
-  //  syslog(LOG_ERR, "ERROR: user trying to random access write to a file %d!=%d for %s %s:%d\n",(int)cur_offset, (int)offset,path, __FILE__, __LINE__);
-//    return -EIO;
-//  }
+  tOffset cur_offset = hdfsTell(fh->fs, file_handle);
+  if (cur_offset != offset) {
+    syslog(LOG_ERR, "ERROR: user trying to random access write to a file %d!=%d for %s %s:%d\n",(int)cur_offset, (int)offset,path, __FILE__, __LINE__);
+    ret =  -EIO;
 
+  } else {
 
-  syslog(LOG_DEBUG,"hdfsWrite(dfs,%ld,'%s',%d)\n",(long)file_handle,buf,(int)size);
-  tSize length = hdfsWrite(dfs->fs, file_handle, buf, size);
+    length = hdfsWrite(fh->fs, file_handle, buf, size);
 
+    if (length <= 0) {
+      syslog(LOG_ERR, "ERROR: fuse problem - could not write all the bytes for %s %d!=%d%s:%d\n",path,length,(int)size, __FILE__, __LINE__);
+      ret = -EIO;
+    } 
 
-  if (length != size) {
-    syslog(LOG_ERR, "ERROR: fuse problem - could not write all the bytes for %s %d!=%d%s:%d\n",path,length,(int)size, __FILE__, __LINE__);
-    return -EIO;
+    if (length != size) {
+      syslog(LOG_ERR, "WARN: fuse problem - could not write all the bytes for %s %d!=%d%s:%d\n",path,length,(int)size, __FILE__, __LINE__);
+    }
   }
-  return 0;
 
+  //
+  // Critical section end 
+  //
+
+  pthread_mutex_unlock(&fh->mutex);
+
+  return ret == 0 ? length : ret;
 }
 
+/**
+ * This mutex is to protect releasing a file handle in case the user calls close in different threads
+ * and fuse passes these calls to here.
+ */
+static pthread_mutex_t release_mutex = PTHREAD_MUTEX_INITIALIZER;
+
 int dfs_release (const char *path, struct fuse_file_info *fi) {
+  TRACE1("release", path)
 
   // retrieve dfs specific data
   dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
@@ -851,109 +1442,227 @@
   assert(path);
   assert(dfs);
   assert('/' == *path);
-  // if not connected, try to connect and fail out if we can't.
-  if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
-    syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
-    return -EIO;
-  }
 
-  if (NULL == (void*)fi->fh) {
-    return  0;
-  }
+  int ret = 0;
 
-#ifdef OPTIMIZED_READS
-  dfs_fh *fh = (dfs_fh*)fi->fh;
-  hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
-  free(fh->buf);
-  free(fh);
+  //
+  // Critical section - protect from multiple close calls in different threads.
+  // (no returns until end)
+  //
 
-#else
-  hdfsFile file_handle = (hdfsFile)fi->fh;
-#endif
+  pthread_mutex_lock(&release_mutex);
 
-  if (NULL == file_handle) {
-    return 0;
- }
+  if (NULL != (void*)fi->fh) {
 
-  if (hdfsCloseFile(dfs->fs, file_handle) != 0) {
-    syslog(LOG_ERR, "ERROR: dfs problem - could not close file_handle for %s %s:%d\n",path, __FILE__, __LINE__);
-    return -EIO;
+    dfs_fh *fh = (dfs_fh*)fi->fh;
+    assert(fh);
+
+    hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
+
+    if (NULL != file_handle) {
+      if (hdfsCloseFile(fh->fs, file_handle) != 0) {
+        syslog(LOG_ERR, "ERROR: dfs problem - could not close file_handle(%ld) for %s %s:%d\n",(long)file_handle,path, __FILE__, __LINE__);
+        fprintf(stderr, "ERROR: dfs problem - could not close file_handle(%ld) for %s %s:%d\n",(long)file_handle,path, __FILE__, __LINE__);
+        ret = -EIO;
+      }
+    }
+
+    if (fh->buf != NULL) {
+      free(fh->buf);
+      pthread_mutex_destroy(&fh->mutex);
+    }
+
+    free(fh);
+
+    fi->fh = (uint64_t)0;
   }
 
-  fi->fh = (uint64_t)0;
-  return 0;
+  pthread_mutex_unlock(&release_mutex);
+
+  //
+  // End critical section 
+  // 
+
+  return ret;
 }
 
 static int dfs_mknod(const char *path, mode_t mode, dev_t rdev) {
+  TRACE1("mknod", path)
   syslog(LOG_DEBUG,"in dfs_mknod");
   return 0;
 }
 
 static int dfs_create(const char *path, mode_t mode, struct fuse_file_info *fi)
 {
-  syslog(LOG_DEBUG,"in dfs_create");
+  TRACE1("create", path)
   fi->flags |= mode;
-
   return dfs_open(path, fi);
 }
+
 int dfs_flush(const char *path, struct fuse_file_info *fi) {
+  TRACE1("flush", path)
+
+  // retrieve dfs specific data
+  dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+
+  // check params and the context var
+  assert(path);
+  assert(dfs);
+  assert('/' == *path);
+  assert(fi);
+
+  if (NULL == (void*)fi->fh) {
+    return  0;
+  }
+
+  // note that fuse calls flush on RO files too and hdfs does not like that and will return an error
+  if (fi->flags & O_WRONLY) {
+
+    dfs_fh *fh = (dfs_fh*)fi->fh;
+    assert(fh);
+    hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
+    assert(file_handle);
+
+    assert(fh->fs);
+    if (hdfsFlush(fh->fs, file_handle) != 0) {
+      syslog(LOG_ERR, "ERROR: dfs problem - could not flush file_handle(%lx) for %s %s:%d\n",(long)file_handle,path, __FILE__, __LINE__);
+      return -EIO;
+    }
+  }
+
   return 0;
 }
 
+static int dfs_access(const char *path, int mask)
+{
+  TRACE1("access", path)
+  // bugbug - I think we need the FileSystemAPI/libhdfs to expose this!
+  // retrieve dfs specific data
+  dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+
+  // check params and the context var
+  assert(dfs);
+  assert(path);
+
+  hdfsFS userFS;
+  if ((userFS = doConnectAsUser(dfs->nn_hostname,dfs->nn_port)) == NULL) {
+    syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
+    return -EIO;
+  }
+  //  return hdfsAccess(userFS, path, mask);
+  return 0;
+}
 
-void dfs_setattr(struct stat *attr, int to_set, struct fuse_file_info *fi)
+
+/**
+ * For now implement truncate here and only for size == 0.
+ * Weak implementation in that we just delete the file and 
+ * then re-create it, but don't set the user, group, and times to the old
+ * file's metadata. 
+ */
+static int dfs_truncate(const char *path, off_t size)
 {
+  TRACE1("truncate", path)
+  if (size != 0) {
+    return -ENOTSUP;
+  }
+
+  dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+
+  assert(path);
+  assert('/' == *path);
+  assert(dfs);
+
+  int ret = dfs_unlink(path);
+  if (ret != 0) {
+    return ret;
+  }
+
+  hdfsFS userFS;
+  // if not connected, try to connect and fail out if we can't.
+  if ((userFS = doConnectAsUser(dfs->nn_hostname,dfs->nn_port)) == NULL) {
+    syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
+    return -EIO;
+  }
 
+  int flags = O_WRONLY | O_CREAT;
+
+  hdfsFile file;
+  if ((file = (hdfsFile)hdfsOpenFile(userFS, path, flags,  0, 3, 0)) == NULL) {
+    syslog(LOG_ERR, "ERROR: could not connect open file %s:%d\n", __FILE__, __LINE__);
+    return -EIO;
+  }
+
+  if (hdfsCloseFile(userFS, file) != 0) {
+    syslog(LOG_ERR, "ERROR: could not connect close file %s:%d\n", __FILE__, __LINE__);
+    return -EIO;
+  }
+  return 0;
 }
 
+
+static int dfs_symlink(const char *from, const char *to)
+{
+  TRACE1("symlink", from)
+  (void)from;
+  (void)to;
+  // bugbug we need the FileSystem to support this posix API
+  return -ENOTSUP;
+}
+
+
 void dfs_destroy (void *ptr)
 {
+  TRACE("destroy")
   dfs_context *dfs = (dfs_context*)ptr;
-  hdfsDisconnect(dfs->fs);
   dfs->fs = NULL;
 }
 
 
 // Hacked up function to basically do:
-//  protectedpaths = split(PROTECTED_PATHS,',');
+//  protectedpaths = split(options.protected,':');
 
-static void init_protectedpaths() {
-  // PROTECTED_PATHS should be a #defined value from autoconf
-  // set it with configure --with-protectedpaths=/,/user,/user/foo
-  // note , seped with no other spaces and no quotes around it
-  char *tmp = PROTECTED_PATHS;
+static void init_protectedpaths(dfs_context *dfs) {
+
+  char *tmp = options.protected;
 
-  assert(tmp);
 
   // handle degenerate case up front.
-  if (0 == *tmp) {
-    protectedpaths = (char**)malloc(sizeof(char*));
-    protectedpaths[0] = NULL;
+  if (tmp == NULL || 0 == *tmp) {
+    dfs->protectedpaths = (char**)malloc(sizeof(char*));
+    dfs->protectedpaths[0] = NULL;
     return;
   }
+  assert(tmp);
+
+  if (options.debug) {
+    print_options();
+  }
+
 
   int i = 0;
-  while (tmp && (NULL != (tmp = index(tmp,',')))) {
+  while (tmp && (NULL != (tmp = index(tmp,':')))) {
     tmp++; // pass the ,
     i++;
   }
   i++; // for the last entry
   i++; // for the final NULL
-  protectedpaths = (char**)malloc(sizeof(char*)*i);
-  printf("i=%d\n",i);
-  tmp = PROTECTED_PATHS;
+  dfs->protectedpaths = (char**)malloc(sizeof(char*)*i);
+  assert(dfs->protectedpaths);
+  tmp = options.protected;
   int j  = 0;
   while (NULL != tmp && j < i) {
     int length;
-    char *eos = index(tmp,',');
+    char *eos = index(tmp,':');
     if (NULL != eos) {
       length = eos - tmp; // length of this value
     } else {
       length = strlen(tmp);
     }
-    protectedpaths[j] = (char*)malloc(sizeof(char)*length+1);
-    strncpy(protectedpaths[j], tmp, length);
-    protectedpaths[j][length] = '\0';
+    dfs->protectedpaths[j] = (char*)malloc(sizeof(char)*length+1);
+    assert(dfs->protectedpaths[j]);
+    strncpy(dfs->protectedpaths[j], tmp, length);
+    dfs->protectedpaths[j][length] = '\0';
     if (eos) {
       tmp = eos + 1;
     } else {
@@ -961,20 +1670,20 @@
     }
     j++;
   }
-  protectedpaths[j] = NULL;
+  dfs->protectedpaths[j] = NULL;
+
   /*
-  j  = 0;
-  while (protectedpaths[j]) {
-    printf("protectedpaths[%d]=%s\n",j,protectedpaths[j]);
+    j  = 0;
+    while (dfs->protectedpaths[j]) {
+    printf("dfs->protectedpaths[%d]=%s\n",j,dfs->protectedpaths[j]);
     fflush(stdout);
     j++;
-  }
-  exit(1);
+    }
+    exit(1);
   */
 }
 
 
-
 void *dfs_init()
 {
 
@@ -994,8 +1703,11 @@
   dfs->nn_hostname           = options.server;
   dfs->nn_port               = options.port;
   dfs->fs                    = NULL;
-  dfs->nowrites              = options.nowrites;
-  dfs->no_trash              = options.no_trash;
+  dfs->read_only             = options.read_only;
+  dfs->usetrash              = options.usetrash;
+  dfs->protectedpaths        = NULL;
+  dfs->rdbuffer_size         = options.rdbuffer_size;
+  dfs->direct_io             = options.direct_io;
 
   bzero(dfs->dfs_uri,0);
   sprintf(dfs->dfs_uri,"dfs://%s:%d/",dfs->nn_hostname,dfs->nn_port);
@@ -1004,8 +1716,13 @@
   // use ERR level to ensure it makes it into the log.
   syslog(LOG_ERR, "mounting %s", dfs->dfs_uri);
 
-  init_protectedpaths();
+  init_protectedpaths(dfs);
+  assert(dfs->protectedpaths != NULL);
 
+  if (dfs->rdbuffer_size <= 0) {
+    syslog(LOG_DEBUG, "WARN: dfs->rdbuffersize <= 0 = %ld %s:%d", dfs->rdbuffer_size, __FILE__, __LINE__);
+    dfs->rdbuffer_size = 32768;
+  }
   return (void*)dfs;
 }
 
@@ -1018,25 +1735,27 @@
   .init         = dfs_init,
   .open	        = dfs_open,
   .read	        = dfs_read,
+  .symlink	= dfs_symlink,
   .statfs	= dfs_statfs,
   .mkdir	= dfs_mkdir,
   .rmdir	= dfs_rmdir,
   .rename	= dfs_rename,
   .unlink       = dfs_unlink,
   .release      = dfs_release,
-  //  .create       = dfs_create,
-  //  .write	= dfs_write,
-  //  .flush        = dfs_flush,
-  //.xsetattr      = dfs_setattr,
-  //  .mknod        = dfs_mknod,
+  .create       = dfs_create,
+  .write	= dfs_write,
+  .flush        = dfs_flush,
+  .mknod        = dfs_mknod,
+	.utimens	= dfs_utimens,
   .chmod	= dfs_chmod,
   .chown	= dfs_chown,
-  //  .truncate	= dfs_truncate,
+  .truncate	= dfs_truncate,
 };
 
 
 int main(int argc, char *argv[])
 {
+
   umask(0);
 
   program = argv[0];
@@ -1045,14 +1764,60 @@
   /* clear structure that holds our options */
   memset(&options, 0, sizeof(struct options));
 
+  // some defaults
+  options.rdbuffer_size = 10*1024*1024; 
+  options.attribute_timeout = 60; 
+  options.entry_timeout = 60;
+
   if (fuse_opt_parse(&args, &options, dfs_opts, dfs_options) == -1)
     /** error parsing options */
     return -1;
 
+
+  // Some fuse options we set
+  if (! options.private) {
+    fuse_opt_add_arg(&args, "-oallow_other");
+  }
+
+  if (!options.no_permissions) {
+    fuse_opt_add_arg(&args, "-odefault_permissions");
+  }
+
+  {
+    char buf[1024];
+
+    snprintf(buf, sizeof buf, "-oattr_timeout=%d",options.attribute_timeout);
+    fuse_opt_add_arg(&args, buf);
+
+    snprintf(buf, sizeof buf, "-oentry_timeout=%d",options.entry_timeout);
+    fuse_opt_add_arg(&args, buf);
+  }
+
   if (options.server == NULL || options.port == 0) {
     print_usage(argv[0]);
     exit(0);
   }
+
+
+  // 
+  // Check we can connect to hdfs
+  // 
+  if (options.initchecks == 1) {
+    hdfsFS temp;
+    if ((temp = hdfsConnect(options.server, options.port)) == NULL) {
+      const char *cp = getenv("CLASSPATH");
+      const char *ld = getenv("LD_LIBRARY_PATH");
+      fprintf(stderr, "FATAL: misconfiguration problem, cannot connect to hdfs - here's your environment\n");
+      fprintf(stderr, "LD_LIBRARY_PATH=%s\n",ld == NULL ? "NULL" : ld);
+      fprintf(stderr, "CLASSPATH=%s\n",cp == NULL ? "NULL" : cp);
+      syslog(LOG_ERR, "FATAL: misconfiguration problem, cannot connect to hdfs - here's your environment\n");
+      syslog(LOG_ERR, "LD_LIBRARY_PATH=%s\n",ld == NULL ? "NULL" : ld);
+      syslog(LOG_ERR, "CLASSPATH=%s\n",cp == NULL ? "NULL" : cp);
+      exit(0);
+    }  
+    temp = NULL;
+  }
+
   int ret = fuse_main(args.argc, args.argv, &dfs_oper, NULL);
 
   if (ret) printf("\n");

Added: hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/src/test/TestFuseDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/src/test/TestFuseDFS.java?rev=706358&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/src/test/TestFuseDFS.java (added)
+++ hadoop/core/branches/branch-0.18/src/contrib/fuse-dfs/src/test/TestFuseDFS.java Mon Oct 20 11:23:48 2008
@@ -0,0 +1,555 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.dfs.*;
+import junit.framework.TestCase;
+import java.io.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.*;
+import java.net.*;
+
+/**
+ * This class tests that the Fuse module for DFS can mount properly
+ * and does a few simple commands:
+ * mkdir
+ * rmdir
+ * ls
+ * cat
+ *
+ * cp and touch are purposely not tested because they won't work with the current module
+
+ *
+ */
+public class TestFuseDFS extends TestCase {
+
+  /**
+   * mount the fuse file system, assuming the fuse module library is installed in /usr/local/lib or somewhere else on your
+   * pre-existing LD_LIBRARY_PATH
+   *
+   */
+
+  static Process fuse_process;
+  static String fuse_cmd;
+  static private void mount(String mountpoint, URI dfs) throws IOException, InterruptedException  {
+
+    String cp = System.getProperty("java.class.path");
+    Runtime r = Runtime.getRuntime();
+    fuse_cmd = System.getProperty("build.test") + "/../fuse_dfs";
+    String libhdfs = System.getProperty("build.test") + "/../../../libhdfs/";
+    String arch = System.getProperty("os.arch");
+    String jvm = System.getProperty("java.home") + "/lib/" + arch + "/server";
+    String lp = System.getProperty("LD_LIBRARY_PATH") + ":" + "/usr/local/lib:" + libhdfs + ":" + jvm;
+    System.err.println("LD_LIBRARY_PATH=" + lp);
+    String cmd[] =  {  fuse_cmd, "dfs://" + dfs.getHost() + ":" + String.valueOf(dfs.getPort()), 
+                       mountpoint, "-obig_writes", "-odebug", "-oentry_timeout=1",  "-oattribute_timeout=1", "-ousetrash", "rw", "-oinitchecks",
+                       "-ordbuffer=5000"};
+    final String [] envp = {
+      "CLASSPATH="+  cp,
+      "LD_LIBRARY_PATH=" + lp,
+      "PATH=" + "/usr/bin:/bin"
+
+    };
+
+    // ensure the mount point is not currently mounted
+    Process p = r.exec("fusermount -u " + mountpoint);
+    p.waitFor();
+
+    // clean up the mount point
+    p = r.exec("rm -rf " + mountpoint);
+    assertTrue(p.waitFor() == 0);
+
+    // make the mount point if needed
+    p = r.exec("mkdir -p " + mountpoint);
+    assertTrue(p.waitFor() == 0);
+
+    // mount fuse to the mount point
+    fuse_process = r.exec(cmd, envp);
+
+    // give DFS a chance to come up
+    try { Thread.sleep(3000); } catch(Exception e) { }
+  }
+
+  /**
+   * unmounts the fuse file system before shutting down.
+   */
+  static private void umount(String mpoint) throws IOException, InterruptedException {
+    Runtime r= Runtime.getRuntime();
+    Process p = r.exec("fusermount -u " + mpoint);
+    p.waitFor();
+  }
+
+  /**
+   * Set things up - create mini dfs cluster and mount the fuse filesystem.
+   */
+  public TestFuseDFS() throws IOException,InterruptedException  {
+  }
+
+  static private MiniDFSCluster cluster;
+  static private DistributedFileSystem fileSys;
+  final static private String mpoint;
+
+  static {
+    mpoint = System.getProperty("build.test") + "/mnt";
+    System.runFinalizersOnExit(true);
+    startStuff();
+  }
+
+
+  static public void startStuff() {
+    try {
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.permissions",false);
+      cluster = new MiniDFSCluster(conf, 1, true, null);
+      fileSys = (DistributedFileSystem)cluster.getFileSystem();
+      assertTrue(fileSys.getFileStatus(new Path("/")).isDir());
+      mount(mpoint, fileSys.getUri());
+    } catch(Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  public void setUp() {
+  }
+
+  /**
+   * use shell to create a dir and then use filesys to see it exists.
+   */
+  public void testMkdir() throws IOException,InterruptedException, Exception  {
+    try {
+      // First create a new directory with mkdirs
+      Path path = new Path("/foo");
+      Runtime r = Runtime.getRuntime();
+      String cmd = "mkdir -p " + mpoint + path.toString();
+      Process p = r.exec(cmd);
+      assertTrue(p.waitFor() == 0);
+
+      // check it is there
+      assertTrue(fileSys.getFileStatus(path).isDir());
+
+      // check again through the shell
+      String lsCmd = "ls " + mpoint + path.toString();
+      p = r.exec(lsCmd);
+      assertTrue(p.waitFor() == 0);
+    } catch(Exception e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+
+  /**
+   * write a file through the fuse mount and then use filesys to see it exists.
+   */
+  public void testWrites() throws IOException,InterruptedException  {
+    try {
+
+      // write a hello file
+      File file = new File(mpoint, "hello.txt");
+      FileOutputStream f = new FileOutputStream(file);
+      String s = "hello ";
+      f.write(s.getBytes());
+      s = "world";
+      f.write(s.getBytes());
+      f.flush();
+      f.close();
+
+      // check the file exists.
+      Path myPath = new Path("/hello.txt");
+      assertTrue(fileSys.exists(myPath));
+
+      // check the data is ok
+      FileInputStream fi = new FileInputStream(new File(mpoint, "hello.txt"));
+      byte b[] = new byte[12];
+      int length = fi.read(b,0,12);
+      String s2 = new String( b);
+    } catch(Exception e) {
+      e.printStackTrace();
+    } finally {
+    }
+  }
+
+
+
+  /**
+   * Test ls on a dir created here with mkdir -p; also tests ls on a non-existent dir
+   */
+  public void testLs() throws IOException,InterruptedException  {
+    try {
+      // First create a new directory with mkdirs
+      Runtime r = Runtime.getRuntime();
+
+      // mkdir
+      Process p = r.exec("mkdir -p " + mpoint + "/test/mkdirs");
+      assertTrue(p.waitFor() == 0);
+
+      // ls
+      p = r.exec("ls " + mpoint + "/test/mkdirs");
+      assertTrue(p.waitFor() == 0);
+
+      // ls non-existent directory
+      p = r.exec("ls " + mpoint + "/test/mkdirsNotThere");
+      int res = p.waitFor();
+      assertFalse(res == 0);
+    } catch(Exception e) {
+      e.printStackTrace();
+    }
+
+  }
+
+  /**
+   * Remove a dir using the shell and use filesys to see it no longer exists.
+   */
+  public void testRmdir() throws IOException,InterruptedException  {
+    try {
+      // First create a new directory with mkdirs
+
+      Runtime r = Runtime.getRuntime();
+      Process p = r.exec("mkdir -p " + mpoint + "/test/rmdir");
+      assertTrue(p.waitFor() == 0);
+
+      Path myPath = new Path("/test/rmdir");
+      assertTrue(fileSys.exists(myPath));
+
+      // remove it
+      p = r.exec("rmdir " + mpoint + "/test/rmdir");
+      assertTrue(p.waitFor() == 0);
+
+      // check it is not there
+      assertFalse(fileSys.exists(myPath));
+
+      Path trashPath = new Path("/user/root/.Trash/Current/test/rmdir");
+      assertTrue(fileSys.exists(trashPath));
+
+      // make it again to test trashing same thing twice
+      p = r.exec("mkdir -p " + mpoint + "/test/rmdir");
+      assertTrue(p.waitFor() == 0);
+
+      assertTrue(fileSys.exists(myPath));
+
+      // remove it
+      p = r.exec("rmdir " + mpoint + "/test/rmdir");
+      assertTrue(p.waitFor() == 0);
+
+      // check it is not there
+      assertFalse(fileSys.exists(myPath));
+
+      trashPath = new Path("/user/root/.Trash/Current/test/rmdir.1");
+      assertTrue(fileSys.exists(trashPath));
+
+    } catch(Exception e) {
+      e.printStackTrace();
+    }
+  }
+  /**
+   * use shell to create a dir and then compare the space reported through the mount with the DFS disk status.
+   */
+  public void testDF() throws IOException,InterruptedException, Exception  {
+    try {
+      // First create a new directory with mkdirs
+      Path path = new Path("/foo");
+      Runtime r = Runtime.getRuntime();
+      String cmd = "mkdir -p " + mpoint + path.toString();
+      Process p = r.exec(cmd);
+      assertTrue(p.waitFor() == 0);
+      File f = new File(mpoint + "/foo");
+
+      DistributedFileSystem.DiskStatus d = fileSys.getDiskStatus();
+
+      System.err.println("DEBUG:f.total=" + f.getTotalSpace());
+      System.err.println("DEBUG:d.capacity=" + d.getCapacity());
+
+      System.err.println("DEBUG:f.usable=" + f.getUsableSpace());
+
+      System.err.println("DEBUG:f.free=" + f.getFreeSpace());
+      System.err.println("DEBUG:d.remaining = " + d.getRemaining());
+
+      System.err.println("DEBUG:d.used = " + d.getDfsUsed());
+      System.err.println("DEBUG:f.total - f.free = " + (f.getTotalSpace() - f.getFreeSpace()));
+
+      long fileUsedBlocks =  (f.getTotalSpace() - f.getFreeSpace())/(64 * 1024 * 1024);
+      long dfsUsedBlocks = (long)Math.ceil((double)d.getDfsUsed()/(64 * 1024 * 1024));
+      System.err.println("DEBUG: fileUsedBlocks = " + fileUsedBlocks);
+      System.err.println("DEBUG: dfsUsedBlocks =  " + dfsUsedBlocks);
+
+      assertTrue(f.getTotalSpace() == f.getUsableSpace());
+      assertTrue(fileUsedBlocks == dfsUsedBlocks);
+      assertTrue(d.getCapacity() == f.getTotalSpace());
+
+    } catch(Exception e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  /**
+   * use shell to create a dir and chown it, then use filesys to check the new owner and group.
+   */
+  public void testChown() throws IOException,InterruptedException, Exception  {
+    try {
+      // First create a new directory with mkdirs
+      Path path = new Path("/foo");
+      Runtime r = Runtime.getRuntime();
+      String cmd = "mkdir -p " + mpoint + path.toString();
+      Process p = r.exec(cmd);
+      assertTrue(p.waitFor() == 0);
+
+      // check it is there
+      assertTrue(fileSys.getFileStatus(path).isDir());
+
+      FileStatus foo = fileSys.getFileStatus(path);
+      System.err.println("DEBUG:owner=" + foo.getOwner());
+
+      cmd = "chown nobody " + mpoint + path.toString();
+      p = r.exec(cmd);
+      assertTrue(p.waitFor() == 0);
+
+      //      cmd = "chgrp nobody " + mpoint + path.toString();
+      //      p = r.exec(cmd);
+      //      assertTrue(p.waitFor() == 0);
+
+      foo = fileSys.getFileStatus(path);
+
+      System.err.println("DEBUG:owner=" + foo.getOwner());
+
+      assertTrue(foo.getOwner().equals("nobody"));
+      assertTrue(foo.getGroup().equals("nobody"));
+
+    } catch(Exception e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  /**
+   * use shell to create a dir and chmod it, then use filesys to check the new permissions.
+   */
+  public void testChmod() throws IOException,InterruptedException, Exception  {
+    try {
+      // First create a new directory with mkdirs
+      Path path = new Path("/foo");
+      Runtime r = Runtime.getRuntime();
+      String cmd = "mkdir -p " + mpoint + path.toString();
+      Process p = r.exec(cmd);
+      assertTrue(p.waitFor() == 0);
+
+      // check it is there
+      assertTrue(fileSys.getFileStatus(path).isDir());
+
+      cmd = "chmod 777 " + mpoint + path.toString();
+      p = r.exec(cmd);
+      assertTrue(p.waitFor() == 0);
+
+      FileStatus foo = fileSys.getFileStatus(path);
+      FsPermission perm = foo.getPermission();
+      assertTrue(perm.toShort() == 0777);
+
+    } catch(Exception e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  /**
+   * use shell to touch a file twice and then use filesys to check the modification time increased.
+   */
+  public void testUtimes() throws IOException,InterruptedException, Exception  {
+    try {
+      // First create a new file with touch
+      Path path = new Path("/utimetest");
+      Runtime r = Runtime.getRuntime();
+      String cmd = "touch " + mpoint + path.toString();
+      Process p = r.exec(cmd);
+      assertTrue(p.waitFor() == 0);
+
+      // check it is there
+      assertTrue(fileSys.exists(path));
+
+      FileStatus foo = fileSys.getFileStatus(path);
+      long oldTime = foo.getModificationTime();
+      try { Thread.sleep(1000); } catch(Exception e) {}
+
+      cmd = "touch " + mpoint + path.toString();
+      p = r.exec(cmd);
+      assertTrue(p.waitFor() == 0);
+
+      try { Thread.sleep(1000); } catch(Exception e) {}
+      foo = fileSys.getFileStatus(path);
+      long newTime = foo.getModificationTime();
+
+      assertTrue(newTime > oldTime);
+
+    } catch(Exception e) {
+      e.printStackTrace();
+      throw e;
+    } finally {
+    }
+  }
+
+  /**
+   *
+   * Test dfs_read on a file size that will trigger multiple internal reads. 
+   * First, just check raw size reading is ok and then check with smaller reads
+   * including checking the validity of the data read.
+   *
+   */
+  public void testReads() throws IOException,InterruptedException  {
+    try {
+      // First create a new directory with mkdirs
+      Runtime r = Runtime.getRuntime();
+      Process p;
+
+      // create the file
+      Path myPath = new Path("/test/hello.reads");
+      FSDataOutputStream s = fileSys.create(myPath);
+      String hello = "hello world!";
+      int written = 0;
+      int mycount = 0;
+      while(written < 1024 * 9) {
+        s.writeUTF(hello);
+        s.writeInt(mycount++);
+        written += hello.length() + 4;
+      }
+      s.close();
+
+      // check it exists
+      assertTrue(fileSys.exists(myPath));
+      FileStatus foo = fileSys.getFileStatus(myPath);
+      assertTrue(foo.getLen() >= 9 * 1024);
+
+      {
+        // cat the file
+        DataInputStream is = new DataInputStream(new FileInputStream(mpoint + "/test/hello.reads"));
+        byte buf [] = new byte[4096];
+        assertTrue(is.read(buf, 0, 1024) == 1024);
+        assertTrue(is.read(buf, 0, 4096) == 4096);
+        assertTrue(is.read(buf, 0, 4096) == 4096);
+        is.close();
+      }
+
+      {
+        DataInputStream is = new DataInputStream(new FileInputStream(mpoint + "/test/hello.reads"));
+        int read = 0;
+        int counter = 0;
+        try {
+          while(true) {
+            String s2 = DataInputStream.readUTF(is);
+            int s3 = is.readInt();
+            assertTrue(s2.equals(hello));
+            assertTrue(s3 == counter++);
+            read += hello.length() + 4;
+          }
+        } catch(EOFException e) {
+          assertTrue(read >= 9 * 1024);
+        }
+      }
+    } catch(Exception e) {
+      e.printStackTrace();
+    } finally {
+    }
+  }
+
+
+  /**
+   * Use filesys to create the hello world! file and then cat it and see its contents are correct.
+   */
+  public void testCat() throws IOException,InterruptedException  {
+    if(true) return;
+    try {
+      // First create a new directory with mkdirs
+      Runtime r = Runtime.getRuntime();
+      Process p = r.exec("rm -rf " + mpoint + "/test/hello");
+      assertTrue(p.waitFor() == 0);
+
+      // create the file
+      Path myPath = new Path("/test/hello");
+      FSDataOutputStream s = fileSys.create(myPath);
+      String hello = "hello world!";
+      s.writeUTF(hello);
+      s.writeInt(1033);
+      s.close();
+
+      // check it exists
+      assertTrue(fileSys.exists(myPath));
+
+      // cat the file
+      DataInputStream is = new DataInputStream(new FileInputStream(mpoint + "/test/hello"));
+      String s2 = DataInputStream.readUTF(is);
+      int s3 = is.readInt();
+      assertTrue(s2.equals(hello));
+      assertTrue(s3 == 1033);
+
+    } catch(Exception e) {
+      e.printStackTrace();
+    } finally {
+    }
+  }
+
+  public void testDone() throws IOException {
+    close();
+  }
+
+  /**
+   * Nothing to do per test; unmounting and closing happens in close()
+   */
+  protected void tearDown() throws Exception {
+  }
+
+  /**
+   * Unmount and close
+   */
+  protected void finalize() throws Throwable {
+    close();
+  }
+
+  public void close() {
+    try {
+
+      // print out the fuse debug output
+      {
+      InputStream i = fuse_process.getInputStream();
+      byte b[] = new byte[i.available()];
+      int length = i.read(b);
+      System.err.println("read x bytes: " + length);
+      System.err.write(b,0,b.length);
+      }
+
+      int length;
+      do {
+      InputStream i = fuse_process.getErrorStream();
+      byte b[] = new byte[i.available()];
+      length = i.read(b);
+      System.err.println("read x bytes: " + length);
+      System.err.write(b,0,b.length);
+      } while(length > 0) ;
+
+      umount(mpoint);
+
+      fuse_process.destroy();
+      fuse_process = null;
+        if(fileSys != null) {
+        fileSys.close();
+        fileSys = null;
+      }
+      if(cluster != null) {
+        cluster.shutdown();
+        cluster = null;
+      }
+    } catch(Exception e) { }
+  }
+};



Mime
View raw message