hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r556743 [3/3] - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/ src/test/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/mapred/
Date: Mon, 16 Jul 2007 21:35:03 GMT
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Mon Jul 16 14:34:59 2007
@@ -33,367 +33,233 @@
  * DistributedFileSystem.
  *
  *****************************************************************/
-public class DistributedFileSystem extends ChecksumFileSystem {
-  private static class RawDistributedFileSystem extends FileSystem {
-    private Path workingDir =
-      new Path("/user", System.getProperty("user.name")); 
-    private URI uri;
-    private FileSystem localFs;
+public class DistributedFileSystem extends FileSystem {
+  private Path workingDir =
+    new Path("/user", System.getProperty("user.name")); 
+  private URI uri;
+  private FileSystem localFs;
 
-    DFSClient dfs;
+  DFSClient dfs;
 
-    public RawDistributedFileSystem() {
-    }
+  public DistributedFileSystem() {
+  }
 
 
-    /** @deprecated */
-    public RawDistributedFileSystem(InetSocketAddress namenode,
-                                    Configuration conf) throws IOException {
-      initialize(URI.create("hdfs://"+
-                            namenode.getHostName()+":"+
-                            namenode.getPort()),
-                 conf);
-    }
+  /** @deprecated */
+  public DistributedFileSystem(InetSocketAddress namenode,
+    Configuration conf) throws IOException {
+    initialize(URI.create("hdfs://"+
+                          namenode.getHostName()+":"+
+                          namenode.getPort()),
+                          conf);
+  }
 
-    /** @deprecated */
-    public String getName() { return uri.getAuthority(); }
+  /** @deprecated */
+  public String getName() { return uri.getAuthority(); }
 
-    public URI getUri() { return uri; }
+  public URI getUri() { return uri; }
 
-    public void initialize(URI uri, Configuration conf) throws IOException {
-      setConf(conf);
-      String host = uri.getHost();
-      int port = uri.getPort();
-      this.dfs = new DFSClient(new InetSocketAddress(host, port), conf);
-      this.uri = URI.create("hdfs://"+host+":"+port);
-      this.localFs = getLocal(conf);
-    }
+  public void initialize(URI uri, Configuration conf) throws IOException {
+    setConf(conf);
+    String host = uri.getHost();
+    int port = uri.getPort();
+    this.dfs = new DFSClient(new InetSocketAddress(host, port), conf);
+    this.uri = URI.create("hdfs://"+host+":"+port);
+    this.localFs = getLocal(conf);
+  }
 
-    public Path getWorkingDirectory() {
-      return workingDir;
-    }
-    
-    public long getDefaultBlockSize() {
-      return dfs.getDefaultBlockSize();
-    }
-    
-    public short getDefaultReplication() {
-      return dfs.getDefaultReplication();
-    }
-    
-    private Path makeAbsolute(Path f) {
-      if (f.isAbsolute()) {
-        return f;
-      } else {
-        return new Path(workingDir, f);
-      }
-    }
-    
-    public void setWorkingDirectory(Path dir) {
-      Path result = makeAbsolute(dir);
-      if (!FSNamesystem.isValidName(result.toString())) {
-        throw new IllegalArgumentException("Invalid DFS directory name " + 
-                                           result);
-      }
-      workingDir = makeAbsolute(dir);
-    }
-    
-    /**
-     * @deprecated use {@link #getPathName(Path)} instead.
-     */
-    private UTF8 getPath(Path file) {
-      return new UTF8(getPathName(file));
-    }
-
-    private String getPathName(Path file) {
-      checkPath(file);
-      String result = makeAbsolute(file).toUri().getPath();
-      if (!FSNamesystem.isValidName(result)) {
-        throw new IllegalArgumentException("Pathname " + result + " from " +
-                                           file +
-                                           " is not a valid DFS filename.");
-      }
-      return result;
-    }
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+
+  public long getDefaultBlockSize() {
+    return dfs.getDefaultBlockSize();
+  }
+
+  public short getDefaultReplication() {
+    return dfs.getDefaultReplication();
+  }
 
-    public String[][] getFileCacheHints(Path f, long start, long len) throws IOException {
-      return dfs.getHints(getPathName(f), start, len);
+  private Path makeAbsolute(Path f) {
+    if (f.isAbsolute()) {
+      return f;
+    } else {
+      return new Path(workingDir, f);
     }
+  }
 
-    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-      return new DFSClient.DFSDataInputStream(dfs.open(getPath(f),bufferSize));
+  public void setWorkingDirectory(Path dir) {
+    Path result = makeAbsolute(dir);
+    if (!FSNamesystem.isValidName(result.toString())) {
+      throw new IllegalArgumentException("Invalid DFS directory name " + 
+                                         result);
     }
+    workingDir = makeAbsolute(dir);
+  }
 
-    public FSDataOutputStream create(Path f, boolean overwrite,
-                                     int bufferSize, short replication, long blockSize,
-                                     Progressable progress) throws IOException {
-      if (exists(f) && !overwrite) {
-        throw new IOException("File already exists:"+f);
-      }
-      Path parent = f.getParent();
-      if (parent != null && !mkdirs(parent)) {
-        throw new IOException("Mkdirs failed to create " + parent);
-      }
-      
-      return new FSDataOutputStream( dfs.create(
-          getPath(f), overwrite, replication, blockSize, progress, bufferSize));
-    }
-    
-    public boolean setReplication(Path src, 
-                                  short replication
-                                  ) throws IOException {
-      return dfs.setReplication(getPath(src), replication);
-    }
-    
-    /**
-     * Rename files/dirs
-     */
-    public boolean rename(Path src, Path dst) throws IOException {
-      return dfs.rename(getPath(src), getPath(dst));
-    }
-
-    /**
-     * Get rid of Path f, whether a true file or dir.
-     */
-    public boolean delete(Path f) throws IOException {
-      return dfs.delete(getPath(f));
-    }
-
-    public boolean exists(Path f) throws IOException {
-      return dfs.exists(getPath(f));
-    }
-
-    public long getContentLength(Path f) throws IOException {
-      if (f instanceof DfsPath) {
-        return ((DfsPath)f).getContentsLength();
-      }
+  /**
+   * @deprecated use {@link #getPathName(Path)} instead.
+   */
+  private UTF8 getPath(Path file) {
+    return new UTF8(getPathName(file));
+  }
 
-      DFSFileInfo info[] = dfs.listPaths(getPath(f));
-      return (info == null) ? 0 : info[0].getLen();
+  private String getPathName(Path file) {
+    checkPath(file);
+    String result = makeAbsolute(file).toUri().getPath();
+    if (!FSNamesystem.isValidName(result)) {
+      throw new IllegalArgumentException("Pathname " + result + " from " +
+                                         file+" is not a valid DFS filename.");
     }
+    return result;
+  }
 
-    public Path[] listPaths(Path f) throws IOException {
-      DFSFileInfo info[] = dfs.listPaths(getPath(f));
-      if (info == null) {
-        return new Path[0];
-      } else {
-        Path results[] = new DfsPath[info.length];
-        for (int i = 0; i < info.length; i++) {
-          results[i] = new DfsPath(info[i], this);
-        }
-        return results;
-      }
-    }
+  public String[][] getFileCacheHints(Path f, long start, long len) throws IOException {
+    return dfs.getHints(getPathName(f), start, len);
+  }
+
+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    return new DFSClient.DFSDataInputStream(dfs.open(getPath(f),bufferSize));
+  }
 
-    public boolean mkdirs(Path f) throws IOException {
-      return dfs.mkdirs(getPath(f));
+  public FSDataOutputStream create(Path f, boolean overwrite,
+    int bufferSize, short replication, long blockSize,
+    Progressable progress) throws IOException {
+    if (exists(f) && !overwrite) {
+      throw new IOException("File already exists:"+f);
+    }
+    Path parent = f.getParent();
+    if (parent != null && !mkdirs(parent)) {
+      throw new IOException("Mkdirs failed to create " + parent);
     }
 
-    /** @deprecated */ @Deprecated
-      public void lock(Path f, boolean shared) throws IOException {
-      dfs.lock(getPath(f), !shared);
-    }
-
-    /** @deprecated */ @Deprecated
-      public void release(Path f) throws IOException {
-      dfs.release(getPath(f));
-    }
-
-    @Override
-    public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
-      throws IOException {
-      FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
-    }
-
-    @Override
-    public void copyToLocalFile(boolean delSrc, Path src, Path dst)
-      throws IOException {
-      FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
-    }
-
-    public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
-      throws IOException {
-      return tmpLocalFile;
-    }
-
-    /**
-     * Move completed local data to DFS destination
-     */
-    public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
-      throws IOException {
-      moveFromLocalFile(tmpLocalFile, fsOutputFile);
-    }
-
-    public void close() throws IOException {
-      super.close();
-      dfs.close();
-    }
-
-    public String toString() {
-      return "DFS[" + dfs + "]";
-    }
-
-    DFSClient getClient() {
-      return dfs;
-    }        
-    /** Return the total raw capacity of the filesystem, disregarding
-     * replication .*/
-    public long getRawCapacity() throws IOException{
-      return dfs.totalRawCapacity();
-    }
-
-    /** Return the total raw used space in the filesystem, disregarding
-     * replication .*/
-    public long getRawUsed() throws IOException{
-      return dfs.totalRawUsed();
-    }
-
-    /** Return statistics for each datanode. */
-    public DatanodeInfo[] getDataNodeStats() throws IOException {
-      return dfs.datanodeReport();
-    }
-    
-    /**
-     * Enter, leave or get safe mode.
-     *  
-     * @see org.apache.hadoop.dfs.ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
-     */
-    public boolean setSafeMode(FSConstants.SafeModeAction action) 
-      throws IOException {
-      return dfs.setSafeMode(action);
-    }
-
-    /*
-     * Refreshes the list of hosts and excluded hosts from the configured 
-     * files.  
-     */
-    public void refreshNodes() throws IOException {
-      dfs.refreshNodes();
-    }
-
-    /**
-     * Finalize previously upgraded files system state.
-     * @throws IOException
-     */
-    public void finalizeUpgrade() throws IOException {
-      dfs.finalizeUpgrade();
-    }
-
-    public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action
-                                                          ) throws IOException {
-      return dfs.distributedUpgradeProgress(action);
-    }
-
-    /*
-     * Requests the namenode to dump data strcutures into specified 
-     * file.
-     */
-    public void metaSave(String pathname) throws IOException {
-      dfs.metaSave(pathname);
-    }
-
-    /**
-     * We need to find the blocks that didn't match.  Likely only one 
-     * is corrupt but we will report both to the namenode.  In the future,
-     * we can consider figuring out exactly which block is corrupt.
-     */
-    public boolean reportChecksumFailure(Path f, 
-                                         FSDataInputStream in, long inPos, 
-                                         FSDataInputStream sums, long sumsPos) {
-      
-      LocatedBlock lblocks[] = new LocatedBlock[2];
-
-      try {
-        // Find block in data stream.
-        DFSClient.DFSDataInputStream dfsIn = (DFSClient.DFSDataInputStream) in;
-        Block dataBlock = dfsIn.getCurrentBlock();
-        if (dataBlock == null) {
-          throw new IOException("Error: Current block in data stream is null! ");
-        }
-        DatanodeInfo[] dataNode = {dfsIn.getCurrentDatanode()}; 
-        lblocks[0] = new LocatedBlock(dataBlock, dataNode);
-        LOG.info("Found checksum error in data stream at block=" + dataBlock.getBlockName() + 
-                 " on datanode=" + dataNode[0].getName());
-
-        // Find block in checksum stream
-        DFSClient.DFSDataInputStream dfsSums = (DFSClient.DFSDataInputStream) sums;
-        Block sumsBlock = dfsSums.getCurrentBlock();
-        if (sumsBlock == null) {
-          throw new IOException("Error: Current block in checksum stream is null! ");
-        }
-        DatanodeInfo[] sumsNode = {dfsSums.getCurrentDatanode()}; 
-        lblocks[1] = new LocatedBlock(sumsBlock, sumsNode);
-        LOG.info("Found checksum error in checksum stream at block=" + sumsBlock.getBlockName() + 
-                 " on datanode=" + sumsNode[0].getName());
-
-        // Ask client to delete blocks.
-        dfs.reportBadBlocks(lblocks);
-
-      } catch (IOException ie) {
-        LOG.info("Found corruption while reading "
-                 + f.toString() 
-                 + ".  Error repairing corrupt blocks.  Bad blocks remain. " 
-                 + StringUtils.stringifyException(ie));
-      }
+    return new FSDataOutputStream( dfs.create(getPath(f), overwrite, 
+                                              replication, blockSize, 
+                                              progress, bufferSize) );
+  }
 
-      return true;
+  public boolean setReplication(Path src, 
+                                short replication
+                               ) throws IOException {
+    return dfs.setReplication(getPath(src), replication);
+  }
+
+  /**
+   * Rename files/dirs
+   */
+  public boolean rename(Path src, Path dst) throws IOException {
+    return dfs.rename(getPath(src), getPath(dst));
+  }
+
+  /**
+   * Get rid of Path f, whether a true file or dir.
+   */
+  public boolean delete(Path f) throws IOException {
+    return dfs.delete(getPath(f));
+  }
+
+  public boolean exists(Path f) throws IOException {
+    return dfs.exists(getPath(f));
+  }
+
+  public long getContentLength(Path f) throws IOException {
+    if (f instanceof DfsPath) {
+      return ((DfsPath)f).getContentsLength();
     }
 
-    /**
-     * Returns the stat information about the file.
-     */
-    public FileStatus getFileStatus(Path f) throws IOException {
-      if (f instanceof DfsPath) {
-        DfsPath p = (DfsPath) f;
-        return p.info;
-      }
-      else {
-        DFSFileInfo p = dfs.getFileInfo(getPath(f));
-        return p;
+    DFSFileInfo info[] = dfs.listPaths(getPath(f));
+    return (info == null) ? 0 : info[0].getLen();
+  }
+
+  public Path[] listPaths(Path f) throws IOException {
+    DFSFileInfo info[] = dfs.listPaths(getPath(f));
+    if (info == null) {
+      return new Path[0];
+    } else {
+      Path results[] = new DfsPath[info.length];
+      for (int i = 0; i < info.length; i++) {
+        results[i] = new DfsPath(info[i], this);
       }
+      return results;
     }
   }
 
-  public DistributedFileSystem() {
-    super(new RawDistributedFileSystem());
+  public boolean mkdirs(Path f) throws IOException {
+    return dfs.mkdirs(getPath(f));
   }
 
-  /** @deprecated */
-  public DistributedFileSystem(InetSocketAddress namenode,
-                               Configuration conf) throws IOException {
-    super(new RawDistributedFileSystem(namenode, conf));
+  /** @deprecated */ @Deprecated
+  public void lock(Path f, boolean shared) throws IOException {
+    dfs.lock(getPath(f), !shared);
+  }
+
+  /** @deprecated */ @Deprecated
+  public void release(Path f) throws IOException {
+    dfs.release(getPath(f));
   }
 
   @Override
-  public long getContentLength(Path f) throws IOException {
-    return fs.getContentLength(f);
+  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+  throws IOException {
+    FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
   }
 
+  @Override
+  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+  throws IOException {
+    FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
+  }
+
+  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+  throws IOException {
+    return tmpLocalFile;
+  }
+
+  /**
+   * Move completed local data to DFS destination
+   */
+  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+  throws IOException {
+    moveFromLocalFile(tmpLocalFile, fsOutputFile);
+  }
+
+  public void close() throws IOException {
+    super.close();
+    dfs.close();
+  }
+
+  public String toString() {
+    return "DFS[" + dfs + "]";
+  }
+
+  DFSClient getClient() {
+    return dfs;
+  }        
   /** Return the total raw capacity of the filesystem, disregarding
    * replication .*/
   public long getRawCapacity() throws IOException{
-    return ((RawDistributedFileSystem)fs).getRawCapacity();
+    return dfs.totalRawCapacity();
   }
 
   /** Return the total raw used space in the filesystem, disregarding
    * replication .*/
   public long getRawUsed() throws IOException{
-    return ((RawDistributedFileSystem)fs).getRawUsed();
+    return dfs.totalRawUsed();
   }
 
   /** Return statistics for each datanode. */
   public DatanodeInfo[] getDataNodeStats() throws IOException {
-    return ((RawDistributedFileSystem)fs).getDataNodeStats();
+    return dfs.datanodeReport();
   }
-    
+
   /**
    * Enter, leave or get safe mode.
    *  
    * @see org.apache.hadoop.dfs.ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
    */
   public boolean setSafeMode(FSConstants.SafeModeAction action) 
-    throws IOException {
-    return ((RawDistributedFileSystem)fs).setSafeMode(action);
+  throws IOException {
+    return dfs.setSafeMode(action);
   }
 
   /*
@@ -401,26 +267,28 @@
    * files.  
    */
   public void refreshNodes() throws IOException {
-    ((RawDistributedFileSystem)fs).refreshNodes();
+    dfs.refreshNodes();
   }
 
   /**
    * Finalize previously upgraded files system state.
+   * @throws IOException
    */
   public void finalizeUpgrade() throws IOException {
-    ((RawDistributedFileSystem)fs).finalizeUpgrade();
+    dfs.finalizeUpgrade();
   }
 
   public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action
-                                                        ) throws IOException {
-    return ((RawDistributedFileSystem)fs).distributedUpgradeProgress(action);
+  ) throws IOException {
+    return dfs.distributedUpgradeProgress(action);
   }
 
   /*
-   * Dumps dfs data structures into specified file.
+   * Requests the namenode to dump data structures into the specified
+   * file.
    */
   public void metaSave(String pathname) throws IOException {
-    ((RawDistributedFileSystem)fs).metaSave(pathname);
+    dfs.metaSave(pathname);
   }
 
   /**
@@ -429,17 +297,52 @@
    * we can consider figuring out exactly which block is corrupt.
    */
   public boolean reportChecksumFailure(Path f, 
-                                       FSDataInputStream in, long inPos, 
-                                       FSDataInputStream sums, long sumsPos) {
-    return ((RawDistributedFileSystem)fs).reportChecksumFailure(
-                                                                f, in, inPos, sums, sumsPos);
+    FSDataInputStream in, long inPos, 
+    FSDataInputStream sums, long sumsPos) {
+
+    LocatedBlock lblocks[] = new LocatedBlock[2];
+
+    // Find block in data stream.
+    DFSClient.DFSDataInputStream dfsIn = (DFSClient.DFSDataInputStream) in;
+    Block dataBlock = dfsIn.getCurrentBlock();
+    if (dataBlock == null) {
+      LOG.error("Error: Current block in data stream is null! ");
+      return false;
+    }
+    DatanodeInfo[] dataNode = {dfsIn.getCurrentDatanode()}; 
+    lblocks[0] = new LocatedBlock(dataBlock, dataNode);
+    LOG.info("Found checksum error in data stream at block=" + dataBlock.getBlockName() + 
+             " on datanode=" + dataNode[0].getName());
+
+    // Find block in checksum stream
+    DFSClient.DFSDataInputStream dfsSums = (DFSClient.DFSDataInputStream) sums;
+    Block sumsBlock = dfsSums.getCurrentBlock();
+    if (sumsBlock == null) {
+      LOG.error("Error: Current block in checksum stream is null! ");
+      return false;
+    }
+    DatanodeInfo[] sumsNode = {dfsSums.getCurrentDatanode()}; 
+    lblocks[1] = new LocatedBlock(sumsBlock, sumsNode);
+    LOG.info("Found checksum error in checksum stream at block=" + sumsBlock.getBlockName() + 
+             " on datanode=" + sumsNode[0].getName());
+
+    // Ask client to delete blocks.
+    dfs.reportChecksumFailure(f.toString(), lblocks);
+
+    return true;
   }
 
   /**
    * Returns the stat information about the file.
    */
-  @Override
   public FileStatus getFileStatus(Path f) throws IOException {
-    return ((RawDistributedFileSystem)fs).getFileStatus(f);
+    if (f instanceof DfsPath) {
+      DfsPath p = (DfsPath) f;
+      return p.info;
+    }
+    else {
+      DFSFileInfo p = dfs.getFileInfo(getPath(f));
+      return p;
+    }
   }
 }
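
With the ChecksumFileSystem wrapper removed above, DistributedFileSystem no longer keeps client-side ".crc" files; callers that still want that behaviour select the ChecksumDistributedFileSystem wrapper through fs.hdfs.impl, as the test changes further down in this commit do. A minimal client sketch of the two options, assuming a build with this change applied and a reachable namenode (the host name below is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HdfsClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical namenode address; adjust to a real cluster.
    conf.set("fs.default.name", "hdfs://namenode.example.com:9000");

    // Default after this change: the plain DistributedFileSystem, which
    // relies on block-level CRCs kept by the datanodes.
    FileSystem fs = FileSystem.get(conf);
    System.out.println("hdfs impl: " + fs.getClass().getName());
    fs.close();

    // Old behaviour (client-side ".crc" files): select the wrapper class
    // explicitly, exactly as the TestDFSShell changes below do.
    conf.set("fs.hdfs.impl",
             "org.apache.hadoop.dfs.ChecksumDistributedFileSystem");
    FileSystem checksummed = FileSystem.get(conf);
    System.out.println("hdfs impl: " + checksummed.getClass().getName());
    checksummed.close();
  }
}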

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Mon Jul 16 14:34:59 2007
@@ -86,12 +86,20 @@
   // Processed at datanode stream-handler
   public static final byte OP_WRITE_BLOCK = (byte) 80;
   public static final byte OP_READ_BLOCK = (byte) 81;
-  public static final byte OP_READSKIP_BLOCK = (byte) 82;
-  public static final byte OP_READ_RANGE_BLOCK = (byte) 83;
-
-  // Encoding types
-  public static final byte RUNLENGTH_ENCODING = 0;
-  public static final byte CHUNKED_ENCODING = 1;
+  public static final byte OP_READ_METADATA = (byte) 82;
+  
+  public static final int OP_STATUS_SUCCESS = 0;  
+  public static final int OP_STATUS_ERROR = 1;  
+  public static final int OP_STATUS_ERROR_CHECKSUM = 2;  
+  public static final int OP_STATUS_ERROR_INVALID = 3;  
+  public static final int OP_STATUS_ERROR_EXISTS = 4;  
+
+  
+  /** Version for data transfers between clients and datanodes.
+   * This should change when the serialization of DatanodeInfo changes,
+   * not just when the protocol changes. It is not very obvious.
+   */
+  public static final int DATA_TRANFER_VERSION = 5; //Should it be 1?
 
   // Return codes for file create
   public static final int OPERATION_FAILED = 0;
@@ -115,8 +123,9 @@
   public static int MAX_PATH_LENGTH = 8000;
   public static int MAX_PATH_DEPTH = 1000;
     
-  //TODO mb@media-style.com: should be conf injected?
   public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size", 4096);
+  //TODO mb@media-style.com: should be conf injected?
+  public static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
 
   // SafeMode actions
   public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }
@@ -149,7 +158,7 @@
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -6;
+  public static final int LAYOUT_VERSION = -7;
   // Current version: 
-  // Dustributed upgrade is introduced.
+  // Block Level CRCs added.
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Mon Jul 16 14:34:59 2007
@@ -87,7 +87,14 @@
                           boolean resetIdx) throws IOException {
       if (numBlocks < maxBlocksPerDir) {
         File dest = new File(dir, b.getBlockName());
-        src.renameTo(dest);
+        File metaData = getMetaFile( src );
+        if ( ! metaData.renameTo( getMetaFile(dest) ) ||
+            ! src.renameTo( dest ) ) {
+          throw new IOException( "could not move files for " + b +
+                                 " from tmp to " + 
+                                 dest.getAbsolutePath() );
+        }
+
         numBlocks += 1;
         return dest;
       }
@@ -155,6 +162,7 @@
 
       File blockFiles[] = dir.listFiles();
       for (int i = 0; i < blockFiles.length; i++) {
+        // We are not enforcing the presence of the metadata file
         if (Block.isBlockFilename(blockFiles[i])) {
           volumeMap.put(new Block(blockFiles[i], blockFiles[i].length()), volume);
         }
@@ -262,6 +270,8 @@
     
     FSVolume(File currentDir, Configuration conf) throws IOException {
       this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
+      // add block size to the configured reserved space
+      this.reserved += conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
       this.usableDiskPct = conf.getFloat("dfs.datanode.du.pct",
                                          (float) USABLE_DISK_PCT_DEFAULT);
       File parent = currentDir.getParentFile();
@@ -418,6 +428,14 @@
   //
   //////////////////////////////////////////////////////
 
+  //Find better place?
+  public static final String METADATA_EXTENSION = ".meta";
+  public static final short METADATA_VERSION = 1;
+    
+  public static File getMetaFile( File f ) {
+    return new File( f.getAbsolutePath() + METADATA_EXTENSION );
+  }
+    
   FSVolumeSet volumes;
   private HashMap<Block,File> ongoingCreates = new HashMap<Block,File>();
   private int maxBlocksPerDir = 0;
@@ -467,20 +485,29 @@
   }
 
   /**
-   * Get a stream of data from the indicated block.
+   * Get File name for a given block.
    */
-  public synchronized InputStream getBlockData(Block b) throws IOException {
+  public synchronized File getBlockFile(Block b) throws IOException {
     if (!isValidBlock(b)) {
       throw new IOException("Block " + b + " is not valid.");
     }
-    // File should be opened with the lock.
-    return new FileInputStream(getFile(b));
+    return getFile(b);
   }
 
+  static class BlockWriteStreams {
+    OutputStream dataOut;
+    OutputStream checksumOut;
+    
+    BlockWriteStreams( File f ) throws IOException {
+      dataOut = new FileOutputStream( f );
+      checksumOut = new FileOutputStream( getMetaFile( f ) );
+    }
+  }
+  
   /**
    * Start writing to a block file
    */
-  public OutputStream writeToBlock(Block b) throws IOException {
+  public BlockWriteStreams writeToBlock(Block b) throws IOException {
     //
     // Make sure the block isn't a valid one - we're still creating it!
     //
@@ -515,7 +542,7 @@
       synchronized (volumes) {
         v = volumes.getNextVolume(blockSize);
         // create temporary file to hold block in the designated volume
-        f = v.createTmpFile(b);
+        f = createTmpFile(v, b);
       }
       ongoingCreates.put(b, f);
       volumeMap.put(b, v);
@@ -526,9 +553,23 @@
     // REMIND - mjc - make this a filter stream that enforces a max
     // block size, so clients can't go crazy
     //
-    return new FileOutputStream(f);
+    return new BlockWriteStreams( f );
   }
 
+  File createTmpFile( FSVolume vol, Block blk ) throws IOException {
+    if ( vol == null ) {
+      synchronized ( this ) {
+        vol = volumeMap.get( blk );
+        if ( vol == null ) {
+          throw new IOException("Could not find volume for block " + blk);
+        }
+      }
+    }
+    synchronized ( volumes ) {
+      return vol.createTmpFile(blk);
+    }
+  }
+  
   //
   // REMIND - mjc - eventually we should have a timeout system
   // in place to clean up block files left by abandoned clients.
@@ -545,8 +586,6 @@
     if (f == null || !f.exists()) {
       throw new IOException("No temporary file " + f + " for block " + b);
     }
-    long finalLen = f.length();
-    b.setNumBytes(finalLen);
     FSVolume v = volumeMap.get(b);
         
     File dest = null;
@@ -575,6 +614,7 @@
    * Check whether the given block is a valid one.
    */
   public boolean isValidBlock(Block b) {
+    //Should we check for metadata file too?
     File f = getFile(b);
     return (f!= null && f.exists());
   }
@@ -619,7 +659,8 @@
         blockMap.remove(invalidBlks[i]);
         volumeMap.remove(invalidBlks[i]);
       }
-      if (!f.delete()) {
+      File metaFile = getMetaFile( f );
+      if ( !f.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
         DataNode.LOG.warn("Unexpected error trying to delete block "
                           + invalidBlks[i] + " at file " + f);
         error = true;
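
Datanodes now store a checksum file next to each block file, named by appending METADATA_EXTENSION to the block file name. A small self-contained sketch of that naming convention, assuming a hypothetical block path (the real code path is FSDataset.getMetaFile above):

import java.io.File;

public class MetaFileNaming {
  // Mirrors FSDataset.METADATA_EXTENSION / getMetaFile above.
  static final String METADATA_EXTENSION = ".meta";

  static File getMetaFile(File blockFile) {
    return new File(blockFile.getAbsolutePath() + METADATA_EXTENSION);
  }

  public static void main(String[] args) {
    File block = new File("/data/dfs/data/current/blk_3141592653"); // hypothetical
    // Prints /data/dfs/data/current/blk_3141592653.meta
    System.out.println(getMetaFile(block));
  }
}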

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Mon Jul 16 14:34:59 2007
@@ -113,6 +113,13 @@
     String getLocalName() {
       return name;
     }
+    
+    String getAbsoluteName() {
+      // recursively constructs the absolute path.
+      // Any escaping of name required?
+      return ((parent != null) ? 
+              (parent.getAbsoluteName() + Path.SEPARATOR): "") + name;
+    }
 
     /**
      * Get file blocks 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon Jul 16 14:34:59 2007
@@ -416,6 +416,109 @@
                               curReplicasDelta, expectedReplicasDelta);
   }
 
+  /**
+   * Used only during DFS upgrade for block level CRCs (HADOOP-1134).
+   * This returns information for a given block that includes:
+   * <li> full path name for the file that contains the block.
+   * <li> offset of first byte of the block.
+   * <li> file length and length of the block.
+   * <li> all block locations for the crc file (".file.crc").
+   * <li> replication for crc file.
+   * When replicas is true, it includes replicas of the block.
+   */
+  public synchronized BlockCrcInfo blockCrcInfo(Block block, 
+                                                boolean replicas) {
+    BlockCrcInfo crcInfo = new BlockCrcInfo();
+    crcInfo.status = BlockCrcInfo.STATUS_ERROR;
+    
+    FSDirectory.INode fileINode = blocksMap.getINode(block);
+    if ( fileINode == null || fileINode.isDir() ) {
+      // Most probably reason is that this block does not exist
+      if (blocksMap.getStoredBlock(block) == null) {
+        crcInfo.status = BlockCrcInfo.STATUS_UNKNOWN_BLOCK;
+      } else {
+        LOG.warn("getBlockCrcInfo(): Could not find file for " + block);
+      }
+      return crcInfo;
+    }
+
+    crcInfo.fileName = fileINode.getAbsoluteName();
+    
+    // Find the offset and length for this block.
+    Block[] fileBlocks = fileINode.getBlocks();
+    crcInfo.blockLen = -1;
+    if ( fileBlocks != null ) {
+      for ( Block b:fileBlocks ) {
+        if ( block.equals(b) ) {
+          crcInfo.blockLen = b.getNumBytes();
+        }
+        if ( crcInfo.blockLen < 0 ) {
+          crcInfo.startOffset += b.getNumBytes();
+        }
+        crcInfo.fileSize += b.getNumBytes();
+      }
+    }
+
+    if ( crcInfo.blockLen <= 0 ) {
+      LOG.warn("blockCrcInfo(): " + block + 
+               " could not be found in blocks for " + crcInfo.fileName);
+      return crcInfo;
+    }
+    
+    String fileName = fileINode.getLocalName();    
+    if ( fileName.startsWith(".") && fileName.endsWith(".crc") ) {
+      crcInfo.status = BlockCrcInfo.STATUS_CRC_BLOCK;
+      return crcInfo;
+    }
+
+    if (replicas) {
+      // include block replica locations, instead of crcBlocks
+      crcInfo.blockLocationsIncluded = true;
+      
+      DatanodeInfo[] dnInfo = new DatanodeInfo[blocksMap.numNodes(block)];
+      Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
+      for (int i=0; it != null && it.hasNext(); i++ ) {
+        dnInfo[i] = new DatanodeInfo(it.next());
+      }
+      crcInfo.blockLocations = new LocatedBlock(block, dnInfo, 
+                                                crcInfo.startOffset);
+    } else {
+
+      //Find CRC file
+      String crcName = "." + fileName + ".crc";
+      FSDirectory.INode crcINode = fileINode.getParent().getChild(crcName);
+
+      if ( crcINode == null ) {
+        // Should we log this?
+        crcInfo.status = BlockCrcInfo.STATUS_NO_CRC_DATA;
+        return crcInfo;
+      }
+
+      Block[] blocks = crcINode.getBlocks();
+      if ( blocks == null )  {
+        LOG.warn("getBlockCrcInfo(): could not find blocks for crc file for " +
+                 crcInfo.fileName);
+        return crcInfo;
+      }
+
+      crcInfo.crcBlocks = new LocatedBlock[ blocks.length ];
+      for (int i=0; i<blocks.length; i++) {
+        DatanodeInfo[] dnArr = new DatanodeInfo[ blocksMap.numNodes(blocks[i]) ];
+        int idx = 0;
+        for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blocks[i]); 
+        it.hasNext();) { 
+          dnArr[ idx++ ] = it.next();
+        }
+        crcInfo.crcBlocks[i] = new LocatedBlock(blocks[i], dnArr);
+      }
+
+      crcInfo.crcReplication = crcINode.getReplication();
+    }
+    
+    crcInfo.status = BlockCrcInfo.STATUS_DATA_BLOCK;
+    return crcInfo;
+  }
+  
   /////////////////////////////////////////////////////////
   //
   // These methods are called by HadoopFS clients
@@ -1120,18 +1223,28 @@
    * invalidate some blocks that make up the file.
    */
   public boolean delete(String src) throws IOException {
-    boolean status = deleteInternal(src);
+    boolean status = deleteInternal(src, true);
     getEditLog().logSync();
     return status;
   }
 
   /**
+   * An internal delete function that does not enforce safe mode
+   */
+  boolean deleteInSafeMode(String src) throws IOException {
+    boolean status = deleteInternal(src, false);
+    getEditLog().logSync();
+    return status;
+  }
+  /**
    * Remove the indicated filename from the namespace.  This may
    * invalidate some blocks that make up the file.
    */
-  private synchronized boolean deleteInternal(String src) throws IOException {
+  private synchronized boolean deleteInternal(String src, 
+                                              boolean enforceSafeMode) 
+                                              throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
-    if (isInSafeMode())
+    if (enforceSafeMode && isInSafeMode())
       throw new SafeModeException("Cannot delete " + src, safeMode);
     Block deletedBlocks[] = dir.delete(src);
     if (deletedBlocks != null) {
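
The blockCrcInfo() method added to FSNamesystem above reports, for a given block, the owning file, the block's offset and length within it, and either the replica locations or the locations of the matching ".file.crc" blocks. A rough sketch of how an upgrade task might interpret the result of the new NameNode.blockCrcUpgradeGetBlockLocations() call; the helper class and method names are hypothetical, and only the namenode method, the BlockCrcInfo fields and the STATUS_* constants come from this commit:

package org.apache.hadoop.dfs;

import java.io.IOException;

// Illustrative in-package helper; not part of the committed change.
class BlockCrcInfoSketch {
  static void describe(NameNode namenode, Block block) throws IOException {
    BlockCrcInfo info = namenode.blockCrcUpgradeGetBlockLocations(block);
    if (info.status == BlockCrcInfo.STATUS_DATA_BLOCK) {
      // A regular data block: we now know its file, offset and extent,
      // plus replica locations in info.blockLocations.
      System.out.println(block + " belongs to " + info.fileName +
                         " at offset " + info.startOffset +
                         " (block length " + info.blockLen +
                         ", file size " + info.fileSize + ")");
    } else if (info.status == BlockCrcInfo.STATUS_CRC_BLOCK) {
      // The block belongs to a ".file.crc" checksum file itself.
      System.out.println(block + " is part of a crc file; nothing to do");
    } else {
      // STATUS_ERROR or STATUS_UNKNOWN_BLOCK.
      System.err.println("No usable CRC info for " + block +
                         ", status=" + info.status);
    }
  }
}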

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java Mon Jul 16 14:34:59 2007
@@ -93,38 +93,24 @@
     Socket s = new Socket();
     s.connect(addr, FSConstants.READ_TIMEOUT);
     s.setSoTimeout(FSConstants.READ_TIMEOUT);
-    //
-    // Xmit header info to datanode
-    //
-    DataOutputStream os = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
-    os.write(FSConstants.OP_READSKIP_BLOCK);
-    new Block(blockId, blockSize).write(os);
-    os.writeLong(offsetIntoBlock);
-    os.flush();
-
-    //
-    // Get bytes in block, set streams
-    //
-    DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
-    long curBlockSize = in.readLong();
-    long amtSkipped = in.readLong();
-    if (curBlockSize != blockSize) {
-      throw new IOException("Recorded block size is " + blockSize + ", but datanode reports size of " + curBlockSize);
-    }
-    if (amtSkipped != offsetIntoBlock) {
-      throw new IOException("Asked for offset of " + offsetIntoBlock + ", but only received offset of " + amtSkipped);
-    }
       
-    long amtToRead = chunkSizeToView;
-    if (amtToRead + offsetIntoBlock > blockSize)
-      amtToRead = blockSize - offsetIntoBlock;
+      long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);     
+      
+      // Use the block name for file name. 
+      DFSClient.BlockReader blockReader = 
+        DFSClient.BlockReader.newBlockReader(s, addr.toString() + ":" + blockId,
+                                             blockId, offsetIntoBlock, 
+                                             amtToRead, 
+                                             conf.getInt("io.file.buffer.size",
+                                                         4096));
+        
     byte[] buf = new byte[(int)amtToRead];
     int readOffset = 0;
     int retries = 2;
-    while (true) {
+    while ( amtToRead > 0 ) {
       int numRead;
       try {
-        numRead = in.read(buf, readOffset, (int)amtToRead);
+        numRead = blockReader.readAll(buf, readOffset, (int)amtToRead);
       }
       catch (IOException e) {
         retries--;
@@ -134,11 +120,9 @@
       }
       amtToRead -= numRead;
       readOffset += numRead;
-      if (amtToRead == 0)
-        break;
     }
+    blockReader = null;
     s.close();
-    in.close();
     out.print(new String(buf));
   }
   public void DFSNodesStatus(ArrayList<DatanodeDescriptor> live,

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Mon Jul 16 14:34:59 2007
@@ -688,6 +688,11 @@
     return namesystem.processDistributedUpgradeCommand(comm);
   }
 
+  public BlockCrcInfo blockCrcUpgradeGetBlockLocations(Block block) 
+                                                       throws IOException {
+    return namesystem.blockCrcInfo(block, true);
+  }
+
   /** 
    * Verify request.
    * 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java Mon Jul 16 14:34:59 2007
@@ -310,8 +310,9 @@
     InetSocketAddress targetAddr = null;
     TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
     Socket s = null;
-    DataInputStream in = null;
-    DataOutputStream out = null;
+    DFSClient.BlockReader blockReader = null; 
+    Block block = lblock.getBlock(); 
+
     while (s == null) {
       DatanodeInfo chosenNode;
       
@@ -336,27 +337,12 @@
         s.connect(targetAddr, FSConstants.READ_TIMEOUT);
         s.setSoTimeout(FSConstants.READ_TIMEOUT);
         
-        //
-        // Xmit header info to datanode
-        //
-        out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
-        out.write(FSConstants.OP_READSKIP_BLOCK);
-        lblock.getBlock().write(out);
-        out.writeLong(0L);
-        out.flush();
+        blockReader = 
+          DFSClient.BlockReader.newBlockReader(s, targetAddr.toString() + ":" + 
+                                               block.getBlockId(), 
+                                               block.getBlockId(), 0, -1,
+                                               conf.getInt("io.file.buffer.size", 4096));
         
-        //
-        // Get bytes in block, set streams
-        //
-        in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
-        long curBlockSize = in.readLong();
-        long amtSkipped = in.readLong();
-        if (curBlockSize != lblock.getBlock().len) {
-          throw new IOException("Recorded block size is " + lblock.getBlock().len + ", but datanode reports size of " + curBlockSize);
-        }
-        if (amtSkipped != 0L) {
-          throw new IOException("Asked for offset of " + 0L + ", but only received offset of " + amtSkipped);
-        }
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue
         LOG.info("Failed to connect to " + targetAddr + ":" + ex);
@@ -370,22 +356,26 @@
         s = null;
       }
     }
-    if (in == null) {
+    if (blockReader == null) {
       throw new Exception("Could not open data stream for " + lblock.getBlock().getBlockName());
     }
     byte[] buf = new byte[1024];
     int cnt = 0;
     boolean success = true;
+    long bytesRead = 0;
     try {
-      while ((cnt = in.read(buf)) > 0) {
+      while ((cnt = blockReader.read(buf, 0, buf.length)) > 0) {
         fos.write(buf, 0, cnt);
+        bytesRead += cnt;
+      }
+      if ( bytesRead != block.getNumBytes() ) {
+        throw new IOException("Recorded block size is " + block.getNumBytes() + 
+                              ", but datanode returned " +bytesRead+" bytes");
       }
     } catch (Exception e) {
       e.printStackTrace();
       success = false;
     } finally {
-      try {in.close(); } catch (Exception e1) {}
-      try {out.close(); } catch (Exception e1) {}
       try {s.close(); } catch (Exception e1) {}
     }
     if (!success)

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectCollection.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectCollection.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectCollection.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectCollection.java Mon Jul 16 14:34:59 2007
@@ -33,6 +33,8 @@
     initialize();
     // Registered distributed upgrade objects here
     // registerUpgrade(new UpgradeObject());
+    registerUpgrade(new BlockCrcUpgradeObjectNamenode());
+    registerUpgrade(new BlockCrcUpgradeObjectDatanode());
   }
 
   static class UOSignature implements Comparable<UOSignature> {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java Mon Jul 16 14:34:59 2007
@@ -22,6 +22,7 @@
 import java.util.Enumeration;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
+import java.net.Socket;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
@@ -450,5 +451,48 @@
       tmp.deleteOnExit();
     }
     return tmp;
+  }
+  
+  //XXX These functions should be in IO Utils rather than FileUtil
+  // Reads len bytes in a loop.
+  public static void readFully( InputStream in, byte buf[],
+                                int off, int len ) throws IOException {
+    int toRead = len;
+    while ( toRead > 0 ) {
+      int ret = in.read( buf, off, toRead );
+      if ( ret < 0 ) {
+        throw new IOException( "Premature EOF from inputStream");
+      }
+      toRead -= ret;
+      off += ret;
+    }
+  }
+  
+  public static void closeSocket( Socket sock ) {
+    // avoids try { close() } dance
+    if ( sock != null ) {
+      try {
+       sock.close();
+      } catch ( IOException ignored ) {
+      }
+    }
+  }
+  public static void closeStream( InputStream in ) {
+    // avoids try { close() } dance
+    if ( in != null ) {
+      try {
+        in.close();
+      } catch ( IOException ignored ) {
+      }
+    }
+  }
+  public static void closeStream( OutputStream out ) {
+    // avoids try { close() } dance
+    if ( out != null ) {
+      try {
+        out.close();
+      } catch ( IOException ignored ) {
+      }
+    }
   }
 }
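
The new FileUtil helpers read an exact byte count in a loop and close sockets/streams without the usual try/catch boilerplate. A short usage sketch, assuming a hypothetical local file:

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.fs.FileUtil;

public class ReadFullyExample {
  public static void main(String[] args) throws IOException {
    byte[] header = new byte[16];
    InputStream in = new FileInputStream("/tmp/example.dat"); // hypothetical file
    try {
      // Either fills all 16 bytes or throws IOException on a short stream,
      // unlike a single InputStream.read() call.
      FileUtil.readFully(in, header, 0, header.length);
    } finally {
      FileUtil.closeStream(in); // swallows any IOException from close()
    }
  }
}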

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java Mon Jul 16 14:34:59 2007
@@ -22,6 +22,7 @@
 import java.text.SimpleDateFormat;
 
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.dfs.ChecksumDistributedFileSystem;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.util.ToolBase;
 

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java Mon Jul 16 14:34:59 2007
@@ -42,11 +42,15 @@
 
   public void testCopyToLocal() throws IOException {
     Configuration conf = new Configuration();
+    /* This tests some properties of ChecksumFileSystem as well.
+     * Make sure that we create ChecksumDFS */
+    conf.set("fs.hdfs.impl",
+             "org.apache.hadoop.dfs.ChecksumDistributedFileSystem");
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
     FileSystem fs = cluster.getFileSystem();
     assertTrue("Not a HDFS: "+fs.getUri(),
-               fs instanceof DistributedFileSystem);
-    DistributedFileSystem dfs = (DistributedFileSystem)fs;
+               fs instanceof ChecksumDistributedFileSystem);
+    ChecksumDistributedFileSystem dfs = (ChecksumDistributedFileSystem)fs;
     FsShell shell = new FsShell();
     shell.setConf(conf);
 
@@ -132,11 +136,15 @@
    */
   public void testDFSShell() throws IOException {
     Configuration conf = new Configuration();
+    /* This tests some properties of ChecksumFileSystem as well.
+     * Make sure that we create ChecksumDFS */
+    conf.set("fs.hdfs.impl",
+             "org.apache.hadoop.dfs.ChecksumDistributedFileSystem");
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
     FileSystem fs = cluster.getFileSystem();
     assertTrue("Not a HDFS: "+fs.getUri(),
-               fs instanceof DistributedFileSystem);
-    DistributedFileSystem fileSys = (DistributedFileSystem)fs;
+            fs instanceof ChecksumDistributedFileSystem);
+    ChecksumDistributedFileSystem fileSys = (ChecksumDistributedFileSystem)fs;
     FsShell shell = new FsShell();
     shell.setConf(conf);
 

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java Mon Jul 16 14:34:59 2007
@@ -117,7 +117,7 @@
     assertTrue("Not HDFS:"+fileSys.getUri(), fileSys instanceof DistributedFileSystem);
         
     DFSClient.DFSDataInputStream dis = (DFSClient.DFSDataInputStream) 
-      ((DistributedFileSystem)fileSys).getRawFileSystem().open(name);
+      ((DistributedFileSystem)fileSys).open(name);
     Collection<LocatedBlock> dinfo = dis.getAllBlocks();
 
     for (LocatedBlock blk : dinfo) { // for each block

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFSInputChecker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFSInputChecker.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFSInputChecker.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFSInputChecker.java Mon Jul 16 14:34:59 2007
@@ -214,6 +214,8 @@
     Configuration conf = new Configuration();
     conf.setLong("dfs.block.size", BLOCK_SIZE);
     conf.setInt("io.bytes.per.checksum", BYTES_PER_SUM);
+    conf.set("fs.hdfs.impl",
+             "org.apache.hadoop.dfs.ChecksumDistributedFileSystem");
     Random rand = new Random(seed);
     rand.nextBytes(expected);
 

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFSOutputSummer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFSOutputSummer.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFSOutputSummer.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFSOutputSummer.java Mon Jul 16 14:34:59 2007
@@ -113,6 +113,8 @@
     Configuration conf = new Configuration();
     conf.setLong("dfs.block.size", BLOCK_SIZE);
     conf.setInt("io.bytes.per.checksum", BYTES_PER_CHECKSUM);
+    conf.set("fs.hdfs.impl",
+             "org.apache.hadoop.dfs.ChecksumDistributedFileSystem");      
     MiniDFSCluster cluster = new MiniDFSCluster(
         conf, NUM_OF_DATANODES, true, null);
     fileSys = cluster.getFileSystem();

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java Mon Jul 16 14:34:59 2007
@@ -81,6 +81,7 @@
    */
   public void testSmallBlock() throws IOException {
     Configuration conf = new Configuration();
+    conf.set("io.bytes.per.checksum", "1");
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
     FileSystem fileSys = cluster.getFileSystem();
     try {

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java Mon Jul 16 14:34:59 2007
@@ -36,6 +36,8 @@
     FileSystem fileSys = null;
     try {
       JobConf conf = new JobConf();
+      conf.set("fs.hdfs.impl",
+               "org.apache.hadoop.dfs.ChecksumDistributedFileSystem");      
       dfs = new MiniDFSCluster(conf, 1, true, null);
       fileSys = dfs.getFileSystem();
       mr = new MiniMRCluster(2, fileSys.getName(), 4);


