hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r831475 [1/2] - in /hadoop/common/trunk: ./ src/java/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/fs/ftp/ src/java/org/apache/hadoop/fs/local/ src/test/core/org/apache/hadoop/fs/
Date Fri, 30 Oct 2009 22:24:23 GMT
Author: suresh
Date: Fri Oct 30 22:24:22 2009
New Revision: 831475

URL: http://svn.apache.org/viewvc?rev=831475&view=rev
Log:
Hadoop-6223. Add new file system interface AbstractFileSystem with implementation of some file systems that delegate to old FileSystem. Contributed by Sanjay Radia.

Added:
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/DelegateToFileSystem.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFs.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/FsConstants.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/ftp/FtpFs.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/local/
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/local/LocalConfigKeys.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/local/LocalFs.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/local/RawLocalFs.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/FileContextPermissionBase.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestFcLocalFsPermission.java
Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/src/java/core-default.xml
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileStatus.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/Options.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/ftp/FTPFileSystemConfigKeys.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestFileContextDeleteOnExit.java

Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=831475&r1=831474&r2=831475&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Fri Oct 30 22:24:22 2009
@@ -249,6 +249,10 @@
     HADOOP-6313. Implement Syncable interface in FSDataOutputStream to expose
     flush APIs to application users. (Hairong Kuang via suresh)
 
+    HADOOP-6223. Add new file system interface AbstractFileSystem with
+    implementation of some file systems that delegate to old FileSystem.
+    (Sanjay Radia via suresh)
+
   IMPROVEMENTS
 
     HADOOP-4565. Added CombineFileInputFormat to use data locality information

Modified: hadoop/common/trunk/src/java/core-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/core-default.xml?rev=831475&r1=831474&r2=831475&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/core-default.xml (original)
+++ hadoop/common/trunk/src/java/core-default.xml Fri Oct 30 22:24:22 2009
@@ -163,6 +163,19 @@
 </property>
 
 <property>
+  <name>fs.AbstractFileSystem.file.impl</name>
+  <value>org.apache.hadoop.fs.local.LocalFs</value>
+  <description>The AbstractFileSystem for file: uris.</description>
+</property>
+
+
+<property>
+  <name>fs.AbstractFileSystem.hdfs.impl</name>
+  <value>org.apache.hadoop.fs.Hdfs</value>
+  <description>The AbstractFileSystem for hdfs: uris.</description>
+</property>
+
+<property>
   <name>fs.s3.impl</name>
   <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
   <description>The FileSystem for s3: uris.</description>

Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java?rev=831475&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java Fri Oct 30 22:24:22 2009
@@ -0,0 +1,665 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Options.CreateOpts;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * This class provides an interface for implementors of a Hadoop filesystem
+ * (analogous to the VFS of Unix). Applications do not access this class;
+ * instead they access files across all filesystems using {@link FileContext}.
+ * 
+ * Pathnames passed to AbstractFileSystem can be a fully qualified URI that
+ * matches "this" filesystem (i.e. same scheme and authority)
+ * or a slash-relative name that is assumed to be relative
+ * to the root of "this" filesystem.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving /*Evolving for a release,to be changed to Stable */
+public abstract class AbstractFileSystem {
+  static final Log LOG = LogFactory.getLog(AbstractFileSystem.class);
+
+  /** Recording statistics per a filesystem class. */
+  private static final Map<Class<? extends AbstractFileSystem>, Statistics> 
+  STATISTICS_TABLE =
+      new IdentityHashMap<Class<? extends AbstractFileSystem>, Statistics>();
+  
+  /** Cache of constructors for each filesystem class. */
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = 
+    new ConcurrentHashMap<Class<?>, Constructor<?>>();
+  
+  private static final Class<?>[] URI_CONFIG_ARGS = 
+    new Class[]{URI.class, Configuration.class};
+  
+  /** The statistics for this file system. */
+  protected Statistics statistics;
+  
+  private final URI myUri;
+  
+  protected Statistics getStatistics() {
+    return statistics;
+  }
+  
+  /**
+   * Rejects paths with a "." or ".." element or an element containing ":".
+   */
+  private static boolean isValidName(String src) {
+    // Check each Path.SEPARATOR-delimited element for "..", "." or a ":"
+    StringTokenizer tokens = new StringTokenizer(src, Path.SEPARATOR);
+    while(tokens.hasMoreTokens()) {
+      String element = tokens.nextToken();
+      if (element.equals("..") || 
+          element.equals(".")  ||
+          (element.indexOf(":") >= 0)) {
+        return false;
+      }
+    }
+    return true;
+  }
+  
+  /** 
+   * Create an object for the given class and initialize it from conf.
+   * @param theClass class of which an object is created
+   * @param conf Configuration
+   * @return a new object
+   */
+  @SuppressWarnings("unchecked")
+  static <T> T newInstance(Class<T> theClass,
+    URI uri, Configuration conf) {
+    T result;
+    try {
+      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+      if (meth == null) {
+        meth = theClass.getDeclaredConstructor(URI_CONFIG_ARGS);
+        meth.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(theClass, meth);
+      }
+      result = meth.newInstance(uri, conf);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return result;
+  }
+  
+  /**
+   * Create a file system instance for the specified uri using the conf.
+   * The conf is used to find the class name that implements the filesystem.
+   * The conf is also passed to the filesystem for its configuration.
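+   * @param uri URI whose scheme selects the implementation class
+   * @param conf configuration used to look up and configure the class
+   * @return a new instance of the configured AbstractFileSystem class
+   * @throws IOException if no implementation is configured for the scheme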
+   */
+  private static AbstractFileSystem createFileSystem(URI uri,
+    Configuration conf) throws IOException {
+    Class<?> clazz = conf.getClass("fs.AbstractFileSystem." + 
+                                uri.getScheme() + ".impl", null);
+    if (clazz == null) {
+      throw new IOException("No AbstractFileSystem for scheme: "
+          + uri.getScheme());
+    }
+    return (AbstractFileSystem) newInstance(clazz, uri, conf);
+  }
+  
+  
+  /**
+   * Get the statistics for a particular file system.
+   * @param cls the class to lookup
+   * @return a statistics object
+   */
+  protected static synchronized Statistics getStatistics(String scheme,
+      Class<? extends AbstractFileSystem> cls) {
+    Statistics result = STATISTICS_TABLE.get(cls);
+    if (result == null) {
+      result = new Statistics(scheme);
+      STATISTICS_TABLE.put(cls, result);
+    }
+    return result;
+  }
+  
+  protected static synchronized void clearStatistics() {
+    for(Statistics stat: STATISTICS_TABLE.values()) {
+      stat.reset();
+    }
+  }
+
+  protected static synchronized void printStatistics() throws IOException {
+    for (Map.Entry<Class<? extends AbstractFileSystem>, Statistics> pair: 
+            STATISTICS_TABLE.entrySet()) {
+      System.out.println("  FileSystem " + pair.getKey().getName() + 
+                         ": " + pair.getValue());
+    }
+  }
+
+  
+  /**
+   * The main factory method for creating a filesystem.
+   * Get a filesystem for the URI's scheme and authority.
+   * The scheme of the URI determines a configuration property name,
+   * <tt>fs.AbstractFileSystem.<i>scheme</i>.impl</tt> whose value names
+   * the AbstractFileSystem class. 
+   * The entire URI and conf is passed to the AbstractFileSystem factory
+   * method.
+   * @param uri for the file system to be created.
+   * @param conf which is passed to the filesystem impl.
+   */
+  static AbstractFileSystem get(final URI uri, final Configuration conf)
+    throws IOException {
+    return createFileSystem(uri, conf);
+  }
+
+  /**
+   * Constructor to be called by subclasses.
+   * 
+   * @param uri for this file system.
+   * @param supportedScheme the scheme supported by the implementor
+   * @param authorityNeeded if true then the URI must have authority, if false
+   *          then the URI must have null authority.
+   * @throws URISyntaxException
+   */
+  protected AbstractFileSystem(final URI uri, final String supportedScheme,
+      final boolean authorityNeeded, final int defaultPort) throws URISyntaxException {
+    myUri = getUri(uri, supportedScheme, authorityNeeded, defaultPort);
+    statistics = getStatistics(supportedScheme, getClass()); 
+  }
+  
+  protected void checkScheme(URI uri, String supportedScheme) {
+    String scheme = uri.getScheme();
+    if (scheme == null) {
+      throw new IllegalArgumentException("Uri without scheme: " + uri);
+    }
+    if (!scheme.equals(supportedScheme)) {
+      throw new IllegalArgumentException("Uri scheme " + uri
+          + " does not match the scheme " + supportedScheme);
+    }
+  }
+
+  /**
+   * Get the URI for the file system based on the given URI. The path, query
+   * part of the given URI is stripped out and default filesystem port is used
+   * to form the URI.
+   * 
+   * @param uri FileSystem URI.
+   * @param authorityNeeded if true authority cannot be null in the URI. If
+   *          false authority must be null.
+   * @param defaultPort default port to use if port is not specified in the URI.
+   * @return URI of the file system
+   * @throws URISyntaxException 
+   */
+  private URI getUri(URI uri, String supportedScheme,
+      boolean authorityNeeded, int defaultPort) throws URISyntaxException {
+    checkScheme(uri, supportedScheme);
+    // A filesystem implementation that requires authority must always
+    // specify default port
+    if (defaultPort < 0 && authorityNeeded) {
+      throw new IllegalArgumentException(
+          "FileSystem implementation error -  default port " + defaultPort
+              + " is not valid");
+    }
+    String authority = uri.getAuthority();
+    if (!authorityNeeded) {
+      if (authority != null) {
+        throw new IllegalArgumentException("Scheme with non-null authority: "
+            + uri);
+      }
+      return new URI(supportedScheme + ":///");
+    }
+    if (authority == null) {
+      throw new IllegalArgumentException("Uri without authority: " + uri);
+    }
+    int port = uri.getPort();
+    port = port == -1 ? defaultPort : port;
+    return new URI(supportedScheme + "://" + uri.getHost() + ":" + port);
+  }
+  
+  /**
+   * The default port of this filesystem.
+   * @return default port of this filesystem's Uri scheme
+   * A uri with a port of -1 => default port;
+   */
+  protected abstract int getUriDefaultPort();
+
+  /**
+   * Returns a URI whose scheme and authority identify this FileSystem.
+   * @return the uri of this filesystem.
+   */
+  protected URI getUri() {
+    return myUri;
+  }
+  
+  /**
+   * Check that a Path belongs to this FileSystem.
+   * 
+   * If the path is a fully qualified URI, then its scheme and authority
+   * must match those of this file system. Otherwise the path must be a
+   * slash-relative name.
+   */
+  protected void checkPath(Path path) {
+    URI uri = path.toUri();
+    String thatScheme = uri.getScheme();
+    String thatAuthority = uri.getAuthority();
+    if (thatScheme == null) {
+      if (thatAuthority == null) {
+        if (path.isUriPathAbsolute()) {
+          return;
+        }
+        throw new IllegalArgumentException("relative paths not allowed:" + 
+            path);
+      } else {
+        throw new IllegalArgumentException(
+            "Path without scheme with non-null autorhrity:" + path);
+      }
+    }
+    String thisScheme = this.getUri().getScheme();
+    String thisAuthority = this.getUri().getAuthority();
+    
+    // Schemes and authorities must match.
+    // Allow for null Authority for file:///
+    if (!thisScheme.equalsIgnoreCase(thatScheme) ||
+       (thisAuthority != null && 
+            !thisAuthority.equalsIgnoreCase(thatAuthority)) ||
+       (thisAuthority == null && thatAuthority != null)) {
+      throw new IllegalArgumentException("Wrong FS: " + path + 
+                                    ", expected: "+this.getUri());
+    }
+    
+    int thisPort = this.getUri().getPort();
+    int thatPort = path.toUri().getPort();
+    if (thatPort == -1) { // -1 => defaultPort of Uri scheme
+      thatPort = this.getUriDefaultPort();
+    }
+    if (thisPort != thatPort) {
+      throw new IllegalArgumentException("Wrong FS: "+path+
+                                       ", expected: "+this.getUri());
+    }
+  }
+  
+  /**
+   * Get the path-part of a pathname. Checks that URI matches this filesystem
+   * and that the path-part is a valid name.
+   * @param p path to check against this filesystem
+   * @return path-part of the Path p
+   */
+  protected String getUriPath(final Path p) {
+    checkPath(p);
+    String s = p.toUri().getPath();
+    if (!isValidName(s)) {
+      throw new IllegalArgumentException("Path part " + s + " from URI" +
+          p + " is not a valid filename.");
+    }
+    return s;
+  }
+  
+  /**
+   * Some file systems like LocalFileSystem have an initial workingDir
+   * that we use as the starting workingDir. For other file systems
+   * like HDFS there is no built in notion of an initial workingDir.
+   * 
+   * @return the initial workingDir if the filesystem has such a notion,
+   * otherwise null.
+   */
+  protected Path getInitialWorkingDirectory() {
+    return null;
+  }
+  
+  /** 
+   * Return the current user's home directory in this filesystem.
+   * The default implementation returns "/user/$USER/".
+   */
+  protected Path getHomeDirectory() {
+    return new Path("/user/"+System.getProperty("user.name")).makeQualified(
+                                                                getUri(), null);
+  }
+  
+  /**
+   * Return a set of server default configuration values.
+   * @return server default configuration values
+   * @throws IOException
+   */
+  protected abstract FsServerDefaults getServerDefaults() throws IOException;
+
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#create(Path, EnumSet, Options.CreateOpts...)} except
+   * that the Path f must be fully qualified and the permission is absolute
+   * (i.e. umask has been applied).
+   */
+  protected final FSDataOutputStream create(final Path f,
+    final EnumSet<CreateFlag> createFlag, Options.CreateOpts... opts)
+    throws IOException {
+    checkPath(f);
+    int bufferSize = -1;
+    short replication = -1;
+    long blockSize = -1;
+    int bytesPerChecksum = -1;
+    FsPermission permission = null;
+    Progressable progress = null;
+    Boolean createParent = null;
+ 
+    for (CreateOpts iOpt : opts) {
+      if (CreateOpts.BlockSize.class.isInstance(iOpt)) {
+        if (blockSize != -1) {
+          throw new IllegalArgumentException("multiple varargs of same kind");
+        }
+        blockSize = ((CreateOpts.BlockSize) iOpt).getValue();
+      } else if (CreateOpts.BufferSize.class.isInstance(iOpt)) {
+        if (bufferSize != -1) {
+          throw new IllegalArgumentException("multiple varargs of same kind");
+        }
+        bufferSize = ((CreateOpts.BufferSize) iOpt).getValue();
+      } else if (CreateOpts.ReplicationFactor.class.isInstance(iOpt)) {
+        if (replication != -1) {
+          throw new IllegalArgumentException("multiple varargs of same kind");
+        }
+        replication = ((CreateOpts.ReplicationFactor) iOpt).getValue();
+      } else if (CreateOpts.BytesPerChecksum.class.isInstance(iOpt)) {
+        if (bytesPerChecksum != -1) {
+          throw new IllegalArgumentException("multiple varargs of same kind");
+        }
+        bytesPerChecksum = ((CreateOpts.BytesPerChecksum) iOpt).getValue();
+      } else if (CreateOpts.Perms.class.isInstance(iOpt)) {
+        if (permission != null) {
+          throw new IllegalArgumentException("multiple varargs of same kind");
+        }
+        permission = ((CreateOpts.Perms) iOpt).getValue();
+      } else if (CreateOpts.Progress.class.isInstance(iOpt)) {
+        if (progress != null) {
+          throw new IllegalArgumentException("multiple varargs of same kind");
+        }
+        progress = ((CreateOpts.Progress) iOpt).getValue();
+      } else if (CreateOpts.CreateParent.class.isInstance(iOpt)) {
+        if (createParent != null) {
+          throw new IllegalArgumentException("multiple varargs of same kind");
+        }
+        createParent = ((CreateOpts.CreateParent) iOpt).getValue();
+      } else {
+        throw new IllegalArgumentException("Unkown CreateOpts of type " +
+            iOpt.getClass().getName());
+      }
+    }
+    if (permission == null) {
+      throw new IllegalArgumentException("no permission supplied");
+    }
+
+
+    FsServerDefaults ssDef = getServerDefaults();
+    if (ssDef.getBlockSize() % ssDef.getBytesPerChecksum() != 0) {
+      throw new IOException("Internal error: default blockSize is" + 
+          " not a multiple of default bytesPerChecksum ");
+    }
+    
+    if (blockSize == -1) {
+      blockSize = ssDef.getBlockSize();
+    }
+    if (bytesPerChecksum == -1) {
+      bytesPerChecksum = ssDef.getBytesPerChecksum();
+    }
+    if (bufferSize == -1) {
+      bufferSize = ssDef.getFileBufferSize();
+    }
+    if (replication == -1) {
+      replication = ssDef.getReplication();
+    }
+    if (createParent == null) {
+      createParent = false;
+    }
+
+    if (blockSize % bytesPerChecksum != 0) {
+      throw new IllegalArgumentException(
+             "blockSize should be a multiple of checksumsize");
+    }
+
+    return this.createInternal(f, createFlag, permission, bufferSize,
+      replication, blockSize, progress, bytesPerChecksum, createParent);
+  }
+
+  /**
+   * The specification of this method matches that of
+   * {@link #create(Path, EnumSet, Options.CreateOpts...)} except that the opts
+   * have been declared explicitly.
+   */
+  protected abstract FSDataOutputStream createInternal(Path f,
+      EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
+      short replication, long blockSize, Progressable progress,
+      int bytesPerChecksum, boolean createParent) throws IOException;
+
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#mkdir(Path, FsPermission, boolean)} except that the Path
+   * f must be fully qualified and the permission is absolute (ie umask has been
+   * applied).
+   */
+  protected abstract void mkdir(final Path dir,
+      final FsPermission permission, final boolean createParent)
+    throws IOException;
+
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#delete(Path, boolean)} except that Path f must be for
+   * this filesystem.
+   */
+  protected abstract boolean delete(final Path f, final boolean recursive)
+    throws IOException;
+
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#open(Path)} except that Path f must be for this
+   * filesystem.
+   */
+  protected FSDataInputStream open(final Path f) throws IOException {
+    return open(f, getServerDefaults().getFileBufferSize());
+  }
+
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#open(Path, int)} except that Path f must be for this
+   * filesystem.
+   */
+  protected abstract FSDataInputStream open(final Path f, int bufferSize)
+    throws IOException;
+
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#setReplication(Path, short)} except that Path f must be
+   * for this filesystem.
+   */
+  protected abstract boolean setReplication(final Path f,
+    final short replication) throws IOException;
+
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#rename(Path, Path, Options.Rename...)} except that Path
+   * f must be for this filesystem.
+   */
+  protected final void rename(final Path src, final Path dst,
+    final Options.Rename... options) throws IOException {
+    boolean overwrite = false;
+    if (null != options) {
+      for (Rename option : options) {
+        if (option == Rename.OVERWRITE) {
+          overwrite = true;
+        }
+      }
+    }
+    renameInternal(src, dst, overwrite);
+  }
+  
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#rename(Path, Path, Options.Rename...)} except that Path
+   * f must be for this filesystem and NO OVERWRITE is performed.
+   * 
+   * Filesystems that do not have a built in overwrite need implement only this
+   * method and can take advantage of the default impl of the other
+   * {@link #renameInternal(Path, Path, boolean)}
+   */
+  protected abstract void renameInternal(final Path src, final Path dst)
+    throws IOException;
+  
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#rename(Path, Path, Options.Rename...)} except that Path
+   * f must be for this filesystem.
+   */
+  protected void renameInternal(final Path src, final Path dst,
+    boolean overwrite) throws IOException {
+    // Default implementation deals with overwrite in a non-atomic way
+    final FileStatus srcStatus = getFileStatus(src);
+    if (srcStatus == null) {
+      throw new FileNotFoundException("rename source " + src + " not found.");
+    }
+
+    FileStatus dstStatus;
+    try {
+      dstStatus = getFileStatus(dst);
+    } catch (IOException e) {
+      dstStatus = null;
+    }
+    if (dstStatus != null) {
+      if (srcStatus.isDir() != dstStatus.isDir()) {
+        throw new IOException("Source " + src + " Destination " + dst
+            + " both should be either file or directory");
+      }
+      if (!overwrite) {
+        throw new FileAlreadyExistsException("rename destination " + dst
+            + " already exists.");
+      }
+      // Delete the destination that is a file or an empty directory
+      if (dstStatus.isDir()) {
+        FileStatus[] list = listStatus(dst);
+        if (list != null && list.length != 0) {
+          throw new IOException(
+              "rename cannot overwrite non empty destination directory " + dst);
+        }
+      }
+      delete(dst, false);
+    } else {
+      final Path parent = dst.getParent();
+      final FileStatus parentStatus = getFileStatus(parent);
+      if (parentStatus == null) {
+        throw new FileNotFoundException("rename destination parent " + parent
+            + " not found.");
+      }
+      if (!parentStatus.isDir()) {
+        throw new ParentNotDirectoryException("rename destination parent "
+            + parent + " is a file.");
+      }
+    }
+    renameInternal(src, dst);
+  }
+  
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#setPermission(Path, FsPermission)} except that Path f
+   * must be for this filesystem.
+   */
+  protected abstract void setPermission(final Path f,
+      final FsPermission permission) throws IOException;
+
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#setOwner(Path, String, String)} except that Path f must
+   * be for this filesystem.
+   */
+  protected abstract void setOwner(final Path f, final String username,
+      final String groupname) throws IOException;
+
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#setTimes(Path, long, long)} except that Path f must be
+   * for this filesystem.
+   */
+  protected abstract void setTimes(final Path f, final long mtime,
+    final long atime) throws IOException;
+
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#getFileChecksum(Path)} except that Path f must be for
+   * this filesystem.
+   */
+  protected abstract FileChecksum getFileChecksum(final Path f)
+    throws IOException;
+  
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#getFileStatus(Path)} except that Path f
+   * must be for this filesystem.
+   */
+  protected abstract FileStatus getFileStatus(final Path f) throws IOException;
+
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#getFileBlockLocations(Path, long, long)} except that
+   * Path f must be for this filesystem.
+   */
+  protected abstract BlockLocation[] getFileBlockLocations(final Path f,
+    final long start, final long len) throws IOException;
+
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#getFsStatus(Path)} except that Path f must be for this
+   * filesystem.
+   */
+  protected FsStatus getFsStatus(final Path f) throws IOException {
+    // default impl gets FsStatus of root
+    return getFsStatus();
+  }
+  
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#getFsStatus(Path)} except that no Path is supplied;
+   * the default {@link #getFsStatus(Path)} delegates to this method.
+   */
+  protected abstract FsStatus getFsStatus() throws IOException;
+
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#listStatus(Path)} except that Path f must be for this
+   * filesystem.
+   */
+  protected abstract FileStatus[] listStatus(final Path f) throws IOException;
+
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#setVerifyChecksum(boolean, Path)} except that no Path
+   * is supplied.
+   */
+  protected abstract void setVerifyChecksum(final boolean verifyChecksum)
+    throws IOException;
+}

Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java?rev=831475&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java Fri Oct 30 22:24:22 2009
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.*;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.EnumSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Abstract checksummed Fs.
+ * It provides a basic implementation of a checksummed Fs,
+ * which creates a checksum file for each raw file.
+ * It generates and verifies checksums at the client side.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving /*Evolving for a release,to be changed to Stable */
+public abstract class ChecksumFs extends FilterFs {
+  private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
+  private int defaultBytesPerChecksum = 512;
+  private boolean verifyChecksum = true;
+
+  public static double getApproxChkSumLength(long size) {
+    return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
+  }
+  
+  public ChecksumFs(AbstractFileSystem theFs)
+    throws IOException, URISyntaxException {
+    super(theFs);
+    defaultBytesPerChecksum = 
+      getMyFs().getServerDefaults().getBytesPerChecksum();
+  }
+  
+  /**
+   * Set whether to verify checksum.
+   */
+  public void setVerifyChecksum(boolean inVerifyChecksum) {
+    this.verifyChecksum = inVerifyChecksum;
+  }
+
+  /** get the raw file system. */
+  public AbstractFileSystem getRawFs() {
+    return getMyFs();
+  }
+
+  /** Return the name of the checksum file associated with a file.*/
+  public Path getChecksumFile(Path file) {
+    return new Path(file.getParent(), "." + file.getName() + ".crc");
+  }
+
+  /** Return true iff file is a checksum file name.*/
+  public static boolean isChecksumFile(Path file) {
+    String name = file.getName();
+    return name.startsWith(".") && name.endsWith(".crc");
+  }
+
+  /** Return the length of the checksum file given the size of the 
+   * actual file.
+   **/
+  public long getChecksumFileLength(Path file, long fileSize) {
+    return getChecksumLength(fileSize, getBytesPerSum());
+  }
+
+  /** Return the bytes Per Checksum. */
+  public int getBytesPerSum() {
+    return defaultBytesPerChecksum;
+  }
+
+  private int getSumBufferSize(int bytesPerSum, int bufferSize)
+    throws IOException {
+    int defaultBufferSize =  getMyFs().getServerDefaults().getFileBufferSize();
+    int proportionalBufferSize = bufferSize / bytesPerSum;
+    return Math.max(bytesPerSum,
+                    Math.max(proportionalBufferSize, defaultBufferSize));
+  }
+
+  /*******************************************************
+   * For open()'s FSInputStream
+   * It verifies that data matches checksums.
+   *******************************************************/
+  private static class ChecksumFSInputChecker extends FSInputChecker {
+    public static final Log LOG 
+      = LogFactory.getLog(FSInputChecker.class);
+    private static final int HEADER_LENGTH = 8;
+    
+    private ChecksumFs fs;
+    private FSDataInputStream datas;
+    private FSDataInputStream sums;
+    private int bytesPerSum = 1;
+    private long fileLen = -1L;
+    
+    public ChecksumFSInputChecker(ChecksumFs fs, Path file)
+      throws IOException {
+      this(fs, file, fs.getServerDefaults().getFileBufferSize());
+    }
+    
+    public ChecksumFSInputChecker(ChecksumFs fs, Path file, int bufferSize)
+      throws IOException {
+      super(file, fs.getFileStatus(file).getReplication());
+      this.datas = fs.getRawFs().open(file, bufferSize);
+      this.fs = fs;
+      Path sumFile = fs.getChecksumFile(file);
+      try {
+        int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(),
+                                                bufferSize);
+        sums = fs.getRawFs().open(sumFile, sumBufferSize);
+
+        byte[] version = new byte[CHECKSUM_VERSION.length];
+        sums.readFully(version);
+        if (!Arrays.equals(version, CHECKSUM_VERSION)) {
+          throw new IOException("Not a checksum file: "+sumFile);
+        }
+        this.bytesPerSum = sums.readInt();
+        set(fs.verifyChecksum, new PureJavaCrc32(), bytesPerSum, 4);
+      } catch (FileNotFoundException e) {         // quietly ignore
+        set(fs.verifyChecksum, null, 1, 0);
+      } catch (IOException e) {                   // loudly ignore
+        LOG.warn("Problem opening checksum file: "+ file + 
+                 ".  Ignoring exception: " + 
+                 StringUtils.stringifyException(e));
+        set(fs.verifyChecksum, null, 1, 0);
+      }
+    }
+    
+    private long getChecksumFilePos(long dataPos) {
+      return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
+    }
+    
+    protected long getChunkPosition(long dataPos) {
+      return dataPos/bytesPerSum*bytesPerSum;
+    }
+    
+    public int available() throws IOException {
+      return datas.available() + super.available();
+    }
+    
+    /**
+     * Positional read: reads up to <code>len</code> bytes into
+     * <code>b</code> starting at file offset <code>position</code>.
+     * The read is served by a separate, short-lived checker opened on the
+     * same file, so the state of this stream is not used for it.
+     *
+     * @param position offset in the file to start reading from; must not be
+     *                 negative
+     * @param b        destination buffer
+     * @param off      offset in <code>b</code> at which data is stored
+     * @param len      maximum number of bytes to read
+     * @return the value returned by the temporary checker's read, or 0 when
+     *         <code>len</code> is 0
+     * @throws IndexOutOfBoundsException if off/len do not describe a valid
+     *         range of <code>b</code>
+     * @throws IllegalArgumentException if <code>position</code> is negative
+     * @throws IOException if opening, seeking, reading or closing fails
+     */
+    public int read(long position, byte[] b, int off, int len)
+      throws IOException {
+      // parameter check: any negative term (including an overflowed off+len)
+      // sets the sign bit of the OR-ed value
+      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+        throw new IndexOutOfBoundsException();
+      } else if (len == 0) {
+        return 0;
+      }
+      if (position<0) {
+        throw new IllegalArgumentException(
+            "Parameter position can not to be negative");
+      }
+
+      final ChecksumFSInputChecker checker =
+        new ChecksumFSInputChecker(fs, file);
+      try {
+        checker.seek(position);
+        return checker.read(b, off, len);
+      } finally {
+        // Always release the temporary checker's data and checksum streams,
+        // including when seek() or read() throws; previously they leaked on
+        // the exception path.
+        checker.close();
+      }
+    }
+    
+    /**
+     * Closes the data stream and, if one was opened, the checksum stream,
+     * then resets the checker state with no checksum object.
+     * The checksum stream is closed and the state reset even when closing
+     * the data stream fails.
+     *
+     * @throws IOException if closing either underlying stream fails
+     */
+    public void close() throws IOException {
+      try {
+        datas.close();
+      } finally {
+        try {
+          // sums stays null when the checksum file could not be opened
+          if (sums != null) {
+            sums.close();
+          }
+        } finally {
+          set(fs.verifyChecksum, null, 1, 0);
+        }
+      }
+    }
+    
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      final long sumsPos = getChecksumFilePos(targetPos);
+      fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
+      final boolean newDataSource = datas.seekToNewSource(targetPos);
+      return sums.seekToNewSource(sumsPos) || newDataSource;
+    }
+
+    @Override
+    protected int readChunk(long pos, byte[] buf, int offset, int len,
+        byte[] checksum) throws IOException {
+      boolean eof = false;
+      if (needChecksum()) {
+        try {
+          final long checksumPos = getChecksumFilePos(pos); 
+          if (checksumPos != sums.getPos()) {
+            sums.seek(checksumPos);
+          }
+          sums.readFully(checksum);
+        } catch (EOFException e) {
+          eof = true;
+        }
+        len = bytesPerSum;
+      }
+      if (pos != datas.getPos()) {
+        datas.seek(pos);
+      }
+      final int nread = readFully(datas, buf, offset, len);
+      if (eof && nread > 0) {
+        throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
+      }
+      return nread;
+    }
+    
+    /* Return the file length */
+    private long getFileLength() throws IOException {
+      if (fileLen==-1L) {
+        fileLen = fs.getFileStatus(file).getLen();
+      }
+      return fileLen;
+    }
+    
+    /**
+     * Skips over and discards <code>n</code> bytes of data from the
+     * input stream.
+     *
+     * The <code>skip</code> method skips over some smaller number of bytes
+     * when reaching end of file before <code>n</code> bytes have been skipped.
+     * The actual number of bytes skipped is returned.  If <code>n</code> is
+     * negative, no bytes are skipped.
+     *
+     * @param      n   the number of bytes to be skipped.
+     * @return     the actual number of bytes skipped.
+     * @exception  IOException  if an I/O error occurs.
+     *             ChecksumException if the chunk to skip to is corrupted
+     */
+    public synchronized long skip(long n) throws IOException {
+      final long curPos = getPos();
+      final long fileLength = getFileLength();
+      if (n+curPos > fileLength) {
+        n = fileLength - curPos;
+      }
+      return super.skip(n);
+    }
+    
+    /**
+     * Seek to the given position in the stream.
+     * The next read() will be from that position.
+     * 
+     * <p>This method does not allow seek past the end of the file.
+     * This produces IOException.
+     *
+     * @param      pos   the postion to seek to.
+     * @exception  IOException  if an I/O error occurs or seeks after EOF
+     *             ChecksumException if the chunk to seek to is corrupted
+     */
+
+    public synchronized void seek(long pos) throws IOException {
+      if (pos>getFileLength()) {
+        throw new IOException("Cannot seek after EOF");
+      }
+      super.seek(pos);
+    }
+
+  }
+
+  /**
+   * Opens an FSDataInputStream at the indicated Path.
+   * @param f the file name to open
+   * @param bufferSize the size of the buffer to be used.
+   */
+  @Override
+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    return new FSDataInputStream(
+        new ChecksumFSInputChecker(this, f, bufferSize));
+  }
+
+  /**
+   * Calculates the length of the checksum file in bytes.
+   * @param size the length of the data file in bytes
+   * @param bytesPerSum the number of bytes in a checksum block
+   * @return the number of bytes in the checksum file
+   */
+  public static long getChecksumLength(long size, int bytesPerSum) {
+    // bytes written at the beginning of the checksum file: the version
+    // marker followed by a 4-byte int
+    final long headerBytes = CHECKSUM_VERSION.length + 4;
+    // number of checksum blocks, counting a trailing partial block
+    final long blocks = (size + bytesPerSum - 1) / bytesPerSum;
+    // each block contributes a 4-byte checksum
+    return blocks * 4 + headerBytes;
+  }
+
+  /** This class provides an output stream for a checksummed file.
+   * It generates checksums for data. */
+  private static class ChecksumFSOutputSummer extends FSOutputSummer {
+    private FSDataOutputStream datas;    
+    private FSDataOutputStream sums;
+    private static final float CHKSUM_AS_FRACTION = 0.01f;
+    
+    
+    public ChecksumFSOutputSummer(final ChecksumFs fs, final Path file, 
+      final EnumSet<CreateFlag> createFlag,
+      final FsPermission absolutePermission, final int bufferSize,
+      final short replication, final long blockSize, 
+      final Progressable progress, final int bytesPerChecksum,
+      final boolean createParent) throws IOException {
+      super(new PureJavaCrc32(), fs.getBytesPerSum(), 4);
+
+      this.datas = fs.getRawFs().createInternal(file, createFlag,
+          absolutePermission, bufferSize, replication, blockSize, progress,
+           bytesPerChecksum,  createParent);
+      
+      // Now create the chekcsumfile; adjust the buffsize
+      int bytesPerSum = fs.getBytesPerSum();
+      int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
+      this.sums = fs.getRawFs().createInternal(fs.getChecksumFile(file),
+          EnumSet.of(CreateFlag.OVERWRITE), absolutePermission, sumBufferSize,
+          replication,  blockSize,  progress, bytesPerChecksum,  createParent);
+      sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
+      sums.writeInt(bytesPerSum);
+    }
+    
+    /**
+     * Flushes any buffered data, then closes the checksum stream and the
+     * data stream. Both streams are closed even if the flush or the first
+     * close fails, so that neither is leaked.
+     *
+     * @throws IOException if flushing or closing fails
+     */
+    public void close() throws IOException {
+      try {
+        flushBuffer();
+      } finally {
+        try {
+          sums.close();
+        } finally {
+          datas.close();
+        }
+      }
+    }
+    
+    @Override
+    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
+      throws IOException {
+      datas.write(b, offset, len);
+      sums.write(checksum);
+    }
+  }
+
+  @Override
+  protected FSDataOutputStream createInternal(Path f,
+      EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
+      int bufferSize, short replication, long blockSize, Progressable progress,
+      int bytesPerChecksum, boolean createParent) throws IOException {
+
+    final FSDataOutputStream out = new FSDataOutputStream(
+        new ChecksumFSOutputSummer(this, f, createFlag, absolutePermission,
+            bufferSize, replication, blockSize, progress,
+            bytesPerChecksum,  createParent), null);
+    return out;
+  }
+
+  /** Check if exists.
+   * @param f source file
+   */
+  private boolean exists(Path f) throws IOException {
+    try {
+      return getMyFs().getFileStatus(f) != null;
+    } catch (FileNotFoundException e) {
+      return false;
+    }
+  }
+  
+  /** True iff the named path is a directory.
+   * Note: Avoid using this method. Instead reuse the FileStatus 
+   * returned by getFileStatus() or listStatus() methods.
+   */
+  private boolean isDirectory(Path f) throws IOException {
+    try {
+      return getMyFs().getFileStatus(f).isDir();
+    } catch (FileNotFoundException e) {
+      return false;               // f does not exist
+    }
+  }
+  /**
+   * Set replication for an existing file.
+   * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
+   * @param src file name
+   * @param replication new replication
+   * @throws IOException
+   * @return true if successful;
+   *         false if file does not exist or is a directory
+   */
+  @Override
+  public boolean setReplication(Path src, short replication)
+    throws IOException {
+    boolean value = getMyFs().setReplication(src, replication);
+    if (!value) {
+      return false;
+    }
+    Path checkFile = getChecksumFile(src);
+    if (exists(checkFile)) {
+      getMyFs().setReplication(checkFile, replication);
+    }
+    return true;
+  }
+
+  /**
+   * Renames a file or directory through the underlying file system.
+   * When the source is not a directory and has a checksum file, the
+   * checksum file is renamed as well.
+   */
+  @Override
+  public void renameInternal(Path src, Path dst) throws IOException {
+    // evaluated before the rename, exactly as the original ordering did
+    final boolean srcIsDir = isDirectory(src);
+    getMyFs().rename(src, dst);
+    if (srcIsDir) {
+      return;
+    }
+
+    final Path srcSums = getChecksumFile(src);
+    if (!exists(srcSums)) {
+      return;
+    }
+    // try to rename checksum: into dst when dst is a directory, otherwise
+    // onto the checksum name derived from dst
+    final Path sumsTarget = isDirectory(dst) ? dst : getChecksumFile(dst);
+    getMyFs().rename(srcSums, sumsTarget);
+  }
+
+  /**
+   * Deletes a path in the checksum file system.
+   * A directory is deleted directly in the underlying file system; for a
+   * file, its checksum file (if present) is deleted before the file itself.
+   *
+   * @param f path to delete
+   * @param recursive passed through when <code>f</code> is a directory
+   * @return false if <code>f</code> does not exist; otherwise the result of
+   *         the underlying delete of <code>f</code>
+   */
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    final FileStatus status;
+    try {
+      status = getMyFs().getFileStatus(f);
+    } catch (FileNotFoundException notFound) {
+      return false;
+    }
+
+    if (status.isDir()) {
+      // this works since the crcs are in the same directories as the
+      // files, so we just delete everything in the underlying filesystem
+      return getMyFs().delete(f, recursive);
+    }
+
+    final Path sumsFile = getChecksumFile(f);
+    if (exists(sumsFile)) {
+      getMyFs().delete(sumsFile, true);
+    }
+    return getMyFs().delete(f, true);
+  }
+
+  /**
+   * Report a checksum error to the file system.
+   * @param f the file name containing the error
+   * @param in the stream open on the file
+   * @param inPos the position of the beginning of the bad data in the file
+   * @param sums the stream open on the checksum file
+   * @param sumsPos the position of the beginning of the bad data in the
+   *         checksum file
+   * @return if retry is neccessary
+   */
+  public boolean reportChecksumFailure(Path f, FSDataInputStream in,
+    long inPos, FSDataInputStream sums, long sumsPos) {
+    return false;
+  }
+}

Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/DelegateToFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/DelegateToFileSystem.java?rev=831475&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/DelegateToFileSystem.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/DelegateToFileSystem.java Fri Oct 30 22:24:22 2009
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Implementation of AbstractFileSystem based on the existing implementation of 
+ * {@link FileSystem}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class DelegateToFileSystem extends AbstractFileSystem {
+  protected final FileSystem fsImpl;
+  
+  protected DelegateToFileSystem(URI theUri, FileSystem theFsImpl,
+      Configuration conf, String supportedScheme, boolean authorityRequired)
+      throws IOException, URISyntaxException {
+    super(theUri, supportedScheme, authorityRequired, 
+        FileSystem.getDefaultUri(conf).getPort());
+    fsImpl = theFsImpl;
+    fsImpl.initialize(theUri, conf);
+    fsImpl.statistics = getStatistics();
+  }
+
+  @Override
+  protected Path getInitialWorkingDirectory() {
+    return fsImpl.getInitialWorkingDirectory();
+  }
+  
+  /**
+   * Creates a file by delegating to the wrapped {@link FileSystem}.
+   * When <code>createParent</code> is false, the parent of <code>f</code>
+   * is checked first and must be an existing directory.
+   */
+  @Override
+  @SuppressWarnings("deprecation") // call to primitiveCreate
+  protected FSDataOutputStream createInternal (Path f,
+      EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
+      short replication, long blockSize, Progressable progress,
+      int bytesPerChecksum, boolean createParent) throws IOException {
+    checkPath(f);
+
+    // Default impl assumes that permissions do not matter
+    // calling the regular create is good enough.
+    // FSs that implement permissions should override this.
+
+    if (!createParent) {
+      // The delegate's create makes parent dirs automatically, so the
+      // "parent must exist" requirement has to be enforced here.
+      final FileStatus parentStatus = getFileStatus(f.getParent());
+      if (parentStatus == null) {
+        throw new FileNotFoundException("Missing parent:" + f);
+      }
+      if (!parentStatus.isDir()) {
+        throw new ParentNotDirectoryException("parent is not a dir:" + f);
+      }
+      // parent exists and is a directory - go ahead with the create.
+    }
+    return fsImpl.primitiveCreate(f, absolutePermission, flag,
+        bufferSize, replication, blockSize, progress, bytesPerChecksum);
+  }
+
+  @Override
+  protected boolean delete(Path f, boolean recursive) throws IOException {
+    checkPath(f);
+    return fsImpl.delete(f, recursive);
+  }
+
+  @Override
+  protected BlockLocation[] getFileBlockLocations(Path f, long start, long len)
+      throws IOException {
+    checkPath(f);
+    return fsImpl.getFileBlockLocations(f, start, len);
+  }
+
+  @Override
+  protected FileChecksum getFileChecksum(Path f) throws IOException {
+    checkPath(f);
+    return fsImpl.getFileChecksum(f);
+  }
+
+  @Override
+  protected FileStatus getFileStatus(Path f) throws IOException {
+    checkPath(f);
+    return fsImpl.getFileStatus(f);
+  }
+
+  @Override
+  protected FsStatus getFsStatus() throws IOException {
+    return fsImpl.getStatus();
+  }
+
+  @Override
+  protected FsServerDefaults getServerDefaults() throws IOException {
+    return fsImpl.getServerDefaults();
+  }
+
+  @Override
+  protected int getUriDefaultPort() {
+    return 0;
+  }
+
+  @Override
+  protected FileStatus[] listStatus(Path f) throws IOException {
+    checkPath(f);
+    return fsImpl.listStatus(f);
+  }
+
+  @Override
+  @SuppressWarnings("deprecation") // call to primitiveMkdir
+  protected void mkdir(Path dir, FsPermission permission, boolean createParent)
+      throws IOException {
+    checkPath(dir);
+    fsImpl.primitiveMkdir(dir, permission, createParent);
+    
+  }
+
+  @Override
+  protected FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    checkPath(f);
+    return fsImpl.open(f, bufferSize);
+  }
+
+  @Override
+  @SuppressWarnings("deprecation") // call to rename
+  protected void renameInternal(Path src, Path dst) throws IOException {
+    checkPath(src);
+    checkPath(dst);
+    fsImpl.rename(src, dst, Options.Rename.NONE);
+    
+  }
+
+  @Override
+  protected void setOwner(Path f, String username, String groupname)
+      throws IOException {
+    checkPath(f);
+    fsImpl.setOwner(f, username, groupname);
+    
+  }
+
+  @Override
+  protected void setPermission(Path f, FsPermission permission)
+      throws IOException {
+    checkPath(f);
+    fsImpl.setPermission(f, permission);
+  }
+
+  @Override
+  protected boolean setReplication(Path f, short replication)
+      throws IOException {
+    checkPath(f);
+    return fsImpl.setReplication(f, replication);
+  }
+
+  @Override
+  protected void setTimes(Path f, long mtime, long atime) throws IOException {
+    checkPath(f);
+    fsImpl.setTimes(f, mtime, atime);
+    
+  }
+
+  @Override
+  protected void setVerifyChecksum(boolean verifyChecksum) throws IOException {
+    fsImpl.setVerifyChecksum(verifyChecksum);
+  }
+}

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java?rev=831475&r1=831474&r2=831475&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java Fri Oct 30 22:24:22 2009
@@ -37,13 +37,12 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate.Project;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.Progressable;
 
 /**
  * The FileContext class provides an interface to the application writer for
@@ -51,81 +50,102 @@
  * It provides a set of methods for the usual operation: create, open, 
  * list, etc 
  * 
- * *** Path Names ***
+ * <p>
+ * <b> *** Path Names *** </b>
+ * <p>
  * 
  * The Hadoop filesystem supports a URI name space and URI names.
  * It offers a a forest of filesystems that can be referenced using fully
  * qualified URIs.
- * 
  * Two common Hadoop filesystems implementations are
- *   the local filesystem: file:///path
- *   the hdfs filesystem hdfs://nnAddress:nnPort/path
- *   
+ * <ul>
+ * <li> the local filesystem: file:///path
+ * <li> the hdfs filesystem hdfs://nnAddress:nnPort/path
+ * </ul>  
  * While URI names are very flexible, it requires knowing the name or address
  * of the server. For convenience one often wants to access the default system
- * in your environment without knowing its name/address. This has an
+ * in one's environment without knowing its name/address. This has an
  * additional benefit that it allows one to change one's default fs
- *  (say your admin moves you from cluster1 to cluster2).
- *  
- * Too facilitate this Hadoop supports a notion of a default filesystem.
+ *  (e.g. admin moves application from cluster1 to cluster2).
+ *  <p>
+ * To facilitate this, Hadoop supports a notion of a default filesystem.
  * The user can set his default filesystem, although this is
- * typically set up for you in your environment in your default config.
+ * typically set up for you in your environment via your default config.
  * A default filesystem implies a default scheme and authority; slash-relative
  * names (such as /for/bar) are resolved relative to that default FS.
  * Similarly a user can also have working-directory-relative names (i.e. names
  * not starting with a slash). While the working directory is generally in the
- * same default FS, the wd can be in a different FS; in particular, changing
- * the default filesystem DOES NOT change the working directory,
- * 
+ * same default FS, the wd can be in a different FS.
+ * <p>
  *  Hence Hadoop path names can be one of:
- *       fully qualified URI:  scheme://authority/path
- *       slash relative names: /path    - relative to the default filesystem
- *       wd-relative names:    path        - relative to the working dir
- *       
- *       Relative paths with scheme (scheme:foo/bar) are illegal
- *  
- *  ****The Role of the FileContext and configuration defaults****
+ *  <ul>
+ *  <li> fully qualified URI:  scheme://authority/path
+ *  <li> slash relative names: /path    - relative to the default filesystem
+ *  <li> wd-relative names:    path        - relative to the working dir
+ *  </ul>   
+ *  Relative paths with scheme (scheme:foo/bar) are illegal.
+ *  <p>
+ *  <b>****The Role of the FileContext and configuration defaults****</b>
+ *  <p>
  *  The FileContext provides file namespace context for resolving file names;
  *  it also contains the umask for permissions, In that sense it is like the
  *  per-process file-related state in Unix system.
- *  These, in general, are obtained from the default configuration file
- *  in your environment,  (@see {@link Configuration}
- *  
+ *  These two properties
+ *  <ul> 
+ *  <li> default file system (i.e. your slash)
+ *  <li> umask
+ *  </ul>
+ *   in general, are obtained from the default configuration file
+ *  in your environment,  (@see {@link Configuration}).
  *  No other configuration parameters are obtained from the default config as 
  *  far as the file context layer is concerned. All filesystem instances
  *  (i.e. deployments of filesystems) have default properties; we call these
  *  server side (SS) defaults. Operation like create allow one to select many 
- *  properties: either pass them in as explicit parameters or 
- *  one can choose to used the SS properties.
- *  
+ *  properties: either pass them in as explicit parameters or use
+ *  the SS properties.
+ *  <p>
  *  The filesystem related SS defaults are
- *    - the home directory (default is "/user/<userName>")
- *    - the initial wd (only for local fs)
- *    - replication factor
- *    - block size
- *    - buffer size
- *    - bytesPerChecksum (if used).
+ *  <ul>
+ *  <li> the home directory (default is "/user/userName")
+ *  <li> the initial wd (only for local fs)
+ *  <li> replication factor
+ *  <li> block size
+ *  <li> buffer size
+ *  <li> bytesPerChecksum (if used).
+ *  </ul>
  *
- * 
- * *** Usage Model for the FileContext class ***
- * 
+ * <p>
+ * <b> *** Usage Model for the FileContext class *** </b>
+ * <p>
  * Example 1: use the default config read from the $HADOOP_CONFIG/core.xml.
- *            Unspecified values come from core-defaults.xml in the release jar.
- *            
- *     myFiles = getFileContext(); // uses the default config 
- *     myFiles.create(path, ...);
- *     myFiles.setWorkingDir(path)
- *     myFiles.open (path, ...);   
- *     
- * Example 2: Use a specific config, ignoring $HADOOP_CONFIG
- *    configX = someConfigSomeOnePassedToYou.
- *    myFContext = getFileContext(configX); //configX not changed but passeddown
- *    myFContext.create(path, ...);
- *    myFContext.setWorkingDir(path)
- *                                             
- * Other ways of creating new FileContexts:
- *   getLocalFSFileContext(...)  // local filesystem is the default FS
- *   getLocalFileContext(URI, ...) // where specified URI is default FS.
+ *   Unspecified values come from core-defaults.xml in the release jar.
+ *  <ul>  
+ *  <li> myFContext = FileContext.getFileContext(); // uses the default config
+ *                                                // which has your default FS 
+ *  <li>  myFContext.create(path, ...);
+ *  <li>  myFContext.setWorkingDir(path)
+ *  <li>  myFContext.open (path, ...);  
+ *  </ul>  
+ * Example 2: Get a FileContext with a specific URI as the default FS
+ *  <ul>  
+ *  <li> myFContext = FileContext.getFileContext(URI)
+ *  <li> myFContext.create(path, ...);
+ *   ...
+ * </ul> 
+ * Example 3: FileContext with local file system as the default
+ *  <ul> 
+ *  <li> myFContext = FileContext.getLocalFSFileContext()
+ *  <li> myFContext.create(path, ...);
+ *  <li> ...
+ *  </ul> 
+ * Example 4: Use a specific config, ignoring $HADOOP_CONFIG
+ *  Generally you should not need to use a config unless you are doing
+ *   <ul> 
+ *   <li> configX = someConfigSomeOnePassedToYou.
+ *   <li> myFContext = getFileContext(configX); //configX not changed but passeddown
+ *   <li> myFContext.create(path, ...);
+ *   <li>...
+ *  </ul>                                          
  *    
  */
 
@@ -135,41 +155,44 @@
 public final class FileContext {
   
   public static final Log LOG = LogFactory.getLog(FileContext.class);
+  public static final FsPermission DEFAULT_PERM = FsPermission.getDefault();
   
   /**
-   * List of files that should be deleted on JVM shutdown
+   * List of files that should be deleted on JVM shutdown.
    */
-  final static Map<FileContext, Set<Path>> deleteOnExit = 
+  static final Map<FileContext, Set<Path>> DELETE_ON_EXIT = 
     new IdentityHashMap<FileContext, Set<Path>>();
 
-  /** JVM shutdown hook thread */
-  final static FileContextFinalizer finalizer = 
+  /** JVM shutdown hook thread. */
+  static final FileContextFinalizer FINALIZER = 
     new FileContextFinalizer();
   
+  private static final PathFilter DEFAULT_FILTER = new PathFilter() {
+    public boolean accept(final Path file) {
+      return true;
+    }
+  };
+  
   /**
    * The FileContext is defined by.
    *  1) defaultFS (slash)
    *  2) wd
    *  3) umask
-   *
    */   
-  private final FileSystem defaultFS; // the default FS for this FileContext.
+  private final AbstractFileSystem defaultFS; //default FS for this FileContext.
   private Path workingDir;          // Fully qualified
   private FsPermission umask;
-  private final Configuration conf; // passed to the filesystem below
-                                  // When we move to new AbstractFileSystem
-                                  // then it is not really needed except for
-                                  // undocumented config vars;
+  private final Configuration conf;
 
-  private FileContext(final FileSystem defFs, final FsPermission theUmask, 
-      final Configuration aConf) {
+  private FileContext(final AbstractFileSystem defFs,
+    final FsPermission theUmask, final Configuration aConf) {
     defaultFS = defFs;
     umask = FsPermission.getUMask(aConf);
     conf = aConf;
     /*
      * Init the wd.
      * WorkingDir is implemented at the FileContext layer 
-     * NOT at the FileSystem layer. 
+     * NOT at the AbstractFileSystem layer. 
      * If the DefaultFS, such as localFilesystem has a notion of
      *  builtin WD, we use that as the initial WD.
      *  Otherwise the WD is initialized to the home directory.
@@ -205,21 +228,20 @@
    * Delete all the paths that were marked as delete-on-exit.
    */
   static void processDeleteOnExit() {
-    synchronized (deleteOnExit) {
-      Set<Entry<FileContext, Set<Path>>> set = deleteOnExit.entrySet();
+    synchronized (DELETE_ON_EXIT) {
+      Set<Entry<FileContext, Set<Path>>> set = DELETE_ON_EXIT.entrySet();
       for (Entry<FileContext, Set<Path>> entry : set) {
         FileContext fc = entry.getKey();
         Set<Path> paths = entry.getValue();
         for (Path path : paths) {
           try {
             fc.delete(path, true);
-          }
-          catch (IOException e) {
+          } catch (IOException e) {
             LOG.warn("Ignoring failure to deleteOnExit for path " + path);
           }
         }
       }
-      deleteOnExit.clear();
+      DELETE_ON_EXIT.clear();
     }
   }
   
@@ -241,30 +263,27 @@
    * @return the filesystem of the path
    * @throws IOException
    */
-  private FileSystem getFSofPath(final Path absOrFqPath) throws IOException {
+  private AbstractFileSystem getFSofPath(final Path absOrFqPath)
+    throws IOException {
     checkNotSchemeWithRelative(absOrFqPath);
     if (!absOrFqPath.isAbsolute() && absOrFqPath.toUri().getScheme() == null) {
       throw new IllegalArgumentException(
           "FileContext Bug: path is relative");
     }
-    
-    // TBD cleanup this impl once we create a new FileSystem to replace current
-    // one - see HADOOP-6223.
+
     try { 
       // Is it the default FS for this FileContext?
       defaultFS.checkPath(absOrFqPath);
       return defaultFS;
     } catch (Exception e) { // it is different FileSystem
-      return FileSystem.get(absOrFqPath.toUri(), conf);
+      return AbstractFileSystem.get(absOrFqPath.toUri(), conf);
     }
   }
   
   
   /**
    * Protected Static Factory methods for getting a FileContexts
-   * that take a FileSystem as input. To be used for testing.
-   * Protected since new FileSystem will be protected.
-   * Note new file contexts are created for each call.
+   * that take an AbstractFileSystem as input. To be used for testing.
    */
   
   /**
@@ -276,27 +295,24 @@
    * @return new FileContext with specifed FS as default.
    * @throws IOException if the filesystem with specified cannot be created
    */
-  protected static FileContext getFileContext(final FileSystem defFS,
+  protected static FileContext getFileContext(final AbstractFileSystem defFS,
                     final Configuration aConf) throws IOException {
     return new FileContext(defFS, FsPermission.getUMask(aConf), aConf);
   }
   
   /**
-   * Create a FileContext for specified FileSystem using the default config.
+   * Create a FileContext for specified filesystem using the default config.
    * 
    * @param defaultFS
-   * @return a FileSystem for the specified URI
-   * @throws IOException if the filesysem with specified cannot be created
+   * @return a FileContext with the specified AbstractFileSystem
+   *                 as the default FS.
+   * @throws IOException if the specified filesystem cannot be created
    */
-  protected static FileContext getFileContext(final FileSystem defaultFS)
-    throws IOException {
+  protected static FileContext getFileContext(
+    final AbstractFileSystem defaultFS) throws IOException {
     return getFileContext(defaultFS, new Configuration());
   }
  
- 
-  public static final URI LOCAL_FS_URI = URI.create("file:///");
-  public static final FsPermission DEFAULT_PERM = FsPermission.getDefault();
-  
   /**
    * Static Factory methods for getting a FileContext.
    * Note new file contexts are created for each call.
@@ -310,7 +326,7 @@
    * The keys relevant to the FileContext layer are extracted at time of
    * construction. Changes to the config after the call are ignore
    * by the FileContext layer. 
-   * The conf is passed to lower layers like FileSystem and HDFS which
+   * The conf is passed to lower layers like AbstractFileSystem and HDFS which
    * pick up their own config variables.
    */
    
@@ -320,7 +336,7 @@
    * Unspecified key-values for config are defaulted from core-defaults.xml
    * in the release jar.
    * 
-   * @throws IOException if default FileSystem in the config  cannot be created
+   * @throws IOException if default filesystem in the config  cannot be created
    */
   public static FileContext getFileContext() throws IOException {
     return getFileContext(new Configuration());
@@ -334,7 +350,7 @@
    */
   public static FileContext getLocalFSFileContext() throws IOException {
     if (localFsSingleton == null) {
-      localFsSingleton = getFileContext(LOCAL_FS_URI); 
+      localFsSingleton = getFileContext(FsConstants.LOCAL_FS_URI); 
     }
     return localFsSingleton;
   }
@@ -344,7 +360,7 @@
    * Create a FileContext for specified URI using the default config.
    * 
    * @param defaultFsUri
-   * @return a FileSystem for the specified URI
+   * @return a FileContext with the specified URI as the default FS.
    * @throws IOException if the filesysem with specified cannot be created
    */
   public static FileContext getFileContext(final URI defaultFsUri)
@@ -362,15 +378,18 @@
    */
   public static FileContext getFileContext(final URI defaultFsUri,
                     final Configuration aConf) throws IOException {
-    return getFileContext(FileSystem.get(defaultFsUri,  aConf), aConf);
+    return getFileContext(AbstractFileSystem.get(defaultFsUri,  aConf), aConf);
   }
 
   /**
    * Create a FileContext using the passed config.
+   * Generally it is better to use {@link #getFileContext(URI, Configuration)}
+   * instead of this one.
+   * 
    * 
    * @param aConf
    * @return new FileContext
-   * @throws IOException  if default FileSystem in the config  cannot be created
+   * @throws IOException  if default filesystem in the config  cannot be created
    */
   public static FileContext getFileContext(final Configuration aConf)
     throws IOException {
@@ -385,14 +404,14 @@
    */
   public static FileContext getLocalFSFileContext(final Configuration aConf)
     throws IOException {
-    return getFileContext(LOCAL_FS_URI, aConf);
+    return getFileContext(FsConstants.LOCAL_FS_URI, aConf);
   }
 
   /* This method is needed for tests. */
   @InterfaceAudience.Private
   @InterfaceStability.Unstable /* return type will change to AFS once
                                   HADOOP-6223 is completed */
-  protected FileSystem getDefaultFileSystem() {
+  protected AbstractFileSystem getDefaultFileSystem() {
     return defaultFS;
   }
   
@@ -463,59 +482,49 @@
    * @param f the file name to open
    * @param createFlag gives the semantics  of create: overwrite, append etc.
    * @param opts  - varargs of CreateOpt:
-   *     Progress - to report progress on the operation - default null
-   *     Permission - umask is applied against permisssion:
-   *                  default FsPermissions:getDefault()
-   *                  @see #setPermission(Path, FsPermission)
-   *     CreateParent - create missing parent path
-   *                  default is to not create parents
-   *     
-   *                The defaults for the following are  SS defaults of the
-   *                file server implementing the tart path.
-   *                Not all parameters make sense for all kinds of filesystem
+   * <ul>
+   * <li>   Progress - to report progress on the operation - default null
+   * <li>   Permission - umask is applied against permission:
+   *                  default is FsPermission.getDefault()
+   *
+   * <li>   CreateParent - create missing parent path; default is to not
+   *                   create parents
+   * <li> The defaults for the following are server-side defaults of the
+   *      file server implementing the target path. 
+   *      Not all parameters make sense for all kinds of filesystem
    *                - eg. localFS ignores Blocksize, replication, checksum
-   *    BufferSize - buffersize used in FSDataOutputStream
-   *    Blocksize - block size for file blocks
-   *    ReplicationFactor - replication for blocks
-   *    BytesPerChecksum - bytes per checksum
-   *                     
-   *    
+   * <ul>
+   * <li>  BufferSize - buffersize used in FSDataOutputStream
+   * <li>  Blocksize - block size for file blocks
+   * <li>  ReplicationFactor - replication for blocks
+   * <li>  BytesPerChecksum - bytes per checksum
+   * </ul>
+   * </ul>
+   *                       
    * @throws IOException
+   * 
+   * @see #setPermission(Path, FsPermission)
    */
-  @SuppressWarnings("deprecation") // call to primitiveCreate
   public FSDataOutputStream create(final Path f,
-                                    final EnumSet<CreateFlag> createFlag,
-                                    CreateOpts... opts)
+                                   final EnumSet<CreateFlag> createFlag,
+                                   Options.CreateOpts... opts)
     throws IOException {
     Path absF = fixRelativePart(f);
-    FileSystem fsOfAbsF = getFSofPath(absF);
+    AbstractFileSystem fsOfAbsF = getFSofPath(absF);
 
     // If one of the options is a permission, extract it & apply umask
     // If not, add a default Perms and apply umask;
-    // FileSystem#create
+    // AbstractFileSystem#create
 
-    FsPermission permission = null;
-
-    if (opts != null) {
-      for (int i = 0; i < opts.length; ++i) {
-        if (opts[i] instanceof CreateOpts.Perms) {
-          if (permission != null) 
-            throw new IllegalArgumentException("multiple permissions varargs");
-          permission = ((CreateOpts.Perms) opts[i]).getValue();
-          opts[i] = CreateOpts.perms(permission.applyUMask(umask));
-        }
-      }
-    }
-
-    CreateOpts[] theOpts = opts;
-    if (permission == null) { // no permission was set
-      CreateOpts[] newOpts = new CreateOpts[opts.length + 1];
-      System.arraycopy(opts, 0, newOpts, 0, opts.length);
-      newOpts[opts.length] = 
-        CreateOpts.perms(FsPermission.getDefault().applyUMask(umask));
-      theOpts = newOpts;
-    }
-    return fsOfAbsF.primitiveCreate(absF, createFlag, theOpts);
+    CreateOpts.Perms permOpt = 
+      (CreateOpts.Perms) CreateOpts.getOpt(CreateOpts.Perms.class, opts);
+    FsPermission permission = (permOpt != null) ? permOpt.getValue() :
+                                      FsPermission.getDefault();
+    permission = permission.applyUMask(umask);
+
+    CreateOpts[] updatedOpts = 
+                      CreateOpts.setOpt(CreateOpts.perms(permission), opts);
+    return fsOfAbsF.create(absF, createFlag, updatedOpts);
   }
   
   /**
@@ -529,14 +538,13 @@
    * @throws IOException when operation fails not authorized or 
    *   if parent does not exist and createParent is false.
    */
-  @SuppressWarnings("deprecation") // call to primitiveMkdir
   public void mkdir(final Path dir, final FsPermission permission,
       final boolean createParent)
     throws IOException {
     Path absDir = fixRelativePart(dir);
     FsPermission absFerms = (permission == null ? 
           FsPermission.getDefault() : permission).applyUMask(umask);
-    getFSofPath(absDir).primitiveMkdir(absDir, absFerms, createParent);
+    getFSofPath(absDir).mkdir(absDir, absFerms, createParent);
   }
 
   /**
@@ -615,15 +623,15 @@
    * @param dst new path after rename
    * @throws IOException on failure
    */
-  @SuppressWarnings("deprecation")
-  public void rename(final Path src, final Path dst, final Rename... options)
-    throws IOException {
+
+  public void rename(final Path src, final Path dst,
+      final Options.Rename... options) throws IOException {
     final Path absSrc  = fixRelativePart(src);
     final Path absDst = fixRelativePart(dst);
-    FileSystem srcFS = getFSofPath(absSrc);
-    FileSystem dstFS = getFSofPath(absDst);
+    AbstractFileSystem srcFS = getFSofPath(absSrc);
+    AbstractFileSystem dstFS = getFSofPath(absDst);
     if(!srcFS.getUri().equals(dstFS.getUri())) {
-      throw new IOException("Renames across FileSystems not supported");
+      throw new IOException("Renames across AbstractFileSystems not supported");
     }
     srcFS.rename(absSrc, absDst, options);
   }
@@ -750,10 +758,10 @@
    */
   public FsStatus getFsStatus(final Path f) throws IOException {
     if (f == null) {
-      return defaultFS.getStatus(null);
+      return defaultFS.getFsStatus(null);
     }
     final Path absF = fixRelativePart(f);
-    return getFSofPath(absF).getStatus(absF);
+    return getFSofPath(absF).getFsStatus(absF);
   }
   
   /**
@@ -827,15 +835,15 @@
     if (!exists(f)) {
       return false;
     }
-    synchronized (deleteOnExit) {
-      if (deleteOnExit.isEmpty() && !finalizer.isAlive()) {
-        Runtime.getRuntime().addShutdownHook(finalizer);
+    synchronized (DELETE_ON_EXIT) {
+      if (DELETE_ON_EXIT.isEmpty() && !FINALIZER.isAlive()) {
+        Runtime.getRuntime().addShutdownHook(FINALIZER);
       }
       
-      Set<Path> set = deleteOnExit.get(this);
+      Set<Path> set = DELETE_ON_EXIT.get(this);
       if (set == null) {
         set = new TreeSet<Path>();
-        deleteOnExit.put(this, set);
+        DELETE_ON_EXIT.put(this, set);
       }
       set.add(f);
     }
@@ -878,6 +886,31 @@
       return results.toArray(new FileStatus[results.size()]);
     }
     
+    
+    /**
+     * Return the {@link ContentSummary} of path f.
+     * @param f
+     * @return the {@link ContentSummary} of path f.
+     * @throws IOException
+     */
+    public ContentSummary getContentSummary(Path f) throws IOException {
+      FileStatus status = FileContext.this.getFileStatus(f);
+      if (!status.isDir()) {
+        // f is a file
+        return new ContentSummary(status.getLen(), 1, 0);
+      }
+      // f is a directory
+      long[] summary = {0, 0, 1};
+      for(FileStatus s : FileContext.this.listStatus(f)) {
+        ContentSummary c = s.isDir() ? getContentSummary(s.getPath()) :
+                                       new ContentSummary(s.getLen(), 1, 0);
+        summary[0] += c.getLength();
+        summary[1] += c.getFileCount();
+        summary[2] += c.getDirectoryCount();
+      }
+      return new ContentSummary(summary[0], summary[1], summary[2]);
+    }
+    
     /**
      * Filter files/directories in the given list of paths using default
      * path filter.
@@ -1020,25 +1053,22 @@
      * @return an array of FileStatus objects
      * @throws IOException if any I/O error occurs when fetching file status
      */
-    public FileStatus[] globStatus(final Path pathPattern, final PathFilter filter)
-      throws IOException {
-      
+    public FileStatus[] globStatus(final Path pathPattern,
+        final PathFilter filter) throws IOException {
+      URI uri = getFSofPath(fixRelativePart(pathPattern)).getUri();
+
       String filename = pathPattern.toUri().getPath();
-      
+
       List<String> filePatterns = GlobExpander.expand(filename);
       if (filePatterns.size() == 1) {
-        Path p = fixRelativePart(pathPattern);
-        FileSystem fs = getFSofPath(p);
-        URI uri = fs.getUri();
-        return globStatusInternal(uri, p, filter);
+        Path absPathPattern = fixRelativePart(pathPattern);
+        return globStatusInternal(uri, new Path(absPathPattern.toUri()
+            .getPath()), filter);
       } else {
         List<FileStatus> results = new ArrayList<FileStatus>();
-        for (String filePattern : filePatterns) {
-          Path p = new Path(filePattern);
-          p = fixRelativePart(p);
-          FileSystem fs = getFSofPath(p);
-          URI uri = fs.getUri();
-          FileStatus[] files = globStatusInternal(uri, p, filter);
+        for (String iFilePattern : filePatterns) {
+          Path iAbsFilePattern = fixRelativePart(new Path(iFilePattern));
+          FileStatus[] files = globStatusInternal(uri, iAbsFilePattern, filter);
           for (FileStatus file : files) {
             results.add(file);
           }
@@ -1047,36 +1077,45 @@
       }
     }
 
+    /**
+     * 
+     * @param uri for all the inPathPattern
+     * @param inPathPattern - without the scheme and authority (taken from uri)
+     * @param filter
+     * @return
+     * @throws IOException
+     */
     private FileStatus[] globStatusInternal(
         final URI uri, final Path inPathPattern, final PathFilter filter)
       throws IOException {
       Path[] parents = new Path[1];
       int level = 0;
       
-      // comes in as full path, but just in case
-      final Path pathPattern = fixRelativePart(inPathPattern);
+      assert(inPathPattern.toUri().getScheme() == null &&
+          inPathPattern.toUri().getAuthority() == null && 
+          inPathPattern.isUriPathAbsolute());
+
       
-      String filename = pathPattern.toUri().getPath();
+      String filename = inPathPattern.toUri().getPath();
       
       // path has only zero component
       if ("".equals(filename) || Path.SEPARATOR.equals(filename)) {
-        Path p = pathPattern.makeQualified(uri, null);
+        Path p = inPathPattern.makeQualified(uri, null);
         return getFileStatus(new Path[]{p});
       }
 
       // path has at least one component
       String[] components = filename.split(Path.SEPARATOR);
-      // get the first component
-      if (pathPattern.isAbsolute()) {
-        parents[0] = new Path(Path.SEPARATOR);
-        level = 1;
-      } else {
-        parents[0] = new Path(Path.CUR_DIR);
-      }
+      
+      // Path is absolute, first component is "/" hence first component
+      // is the uri root
+      parents[0] = new Path(new Path(uri), new Path("/"));
+      level = 1;
 
       // glob the paths that match the parent path, ie. [0, components.length-1]
       boolean[] hasGlob = new boolean[]{false};
-      Path[] relParentPaths = globPathsLevel(parents, components, level, hasGlob);
+      Path[] relParentPaths = 
+        globPathsLevel(parents, components, level, hasGlob);
       FileStatus[] results;
       
       if (relParentPaths == null || relParentPaths.length == 0) {
@@ -1212,12 +1251,6 @@
     }
   }
 
-  private static final PathFilter DEFAULT_FILTER = new PathFilter() {
-    public boolean accept(final Path file) {
-      return true;
-    }
-  };
-  
   /* A class that could decide if a string matches the glob or not */
   private static class GlobFilter implements PathFilter {
     private PathFilter userFilter = DEFAULT_FILTER;
@@ -1401,7 +1434,7 @@
   }
 
   /**
-   * Deletes all the paths in deleteOnExit on JVM shutdown
+   * Deletes all the paths in DELETE_ON_EXIT on JVM shutdown.
    */
   static class FileContextFinalizer extends Thread {
     public synchronized void run() {

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileStatus.java?rev=831475&r1=831474&r2=831475&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileStatus.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileStatus.java Fri Oct 30 22:24:22 2009
@@ -148,6 +148,10 @@
   public Path getPath() {
     return path;
   }
+  
+  public void setPath(final Path p) {
+    path = p;
+  }
 
   /* These are provided so that these values could be loaded lazily 
    * by a filesystem (e.g. local file system).

Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFs.java?rev=831475&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFs.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFs.java Fri Oct 30 22:24:22 2009
@@ -0,0 +1,170 @@
+package org.apache.hadoop.fs;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A <code>FilterFs</code> contains some other file system, which it uses as its
+ * basic file system, possibly transforming the data along the way or providing
+ * additional functionality. The class <code>FilterFs</code> itself simply
+ * overrides all methods of <code>AbstractFileSystem</code> with versions that
+ * pass all requests to the contained file system. Subclasses of
+ * <code>FilterFs</code> may further override some of these methods and may also
+ * provide additional methods and fields.
+ * 
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving /*Evolving for a release,to be changed to Stable */
+public abstract class FilterFs extends AbstractFileSystem {
+  private final AbstractFileSystem myFs;
+  
+  protected AbstractFileSystem getMyFs() {
+    return myFs;
+  }
+  
+  protected FilterFs(AbstractFileSystem fs) throws IOException,
+      URISyntaxException {
+    super(fs.getUri(), fs.getUri().getScheme(),
+        fs.getUri().getAuthority() != null, fs.getUriDefaultPort());
+    myFs = fs;
+  }
+
+  @Override
+  protected Path getInitialWorkingDirectory() {
+    return myFs.getInitialWorkingDirectory();
+  }
+  
+  @Override
+  protected FSDataOutputStream createInternal(Path f,
+    EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
+    short replication, long blockSize, Progressable progress,
+    int bytesPerChecksum, boolean createParent) throws IOException {
+    checkPath(f);
+    return myFs.createInternal(f, flag, absolutePermission, bufferSize,
+        replication, blockSize, progress, bytesPerChecksum, createParent);
+  }
+
+  @Override
+  protected boolean delete(Path f, boolean recursive) throws IOException {
+    checkPath(f);
+    return myFs.delete(f, recursive);
+  }
+
+  @Override
+  protected BlockLocation[] getFileBlockLocations(Path f, long start, long len)
+    throws IOException {
+    checkPath(f);
+    return myFs.getFileBlockLocations(f, start, len);
+  }
+
+  @Override
+  protected FileChecksum getFileChecksum(Path f) throws IOException {
+    checkPath(f);
+    return myFs.getFileChecksum(f);
+  }
+
+  @Override
+  protected FileStatus getFileStatus(Path f) throws IOException {
+    checkPath(f);
+    return myFs.getFileStatus(f);
+  }
+
+  @Override
+  protected FsStatus getFsStatus() throws IOException {
+    return myFs.getFsStatus();
+  }
+
+  @Override
+  protected FsServerDefaults getServerDefaults() throws IOException {
+    return myFs.getServerDefaults();
+  }
+
+  @Override
+  protected int getUriDefaultPort() {
+    return myFs.getUriDefaultPort();
+  }
+
+  @Override
+  protected FileStatus[] listStatus(Path f) throws IOException {
+    checkPath(f);
+    return myFs.listStatus(f);
+  }
+
+  @Override
+  protected void mkdir(Path dir, FsPermission permission, boolean createParent)
+    throws IOException {
+    checkPath(dir);
+    myFs.mkdir(dir, permission, createParent);
+    
+  }
+
+  @Override
+  protected FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    checkPath(f);
+    return myFs.open(f, bufferSize);
+  }
+
+  @Override
+  protected void renameInternal(Path src, Path dst) throws IOException {
+    checkPath(src);
+    checkPath(dst);
+    myFs.rename(src, dst, Options.Rename.NONE);
+    
+  }
+
+  @Override
+  protected void setOwner(Path f, String username, String groupname)
+    throws IOException {
+    checkPath(f);
+    myFs.setOwner(f, username, groupname);
+    
+  }
+
+  @Override
+  protected void setPermission(Path f, FsPermission permission)
+    throws IOException {
+    checkPath(f);
+    myFs.setPermission(f, permission);
+  }
+
+  @Override
+  protected boolean setReplication(Path f, short replication)
+    throws IOException {
+    checkPath(f);
+    return myFs.setReplication(f, replication);
+  }
+
+  @Override
+  protected void setTimes(Path f, long mtime, long atime) throws IOException {
+    checkPath(f);
+    myFs.setTimes(f, mtime, atime);
+    
+  }
+
+  @Override
+  protected void setVerifyChecksum(boolean verifyChecksum) throws IOException {
+    myFs.setVerifyChecksum(verifyChecksum);
+  }
+}

Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FsConstants.java?rev=831475&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FsConstants.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FsConstants.java Fri Oct 30 22:24:22 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.net.URI;
+
+/**
+ * FileSystem related constants.
+ */
+public interface FsConstants {
+  // URI for local filesystem
+  public static final URI LOCAL_FS_URI = URI.create("file:///");
+  
+  // URI scheme for FTP
+  public static final String FTP_SCHEME = "ftp";
+}



Mime
View raw message