hadoop-common-commits mailing list archives

From cutt...@apache.org
Subject svn commit: r512499 [2/3] - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/filecache/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/fs/s3/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop...
Date Tue, 27 Feb 2007 23:45:51 GMT
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java?view=auto&rev=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java Tue Feb 27 15:45:46 2007
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.*;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
+
+/****************************************************************
+ * A <code>FilterFileSystem</code> contains
+ * some other file system, which it uses as
+ * its basic file system, possibly transforming
+ * the data along the way or providing additional
+ * functionality. The class <code>FilterFileSystem</code>
+ * itself simply overrides all methods of
+ * <code>FileSystem</code> with versions that
+ * pass all requests to the contained file
+ * system. Subclasses of <code>FilterFileSystem</code>
+ * may further override some of these methods
+ * and may also provide additional methods
+ * and fields.
+ *
+ * @author Hairong Kuang
+ *****************************************************************/
+public class FilterFileSystem extends FileSystem {
+  
+  protected FileSystem fs;
+  
+  public FilterFileSystem( FileSystem fs) {
+    this.fs = fs;
+  }
+
+  /** Called after a new FileSystem instance is constructed.
+   * @param name a uri whose authority section names the host, port, etc.
+   *   for this FileSystem
+   * @param conf the configuration
+   */
+  public void initialize(URI name, Configuration conf) throws IOException {
+    fs.initialize(name, conf);
+  }
+
+  /** Returns a URI whose scheme and authority identify this FileSystem.*/
+  public URI getUri() {
+    return fs.getUri();
+  }
+
+  /** @deprecated call #getUri() instead.*/
+  public String getName() {
+    return fs.getName();
+  }
+
+  /** Make sure that a path specifies a FileSystem. */
+  public Path makeQualified(Path path) {
+    return fs.makeQualified(path);
+  }
+  
+  ///////////////////////////////////////////////////////////////
+  // FileSystem
+  ///////////////////////////////////////////////////////////////
+
+  /** Check that a Path belongs to this FileSystem. */
+  protected void checkPath(Path path) {
+    fs.checkPath(path);
+  }
+
+  /**
+   * Return a 2D array of size 1x1 or greater, containing hostnames
+   * where portions of the given file can be found. For a nonexistent
+   * file or region, null is returned.
+   *
+   * This call is most helpful with DFS, where it returns
+   * hostnames of machines that contain the given file.
+   *
+   * The FileSystem will simply return an element containing 'localhost'.
+   */
+  public String[][] getFileCacheHints(Path f, long start, long len)
+  throws IOException {
+    return fs.getFileCacheHints(f, start, len);
+  }
+
+  /**
+   * Opens an FSDataInputStream at the indicated Path.
+   * @param f the file name to open
+   * @param bufferSize the size of the buffer to be used.
+   */
+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    return fs.open(f, bufferSize);
+  }
+  
+  /**
+   * Opens an FSDataOutputStream at the indicated Path with write-progress
+   * reporting.
+   * @param f the file name to open
+   * @param overwrite if a file with this name already exists, then if true,
+   *   the file will be overwritten, and if false an error will be thrown.
+   * @param bufferSize the size of the buffer to be used.
+   * @param replication required block replication for the file. 
+   */
+  public FSDataOutputStream create( Path f, 
+                                    boolean overwrite,
+                                    int bufferSize,
+                                    short replication,
+                                    long blockSize,
+                                    Progressable progress
+                                   ) throws IOException {
+    return fs.create(f, overwrite, bufferSize, replication, blockSize, progress);
+  }
+
+  /**
+   * Get replication.
+   * 
+   * @param src file name
+   * @return file replication
+   * @throws IOException
+   */
+  public short getReplication(Path src) throws IOException {
+    return fs.getReplication(src);
+  }
+
+  /**
+   * Set replication for an existing file.
+   * 
+   * @param src file name
+   * @param replication new replication
+   * @throws IOException
+   * @return true if successful;
+   *         false if file does not exist or is a directory
+   */
+  public boolean setReplication(Path src, short replication) throws IOException {
+    return fs.setReplication(src, replication);
+  }
+  
+  /**
+   * Renames Path src to Path dst.  Can take place on local fs
+   * or remote DFS.
+   */
+  public boolean rename(Path src, Path dst) throws IOException {
+    return fs.rename(src, dst);
+  }
+  
+  /** Delete a file */
+  public boolean delete(Path f) throws IOException {
+    return fs.delete(f);
+  }
+  
+  /** Check if exists.
+   * @param f source file
+   */
+  public boolean exists(Path f) throws IOException {
+    return fs.exists(f);
+  }
+
+  /** True iff the named path is a directory. */
+  public boolean isDirectory(Path f) throws IOException {
+    return fs.isDirectory(f);
+  }
+
+  /** The number of bytes in a file. */
+  public long getLength(Path f) throws IOException {
+    return fs.getLength(f);
+  }
+  
+  /** List files in a directory. */
+  public Path[] listPaths(Path f) throws IOException {
+    return fs.listPaths(f);
+  }
+  
+  /**
+   * Set the current working directory for the given file system. All relative
+   * paths will be resolved relative to it.
+   * 
+   * @param newDir
+   */
+  public void setWorkingDirectory(Path newDir) {
+    fs.setWorkingDirectory(newDir);
+  }
+  
+  /**
+   * Get the current working directory for the given file system
+   * 
+   * @return the directory pathname
+   */
+  public Path getWorkingDirectory() {
+    return fs.getWorkingDirectory();
+  }
+  
+  /**
+   * Make the given file and all non-existent parents into directories. Has
+   * the semantics of Unix 'mkdir -p'. Existence of the directory hierarchy is
+   * not an error.
+   */
+  public boolean mkdirs(Path f) throws IOException {
+    return fs.mkdirs(f);
+  }
+
+  /**
+   * Obtain a lock on the given Path
+   * 
+   * @deprecated FS does not support file locks anymore.
+   */
+  @Deprecated
+  public void lock(Path f, boolean shared) throws IOException {
+    fs.lock(f, shared);
+  }
+
+  /**
+   * Release the lock
+   * 
+   * @deprecated FS does not support file locks anymore.     
+   */
+  @Deprecated
+  public void release(Path f) throws IOException {
+    fs.release(f);
+  }
+
+  /**
+   * The src file is on the local disk.  Add it to FS at
+   * the given dst name.
+   * delSrc indicates if the source should be removed
+   */
+  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+  throws IOException {
+    fs.copyFromLocalFile(delSrc, src, dst);
+  }
+  
+  /**
+   * The src file is under FS, and the dst is on the local disk.
+   * Copy it from FS control to the local dst name.
+   * delSrc indicates if the src will be removed or not.
+   */   
+  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+  throws IOException {
+    fs.copyToLocalFile(delSrc, src, dst);
+  }
+  
+  /**
+   * Returns a local File that the user can write output to.  The caller
+   * provides both the eventual FS target name and the local working
+   * file.  If the FS is local, we write directly into the target.  If
+   * the FS is remote, we write into the tmp local area.
+   */
+  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+  throws IOException {
+    return fs.startLocalOutput(fsOutputFile, tmpLocalFile);
+  }
+
+  /**
+   * Called when we're all done writing to the target.  A local FS will
+   * do nothing, because we've written to exactly the right place.  A remote
+   * FS will copy the contents of tmpLocalFile to the correct target at
+   * fsOutputFile.
+   */
+  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+  throws IOException {
+    fs.completeLocalOutput(fsOutputFile, tmpLocalFile);
+  }
+
+  /**
+   * Get the block size for a particular file.
+   * @param f the filename
+   * @return the number of bytes in a block
+   */
+  public long getBlockSize(Path f) throws IOException {
+    return fs.getBlockSize(f);
+  }
+  
+  /** Return the number of bytes that large input files should optimally
+   * be split into to minimize I/O time. */
+  public long getDefaultBlockSize() {
+    return fs.getDefaultBlockSize();
+  }
+  
+  /**
+   * Get the default replication.
+   */
+  public short getDefaultReplication() {
+    return fs.getDefaultReplication();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return fs.getConf();
+  }
+}
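
The class comment above describes a classic decorator: every call is forwarded to the contained fs, so a subclass overrides only what it wants to intercept. A minimal sketch, using only the API shown in this file (the LoggingFileSystem name is hypothetical, not part of this commit):

package org.apache.hadoop.fs;

import java.io.IOException;

// Hypothetical example, not part of this commit: log every open(),
// and let all other FileSystem calls fall through to the wrapped fs.
public class LoggingFileSystem extends FilterFileSystem {

  public LoggingFileSystem(FileSystem fs) {
    super(fs);
  }

  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    LOG.info("open " + f + " (bufferSize=" + bufferSize + ")");
    return fs.open(f, bufferSize);   // delegate to the contained file system
  }
}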

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java Tue Feb 27 15:45:46 2007
@@ -21,6 +21,7 @@
 import java.text.*;
 
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.dfs.DistributedFileSystem;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.util.ToolBase;
 
@@ -142,7 +143,7 @@
         }
         Path dst = new Path( dstf );
         for( int i=0; i<srcs.length; i++ ) {
-          fs.copyToLocalFile( srcs[i], dst, copyCrc );
+          ((DistributedFileSystem)fs).copyToLocalFile(srcs[i], dst, copyCrc);
         }
       }
     }
@@ -497,7 +498,7 @@
             + "destination should be a directory." );
       }
       for( int i=0; i<srcs.length; i++ ) {
-        FileUtil.copy(fs, srcs[i], fs, dst, false, true, conf);
+        FileUtil.copy(fs, srcs[i], fs, dst, false, conf);
       }
     }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java Tue Feb 27 15:45:46 2007
@@ -19,6 +19,7 @@
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.URI;
 import java.util.*;
 
@@ -35,12 +36,12 @@
  * @author ddas
  *
  */
-public class InMemoryFileSystem extends FileSystem {
+public class InMemoryFileSystem extends ChecksumFileSystem {
+  private static class RawInMemoryFileSystem extends FileSystem {
   private URI uri;
   private int fsSize;
   private volatile int totalUsed;
   private Path staticWorkingDir;
-  private int bytesPerSum;
   
   //pathToFileAttribs is the final place where a file is put after it is closed
   private Map <String, FileAttributes> pathToFileAttribs = 
@@ -53,19 +54,21 @@
   private Map <String, FileAttributes> tempFileAttribs = 
     Collections.synchronizedMap(new HashMap());
   
-  public InMemoryFileSystem() {}
-  
-  public InMemoryFileSystem(URI uri, Configuration conf) {
+  public RawInMemoryFileSystem() {
+    setConf(new Configuration());
+  }
+
+  public RawInMemoryFileSystem(URI uri, Configuration conf) {
     initialize(uri, conf);
   }
   
   //inherit javadoc
   public void initialize(URI uri, Configuration conf) {
+    setConf(conf);
     int size = Integer.parseInt(conf.get("fs.inmemory.size.mb", "100"));
     this.fsSize = size * 1024 * 1024;
     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
     this.staticWorkingDir = new Path(this.uri.getPath());
-    this.bytesPerSum = conf.getInt("io.bytes.per.checksum", 512);
     LOG.info("Initialized InMemoryFileSystem: " + uri.toString() + 
              " of size (in bytes): " + fsSize);
   }
@@ -98,7 +101,7 @@
     private FileAttributes fAttr;
     
     public InMemoryInputStream(Path f) throws IOException {
-      synchronized (InMemoryFileSystem.this) {
+      synchronized (RawInMemoryFileSystem.this) {
         fAttr = pathToFileAttribs.get(getPath(f));
         if (fAttr == null) { 
           throw new FileNotFoundException("File " + f + " does not exist");
@@ -137,11 +140,11 @@
     public long skip(long n) throws IOException { return din.skip(n); }
   }
 
-  public FSInputStream openRaw(Path f) throws IOException {
-    return new InMemoryInputStream(f);
+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    return new FSDataInputStream(new InMemoryInputStream(f), bufferSize);
   }
 
-  private class InMemoryOutputStream extends FSOutputStream {
+  private class InMemoryOutputStream extends OutputStream {
     private int count;
     private FileAttributes fAttr;
     private Path f;
@@ -157,7 +160,7 @@
     }
     
     public void close() throws IOException {
-      synchronized (InMemoryFileSystem.this) {
+      synchronized (RawInMemoryFileSystem.this) {
         pathToFileAttribs.put(getPath(f), fAttr);
       }
     }
@@ -187,32 +190,28 @@
     }
   }
   
-  public FSOutputStream createRaw(Path f, boolean overwrite, short replication,
-      long blockSize) throws IOException {
+  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
+      short replication, long blockSize, Progressable progress)
+      throws IOException {
     if (exists(f) && ! overwrite) {
       throw new IOException("File already exists:"+f);
     }
     synchronized (this) {
-      FileAttributes fAttr =(FileAttributes)tempFileAttribs.remove(getPath(f));
+      FileAttributes fAttr =(FileAttributes) tempFileAttribs.remove(getPath(f));
       if (fAttr != null)
-        return createRaw(f, fAttr);
+        return create(f, fAttr);
       return null;
     }
   }
-
-  public FSOutputStream createRaw(Path f, boolean overwrite, short replication,
-      long blockSize, Progressable progress) throws IOException {
-    //ignore write-progress reporter for in-mem files
-    return createRaw(f, overwrite, replication, blockSize);
-  }
-
-  public FSOutputStream createRaw(Path f, FileAttributes fAttr) 
-  throws IOException {
-    //the path is not added into the filesystem (in the pathToFileAttribs
-    //map) until close is called on the outputstream that this method is 
-    //going to return
-    //Create an output stream out of data byte array
-    return new InMemoryOutputStream(f, fAttr);
+  
+  public FSDataOutputStream create(Path f, FileAttributes fAttr)
+      throws IOException {
+    // the path is not added into the filesystem (in the pathToFileAttribs
+    // map) until close is called on the outputstream that this method is
+    // going to return
+    // Create an output stream out of data byte array
+    return new FSDataOutputStream(new InMemoryOutputStream(f, fAttr),
+        getConf());
   }
 
   public void close() throws IOException {
@@ -236,12 +235,12 @@
     return 1;
   }
 
-  public boolean setReplicationRaw(Path src, short replication)
+  public boolean setReplication(Path src, short replication)
       throws IOException {
     return true;
   }
 
-  public boolean renameRaw(Path src, Path dst) throws IOException {
+  public boolean rename(Path src, Path dst) throws IOException {
     synchronized (this) {
       if (exists(dst)) {
         throw new IOException ("Path " + dst + " already exists");
@@ -253,7 +252,7 @@
     }
   }
 
-  public boolean deleteRaw(Path f) throws IOException {
+  public boolean delete(Path f) throws IOException {
     synchronized (this) {
       FileAttributes fAttr = pathToFileAttribs.remove(getPath(f));
       if (fAttr != null) {
@@ -275,7 +274,11 @@
    * Directory operations are not supported
    */
   public boolean isDirectory(Path f) throws IOException {
-    return false;
+    return !isFile(f);
+  }
+
+  public boolean isFile(Path f) throws IOException {
+    return exists(f);
   }
 
   public long getLength(Path f) throws IOException {
@@ -287,15 +290,20 @@
   /**
    * Directory operations are not supported
    */
-  public Path[] listPathsRaw(Path f) throws IOException {
+  public Path[] listPaths(Path f) throws IOException {
     return null;
   }
-  public void setWorkingDirectory(Path new_dir) {}
+
+  public void setWorkingDirectory(Path new_dir) {
+    staticWorkingDir = new_dir;
+  }
+  
   public Path getWorkingDirectory() {
     return staticWorkingDir;
   }
+
   public boolean mkdirs(Path f) throws IOException {
-    return false;
+    return true;
   }
   
   /** lock operations are not supported */
@@ -303,10 +311,13 @@
   public void release(Path f) throws IOException {}
   
   /** copy/move operations are not supported */
-  public void copyFromLocalFile(Path src, Path dst) throws IOException {}
-  public void moveFromLocalFile(Path src, Path dst) throws IOException {}
-  public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
-  throws IOException {}
+  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+      throws IOException {
+  }
+
+  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+      throws IOException {
+  }
 
   public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
       throws IOException {
@@ -317,11 +328,6 @@
       throws IOException {
   }
 
-  public void reportChecksumFailure(Path p, FSInputStream in,
-      long inPos,
-      FSInputStream sums, long sumsPos) {
-  }
-
   public long getBlockSize(Path f) throws IOException {
     return getDefaultBlockSize();
   }
@@ -336,47 +342,30 @@
   
   /** Some APIs exclusively for InMemoryFileSystem */
   
-  /** Register a path with its size. This will also register a checksum for 
-   * the file that the user is trying to create. This is required since none
-   * of the FileSystem APIs accept the size of the file as argument. But since
-   * it is required for us to apriori know the size of the file we are going to
-   * create, the user must call this method for each file he wants to create
-   * and reserve memory for that file. We either succeed in reserving memory
-   * for both the main file and the checksum file and return true, or return 
-   * false.
-   */
-  public boolean reserveSpaceWithCheckSum(Path f, int size) {
-    //get the size of the checksum file (we know it is going to be 'int'
-    //since this is an inmem fs with file sizes that will fit in 4 bytes)
-    int checksumSize = getChecksumFileLength(size);
+  /** Register a path with its size. */
+  public boolean reserveSpace(Path f, int size) {
     synchronized (this) {
-      if (!canFitInMemory(size + checksumSize)) return false;
+      if (!canFitInMemory(size))
+        return false;
       FileAttributes fileAttr;
-      FileAttributes checksumAttr;
       try {
         fileAttr = new FileAttributes(size);
-        checksumAttr = new FileAttributes(checksumSize);
       } catch (OutOfMemoryError o) {
         return false;
       }
-      totalUsed += size + checksumSize;
+      totalUsed += size;
       tempFileAttribs.put(getPath(f), fileAttr);
-      tempFileAttribs.put(getPath(FileSystem.getChecksumFile(f)),checksumAttr); 
       return true;
     }
   }
   
-  public int getChecksumFileLength(int size) {
-    return (int)super.getChecksumFileLength(size, bytesPerSum);
-  }
-  
   /** This API getClosedFiles could have been implemented over listPathsRaw
    * but it is an overhead to maintain directory structures for this impl of
    * the in-memory fs.
    */
   public Path[] getFiles(PathFilter filter) {
     synchronized (this) {
-      List <String> closedFilesList = new ArrayList();
+      List<String> closedFilesList = new ArrayList<String>();
       synchronized (pathToFileAttribs) {
         Set paths = pathToFileAttribs.keySet();
         if (paths == null || paths.isEmpty()) {
@@ -433,4 +422,70 @@
       this.data = new byte[size];
     }
   }
+  }
+    
+    public InMemoryFileSystem() {
+        super(new RawInMemoryFileSystem());
+    }
+    
+    public InMemoryFileSystem(URI uri, Configuration conf) {
+        super(new RawInMemoryFileSystem(uri, conf));
+    }
+    
+    /** copy/move operations are not supported */
+    public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+    throws IOException {}
+    public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+    throws IOException {}
+    
+    public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+    throws IOException {
+        return fsOutputFile;
+    }
+    
+    public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+    throws IOException {
+    }
+    
+    public void reportChecksumFailure(Path p, FSDataInputStream in,
+            long inPos,
+            FSDataInputStream sums, long sumsPos) {
+    }
+    
+    /**
+     * Register a file with its size. This will also register a checksum for the
+     * file that the user is trying to create. This is required since none of
+     * the FileSystem APIs accept the size of the file as an argument. Since we
+     * need to know the size of a file a priori in order to reserve memory for
+     * it, the user must call this method for each file to be created
+     * and reserve memory for that file. We either succeed in reserving memory
+     * for both the main file and the checksum file and return true, or return
+     * false.
+     */
+    public boolean reserveSpaceWithCheckSum(Path f, int size) {
+        // get the size of the checksum file (we know it is going to be 'int'
+        // since this is an inmem fs with file sizes that will fit in 4 bytes)
+        long checksumSize = getChecksumFileLength(f, size);
+        RawInMemoryFileSystem mfs = (RawInMemoryFileSystem)getRawFileSystem();
+        synchronized(mfs) {
+            return mfs.reserveSpace(f, size) && 
+            mfs.reserveSpace(getChecksumFile(f),
+                    (int)getChecksumFileLength(f, size));
+        }
+    }
+    public Path[] getFiles(PathFilter filter) {
+        return ((RawInMemoryFileSystem)getRawFileSystem()).getFiles(filter);
+    }
+    
+    public int getNumFiles(PathFilter filter) {
+      return ((RawInMemoryFileSystem)getRawFileSystem()).getNumFiles(filter);
+    }
+
+    public int getFSSize() {
+        return ((RawInMemoryFileSystem)getRawFileSystem()).getFSSize();
+    }
+    
+    public float getPercentUsed() {
+        return ((RawInMemoryFileSystem)getRawFileSystem()).getPercentUsed();
+    }
 }
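
A hedged usage sketch of the reservation protocol described above; the ramfs scheme, path, and sizes are illustrative, not taken from this commit. Note that the raw create() above returns null for a path with no prior reservation, so the reservation call must come first:

// assumes: import org.apache.hadoop.fs.*, org.apache.hadoop.conf.Configuration,
//          java.net.URI;
Configuration conf = new Configuration();
InMemoryFileSystem inMemFs =
    new InMemoryFileSystem(URI.create("ramfs://mapoutput"), conf);

Path part = new Path("/map_0.out");
int size = 64 * 1024;                  // bytes the caller intends to write
if (inMemFs.reserveSpaceWithCheckSum(part, size)) {
  // Memory is now reserved for both the data file and its checksum
  // file; the path is registered only when this stream is closed.
  FSDataOutputStream out = inMemFs.create(part, true, 4096, (short) 1,
      inMemFs.getDefaultBlockSize(), null);  // raw in-mem create ignores the
                                             // (null) Progressable
  // ... write at most `size` bytes ...
  out.close();
}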

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Tue Feb 27 15:45:46 2007
@@ -19,365 +19,52 @@
 package org.apache.hadoop.fs;
 
 import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.nio.channels.*;
 import java.net.URI;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Progressable;
+import java.util.*;
 
 /****************************************************************
- * Implement the FileSystem API for the native filesystem.
+ * Implement the FileSystem API for the checksummed local filesystem.
  *
  * @author Mike Cafarella
  *****************************************************************/
-public class LocalFileSystem extends FileSystem {
+public class LocalFileSystem extends ChecksumFileSystem {
     static final URI NAME = URI.create("file:///");
 
-    private Path workingDir =
-      new Path(System.getProperty("user.dir"));
-    TreeMap sharedLockDataSet = new TreeMap();
-    TreeMap nonsharedLockDataSet = new TreeMap();
-    TreeMap lockObjSet = new TreeMap();
-    // by default use copy/delete instead of rename
-    boolean useCopyForRename = true;
-    
-    public LocalFileSystem() {}
-
-    /**
-     * Return 1x1 'localhost' cell if the file exists.
-     * Return null if otherwise.
-     */
-    public String[][] getFileCacheHints(Path f, long start, long len) throws IOException {
-        if (! exists(f)) {
-            return null;
-        } else {
-            String result[][] = new String[1][];
-            result[0] = new String[1];
-            result[0][0] = "localhost";
-            return result;
-        }
-    }
-
-    /** @deprecated */
-    public String getName() { return "local"; }
-
-    public URI getUri() { return NAME; }
-
-    public void initialize(URI uri, Configuration conf) {
-      setConf(conf);
-    }
-
-    /** Convert a path to a File. */
-    public File pathToFile(Path path) {
-      checkPath(path);
-      if (!path.isAbsolute()) {
-        path = new Path(workingDir, path);
-      }
-      return new File(path.toUri().getPath());
-    }
-
-    /*******************************************************
-     * For open()'s FSInputStream
-     *******************************************************/
-    class LocalFSFileInputStream extends FSInputStream {
-        FileInputStream fis;
-
-        public LocalFSFileInputStream(Path f) throws IOException {
-          this.fis = new FileInputStream(pathToFile(f));
-        }
-
-        public void seek(long pos) throws IOException {
-          fis.getChannel().position(pos);
-        }
-
-        public long getPos() throws IOException {
-          return fis.getChannel().position();
-        }
-
-        public boolean seekToNewSource(long targetPos) throws IOException {
-          return false;
-        }
-
-        /*
-         * Just forward to the fis
-         */
-        public int available() throws IOException { return fis.available(); }
-        public void close() throws IOException { fis.close(); }
-        public boolean markSupport() { return false; }
-
-        public int read() throws IOException {
-          try {
-            return fis.read();
-          } catch (IOException e) {               // unexpected exception
-            throw new FSError(e);                 // assume native fs error
-          }
-        }
-
-        public int read(byte[] b, int off, int len) throws IOException {
-          try {
-            return fis.read(b, off, len);
-          } catch (IOException e) {               // unexpected exception
-            throw new FSError(e);                 // assume native fs error
-          }
-        }
-
-        public int read(long position, byte[] b, int off, int len)
-        throws IOException {
-          ByteBuffer bb = ByteBuffer.wrap(b, off, len);
-          try {
-            return fis.getChannel().read(bb, position);
-          } catch (IOException e) {
-            throw new FSError(e);
-          }
-        }
-        
-        public long skip(long n) throws IOException { return fis.skip(n); }
+    public LocalFileSystem() {
+        super(new RawLocalFileSystem());
     }
     
-    public FSInputStream openRaw(Path f) throws IOException {
-        if (! exists(f)) {
-            throw new FileNotFoundException(f.toString());
-        }
-        return new LocalFSFileInputStream(f);
-    }
-
-    /*********************************************************
-     * For create()'s FSOutputStream.
-     *********************************************************/
-    class LocalFSFileOutputStream extends FSOutputStream {
-      FileOutputStream fos;
-
-      public LocalFSFileOutputStream(Path f) throws IOException {
-        this.fos = new FileOutputStream(pathToFile(f));
-      }
-
-      public long getPos() throws IOException {
-        return fos.getChannel().position();
-      }
-
-      /*
-       * Just forward to the fos
-       */
-      public void close() throws IOException { fos.close(); }
-      public void flush() throws IOException { fos.flush(); }
-
-      public void write(byte[] b, int off, int len) throws IOException {
-        try {
-          fos.write(b, off, len);
-        } catch (IOException e) {               // unexpected exception
-          throw new FSError(e);                 // assume native fs error
-        }
-      }
-      public void write(int b) throws IOException {
-        try {
-          fos.write(b);
-        } catch (IOException e) {               // unexpected exception
-          throw new FSError(e);                 // assume native fs error
-        }
-      }
-    }
-
-    public FSOutputStream createRaw(Path f, boolean overwrite, 
-                                    short replication, long blockSize)
-      throws IOException {
-        if (exists(f) && ! overwrite) {
-            throw new IOException("File already exists:"+f);
-        }
-        Path parent = f.getParent();
-        if (parent != null) {
-          if (!mkdirs(parent)) {
-            throw new IOException("Mkdirs failed to create " + parent.toString());
-          }
-        }
-        return new LocalFSFileOutputStream(f);
-    }
-
-    public FSOutputStream createRaw(Path f, boolean overwrite, 
-                                    short replication, long blockSize,
-                                    Progressable progress)
-      throws IOException {
-        // ignore write-progress reporter for local files
-        return createRaw(f, overwrite, replication, blockSize);
-    }
-    /**
-     * Replication is not supported for the local file system.
-     */
-    public short getReplication(Path f) throws IOException {
-      return 1;
+    public LocalFileSystem( FileSystem rawLocalFileSystem ) {
+        super(rawLocalFileSystem);
     }
     
-    public boolean setReplicationRaw( Path src, 
-                                      short replication
-                                    ) throws IOException {
-      return true;
-    }
-
-    public boolean renameRaw(Path src, Path dst) throws IOException {
-        if (useCopyForRename) {
-          return FileUtil.copy(this, src, this, dst, true, getConf());
-        } else return pathToFile(src).renameTo(pathToFile(dst));
-    }
-
-    public boolean deleteRaw(Path p) throws IOException {
-        File f = pathToFile(p);
-        if (f.isFile()) {
-            return f.delete();
-        } else return FileUtil.fullyDelete(f);
-    }
-
-    public boolean exists(Path f) throws IOException {
-        return pathToFile(f).exists();
+    /** Convert a path to a File. */
+    public File pathToFile(Path path) {
+      return ((RawLocalFileSystem)fs).pathToFile(path);
     }
 
-    public boolean isDirectory(Path f) throws IOException {
-        return pathToFile(f).isDirectory();
+    @Override
+    public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+    throws IOException {
+      FileUtil.copy(this, src, this, dst, delSrc, getConf());
     }
 
-    public long getLength(Path f) throws IOException {
-        return pathToFile(f).length();
+    @Override
+    public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+    throws IOException {
+      FileUtil.copy(this, src, this, dst, delSrc, getConf());
     }
 
-    public Path[] listPathsRaw(Path f) throws IOException {
-        File localf = pathToFile(f);
-        Path[] results;
-
-        if(!localf.exists())
-          return null;
-        else if(localf.isFile()) {
-          results = new Path[1];
-          results[0] = f;
-          return results;
-        } else { //directory
-          String[] names = localf.list();
-          if (names == null) {
-            return null;
-          }
-          results = new Path[names.length];
-          for (int i = 0; i < names.length; i++) {
-            results[i] = new Path(f, names[i]);
-          }
-          return results;
-        }
-    }
-    
-    /**
-     * Creates the specified directory hierarchy. Does not
-     * treat existence as an error.
-     */
-    public boolean mkdirs(Path f) throws IOException {
-      Path parent = f.getParent();
-      File p2f = pathToFile(f);
-      return (parent == null || mkdirs(parent)) &&
-             (p2f.mkdir() || p2f.isDirectory());
-    }
-    
     /**
-     * Set the working directory to the given directory.
+     * Moves files to a bad file directory on the same device, so that their
+     * storage will not be reused.
      */
-    public void setWorkingDirectory(Path newDir) {
-      workingDir = newDir;
-    }
-    
-    public Path getWorkingDirectory() {
-      return workingDir;
-    }
-    
-    /** @deprecated */ @Deprecated
-    public void lock(Path p, boolean shared) throws IOException {
-      File f = pathToFile(p);
-      f.createNewFile();
-
-      if (shared) {
-        FileInputStream lockData = new FileInputStream(f);
-        FileLock lockObj =
-          lockData.getChannel().lock(0L, Long.MAX_VALUE, shared);
-        synchronized (this) {
-          sharedLockDataSet.put(f, lockData);
-          lockObjSet.put(f, lockObj);
-        }
-      } else {
-        FileOutputStream lockData = new FileOutputStream(f);
-        FileLock lockObj = lockData.getChannel().lock(0L, Long.MAX_VALUE, shared);
-        synchronized (this) {
-          nonsharedLockDataSet.put(f, lockData);
-          lockObjSet.put(f, lockObj);
-        }
-      }
-    }
-
-    /** @deprecated */ @Deprecated
-    public void release(Path p) throws IOException {
-      File f = pathToFile(p);
-
-      FileLock lockObj;
-      FileInputStream sharedLockData;
-      FileOutputStream nonsharedLockData;
-      synchronized (this) {
-        lockObj = (FileLock) lockObjSet.remove(f);
-        sharedLockData = (FileInputStream) sharedLockDataSet.remove(f);
-        nonsharedLockData = (FileOutputStream) nonsharedLockDataSet.remove(f);
-      }
- 
-      if (lockObj == null) {
-        throw new IOException("Given target not held as lock");
-      }
-      if (sharedLockData == null && nonsharedLockData == null) {
-        throw new IOException("Given target not held as lock");
-      }
-
-      lockObj.release();
-
-      if (sharedLockData != null) {
-        sharedLockData.close();
-      } else {
-        nonsharedLockData.close();
-      }
-    }
-
-    // In the case of the local filesystem, we can just rename the file.
-    public void moveFromLocalFile(Path src, Path dst) throws IOException {
-      rename(src, dst);
-    }
-
-    // Similar to moveFromLocalFile(), except the source is kept intact.
-    public void copyFromLocalFile(Path src, Path dst) throws IOException {
-      FileUtil.copy(this, src, this, dst, false, getConf());
-    }
-
-    // We can't delete the src file in this case.  Too bad.
-    public void copyToLocalFile(Path src, Path dst, boolean copyCrc) throws IOException {
-      FileUtil.copy(this, src, this, dst, false, copyCrc, getConf());
-    }
-
-    // We can write output directly to the final location
-    public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
-      throws IOException {
-      return fsOutputFile;
-    }
-
-    // It's in the right place - nothing to do.
-    public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
-      throws IOException {
-    }
-
-    public void close() throws IOException {
-        super.close();
-    }
-
-    public String toString() {
-        return "LocalFS";
-    }
-    
-
-    /** Moves files to a bad file directory on the same device, so that their
-     * storage will not be reused. */
-    public void reportChecksumFailure(Path p, FSInputStream in,
+    public void reportChecksumFailure(Path p, FSDataInputStream in,
                                       long inPos,
-                                      FSInputStream sums, long sumsPos) {
+                                      FSDataInputStream sums, long sumsPos) {
       try {
-        // canonicalize f   
-        File f = pathToFile(p).getCanonicalFile();
+        // canonicalize f
+        File f = ((RawLocalFileSystem)fs).pathToFile(p).getCanonicalFile();
       
         // find highest writable parent dir of f on the same device
         String device = new DF(f, getConf()).getMount();
@@ -402,27 +89,11 @@
         f.renameTo(badFile);                      // rename it
 
         // move checksum file too
-        File checkFile = pathToFile(getChecksumFile(p));
+        File checkFile = ((RawLocalFileSystem)fs).pathToFile(getChecksumFile(p));
         checkFile.renameTo(new File(badDir, checkFile.getName()+suffix));
 
       } catch (IOException e) {
         LOG.warn("Error moving bad file " + p + ": " + e);
       }
     }
-
-    public long getDefaultBlockSize() {
-      // default to 32MB: large enough to minimize the impact of seeks
-      return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
-    }
-
-    public long getBlockSize(Path filename) {
-      // local doesn't really do blocks, so just use the global number
-      return getDefaultBlockSize();
-    }
-    
-    public short getDefaultReplication() {
-      return 1;
-    }
-
-
 }
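
The net effect of this rewrite: LocalFileSystem keeps its checksumming behavior by delegating through ChecksumFileSystem (added in part 1 of this commit), while the new RawLocalFileSystem (added below) talks to java.io directly. A hedged sketch of the two entry points, assuming the construct-then-initialize lifecycle used throughout this commit; conf and paths are illustrative:

// assumes: import org.apache.hadoop.fs.*, org.apache.hadoop.conf.Configuration,
//          java.net.URI;
Configuration conf = new Configuration();

// Checksummed view: reads are verified against checksum side files,
// and corrupted files are quarantined via reportChecksumFailure().
LocalFileSystem checked = new LocalFileSystem();
checked.initialize(URI.create("file:///"), conf);
FSDataInputStream in = checked.open(new Path("/tmp/data"), 4096);

// Raw view: straight to java.io, no checksum files consulted.
RawLocalFileSystem raw = new RawLocalFileSystem();
raw.initialize(URI.create("file:///"), conf);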

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/RawLocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/RawLocalFileSystem.java?view=auto&rev=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/RawLocalFileSystem.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/RawLocalFileSystem.java Tue Feb 27 15:45:46 2007
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.*;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileLock;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
+
+/****************************************************************
+ * Implement the FileSystem API for the raw local filesystem.
+ *
+ * @author Mike Cafarella
+ *****************************************************************/
+public class RawLocalFileSystem extends FileSystem {
+  static final URI NAME = URI.create("file:///");
+  private Path workingDir =
+    new Path(System.getProperty("user.dir"));
+  TreeMap sharedLockDataSet = new TreeMap();
+  TreeMap nonsharedLockDataSet = new TreeMap();
+  TreeMap lockObjSet = new TreeMap();
+  // by default use copy/delete instead of rename
+  boolean useCopyForRename = true;
+  
+  public RawLocalFileSystem() {}
+  
+  /** Convert a path to a File. */
+  public File pathToFile(Path path) {
+    checkPath(path);
+    if (!path.isAbsolute()) {
+      path = new Path(getWorkingDirectory(), path);
+    }
+    return new File(path.toUri().getPath());
+  }
+
+  /**
+   * Return 1x1 'localhost' cell if the file exists.
+   * Return null otherwise.
+   */
+  public String[][] getFileCacheHints(Path f, long start, long len) throws IOException {
+    if (! exists(f)) {
+      return null;
+    } else {
+      String result[][] = new String[1][];
+      result[0] = new String[1];
+      result[0][0] = "localhost";
+      return result;
+    }
+  }
+  
+  /** @deprecated */
+  public String getName() { return "local"; }
+  
+  public URI getUri() { return NAME; }
+  
+  public void initialize(URI uri, Configuration conf) {
+    setConf(conf);
+  }
+  
+  /*******************************************************
+   * For open()'s FSInputStream
+   *******************************************************/
+  class LocalFSFileInputStream extends FSInputStream {
+    FileInputStream fis;
+    
+    public LocalFSFileInputStream(Path f) throws IOException {
+      this.fis = new FileInputStream(pathToFile(f));
+    }
+    
+    public void seek(long pos) throws IOException {
+      fis.getChannel().position(pos);
+    }
+    
+    public long getPos() throws IOException {
+      return fis.getChannel().position();
+    }
+    
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
+    }
+    
+    /*
+     * Just forward to the fis
+     */
+    public int available() throws IOException { return fis.available(); }
+    public void close() throws IOException { fis.close(); }
+    public boolean markSupport() { return false; }
+    
+    public int read() throws IOException {
+      try {
+        return fis.read();
+      } catch (IOException e) {                 // unexpected exception
+        throw new FSError(e);                   // assume native fs error
+      }
+    }
+    
+    public int read(byte[] b, int off, int len) throws IOException {
+      try {
+        return fis.read(b, off, len);
+      } catch (IOException e) {                 // unexpected exception
+        throw new FSError(e);                   // assume native fs error
+      }
+    }
+    
+    public int read(long position, byte[] b, int off, int len)
+    throws IOException {
+      ByteBuffer bb = ByteBuffer.wrap(b, off, len);
+      try {
+        return fis.getChannel().read(bb, position);
+      } catch (IOException e) {
+        throw new FSError(e);
+      }
+    }
+    
+    public long skip(long n) throws IOException { return fis.skip(n); }
+  }
+  
+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    if (!exists(f)) {
+      throw new FileNotFoundException(f.toString());
+    }
+    return new FSDataInputStream(new LocalFSFileInputStream(f), bufferSize);
+  }
+  
+  /*********************************************************
+   * For create()'s FSOutputStream.
+   *********************************************************/
+  class LocalFSFileOutputStream extends OutputStream {
+    FileOutputStream fos;
+    
+    public LocalFSFileOutputStream(Path f) throws IOException {
+      this.fos = new FileOutputStream(pathToFile(f));
+    }
+    
+    public long getPos() throws IOException {
+      return fos.getChannel().position();
+    }
+    
+    /*
+     * Just forward to the fos
+     */
+    public void close() throws IOException { fos.close(); }
+    public void flush() throws IOException { fos.flush(); }
+    public void write(byte[] b, int off, int len) throws IOException {
+      try {
+        fos.write(b, off, len);
+      } catch (IOException e) {                // unexpected exception
+        throw new FSError(e);                  // assume native fs error
+      }
+    }
+    
+    public void write(int b) throws IOException {
+      try {
+        fos.write(b);
+      } catch (IOException e) {              // unexpected exception
+        throw new FSError(e);                // assume native fs error
+      }
+    }
+  }
+  
+  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
+      short replication, long blockSize, Progressable progress)
+  throws IOException {
+    if (exists(f) && ! overwrite) {
+      throw new IOException("File already exists:"+f);
+    }
+    Path parent = f.getParent();
+    if (parent != null && !mkdirs(parent)) {
+      throw new IOException("Mkdirs failed to create " + parent.toString());
+    }
+    return new FSDataOutputStream(new LocalFSFileOutputStream(f), getConf());
+  }
+  
+  /**
+   * Replication is not supported for the local file system.
+   */
+  public short getReplication(Path f) throws IOException {
+    return 1;
+  }
+  
+  /** Set the replication of the given file */
+  public boolean setReplication( Path src,
+      short replication
+  ) throws IOException {
+    return true;
+  }
+  
+  public boolean rename(Path src, Path dst) throws IOException {
+    if (useCopyForRename) {
+      return FileUtil.copy(this, src, this, dst, true, getConf());
+    } else return pathToFile(src).renameTo(pathToFile(dst));
+  }
+  
+  public boolean delete(Path p) throws IOException {
+    File f = pathToFile(p);
+    if (f.isFile()) {
+      return f.delete();
+    } else return FileUtil.fullyDelete(f);
+  }
+  
+  public boolean exists(Path f) throws IOException {
+    return pathToFile(f).exists();
+  }
+  
+  public boolean isDirectory(Path f) throws IOException {
+    return pathToFile(f).isDirectory();
+  }
+  
+  public long getLength(Path f) throws IOException {
+    return pathToFile(f).length();
+  }
+  
+  public Path[] listPaths(Path f) throws IOException {
+    File localf = pathToFile(f);
+    Path[] results;
+    
+    if(!localf.exists())
+      return null;
+    else if(localf.isFile()) {
+      results = new Path[1];
+      results[0] = f;
+      return results;
+    } else { // directory
+      String[] names = localf.list();
+      if (names == null) {
+        return null;
+      }
+      results = new Path[names.length];
+      for (int i = 0; i < names.length; i++) {
+        results[i] = new Path(f, names[i]);
+      }
+      return results;
+    }
+  }
+  
+  /**
+   * Creates the specified directory hierarchy. Does not
+   * treat existence as an error.
+   */
+  public boolean mkdirs(Path f) throws IOException {
+    Path parent = f.getParent();
+    File p2f = pathToFile(f);
+    return (parent == null || mkdirs(parent)) &&
+    (p2f.mkdir() || p2f.isDirectory());
+  }
+  
+  /**
+   * Set the working directory to the given directory.
+   */
+  @Override
+  public void setWorkingDirectory(Path newDir) {
+    workingDir = newDir;
+  }
+  
+  @Override
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+  
+  /** @deprecated */ @Deprecated
+  public void lock(Path p, boolean shared) throws IOException {
+    File f = pathToFile(p);
+    f.createNewFile();
+    
+    if (shared) {
+      FileInputStream lockData = new FileInputStream(f);
+      FileLock lockObj =
+        lockData.getChannel().lock(0L, Long.MAX_VALUE, shared);
+      synchronized (this) {
+        sharedLockDataSet.put(f, lockData);
+        lockObjSet.put(f, lockObj);
+      }
+    } else {
+      FileOutputStream lockData = new FileOutputStream(f);
+      FileLock lockObj = lockData.getChannel().lock(0L, Long.MAX_VALUE, shared);
+      synchronized (this) {
+        nonsharedLockDataSet.put(f, lockData);
+        lockObjSet.put(f, lockObj);
+      }
+    }
+  }
+  
+  /** @deprecated */ @Deprecated
+  public void release(Path p) throws IOException {
+    File f = pathToFile(p);
+    
+    FileLock lockObj;
+    FileInputStream sharedLockData;
+    FileOutputStream nonsharedLockData;
+    synchronized (this) {
+      lockObj = (FileLock) lockObjSet.remove(f);
+      sharedLockData = (FileInputStream) sharedLockDataSet.remove(f);
+      nonsharedLockData = (FileOutputStream) nonsharedLockDataSet.remove(f);
+    }
+    
+    if (lockObj == null) {
+      throw new IOException("Given target not held as lock");
+    }
+    if (sharedLockData == null && nonsharedLockData == null) {
+      throw new IOException("Given target not held as lock");
+    }
+    
+    lockObj.release();
+    
+    if (sharedLockData != null) {
+      sharedLockData.close();
+    } else {
+      nonsharedLockData.close();
+    }
+  }
+  
+  // In the case of the local filesystem, we can just rename the file.
+  public void moveFromLocalFile(Path src, Path dst) throws IOException {
+    rename(src, dst);
+  }
+  
+  @Override
+  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+  throws IOException {
+    FileUtil.copy(this, src, this, dst, delSrc, getConf());
+  }
+  
+  @Override
+  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+  throws IOException {
+    FileUtil.copy(this, src, this, dst, delSrc, getConf());
+  }
+  
+  // We can write output directly to the final location
+  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+  throws IOException {
+    return fsOutputFile;
+  }
+  
+  // It's in the right place - nothing to do.
+  public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
+  throws IOException {
+  }
+  
+  public void close() throws IOException {
+    super.close();
+  }
+  
+  public String toString() {
+    return "LocalFS";
+  }
+  
+  public long getBlockSize(Path filename) {
+    // local doesn't really do blocks, so just use the global number
+    return getDefaultBlockSize();
+  }
+  
+  public short getDefaultReplication() {
+    return 1;
+  }
+}
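
A hedged illustration of the pathToFile() resolution above (directory names are invented): a relative Path is qualified against the working directory before being handed to java.io.File, while an absolute Path passes through unchanged:

// assumes: import org.apache.hadoop.fs.*, org.apache.hadoop.conf.Configuration,
//          java.io.File, java.net.URI;
RawLocalFileSystem fs = new RawLocalFileSystem();
fs.initialize(URI.create("file:///"), new Configuration());

fs.setWorkingDirectory(new Path("/tmp/job_0001"));
File f = fs.pathToFile(new Path("part-00000"));
// f.getPath() is "/tmp/job_0001/part-00000" -- the relative Path was
// resolved against the working directory set above.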

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java Tue Feb 27 15:45:46 2007
@@ -8,8 +8,8 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -152,7 +152,7 @@
   }
 
   @Override
-  public Path[] listPathsRaw(Path path) throws IOException {
+  public Path[] listPaths(Path path) throws IOException {
     Path absolutePath = makeAbsolute(path);
     INode inode = store.retrieveINode(absolutePath);
     if (inode == null) {
@@ -166,21 +166,14 @@
   }
 
   @Override
-  public FSOutputStream createRaw(Path file, boolean overwrite,
-      short replication, long blockSize) throws IOException {
-
-    return createRaw(file, overwrite, replication, blockSize, null);
-  }
-
-  @Override
-  public FSOutputStream createRaw(Path file, boolean overwrite,
+  public FSDataOutputStream create(Path file, boolean overwrite, int bufferSize,
       short replication, long blockSize, Progressable progress)
       throws IOException {
 
     INode inode = store.retrieveINode(makeAbsolute(file));
     if (inode != null) {
       if (overwrite) {
-        deleteRaw(file);
+        delete(file);
       } else {
         throw new IOException("File already exists: " + file);
       }
@@ -192,18 +185,20 @@
         }
       }      
     }
-    return new S3OutputStream(getConf(), store, makeAbsolute(file),
-        blockSize, progress);
+    return new FSDataOutputStream( 
+            new S3OutputStream(getConf(), store, makeAbsolute(file),
+                blockSize, progress), bufferSize );
   }
 
   @Override
-  public FSInputStream openRaw(Path path) throws IOException {
+  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
     INode inode = checkFile(path);
-    return new S3InputStream(getConf(), store, inode);
+    return new FSDataInputStream( new S3InputStream(getConf(), store, inode),
+            bufferSize);
   }
 
   @Override
-  public boolean renameRaw(Path src, Path dst) throws IOException {
+  public boolean rename(Path src, Path dst) throws IOException {
     Path absoluteSrc = makeAbsolute(src);
     INode srcINode = store.retrieveINode(absoluteSrc);
     if (srcINode == null) {
@@ -228,10 +223,10 @@
         return false;
       }
     }
-    return renameRawRecursive(absoluteSrc, absoluteDst);
+    return renameRecursive(absoluteSrc, absoluteDst);
   }
   
-  private boolean renameRawRecursive(Path src, Path dst) throws IOException {
+  private boolean renameRecursive(Path src, Path dst) throws IOException {
     INode srcINode = store.retrieveINode(src);
     store.storeINode(dst, srcINode);
     store.deleteINode(src);
@@ -250,7 +245,7 @@
   }
 
   @Override
-  public boolean deleteRaw(Path path) throws IOException {
+  public boolean delete(Path path) throws IOException {
     Path absolutePath = makeAbsolute(path);
     INode inode = store.retrieveINode(absolutePath);
     if (inode == null) {
@@ -262,12 +257,12 @@
         store.deleteBlock(block);
       }
     } else {
-      Path[] contents = listPathsRaw(absolutePath);
+      Path[] contents = listPaths(absolutePath);
       if (contents == null) {
         return false;
       }
       for (Path p : contents) {
-        if (! deleteRaw(p)) {
+        if (! delete(p)) {
           return false;
         }
       }
@@ -305,7 +300,7 @@
    * us.
    */
   @Override
-  public boolean setReplicationRaw(Path path, short replication)
+  public boolean setReplication(Path path, short replication)
       throws IOException {
     return true;
   }
@@ -354,25 +349,13 @@
   }
 
   @Override
-  public void reportChecksumFailure(Path f, 
-                                    FSInputStream in, long inPos, 
-                                    FSInputStream sums, long sumsPos) {
-    // TODO: What to do here?
-  }
-
-  @Override
-  public void moveFromLocalFile(Path src, Path dst) throws IOException {
-    FileUtil.copy(localFs, src, this, dst, true, getConf());
-  }
-
-  @Override
-  public void copyFromLocalFile(Path src, Path dst) throws IOException {
-    FileUtil.copy(localFs, src, this, dst, false, true, getConf());
+  public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
+    FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
   }
 
   @Override
-  public void copyToLocalFile(Path src, Path dst, boolean copyCrc) throws IOException {
-    FileUtil.copy(this, src, localFs, dst, false, copyCrc, getConf());
+  public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
+    FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
   }
 
   @Override
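
With create() and open() now returning the buffered FSData stream types, S3 callers use the same convention as every other FileSystem touched by this commit. A hedged sketch; the bucket URI and path are invented, and the no-arg-constructor-plus-initialize lifecycle is assumed by analogy with the other filesystems here:

// assumes: import org.apache.hadoop.fs.*, org.apache.hadoop.fs.s3.S3FileSystem,
//          org.apache.hadoop.conf.Configuration, java.net.URI;
Configuration conf = new Configuration();
S3FileSystem s3 = new S3FileSystem();
s3.initialize(URI.create("s3://my-bucket"), conf);

Path p = new Path("/data/blob");
FSDataOutputStream out = s3.create(p, true, 4096, (short) 1,
    s3.getDefaultBlockSize(), null);       // wraps an S3OutputStream
out.write("hello".getBytes());
out.close();

FSDataInputStream in = s3.open(p, 4096);   // wraps an S3InputStream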

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java Tue Feb 27 15:45:46 2007
@@ -9,12 +9,11 @@
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3.INode.FileType;
 import org.apache.hadoop.util.Progressable;
 
-class S3OutputStream extends FSOutputStream {
+class S3OutputStream extends OutputStream {
 
   private Configuration conf;
   
@@ -70,7 +69,6 @@
     return result;
   }
 
-  @Override
   public long getPos() throws IOException {
     return filePos;
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Tue Feb 27 15:45:46 2007
@@ -206,7 +206,8 @@
       
       int length = connection.getContentLength();
       int inMemFSSize = inMemFileSys.getFSSize();
-      int checksumLength = inMemFileSys.getChecksumFileLength(length);
+      int checksumLength = (int)inMemFileSys.getChecksumFileLength(
+              localFilename, length);
         
       boolean createInMem = false; 
       if (inMemFSSize > 0)  

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java Tue Feb 27 15:45:46 2007
@@ -1,17 +1,16 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.net.URI;
-import java.net.URISyntaxException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FSOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.Progressable;
 
@@ -27,13 +26,9 @@
  * better to commit(Path) individual files when done. Otherwise
  * commit() can be used to commit all open files at once. 
  */
-public class PhasedFileSystem extends FileSystem {
-
-  private FileSystem baseFS ;
-  private URI uri;
-
+public class PhasedFileSystem extends FilterFileSystem {
   // Map from final file name to temporary file name
-  private Map<Path, FileInfo> finalNameToFileInfo = new HashMap(); 
+  private Map<Path, FileInfo> finalNameToFileInfo = new HashMap<Path, FileInfo>(); 
   
   private String jobid ; 
   private String tipid ; 
@@ -50,12 +45,12 @@
    */
   public PhasedFileSystem(FileSystem fs, String jobid, 
       String tipid, String taskid) {
-    this.baseFS = fs ; 
+    super(fs); 
     this.jobid = jobid; 
     this.tipid = tipid ; 
     this.taskid = taskid ; 
     
-    tempDir = new Path(baseFS.getConf().get("mapred.system.dir") ); 
+    tempDir = new Path(fs.getConf().get("mapred.system.dir") ); 
     this.setConf(fs.getConf());
   }
   /**
@@ -65,21 +60,14 @@
    * @param conf JobConf
    */
   public PhasedFileSystem(FileSystem fs, JobConf conf) {
-    this.baseFS = fs ; 
+    super(fs); 
     this.jobid = conf.get("mapred.job.id"); 
     this.tipid = conf.get("mapred.tip.id"); 
     this.taskid = conf.get("mapred.task.id") ; 
     
-    tempDir = new Path(baseFS.getConf().get("mapred.system.dir") );
+    tempDir = new Path(fs.getConf().get("mapred.system.dir") );
     this.setConf(fs.getConf());
   }
-  /**
-   * This Constructor should not be used in this or any derived class. 
-   * @param conf
-   */
-  protected PhasedFileSystem(Configuration conf){
-    throw new UnsupportedOperationException("Operation not supported"); 
-  }
   
   private Path setupFile(Path finalFile, boolean overwrite) throws IOException{
     if( finalNameToFileInfo.containsKey(finalFile) ){
@@ -94,8 +82,8 @@
         }catch(IOException ioe){
           // ignore if already closed
         }
-        if( baseFS.exists(fInfo.getTempPath())){
-          baseFS.delete( fInfo.getTempPath() );
+        if( fs.exists(fInfo.getTempPath())){
+          fs.delete( fInfo.getTempPath() );
         }
         finalNameToFileInfo.remove(finalFile); 
       }
@@ -111,45 +99,19 @@
     return tempPath ; 
   }
   
-  public URI getUri() {
-    return baseFS.getUri();
-  }
-
-  public void initialize(URI uri, Configuration conf) throws IOException {
-    baseFS.initialize(uri, conf);
-  }
-
-  @Override
-  public FSOutputStream createRaw(
-      Path f, boolean overwrite, short replication, long blockSize)
+  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
+          short replication, long blockSize,Progressable progress)
       throws IOException {
-    
-    // for reduce output its checked in job client but lets check it anyways
-    // as tasks with side effect may write to locations not set in jobconf
-    // as output path. 
-    if( baseFS.exists(f) && !overwrite ){
-      throw new IOException("Error creating file - already exists : " + f); 
-    }
-    FSOutputStream stream = 
-      baseFS.createRaw(setupFile(f, overwrite), overwrite, replication, blockSize); 
-    finalNameToFileInfo.get(f).setOpenFileStream(stream); 
-    return stream; 
-  }
-
-  @Override
-  public FSOutputStream createRaw(
-      Path f, boolean overwrite, short replication, long blockSize,
-      Progressable progress)
-      throws IOException {
-    if( baseFS.exists(f) && !overwrite ){
+    if( fs.exists(f) && !overwrite ){
       throw new IOException("Error creating file - already exists : " + f); 
     }
-    FSOutputStream stream = 
-      baseFS.createRaw(setupFile(f, overwrite), overwrite, replication, 
+    FSDataOutputStream stream = 
+      fs.create(setupFile(f, overwrite), overwrite, bufferSize, replication, 
           blockSize, progress);
     finalNameToFileInfo.get(f).setOpenFileStream(stream); 
     return stream ; 
   }
+  
   /**
    * Commits a single file to its final location, as passed in the create* methods. 
    * If a file already exists at the final location then the temporary file is deleted. 
@@ -177,29 +139,29 @@
     Path tempPath = fInfo.getTempPath(); 
     // ignore .crc files 
     if(! tempPath.toString().endsWith(".crc")){
-      if( !baseFS.exists(fPath) || fInfo.isOverwrite()){
-        if(! baseFS.exists(fPath.getParent())){
-          baseFS.mkdirs(fPath.getParent());
+      if( !fs.exists(fPath) || fInfo.isOverwrite()){
+        if(!fs.exists(fPath.getParent())){
+          fs.mkdirs(fPath.getParent());
         }
         
-        if( baseFS.exists(fPath) && fInfo.isOverwrite()){
-          baseFS.delete(fPath); 
+        if( fs.exists(fPath) && fInfo.isOverwrite()){
+          fs.delete(fPath); 
         }
         
         try {
-          if( ! baseFS.rename(fInfo.getTempPath(), fPath) ){
+          if( !fs.rename(fInfo.getTempPath(), fPath) ){
             // delete the temp file if rename failed
-            baseFS.delete(fInfo.getTempPath());
+            fs.delete(fInfo.getTempPath());
           }
         }catch(IOException ioe){
           // rename failed, log error and delete temp files
           LOG.error("PhasedFileSystem failed to commit file : " + fPath 
               + " error : " + ioe.getMessage()); 
-          baseFS.delete(fInfo.getTempPath());
+          fs.delete(fInfo.getTempPath());
         }
       }else{
         // delete temp file
-        baseFS.delete(fInfo.getTempPath());
+        fs.delete(fInfo.getTempPath());
       }
       // done with the file
       if( removeFromMap ){
@@ -241,7 +203,7 @@
       }catch(IOException ioe){
         // ignore if already closed
       }
-      baseFS.delete(fInfo.getTempPath()); 
+      fs.delete(fInfo.getTempPath()); 
       if( removeFromMap ){
         finalNameToFileInfo.remove(p);
       }
@@ -261,24 +223,9 @@
     // safe to clean now
     finalNameToFileInfo.clear();
   }
-  /**
-   * Closes base file system. 
-   */
-  public void close() throws IOException { 
-    baseFS.close(); 
-  } 
   
   @Override
-  public short getReplication(
-      Path src)
-      throws IOException {
-    // keep replication same for temp file as for 
-    // final file. 
-    return baseFS.getReplication(src);
-  }
-
-  @Override
-  public boolean setReplicationRaw(
+  public boolean setReplication(
       Path src, short replication)
       throws IOException {
     // throw IOException for interface compatibility with 
@@ -287,59 +234,19 @@
   }
 
   @Override
-  public boolean renameRaw(
+  public boolean rename(
       Path src, Path dst)
       throws IOException {
     throw new UnsupportedOperationException("Operation not supported");  
   }
 
   @Override
-  public boolean deleteRaw(
+  public boolean delete(
       Path f)
       throws IOException {
     throw new UnsupportedOperationException("Operation not supported");  
   }
 
-  @Override
-  public boolean exists(Path f)
-      throws IOException {
-    return baseFS.exists(f);
-  }
-
-  @Override
-  public boolean isDirectory(Path f)
-      throws IOException {
-    return baseFS.isDirectory(f);  
-  }
-
-  @Override
-  public long getLength(Path f)
-      throws IOException {
-    return baseFS.getLength(f); 
-  }
-
-  @Override
-  public Path[] listPathsRaw(Path f)
-      throws IOException {
-    return baseFS.listPathsRaw(f);
-  }
-
-  @Override
-  public void setWorkingDirectory(Path new_dir) {
-    baseFS.setWorkingDirectory(new_dir);   
-  }
-
-  @Override
-  public Path getWorkingDirectory() {
-    return baseFS.getWorkingDirectory();  
-  }
-
-  @Override
-  public boolean mkdirs(Path f)
-      throws IOException {
-    return baseFS.mkdirs(f) ;
-  }
-
   /** @deprecated */ @Deprecated
   @Override
   public void lock(
@@ -358,21 +265,14 @@
 
   @Override
   public void copyFromLocalFile(
-      Path src, Path dst)
-      throws IOException {
-    throw new UnsupportedOperationException("Operation not supported");  
-  }
-
-  @Override
-  public void moveFromLocalFile(
-      Path src, Path dst)
+      boolean delSrc, Path src, Path dst)
       throws IOException {
     throw new UnsupportedOperationException("Operation not supported");  
   }
 
   @Override
   public void copyToLocalFile(
-      Path src, Path dst, boolean copyCrc)
+      boolean delSrc, Path src, Path dst)
       throws IOException {
     throw new UnsupportedOperationException("Operation not supported");  
   }
@@ -392,31 +292,6 @@
  }
 
   @Override
-
-  public void reportChecksumFailure(Path f, 
-                                    FSInputStream in, long inPos, 
-                                    FSInputStream sums, long sumsPos) {
-    baseFS.reportChecksumFailure(f, in, inPos, sums, sumsPos); 
-  }
-
-  @Override
-  public long getBlockSize(
-      Path f)
-      throws IOException {
-    return baseFS.getBlockSize(f);
-  }
-
-  @Override
-  public long getDefaultBlockSize() {
-    return baseFS.getDefaultBlockSize();
-  }
-
-  @Override
-  public short getDefaultReplication() {
-    return baseFS.getDefaultReplication();
-  }
-
-  @Override
   public String[][] getFileCacheHints(
       Path f, long start, long len)
       throws IOException {
@@ -428,16 +303,10 @@
     throw new UnsupportedOperationException("Operation not supported");  
   }
 
-  @Override
-  public FSInputStream openRaw(Path f)
-      throws IOException {
-    return baseFS.openRaw(f);   
-  }
-  
   private class FileInfo {
     private Path tempPath ;
     private Path finalPath ; 
-    private FSOutputStream openFileStream ; 
+    private OutputStream openFileStream ; 
     private boolean overwrite ;
     
     FileInfo(Path tempPath, Path finalPath, boolean overwrite){
@@ -445,11 +314,11 @@
       this.finalPath = finalPath ; 
       this.overwrite = overwrite; 
     }
-    public FSOutputStream getOpenFileStream() {
+    public OutputStream getOpenFileStream() {
       return openFileStream;
     }
     public void setOpenFileStream(
-        FSOutputStream openFileStream) {
+        OutputStream openFileStream) {
       this.openFileStream = openFileStream;
     }
     public Path getFinalPath() {
@@ -473,7 +342,5 @@
         Path tempPath) {
       this.tempPath = tempPath;
     }
-    
   }
-
 }

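PhasedFileSystem now inherits all pass-through behavior from FilterFileSystem and keeps only the phased-commit logic. A task-side usage sketch, assuming a JobConf that already carries mapred.job.id, mapred.tip.id and mapred.task.id, that create accepts a null Progressable, and that commit() promotes all open files as the class javadoc states; the output path and create parameters are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.PhasedFileSystem;

    public class PhasedWriteSketch {
      public static void run(JobConf job) throws IOException {
        FileSystem base = FileSystem.get(job);
        PhasedFileSystem phased = new PhasedFileSystem(base, job);

        // writes land in a temp file under mapred.system.dir, not the final path
        Path out = new Path("/job/output/part-00000");
        FSDataOutputStream stream =
            phased.create(out, true, 4096, (short)3, 64L * 1024 * 1024, null);
        stream.write("side-effect output".getBytes());
        stream.close();

        phased.commit();  // promote every file created through this instance
      }
    }
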
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Tue Feb 27 15:45:46 2007
@@ -554,11 +554,9 @@
         BufferedInputStream is = 
           new BufferedInputStream(connection.getInputStream());
         
-        FSDataOutputStream os = 
-          new FSDataOutputStream(destFileSys, destinationPath, true, 
-              jobConf,	bufferSize, (short)jobConf.getInt("dfs.replication", 3), 
-              jobConf.getLong("dfs.block.size", 67108864)
-          );
+        FSDataOutputStream os = destFileSys.create(destinationPath, true, 
+              bufferSize, (short)jobConf.getInt("dfs.replication", 3), 
+              jobConf.getLong("dfs.block.size", 67108864));
         
         int readBytes = 0;
         while((readBytes = is.read(buffer, 0, bufferSize)) != -1) {
@@ -624,7 +622,7 @@
           HDFS.equalsIgnoreCase(srcListURIScheme)) {
       FileSystem fs = FileSystem.get(srcListURI, conf);
       fis = new BufferedReader(
-          new InputStreamReader(new FSDataInputStream(fs, new Path(srcListURIPath), conf))
+          new InputStreamReader(fs.open(new Path(srcListURIPath)))
           );
     } else if("http".equalsIgnoreCase(srcListURIScheme)) {
       //Copy the file 

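The CopyFiles hunks replace direct FSDataOutputStream and FSDataInputStream construction with the factory methods on FileSystem. The resulting copy idiom, pulled out as a standalone helper sketch; the file systems, paths and 4096-byte buffer are illustrative, while the replication and block-size defaults are the ones the hunk reads from the job configuration:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.*;

    public class StreamCopy {
      static void copy(FileSystem srcFs, Path src, FileSystem dstFs, Path dst,
                       Configuration conf) throws IOException {
        FSDataInputStream is = srcFs.open(src, 4096);
        FSDataOutputStream os = dstFs.create(dst, true, 4096,
            (short)conf.getInt("dfs.replication", 3),
            conf.getLong("dfs.block.size", 67108864));
        byte[] buffer = new byte[4096];
        int n;
        while ((n = is.read(buffer, 0, buffer.length)) != -1) {
          os.write(buffer, 0, n);  // pump bytes through a fixed-size buffer
        }
        is.close();
        os.close();
      }
    }
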
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFS.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFS.java Tue Feb 27 15:45:46 2007
@@ -23,13 +23,13 @@
 
 import org.apache.commons.logging.*;
 import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FSOutputStream;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.conf.Configuration;
 
 import java.io.File;
 import java.io.FilenameFilter;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.ListIterator;
@@ -259,7 +259,7 @@
         for (int iFileNumber = 0; iFileNumber < numFiles; iFileNumber++) {
           testFileName = new UTF8("/f" + iFileNumber);
           testfilesList.add(testFileName);
-          FSOutputStream nos = dfsClient.create(testFileName, false);
+          OutputStream nos = dfsClient.create(testFileName, false);
           try {
             for (long nBytesWritten = 0L;
                  nBytesWritten < nBytes;

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java Tue Feb 27 15:45:46 2007
@@ -23,7 +23,6 @@
 
 import org.apache.commons.logging.*;
 
-import org.apache.hadoop.fs.FSOutputStream;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.conf.Configuration;
@@ -34,6 +33,7 @@
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 
@@ -223,7 +223,7 @@
     //
     byte[] buffer = new byte[BUFFER_SIZE];
     UTF8 testFileName = new UTF8(filename); // hardcode filename
-    FSOutputStream nos;
+    OutputStream nos;
 	nos = dfsClient.create(testFileName, false);
     try {
       for (long nBytesWritten = 0L;

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java Tue Feb 27 15:45:46 2007
@@ -22,8 +22,8 @@
 import java.util.Date;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
@@ -78,13 +78,13 @@
      */
     static int createWrite() {
       int exceptions = 0;
-      FSOutputStream out = null;
+      FSDataOutputStream out = null;
       boolean success = false;
       for (int index = 0; index < numFiles; index++) {
        do { // create file until it succeeds
           try {
-              out = fileSys.createRaw(
-              new Path(taskDir, "" + index), false, (short)1, bytesPerBlock);
+              out = fileSys.create(
+              new Path(taskDir, "" + index), false, 512, (short)1, bytesPerBlock);
             success = true;
           } catch (IOException ioe) { success=false; exceptions++; }
         } while (!success);
@@ -115,10 +115,10 @@
      */
     static int openRead() {
       int exceptions = 0;
-      FSInputStream in = null;
+      FSDataInputStream in = null;
       for (int index = 0; index < numFiles; index++) {
         try {
-          in = fileSys.openRaw(new Path(taskDir, "" + index));
+          in = fileSys.open(new Path(taskDir, "" + index), 512);
           long toBeRead = bytesPerFile;
           while (toBeRead > 0) {
             int nbytes = (int) Math.min(buffer.length, toBeRead);
@@ -149,7 +149,7 @@
       for (int index = 0; index < numFiles; index++) {
          do { // rename file until it succeeds
           try {
-            boolean result = fileSys.renameRaw(
+            boolean result = fileSys.rename(
               new Path(taskDir, "" + index), new Path(taskDir, "A" + index));
             success = true;
           } catch (IOException ioe) { success=false; exceptions++; }
@@ -170,7 +170,7 @@
       for (int index = 0; index < numFiles; index++) {
          do { // delete file until it succeeds
           try {
-            boolean result = fileSys.deleteRaw(new Path(taskDir, "A" + index));
+            boolean result = fileSys.delete(new Path(taskDir, "A" + index));
             success = true;
           } catch (IOException ioe) { success=false; exceptions++; }
         } while (!success);

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestCheckpoint.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestCheckpoint.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestCheckpoint.java Tue Feb 27 15:45:46 2007
@@ -20,10 +20,8 @@
 import junit.framework.TestCase;
 import java.io.*;
 import java.util.Random;
-import java.net.*;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -39,8 +37,9 @@
 
   private void writeFile(FileSystem fileSys, Path name, int repl)
   throws IOException {
-    FSOutputStream stm = fileSys.createRaw(name, true, (short)repl,
-        (long)blockSize);
+    FSDataOutputStream stm = fileSys.create(name, true,
+            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+            (short)repl, (long)blockSize);
     byte[] buffer = new byte[fileSize];
     Random rand = new Random(seed);
     rand.nextBytes(buffer);

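The same writeFile conversion recurs in TestDecommission, TestReplication and TestSmallBlock below. Written out in full, with the truncated tail filled in on the assumption that the helper simply writes one buffer of pseudo-random data and closes the stream (blockSize, fileSize and seed are the test's constants):

    private void writeFile(FileSystem fileSys, Path name, int repl)
        throws IOException {
      FSDataOutputStream stm = fileSys.create(name, true,
          fileSys.getConf().getInt("io.file.buffer.size", 4096),
          (short)repl, (long)blockSize);
      byte[] buffer = new byte[fileSize];
      new Random(seed).nextBytes(buffer);  // deterministic contents for later checks
      stm.write(buffer);
      stm.close();
    }
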
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java Tue Feb 27 15:45:46 2007
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
 
 
 /**
@@ -46,7 +47,10 @@
   public void testDFSShell() throws IOException {
     Configuration conf = new Configuration();
     MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, 2, false);
-    FileSystem fileSys = cluster.getFileSystem();
+    FileSystem fs = cluster.getFileSystem();
+    assertTrue("Not a HDFS: "+fs.getUri(),
+            fs instanceof DistributedFileSystem);
+    DistributedFileSystem fileSys = (DistributedFileSystem)fs;
     FsShell shell = new FsShell();
     shell.setConf(conf);
 
@@ -60,6 +64,7 @@
     	// Second, create a file in that directory.
     	Path myFile = new Path("/test/mkdirs/myFile");
     	writeFile(fileSys, myFile);
+        assertTrue(fileSys.exists(myFile));
 
         // Verify that we can read the file
         {
@@ -70,20 +75,19 @@
           try {
             val = shell.run(args);
             } catch (Exception e) {
-            System.err.println("Exception raised from DFSShell.run " +
-                               e.getLocalizedMessage()); 
+            System.err.println("Exception raised from DFSShell.run: " +
+                               StringUtils.stringifyException(e)); 
           }
           assertTrue(val == 0);
         }
 
         // Verify that we can get with and without crc
         {
-          File testFile = new File(TEST_ROOT_DIR, "mkdirs/myFile");
+          File testFile = new File(TEST_ROOT_DIR, "myFile");
           File checksumFile = new File(fileSys.getChecksumFile(
               new Path(testFile.getAbsolutePath())).toString());
           testFile.delete();
           checksumFile.delete();
-          new File(TEST_ROOT_DIR, "mkdirs").delete();
           
           String[] args = new String[3];
           args[0] = "-get";
@@ -102,12 +106,11 @@
           testFile.delete();
         }
         {
-          File testFile = new File(TEST_ROOT_DIR, "mkdirs/myFile");
+          File testFile = new File(TEST_ROOT_DIR, "myFile");
           File checksumFile = new File(fileSys.getChecksumFile(
               new Path(testFile.getAbsolutePath())).toString());
           testFile.delete();
           checksumFile.delete();
-          new File(TEST_ROOT_DIR, "mkdirs").delete();
           
           String[] args = new String[4];
           args[0] = "-get";

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java Tue Feb 27 15:45:46 2007
@@ -22,8 +22,6 @@
 import java.util.Random;
 import java.net.*;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FSOutputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -59,8 +57,9 @@
   private void writeFile(FileSystem fileSys, Path name, int repl)
   throws IOException {
     // create and write a file that contains three blocks of data
-    FSOutputStream stm = fileSys.createRaw(name, true, (short)repl,
-        (long)blockSize);
+    FSDataOutputStream stm = fileSys.create(name, true, 
+            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+            (short)repl, (long)blockSize);
     byte[] buffer = new byte[fileSize];
     Random rand = new Random(seed);
     rand.nextBytes(buffer);
@@ -88,8 +87,11 @@
     // sleep an additional 10 seconds for the blockreports from the datanodes
     // to arrive. 
     //
-    FSInputStream is = fileSys.openRaw(name);
-    DFSClient.DFSInputStream dis = (DFSClient.DFSInputStream) is;
+    // need a raw stream
+    assertTrue("Not HDFS:"+fileSys.getUri(), fileSys instanceof DistributedFileSystem);
+        
+    DFSClient.DFSDataInputStream dis = (DFSClient.DFSDataInputStream) 
+        ((DistributedFileSystem)fileSys).getRawFileSystem().open(name);
     DatanodeInfo[][] dinfo = dis.getDataNodes();
 
     for (int blk = 0; blk < dinfo.length; blk++) { // for each block

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java Tue Feb 27 15:45:46 2007
@@ -25,7 +25,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
@@ -51,8 +51,9 @@
   private void writeFile(FileSystem fileSys, Path name, int repl)
   throws IOException {
     // create and write a file that contains three blocks of data
-    FSOutputStream stm = fileSys.createRaw(name, true, (short)repl,
-        (long)blockSize);
+    FSDataOutputStream stm = fileSys.create(name, true,
+            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+            (short)repl, (long)blockSize);
     byte[] buffer = new byte[fileSize];
     Random rand = new Random(seed);
     rand.nextBytes(buffer);

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSeekBug.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSeekBug.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSeekBug.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSeekBug.java Tue Feb 27 15:45:46 2007
@@ -17,14 +17,13 @@
  */
 package org.apache.hadoop.dfs;
 
-import javax.swing.filechooser.FileSystemView;
 import junit.framework.TestCase;
 import java.io.*;
 import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -57,8 +56,7 @@
   }
   
   private void seekReadFile(FileSystem fileSys, Path name) throws IOException {
-    FSInputStream stmRaw = fileSys.openRaw(name);
-    FSDataInputStream stm = new FSDataInputStream(stmRaw, 4096);
+    FSDataInputStream stm = fileSys.open(name, 4096);
     byte[] expected = new byte[ONEMB];
     Random rand = new Random(seed);
     rand.nextBytes(expected);
@@ -83,7 +81,11 @@
    * Read some data, skip a few bytes and read more. HADOOP-922.
    */
   private void smallReadSeek(FileSystem fileSys, Path name) throws IOException {
-    FSInputStream stmRaw = fileSys.openRaw(name);
+    if (fileSys instanceof ChecksumFileSystem) {
+        fileSys = ((ChecksumFileSystem)fileSys).getRawFileSystem();
+    }
+    // Make the buffer size small to trigger code for HADOOP-922
+    FSDataInputStream stmRaw = fileSys.open(name, 1);
     byte[] expected = new byte[ONEMB];
     Random rand = new Random(seed);
     rand.nextBytes(expected);

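With openRaw gone, TestSeekBug shows the remaining way to read bytes without checksum verification: unwrap the ChecksumFileSystem. The idiom in isolation, where fileSys and name are the test's file system and path, and the 1-byte buffer deliberately forces the HADOOP-922 small-read code path:

    FileSystem raw = fileSys;
    if (raw instanceof ChecksumFileSystem) {
      // bypass the checksum layer and read the underlying bytes directly
      raw = ((ChecksumFileSystem) raw).getRawFileSystem();
    }
    FSDataInputStream stmRaw = raw.open(name, 1);  // tiny buffer exercises HADOOP-922
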
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java Tue Feb 27 15:45:46 2007
@@ -21,8 +21,8 @@
 import java.io.*;
 import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -38,8 +38,9 @@
 
   private void writeFile(FileSystem fileSys, Path name) throws IOException {
     // create and write a file that contains three blocks of data
-    FSOutputStream stm = fileSys.createRaw(name, true, (short)1,
-        (long)blockSize);
+    FSDataOutputStream stm = fileSys.create(name, true, 
+            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+            (short)1, (long)blockSize);
     byte[] buffer = new byte[fileSize];
     Random rand = new Random(seed);
     rand.nextBytes(buffer);
@@ -59,7 +60,7 @@
   private void checkFile(FileSystem fileSys, Path name) throws IOException {
     String[][] locations = fileSys.getFileCacheHints(name, 0, fileSize);
     assertEquals("Number of blocks", fileSize, locations.length);
-    FSInputStream stm = fileSys.openRaw(name);
+    FSDataInputStream stm = fileSys.open(name);
     byte[] expected = new byte[fileSize];
     Random rand = new Random(seed);
     rand.nextBytes(expected);

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestLocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestLocalFileSystem.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestLocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestLocalFileSystem.java Tue Feb 27 15:45:46 2007
@@ -83,7 +83,7 @@
       fileSys.rename(file2, file1);
       
       // try reading a file
-      InputStream stm = fileSys.openRaw(file1);
+      InputStream stm = fileSys.open(file1);
       byte[] buffer = new byte[3];
       int bytesRead = stm.read(buffer, 0, 3);
       assertEquals("42\n", new String(buffer, 0, bytesRead));


