hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhr...@apache.org
Subject svn commit: r678074 - in /hadoop/core/trunk/src/contrib/fuse-dfs: build.xml src/fuse_dfs.c src/fuse_dfs_wrapper.sh src/test/ src/test/TestFuseDFS.java test/TestFuseDFS.java
Date Fri, 18 Jul 2008 23:32:44 GMT
Author: dhruba
Date: Fri Jul 18 16:32:44 2008
New Revision: 678074

URL: http://svn.apache.org/viewvc?rev=678074&view=rev
Log:
HADOOP-3485. Allow writing to files over fuse.
(Pete Wyckoff via dhruba)


Added:
    hadoop/core/trunk/src/contrib/fuse-dfs/src/test/
    hadoop/core/trunk/src/contrib/fuse-dfs/src/test/TestFuseDFS.java
Removed:
    hadoop/core/trunk/src/contrib/fuse-dfs/test/TestFuseDFS.java
Modified:
    hadoop/core/trunk/src/contrib/fuse-dfs/build.xml
    hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs.c
    hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh

Modified: hadoop/core/trunk/src/contrib/fuse-dfs/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fuse-dfs/build.xml?rev=678074&r1=678073&r2=678074&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fuse-dfs/build.xml (original)
+++ hadoop/core/trunk/src/contrib/fuse-dfs/build.xml Fri Jul 18 16:32:44 2008
@@ -44,8 +44,16 @@
       <env key="HADOOP_HOME" value="${hadoop.root}"/>
       <env key="PROTECTED_PATHS" value="/,/Trash,/user"/>
       <env key="PACKAGE_VERSION" value="0.1.0"/>
-      <env key="FUSE_HOME" value="/usr/local"/>
     </exec>
+    <mkdir dir="${build.dir}"/>
+    <mkdir dir="${build.dir}/test"/>
+    <exec executable="cp" failonerror="true">
+    <arg line="${root}/src/fuse_dfs ${build.dir}"/>
+    </exec>
+    <exec executable="cp" failonerror="true">
+    <arg line="${root}/src/fuse_dfs_wrapper.sh ${build.dir}"/>
+    </exec>
+
   </target>
 
   <!-- override jar target !-->
@@ -65,6 +73,7 @@
     </exec>
   </target>
 
+
   <!-- override clean target !-->
   <target name="clean" depends="check-libhdfs-fuse" if="libhdfs-fuse">
     <echo message="contrib: ${name}"/>

Modified: hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs.c
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs.c?rev=678074&r1=678073&r2=678074&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs.c (original)
+++ hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs.c Fri Jul 18 16:32:44 2008
@@ -88,7 +88,6 @@
 
 #define OPTIMIZED_READS 1
 
-
 enum
   {
     KEY_VERSION,
@@ -178,7 +177,7 @@
 
   // if not connected, try to connect and fail out if we can't.
   if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port)))
{
-    syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
+    syslog(LOG_ERR, "ERROR: could not connect to %s:%d %s:%d\n", dfs->nn_hostname, dfs->nn_port,__FILE__,
__LINE__);
     return -EIO;
   }
 
@@ -733,12 +732,12 @@
   return -ENOTSUP;
 }
 
-static int dfs_truncate(const char *path, off_t size)
-{
-  (void)path;
-  (void)size;
-  return -ENOTSUP;
-}
+//static int dfs_truncate(const char *path, off_t size)
+//{
+//  (void)path;
+//  (void)size;
+//  return -ENOTSUP;
+//}
 
 long tempfh = 0;
 
@@ -746,6 +745,7 @@
 {
   dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
 
+
   // check params and the context var
   assert(path);
   assert('/' == *path);
@@ -763,12 +763,14 @@
   // bugbug figure out what this flag is and report problem to Hadoop JIRA
   int flags = (fi->flags & 0x7FFF);
 
+
 #ifdef OPTIMIZED_READS
   // retrieve dfs specific data
   dfs_fh *fh = (dfs_fh*)malloc(sizeof (dfs_fh));
   fi->fh = (uint64_t)fh;
   fh->hdfsFH = (hdfsFile)hdfsOpenFile(dfs->fs, path, flags,  0, 3, 0);
   fh->buf = (char*)malloc(rd_cache_buf_size*sizeof (char));
+
   fh->startOffset = 0;
   fh->sizeBuffer = 0;
 
@@ -777,6 +779,25 @@
     ret = -EIO;
   }
 #else
+  //  fprintf(stderr,"hdfsOpenFile being called %s,%o\n",path,flags);
+
+  // bugbug should stop O_RDWR flag here.
+
+
+  // bugbug when fix  https://issues.apache.org/jira/browse/HADOOP-3723 can remove the below
code
+  if (flags & O_WRONLY) {
+    flags = O_WRONLY;
+
+  }
+
+  if (flags & O_RDWR) {
+    // NOTE - should not normally be checking policy in the middleman, but the handling of
Unix flags in DFS is not
+    // consistent right now. 2008-07-16
+    syslog(LOG_ERR, "ERROR: trying to open a file with O_RDWR and DFS does not support that
%s dfs %s:%d\n", path,__FILE__, __LINE__);
+    return -EIO;
+  }
+
+  //  fprintf(stderr,"hdfsOpenFile being called %s,%o\n",path,flags);
 
   // retrieve dfs specific data
   fi->fh = (uint64_t)hdfsOpenFile(dfs->fs, path, flags,  0, 3, 0);
@@ -821,24 +842,27 @@
   }
 #endif
 
-  //  syslog(LOG_DEBUG,"hdfsTell(dfs,%ld)\n",(long)file_handle);
-//  tOffset cur_offset = hdfsTell(dfs->fs, file_handle);
+  syslog(LOG_DEBUG,"hdfsTell(dfs,%ld)\n",(long)file_handle);
+  tOffset cur_offset = hdfsTell(dfs->fs, file_handle);
 
- // if (cur_offset != offset) {
-  //  syslog(LOG_ERR, "ERROR: user trying to random access write to a file %d!=%d for %s
%s:%d\n",(int)cur_offset, (int)offset,path, __FILE__, __LINE__);
-//    return -EIO;
-//  }
+  if (cur_offset != offset) {
+    syslog(LOG_ERR, "ERROR: user trying to random access write to a file %d!=%d for %s %s:%d\n",(int)cur_offset,
(int)offset,path, __FILE__, __LINE__);
+    return -EIO;
+  }
 
 
-  syslog(LOG_DEBUG,"hdfsWrite(dfs,%ld,'%s',%d)\n",(long)file_handle,buf,(int)size);
   tSize length = hdfsWrite(dfs->fs, file_handle, buf, size);
 
-
-  if (length != size) {
+  if(length <= 0) {
     syslog(LOG_ERR, "ERROR: fuse problem - could not write all the bytes for %s %d!=%d%s:%d\n",path,length,(int)size,
__FILE__, __LINE__);
     return -EIO;
   }
-  return 0;
+
+  if (length != size) {
+    syslog(LOG_ERR, "WARN: fuse problem - could not write all the bytes for %s %d!=%d%s:%d\n",path,length,(int)size,
__FILE__, __LINE__);
+  }
+
+  return length;
 
 }
 
@@ -866,7 +890,6 @@
   hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
   free(fh->buf);
   free(fh);
-
 #else
   hdfsFile file_handle = (hdfsFile)fi->fh;
 #endif
@@ -876,7 +899,8 @@
  }
 
   if (hdfsCloseFile(dfs->fs, file_handle) != 0) {
-    syslog(LOG_ERR, "ERROR: dfs problem - could not close file_handle for %s %s:%d\n",path,
__FILE__, __LINE__);
+    syslog(LOG_ERR, "ERROR: dfs problem - could not close file_handle(%ld) for %s %s:%d\n",(long)file_handle,path,
__FILE__, __LINE__);
+    //    fprintf(stderr, "ERROR: dfs problem - could not close file_handle(%ld) for %s %s:%d\n",(long)file_handle,path,
__FILE__, __LINE__);
     return -EIO;
   }
 
@@ -891,12 +915,47 @@
 
 static int dfs_create(const char *path, mode_t mode, struct fuse_file_info *fi)
 {
-  syslog(LOG_DEBUG,"in dfs_create");
   fi->flags |= mode;
-
   return dfs_open(path, fi);
 }
+
 int dfs_flush(const char *path, struct fuse_file_info *fi) {
+
+  // retrieve dfs specific data
+  dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+
+  // check params and the context var
+  assert(path);
+  assert(dfs);
+  assert('/' == *path);
+
+
+  // if not connected, try to connect and fail out if we can't.
+  if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port)))
{
+    syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
+    return -EIO;
+  }
+
+  if (NULL == (void*)fi->fh) {
+    return  0;
+  }
+
+  // note that fuse calls flush on RO files too and hdfs does not like that and will return
an error
+  if(fi->flags & O_WRONLY) {
+
+#ifdef OPTIMIZED_READS
+    dfs_fh *fh = (dfs_fh*)fi->fh;
+    hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
+#else
+    hdfsFile file_handle = (hdfsFile)fi->fh;
+#endif
+
+    if (hdfsFlush(dfs->fs, file_handle) != 0) {
+      syslog(LOG_ERR, "ERROR: dfs problem - could not flush file_handle(%x) for %s %s:%d\n",(long)file_handle,path,
__FILE__, __LINE__);
+      return -EIO;
+    }
+  }
+
   return 0;
 }
 
@@ -1024,11 +1083,11 @@
   .rename	= dfs_rename,
   .unlink       = dfs_unlink,
   .release      = dfs_release,
-  //  .create       = dfs_create,
-  //  .write	= dfs_write,
-  //  .flush        = dfs_flush,
+  .create       = dfs_create,
+  .write	= dfs_write,
+  .flush        = dfs_flush,
   //.xsetattr      = dfs_setattr,
-  //  .mknod        = dfs_mknod,
+    .mknod        = dfs_mknod,
   .chmod	= dfs_chmod,
   .chown	= dfs_chown,
   //  .truncate	= dfs_truncate,
@@ -1037,6 +1096,7 @@
 
 int main(int argc, char *argv[])
 {
+
   umask(0);
 
   program = argv[0];
@@ -1049,10 +1109,13 @@
     /** error parsing options */
     return -1;
 
+
   if (options.server == NULL || options.port == 0) {
     print_usage(argv[0]);
     exit(0);
   }
+
+
   int ret = fuse_main(args.argc, args.argv, &dfs_oper, NULL);
 
   if (ret) printf("\n");

Modified: hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh?rev=678074&r1=678073&r2=678074&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh (original)
+++ hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh Fri Jul 18 16:32:44 2008
@@ -15,23 +15,26 @@
 #
 
 if [ "$HADOOP_HOME" = "" ]; then
- HADOOP_HOME=/usr/local/share/hadoop
+export HADOOP_HOME=/usr/local/share/hadoop
 fi
 
+export PATH=$HADOOP_HOME/contrib/fuse_dfs:$PATH
+
 for f in ls $HADOOP_HOME/lib/*.jar $HADOOP_HOME/*.jar ; do
-  CLASSPATH=$CLASSPATH:$f
+export  CLASSPATH=$CLASSPATH:$f
 done
 
 if [ "$OS_ARCH" = "" ]; then
- OS_ARCH=amd64
+export OS_ARCH=amd64
 fi
 
 if [ "$JAVA_HOME" = "" ]; then
-  JAVA_HOME=/usr/local/java
+export  JAVA_HOME=/usr/local/java
 fi
 
 if [ "$LD_LIBRARY_PATH" = "" ]; then
- LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/$OS_ARCH/server:/usr/local/share/hdfs/libhdfs/:/usr/local/lib
+export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/$OS_ARCH/server:/usr/local/share/hdfs/libhdfs/:/usr/local/lib
 fi
+echo $LD_LIBRARY_PATH
 
 ./fuse_dfs $@  -o-o allow_other

Added: hadoop/core/trunk/src/contrib/fuse-dfs/src/test/TestFuseDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fuse-dfs/src/test/TestFuseDFS.java?rev=678074&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fuse-dfs/src/test/TestFuseDFS.java (added)
+++ hadoop/core/trunk/src/contrib/fuse-dfs/src/test/TestFuseDFS.java Fri Jul 18 16:32:44 2008
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.dfs.*;
+import junit.framework.TestCase;
+import java.io.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import java.net.*;
+
+/**
+ * This class tests that the Fuse module for DFS can mount properly
+ * and does a few simple commands:
+ * mkdir
+ * rmdir
+ * ls
+ * cat
+ *
+ * cp and touch are purposely not tested because they won't work with the current module
+
+ *
+ */
+public class TestFuseDFS extends TestCase {
+
+  /**
+   * mount the fuse file system using assumed fuse module library installed in /usr/local/lib
or somewhere else on your
+   * pre-existing LD_LIBRARY_PATH
+   *
+   */
+
+  static Process fuse_process;
+  static String fuse_cmd;
+  static private void mount(String mountpoint, URI dfs) throws IOException, InterruptedException
 {
+
+    String cp = System.getProperty("java.class.path");
+    Runtime r = Runtime.getRuntime();
+    fuse_cmd = System.getProperty("build.test") + "/../fuse_dfs";
+    String libhdfs = System.getProperty("build.test") + "/../../../libhdfs/";
+    String jvm = System.getProperty("java.home") + "/lib/amd64/server";
+    String lp = System.getProperty("LD_LIBRARY_PATH") + ":" + "/usr/local/lib:" + libhdfs
+ ":" + jvm;
+    System.err.println("LD_LIBRARY_PATH=" + lp);
+    String cmd[] = new String[4];
+    int index = 0;
+
+    cmd[index++] = fuse_cmd;
+    cmd[index++] = "dfs://" + dfs.getHost() + ":" + String.valueOf(dfs.getPort());
+    cmd[index++] = mountpoint;
+    cmd[index++] = "-d";
+    final String [] envp = {
+      "CLASSPATH="+  cp,
+      "LD_LIBRARY_PATH=" + lp,
+      "PATH=" + "/usr/bin:/bin"
+
+    };
+
+    // ensure the mount point is not currently mounted
+    Process p = r.exec("fusermount -u " + mountpoint);
+    p.waitFor();
+
+    // clean up the mount point
+    p = r.exec("rm -rf " + mountpoint);
+    assertTrue(p.waitFor() == 0);
+
+    // make the mount point if needed
+    p = r.exec("mkdir -p " + mountpoint);
+    assertTrue(p.waitFor() == 0);
+
+    // mount fuse to the mount point
+    fuse_process = r.exec(cmd, envp);
+
+    // give DFS a chance to come up
+    try { Thread.sleep(3000); } catch(Exception e) { }
+  }
+
+  /**
+   * unmounts fuse for before shutting down.
+   */
+  static private void umount(String mpoint) throws IOException, InterruptedException {
+    Runtime r= Runtime.getRuntime();
+    Process p = r.exec("fusermount -u " + mpoint);
+    p.waitFor();
+  }
+
+  /**
+   * Set things up - create mini dfs cluster and mount the fuse filesystem.
+   */
+  public TestFuseDFS() throws IOException,InterruptedException  {
+  }
+
+  static private MiniDFSCluster cluster;
+  static private FileSystem fileSys;
+  final static private String mpoint;
+
+  static {
+    mpoint = System.getProperty("build.test") + "/mnt";
+    System.runFinalizersOnExit(true);
+    startStuff();
+  }
+
+
+  static public void startStuff() {
+    try {
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.permissions",false);
+      cluster = new MiniDFSCluster(conf, 1, true, null);
+      fileSys = cluster.getFileSystem();
+      assertTrue(fileSys.getFileStatus(new Path("/")).isDir());
+      mount(mpoint, fileSys.getUri());
+    } catch(Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  public void setUp() {
+  }
+
+  /**
+   * use shell to create a dir and then use filesys to see it exists.
+   */
+  public void testMkdir() throws IOException,InterruptedException, Exception  {
+    try {
+      // First create a new directory with mkdirs
+      Path path = new Path("/foo");
+      Runtime r = Runtime.getRuntime();
+      String cmd = "mkdir -p " + mpoint + path.toString();
+      Process p = r.exec(cmd);
+      assertTrue(p.waitFor() == 0);
+
+      // check it is there
+      assertTrue(fileSys.getFileStatus(path).isDir());
+
+      // check again through the shell
+      String lsCmd = "ls " + mpoint + path.toString();
+      p = r.exec(lsCmd);
+      assertTrue(p.waitFor() == 0);
+    } catch(Exception e) {
+      System.err.println("e=" + e);
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+
+  /**
+   * use shell to create a dir and then use filesys to see it exists.
+   */
+  public void testWrites() throws IOException,InterruptedException  {
+    try {
+
+      // write a hello file
+      File file = new File(mpoint, "hello.txt");
+      FileOutputStream f = new FileOutputStream(file);
+      String s = "hello ";
+      f.write(s.getBytes());
+      s = "world";
+      f.write(s.getBytes());
+      f.flush();
+      f.close();
+
+      // check the file exists.
+      Path myPath = new Path("/hello.txt");
+      assertTrue(fileSys.exists(myPath));
+
+      // check the data is ok
+      FileInputStream fi = new FileInputStream(new File(mpoint, "hello.txt"));
+      byte b[] = new byte[12];
+      int length = fi.read(b,0,12);
+      String s2 = new String( b);
+    } catch(Exception e) {
+      e.printStackTrace();
+    } finally {
+    }
+  }
+
+
+
+  /**
+   * Test ls for dir already created in testMkdDir also tests bad ls
+   */
+  public void testLs() throws IOException,InterruptedException  {
+    try {
+      // First create a new directory with mkdirs
+      Runtime r = Runtime.getRuntime();
+
+      // mkdir
+      Process p = r.exec("mkdir -p " + mpoint + "/test/mkdirs");
+      assertTrue(p.waitFor() == 0);
+
+      // ls
+      p = r.exec("ls " + mpoint + "/test/mkdirs");
+      assertTrue(p.waitFor() == 0);
+
+      // ls non-existant directory
+      p = r.exec("ls " + mpoint + "/test/mkdirsNotThere");
+      int res = p.waitFor();
+      assertFalse(res == 0);
+    } catch(Exception e) {
+      e.printStackTrace();
+    }
+
+  }
+
+  /**
+   * Remove a dir using the shell and use filesys to see it no longer exists.
+   */
+  public void testRmdir() throws IOException,InterruptedException  {
+    try {
+      // First create a new directory with mkdirs
+
+      Runtime r = Runtime.getRuntime();
+      Process p = r.exec("mkdir -p " + mpoint + "/test/mkdirs");
+      Path myPath = new Path("/test/mkdirs");
+      assertTrue(fileSys.exists(myPath));
+
+      // remove it
+      p = r.exec("rmdir " + mpoint + "/test/mkdirs");
+      assertTrue(p.waitFor() == 0);
+
+      // check it is not there
+      assertFalse(fileSys.exists(myPath));
+    } catch(Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+
+  /**
+   * Use filesys to create the hello world! file and then cat it and see its contents are
correct.
+   */
+  public void testCat() throws IOException,InterruptedException  {
+    try {
+      // First create a new directory with mkdirs
+      Runtime r = Runtime.getRuntime();
+      Process p = r.exec("rm -rf " + mpoint + "/test/hello");
+      assertTrue(p.waitFor() == 0);
+
+      // create the file
+      Path myPath = new Path("/test/hello");
+      FSDataOutputStream s = fileSys.create(myPath);
+      String hello = "hello world!";
+      s.write(hello.getBytes());
+      s.close();
+
+      // check it exists
+      assertTrue(fileSys.exists(myPath));
+
+      // cat the file
+      p = r.exec("cat " + mpoint + "/test/hello");
+      assertTrue(p != null);
+      assertTrue(p.waitFor() == 0);
+
+      // check the data is the same
+      {
+        InputStream i = p.getInputStream();
+        byte b[] = new byte[1024];
+        int length = i.read(b);
+        String s2 = new String(b,0,length);
+        assertTrue(s2.equals(hello));
+      }
+
+    } catch(Exception e) {
+      e.printStackTrace();
+    } finally {
+      close();
+    }
+  }
+
+
+  /**
+   * Unmount and close
+   */
+  protected void tearDown() throws Exception {
+  }
+
+  /**
+   * Unmount and close
+   */
+  protected void finalize() throws Throwable {
+    close();
+  }
+
+  public void close() {
+    try {
+
+      // print out the fuse debug output
+      {
+      InputStream i = fuse_process.getInputStream();
+      byte b[] = new byte[i.available()];
+      int length = i.read(b);
+      System.err.println("read x bytes: " + length);
+      System.err.write(b,0,b.length);
+      }
+
+      int length;
+      do {
+      InputStream i = fuse_process.getErrorStream();
+      byte b[] = new byte[i.available()];
+      length = i.read(b);
+      System.err.println("read x bytes: " + length);
+      System.err.write(b,0,b.length);
+      } while(length > 0) ;
+
+      umount(mpoint);
+
+      fuse_process.destroy();
+      fuse_process = null;
+      if(fileSys != null) {
+        fileSys.close();
+        fileSys = null;
+      }
+      if(cluster != null) {
+        cluster.shutdown();
+        cluster = null;
+      }
+    } catch(Exception e) { }
+  }
+};



Mime
View raw message