hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r556743 [1/3] - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/ src/test/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/mapred/
Date Mon, 16 Jul 2007 21:35:03 GMT
Author: cutting
Date: Mon Jul 16 14:34:59 2007
New Revision: 556743

URL: http://svn.apache.org/viewvc?view=rev&rev=556743
Log:
HADOOP-1134.  Add optimized checksum support to HDFS.  Contributed by Raghu.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ChecksumDistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectCollection.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFSInputChecker.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFSOutputSummer.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSmallBlock.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Jul 16 14:34:59 2007
@@ -362,6 +362,11 @@
 113. HADOOP-1613.  Fix "DFS Health" page to display correct time of
      last contact.  (Dhruba Borthakur via cutting)
 
+114. HADOOP-1134.  Add optimized checksum support to HDFS.  Checksums
+     are now stored with each block, rather than as parallel files.
+     This reduces the namenode's memory requirements and increases
+     data integrity.  (Raghu Angadi via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java Mon Jul 16 14:34:59 2007
@@ -39,7 +39,9 @@
   /**
    */
   public static boolean isBlockFilename(File f) {
-    if (f.getName().startsWith("blk_")) {
+    String name = f.getName();
+    if ( name.startsWith( "blk_" ) && 
+        name.indexOf( '.' ) < 0 ) {
       return true;
     } else {
       return false;

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java?view=auto&rev=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java Mon Jul 16 14:34:59 2007
@@ -0,0 +1,2092 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.retry.*;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Daemon;
+
+import java.io.*;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.zip.CRC32;
+import java.util.concurrent.*;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.HashSet;
+import java.util.Random;
+
+/* This contains information about the CRC file and blocks created by
+ * ChecksumFileSystem for a given block. It implements Writable because it
+ * is exchanged between the namenode and datanodes during the upgrade.
+ */
+class BlockCrcInfo implements Writable {
+  
+  public static final int STATUS_DATA_BLOCK = 0;
+  public static final int STATUS_CRC_BLOCK = 1; // block belongs to a .crc file
+  public static final int STATUS_NO_CRC_DATA = 2;
+  public static final int STATUS_UNKNOWN_BLOCK = 3;
+  public static final int STATUS_ERROR = 4; // unknown error
+  
+  int status = STATUS_ERROR;
+
+  String fileName = "";  // DFS file the block belongs to
+  long fileSize;         // total size of that file
+  long startOffset;      // offset of this block within the file
+  long blockLen;         // length of this block
+  
+  LocatedBlock[] crcBlocks; // actual block locations.
+  int crcReplication; // expected replication.
+  
+  // set during 'offline upgrade'
+  boolean blockLocationsIncluded;
+  LocatedBlock blockLocations = new LocatedBlock(); 
+  
+  /* Writes the array length followed by each location. A null array is
+   * written as length 0 (and deserializes back to null, see
+   * readBlockLocations()).
+   */
+  private void writeBlockLocations(LocatedBlock[] locArr, DataOutput out) 
+                                   throws IOException {
+    int len = ( locArr == null ) ? 0 : locArr.length;
+    out.writeInt( len );
+    if ( len > 0 ) {
+      for (LocatedBlock loc : locArr) {
+        loc.write( out );
+      }
+    }    
+  }
+  
+  /* Inverse of writeBlockLocations(); a zero length yields null. */
+  private LocatedBlock[] readBlockLocations(DataInput in) throws IOException {
+    int len = in.readInt();
+    LocatedBlock[] locArr = (len > 0) ? new LocatedBlock[len] : null; 
+    for (int i=0; i<len; i++) {
+      locArr[i] = new LocatedBlock();
+      locArr[i].readFields( in );
+    }
+    return locArr;
+  }
+  
+  // Writable interface. Field order in write() and readFields() must match.
+  
+  public void write(DataOutput out) throws IOException {
+    out.writeInt( status );
+    Text.writeString( out, fileName );
+    out.writeLong( fileSize );
+    out.writeLong( startOffset );
+    out.writeLong( blockLen );
+    writeBlockLocations(crcBlocks, out);
+    out.writeInt(crcReplication);
+    out.writeBoolean(blockLocationsIncluded);
+    blockLocations.write(out);
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    status = in.readInt();
+    fileName = Text.readString( in );
+    fileSize = in.readLong();
+    startOffset = in.readLong();
+    blockLen = in.readLong();
+    crcBlocks = readBlockLocations( in );
+    crcReplication = in.readInt();
+    blockLocationsIncluded = in.readBoolean();
+    blockLocations.readFields( in );
+  }
+}
+
+/**
+ * Consolidates the information needed to upgrade a single block at the
+ * datanode: the block itself, handles to the local datanode and the
+ * namenode, and the CRC info fetched from the namenode.
+ */
+class DNBlockUpgradeInfo {
+  
+  Block block;
+  DataNode dataNode;
+  DatanodeProtocol namenode;
+  
+  BlockCrcInfo crcInfo; // CrcInfo fetched from the namenode.
+  boolean offlineUpgrade;
+  
+  /** Returns string that has block id and the associated file */
+  public String toString() {
+    return block + " (file: " +
+           ( (crcInfo == null || crcInfo.fileName == null) ? 
+             "Unknown" : crcInfo.fileName ) + ")";
+  }
+}
+
+/**
+ * This class contains various utilities for upgrade of DFS during switch
+ * to block level CRCs (HADOOP-1134).
+ */
+class BlockCrcUpgradeUtils {
+  
+  public static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.dfs.BlockCrcUpgradeUtils");
+  
+  /* If some operation does not finish in this time, most likely something
+   * has gone wrong in the cluster (usually a misconfiguration).
+   */ 
+  public static final int LONG_TIMEOUT_MINUTES = 5;
+  public static final int LONG_TIMEOUT_MILLISEC = LONG_TIMEOUT_MINUTES*60*1000;
+  // Last layout version before block-level CRCs were introduced.
+  public static final int PRE_BLOCK_CRC_LAYOUT_VERSION = -6;
+  
+  /** 
+   * Reads block data from a given set of datanodes. It tries each
+   * datanode in order and tries the next datanode if the fetch from the
+   * previous datanode fails. The socket is closed after each attempt.
+   * 
+   * @param loc locations of the block
+   * @param crcFile name of the ".crc" file this read is on behalf of
+   *        (passed to the block reader; presumably used for
+   *        identification/logging -- TODO confirm against BlockReader)
+   * @param offset offset into the block
+   * @param len len of the data to be read from the given offset
+   * @param buf buffer for data
+   * @param bufOffset offset into the buffer for writing
+   * @throws IOException if no datanode could serve the requested range
+   */ 
+  static void readFromRemoteNode(LocatedBlock loc, String crcFile,
+                                 long offset, long len,
+                                 byte[] buf, int bufOffset) 
+                                 throws IOException {
+    Block blk = loc.getBlock();
+    String dnStr = ""; // names of datanodes tried, for the error message
+    
+    for (DatanodeInfo dn : loc.getLocations()) {
+      dnStr += dn.getName() + " ";
+      Socket dnSock = null;
+      try {
+        InetSocketAddress dnAddr = DataNode.createSocketAddr(dn.getName());
+        dnSock = new Socket();
+        dnSock.connect(dnAddr, FSConstants.READ_TIMEOUT);
+        dnSock.setSoTimeout(FSConstants.READ_TIMEOUT);
+        DFSClient.BlockReader reader = DFSClient.BlockReader.newBlockReader
+                    (dnSock, crcFile, blk.getBlockId(), offset, len, 
+                     (int)Math.min(len, 4096));
+        FileUtil.readFully(reader, buf, bufOffset, (int)len);
+        return;
+      } catch (IOException ioe) {
+        // Log and fall through to the next replica.
+        LOG.warn("Could not read " + blk + " from " + dn.getName() + " : " +
+                 StringUtils.stringifyException(ioe));
+      } finally {
+        if ( dnSock != null )
+          dnSock.close();
+      }
+    }
+    
+    throw new IOException("Could not fetch data for " + blk +
+                          " from datanodes " + dnStr);
+  }
+   
+  /**
+   * Reads data from an HDFS file from a given file-offset. 
+   * Before opening the file, it forces "fs.hdfs.impl" to
+   * "ChecksumDistributedFileSystem". So this is meant to read
+   * only files that have associated ".crc" files.
+   * 
+   * @param filename HDFS complete filename for this file. 
+   * @param fileOffset fileOffset to read from
+   * @param len length of the data to read.
+   * @param namenodeAddr Namenode address for creating "hdfs://..." path.
+   * @param buf buffer to read into
+   * @param bufOffset offset into read buffer.
+   * @throws IOException
+   */
+  static void readDfsFileData(String filename, long fileOffset, int len, 
+                              InetSocketAddress namenodeAddr,
+                              byte[] buf, int bufOffset)
+                              throws IOException {
+    /*
+     * Read from an HDFS file.
+     */
+    Configuration conf = new Configuration();
+    // Make sure we use ChecksumFileSystem.
+    conf.set("fs.hdfs.impl",
+             "org.apache.hadoop.dfs.ChecksumDistributedFileSystem");
+    URI uri;
+    try {
+      uri = new URI("hdfs://" + namenodeAddr.getHostName() + ":" +
+                    namenodeAddr.getPort());
+    } catch (URISyntaxException e) {
+      // NOTE(review): the URISyntaxException cause is dropped here;
+      // chaining it would preserve the underlying reason.
+      throw new IOException("Got URISyntaxException for " + filename);
+    }
+    
+    FileSystem fs = FileSystem.get(uri, conf);
+    FSDataInputStream in = null;
+    try {
+      in = fs.open(new Path(filename));
+      if (fileOffset > 0) {
+        in.seek(fileOffset);
+      }
+      in.readFully(buf, bufOffset, len);
+    } finally {
+      // Close the stream even on failure; closeStream handles null.
+      FileUtil.closeStream(in);
+    }
+  }
+  
+  
+  /**
+   * This method is the main entry point for fetching CRC data for  
+   * a block from the corresponding blocks of the ".crc" file. 
+   * <br><br>
+   * 
+   * It first reads the header from the ".crc" file and then invokes 
+   * readCrcBuf() to read the actual CRC data.
+   * It then writes the checksum to disk.
+   *
+   * 
+   * @param blockInfo
+   * @throws IOException when it fails to fetch CRC data for any reason.
+   */
+  static void readCrcFileData(DNBlockUpgradeInfo blockInfo)
+                              throws IOException {
+    
+    // First read the crc header: bytes 'c' 'r' 'c' 0 followed by a
+    // big-endian int bytesPerChecksum.
+    byte[] header = new byte[8]; //'c r c \0 int '
+    int bytesRead = 0;
+    BlockCrcInfo crcInfo = blockInfo.crcInfo;
+    
+    // The 8 header bytes may straddle the first few blocks of the .crc
+    // file, so keep reading block by block until the header is complete.
+    for(int i=0; i<crcInfo.crcBlocks.length && 
+                 bytesRead < header.length; i++) {
+      
+      LocatedBlock loc = crcInfo.crcBlocks[i];
+      long toRead = Math.min(loc.getBlock().getNumBytes(),
+                             header.length-bytesRead);
+      readFromRemoteNode(loc, "."+crcInfo.fileName+".crc", 
+                         0, toRead, header, bytesRead);
+      bytesRead += toRead;
+    }
+    
+    if (bytesRead != header.length || header[0] != 'c' || 
+        header[1] != 'r' || header[2] != 'c' || header[3] != 0) {
+      // Should be very rare.
+      throw new IOException("Could not read header from crc file");
+    }
+    
+    // Decode big-endian int from header bytes 4..7.
+    int bytesPerChecksum = ((header[4] & 0xff) << 24) |
+                           ((header[5] & 0xff) << 16) |
+                           ((header[6] & 0xff) << 8) |
+                           (header[7] & 0xff);
+    
+    // sanity check the value. Is 100 MB a reasonable upper limit?
+    if (bytesPerChecksum < 1 || bytesPerChecksum > 100*1024*1024) {
+      throw new IOException("Insane value for bytesPerChecksum (" +
+                            bytesPerChecksum + ")");
+    }
+    
+    byte[] crcBuf = null;    
+        
+    try {
+      crcBuf = readCrcBuf(blockInfo, bytesPerChecksum);
+    } catch (IOException ioe) {
+      LOG.warn("Failed to fetch CRC data for " + blockInfo);
+      throw ioe;
+    } 
+    
+    writeCrcData(blockInfo, bytesPerChecksum, crcBuf);
+    /* After successful write(), we could inform the name node about it.
+     * or we can just inform name after all the blocks have been upgraded.
+     * Waiting for all the blocks to complete is probably better.
+     */
+  }
+  
+  /** 
+   * Low level function to create metadata file for a block with the
+   * CRC data. If crcBuf is null, it writes a metadata file with empty checksum.
+   * 
+   * @param blockInfo
+   * @param bytesPerChecksum bytes covered by each checksum; if &lt;= 0, the
+   *        datanode default is used (only legal when crcBuf is null).
+   * @param crcBuf buffer containing CRC. null implies metadata file should be
+   *        written with empty checksum.
+   * @throws IOException
+   */
+  static void writeCrcData(DNBlockUpgradeInfo blockInfo, int bytesPerChecksum,
+                           byte[] crcBuf) throws IOException {
+    Block block = blockInfo.block;
+    
+    File blockFile = blockInfo.dataNode.data.getBlockFile( block );
+    File metaFile = FSDataset.getMetaFile( blockFile );
+    
+    if ( bytesPerChecksum <= 0 ) {
+      if (crcBuf == null) {
+        bytesPerChecksum = blockInfo.dataNode.defaultBytesPerChecksum;
+      } else {
+        throw new IOException("Illegal Argument bytesPerChecksum(" +
+                              bytesPerChecksum);
+      }
+    }
+    
+    if ( metaFile.exists() ) {
+      // NOTE(review): 'if (true)' makes the verification code below
+      // unreachable -- an existing metadata file is always treated as an
+      // error. The dead branch is apparently kept only for debugging.
+      if ( true ) {
+        throw new IOException("metadata file exists but unexpected for " +
+                              blockInfo);
+      }
+      // Verify the crcBuf. this should be removed.
+      if ( crcBuf == null )  {
+        return;
+      }
+      FileInputStream in = null;
+      try {
+        in = new FileInputStream( metaFile );
+        in.skip(7); //should be skipFully().
+        byte[] storedChecksum = new byte[ crcBuf.length ];
+        FileUtil.readFully(in, storedChecksum, 0, storedChecksum.length);
+        if ( !Arrays.equals(crcBuf, storedChecksum) ) {
+          throw new IOException("CRC does not match");
+        }
+      } finally {
+        FileUtil.closeStream(in);
+      }
+      return;
+    }
+    
+    File tmpBlockFile = null;
+    File tmpMetaFile = null;
+    DataOutputStream out = null;
+    try {
+      // Write the metadata to a tmp file first and rename into place so
+      // a partially written metadata file never becomes visible.
+      tmpBlockFile = blockInfo.dataNode.data.createTmpFile(null, block);
+      tmpMetaFile = FSDataset.getMetaFile( tmpBlockFile );
+      out = new DataOutputStream( new FileOutputStream(tmpMetaFile) );
+      
+      // write the header
+      out.writeShort( FSDataset.METADATA_VERSION );  
+      DataChecksum checksum = 
+        DataChecksum.newDataChecksum( ((crcBuf == null) ?
+                                       DataChecksum.CHECKSUM_NULL :
+                                       DataChecksum.CHECKSUM_CRC32 ),
+                                       bytesPerChecksum );
+      checksum.writeHeader(out);
+      if (crcBuf != null) {
+        out.write(crcBuf);
+      }
+      out.close();
+      out = null;
+      
+      if ( !tmpMetaFile.renameTo( metaFile ) ) {
+        throw new IOException("Could not rename " + tmpMetaFile + " to " +
+                              metaFile);
+      }
+    } finally {
+      FileUtil.closeStream(out);
+      // The tmp block file exists only so getMetaFile() can derive the tmp
+      // meta name; delete it in all cases. NOTE(review): tmpMetaFile itself
+      // is not cleaned up on failure -- verify whether that is intentional.
+      if ( tmpBlockFile != null ) {
+        tmpBlockFile.delete();
+      }
+    }
+  }
+  
+  /**
+   * This regenerates CRC for a block by checksumming few bytes at either end 
+   * of the block (read from the HDFS file) and the local block data. This is 
+   * invoked only when either end of the block doesn't fall on a checksum
+   * boundary (very uncommon). <i>bytesBefore</i> or <i>bytesAfter</i>
+   * should have non-zero length.
+   *  
+   * @param blockInfo
+   * @param oldCrcBuf CRC buffer fetched from ".crc" file
+   * @param bytesPerChecksum
+   * @param bytesBefore bytes located before this block in the file
+   * @param bytesAfter bytes located after this block in the file
+   * @return Returns the generated CRC in a buffer.
+   * @throws IOException
+   */
+  static byte[] regenerateCrcBuf(DNBlockUpgradeInfo blockInfo,
+                                 byte[] oldCrcBuf, int bytesPerChecksum,
+                                 byte[] bytesBefore, byte[] bytesAfter)
+                                 throws IOException {
+    
+    // verificationChecksum re-checks the old (file-aligned) chunks against
+    // oldCrcBuf; newChecksum computes the new block-aligned chunks.
+    DataChecksum verificationChecksum = DataChecksum.newDataChecksum
+                                         (DataChecksum.CHECKSUM_CRC32, 
+                                          bytesPerChecksum);
+    DataChecksum newChecksum = DataChecksum.newDataChecksum
+                                         (DataChecksum.CHECKSUM_CRC32, 
+                                          bytesPerChecksum);
+    
+    BlockCrcInfo crcInfo = blockInfo.crcInfo;
+    
+    // New CRC buffer holds one checksum per full chunk, plus one more for
+    // a trailing partial chunk.
+    int checksumSize = newChecksum.getChecksumSize();
+    int newCrcSize = (int) (crcInfo.blockLen/bytesPerChecksum*checksumSize);
+    if ( crcInfo.blockLen%bytesPerChecksum > 0 ) {
+      newCrcSize += checksumSize;
+    }
+    byte[] newCrcBuf = new byte[newCrcSize];
+    int newCrcOffset = 0;
+    int oldCrcOffset = 0;
+    
+    Block block = blockInfo.block;
+    File blockFile = blockInfo.dataNode.data.getBlockFile( block );
+    if ( blockFile == null || !blockFile.exists() ) {
+      throw new IOException("Block file "  + 
+                            ((blockFile != null) ? blockFile.getAbsolutePath()
+                              : "NULL") + " does not exist.");
+    }
+    
+    byte[] blockBuf = new byte[bytesPerChecksum];
+    FileInputStream in = null;
+       
+    try {
+      boolean chunkTainted = false;      // current old chunk mismatched?
+      boolean prevChunkTainted = false;  // previous old chunk mismatched?
+      long bytesRead = 0;
+
+      if ( bytesBefore.length > 0 ) {
+        verificationChecksum.update(bytesBefore, 0, bytesBefore.length);
+      }
+      int verifyLen = bytesPerChecksum - bytesBefore.length;
+      long verifiedOffset = -bytesBefore.length;
+      long writtenOffset = 0;
+
+      in = new FileInputStream( blockFile );
+      
+      while ( bytesRead <= crcInfo.blockLen ) {        
+        /* In each iteration we read number of bytes required for newChecksum,
+         * except in the last iteration where we read 0 bytes.
+         * newChecksum updated in an iteration is written in the next 
+         * iteration, because only in the next iteration will we be 
+         * guaranteed that we have verified enough data to write newChecksum.
+         * All the comparisons below are chosen to enforce the above.
+         */
+        int toRead = (int) Math.min(crcInfo.blockLen - bytesRead, 
+                                    bytesPerChecksum);
+        /* if bytesBefore.length == 0, then we need not rechecksum but
+         * simply copy from oldCrcBuf for most of the block.
+         * But we are not optimizing for this case.
+         */
+        if ( toRead > 0 ) {
+          FileUtil.readFully(in, blockBuf, 0, toRead);
+        }
+
+        if ( (toRead == 0 && bytesAfter.length > 0) || toRead >= verifyLen ) {
+          if ( toRead > 0 ) {
+            verificationChecksum.update(blockBuf, 0, verifyLen);
+          }
+          prevChunkTainted = chunkTainted;
+          chunkTainted = !verificationChecksum.compare(oldCrcBuf, oldCrcOffset);
+          oldCrcOffset += checksumSize;
+          verifiedOffset += bytesPerChecksum;
+          verificationChecksum.reset();
+        }
+
+        /* We update newCrcBuf only after all the bytes checksummed are
+         * verified.
+         */
+        long diff = verifiedOffset - writtenOffset;
+        if ( toRead == 0 || diff > bytesPerChecksum || 
+            bytesRead >= verifiedOffset ) {
+          // decide if we need to reset the checksum.
+          if ( (diff > bytesPerChecksum && prevChunkTainted) || 
+              (diff < 2L*bytesPerChecksum && chunkTainted) ) {
+            LOG.warn("Resetting checksum for " + blockInfo + " at offset "
+                     + writtenOffset);
+            newChecksum.reset();
+          }
+          newChecksum.writeValue(newCrcBuf, newCrcOffset, true);
+          newCrcOffset += checksumSize;
+          writtenOffset += Math.min( crcInfo.blockLen-writtenOffset,
+                                     bytesPerChecksum );
+        }
+
+        if ( toRead == 0 ) {
+          //Finally done with updating the new CRC buffer!
+          break;
+        }
+
+        if ( toRead != verifyLen ) {
+          // Feed the not-yet-verified remainder into verificationChecksum.
+          int tmpOff = ( toRead > verifyLen ) ? verifyLen : 0; 
+          verificationChecksum.update(blockBuf, tmpOff, toRead-tmpOff);
+        }
+
+        bytesRead += toRead;
+        if ( bytesRead == crcInfo.blockLen && bytesAfter.length > 0 ) {
+          /* We are at the edge.
+           * if bytesBefore.length == 0, then blockLen % bytesPerChecksum
+           * can not be 0.
+           */
+          verificationChecksum.update(bytesAfter, 0, bytesAfter.length);
+        }
+        newChecksum.update(blockBuf, 0, toRead);
+      }
+
+      //XXX Remove the assert.
+      assert newCrcBuf.length == newCrcOffset : "something is wrong"; 
+      return newCrcBuf;
+    } finally {
+      FileUtil.closeStream(in);
+    }
+  }
+
+  /**
+   * Reads multiple copies of CRC data from different replicas and  
+   * compares them. It selects the CRC data that matches on a majority
+   * of the copies. If the first 'replication/2+1' copies match with each
+   * other it does not read the rest of the copies of CRC data; this would be
+   * the common case for most of the blocks.
+   * <br><br>
+   * 
+   * When CRC data falls on multiple blocks, the permutations it tries are 
+   * pretty simple. If there are 2 blocks involved, with replication of 3, it
+   * tries 3 combinations and not 9. Currently it does not read more replicas
+   * if CRC data is over-replicated, reads only up to file's 'replication'.
+   * <br><br> 
+   * 
+   * If all copies are different, it returns the first one.
+   * <br>
+   * 
+   * @param blockInfo
+   * @param crcStart start offset into the ".crc" file
+   * @param crcSize length of the CRC data
+   * @return Returns buffer containing crc data. buffer.length == crcSize
+   * @throws IOException when it can not fetch even one copy of the CRC data.
+   */
+  static byte[] findMajorityCrcBuf(DNBlockUpgradeInfo blockInfo,
+                                   long crcStart, int crcSize)
+                                   throws IOException {
+    
+    Block block = blockInfo.block;
+    BlockCrcInfo crcInfo = blockInfo.crcInfo;
+    int replication = crcInfo.crcReplication;
+    
+    // internal/local class: one candidate CRC buffer and its vote count.
+    class CrcBufInfo {
+      CrcBufInfo(byte[] b, long checksum) {
+        buf = b;
+        crc32 = checksum;
+      }
+      byte[] buf;
+      long crc32;       // CRC32 of buf, for cheap equality comparison
+      int  numMatches = 0;
+    }
+    
+    CrcBufInfo[] bufInfoArr = new CrcBufInfo[replication];
+    int numBufs = 0;
+    CRC32 crc32 = new CRC32();
+    boolean atleastOneNewReplica = true;
+    
+    // The i-th pass reads (roughly) the i-th replica of each crc block.
+    for(int i=0; i<replication && atleastOneNewReplica; i++) {
+      /* when crc data falls on multiple blocks, we don't
+       * try fetching from all combinations of the replicas. We could.
+       * We try only crcReplication combinations.
+       */
+      int bytesRead = 0;
+      byte[] buf = new byte[crcSize];
+      long offset = 0;
+      atleastOneNewReplica = false;
+      String crcFileName = "."+blockInfo.crcInfo.fileName+".crc";
+      for (LocatedBlock loc : crcInfo.crcBlocks) {          
+
+        long blockStart = crcStart - offset + bytesRead; 
+        long blockSize = loc.getBlock().getNumBytes();
+        offset += blockSize;
+        if ( blockSize <= blockStart ){
+          // this crc block lies entirely before the range we need.
+          continue;
+        }
+        
+        DatanodeInfo dn;
+        DatanodeInfo [] dnArr = loc.getLocations();
+        if ( dnArr.length > i ) {
+          dn = dnArr[i];
+          atleastOneNewReplica = true;
+        } else {
+          // if all the data is in single block, then no need to read
+          if ( (bytesRead == 0 || !atleastOneNewReplica) &&
+               (blockSize - blockStart) >= (crcSize - bytesRead) ) {
+            break;
+          }
+          dn = dnArr[dnArr.length-1];
+        }
+        
+        try {
+          DatanodeInfo[] tmpArr = new DatanodeInfo[1];
+          tmpArr[0] = dn;
+          long toRead = Math.min(crcSize - bytesRead, blockSize-blockStart);
+          readFromRemoteNode(new LocatedBlock(loc.getBlock(), tmpArr),
+                             crcFileName, blockStart, toRead, buf, bytesRead);
+          bytesRead += toRead;
+        } catch (IOException ioe) {
+          LOG.warn("Error while fetching crc data from " + dn.getName() +
+                   "for " + blockInfo + StringUtils.stringifyException(ioe));
+          break;
+        }
+        
+        if ( bytesRead < crcSize ) {
+          // need more data from the next crc block.
+          continue;
+        }
+        
+        // A full copy has been read; compare it against earlier copies.
+        crc32.reset();
+        crc32.update(buf, 0, crcSize);
+        long crc = crc32.getValue();
+
+        for(int j=0; j<numBufs+1; j++) {
+          if ( j < numBufs && crc != bufInfoArr[j].crc32 ) {
+            LOG.warn("Mismatch in crc for " + blockInfo);
+            continue;
+          }
+
+          CrcBufInfo info = ( j < numBufs ) ? bufInfoArr[j] :
+            new CrcBufInfo(buf, crc);
+          
+          info.numMatches++;
+          if (info.numMatches >= (replication/2 + replication%2)) {
+            // absolute majority reached; no need to read more replicas.
+            LOG.info("readCrcBuf: " + info.numMatches + 
+                     " out of " + replication + " matched for " + blockInfo);
+            return buf;
+          }
+
+          if ( j == numBufs ) {
+            bufInfoArr[ numBufs++ ] = info;
+          }
+          break;
+        }
+        
+        // Done reading crcSize bytes.
+        break;
+      }
+    }
+    
+    /* Now we have an error or some buffer that might not have 
+     * absolute majority.
+     * Try to pick the buffer that has max number of matches.
+     */
+    CrcBufInfo selectedBuf = null;
+    for (int i=0; i<numBufs; i++) {
+      CrcBufInfo info = bufInfoArr[i];
+      if (selectedBuf == null || selectedBuf.numMatches < info.numMatches) {
+        selectedBuf = info;
+      }
+    }
+
+    if (selectedBuf == null) {      
+      throw new IOException("Could not fetch any crc data for " + block);
+    }
+
+    LOG.info("readCrcBuf: " + selectedBuf.numMatches + 
+             " out of " + replication + " matched for " + blockInfo);
+    
+    //Print a warning if numMatches is 1?
+    return  selectedBuf.buf;      
+  }
+  
+  /**
+   * Reads CRC data for a block from the corresponding ".crc" file. Usually it 
+   * fetches CRC directly from blocks that belong to the ".crc" file. When the 
+   * given block does not start or end on a checksum boundary, it would read
+   * the neighboring file data through DFS (as opposed to directly fetching
+   * from blocks) and regenerate CRC for the block.
+   * 
+   * @param blockInfo
+   * @param bytesPerChecksum bytesPerChecksum from ".crc" file header 
+   * @return buffer containing CRC data for the block.
+   * @throws IOException
+   */
+  static byte[] readCrcBuf(DNBlockUpgradeInfo blockInfo, int bytesPerChecksum)
+                           throws IOException {
+    
+    BlockCrcInfo crcInfo = blockInfo.crcInfo;
+    
+    int checksumSize = 4; // CRC32.
+    
+    /* The following two arrays are used in the case where the block 
+     * does not fall on edges. This happens when 'dfs.block.size' is not
+     * a multiple of 'io.bytes.per.checksum'. Apparently it can also happen
+     * because of a known bug in DFSClient, but I am not sure. 
+     */
+    byte [] bytesBefore = new byte[(int)(crcInfo.startOffset % 
+                                         bytesPerChecksum)]; 
+    int tmpMod = (int) ((crcInfo.blockLen + bytesBefore.length) % 
+                        bytesPerChecksum);
+    if ( tmpMod != 0 ) {
+      // bytes needed after the block to complete the last checksum chunk,
+      // capped by how much of the file remains beyond this block.
+      tmpMod = (int) Math.min(bytesPerChecksum - tmpMod,
+                              (crcInfo.fileSize - crcInfo.startOffset - 
+                               crcInfo.blockLen));
+    }
+    byte [] bytesAfter = new byte[tmpMod];
+    
+    if ( bytesBefore.length > 0 || bytesAfter.length > 0 ) {
+      if ( bytesBefore.length > 0 ) {
+        readDfsFileData(crcInfo.fileName,
+                        crcInfo.startOffset-bytesBefore.length,
+                        bytesBefore.length,
+                        blockInfo.dataNode.getNameNodeAddr(),
+                        bytesBefore, 0);
+      }
+      if ( bytesAfter.length > 0 ) {
+        readDfsFileData(crcInfo.fileName,  
+                        crcInfo.startOffset+crcInfo.blockLen, 
+                        bytesAfter.length,
+                        blockInfo.dataNode.getNameNodeAddr(),                        
+                        bytesAfter, 0);
+      }
+    }
+    
+    // Now fetch the crc. XXX change 8 to HEADER_SIZE.
+    long crcStart = 8 + ( (crcInfo.startOffset-bytesBefore.length)/
+                          bytesPerChecksum * checksumSize );
+    
+    long tmpLen = crcInfo.blockLen + bytesBefore.length + bytesAfter.length;
+    int crcSize = (int) (tmpLen/bytesPerChecksum*checksumSize);
+    if (tmpLen % bytesPerChecksum > 0) {
+      crcSize += checksumSize;
+    }
+    
+    byte[] crcBuf = findMajorityCrcBuf(blockInfo, crcStart, crcSize);
+    
+    if ( bytesBefore.length > 0 || bytesAfter.length > 0 ) {
+      /* We have crc for data that is larger than the blocks data.
+       * So regenerate new CRC data.
+       */
+      crcBuf = regenerateCrcBuf(blockInfo, crcBuf, 
+                                bytesPerChecksum, bytesBefore, bytesAfter);
+    }
+    return crcBuf;
+  }
+  
+  /** Generates CRC data for a block by reading block data from the local
+   * storage. Usually this is used as the last resort option to generate
+   * metadata for a block.
+   * <br><br>
+   * 
+   * Bytes per checksum is based on config (io.bytes.per.checksum).
+   * 
+   * @param blockInfo Block information.
+   * @throws IOException if the local block file is missing or unreadable.
+   */
+  static void generateLocalCrcData( DNBlockUpgradeInfo blockInfo ) 
+                                   throws IOException {
+    
+    Block block = blockInfo.block;
+    File blockFile = blockInfo.dataNode.data.getBlockFile( block );
+    if (blockFile == null || !blockFile.exists()) {
+      throw new IOException("Could not find local file for block");
+    }
+    long blockLen = blockFile.length();
+    // The Namenode's record of the block length can disagree with the
+    // local file. Checksums are computed over the local data, so warn and
+    // use the local length. (Previously this compared blockFile.length()
+    // with itself, so the mismatch was never detected.)
+    if ( blockLen != blockInfo.crcInfo.blockLen ) {
+      LOG.warn("Mismatch in length for block: local file size is " +
+               blockLen + " but should be " + blockInfo.crcInfo.blockLen +
+               " for " + blockInfo + ". Using local file size for CRC");
+    }
+    int bytesPerChecksum = blockInfo.dataNode.defaultBytesPerChecksum;
+    DataChecksum checksum = 
+      DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
+                                   bytesPerChecksum);
+    int checksumSize = checksum.getChecksumSize();
+    // One checksum per full chunk, plus one for a trailing partial chunk.
+    int crcBufSize = (int) ((blockLen/bytesPerChecksum)*checksumSize);
+    if (blockLen%bytesPerChecksum != 0) {
+      crcBufSize += checksumSize;
+    }
+    byte crcBuf[] = new byte[crcBufSize];
+
+    
+    InputStream in = null;
+    try {
+      in = new FileInputStream(blockFile);
+      
+      long totalRead = 0;
+      byte[] dataBuf = new byte[bytesPerChecksum];
+      int crcBufPos = 0;     
+      
+      // Checksum the file one chunk at a time.
+      while (totalRead < blockLen) {
+        int toRead = Math.min((int)(blockLen - totalRead), bytesPerChecksum);
+        FileUtil.readFully(in, dataBuf, 0, toRead );
+        
+        checksum.update(dataBuf, 0, toRead);
+        crcBufPos += checksum.writeValue(crcBuf, crcBufPos, true);
+        checksum.reset();
+        
+        totalRead += toRead;
+      }
+    } finally {
+      FileUtil.closeStream(in);
+    }
+    
+    writeCrcData(blockInfo, bytesPerChecksum, crcBuf);
+  }
+    
+  /**
+   * Fetches CRC data from a remote node. Sends OP_READ_METADATA
+   * command. Extracts CRC information and returns it.
+   *
+   * @param blockInfo block whose metadata is wanted.
+   * @param dnInfo replica to contact.
+   * @param checksumArr single-element "out" array; on success element 0
+   *        is set to the checksum descriptor read from the replica.
+   * @return buffer containing the raw CRC bytes for the block.
+   * @throws IOException if the transfer fails or the reply is malformed.
+   */
+  static byte[] readCrcFromReplica(DNBlockUpgradeInfo blockInfo,
+                                   DatanodeInfo dnInfo,
+                                   DataChecksum[] checksumArr
+                                   ) throws IOException {
+    Socket dnSock = null;
+    
+    String errMsg = "";
+    
+    try {
+      // do { ... } while (false): error paths 'break' to the common
+      // failure exit below the loop.
+      do {
+        InetSocketAddress dnAddr = DataNode.createSocketAddr(dnInfo.getName());
+        dnSock = new Socket();
+        dnSock.connect(dnAddr, FSConstants.READ_TIMEOUT);
+        dnSock.setSoTimeout(FSConstants.READ_TIMEOUT);
+
+        DataOutputStream out = new DataOutputStream(dnSock.getOutputStream());
+        DataInputStream in = new DataInputStream(dnSock.getInputStream());
+
+        // Write the header:
+        out.writeShort( DataNode.DATA_TRANFER_VERSION );
+        out.writeByte( DataNode.OP_READ_METADATA );
+        out.writeLong( blockInfo.block.getBlockId() );
+
+        byte reply = in.readByte();
+        if ( reply != DataNode.OP_STATUS_SUCCESS ) {
+          errMsg = "Got error(" + reply + ") in reply";
+          break;
+        }
+
+        // There is no checksum for this transfer.
+        int len = in.readInt();
+        int headerLen = 2 + DataChecksum.HEADER_LEN; // version + checksum hdr
+        
+        if ( len < headerLen ) {
+          errMsg = "len is too short";
+          break;
+        }
+        
+        if ( len > 0 ) {
+          // Verify that version is same
+          short version = in.readShort();
+          if ( version != FSDataset.METADATA_VERSION ) {
+            errMsg = "Version mismatch";
+            break;
+          }
+
+          DataChecksum checksum = DataChecksum.newDataChecksum(in);
+          
+          if ( checksum.getChecksumType() != DataChecksum.CHECKSUM_CRC32 ) {
+            errMsg = "Checksum is not CRC32";
+            break;
+          }
+          
+          // Expected size: one checksum per bytesPerChecksum chunk,
+          // rounding the last partial chunk up.
+          int crcBufLen = (int) (( (blockInfo.crcInfo.blockLen + 
+                                    checksum.getBytesPerChecksum()-1)/
+                                    checksum.getBytesPerChecksum() ) *
+                                  checksum.getChecksumSize());
+          
+          if ( (len - headerLen) != crcBufLen ) {
+            // Treat a length mismatch as a failure. The original code set
+            // errMsg but fell through and returned a misaligned buffer.
+            errMsg = "CRC data is too short";
+            break;
+          }
+          byte[] crcBuf = new byte[crcBufLen];
+          in.readFully(crcBuf);
+          
+          //just read the last int
+          in.readInt();
+          
+          checksumArr[0] = checksum;
+          return crcBuf;
+        }
+      } while (false);
+    } finally {
+      FileUtil.closeSocket( dnSock );
+    }
+    
+    throw new IOException("Error while fetching CRC from replica on " +
+                          dnInfo.getName() + ": " + errMsg); 
+  }
+    
+    
+  /**
+   * Tries each known replica of the block in turn, fetching its metadata
+   * and writing the CRC from the first replica that answers successfully.
+   * Throws the last fetch failure (or a generic error if no replica was
+   * even tried).
+   */
+  static void readCrcFromReplicas(DNBlockUpgradeInfo blockInfo) 
+                                  throws IOException {
+    
+    // Candidate replicas to fetch metadata from.
+    DatanodeInfo[] locations = new DatanodeInfo[0];
+    if (blockInfo.crcInfo.blockLocationsIncluded) {
+      locations = blockInfo.crcInfo.blockLocations.getLocations();
+    }
+    
+    DataChecksum[] checksumHolder = new DataChecksum[1];
+    IOException lastFailure = null;
+    String myName = blockInfo.dataNode.dnRegistration.getName();
+    
+    for (DatanodeInfo dnInfo : locations) {
+      // Never fetch from ourselves.
+      if ( dnInfo.getName().equals(myName) ) {
+        LOG.info("XXX skipping crcInfo fetch from " + dnInfo.getName());
+        continue;
+      }
+      try {
+        byte[] crcBuf = readCrcFromReplica(blockInfo, dnInfo, checksumHolder);
+        LOG.info("read crcBuf from " + dnInfo.getName() + " for " +
+                 blockInfo);
+        
+        writeCrcData(blockInfo, checksumHolder[0].getBytesPerChecksum(), 
+                     crcBuf);
+        return;
+      } catch (IOException e) {
+        LOG.warn("Could not fetch crc data from " + dnInfo.getName() +
+                 " : " + e);
+        lastFailure = e;
+      }
+    }
+    
+    if ( lastFailure != null ) {
+      throw lastFailure;
+    }
+    
+    throw new IOException("Could not fetch crc data from any node");
+  }
+  
+  
+  /**
+   * The method run by the upgrade threads. It contacts namenode for 
+   * information about the block and invokes appropriate method to create 
+   * metadata file for the block.
+   * <br><br>
+   * Never throws: every failure path falls back to generating the CRC
+   * locally, and if even that fails the error is only logged.
+   * 
+   * @param blockInfo block to upgrade; blockInfo.crcInfo is filled in here.
+   */
+  static void upgradeBlock( DNBlockUpgradeInfo blockInfo ) {
+
+    UpgradeCommand ret = null;
+    
+    // Offline upgrade asks the namenode directly over RPC; the normal
+    // path goes through the distributed-upgrade command channel.
+    if ( blockInfo.offlineUpgrade ) {
+      blockInfo.crcInfo = getBlockLocations(blockInfo.namenode, 
+                                            blockInfo.block);
+    } else { 
+      ret = sendCommand(blockInfo.namenode, new CrcInfoCommand(blockInfo.block),
+                        -1);
+    }
+    
+    /* Should be removed.
+    if (true) {
+      int sleepTime = (new Random(blockInfo.block.getBlockId())).nextInt(10);
+       BlockCrcUpgradeUtils.sleep(sleepTime,"XXX before upgrading the block");
+    } */
+    
+    try {
+     
+      if ( !blockInfo.offlineUpgrade ) {
+        // noUpgradeOnNamenode is a sentinel compared by reference.
+        if ( ret == noUpgradeOnNamenode ) {
+          throw new IOException("No upgrade is running on Namenode");
+        }
+
+        if ( ret == null || ((CrcInfoCommandReply)ret).crcInfo == null ) {
+          throw new IOException("Could not get crcInfo from Namenode");
+        }
+
+        blockInfo.crcInfo = ((CrcInfoCommandReply)ret).crcInfo;
+      }
+      
+      if ( blockInfo.crcInfo == null ) {
+        throw new IOException("Could not fetch crcInfo for " + 
+                              blockInfo.block);
+      }
+      
+      switch (blockInfo.crcInfo.status) {
+      
+      case BlockCrcInfo.STATUS_DATA_BLOCK:
+        // Regular data block: read CRC from the ".crc" file, or from the
+        // other replicas when this node missed the cluster-wide upgrade.
+        try {
+          if (blockInfo.offlineUpgrade) {
+            readCrcFromReplicas(blockInfo);
+          } else {
+            readCrcFileData(blockInfo);
+          }
+        } catch (IOException e) {
+          // Rethrown so the outer catch below generates the CRC locally.
+          LOG.warn("Exception in " + 
+                   ((blockInfo.offlineUpgrade) ? 
+                    "readCrcFromReplicas()" : "readCrcFileData()") + 
+                   " for " + blockInfo + ". will try to generate local crc.");
+          throw e;
+        }
+        break;
+        
+      case BlockCrcInfo.STATUS_NO_CRC_DATA:
+        generateLocalCrcData(blockInfo);
+        break;
+        
+      case BlockCrcInfo.STATUS_UNKNOWN_BLOCK:
+        // Block no longer belongs to any file; an empty metadata file
+        // is all that is required.
+        LOG.info("block is already deleted. Will create an empty " +
+                 "metadata file for " + blockInfo);
+        writeCrcData(blockInfo, 0, null);
+        break;
+        
+      case BlockCrcInfo.STATUS_CRC_BLOCK :
+        // Block belongs to a ".crc" file itself; no checksum data needed.
+        writeCrcData(blockInfo, 0, null);
+        break;
+        
+      case BlockCrcInfo.STATUS_ERROR:
+        LOG.info("unknown error. will generate local crc data for " + 
+                 blockInfo);
+        generateLocalCrcData(blockInfo);
+        break;
+        
+      default:
+        LOG.error("Unknown status from Namenode for " + blockInfo);
+        assert false : "Unknown status from Namenode";
+      }
+    } catch (IOException e) {
+      // Last resort: compute checksums from the local block data.
+      LOG.warn("Could not fetch crc for " + blockInfo + 
+               " will generate local crc data : exception :" + 
+               StringUtils.stringifyException(e));
+      try {
+        // last option:
+        generateLocalCrcData(blockInfo);
+      } catch (IOException ioe) {
+        LOG.warn("Could not generate local crc data for " + blockInfo +
+                 " : exception : " + StringUtils.stringifyException(ioe));
+      }
+    }
+  }
+
+  static UpgradeCommand noUpgradeOnNamenode = new UpgradeCommand();
+  
+  /** 
+   * Retries command in case of timeouts. <br>
+   * If retries is &lt; 0, retries forever. <br>
+   * NOTE: Does not throw an exception. <br>
+   * @return the Namenode's reply; noUpgradeOnNamenode if the Namenode is
+   *         not running an upgrade; null if all attempts failed.
+   */
+  static UpgradeCommand sendCommand(DatanodeProtocol namenode,
+                                    UpgradeCommand cmd, int retries) {
+    for(int i=0; i<=retries || retries<0; i++) {
+      try {
+        UpgradeCommand reply = namenode.processUpgradeCommand(cmd);
+        if ( reply == null ) {
+          /* namenode might not be running upgrade or finished
+           * an upgrade. We just return a static object */
+          return noUpgradeOnNamenode;
+        }
+        return reply;
+      } catch (IOException e) {
+        // Print the full stack trace only on the final attempt; earlier
+        // attempts (and the retry-forever case) just note the retry.
+        // (The original condition printed these the wrong way round.)
+        LOG.warn("Exception while sending command " + 
+                 cmd.getAction() + ": " + e +
+                 ((retries >= 0 && i >= retries) ? 
+                   ": " + StringUtils.stringifyException(e) :
+                   "... will retry ..."));
+      }
+    }
+    return null; 
+  }
+  
+  // Similar to sendCommand(): keeps invoking the RPC until it succeeds.
+  static BlockCrcInfo getBlockLocations(DatanodeProtocol namenode, 
+                                        Block block) {
+    while (true) {
+      try {
+        return namenode.blockCrcUpgradeGetBlockLocations(block);
+      } catch (IOException e) {
+        LOG.warn("Exception while fetching block Locations from namenode: " +
+                 e + " ... will retry ...");
+      }
+    }
+  }
+  
+    
+  /** Sleep method that swallows InterruptedException but restores the
+   * thread's interrupt status, so callers can still observe the interrupt.
+   * @param seconds how long to sleep
+   * @param message if non-null, logged before sleeping
+   */
+  static void sleep(int seconds, String message) {
+    if ( message != null ) {
+      LOG.info("XXX Sleeping for " + seconds + " seconds. msg: " + message);
+    }
+    try {
+      Thread.sleep(seconds*1000L);
+    } catch (InterruptedException ignored) {
+      // Preserve the interrupt for the caller instead of losing it.
+      Thread.currentThread().interrupt();
+    }
+  }
+  
+  /* Upgrade command action codes used by this distributed upgrade. */
+  static final int DN_CMD_STATS = 200;          // datanode progress report
+  static final int DN_CMD_CRC_INFO = 201;       // request for BlockCrcInfo
+  static final int DN_CMD_CRC_INFO_REPLY = 202; // reply carrying BlockCrcInfo
+  
+  // Version sent with every command. Presumably it ties the commands to
+  // the layout version this upgrade migrates from — TODO confirm.
+  // (original note: what is this version for?)
+  static final int DN_CMD_VERSION = PRE_BLOCK_CRC_LAYOUT_VERSION;
+
+  /** Upgrade command a Datanode sends to report its progress (blocks
+   * upgraded/remaining and error count) to the Namenode. */
+  static class DatanodeStatsCommand extends UpgradeCommand {
+    DatanodeID datanodeId;  // reporting datanode
+    int blocksUpgraded;     // blocks upgraded so far
+    int blocksRemaining;    // blocks still to be upgraded
+    int errors;             // errors encountered so far
+    
+    // No-arg constructor for deserialization via readFields().
+    DatanodeStatsCommand() {
+      super(DN_CMD_STATS, DN_CMD_VERSION, (short)0);
+      datanodeId = new DatanodeID();
+    }
+    
+    public DatanodeStatsCommand(short status, DatanodeID dn,
+                                int blocksUpgraded, int blocksRemaining,
+                                int errors) {
+      super(DN_CMD_STATS, DN_CMD_VERSION, status);
+      
+      //copy so that only ID part gets serialized
+      datanodeId = new DatanodeID(dn); 
+      this.blocksUpgraded = blocksUpgraded;
+      this.blocksRemaining = blocksRemaining;
+      this.errors = errors;
+    }
+
+    // Writable: field order here must match write() below exactly.
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+      datanodeId.readFields(in);
+      blocksUpgraded = in.readInt();
+      blocksRemaining = in.readInt();
+      errors = in.readInt();
+    }
+
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+      datanodeId.write(out);
+      out.writeInt(blocksUpgraded);
+      out.writeInt(blocksRemaining);
+      out.writeInt(errors);
+    }
+  }
+
+  /** Command a Datanode sends to ask the Namenode for a block's
+   * BlockCrcInfo; answered with CrcInfoCommandReply. */
+  static class CrcInfoCommand extends UpgradeCommand {
+    Block block;
+    
+    // No-arg constructor for deserialization via readFields().
+    public CrcInfoCommand() {
+      super(DN_CMD_CRC_INFO, DN_CMD_VERSION, (short)0);      
+      block = new Block();
+    }
+    
+    public CrcInfoCommand(Block blk) {
+      // We don't need status
+      super(DN_CMD_CRC_INFO, DN_CMD_VERSION, (short)0);
+      block = blk;
+    }
+
+    // Writable: field order must match write() below.
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+      block.readFields(in);
+    }
+
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+      block.write(out);
+    }
+  }
+  
+  /** Namenode's reply to a CrcInfoCommand, carrying the BlockCrcInfo. */
+  static class CrcInfoCommandReply extends UpgradeCommand {
+    BlockCrcInfo crcInfo;
+    
+    // No-arg constructor for deserialization via readFields().
+    public CrcInfoCommandReply(){
+      super(DN_CMD_CRC_INFO_REPLY, DN_CMD_VERSION, (short)0);      
+      crcInfo = new BlockCrcInfo();
+    }
+    
+    public CrcInfoCommandReply(BlockCrcInfo info) {
+      super(DN_CMD_CRC_INFO_REPLY, DN_CMD_VERSION, (short)0);
+      crcInfo = info;
+    }
+
+    // Writable: field order must match write() below.
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+      crcInfo.readFields(in);
+    }
+
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+      crcInfo.write(out);
+    }
+  }
+
+  /** UpgradeStatusReport that appends free-form extra text (per-datanode
+   * details etc.) to the standard status text. */
+  static class BlockCrcUpgradeStatusReport extends UpgradeStatusReport {
+
+    String extraText = "";
+    
+    // No-arg constructor for deserialization via readFields().
+    public BlockCrcUpgradeStatusReport() {
+    }
+    
+    public BlockCrcUpgradeStatusReport(int version, short status,
+                                       String extraText) {
+      super(version, status);
+      this.extraText = extraText;
+    }
+    
+    @Override
+    public String getStatusText(boolean details) {
+      return super.getStatusText(details) + "\n\n" + extraText;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+      extraText = Text.readString(in);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+      Text.writeString(out, extraText);
+    }
+  }
+}
+
+/**
+ * This class checks all the blocks and upgrades any that do not have 
+ * meta data associated with them.
+ * 
+ * Things to consider : 
+ *   Should it rescan at the end of the loop?
+ */
+class BlockCrcUpgradeObjectDatanode extends UpgradeObjectDatanode {
+
+  public static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.dfs.BlockCrcUpgradeObjectDatanode");
+
+  DatanodeProtocol namenode;
+  
+  // stats
+  int blocksPreviouslyUpgraded;
+  int blocksToUpgrade;
+  int blocksUpgraded;
+  int errors;
+
+  //This should be a config. set it to 5 otherwise.
+  static final int poolSize = 5;
+  
+  List<UpgradeExecutor> completedList = new LinkedList<UpgradeExecutor>();
+
+  /* this is set when the datanode misses the regular upgrade.
+   * When this is set, it upgrades the block by reading metadata from
+   * the other replicas.
+   */
+  boolean offlineUpgrade = false;
+  boolean upgradeCompleted = false;
+  
+  boolean isOfflineUpgradeOn() {
+    return offlineUpgrade;
+  }
+  
+  // common upgrade interface:
+  
+  public int getVersion() {
+    return BlockCrcUpgradeUtils.PRE_BLOCK_CRC_LAYOUT_VERSION;
+  }
+
+  /*
+   * Start upgrade if it not already running. It sends status to
+   * namenode even if an upgrade is already in progress.
+   */
+  public synchronized UpgradeCommand startUpgrade() throws IOException {
+
+    if ( offlineUpgrade ) {
+      //run doUpgrade here.
+      doUpgrade();
+    }
+    
+    return null; 
+  }
+
+  
+  @Override
+  public String getDescription() {
+    return "Block CRC Upgrade at Datanode";
+  }
+
+  @Override
+  public short getUpgradeStatus() {
+    return (blocksToUpgrade == blocksUpgraded) ? 100 :
+      (short) Math.floor(blocksUpgraded*100.0/blocksToUpgrade);
+  }
+
+  public UpgradeCommand completeUpgrade() throws IOException {
+    // return latest stats command.
+    assert getUpgradeStatus() == 100;
+    return new BlockCrcUpgradeUtils.
+               DatanodeStatsCommand(getUpgradeStatus(),                                 
+                                    getDatanode().dnRegistration,
+                                    blocksPreviouslyUpgraded + blocksUpgraded,
+                                    blocksToUpgrade-blocksUpgraded,
+                                    errors); 
+  }
+  
+  
+  // see description for super.preUpgradeAction().
+  @Override
+  boolean preUpgradeAction(NamespaceInfo nsInfo) throws IOException {
+    int nsUpgradeVersion = nsInfo.getDistributedUpgradeVersion();
+    if(nsUpgradeVersion >= getVersion()) {
+      return false; // Normal upgrade.
+    }
+    
+    LOG.info("\n  This Datanode has missed a cluster wide Block CRC Upgrade." +
+             "\n  Will perform an 'offline' upgrade of the blocks." +
+             "\n  During this time, Datanode does not heartbeat.");
+    
+    DataNode dataNode = getDatanode();
+    
+    //Make sure namenode removes this node from the registered nodes
+    try {
+      // Should we add another error type? Right now only DISK_ERROR removes it
+      // from node list. 
+      dataNode.namenode.errorReport(dataNode.dnRegistration,
+                                    DatanodeProtocol.NOTIFY, 
+                                    "Performing an offline upgrade. " +
+                                    "Will be back online once the ugprade " +
+                                    "completes. Please see datanode logs.");
+      
+    } catch(IOException ignored) {}
+    
+    offlineUpgrade = true;
+    return true;
+  }
+
+  public BlockCrcUpgradeObjectDatanode() {
+  }
+  
+  class UpgradeExecutor implements Runnable {
+    Block block;
+    Throwable throwable;
+    
+    UpgradeExecutor( Block b ) {
+      block = b;
+    }
+    public void run() {
+      try {
+        DNBlockUpgradeInfo blockInfo = new DNBlockUpgradeInfo();
+        blockInfo.block = block;
+        blockInfo.dataNode = getDatanode();
+        blockInfo.namenode = namenode;
+        blockInfo.offlineUpgrade = offlineUpgrade;
+        BlockCrcUpgradeUtils.upgradeBlock( blockInfo );
+      } catch ( Throwable t ) {
+        throwable = t;
+      }
+      synchronized (completedList) {
+        completedList.add( this );
+        completedList.notify();
+      }
+    }
+  }
+  
+  void doUpgrade() throws IOException {
+    
+    if ( upgradeCompleted ) {
+      assert offlineUpgrade : 
+             ("Multiple calls to doUpgrade is expected only during " +
+              "offline upgrade");
+      return;
+    }
+    
+    FSDataset dataset = getDatanode().data;
+
+    // Set up the retry policy so that each attempt waits for one minute.
+    Configuration conf = new Configuration();
+    // set rpc timeout to one minute.
+    conf.set("ipc.client.timeout", "60000");
+    
+    RetryPolicy timeoutPolicy = 
+       RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+               BlockCrcUpgradeUtils.LONG_TIMEOUT_MINUTES,
+               1, TimeUnit.MILLISECONDS);
+
+    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
+      new HashMap<Class<? extends Exception>, RetryPolicy>();
+    exceptionToPolicyMap.put(SocketTimeoutException.class, timeoutPolicy);
+    RetryPolicy methodPolicy = RetryPolicies.retryByException(
+        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+    Map<String,RetryPolicy> methodNameToPolicyMap = 
+                            new HashMap<String, RetryPolicy>();
+    // do we need to set the policy for connection failures also? 
+    methodNameToPolicyMap.put("processUpgradeCommand", methodPolicy);
+    methodNameToPolicyMap.put("blockCrcUpgradeGetBlockLocations", methodPolicy);
+    
+    LOG.info("Starting Block CRC upgrade.");
+    
+    namenode = (DatanodeProtocol) RetryProxy.create(
+                    DatanodeProtocol.class,
+                    RPC.waitForProxy(DatanodeProtocol.class,
+                                     DatanodeProtocol.versionID,
+                                     getDatanode().getNameNodeAddr(),
+                                     conf),
+                    methodNameToPolicyMap);
+                                  
+    conf = null;
+   
+    // Get a list of all the blocks :
+   
+    LinkedList<UpgradeExecutor> blockList = new LinkedList<UpgradeExecutor>();
+    
+    //Fill blockList with blocks to be upgraded.
+    Block [] blockArr = dataset.getBlockReport();
+    
+    for ( Block b : blockArr ) {
+      File blockFile = dataset.getBlockFile( b );
+      if (!blockFile.exists()) {
+        LOG.error("could not find block file " + blockFile);
+        errors++;
+        continue;
+      }
+      File metaFile = FSDataset.getMetaFile( blockFile );
+      if ( metaFile.exists() ) {
+        blocksPreviouslyUpgraded++;
+      } else {
+        blocksToUpgrade++;
+        blockList.add( new UpgradeExecutor( b ) );
+      }
+    }
+    blockArr = null;
+    
+    ExecutorService pool = Executors.newFixedThreadPool( poolSize );
+    
+    LOG.info("Starting upgrade of " + blocksToUpgrade + " blocks out of " +
+             (blocksToUpgrade + blocksPreviouslyUpgraded));
+    
+    // Do we need to do multiple retries for each upgrade?
+    
+    for (Iterator<UpgradeExecutor> it = blockList.iterator(); it.hasNext();) {
+      pool.submit( it.next() );
+    }
+
+    // Inform the namenode
+    sendStatus();
+    
+    // Now wait for the tasks to complete.
+    int nLeft = blockList.size();
+    
+    long now = System.currentTimeMillis();
+    // Report status to namenode every so many seconds:
+    long statusReportIntervalMilliSec = 60*1000;
+    long lastStatusReportTime = now;
+    long lastUpdateTime = now;
+    long lastWarnTime = now;
+    
+    while ( nLeft > 0 ) {
+      synchronized ( completedList ) {
+        if ( completedList.size() <= 0 ) {
+          try {
+            completedList.wait(1000);
+          } catch (InterruptedException ignored) {}
+        }
+        
+        now = System.currentTimeMillis();
+        
+        if ( completedList.size() > 0 ) {
+          UpgradeExecutor exe = completedList.remove(0);
+          nLeft--;
+          if ( exe.throwable != null ) {
+            errors++;
+            LOG.error("Got an exception during upgrade of " +
+                      exe.block + ": " + 
+                      StringUtils.stringifyException( exe.throwable ));
+          }
+          blocksUpgraded++;
+          lastUpdateTime = now;
+        } else {
+          if ((now - lastUpdateTime) >= 
+                BlockCrcUpgradeUtils.LONG_TIMEOUT_MILLISEC &&
+              (now - lastWarnTime) >= 
+                BlockCrcUpgradeUtils.LONG_TIMEOUT_MILLISEC) {
+            lastWarnTime = now;
+            LOG.warn("No block was updated in last " +
+                      BlockCrcUpgradeUtils.LONG_TIMEOUT_MINUTES +
+                      " minutes! will keep waiting... ");
+          }  
+        } 
+      }
+      
+      if ( (now-lastStatusReportTime) > statusReportIntervalMilliSec ) {
+        sendStatus();
+        lastStatusReportTime = System.currentTimeMillis();
+      }
+    }
+
+    upgradeCompleted = true;
+    
+    LOG.info("Completed BlockCrcUpgrade. total of " + 
+             (blocksPreviouslyUpgraded + blocksToUpgrade) +
+             " blocks : " + blocksPreviouslyUpgraded + " blocks previously " +
+             "upgraded, " + blocksUpgraded + " blocks upgraded this time " +
+             "with " + errors + " errors.");       
+
+    // now inform the name node about the completion.
+
+    // What if there is no upgrade running on Namenode now?
+    while (!sendStatus());
+    
+  }
+  
+  /** Sends current status and stats to namenode and logs it to local log*/ 
+  boolean sendStatus() {
+    BlockCrcUpgradeUtils.DatanodeStatsCommand cmd = null;
+    
+    LOG.info((offlineUpgrade ? "Offline " : "") + "Block CRC Upgrade : " + 
+             getUpgradeStatus() + "% completed.");
+    
+    if (offlineUpgrade) {
+      return true;
+    }
+    
+    synchronized (this) {
+      cmd = new BlockCrcUpgradeUtils.
+      DatanodeStatsCommand(getUpgradeStatus(),                                 
+                           getDatanode().dnRegistration,
+                           blocksPreviouslyUpgraded + blocksUpgraded,
+                           blocksToUpgrade-blocksUpgraded,
+                           errors);
+    }
+
+    UpgradeCommand reply = BlockCrcUpgradeUtils.sendCommand(namenode, cmd, 0);
+    if ( reply == null ) {
+      LOG.warn("Could not send status to Namenode. Namenode might be " +
+               "over loaded or down.");
+    }
+
+    // its ok even if reply == noUpgradeOnNamenode
+    return reply != null;
+  }
+}
+
+/**
+ * Once an upgrade starts at the namenode, this class manages the upgrade 
+ * process.
+ */
+class BlockCrcUpgradeObjectNamenode extends UpgradeObjectNamenode {
+  
+  public static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.dfs.BlockCrcUpgradeNamenode");
+  
+  // Presumably the grace period after the last datanode finishes before
+  // the datanode phase is declared done — TODO confirm against the
+  // UpgradeMonitor logic (not visible here).
+  static final long inactivityExtension = 10*1000; // 10 seconds
+  long lastNodeCompletionTime = 0;
+  
+  /** Coarse phases of the namenode-side upgrade. */
+  enum UpgradeStatus {
+    INITIALIZED,
+    STARTED,
+    DATANODES_DONE,
+    COMPLETED,
+  }
+  
+  UpgradeStatus upgradeStatus = UpgradeStatus.INITIALIZED;
+  
+  /** Per-datanode upgrade progress, kept in dnMap/unfinishedDnMap. */
+  class DnInfo { 
+    short percentCompleted = 0;
+    long blocksUpgraded = 0;
+    long blocksRemaining = 0;
+    long errors = 0;
+    
+    DnInfo(short pcCompleted) {
+      // Was "percentCompleted = status", which ignored the parameter and
+      // copied the enclosing upgrade object's status field instead.
+      percentCompleted = pcCompleted;
+    }
+    DnInfo() {}
+    
+    /** Copies the latest stats reported by a datanode. */
+    void setStats(BlockCrcUpgradeUtils.DatanodeStatsCommand cmd) {
+      percentCompleted = cmd.getCurrentStatus();
+      blocksUpgraded = cmd.blocksUpgraded;
+      blocksRemaining = cmd.blocksRemaining;
+      errors = cmd.errors;
+    }
+    
+    boolean isDone() {
+      return percentCompleted >= 100;
+    }
+  }
+  
+  /* We should track only the storageIDs and not DatanodeID, which
+   * includes datanode name and storage id.
+   */
+  HashMap<DatanodeID, DnInfo> dnMap = new HashMap<DatanodeID, DnInfo>();
+  // Subset of dnMap: nodes that have not yet reported 100% completion.
+  HashMap<DatanodeID, DnInfo> unfinishedDnMap = 
+                                      new HashMap<DatanodeID, DnInfo>();  
+
+  Daemon monitorThread; // runs UpgradeMonitor; started in startUpgrade()
+  double avgDatanodeCompletionPct = 0; // mean of per-datanode percentages
+  
+  boolean forceDnCompletion = false; // set via forceProceed()
+  
+  //Upgrade object interface:
+  
+  /** Layout version this distributed upgrade applies to. */
+  public int getVersion() {
+    return BlockCrcUpgradeUtils.PRE_BLOCK_CRC_LAYOUT_VERSION;
+  }
+
+  /** Nothing extra to send when the upgrade completes. */
+  public UpgradeCommand completeUpgrade() throws IOException {
+    return null;
+  }
+ 
+  /** Human-readable name shown in upgrade status reports. */
+  @Override
+  public String getDescription() {
+    return "Block CRC Upgrade at Namenode"; 
+  }
+
+  @Override
+  public synchronized short getUpgradeStatus() {
+    // Reserve 10% for deleting files.
+    if ( upgradeStatus == UpgradeStatus.COMPLETED ) {
+      return 100;
+    }   
+    if ( upgradeStatus == UpgradeStatus.DATANODES_DONE ) {
+      return 90;
+    }
+    
+    return (short) Math.floor(avgDatanodeCompletionPct * 0.9);
+  }
+
+  /** Starts the upgrade monitor daemon. Expected to be called once. */
+  public UpgradeCommand startUpgrade() throws IOException {
+    
+    assert monitorThread == null;
+    
+    lastNodeCompletionTime = System.currentTimeMillis();
+    
+    // Daemon thread that tracks datanode progress and drives the phases.
+    monitorThread = new Daemon(new UpgradeMonitor());
+    monitorThread.start();    
+    
+    return super.startUpgrade();
+  }
+  
+  /** Operator request to stop waiting for the datanodes and move on.
+   * Takes effect asynchronously via the monitor thread. */
+  @Override
+  public synchronized void forceProceed() throws IOException {    
+    if (isUpgradeDone() || 
+        upgradeStatus == UpgradeStatus.DATANODES_DONE) {
+      // Fixed the duplicated/garbled wording of this message.
+      LOG.info("forceProceed is a no op now since the stage waiting " +
+               "for Datanodes to complete is finished. " +
+               "Upgrade should soon complete");
+      return;
+    }
+    
+    if (forceDnCompletion) {
+      LOG.warn("forceProceed is already set for this upgrade. It can take " +
+               "a short while to take effect. Please wait.");
+      return;
+    }
+    
+    LOG.info("got forceProceed request for this upgrade. Datanodes upgrade " +
+             "will be considered done. It can take a few seconds to take " +
+             "effect.");
+    forceDnCompletion = true;
+  }
+
+  /** Dispatches upgrade commands received from the Datanodes.
+   * @throws IOException for unrecognized command actions. */
+  @Override
+  UpgradeCommand processUpgradeCommand(UpgradeCommand command) 
+                                           throws IOException {
+    switch (command.getAction()) {
+
+    case BlockCrcUpgradeUtils.DN_CMD_CRC_INFO :
+      return handleCrcInfoCmd(command);
+    
+    case BlockCrcUpgradeUtils.DN_CMD_STATS :
+      return handleStatsCmd(command);
+
+     default:
+       throw new IOException("Unknown Command for BlockCrcUpgrade : " +
+                             command.getAction());
+    }
+  }
+
+  /** Builds the status report shown to the operator.
+   * @param details when true, appends one line per known datanode.
+   */
+  @Override
+  public UpgradeStatusReport getUpgradeStatusReport(boolean details) 
+                                                    throws IOException {
+
+    /* If 'details' is true should we update block level status?
+     * It could take multiple minutes
+     * updateBlckLevelStats()?
+     */
+    
+    String replyString = "";
+    
+    short status = 0;
+    
+    synchronized (this) {
+     
+      status = getUpgradeStatus();
+     
+      replyString = String.format(
+      ((monitorThread == null) ? "\tUpgrade has not been started yet.\n" : "")+
+      ((forceDnCompletion) ? "\tForce Proceed is ON\n" : "") +
+      "\tLast Block Level Stats updated at : %tc\n" +
+      "\tLast Block Level Stats : %s\n" +
+      "\tBrief Datanode Status  : %s\n" +
+      "%s",
+      latestBlockLevelStats.updatedAt,
+      latestBlockLevelStats.statusString("\n\t                         "), 
+      printStatus("\n\t                         "), 
+      ((status < 100 && upgradeStatus == UpgradeStatus.DATANODES_DONE) ?
+      "\tNOTE: Upgrade at the Datanodes has finished. Deleteing \".crc\" " +
+      "files\n\tcan take longer than status implies.\n" : "")
+      );
+      
+      if (details) {
+        // list all the known data nodes
+        StringBuilder str = null;
+        Iterator<DatanodeID> keys = dnMap.keySet().iterator();
+        Iterator<DnInfo> values = dnMap.values().iterator();
+        
+        // Keys and values iterate the same map in lock step with no
+        // intervening modification, so the pairs stay aligned.
+        for(; keys.hasNext() && values.hasNext() ;) {
+          DatanodeID dn = keys.next();
+          DnInfo info = values.next();
+          String dnStr = "\t\t" + dn.getName() + "\t : " + 
+                         info.percentCompleted + " % \t" +
+                         info.blocksUpgraded + " u \t" +
+                         info.blocksRemaining + " r \t" +
+                         info.errors + " e\n";
+          if ( str == null ) {
+            // Presize: one entry per node plus ~12% slack.
+            str = new StringBuilder(dnStr.length()*
+                                    (dnMap.size() + (dnMap.size()+7)/8));
+          }
+          str.append(dnStr);
+        }
+        
+        replyString += "\n\tDatanode Stats (total: " + dnMap.size() + "): " +
+                       "pct Completion(%) blocks upgraded (u) " +
+                       "blocks remaining (r) errors (e)\n\n" +
+                       (( str == null ) ?
+                        "\t\tThere are no known Datanodes\n" : str);
+      }      
+    }
+    
+    return new BlockCrcUpgradeUtils.BlockCrcUpgradeStatusReport(
+                   BlockCrcUpgradeUtils.PRE_BLOCK_CRC_LAYOUT_VERSION,
+                   status, replyString);
+  }
+
+  /** Looks up the namenode's CRC info for the block named in the
+   *  command and wraps it in a reply for the requesting datanode. */
+  private UpgradeCommand handleCrcInfoCmd(UpgradeCommand cmd) {
+    BlockCrcUpgradeUtils.CrcInfoCommand crcCmd =
+                   (BlockCrcUpgradeUtils.CrcInfoCommand)cmd;
+    BlockCrcInfo info =
+        getFSNamesystem().blockCrcInfo(crcCmd.block, false);
+    return new BlockCrcUpgradeUtils.CrcInfoCommandReply(info);
+  }
+  
+  /** Processes a periodic stats report from a datanode: registers the
+   *  node on first contact, records its latest progress, and tracks the
+   *  time at which the last unfinished node completed. Synchronized
+   *  because it mutates dnMap/unfinishedDnMap, which are also read by
+   *  the monitor thread. */
+  private synchronized UpgradeCommand handleStatsCmd(UpgradeCommand cmd) {
+    
+    BlockCrcUpgradeUtils.DatanodeStatsCommand stats =
+      (BlockCrcUpgradeUtils.DatanodeStatsCommand)cmd;
+    
+    DatanodeID dn = stats.datanodeId;
+    DnInfo dnInfo = dnMap.get(dn);
+    // Remember whether this node had already reported completion so the
+    // "completed" transition below is logged only once.
+    boolean alreadyCompleted = (dnInfo != null && dnInfo.isDone());
+    
+    if ( dnInfo == null ) {
+      // First report from this datanode.
+      dnInfo = new DnInfo();
+      dnMap.put(dn, dnInfo);
+      LOG.info("Upgrade started/resumed at datanode " + dn.getName());  
+    }
+    
+    dnInfo.setStats(stats);
+
+    if ( !dnInfo.isDone() ) {
+      // (Re-)mark the node as unfinished; put() is harmless if present.
+      unfinishedDnMap.put(dn, dnInfo);
+    }
+    
+    if ( dnInfo.isDone() && !alreadyCompleted ) {
+      // This node just transitioned to done.
+      LOG.info("upgrade completed on datanode " + dn.getName());      
+      unfinishedDnMap.remove(dn);
+      if (unfinishedDnMap.size() == 0) {
+        // All known nodes done: start the inactivity grace period used
+        // by checkOverallCompletion().
+        lastNodeCompletionTime = System.currentTimeMillis();
+      }
+    }   
+    
+    //Should we send any more info?
+    return new UpgradeCommand();
+  }
+  
+  /** No-arg constructor; nothing to initialize eagerly (presumably
+   *  instantiated by the upgrade framework -- TODO confirm). */
+  public BlockCrcUpgradeObjectNamenode() {
+  }
+  
+  // For now we will wait for all the nodes to complete upgrade.
+  /** @return true once the upgrade has reached the COMPLETED state. */
+  synchronized boolean isUpgradeDone() {
+    return upgradeStatus == UpgradeStatus.COMPLETED;    
+  }
+  
+  /** Computes and logs a one-line summary of datanode-side progress:
+   *  average completion percentage, total error count, and how many
+   *  nodes are still unfinished. Returns the summary string.
+   *  NOTE: iterates over every known datanode. */
+  synchronized String printStatus(String spacing) {
+    // Accumulate completion percentage and error counts across nodes.
+    long errorCount = 0;
+    long pctSum = 0;
+    for (DnInfo info : dnMap.values()) {
+      pctSum += info.percentCompleted;
+      errorCount += info.errors;
+    }
+    
+    // The tiny epsilon in the denominator avoids division by zero when
+    // no datanode has reported yet.
+    avgDatanodeCompletionPct = pctSum/(dnMap.size() + 1e-20);
+    
+    String msg = "Avg completion on Datanodes: " +              
+                 String.format("%.2f%%", avgDatanodeCompletionPct) +
+                 " with " + errorCount + " errors. " +
+                 ((unfinishedDnMap.size() > 0) ? spacing + 
+                   unfinishedDnMap.size() + " out of " + dnMap.size() +
+                   " nodes are not done." : "");
+                 
+    LOG.info("Upgrade " + (isUpgradeDone() ? 
+             "completed. " : "still running. ") + spacing + msg);
+    return msg;
+  }
+  
+  /** Replaces the overall upgrade state under the object lock. */
+  private synchronized void setStatus(UpgradeStatus status) {
+    upgradeStatus = status;
+  }
+
+  /* Checks if upgrade completed based on datanode's status and/or 
+   * if all the blocks are upgraded.
+   */
+  private synchronized UpgradeStatus checkOverallCompletion() {
+    
+    // Already past the datanode phase: nothing more to decide here.
+    if (upgradeStatus == UpgradeStatus.COMPLETED ||
+        upgradeStatus == UpgradeStatus.DATANODES_DONE) {
+      return upgradeStatus;
+    }
+    
+    if (upgradeStatus != UpgradeStatus.DATANODES_DONE) {
+      // Datanodes are considered done when at least one node is known,
+      // none are unfinished, and the last completion happened more than
+      // 'inactivityExtension' ms ago — or when completion is forced.
+      boolean datanodesDone =
+        (dnMap.size() > 0 && unfinishedDnMap.size() == 0 &&
+         ( System.currentTimeMillis() - lastNodeCompletionTime ) > 
+        inactivityExtension) || forceDnCompletion ;
+                 
+      if ( datanodesDone ) {
+        LOG.info(((forceDnCompletion) ? "Forcing " : "Marking ") +
+                 "Datanode Upgrade done");
+        upgradeStatus = UpgradeStatus.DATANODES_DONE;
+      }
+    }
+    
+    // Fallback: even if some datanodes never reported completion, the
+    // block-level stats can prove every block has an upgraded replica.
+    if (upgradeStatus != UpgradeStatus.DATANODES_DONE &&
+        latestBlockLevelStats.updatedAt > 0) {
+      // check if last block report marked all
+      if (latestBlockLevelStats.minimallyReplicatedBlocks == 0 &&
+          latestBlockLevelStats.underReplicatedBlocks == 0) {
+        
+        LOG.info("Marking datanode upgrade complete since all the blocks are " +
+                 "upgraded (even though some datanodes may not have " +
+                 "reported completion. Block level stats :\n\t" +
+                 latestBlockLevelStats.statusString("\n\t"));
+        upgradeStatus = UpgradeStatus.DATANODES_DONE;
+      }
+    }
+    
+    return upgradeStatus;
+  } 
+    
+  /**
+   * This class monitors the upgrade progress and periodically prints 
+   * status message to log.
+   */
+  class UpgradeMonitor implements Runnable {
+    
+    // How often a status summary is written to the log.
+    static final long statusReportIntervalMillis = 1*60*1000;
+    // How often the expensive whole-namespace block stats are recomputed.
+    static final long blockReportIntervalMillis = 5*60*1000;
+    // Poll interval passed to BlockCrcUpgradeUtils.sleep(); name implies
+    // seconds — confirm units against that helper.
+    static final int sleepTimeSec = 1;
+    
+    public void run() {
+      long lastReportTime = System.currentTimeMillis();
+      long lastBlockReportTime = lastReportTime;
+      
+      while ( !isUpgradeDone() ) {
+        UpgradeStatus status = checkOverallCompletion();
+        
+        if ( status == UpgradeStatus.DATANODES_DONE ) {
+          // Datanode phase finished: remove legacy ".crc" files, then
+          // mark the whole upgrade complete (ends this loop).
+          deleteCrcFiles();
+          setStatus(UpgradeStatus.COMPLETED);
+        }
+        
+        long now = System.currentTimeMillis();
+        
+        
+        if (now-lastBlockReportTime >= blockReportIntervalMillis) {
+          updateBlockLevelStats();
+          // Check if all the blocks have been upgraded.
+          lastBlockReportTime = now;
+        }
+        
+        if ((now - lastReportTime) >= statusReportIntervalMillis || 
+            isUpgradeDone()) {
+          printStatus("\n\t");
+          lastReportTime = now;
+        }
+        
+        BlockCrcUpgradeUtils.sleep(sleepTimeSec, null);
+      }
+      
+      LOG.info("Leaving the monitor thread");
+    }
+  }
+  
+  private BlockLevelStats latestBlockLevelStats = new BlockLevelStats();
+  // internal class to hold the stats.
+  private static class BlockLevelStats {
+    long fullyReplicatedBlocks = 0;     // target replication count upgraded
+    long minimallyReplicatedBlocks = 0; // >= min replication upgraded
+    long underReplicatedBlocks = 0; // includes unReplicatedBlocks
+    long unReplicatedBlocks = 0; // zero replicas upgraded
+    long errors;
+    long updatedAt;
+    
+    /** Formats the counters as percentages of the total block count,
+     *  fields separated by 'spacing' (", " when spacing is empty). */
+    String statusString(String spacing) {
+      long totalBlocks = fullyReplicatedBlocks + 
+                         minimallyReplicatedBlocks +
+                         underReplicatedBlocks;
+      // Tiny epsilon avoids division by zero when there are no blocks.
+      double multiplier = 100/(totalBlocks + 1e-20);
+      
+      if (spacing.equals("")) {
+        spacing = ", ";
+      }
+      
+      return String.format(
+                     "Total Blocks : %d" +
+                     "%sFully Upgraded : %.2f%%" +
+                     "%sMinimally Upgraded : %.2f%%" +
+                     "%sUnder Upgraded : %.2f%% (includes Un-upgraded blocks)" +
+                     "%sUn-upgraded : %.2f%%" + 
+                     "%sErrors : %d", totalBlocks, 
+                     spacing, (fullyReplicatedBlocks * multiplier),
+                     spacing, (minimallyReplicatedBlocks * multiplier),
+                     spacing, (underReplicatedBlocks * multiplier),
+                     spacing, (unReplicatedBlocks * multiplier),
+                     spacing, errors);
+    }
+  }
+  
+  /** Recursively walks the namespace under 'path' and classifies every
+   *  block of every file by how many of its replica-holding datanodes
+   *  have completed the upgrade, accumulating counts into 'stats'. */
+  void updateBlockLevelStats(String path, BlockLevelStats stats) {
+    DFSFileInfo[] fileArr = getFSNamesystem().getListing(path);
+    
+    for (DFSFileInfo file:fileArr) {
+      if (file.isDir()) {
+        updateBlockLevelStats(file.getPath(), stats);
+      } else {
+        // Get the all the blocks.
+        LocatedBlocks blockLoc = null;
+        try {
+          blockLoc = getFSNamesystem().getBlockLocations(null, file.getPath(), 0, 
+                                                  file.getLen());
+          int numBlocks = blockLoc.locatedBlockCount();
+          for (int i=0; i<numBlocks; i++) {
+            LocatedBlock loc = blockLoc.get(i);
+            DatanodeInfo[] dnArr = loc.getLocations();
+            int numUpgraded = 0;
+            // dnMap is shared with the RPC handlers; read it under lock.
+            synchronized (this) {
+              for (DatanodeInfo dn:dnArr) {
+                DnInfo dnInfo = dnMap.get(dn);
+                if (dnInfo != null && dnInfo.isDone()) {
+                  numUpgraded++;
+                }
+              }
+            }
+            
+            // Classify: fully upgraded when the file's target replication
+            // is met; minimally when at least minReplication replicas are
+            // upgraded; under-replicated otherwise.
+            if (numUpgraded >= file.getReplication()) {
+              stats.fullyReplicatedBlocks++;
+            } else if (numUpgraded >= getFSNamesystem().getMinReplication()) {
+              stats.minimallyReplicatedBlocks++;
+            } else {
+              stats.underReplicatedBlocks++;
+            }
+            if (numUpgraded == 0) {
+              stats.unReplicatedBlocks++;
+            }
+          }
+        } catch (IOException e) {
+          // Best effort: record the failure and continue with other files.
+          LOG.error("BlockCrcUpgrade: could not get block locations for " +
+                    file.getPath() + " : " + StringUtils.stringifyException(e));
+          stats.errors++;
+        }
+      }
+    }
+  }
+  
+  void updateBlockLevelStats() {
+    /* Recomputes the block-level counters for the whole namespace.
+     * Iterating over every block at once would be a very large
+     * operation under lock, so the recursion walks the files one at a
+     * time and accumulates the counts for each file's blocks.
+     */
+    LOG.info("Starting update of block level stats. " +
+             "This could take a few minutes");
+    
+    // Build the new snapshot off-lock, then publish it atomically.
+    BlockLevelStats freshStats = new BlockLevelStats();
+    updateBlockLevelStats("/", freshStats);
+    freshStats.updatedAt = System.currentTimeMillis();
+    
+    LOG.info("Block level stats:\n\t" + freshStats.statusString("\n\t"));
+    synchronized (this) {
+      latestBlockLevelStats = freshStats;
+    }
+  }
+  
+  /** Recursively deletes checksum files (".fname.crc") under 'path' when
+   *  the matching data file "fname" exists in the same directory.
+   *  @return number of files successfully deleted in this subtree. */
+  private int deleteCrcFiles(String path) {
+    // Recursively deletes files
+    DFSFileInfo[] fileArr = getFSNamesystem().getListing(path);
+    
+    int numFilesDeleted = 0;
+    
+    HashSet<String> fileSet = new HashSet<String>();
+    
+    // build a small hashMap
+    // of every regular (non-checksum) file name in this directory.
+    for ( DFSFileInfo file:fileArr ) {
+      String name = file.getName();
+      if (!file.isDir() && (!name.startsWith(".") || 
+                            !name.endsWith(".crc"))) {
+        fileSet.add(name);
+      }
+    }
+    
+    for ( DFSFileInfo file:fileArr ) {
+      if ( !file.isDir() ) {
+        String name = file.getName();
+        int extraLen = ".".length() + ".crc".length();
+        if (name.startsWith(".") && name.endsWith(".crc") && 
+            name.length() > extraLen) {
+          // Strip the leading "." and the ".crc" suffix to recover the
+          // name of the data file this checksum file belongs to.
+          String dataFile = name.substring(1, name.length()-extraLen+1);
+          
+          /* Deleting many files at once might be too much load on namenode,
+           * especially on large clusters. We could throttle based on
+           * some namesystem state. We can set high-low watermarks for 
+           * pending deletes, etc.
+           */
+          if (fileSet.contains(dataFile)) {
+            String filepath = path + (path.endsWith("/") ? "" : "/") + name;
+            
+            try {
+              LOG.debug("Deleting " + filepath);
+              if (getFSNamesystem().deleteInSafeMode(filepath)) {
+                numFilesDeleted++;
+              }
+            } catch (IOException e) {
+              // Best effort: log and continue with the remaining files.
+              LOG.error("Exception while deleting " + filepath + 
+                        " : " + StringUtils.stringifyException(e));
+            }
+          }
+        }
+      }
+    }
+    
+    // Reduce memory before recursion
+    fileSet = null;
+    
+    // Recurse into subdirectories after this directory is processed.
+    for ( DFSFileInfo file:fileArr ) {
+      if ( file.isDir() ) {
+        numFilesDeleted += deleteCrcFiles(file.getPath());
+      }
+    }
+    
+    return numFilesDeleted;
+  }
+  
+  /** Deletes every ".fname.crc" checksum file in the namespace whose
+   *  data file "fname" exists in the same directory.
+   *  @return the number of files deleted. */
+  int deleteCrcFiles() {
+    /* Iterate over all the files and delete any file with name .fname.crc
+     * if fname exists in the directory.
+     * A single pass is sufficient for now; the old while(true){...break;}
+     * wrapper was dead code, and the count it computed was discarded
+     * (the method always returned 0). Return the real count instead.
+     */
+    LOG.info("Deleting \".crc\" files. This can take few minutes ... ");
+    int numFilesDeleted = deleteCrcFiles("/");
+    LOG.info("Deleted " + numFilesDeleted + " files");
+    return numFilesDeleted;
+  } 
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ChecksumDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ChecksumDistributedFileSystem.java?view=auto&rev=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ChecksumDistributedFileSystem.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ChecksumDistributedFileSystem.java Mon Jul 16 14:34:59 2007
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import java.io.*;
+import java.net.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.dfs.FSConstants.UpgradeAction;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An implementation of ChecksumFileSystem over DistributedFileSystem. 
+ * Note that as of now (May 07), DistributedFileSystem natively checksums 
+ * all of its data. Using this class is not necessary in most cases.
+ * Currently provided mainly for backward compatibility and testing.
+ */
+public class ChecksumDistributedFileSystem extends ChecksumFileSystem {
+  
+  /** Wraps a freshly constructed DistributedFileSystem. */
+  public ChecksumDistributedFileSystem() {
+    super( new DistributedFileSystem() );
+  }
+
+  /** @deprecated Retained for backward compatibility; prefer the
+   *  no-arg constructor. */
+  public ChecksumDistributedFileSystem(InetSocketAddress namenode,
+                                       Configuration conf) throws IOException {
+    super( new DistributedFileSystem(namenode, conf) );
+  }
+  
+  /** Any extra interface that DistributeFileSystem provides can be
+   * accessed with this.*/
+  DistributedFileSystem getDFS() {
+    return (DistributedFileSystem)fs;
+  }
+
+  /** Returns the content length reported by the underlying
+   *  DistributedFileSystem directly (delegates to {@code fs}, not to
+   *  the ChecksumFileSystem layer). */
+  @Override
+  public long getContentLength(Path f) throws IOException {
+    return fs.getContentLength(f);
+  }
+
+  /** Return the total raw capacity of the filesystem, disregarding
+   * replication .*/
+  public long getRawCapacity() throws IOException{
+    return getDFS().getRawCapacity();
+  }
+
+  /** Return the total raw used space in the filesystem, disregarding
+   * replication .*/
+  public long getRawUsed() throws IOException{
+    return getDFS().getRawUsed();
+  }
+
+  /** Return statistics for each datanode. */
+  public DatanodeInfo[] getDataNodeStats() throws IOException {
+    return getDFS().getDataNodeStats();
+  }
+    
+  /**
+   * Enter, leave or get safe mode.
+   *  
+   * @see org.apache.hadoop.dfs.ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
+   */
+  public boolean setSafeMode(FSConstants.SafeModeAction action) 
+    throws IOException {
+    return getDFS().setSafeMode(action);
+  }
+
+  /**
+   * Refreshes the list of hosts and excluded hosts from the configured 
+   * files.  
+   */
+  public void refreshNodes() throws IOException {
+    getDFS().refreshNodes();
+  }
+
+  /**
+   * Finalize previously upgraded files system state.
+   */
+  public void finalizeUpgrade() throws IOException {
+    getDFS().finalizeUpgrade();
+  }
+
+  /** Delegates a distributed-upgrade progress query/action to the
+   *  underlying DistributedFileSystem. */
+  public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action
+                                                        ) throws IOException {
+    return getDFS().distributedUpgradeProgress(action);
+  }
+
+  /**
+   * Dumps dfs data structures into specified file.
+   */
+  public void metaSave(String pathname) throws IOException {
+    getDFS().metaSave(pathname);
+  }
+
+  /**
+   * We need to find the blocks that didn't match.  Likely only one 
+   * is corrupt but we will report both to the namenode.  In the future,
+   * we can consider figuring out exactly which block is corrupt.
+   */
+  public boolean reportChecksumFailure(Path f, 
+                                       FSDataInputStream in, long inPos, 
+                                       FSDataInputStream sums, long sumsPos) {
+    return getDFS().reportChecksumFailure(f, in, inPos, sums, sumsPos);
+  }
+  
+  
+  /**
+   * Returns the stat information about the file.
+   */
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    return getDFS().getFileStatus(f);
+  }
+
+}
\ No newline at end of file



Mime
View raw message