hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r512499 [1/3] - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/filecache/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/fs/s3/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop...
Date Tue, 27 Feb 2007 23:45:51 GMT
Author: cutting
Date: Tue Feb 27 15:45:46 2007
New Revision: 512499

URL: http://svn.apache.org/viewvc?view=rev&rev=512499
Log:
HADOOP-928.  Make checksums optional per FileSystem.  Contributed by Hairong.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/RawLocalFileSystem.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/TransferFsImage.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSOutputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestCheckpoint.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSeekBug.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestLocalFileSystem.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemBaseTest.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Feb 27 15:45:46 2007
@@ -150,6 +150,9 @@
 44. HADOOP-1042.  Improve the handling of failed map output fetches.
     (Devaraj Das via cutting)
 
+45. HADOOP-928.  Make checksums optional per FileSystem.
+    (Hairong Kuang via cutting)
+
 
 Release 0.11.2 - 2007-02-16
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue Feb 27 15:45:46 2007
@@ -201,7 +201,7 @@
      * inner subclass of InputStream that does the right out-of-band
      * work.
      */
-    public FSInputStream open(UTF8 src) throws IOException {
+    public DFSInputStream open(UTF8 src) throws IOException {
         checkOpen();
         //    Get block info from namenode
         return new DFSInputStream(src.toString());
@@ -215,7 +215,7 @@
      * @return output stream
      * @throws IOException
      */
-    public FSOutputStream create( UTF8 src, 
+    public OutputStream create( UTF8 src, 
                                   boolean overwrite
                                 ) throws IOException {
       return create( src, overwrite, defaultReplication, defaultBlockSize, null);
@@ -230,7 +230,7 @@
      * @return output stream
      * @throws IOException
      */
-    public FSOutputStream create( UTF8 src, 
+    public OutputStream create( UTF8 src, 
                                   boolean overwrite,
                                   Progressable progress
                                 ) throws IOException {
@@ -247,7 +247,7 @@
      * @return output stream
      * @throws IOException
      */
-    public FSOutputStream create( UTF8 src, 
+    public OutputStream create( UTF8 src, 
                                   boolean overwrite, 
                                   short replication,
                                   long blockSize
@@ -266,14 +266,14 @@
      * @return output stream
      * @throws IOException
      */
-    public FSOutputStream create( UTF8 src, 
+    public OutputStream create( UTF8 src, 
                                   boolean overwrite, 
                                   short replication,
                                   long blockSize,
                                   Progressable progress
                                 ) throws IOException {
       checkOpen();
-      FSOutputStream result = new DFSOutputStream(src, overwrite, 
+      OutputStream result = new DFSOutputStream(src, overwrite, 
                                                   replication, blockSize, progress);
       synchronized (pendingCreates) {
         pendingCreates.put(src.toString(), result);
@@ -931,11 +931,45 @@
             throw new IOException("Mark not supported");
         }
     }
+    
+    static class DFSDataInputStream extends FSDataInputStream {
+      DFSDataInputStream(DFSInputStream in, Configuration conf)
+      throws IOException {
+        super(in, conf);
+      }
+      
+      DFSDataInputStream(DFSInputStream in, int bufferSize) throws IOException {
+        super(in, bufferSize);
+      }
+      
+      /**
+       * Returns the datanode from which the stream is currently reading.
+       */
+      public DatanodeInfo getCurrentDatanode() {
+        return ((DFSInputStream)inStream).getCurrentDatanode();
+      }
+      
+      /**
+       * Returns the block containing the target position. 
+       */
+      public Block getCurrentBlock() {
+        return ((DFSInputStream)inStream).getCurrentBlock();
+      }
+
+      /**
+       * Used by the automatic tests to determine block locations of a
+       * file
+       */
+      synchronized DatanodeInfo[][] getDataNodes() {
+        return ((DFSInputStream)inStream).getDataNodes();
+      }
+
+    }
 
     /****************************************************************
      * DFSOutputStream creates files from a stream of bytes.
      ****************************************************************/
-    class DFSOutputStream extends FSOutputStream {
+    class DFSOutputStream extends OutputStream {
         private Socket s;
         boolean closed = false;
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Tue Feb 27 15:45:46 2007
@@ -33,19 +33,21 @@
  *
  * @author Mike Cafarella
  *****************************************************************/
-public class DistributedFileSystem extends FileSystem {
-    private Path workingDir = 
-      new Path("/user", System.getProperty("user.name"));
-
+public class DistributedFileSystem extends ChecksumFileSystem {
+    private static class RawDistributedFileSystem extends FileSystem {
+    private Path workingDir =
+        new Path("/user", System.getProperty("user.name")); 
     private URI uri;
     private FileSystem localFs;
 
     DFSClient dfs;
 
-    public DistributedFileSystem() {}
+    public RawDistributedFileSystem() {
+    }
+
 
     /** @deprecated */
-    public DistributedFileSystem(InetSocketAddress namenode,
+    public RawDistributedFileSystem(InetSocketAddress namenode,
                                  Configuration conf) throws IOException {
       initialize(URI.create("hdfs://"+
                             namenode.getHostName()+":"+
@@ -119,24 +121,32 @@
       return dfs.getHints(getPath(f), start, len);
     }
 
-    public FSInputStream openRaw(Path f) throws IOException {
-      return dfs.open(getPath(f));
-    }
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+      if (! exists(f)) {
+        throw new FileNotFoundException(f.toString());
+      }
 
-    public FSOutputStream createRaw(Path f, boolean overwrite, 
-                                    short replication, long blockSize)
-      throws IOException {
-      return dfs.create(getPath(f), overwrite, replication, blockSize);
+      return new DFSClient.DFSDataInputStream(dfs.open(getPath(f)), bufferSize);
     }
 
-    public FSOutputStream createRaw(Path f, boolean overwrite, 
-                                    short replication, long blockSize,
-                                    Progressable progress)
-      throws IOException {
-      return dfs.create(getPath(f), overwrite, replication, blockSize, progress);
+    public FSDataOutputStream create(Path f, boolean overwrite,
+            int bufferSize, short replication, long blockSize,
+            Progressable progress) throws IOException {
+      if (exists(f) && ! overwrite) {
+         throw new IOException("File already exists:"+f);
+      }
+      Path parent = f.getParent();
+      if (parent != null && !mkdirs(parent)) {
+        throw new IOException("Mkdirs failed to create " + parent);
+      }
+      
+      return new FSDataOutputStream(
+           dfs.create(getPath(f), overwrite,
+                   replication, blockSize, progress),
+           bufferSize);
     }
     
-    public boolean setReplicationRaw( Path src, 
+    public boolean setReplication( Path src, 
                                       short replication
                                     ) throws IOException {
       return dfs.setReplication(getPath(src), replication);
@@ -145,14 +155,14 @@
     /**
      * Rename files/dirs
      */
-    public boolean renameRaw(Path src, Path dst) throws IOException {
+    public boolean rename(Path src, Path dst) throws IOException {
       return dfs.rename(getPath(src), getPath(dst));
     }
 
     /**
      * Get rid of Path f, whether a true file or dir.
      */
-    public boolean deleteRaw(Path f) throws IOException {
+    public boolean delete(Path f) throws IOException {
         return dfs.delete(getPath(f));
     }
 
@@ -194,7 +204,7 @@
       return info[0].getReplication();
   }
 
-    public Path[] listPathsRaw(Path f) throws IOException {
+    public Path[] listPaths(Path f) throws IOException {
         DFSFileInfo info[] = dfs.listPaths(getPath(f));
         if (info == null) {
             return new Path[0];
@@ -221,16 +231,16 @@
         dfs.release(getPath(f));
     }
 
-    public void moveFromLocalFile(Path src, Path dst) throws IOException {
-      FileUtil.copy(localFs, src, this, dst, true, true, getConf());
-    }
-
-    public void copyFromLocalFile(Path src, Path dst) throws IOException {
-      FileUtil.copy(localFs, src, this, dst, false, true, getConf());
+    @Override
+    public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+    throws IOException {
+      FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
     }
 
-    public void copyToLocalFile(Path src, Path dst, boolean copyCrc) throws IOException {
-      FileUtil.copy(this, src, localFs, dst, false, copyCrc, getConf());
+    @Override
+    public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+    throws IOException {
+      FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
     }
 
     public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
@@ -257,8 +267,41 @@
 
     DFSClient getClient() {
         return dfs;
+    }        
+    /** Return the total raw capacity of the filesystem, disregarding
+     * replication .*/
+    public long getRawCapacity() throws IOException{
+        return dfs.totalRawCapacity();
+    }
+
+    /** Return the total raw used space in the filesystem, disregarding
+     * replication .*/
+    public long getRawUsed() throws IOException{
+        return dfs.totalRawUsed();
+    }
+
+    /** Return statistics for each datanode. */
+    public DatanodeInfo[] getDataNodeStats() throws IOException {
+      return dfs.datanodeReport();
     }
     
+    /**
+     * Enter, leave or get safe mode.
+     *  
+     * @see org.apache.hadoop.dfs.ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
+     */
+    public boolean setSafeMode( FSConstants.SafeModeAction action ) 
+    throws IOException {
+      return dfs.setSafeMode( action );
+    }
+
+    /*
+     * Refreshes the list of hosts and excluded hosts from the configured 
+     * files.  
+     */
+    public void refreshNodes() throws IOException {
+      dfs.refreshNodes();
+    }
 
     /**
      * We need to find the blocks that didn't match.  Likely only one 
@@ -266,14 +309,14 @@
      * we can consider figuring out exactly which block is corrupt.
      */
     public void reportChecksumFailure(Path f, 
-                                      FSInputStream in, long inPos, 
-                                      FSInputStream sums, long sumsPos) {
+                                      FSDataInputStream in, long inPos, 
+                                      FSDataInputStream sums, long sumsPos) {
       
       LocatedBlock lblocks[] = new LocatedBlock[2];
 
       try {
         // Find block in data stream.
-        DFSClient.DFSInputStream dfsIn = (DFSClient.DFSInputStream) in;
+        DFSClient.DFSDataInputStream dfsIn = (DFSClient.DFSDataInputStream) in;
         Block dataBlock = dfsIn.getCurrentBlock();
         if (dataBlock == null) {
           throw new IOException("Error: Current block in data stream is null! ");
@@ -284,7 +327,7 @@
                  " on datanode=" + dataNode[0].getName());
 
         // Find block in checksum stream
-        DFSClient.DFSInputStream dfsSums = (DFSClient.DFSInputStream) sums;
+        DFSClient.DFSDataInputStream dfsSums = (DFSClient.DFSDataInputStream) sums;
         Block sumsBlock = dfsSums.getCurrentBlock();
         if (sumsBlock == null) {
           throw new IOException("Error: Current block in checksum stream is null! ");
@@ -305,32 +348,33 @@
       }
 
     }
+    }
+
+    public DistributedFileSystem() {
+        super( new RawDistributedFileSystem() );
+    }
+
+    /** @deprecated */
+    public DistributedFileSystem(InetSocketAddress namenode,
+                                 Configuration conf) throws IOException {
+      super( new RawDistributedFileSystem(namenode, conf) );
+    }
 
     /** Return the total raw capacity of the filesystem, disregarding
      * replication .*/
     public long getRawCapacity() throws IOException{
-        return dfs.totalRawCapacity();
+        return ((RawDistributedFileSystem)fs).getRawCapacity();
     }
 
     /** Return the total raw used space in the filesystem, disregarding
      * replication .*/
     public long getRawUsed() throws IOException{
-        return dfs.totalRawUsed();
-    }
-
-    /** Return the total size of all files in the filesystem.*/
-    public long getUsed()throws IOException{
-        long used = 0;
-        DFSFileInfo dfsFiles[] = dfs.listPaths(getPath(new Path("/")));
-        for(int i=0;i<dfsFiles.length;i++){
-            used += dfsFiles[i].getContentsLen();
-        }
-        return used;
+        return ((RawDistributedFileSystem)fs).getRawUsed();
     }
 
     /** Return statistics for each datanode. */
     public DatanodeInfo[] getDataNodeStats() throws IOException {
-      return dfs.datanodeReport();
+      return ((RawDistributedFileSystem)fs).getDataNodeStats();
     }
     
     /**
@@ -340,7 +384,7 @@
      */
     public boolean setSafeMode( FSConstants.SafeModeAction action ) 
     throws IOException {
-      return dfs.setSafeMode( action );
+      return ((RawDistributedFileSystem)fs).setSafeMode( action );
     }
 
     /*
@@ -348,6 +392,17 @@
      * files.  
      */
     public void refreshNodes() throws IOException {
-      dfs.refreshNodes();
+      ((RawDistributedFileSystem)fs).refreshNodes();
+    }
+    /**
+     * We need to find the blocks that didn't match.  Likely only one 
+     * is corrupt but we will report both to the namenode.  In the future,
+     * we can consider figuring out exactly which block is corrupt.
+     */
+    public void reportChecksumFailure(Path f, 
+                                      FSDataInputStream in, long inPos, 
+                                      FSDataInputStream sums, long sumsPos) {
+      ((RawDistributedFileSystem)fs).reportChecksumFailure(
+                f, in, inPos, sums, sumsPos);
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java Tue Feb 27 15:45:46 2007
@@ -22,6 +22,7 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -34,7 +35,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSOutputStream;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.net.DNS;
 
@@ -255,7 +255,7 @@
       }
       // create chains
       int chain = 0;
-      FSOutputStream fos = null;
+      OutputStream fos = null;
       for (int i = 0; i < blocks.length; i++) {
         LocatedBlock lblock = blocks[i];
         DatanodeInfo[] locs = lblock.getLocations();
@@ -305,7 +305,7 @@
    * around.
    */
       private void copyBlock(DFSClient dfs, LocatedBlock lblock,
-          FSOutputStream fos) throws Exception {
+          OutputStream fos) throws Exception {
     int failures = 0;
     InetSocketAddress targetAddr = null;
     TreeSet deadNodes = new TreeSet();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/TransferFsImage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/TransferFsImage.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/TransferFsImage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/TransferFsImage.java Tue Feb 27 15:45:46 2007
@@ -31,7 +31,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSOutputStream;
 import org.apache.hadoop.io.UTF8;
 
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Tue Feb 27 15:45:46 2007
@@ -288,6 +288,10 @@
     byte[] digest = null;
 
     FileSystem fileSystem = getFileSystem(cache, conf);
+    if(!(fileSystem instanceof ChecksumFileSystem)) {
+        throw new IOException( "Not a checksummed file system: "
+                +fileSystem.getUri() );
+    }
     String filename = cache.getPath();
     Path filePath = new Path(filename);
     Path md5File = new Path(filePath.getParent().toString() + Path.SEPARATOR
@@ -299,8 +303,15 @@
       // do nothing
     }
     if (!fileSystem.exists(md5File)) {
-      FSInputStream fsStream = fileSystem.openRaw(FileSystem
-          .getChecksumFile(filePath));
+      ChecksumFileSystem checksumFs;
+      if(!(fileSystem instanceof ChecksumFileSystem)) {
+          throw new IOException(
+                  "Not a checksumed file system: "+fileSystem.getUri());
+      } else {
+          checksumFs = (ChecksumFileSystem)fileSystem;
+      }
+      FSDataInputStream fsStream = checksumFs.getRawFileSystem().open(
+              checksumFs.getChecksumFile(filePath));
       int read = fsStream.read(b);
       while (read != -1) {
         md5.update(b, 0, read);
@@ -313,7 +324,7 @@
       out.write(digest);
       out.close();
     } else {
-      FSInputStream fsStream = fileSystem.openRaw(md5File);
+      FSDataInputStream fsStream = fileSystem.open(md5File);
       digest = new byte[md5.getDigestLength()];
       // assuming reading 16 bytes once is not a problem
       // though it should be checked if 16 bytes have been read or not

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java?view=auto&rev=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java Tue Feb 27 15:45:46 2007
@@ -0,0 +1,629 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
+
+/****************************************************************
+ * Abstract Checksummed FileSystem.
+ * It provides a basic implementation of a Checksummed FileSystem,
+ * which creates a checksum file for each raw file.
+ * It generates & verifies checksums at the client side.
+ *
+ * @author Hairong Kuang
+ *****************************************************************/
+public abstract class ChecksumFileSystem extends FilterFileSystem {
+  private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
+
+  public ChecksumFileSystem(FileSystem fs) {
+    super(fs);
+  }
+
+  /** get the raw file system */
+  public FileSystem getRawFileSystem() {
+    return fs;
+  }
+
+  /** Return the name of the checksum file associated with a file.*/
+  public Path getChecksumFile(Path file) {
+    return new Path(file.getParent(), "." + file.getName() + ".crc");
+  }
+
+  /** Return true iff file is a checksum file name.*/
+  public static boolean isChecksumFile(Path file) {
+    String name = file.getName();
+    return name.startsWith(".") && name.endsWith(".crc");
+  }
+
+  /** Return the length of the checksum file given the size of the 
+   * actual file.
+   **/
+  public long getChecksumFileLength(Path file, long fileSize) {
+    return FSOutputSummer.getChecksumLength(fileSize, getBytesPerSum());
+  }
+
+  /** Return the bytes Per Checksum */
+  public int getBytesPerSum() {
+    return getConf().getInt("io.bytes.per.checksum", 512);
+  }
+
+  private int getSumBufferSize(int bytesPerSum, int bufferSize) {
+    int defaultBufferSize = getConf().getInt("io.file.buffer.size", 4096);
+    int proportionalBufferSize = bufferSize / bytesPerSum;
+    return Math.max(bytesPerSum,
+                    Math.max(proportionalBufferSize, defaultBufferSize));
+  }
+
+  /*******************************************************
+   * For open()'s FSInputStream
+   * It verifies that data matches checksums.
+   *******************************************************/
+  private static class FSInputChecker extends FSInputStream {
+    public static final Log LOG 
+      = LogFactory.getLog("org.apache.hadoop.fs.FSInputChecker");
+    
+    private ChecksumFileSystem fs;
+    private Path file;
+    private FSDataInputStream datas;
+    private FSDataInputStream sums;
+    private Checksum sum = new CRC32();
+    private int inSum;
+    
+    private static final int HEADER_LENGTH = 8;
+    
+    private int bytesPerSum = 1;
+    
+    public FSInputChecker(ChecksumFileSystem fs, Path file)
+      throws IOException {
+      this(fs, file, fs.getConf().getInt("io.file.buffer.size", 4096));
+    }
+    
+    public FSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
+      throws IOException {
+      // open with an extremely small buffer size,
+      // so that this buffer can be bypassed by the buffer in FSDataInputStream
+      datas = fs.getRawFileSystem().open(file, 1);
+      this.fs = fs;
+      this.file = file;
+      Path sumFile = fs.getChecksumFile(file);
+      try {
+        int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(),bufferSize);
+        sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
+
+        byte[] version = new byte[CHECKSUM_VERSION.length];
+        sums.readFully(version);
+        if (!Arrays.equals(version, CHECKSUM_VERSION))
+          throw new IOException("Not a checksum file: "+sumFile);
+        bytesPerSum = sums.readInt();
+      } catch (FileNotFoundException e) {         // quietly ignore
+        stopSumming();
+      } catch (IOException e) {                   // loudly ignore
+        LOG.warn("Problem opening checksum file: "+ file + 
+                 ".  Ignoring exception: " + 
+                 StringUtils.stringifyException(e));
+        stopSumming();
+      }
+    }
+
+    public void seek(long desired) throws IOException {
+      // seek to a checksum boundary
+      long checksumBoundary = desired/bytesPerSum*bytesPerSum;
+      if(checksumBoundary != getPos()) {
+        datas.seek(checksumBoundary);
+        sums.seek(HEADER_LENGTH + 4*(checksumBoundary/bytesPerSum));
+      }
+      
+      sum.reset();
+      inSum = 0;
+      
+      // scan to desired position
+      int delta = (int)(desired - checksumBoundary);
+      readBuffer(new byte[delta], 0, delta);
+    }
+    
+    public int read() throws IOException {
+      byte[] b = new byte[1];
+      readBuffer(b, 0, 1);
+      return b[0] & 0xff;
+    }
+
+    public int read(byte b[]) throws IOException {
+      return read(b, 0, b.length);
+    }
+
+    public int read(byte b[], int off, int len) throws IOException {
+      // make sure that it ends at a checksum boundary
+      long curPos = getPos();
+      long endPos = len+curPos/bytesPerSum*bytesPerSum;
+      return readBuffer(b, off, (int)(endPos-curPos));
+    }
+    
+    private int readBuffer(byte b[], int off, int len) throws IOException {
+      int read;
+      boolean retry;
+      int retriesLeft = 3;
+      long oldPos = getPos();
+      do {
+        retriesLeft--;
+        retry = false;
+        
+        read = 0;
+        boolean endOfFile=false;
+        while (read < len && !endOfFile) {
+          int count = datas.read(b, off + read, len - read);
+          if (count < 0)
+            endOfFile = true;
+          else
+            read += count;
+        }
+        
+        if (sums != null && read!=0) {
+          long oldSumsPos = sums.getPos();
+          try {
+            int summed = 0;
+            while (summed < read) {
+              int goal = bytesPerSum - inSum;
+              int inBuf = read - summed;
+              int toSum = inBuf <= goal ? inBuf : goal;
+              
+              try {
+                sum.update(b, off+summed, toSum);
+              } catch (ArrayIndexOutOfBoundsException e) {
+                throw new RuntimeException("Summer buffer overflow b.len=" + 
+                                           b.length + ", off=" + off + 
+                                           ", summed=" + summed + ", read=" + 
+                                           read + ", bytesPerSum=" + bytesPerSum +
+                                           ", inSum=" + inSum, e);
+              }
+              summed += toSum;
+              
+              inSum += toSum;
+              if (inSum == bytesPerSum || endOfFile) {
+                verifySum(read-(summed-bytesPerSum));
+              }
+            }
+          } catch (ChecksumException ce) {
+            LOG.info("Found checksum error: "+StringUtils.stringifyException(ce));
+            if (retriesLeft == 0) {
+              throw ce;
+            }
+            
+            sums.seek(oldSumsPos);
+            datas.seek(oldPos);
+            
+            if (seekToNewSource(oldPos)) {
+              // Neither the data stream nor the checksum stream is being read
+              // from a different source, meaning we'll still get a checksum error 
+              // if we try to do the read again.  We throw an exception instead.
+              throw ce;
+            } else {
+              // Since at least one of the sources is different, 
+              // the read might succeed, so we'll retry.
+              retry = true;
+            }
+          }
+        }
+      } while (retry);
+      return read==0?-1:read;
+    }
+    
+    /** Read the next stored CRC from the checksum stream and compare it
+     * against the CRC accumulated over the data bytes just read.
+     * @param delta distance from the current position back to the start of
+     *              the chunk being verified
+     * @throws ChecksumException if the stored and computed CRCs differ
+     */
+    private void verifySum(int delta) throws IOException {
+      int crc;
+      try {
+        crc = sums.readInt();
+      } catch (IOException e) {
+        // Trouble reading the checksum file is non-fatal: disable further
+        // verification and keep serving raw data.
+        LOG.warn("Problem reading checksum file: "+e+". Ignoring.");
+        stopSumming();
+        return;
+      }
+      int sumValue = (int)sum.getValue();
+      sum.reset();
+      inSum = 0;
+      if (crc != sumValue) {
+        long pos = getPos() - delta;
+        // Let the concrete FileSystem react (e.g. log / move bad files aside).
+        fs.reportChecksumFailure(file, datas, pos, sums, pos/bytesPerSum);
+        throw new ChecksumException("Checksum error: "+file+" at "+pos);
+      }
+    }
+    
+    /** Current position in the underlying data stream. */
+    public long getPos() throws IOException {
+      return datas.getPos();
+    }
+    
+    /** Positioned read, delegated to the data stream.  Note: this path does
+     * not update the running CRC, so no checksum verification happens here. */
+    public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+      return datas.read(position, buffer, offset, length);
+    }
+    
+    /** Positioned readFully, delegated to the data stream (not checksummed). */
+    public void readFully(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+      datas.readFully(position, buffer, offset, length);
+    }
+    
+    /** Positioned readFully of a whole buffer, delegated to the data stream. */
+    public void readFully(long position, byte[] buffer)
+      throws IOException {
+      datas.readFully(position, buffer);
+    }
+    
+    /** Close the data stream, then the checksum stream (via stopSumming). */
+    public void close() throws IOException {
+      datas.close();
+      stopSumming();
+    }
+    
+    /** Permanently disable checksum verification: close and drop the sums
+     * stream, and reset bytesPerSum to 1 so later divisions by it are safe. */
+    private void stopSumming() {
+      if (sums != null) {
+        try {
+          sums.close();
+        } catch (IOException f) {} // best-effort close; nothing to do on failure
+        sums = null;
+        bytesPerSum = 1;
+      }
+    }
+    
+    /** Delegates to the data stream. */
+    public int available() throws IOException {
+      return datas.available();
+    }
+    
+    /** Delegates to the data stream. */
+    public boolean markSupported() {
+      return datas.markSupported();
+    }
+    
+    /** Delegates to the data stream; checksum state is not marked. */
+    public synchronized void mark(int readlimit) {
+      datas.mark(readlimit);
+    }
+    
+    /** Delegates to the data stream; checksum state is not reset here. */
+    public synchronized void reset() throws IOException {
+      datas.reset();
+    }
+    
+    /** Delegates to the data stream; the checksum stream is not advanced. */
+    public long skip(long n) throws IOException {
+      return datas.skip(n);
+    }
+
+    /** Try to switch both the data stream and the checksum stream to a
+     * different source.  Returns true only if both switches succeed; if the
+     * data stream fails, the checksum stream is not attempted (short-circuit). */
+    @Override
+      public boolean seekToNewSource(long targetPos) throws IOException {
+      return datas.seekToNewSource(targetPos) &&
+        sums.seekToNewSource(targetPos/bytesPerSum);
+    }
+
+  }
+
+  /**
+   * Opens an FSDataInputStream at the indicated Path.  The returned stream
+   * verifies checksums against the matching .crc file via FSInputChecker.
+   * @param f the file name to open
+   * @param bufferSize the size of the buffer to be used.
+   * @return a checksum-verifying input stream over the raw file
+   * @throws FileNotFoundException if the file does not exist
+   */
+  @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    if (!exists(f)) {
+      throw new FileNotFoundException(f.toString());
+    }
+    return new FSDataInputStream(new FSInputChecker(this, f, bufferSize),
+                                 bufferSize);
+  }
+
+  /** This class provides an output stream for a checksummed file.
+   * It generates checksums for data. */
+  private static class FSOutputSummer extends FilterOutputStream {
+    
+    // Output stream for the companion checksum (.crc) file.
+    private FSDataOutputStream sums;
+    // Running CRC over the current chunk of data bytes.
+    private Checksum sum = new CRC32();
+    // Number of data bytes accumulated in the current chunk.
+    private int inSum;
+    // Chunk size: one CRC is written per this many data bytes.
+    private int bytesPerSum;
+    
+    /** Convenience constructor using the configured io.file.buffer.size and
+     * no progress reporting. */
+    public FSOutputSummer(ChecksumFileSystem fs, 
+                          Path file, 
+                          boolean overwrite, 
+                          short replication,
+                          long blockSize,
+                          Configuration conf)
+      throws IOException {
+      this(fs, file, overwrite, 
+           conf.getInt("io.file.buffer.size", 4096),
+           replication, blockSize, null);
+    }
+    
+    /** Create the raw data file (as the wrapped stream) and its checksum
+     * file, then write the checksum-file header: the version magic followed
+     * by the bytesPerSum int. */
+    public FSOutputSummer(ChecksumFileSystem fs, 
+                          Path file, 
+                          boolean overwrite,
+                          int bufferSize,
+                          short replication,
+                          long blockSize,
+                          Progressable progress)
+      throws IOException {
+      super(fs.getRawFileSystem().create(file, overwrite, 1, 
+                                         replication, blockSize, progress));
+      this.bytesPerSum = fs.getBytesPerSum();
+      int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
+      // The checksum file is always overwritten (true) alongside the data file.
+      this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), true, 
+                                               sumBufferSize, replication,
+                                               blockSize);
+      sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
+      sums.writeInt(this.bytesPerSum);
+    }
+    
+    /** Fold the bytes into the running CRC (emitting a checksum each time a
+     * full bytesPerSum chunk completes), then write them to the data stream. */
+    public void write(byte b[], int off, int len) throws IOException {
+      int summed = 0;
+      while (summed < len) {
+        
+        // Sum at most up to the end of the current chunk.
+        int goal = this.bytesPerSum - inSum;
+        int inBuf = len - summed;
+        int toSum = inBuf <= goal ? inBuf : goal;
+        
+        sum.update(b, off+summed, toSum);
+        summed += toSum;
+        
+        inSum += toSum;
+        if (inSum == this.bytesPerSum) {
+          writeSum();
+        }
+      }
+      
+      out.write(b, off, len);
+    }
+    
+    /** Emit the CRC for the current (possibly partial) chunk, if non-empty,
+     * and reset the accumulator. */
+    private void writeSum() throws IOException {
+      if (inSum != 0) {
+        sums.writeInt((int)sum.getValue());
+        sum.reset();
+        inSum = 0;
+      }
+    }
+    
+    /** Flush the trailing partial-chunk CRC, close the checksum file, then
+     * close the data stream. */
+    public void close() throws IOException {
+      writeSum();
+      sums.close();
+      super.close();
+    }
+    
+    /** Length of the checksum file for a data file of the given size:
+     * one 4-byte CRC per bytesPerSum data bytes (rounded up), plus the
+     * 4-byte stored bytesPerSum int, plus the version magic. */
+    public static long getChecksumLength(long size, int bytesPerSum) {
+      // ceil(size/bytesPerSum) CRC entries, +1 for the bytesPerSum header int,
+      // each 4 bytes, plus the CHECKSUM_VERSION magic written first.
+      return ((long)(Math.ceil((float)size/bytesPerSum)) + 1) * 4 + 
+        CHECKSUM_VERSION.length;  
+    }
+  }
+
+  /**
+   * Opens an FSDataOutputStream at the indicated Path with write-progress
+   * reporting.  The returned stream also writes a companion checksum file
+   * via FSOutputSummer.
+   * @param f the file name to open
+   * @param overwrite if a file with this name already exists, then if true,
+   *   the file will be overwritten, and if false an error will be thrown.
+   * @param bufferSize the size of the buffer to be used.
+   * @param replication required block replication for the file. 
+   * @param blockSize block size for the created file
+   * @param progress progress callback; may be null
+   * @throws IOException if the file exists and overwrite is false, or if the
+   *   parent directory cannot be created
+   */
+  @Override
+    public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
+                                     short replication, long blockSize, Progressable progress)
+    throws IOException {
+    if (exists(f) && !overwrite) {
+      throw new IOException("File already exists:" + f);
+    }
+    Path parent = f.getParent();
+    if (parent != null && !mkdirs(parent)) {
+      throw new IOException("Mkdirs failed to create " + parent);
+    }
+    return new FSDataOutputStream(new FSOutputSummer(this, f, overwrite,
+                                                     bufferSize, replication, blockSize, progress), bufferSize);
+  }
+
+  /**
+   * Set replication for an existing file.
+   * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
+   * @param src file name
+   * @param replication new replication
+   * @throws IOException
+   * @return true if successful;
+   *         false if file does not exist or is a directory
+   */
+  public boolean setReplication(Path src, short replication) throws IOException {
+    boolean value = fs.setReplication(src, replication);
+    if (!value)
+      return false;
+
+    // Best-effort: keep the checksum file's replication in step with the
+    // data file; its own result is deliberately ignored.
+    Path checkFile = getChecksumFile(src);
+    if (exists(checkFile))
+      fs.setReplication(checkFile, replication);
+
+    return true;
+  }
+
+  /**
+   * Rename files/dirs.  Directories are renamed with a plain raw rename
+   * (any checksum files inside move along with the directory); for a file,
+   * the matching checksum file (if present) is renamed too.
+   */
+  public boolean rename(Path src, Path dst) throws IOException {
+    if (fs.isDirectory(src)) {
+      return fs.rename(src, dst);
+    } else {
+
+      boolean value = fs.rename(src, dst);
+      if (!value)
+        return false;
+
+      Path checkFile = getChecksumFile(src);
+      if (fs.exists(checkFile)) { //try to rename checksum
+        if (fs.isDirectory(dst)) {
+          // dst is a directory: the checksum file keeps its own name inside it.
+          value = fs.rename(checkFile, dst);
+        } else {
+          value = fs.rename(checkFile, getChecksumFile(dst));
+        }
+      }
+
+      return value;
+    }
+  }
+
+  /**
+   * Get rid of Path f, whether a true file or dir.  For a file, the matching
+   * checksum file is deleted first (its result ignored), then the data file;
+   * the returned value reflects only the data-file deletion.
+   */
+  public boolean delete(Path f) throws IOException {
+    if (fs.isDirectory(f)) {
+      return fs.delete(f);
+    } else {
+      Path checkFile = getChecksumFile(f);
+      if(fs.exists(checkFile)) {
+        fs.delete(checkFile);
+      }
+
+      return fs.delete(f);
+    }
+  }
+
+  /** Default listing filter: hides checksum (.crc) files from callers. */
+  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
+      public boolean accept(Path file) {
+        return !isChecksumFile(file);
+      }
+    };
+
+  /** 
+   * Filter raw files in the given paths using the default checksum filter. 
+   * @param files a list of paths
+   * @return a list of files under the source paths
+   * @exception IOException
+   */
+  @Override
+    public Path[] listPaths(Path[] files) throws IOException {
+    return fs.listPaths(files, DEFAULT_FILTER);
+  }
+
+  /** 
+   * Filter raw files in the given path using the default checksum filter. 
+   * @param f source path
+   * @return a list of files under the source path, excluding .crc files
+   * @exception IOException
+   */
+  public Path[] listPaths(Path f) throws IOException {
+    return fs.listPaths(f, DEFAULT_FILTER);
+  }
+
+  /** Delegates directly to the raw file system (no checksum involvement). */
+  @Override
+    public boolean mkdirs(Path f) throws IOException {
+    return fs.mkdirs(f);
+  }
+
+  /** Lock the path; for a file, the checksum file (if present) is locked
+   * first, then the data file itself. */
+  @Override
+    public void lock(Path f, boolean shared) throws IOException {
+    if (fs.isDirectory(f)) {
+      fs.lock(f, shared);
+    } else {
+      Path checkFile = getChecksumFile(f);
+      if(fs.exists(checkFile)) {
+        fs.lock(checkFile, shared);
+      }
+      fs.lock(f, shared);
+    }
+  }
+
+  /** Release the lock on the path; for a file, the checksum file (if
+   * present) is released first, then the data file itself. */
+  @Override
+    public void release(Path f) throws IOException {
+    if (fs.isDirectory(f)) {
+      fs.release(f);
+    } else {
+      Path checkFile = getChecksumFile(f);
+      if(fs.exists(checkFile)) {
+        // (recomputes the same path as checkFile)
+        fs.release(getChecksumFile(f));
+      }
+      fs.release(f);
+    }
+  }
+
+  /** Copy src from the local file system into this file system, optionally
+   * deleting the local source.  Uses the named "file:///" file system for
+   * the local side. */
+  @Override
+    public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+    throws IOException {
+    FileSystem localFs = getNamed("file:///", getConf());
+    FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
+  }
+
+  /**
+   * The src file is under FS, and the dst is on the local disk.
+   * Copy it from FS control to the local dst name, optionally deleting
+   * the source.
+   */
+  @Override
+    public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+    throws IOException {
+    FileSystem localFs = getNamed("file:///", getConf());
+    FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
+  }
+
+  /**
+   * The src file is under FS, and the dst is on the local disk.
+   * Copy it from FS control to the local dst name.
+   * If src and dst are directories, the copyCrc parameter
+   * determines whether to copy CRC files.
+   */
+  public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
+    throws IOException {
+    if (!fs.isDirectory(src)) { // source is a file
+      fs.copyToLocalFile(src, dst);
+      // Operate on the raw local file system so .crc files are visible.
+      FileSystem localFs = getNamed("file:///", getConf());
+      if (localFs instanceof ChecksumFileSystem) {
+        localFs = ((ChecksumFileSystem) localFs).getRawFileSystem();
+      }
+      if (localFs.isDirectory(dst)) {
+        dst = new Path(dst, src.getName());
+      }
+      // From here on, dst refers to the LOCAL checksum path for the copy.
+      dst = getChecksumFile(dst);
+      if (localFs.exists(dst)) { //remove old local checksum file
+        localFs.delete(dst);
+      }
+      Path checksumFile = getChecksumFile(src);
+      if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
+        fs.copyToLocalFile(checksumFile, dst);
+      }
+    } else {
+      // Directory: recurse over the (checksum-filtered) children.
+      Path[] srcs = listPaths(src);
+      for (Path srcFile : srcs) {
+        copyToLocalFile(srcFile, dst, copyCrc);
+      }
+    }
+  }
+
+  /** Returns the local temporary file to write to directly. */
+  @Override
+    public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+    throws IOException {
+    return tmpLocalFile;
+  }
+
+  /** Moves the completed local temporary file into place in this file system. */
+  @Override
+    public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+    throws IOException {
+    moveFromLocalFile(tmpLocalFile, fsOutputFile);
+  }
+
+  /**
+   * Report a checksum error to the file system.  Called by
+   * FSInputChecker.verifySum when a stored CRC does not match the one
+   * computed over the data just read.
+   * @param f the file name containing the error
+   * @param in the stream open on the file
+   * @param inPos the position of the beginning of the bad data in the file
+   * @param sums the stream open on the checksum file
+   * @param sumsPos the position of the beginning of the bad data in the checksum file
+   */
+  public abstract void reportChecksumFailure(Path f, FSDataInputStream in,
+                                             long inPos, FSDataInputStream sums, long sumsPos);
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java Tue Feb 27 15:45:46 2007
@@ -18,196 +18,19 @@
 package org.apache.hadoop.fs;
 
 import java.io.*;
-import java.util.Arrays;
-import java.util.zip.*;
-
-import org.apache.commons.logging.*;
 
 import org.apache.hadoop.conf.*;
-import org.apache.hadoop.util.StringUtils;
 
 /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
  * and buffers input through a {@link BufferedInputStream}. */
 public class FSDataInputStream extends DataInputStream
     implements Seekable, PositionedReadable {
-  private static final Log LOG =
-    LogFactory.getLog("org.apache.hadoop.fs.DataInputStream");
-
-  private static final byte[] VERSION = FSDataOutputStream.CHECKSUM_VERSION;
-  private static final int HEADER_LENGTH = 8;
-  
-  private int bytesPerSum = 1;
-  
-  /** Verify that data matches checksums. */
-  private class Checker extends FilterInputStream
-      implements Seekable, PositionedReadable {
-    private FileSystem fs;
-    private Path file;
-    private FSDataInputStream sums;
-    private Checksum sum = new CRC32();
-    private int inSum;
-    private FSInputStream sumsIn;
-
-    public Checker(FileSystem fs, Path file, Configuration conf)
-      throws IOException {
-      super(fs.openRaw(file));
-      
-      this.fs = fs;
-      this.file = file;
-      Path sumFile = FileSystem.getChecksumFile(file);
-      try {
-        sumsIn = fs.openRaw(sumFile);
-        this.sums = new FSDataInputStream(sumsIn, conf);
-        byte[] version = new byte[VERSION.length];
-        sums.readFully(version);
-        if (!Arrays.equals(version, VERSION))
-          throw new IOException("Not a checksum file: "+sumFile);
-        bytesPerSum = sums.readInt();
-      } catch (FileNotFoundException e) {         // quietly ignore
-        stopSumming();
-      } catch (IOException e) {                   // loudly ignore
-        LOG.warn("Problem opening checksum file: "+ file + 
-                 ".  Ignoring exception: " + 
-                 StringUtils.stringifyException(e));
-        stopSumming();
-      }
-    }
-
-    public void seek(long desired) throws IOException {
-      ((Seekable)in).seek(desired);
-      if (sums != null) {
-        if (desired % bytesPerSum != 0)
-          throw new IOException("Seek to non-checksummed position.");
-        try {
-          sums.seek(HEADER_LENGTH + 4*(desired/bytesPerSum));
-        } catch (IOException e) {
-          LOG.warn("Problem seeking checksum file: "+e+". Ignoring.");
-          stopSumming();
-        }
-        sum.reset();
-        inSum = 0;
-      }
-    }
-    
-    public int read(byte b[], int off, int len) throws IOException {
-      int read;
-      boolean retry;
-      int retriesLeft = 3;
-      long oldPos = getPos();
-      do {
-        retriesLeft--;
-        retry = false;
-
-        read = in.read(b, off, len);
-        
-        if (sums != null) {
-          long oldSumsPos = sums.getPos();
-          try {
-            int summed = 0;
-            while (summed < read) {
-              int goal = bytesPerSum - inSum;
-              int inBuf = read - summed;
-              int toSum = inBuf <= goal ? inBuf : goal;
-          
-              try {
-                sum.update(b, off+summed, toSum);
-              } catch (ArrayIndexOutOfBoundsException e) {
-                throw new RuntimeException("Summer buffer overflow b.len=" + 
-                                           b.length + ", off=" + off + 
-                                           ", summed=" + summed + ", read=" + 
-                                           read + ", bytesPerSum=" + bytesPerSum +
-                                           ", inSum=" + inSum, e);
-              }
-              summed += toSum;
-          
-              inSum += toSum;
-              if (inSum == bytesPerSum) {
-                verifySum(read-(summed-bytesPerSum));
-              }
-            }
-          } catch (ChecksumException ce) {
-            LOG.info("Found checksum error: " + StringUtils.stringifyException(ce));
-            if (retriesLeft == 0) {
-              throw ce;
-            }
-            sums.seek(oldSumsPos);
-            if (!((FSInputStream)in).seekToNewSource(oldPos) ||
-                !((FSInputStream)sumsIn).seekToNewSource(oldSumsPos)) {
-              // Neither the data stream nor the checksum stream are being read from
-              // different sources, meaning we'll still get a checksum error if we 
-              // try to do the read again.  We throw an exception instead.
-              throw ce;
-            } else {
-              // Since at least one of the sources is different, the read might succeed,
-              // so we'll retry.
-              retry = true;
-            }
-          }
-        }
-      } while (retry);
-      return read;
-    }
-
-    private void verifySum(int delta) throws IOException {
-      int crc;
-      try {
-        crc = sums.readInt();
-      } catch (IOException e) {
-        LOG.warn("Problem reading checksum file: "+e+". Ignoring.");
-        stopSumming();
-        return;
-      }
-      int sumValue = (int)sum.getValue();
-      sum.reset();
-      inSum = 0;
-      if (crc != sumValue) {
-        long pos = getPos() - delta;
-        fs.reportChecksumFailure(file, (FSInputStream)in,
-                                 pos, sumsIn, pos/bytesPerSum) ;
-        throw new ChecksumException("Checksum error: "+file+" at "+pos);
-      }
-    }
-
-    public long getPos() throws IOException {
-      return ((FSInputStream)in).getPos();
-    }
-
-    public int read(long position, byte[] buffer, int offset, int length)
-    throws IOException {
-      return ((FSInputStream)in).read(position, buffer, offset, length);
-    }
-    
-    public void readFully(long position, byte[] buffer, int offset, int length)
-    throws IOException {
-      ((FSInputStream)in).readFully(position, buffer, offset, length);
-    }
-    
-    public void readFully(long position, byte[] buffer)
-    throws IOException {
-      ((FSInputStream)in).readFully(position, buffer);
-    }
-
-    public void close() throws IOException {
-      super.close();
-      stopSumming();
-    }
-
-    private void stopSumming() {
-      if (sums != null) {
-        try {
-          sums.close();
-        } catch (IOException f) {}
-        sums = null;
-        bytesPerSum = 1;
-      }
-    }
-  }
 
   /** Cache the file position.  This improves performance significantly.*/
   private static class PositionCache extends FilterInputStream {
     long position;
 
-    public PositionCache(InputStream in) throws IOException {
+    public PositionCache(FSInputStream in) throws IOException {
       super(in);
     }
 
@@ -221,7 +44,7 @@
     }
 
     public void seek(long desired) throws IOException {
-      ((Seekable)in).seek(desired);               // seek underlying stream
+      ((FSInputStream)in).seek(desired);          // seek underlying stream
       position = desired;                         // update position
     }
       
@@ -231,18 +54,17 @@
     
     public int read(long position, byte[] buffer, int offset, int length)
     throws IOException {
-      return ((PositionedReadable)in).read(position, buffer, offset, length);
+      return ((FSInputStream)in).read(position, buffer, offset, length);
     }
     
     public void readFully(long position, byte[] buffer, int offset, int length)
     throws IOException {
-      ((PositionedReadable)in).readFully(position, buffer, offset, length);
+      ((FSInputStream)in).readFully(position, buffer, offset, length);
     }
-    
   }
 
   /** Buffer input.  This improves performance significantly.*/
-  private class Buffer extends BufferedInputStream {
+  private static class Buffer extends BufferedInputStream {
     public Buffer(PositionCache in, int bufferSize)
       throws IOException {
       super(in, bufferSize);
@@ -257,18 +79,8 @@
       } else {
         this.count = 0;                           // invalidate buffer
         this.pos = 0;
-
-        long delta = desired % bytesPerSum;
-        
-        // seek to last checksummed point, if any
-        ((PositionCache)in).seek(desired - delta);
-
-        // scan to desired position
-        for (int i = 0; i < delta; i++) {
-          read();
-        }
+        ((PositionCache)in).seek(desired);
       }
-
     }
       
     public long getPos() throws IOException {     // adjust for buffer
@@ -291,40 +103,18 @@
     throws IOException {
       ((PositionCache)in).readFully(position, buffer, offset, length);
     }
-}
-  
-  
-  public FSDataInputStream(FileSystem fs, Path file, int bufferSize, Configuration conf)
-      throws IOException {
-    super(null);
-    Checker chkr = new Checker(fs, file, conf);  // sets bytesPerSum
-    if (bufferSize % bytesPerSum != 0) {
-      throw new IOException("Buffer size must be multiple of " + bytesPerSum);
-    }
-    this.in = new Buffer(new PositionCache(chkr), bufferSize);
   }
+
+  protected FSInputStream inStream;
   
-  
-  public FSDataInputStream(FileSystem fs, Path file, Configuration conf)
-    throws IOException {
-    super(null);
-    int bufferSize = conf.getInt("io.file.buffer.size", 4096);
-    Checker chkr = new Checker(fs, file, conf);
-    if (bufferSize % bytesPerSum != 0) {
-      throw new IOException("Buffer size must be multiple of " + bytesPerSum);
-    }
-    this.in = new Buffer(new PositionCache(chkr), bufferSize);
-  }
-    
-  /** Construct without checksums. */
   public FSDataInputStream(FSInputStream in, Configuration conf) throws IOException {
     this(in, conf.getInt("io.file.buffer.size", 4096));
   }
-  /** Construct without checksums. */
+  
   public FSDataInputStream(FSInputStream in, int bufferSize)
     throws IOException {
-    super(null);
-    this.in = new Buffer(new PositionCache(in), bufferSize);
+    super( new Buffer(new PositionCache(in), bufferSize) );
+    this.inStream = in;
   }
   
   public synchronized void seek(long desired) throws IOException {
@@ -334,7 +124,7 @@
   public long getPos() throws IOException {
     return ((Buffer)in).getPos();
   }
-
+  
   public int read(long position, byte[] buffer, int offset, int length)
   throws IOException {
     return ((Buffer)in).read(position, buffer, offset, length);
@@ -348,5 +138,9 @@
   public void readFully(long position, byte[] buffer)
   throws IOException {
     ((Buffer)in).readFully(position, buffer, 0, buffer.length);
+  }
+  
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return inStream.seekToNewSource(targetPos); 
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java Tue Feb 27 15:45:46 2007
@@ -18,89 +18,13 @@
 package org.apache.hadoop.fs;
 
 import java.io.*;
-import java.util.zip.Checksum;
-import java.util.zip.CRC32;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Progressable;
 
-/** Utility that wraps a {@link FSOutputStream} in a {@link DataOutputStream},
+/** Utility that wraps an {@link OutputStream} in a {@link DataOutputStream},
  * buffers output through a {@link BufferedOutputStream} and creates a checksum
  * file. */
 public class FSDataOutputStream extends DataOutputStream {
-  public static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
-  
-  /** Store checksums for data. */
-  private static class Summer extends FilterOutputStream {
-
-    private FSDataOutputStream sums;
-    private Checksum sum = new CRC32();
-    private int inSum;
-    private int bytesPerSum;
-
-    public Summer(FileSystem fs, 
-                  Path file, 
-                  boolean overwrite, 
-                  short replication,
-                  long blockSize,
-                  Configuration conf)
-      throws IOException {
-      this(fs, file, overwrite, replication, blockSize, conf, null);
-    }
-
-    public Summer(FileSystem fs, 
-                  Path file, 
-                  boolean overwrite, 
-                  short replication,
-                  long blockSize,
-                  Configuration conf,
-                  Progressable progress)
-      throws IOException {
-      super(fs.createRaw(file, overwrite, replication, blockSize, progress));
-      this.bytesPerSum = conf.getInt("io.bytes.per.checksum", 512);
-      this.sums = new FSDataOutputStream(
-            fs.createRaw(FileSystem.getChecksumFile(file), true, 
-                         replication, blockSize), 
-            conf);
-      sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
-      sums.writeInt(this.bytesPerSum);
-    }
-    
-    public void write(byte b[], int off, int len) throws IOException {
-      int summed = 0;
-      while (summed < len) {
-
-        int goal = this.bytesPerSum - inSum;
-        int inBuf = len - summed;
-        int toSum = inBuf <= goal ? inBuf : goal;
-
-        sum.update(b, off+summed, toSum);
-        summed += toSum;
-
-        inSum += toSum;
-        if (inSum == this.bytesPerSum) {
-          writeSum();
-        }
-      }
-
-      out.write(b, off, len);
-    }
-
-    private void writeSum() throws IOException {
-      if (inSum != 0) {
-        sums.writeInt((int)sum.getValue());
-        sum.reset();
-        inSum = 0;
-      }
-    }
-
-    public void close() throws IOException {
-      writeSum();
-      sums.close();
-      super.close();
-    }
-
-  }
-
   private static class PositionCache extends FilterOutputStream {
     long position;
 
@@ -122,7 +46,7 @@
   }
 
   private static class Buffer extends BufferedOutputStream {
-    public Buffer(OutputStream out, int bufferSize) throws IOException {
+    public Buffer(PositionCache out, int bufferSize) throws IOException {
       super(out, bufferSize);
     }
 
@@ -138,50 +62,19 @@
         buf[count++] = (byte)b;
       }
     }
-
   }
 
-  public FSDataOutputStream(FileSystem fs, Path file,
-                            boolean overwrite, Configuration conf,
-                            int bufferSize, short replication, long blockSize )
+  public FSDataOutputStream(OutputStream out, int bufferSize)
   throws IOException {
-    super(new Buffer(
-            new PositionCache(
-                new Summer(fs, file, overwrite, replication, blockSize, conf)), 
-            bufferSize));
-  }
-
-  public FSDataOutputStream(FileSystem fs, Path file,
-                            boolean overwrite, Configuration conf,
-                            int bufferSize, short replication, long blockSize,
-                            Progressable progress)
-  throws IOException {
-    super(new Buffer(
-            new PositionCache(
-                new Summer(fs, file, overwrite, replication, blockSize, conf, progress)), 
-            bufferSize));
+    super(new Buffer(new PositionCache(out), bufferSize));
   }
   
-  /** Construct without checksums. */
-  private FSDataOutputStream(FSOutputStream out, Configuration conf) throws IOException {
+  public FSDataOutputStream(OutputStream out, Configuration conf)
+  throws IOException {
     this(out, conf.getInt("io.file.buffer.size", 4096));
   }
 
-  /** Construct without checksums. */
-  private FSDataOutputStream(FSOutputStream out, int bufferSize)
-    throws IOException {
-    super(new Buffer(new PositionCache(out), bufferSize));
-  }
-
   public long getPos() throws IOException {
     return ((Buffer)out).getPos();
   }
-
-  public static long getChecksumLength(long size, int bytesPerSum) {
-    //the checksum length is equal to size passed divided by bytesPerSum +
-    //bytes written in the beginning of the checksum file.  
-    return ((long)(Math.ceil((float)size/bytesPerSum)) + 1) * 4 + 
-            CHECKSUM_VERSION.length;  
-  }
-  
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSOutputStream.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSOutputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSOutputStream.java Tue Feb 27 15:45:46 2007
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs;
-
-import java.io.*;
-
-/****************************************************************
- * FSOutputStream is an OutputStream that can track its position.
- *
- * @author Mike Cafarella
- *****************************************************************/
-public abstract class FSOutputStream extends OutputStream {
-    /**
-     * Return the current offset from the start of the file
-     */
-    public abstract long getPos() throws IOException;
-}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Tue Feb 27 15:45:46 2007
@@ -48,7 +48,7 @@
  * @author Mike Cafarella
  *****************************************************************/
 public abstract class FileSystem extends Configured {
-    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DistributedFileSystem");
+    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FileSystem");
 
     // cache indexed by URI scheme and authority
     private static final Map<String,Map<String,FileSystem>> CACHE
@@ -194,25 +194,7 @@
 
       return new Path(scheme+":"+"//"+authority + pathUri.getPath());
     }
-
-    /** Return the name of the checksum file associated with a file.*/
-    public static Path getChecksumFile(Path file) {
-      return new Path(file.getParent(), "."+file.getName()+".crc");
-    }
-
-    /** Return the length of the checksum file given the size of the 
-     * actual file.
-     **/
-    public static long getChecksumFileLength(long fileSize, int bytesPerSum) {
-      return FSDataOutputStream.getChecksumLength(fileSize, bytesPerSum);
-    }
-    
-    /** Return true iff file is a checksum file name.*/
-    public static boolean isChecksumFile(Path file) {
-      String name = file.getName();
-      return name.startsWith(".") && name.endsWith(".crc");
-    }
-
+    
     ///////////////////////////////////////////////////////////////
     // FileSystem
     ///////////////////////////////////////////////////////////////
@@ -252,25 +234,18 @@
      * @param f the file name to open
      * @param bufferSize the size of the buffer to be used.
      */
-    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-      return new FSDataInputStream(this, f, bufferSize, getConf());
-    }
+    public abstract FSDataInputStream open(Path f, int bufferSize)
+    throws IOException;
     
     /**
      * Opens an FSDataInputStream at the indicated Path.
      * @param f the file to open
      */
     public FSDataInputStream open(Path f) throws IOException {
-      return new FSDataInputStream(this, f, getConf());
+      return open(f, getConf().getInt("io.file.buffer.size", 4096));
     }
 
     /**
-     * Opens an InputStream for the indicated Path, whether local
-     * or via DFS.
-     */
-    public abstract FSInputStream openRaw(Path f) throws IOException;
-
-    /**
      * Opens an FSDataOutputStream at the indicated Path.
      * Files are overwritten by default.
      */
@@ -368,8 +343,7 @@
                                       short replication,
                                       long blockSize
                                     ) throws IOException {
-      return new FSDataOutputStream(this, f, overwrite, getConf(), 
-                                    bufferSize, replication, blockSize );
+      return create(f, overwrite, bufferSize, replication, blockSize, null);
     }
 
     /**
@@ -381,72 +355,25 @@
      * @param bufferSize the size of the buffer to be used.
      * @param replication required block replication for the file. 
      */
-    public FSDataOutputStream create( Path f, 
-                                      boolean overwrite,
-                                      int bufferSize,
-                                      short replication,
-                                      long blockSize,
-                                      Progressable progress
-                                    ) throws IOException {
-      return new FSDataOutputStream(this, f, overwrite, getConf(), 
-                                    bufferSize, replication, blockSize, progress );
-    }
+    public abstract FSDataOutputStream create( Path f, 
+                                               boolean overwrite,
+                                               int bufferSize,
+                                               short replication,
+                                               long blockSize,
+                                               Progressable progress
+                                             ) throws IOException;
 
-    /** Opens an OutputStream at the indicated Path.
-     * @param f the file name to open
-     * @param overwrite if a file with this name already exists, then if true,
-     *   the file will be overwritten, and if false an error will be thrown.
-     * @param replication required block replication for the file. 
-     */
-    public abstract FSOutputStream createRaw(Path f, boolean overwrite, 
-                                             short replication,
-                                             long blockSize)
-      throws IOException;
-
-    /** Opens an OutputStream at the indicated Path with write-progress
-     * reporting.
-     * @param f the file name to open
-     * @param overwrite if a file with this name already exists, then if true,
-     *   the file will be overwritten, and if false an error will be thrown.
-     * @param replication required block replication for the file. 
-     */
-    public abstract FSOutputStream createRaw(Path f, boolean overwrite, 
-                                             short replication,
-                                             long blockSize, Progressable progress)
-      throws IOException;
-    
     /**
      * Creates the given Path as a brand-new zero-length file.  If
      * create fails, or if it already existed, return false.
      */
     public boolean createNewFile(Path f) throws IOException {
-        if (exists(f)) {
-            return false;
-        } else {
-          create(f,false,getConf().getInt("io.file.buffer.size", 4096)).close();
-          return true;
-        }
-    }
-
-    /**
-     * Set replication for an existing file.
-     * 
-     * @param src file name
-     * @param replication new replication
-     * @throws IOException
-     * @return true if successful;
-     *         false if file does not exist or is a directory
-     */
-    public boolean setReplication(Path src, short replication) throws IOException {
-      boolean value = setReplicationRaw(src, replication);
-      if( ! value )
+      if (exists(f)) {
         return false;
-
-      Path checkFile = getChecksumFile(src);
-      if (exists(checkFile))
-        setReplicationRaw(checkFile, replication);
-
-      return true;
+      } else {
+        create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();
+        return true;
+      }
     }
 
     /**
@@ -467,57 +394,20 @@
      * @return true if successful;
      *         false if file does not exist or is a directory
      */
-    public abstract boolean setReplicationRaw(Path src, short replication) throws IOException;
+    public abstract boolean setReplication(Path src, short replication) throws IOException;
 
     /**
      * Renames Path src to Path dst.  Can take place on local fs
      * or remote DFS.
      */
-    public boolean rename(Path src, Path dst) throws IOException {
-      if (isDirectory(src)) {
-        return renameRaw(src, dst);
-      } else {
-
-        boolean value = renameRaw(src, dst);
-        if (!value)
-          return false;
-
-        Path checkFile = getChecksumFile(src);
-        if (exists(checkFile)) { //try to rename checksum
-          if(isDirectory(dst)) {
-            renameRaw(checkFile, dst);
-          } else {
-            renameRaw(checkFile, getChecksumFile(dst)); 
-          }
-        }
-
-        return value;
-      }
-      
-    }
-
-    /**
-     * Renames Path src to Path dst.  Can take place on local fs
-     * or remote DFS.
-     */
-    public abstract boolean renameRaw(Path src, Path dst) throws IOException;
-
-    /** Delete a file. */
-    public boolean delete(Path f) throws IOException {
-      if (isDirectory(f)) {
-        return deleteRaw(f);
-      } else {
-        deleteRaw(getChecksumFile(f));            // try to delete checksum
-        return deleteRaw(f);
-      }
-    }
-
-    /**
-     * Deletes Path
+    public abstract boolean rename(Path src, Path dst) throws IOException;
+    
+    /** Delete a file */
+    public abstract boolean delete(Path f) throws IOException;
+    
+    /** Check if exists.
+     * @param f source file
      */
-    public abstract boolean deleteRaw(Path f) throws IOException;
-
-    /** Check if exists. */
     public abstract boolean exists(Path f) throws IOException;
 
     /** True iff the named path is a directory. */
@@ -525,11 +415,11 @@
 
     /** True iff the named path is a regular file. */
     public boolean isFile(Path f) throws IOException {
-        if (exists(f) && ! isDirectory(f)) {
-            return true;
-        } else {
-            return false;
-        }
+      if (exists(f) && ! isDirectory(f)) {
+        return true;
+      } else {
+        return false;
+      }
     }
     
     /** The number of bytes in a file. */
@@ -540,38 +430,43 @@
      * If <i>f</i> is a directory, return the size of the directory tree
      */
     public long getContentLength(Path f) throws IOException {
-        if (!isDirectory(f)) {
-            // f is a file
-            return getLength(f);
-        }
-            
-        // f is a diretory
-        Path[] contents = listPathsRaw(f);
-        long size = 0;
-        for(int i=0; i<contents.length; i++) {
-            size += getContentLength(contents[i]);
-        }
-        return size;
+      if (!isDirectory(f)) {
+        // f is a file
+        return getLength(f);
+      }
+      
+      // f is a directory
+      Path[] contents = listPaths(f);
+      long size = 0;
+      for(int i=0; i<contents.length; i++) {
+        size += getContentLength(contents[i]);
+      }
+      return size;
     }
 
     final private static PathFilter DEFAULT_FILTER = new PathFilter() {
       public boolean accept(Path file) {
-        return !isChecksumFile(file);
+        return true;
       }     
     };
-  
-    /** List files in a directory. */
-    public Path[] listPaths(Path f) throws IOException {
-      return listPaths(f, DEFAULT_FILTER);
-    }
     
     /** List files in a directory. */
-    public abstract Path[] listPathsRaw(Path f) throws IOException;
+    public abstract Path[] listPaths(Path f) throws IOException;
+    
+    /** 
+     * Filter files in the given paths using the default checksum filter. 
+     * @param files a list of paths
+     * @return a list of files under the source paths
+     * @exception IOException
+     */
+    public Path[] listPaths(Path[] files ) throws IOException {
+      return listPaths(files, DEFAULT_FILTER);
+    }
 
-    /** Filter raw files in a directory. */
+    /** Filter files in a directory. */
     private void listPaths(ArrayList<Path> results, Path f, PathFilter filter)
       throws IOException {
-      Path listing[] = listPathsRaw(f);
+      Path listing[] = listPaths(f);
       if (listing != null) {
         for (int i = 0; i < listing.length; i++) {
           if (filter.accept(listing[i])) {
@@ -581,25 +476,15 @@
       }      
     }
     
-    /** Filter raw files in a directory. */
+    /** Filter files in a directory. */
     public Path[] listPaths(Path f, PathFilter filter) throws IOException {
-        ArrayList<Path> results = new ArrayList<Path>();
-        listPaths(results, f, filter);
-        return (Path[]) results.toArray(new Path[results.size()]);
-    }
-
-    /** 
-     * Filter raw files in a list directories using the default checksum filter. 
-     * @param files a list of paths
-     * @return a list of files under the source paths
-     * @exception IOException
-     */
-    public Path[] listPaths(Path[] files ) throws IOException {
-      return listPaths( files, DEFAULT_FILTER );
+      ArrayList<Path> results = new ArrayList<Path>();
+      listPaths(results, f, filter);
+      return (Path[]) results.toArray(new Path[results.size()]);
     }
     
     /** 
-     * Filter raw files in a list directories using user-supplied path filter. 
+     * Filter files in a list of directories using user-supplied path filter. 
      * @param files a list of paths
      * @return a list of files under the source paths
      * @exception IOException
@@ -713,7 +598,7 @@
       private boolean hasPattern = false;
       
       /** Default pattern character: Escape any special meaning. */
-      private static final char  PAT_ESCAPE =  '\\';
+      private static final char  PAT_ESCAPE = '\\';
       /** Default pattern character: Any single character. */
       private static final char  PAT_ANY = '.';
       /** Default pattern character: Character set close. */
@@ -740,60 +625,58 @@
         // Validate the pattern
         len = filePattern.length();
         if (len == 0)
-            return;
+          return;
 
-        setOpen =  0;
+        setOpen = 0;
         setRange = false;
-
-        for (int i = 0;  i < len;  i++)
-        {
-            char  pCh;
-
-            // Examine a single pattern character
-            pCh = filePattern.charAt(i);            
-            if( pCh == PAT_ESCAPE ) {
-              fileRegex.append( pCh );
-              i++;
-              if (i >= len)
-                  error( "An escaped character does not present",
-                      filePattern, i);
-              pCh = filePattern.charAt(i);
-            } else if( pCh == '.' ) {
-              fileRegex.append( PAT_ESCAPE );
-            } else if( pCh == '*' ) {
-                fileRegex.append( PAT_ANY );
-                hasPattern = true;
-            } else if( pCh == '?' ) {
-                pCh = PAT_ANY ;
-                hasPattern = true;
-            } else if( pCh == '[' && setOpen == 0 ) {
-                setOpen++;
-                hasPattern = true;
-            } else if( pCh == '^' && setOpen > 0) {
-            } else if (pCh == '-'  &&  setOpen > 0) {
-                // Character set range
-                setRange = true;
-            } else if (pCh == PAT_SET_CLOSE  &&  setRange) {
-                // Incomplete character set range
-                error("Incomplete character set range", filePattern, i);
-            } else if (pCh == PAT_SET_CLOSE  &&  setOpen > 0) {
-                // End of a character set
-                if (setOpen < 2)
-                    error("Unexpected end of set", filePattern, i);
-                setOpen = 0;
-            } else if (setOpen > 0) {
-                // Normal character, or the end of a character set range
-                setOpen++;
-                setRange = false;
-            }
-            fileRegex.append( pCh );
+        
+        for (int i = 0; i < len; i++) {
+          char pCh;
+          
+          // Examine a single pattern character
+          pCh = filePattern.charAt(i);
+          if (pCh == PAT_ESCAPE) {
+            fileRegex.append(pCh);
+            i++;
+            if (i >= len)
+              error("An escaped character does not present", filePattern, i);
+            pCh = filePattern.charAt(i);
+          } else if (pCh == '.') {
+            fileRegex.append(PAT_ESCAPE);
+          } else if (pCh == '*') {
+            fileRegex.append(PAT_ANY);
+            hasPattern = true;
+          } else if (pCh == '?') {
+            pCh = PAT_ANY;
+            hasPattern = true;
+          } else if (pCh == '[' && setOpen == 0) {
+            setOpen++;
+            hasPattern = true;
+          } else if (pCh == '^' && setOpen > 0) {
+          } else if (pCh == '-' && setOpen > 0) {
+            // Character set range
+            setRange = true;
+          } else if (pCh == PAT_SET_CLOSE && setRange) {
+            // Incomplete character set range
+            error("Incomplete character set range", filePattern, i);
+          } else if (pCh == PAT_SET_CLOSE && setOpen > 0) {
+            // End of a character set
+            if (setOpen < 2)
+              error("Unexpected end of set", filePattern, i);
+            setOpen = 0;
+          } else if (setOpen > 0) {
+            // Normal character, or the end of a character set range
+            setOpen++;
+            setRange = false;
+          }
+          fileRegex.append(pCh);
         }
-
+        
         // Check for a well-formed pattern
-        if (setOpen > 0  ||  setRange)
-        {
-            // Incomplete character set or character range
-            error("Expecting set closure character or end of range", filePattern, len);
+        if (setOpen > 0 || setRange) {
+          // Incomplete character set or character range
+          error("Expecting set closure character or end of range", filePattern,
+              len);
         }
         regex = Pattern.compile(fileRegex.toString());
       }
@@ -808,13 +691,14 @@
       
       private void error(String s, String pattern, int pos) throws IOException {
         throw new IOException("Illegal file pattern: "
-                                 +s+" for glob "+pattern + " at " + pos);
+                                 +s+ " for glob "+ pattern + " at " + pos);
       }
     }
     
     /**
-     * Set the current working directory for the given file system.
-     * All relative paths will be resolved relative to it.
+     * Set the current working directory for the given file system. All relative
+     * paths will be resolved relative to it.
+     * 
      * @param new_dir
      */
     public abstract void setWorkingDirectory(Path new_dir);
@@ -852,30 +736,52 @@
      * The src file is on the local disk.  Add it to FS at
      * the given dst name and the source is kept intact afterwards
      */
-    public abstract void copyFromLocalFile(Path src, Path dst) throws IOException;
+    public void copyFromLocalFile(Path src, Path dst)
+    throws IOException {
+      copyFromLocalFile(false, src, dst);
+    }
 
     /**
      * The src file is on the local disk.  Add it to FS at
      * the given dst name, removing the source afterwards.
      */
-    public abstract void moveFromLocalFile(Path src, Path dst) throws IOException;
+    public void moveFromLocalFile(Path src, Path dst)
+    throws IOException {
+      copyFromLocalFile(true, src, dst);
+    }
 
     /**
+     * The src file is on the local disk.  Add it to FS at
+     * the given dst name.
+     * delSrc indicates if the source should be removed
+     */
+    public abstract void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+    throws IOException;
+    
+    /**
      * The src file is under FS, and the dst is on the local disk.
      * Copy it from FS control to the local dst name.
-     * If src and dst are directories, copy crc files as well.
      */
     public void copyToLocalFile(Path src, Path dst) throws IOException {
-      copyToLocalFile(src, dst, true);
+      copyToLocalFile(false, src, dst);
     }
     
     /**
      * The src file is under FS, and the dst is on the local disk.
      * Copy it from FS control to the local dst name.
-     * If src and dst are directories, the copyCrc parameter
-     * determines whether to copy CRC files.
+     * Remove the source afterwards
      */
-    public abstract void copyToLocalFile(Path src, Path dst, boolean copyCrc) throws IOException;
+    public void moveToLocalFile(Path src, Path dst) throws IOException {
+      copyToLocalFile(true, src, dst);
+    }
+
+    /**
+     * The src file is under FS, and the dst is on the local disk.
+     * Copy it from FS control to the local dst name.
+     * delSrc indicates if the src will be removed or not.
+     */   
+    public abstract void copyToLocalFile(boolean delSrc, Path src, Path dst)
+    throws IOException;
 
     /**
      * Returns a local File that the user can write output to.  The caller
@@ -907,20 +813,18 @@
       }
     }
 
-    /**
-     * Report a checksum error to the file system.
-     * @param f the file name containing the error
-     * @param in the stream open on the file
-     * @param inPos the position of the beginning of the bad data in the file
-     * @param sums the stream open on the checksum file
-     * @param sumsPos the position of the beginning of the bad data in the checksum file
-     */
-    public abstract void reportChecksumFailure(Path f, 
-                                               FSInputStream in, long inPos, 
-                                               FSInputStream sums, long sumsPos);
+    /** Return the total size of all files in the filesystem.*/
+    public long getUsed() throws IOException{
+      long used = 0;
+      Path[] files = listPaths(new Path("/"));
+      for(Path file:files){
+        used += getContentLength(file);
+      }
+      return used;
+    }
 
     /**
-     * Get the size for a particular file.
+     * Get the block size for a particular file.
      * @param f the filename
      * @return the number of bytes in a block
      */
@@ -928,7 +832,10 @@
     
     /** Return the number of bytes that large input files should be optimally
      * be split into to minimize i/o time. */
-    public abstract long getDefaultBlockSize();
+    public long getDefaultBlockSize() {
+      // default to 32MB: large enough to minimize the impact of seeks
+      return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
+    }
     
     /**
      * Get the default replication.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=512499&r1=512498&r2=512499
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java Tue Feb 27 15:45:46 2007
@@ -67,55 +67,36 @@
                              FileSystem dstFS, Path dst, 
                              boolean deleteSource,
                              Configuration conf ) throws IOException {
-    return copy(srcFS, src, dstFS, dst, deleteSource, true, conf);
-  
-  }
-
-  /** Copy files between FileSystems. */
-  public static boolean copy(FileSystem srcFS, Path src, 
-                             FileSystem dstFS, Path dst, 
-                             boolean deleteSource,
-                             boolean copyCrc,
-                             Configuration conf ) throws IOException {
-    dst = checkDest(src.getName(), dstFS, dst);
+      dst = checkDest(src.getName(), dstFS, dst);
 
-    if (srcFS.isDirectory(src)) {
-      if (!dstFS.mkdirs(dst)) {
-        return false;
-      }
-      Path contents[] = srcFS.listPaths(src);
-      for (int i = 0; i < contents.length; i++) {
-        copy(srcFS, contents[i], dstFS, new Path(dst, contents[i].getName()),
-             deleteSource, copyCrc, conf);
-      }
-    } else if (srcFS.isFile(src)) {
-      InputStream in = srcFS.open(src);
-      try {
-        OutputStream out = (copyCrc) ?
-          dstFS.create(dst) :
-          dstFS.createRaw(dst, true, dstFS.getDefaultReplication(),
-            dstFS.getDefaultBlockSize());
-        copyContent(in, out, conf);
-      } finally {
-        in.close();
-      }
-      // if crc copying is disabled, remove the existing crc file if any
-      if (!copyCrc) {
-        Path crcFile = dstFS.getChecksumFile(dst);
-        if (dstFS.exists(crcFile)) {
-          dstFS.deleteRaw(crcFile);
+      if (srcFS.isDirectory(src)) {
+        if (!dstFS.mkdirs(dst)) {
+          return false;
         }
+        Path contents[] = srcFS.listPaths(src);
+        for (int i = 0; i < contents.length; i++) {
+          copy(srcFS, contents[i], dstFS, new Path(dst, contents[i].getName()),
+               deleteSource, conf);
+        }
+      } else if (srcFS.isFile(src)) {
+        InputStream in = srcFS.open(src);
+        try {
+          OutputStream out = dstFS.create(dst);
+          copyContent(in, out, conf);
+        } finally {
+          in.close();
+        }
+      } else {
+        throw new IOException(src.toString() + ": No such file or directory");
+      }
+      if (deleteSource) {
+        return srcFS.delete(src);
+      } else {
+        return true;
       }
-    } else {
-      throw new IOException(src.toString() + ": No such file or directory");
-    }
-    if (deleteSource) {
-      return srcFS.delete(src);
-    } else {
-      return true;
-    }
-  }
   
+  }
+
   /** Copy all files in a directory to one output file (merge). */
   public static boolean copyMerge(FileSystem srcFS, Path srcDir, 
                              FileSystem dstFS, Path dstFile, 



Mime
View raw message