hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hair...@apache.org
Subject svn commit: r803601 - in /hadoop/hdfs/branches/HDFS-265: ./ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/
Date Wed, 12 Aug 2009 16:58:02 GMT
Author: hairong
Date: Wed Aug 12 16:58:02 2009
New Revision: 803601

URL: http://svn.apache.org/viewvc?rev=803601&view=rev
Log:
HDFS-509. Redesign DataNode volumeMap to include all types of Replicas. Contributed by Hairong Kuang.

Added:
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java
Modified:
    hadoop/hdfs/branches/HDFS-265/CHANGES.txt
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=803601&r1=803600&r2=803601&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Wed Aug 12 16:58:02 2009
@@ -92,6 +92,9 @@
     HDFS-451. Add fault injection tests, Pipeline_Fi_06,07,14,15, for
     DataTransferProtocol.  (szetszwo)
 
+    HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.
+    (hairong)
+    
   BUG FIXES
 
     HDFS-76. Better error message to users when commands fail because of 

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=803601&r1=803600&r2=803601&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Wed Aug 12 16:58:02 2009
@@ -79,5 +79,16 @@
       return description;
     }
   }
+  
+  /**
+   * Define Replica Type
+   */
+  static public enum ReplicaState {
+    FINALIZED,  // finalized replica
+    RBW,        // replica being written
+    RWR,        // replica waiting to be recovered
+    RUR,        // replica under recovery
+    TEMPORARY   // temporary replica
+  }
 }
 

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=803601&r1=803600&r2=803601&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Aug 12 16:58:02 2009
@@ -1560,9 +1560,8 @@
     // This can happen if the namenode and client start recovering the same
     // file at the same time.
     synchronized (ongoingRecovery) {
-      Block tmp = new Block();
-      tmp.set(block.getBlockId(), block.getNumBytes(), GenerationStamp.WILDCARD_STAMP);
-      if (ongoingRecovery.get(tmp) != null) {
+      if (ongoingRecovery.get(new Block(block.getBlockId(), block.getNumBytes(), 
+          GenerationStamp.WILDCARD_STAMP)) != null) {
         String msg = "Block " + block + " is already being recovered, " +
                      " ignoring this request to recover it.";
         LOG.info(msg);

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=803601&r1=803600&r2=803601&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Aug 12 16:58:02 2009
@@ -36,6 +36,7 @@
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 
@@ -105,8 +106,9 @@
         if ( ! metaData.renameTo( newmeta ) ||
             ! src.renameTo( dest ) ) {
           throw new IOException( "could not move files for " + b +
-                                 " from tmp to " + 
-                                 dest.getAbsolutePath() );
+                                 " from " + src + " to " + 
+                                 dest.getAbsolutePath() + " or from"
+                                 + metaData + " to " + newmeta);
         }
         if (DataNode.LOG.isDebugEnabled()) {
           DataNode.LOG.debug("addBlock: Moved " + metaData + " to " + newmeta);
@@ -171,7 +173,7 @@
       return Block.GRANDFATHER_GENERATION_STAMP;
     }
 
-    void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap, FSVolume volume) {
+    void getVolumeMap(ReplicasMap volumeMap, FSVolume volume) {
       if (children != null) {
         for (int i = 0; i < children.length; i++) {
           children[i].getVolumeMap(volumeMap, volume);
@@ -179,11 +181,18 @@
       }
 
       File blockFiles[] = dir.listFiles();
-      for (int i = 0; i < blockFiles.length; i++) {
-        if (Block.isBlockFilename(blockFiles[i])) {
-          long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
-          volumeMap.put(new Block(blockFiles[i], blockFiles[i].length(), genStamp), 
-                        new ReplicaInfo(volume, blockFiles[i]));
+      for (File blockFile : blockFiles) {
+        if (Block.isBlockFilename(blockFile)) {
+          long genStamp = getGenerationStampFromFile(blockFiles, blockFile);
+          long blockId = Block.filename2id(blockFile.getName());
+          ReplicaInfo oldReplica = volumeMap.add(
+              new FinalizedReplica(blockId, blockFile.length(), genStamp, 
+              volume, blockFile.getParentFile()));
+          if (oldReplica != null) {
+            DataNode.LOG.warn("Two block files have the same block id exits " +
+            		"on disk: " + oldReplica.getBlockFile() +
+            		" and " + blockFile );
+          }
         }
       }
     }
@@ -403,7 +412,7 @@
       DiskChecker.checkDir(tmpDir);
     }
       
-    void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
+    void getVolumeMap(ReplicasMap volumeMap) {
       dataDir.getVolumeMap(volumeMap, this);
     }
       
@@ -496,7 +505,7 @@
       return remaining;
     }
       
-    synchronized void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
+    synchronized void getVolumeMap(ReplicasMap volumeMap) {
       for (int idx = 0; idx < volumes.length; idx++) {
         volumes[idx].getVolumeMap(volumeMap);
       }
@@ -538,24 +547,6 @@
   public static final short METADATA_VERSION = 1;
     
 
-  static class ActiveFile {
-    final File file;
-    final List<Thread> threads = new ArrayList<Thread>(2);
-
-    ActiveFile(File f, List<Thread> list) {
-      file = f;
-      if (list != null) {
-        threads.addAll(list);
-      }
-      threads.add(Thread.currentThread());
-    }
-    
-    public String toString() {
-      return getClass().getSimpleName() + "(file=" + file
-          + ", threads=" + threads + ")";
-    }
-  } 
-  
   static String getMetaFileName(String blockFileName, long genStamp) {
     return blockFileName + "_" + genStamp + METADATA_EXTENSION;
   }
@@ -605,22 +596,7 @@
 
   /** Return the block file for the given ID */ 
   public File findBlockFile(long blockId) {
-    final Block b = new Block(blockId);
-    File blockfile = null;
-    ActiveFile activefile = ongoingCreates.get(b);
-    if (activefile != null) {
-      blockfile = activefile.file;
-    }
-    if (blockfile == null) {
-      blockfile = getFile(b);
-    }
-    if (blockfile == null) {
-      if (DataNode.LOG.isDebugEnabled()) {
-        DataNode.LOG.debug("ongoingCreates=" + ongoingCreates);
-        DataNode.LOG.debug("volumeMap=" + volumeMap);
-      }
-    }
-    return blockfile;
+    return getFile(blockId);
   }
 
   /** {@inheritDoc} */
@@ -651,9 +627,8 @@
   }
 
   FSVolumeSet volumes;
-  private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
   private int maxBlocksPerDir = 0;
-  HashMap<Block,ReplicaInfo> volumeMap = null;
+  ReplicasMap volumeMap = new ReplicasMap();
   static  Random random = new Random();
 
   // Used for synchronizing access to usage stats
@@ -669,7 +644,6 @@
       volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
     }
     volumes = new FSVolumeSet(volArray);
-    volumeMap = new HashMap<Block, ReplicaInfo>();
     volumes.getVolumeMap(volumeMap);
     registerMBean(storage.getStorageID());
   }
@@ -737,15 +711,27 @@
   }
 
   /**
-   * Returns handles to the block file and its metadata file
+   * Get the meta info of a block stored in volumeMap
+   * @param b block
+   * @return the meta replica information
+   * @throws IOException if no entry is in the map or 
+   *                        there is a generation stamp mismatch
    */
-  public synchronized BlockInputStreams getTmpInputStreams(Block b, 
-                          long blkOffset, long ckoff) throws IOException {
-
+  private ReplicaInfo getReplicaInfo(Block b) throws IOException {
     ReplicaInfo info = volumeMap.get(b);
     if (info == null) {
       throw new IOException("Block " + b + " does not exist in volumeMap.");
     }
+    return info;
+  }
+  
+  /**
+   * Returns handles to the block file and its metadata file
+   */
+  public synchronized BlockInputStreams getTmpInputStreams(Block b, 
+                          long blkOffset, long ckoff) throws IOException {
+
+    ReplicaInfo info = getReplicaInfo(b);
     FSVolume v = info.getVolume();
     File blockFile = v.getTmpFile(b);
     RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
@@ -774,23 +760,16 @@
    * @param block Block
    * @param numLinks Detach if the number of links exceed this value
    * @throws IOException
-   * @return - true if the specified block was detached
+   * @return - true if the specified block was detached or the block
+   *           is not in any snapshot.
    */
   public boolean detachBlock(Block block, int numLinks) throws IOException {
     ReplicaInfo info = null;
 
     synchronized (this) {
-      info = volumeMap.get(block);
-    }
-    return info.detachBlock(block, numLinks);
-  }
-
-  static private <T> void updateBlockMap(Map<Block, T> blockmap,
-      Block oldblock, Block newblock) throws IOException {
-    if (blockmap.containsKey(oldblock)) {
-      T value = blockmap.remove(oldblock);
-      blockmap.put(newblock, value);
+      info = getReplicaInfo(block);
     }
+   return info.detachBlock(numLinks);
   }
 
   /** {@inheritDoc} */
@@ -822,18 +801,24 @@
 
   /**
    * Try to update an old block to a new block.
-   * If there are ongoing create threads running for the old block,
+   * If there are write threads running for the old block,
    * the threads will be returned without updating the block. 
    * 
-   * @return ongoing create threads if there is any. Otherwise, return null.
+   * @return write threads if there is any. Otherwise, return null.
    */
   private synchronized List<Thread> tryUpdateBlock(
       Block oldblock, Block newblock) throws IOException {
-    //check ongoing create threads
-    final ActiveFile activefile = ongoingCreates.get(oldblock);
-    if (activefile != null && !activefile.threads.isEmpty()) {
+    //check write threads
+    final ReplicaInfo replicaInfo = volumeMap.get(oldblock.getBlockId());
+    File blockFile = replicaInfo==null?null:replicaInfo.getBlockFile();
+    if (blockFile == null) {
+      throw new IOException("Block " + oldblock + " does not exist.");
+    }
+
+    if (replicaInfo instanceof ReplicaInPipeline) {
+      List<Thread> threads = ((ReplicaInPipeline)replicaInfo).getThreads();
       //remove dead threads
-      for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {
+      for(Iterator<Thread> i = threads.iterator(); i.hasNext(); ) {
         final Thread t = i.next();
         if (!t.isAlive()) {
           i.remove();
@@ -841,19 +826,14 @@
       }
 
       //return living threads
-      if (!activefile.threads.isEmpty()) {
-        return new ArrayList<Thread>(activefile.threads);
+      if (!threads.isEmpty()) {
+        return new ArrayList<Thread>(threads);
       }
     }
 
     //No ongoing create threads is alive.  Update block.
-    File blockFile = findBlockFile(oldblock.getBlockId());
-    if (blockFile == null) {
-      throw new IOException("Block " + oldblock + " does not exist.");
-    }
-
-    File oldMetaFile = findMetaFile(blockFile);
-    long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
+    File oldMetaFile = replicaInfo.getMetaFile();
+    long oldgs = replicaInfo.getGenerationStamp();
     
     //rename meta file to a tmp file
     File tmpMetaFile = new File(oldMetaFile.getParent(),
@@ -863,7 +843,7 @@
     }
 
     //update generation stamp
-    if (oldgs > newblock.getGenerationStamp()) {
+    if (oldgs >= newblock.getGenerationStamp()) {
       throw new IOException("Cannot update block (id=" + newblock.getBlockId()
           + ") generation stamp from " + oldgs
           + " to " + newblock.getGenerationStamp());
@@ -878,15 +858,16 @@
       truncateBlock(blockFile, tmpMetaFile, oldblock.getNumBytes(), newblock.getNumBytes());
     }
 
+    // update replicaInfo
+    replicaInfo.setGenerationStamp(newblock.getGenerationStamp());
+    replicaInfo.setNumBytes(newblock.getNumBytes());
+    
     //rename the tmp file to the new meta file (with new generation stamp)
-    File newMetaFile = getMetaFile(blockFile, newblock);
+    File newMetaFile = replicaInfo.getMetaFile();
     if (!tmpMetaFile.renameTo(newMetaFile)) {
       throw new IOException("Cannot rename tmp meta file to " + newMetaFile);
     }
 
-    updateBlockMap(ongoingCreates, oldblock, newblock);
-    updateBlockMap(volumeMap, oldblock, newblock);
-
     // paranoia! verify that the contents of the stored block 
     // matches the block file on disk.
     validateBlockMetadata(newblock);
@@ -964,6 +945,7 @@
     //
     // Make sure the block isn't a valid one - we're still creating it!
     //
+    ReplicaInfo replicaInfo = volumeMap.get(b);
     if (isValidBlock(b)) {
       if (!isRecovery) {
         throw new BlockAlreadyExistsException("Block " + b + " is valid, and cannot be written to.");
@@ -973,7 +955,9 @@
       // some of the packets were not received by the client. The client 
       // re-opens the connection and retries sending those packets.
       // The other reason is that an "append" is occurring to this block.
-      detachBlock(b, 1);
+      if (replicaInfo != null) {
+        replicaInfo.detachBlock(1);
+      }
     }
     long blockSize = b.getNumBytes();
 
@@ -986,10 +970,9 @@
       //
       // Is it already in the create process?
       //
-      ActiveFile activeFile = ongoingCreates.get(b);
-      if (activeFile != null) {
-        f = activeFile.file;
-        threads = activeFile.threads;
+      if (replicaInfo != null && replicaInfo instanceof ReplicaInPipeline) {
+        f = replicaInfo.getBlockFile();
+        threads = ((ReplicaInPipeline)replicaInfo).getThreads();
         
         if (!isRecovery) {
           throw new BlockAlreadyExistsException("Block " + b +
@@ -999,27 +982,27 @@
             thread.interrupt();
           }
         }
-        ongoingCreates.remove(b);
       }
       FSVolume v = null;
-      if (!isRecovery) {
+      if (!isRecovery) { // create a new block
         v = volumes.getNextVolume(blockSize);
         // create temporary file to hold block in the designated volume
         f = createTmpFile(v, b);
-        volumeMap.put(b, new ReplicaInfo(v));
+        replicaInfo = new ReplicaInPipeline(b.getBlockId(), 
+            b.getGenerationStamp(), v, f.getParentFile());
+        volumeMap.add(replicaInfo);
       } else if (f != null) {
         DataNode.LOG.info("Reopen already-open Block for append " + b);
-        // create or reuse temporary file to hold block in the designated volume
-        v = volumeMap.get(b).getVolume();
-        volumeMap.put(b, new ReplicaInfo(v));
       } else {
         // reopening block for appending to it.
         DataNode.LOG.info("Reopen Block for append " + b);
-        v = volumeMap.get(b).getVolume();
+        v = replicaInfo.getVolume();
         f = createTmpFile(v, b);
-        File blkfile = getBlockFile(b);
-        File oldmeta = getMetaFile(b);
-        File newmeta = getMetaFile(f, b);
+        File blkfile = replicaInfo.getBlockFile();
+        File oldmeta = replicaInfo.getMetaFile();
+        replicaInfo = new ReplicaInPipeline(replicaInfo,
+            v, f.getParentFile(), threads);
+        File newmeta = replicaInfo.getMetaFile();
 
         // rename meta file to tmp directory
         DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
@@ -1042,7 +1025,7 @@
                                   " to tmp dir " + f);
           }
         }
-        volumeMap.put(b, new ReplicaInfo(v));
+        volumeMap.add(replicaInfo);
       }
       if (f == null) {
         DataNode.LOG.warn("Block " + b + " reopen failed " +
@@ -1050,7 +1033,6 @@
         throw new IOException("Block " + b + " reopen failed " +
                               " Unable to locate tmp file.");
       }
-      ongoingCreates.put(b, new ActiveFile(f, threads));
     }
 
     try {
@@ -1093,7 +1075,7 @@
                                  throws IOException {
     long size = 0;
     synchronized (this) {
-      FSVolume vol = volumeMap.get(b).getVolume();
+      FSVolume vol = getReplicaInfo(b).getVolume();
       size = vol.getTmpFile(b).length();
     }
     if (size < dataOffset) {
@@ -1111,7 +1093,7 @@
 
   synchronized File createTmpFile( FSVolume vol, Block blk ) throws IOException {
     if ( vol == null ) {
-      vol = volumeMap.get( blk ).getVolume();
+      vol = getReplicaInfo( blk ).getVolume();
       if ( vol == null ) {
         throw new IOException("Could not find volume for block " + blk);
       }
@@ -1131,40 +1113,43 @@
    * Complete the block write!
    */
   public synchronized void finalizeBlock(Block b) throws IOException {
-    ActiveFile activeFile = ongoingCreates.get(b);
-    if (activeFile == null) {
+    ReplicaInfo replicaInfo = getReplicaInfo(b);
+    if (replicaInfo.getState() == ReplicaState.FINALIZED) {
       throw new IOException("Block " + b + " is already finalized.");
     }
-    File f = activeFile.file;
-    if (f == null || !f.exists()) {
-      throw new IOException("No temporary file " + f + " for block " + b);
-    }
-    FSVolume v = volumeMap.get(b).getVolume();
-    if (v == null) {
-      throw new IOException("No volume for temporary file " + f + 
-                            " for block " + b);
+    ReplicaInfo newReplicaInfo = null;
+    if (replicaInfo.getState() == ReplicaState.RUR &&
+       ((ReplicaUnderRecovery)replicaInfo).getOrignalReplicaState() == 
+         ReplicaState.FINALIZED) {
+      newReplicaInfo = ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
+    } else {
+      FSVolume v = replicaInfo.getVolume();
+      File f = replicaInfo.getBlockFile();
+      if (v == null) {
+        throw new IOException("No volume for temporary file " + f + 
+            " for block " + b);
+      }
+
+      File dest = v.addBlock(b, f);
+      newReplicaInfo = new FinalizedReplica(b, v, dest.getParentFile());
     }
-        
-    File dest = null;
-    dest = v.addBlock(b, f);
-    volumeMap.put(b, new ReplicaInfo(v, dest));
-    ongoingCreates.remove(b);
+    volumeMap.add(newReplicaInfo);
   }
 
   /**
    * Remove the temporary block file (if any)
    */
   public synchronized void unfinalizeBlock(Block b) throws IOException {
-    // remove the block from in-memory data structure
-    ActiveFile activefile = ongoingCreates.remove(b);
-    if (activefile == null) {
-      return;
-    }
-    volumeMap.remove(b);
-    
-    // delete the on-disk temp file
-    if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
-      DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+    ReplicaInfo replicaInfo = volumeMap.get(b);
+    if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
+      // remove from volumeMap
+      volumeMap.remove(b);
+      
+      // delete the on-disk temp file
+      if (delBlockFromDisk(replicaInfo.getBlockFile(), 
+          replicaInfo.getMetaFile(), b)) {
+        DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+      }
     }
   }
 
@@ -1200,8 +1185,8 @@
   public Block[] getBlockReport() {
     ArrayList<Block> list =  new ArrayList<Block>(volumeMap.size());
     synchronized(this) {
-      for (Block b : volumeMap.keySet()) {
-        if (!ongoingCreates.containsKey(b)) {
+      for (ReplicaInfo b : volumeMap.replicas()) {
+        if (b.getState() == ReplicaState.FINALIZED) {
           list.add(new Block(b));
         }
       }
@@ -1216,7 +1201,7 @@
    * is needed to handle concurrent modification to the block.
    */
   synchronized Block[] getBlockList(boolean deepcopy) {
-    Block[] list = volumeMap.keySet().toArray(new Block[volumeMap.size()]);
+    Block[] list = volumeMap.replicas().toArray(new Block[volumeMap.size()]);
     if (deepcopy) {
       for (int i = 0; i < list.length; i++) {
         list[i] = new Block(list[i]);
@@ -1227,9 +1212,15 @@
 
   /**
    * Check whether the given block is a valid one.
+   * valid means finalized
    */
   public boolean isValidBlock(Block b) {
-    return validateBlockFile(b) != null;
+    ReplicaInfo replicaInfo = volumeMap.get(b);
+    if (replicaInfo == null || 
+        replicaInfo.getState() != ReplicaState.FINALIZED) {
+      return false;
+    }
+    return replicaInfo.getBlockFile().exists();
   }
 
   /**
@@ -1248,10 +1239,7 @@
 
   /** {@inheritDoc} */
   public void validateBlockMetadata(Block b) throws IOException {
-    ReplicaInfo info = volumeMap.get(b);
-    if (info == null) {
-      throw new IOException("Block " + b + " does not exist in volumeMap.");
-    }
+    ReplicaInfo info = getReplicaInfo(b);
     FSVolume v = info.getVolume();
     File tmp = v.getTmpFile(b);
     File f = getFile(b);
@@ -1307,7 +1295,8 @@
       synchronized (this) {
         f = getFile(invalidBlks[i]);
         ReplicaInfo dinfo = volumeMap.get(invalidBlks[i]);
-        if (dinfo == null) {
+        if (dinfo == null || 
+            dinfo.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) {
           DataNode.LOG.warn("Unexpected error trying to delete block "
                            + invalidBlks[i] + 
                            ". BlockInfo not found in volumeMap.");
@@ -1366,16 +1355,24 @@
   }
 
   /**
-   * Turn the block identifier into a filename.
+   * Turn the block identifier into a filename; ignore generation stamp!!!
    */
   public synchronized File getFile(Block b) {
-    ReplicaInfo info = volumeMap.get(b);
+    return getFile(b.getBlockId());
+  }
+
+  /**
+   * Turn the block identifier into a filename
+   * @param blockId a block's id
+   * @return on disk data file path; null if the replica does not exist
+   */
+  private File getFile(long blockId) {
+    ReplicaInfo info = volumeMap.get(blockId);
     if (info != null) {
-      return info.getFile();
+      return info.getBlockFile();
     }
-    return null;
+    return null;    
   }
-
   /**
    * check if a data directory is healthy
    * @throws DiskErrorException
@@ -1459,11 +1456,12 @@
    */
   public void checkAndUpdate(long blockId, File diskFile,
       File diskMetaFile, FSVolume vol) {
-    Block block = new Block(blockId);
     DataNode datanode = DataNode.getDataNode();
     Block corruptBlock = null;
+    ReplicaInfo memBlockInfo;
     synchronized (this) {
-      if (ongoingCreates.get(block) != null) {
+      memBlockInfo = volumeMap.get(blockId);
+      if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
         // Block is not finalized - ignore the difference
         return;
       }
@@ -1472,7 +1470,6 @@
           Block.getGenerationStamp(diskMetaFile.getName()) :
             Block.GRANDFATHER_GENERATION_STAMP;
 
-      ReplicaInfo memBlockInfo = volumeMap.get(block);
       if (diskFile == null || !diskFile.exists()) {
         if (memBlockInfo == null) {
           // Block file does not exist and block does not exist in memory
@@ -1484,14 +1481,14 @@
           }
           return;
         }
-        if (!memBlockInfo.getFile().exists()) {
+        if (!memBlockInfo.getBlockFile().exists()) {
           // Block is in memory and not on the disk
           // Remove the block from volumeMap
-          volumeMap.remove(block);
+          volumeMap.remove(blockId);
           if (datanode.blockScanner != null) {
-            datanode.blockScanner.deleteBlock(block);
+            datanode.blockScanner.deleteBlock(new Block(blockId));
           }
-          DataNode.LOG.warn("Removed block " + block.getBlockId()
+          DataNode.LOG.warn("Removed block " + blockId
               + " from memory with missing block file on the disk");
           // Finally remove the metadata file
           if (diskMetaFile != null && diskMetaFile.exists()
@@ -1507,23 +1504,20 @@
        */
       if (memBlockInfo == null) {
         // Block is missing in memory - add the block to volumeMap
-        ReplicaInfo diskBlockInfo = new ReplicaInfo(vol, diskFile);
-        Block diskBlock = new Block(diskFile, diskFile.length(), diskGS);
-        volumeMap.put(diskBlock, diskBlockInfo);
+        ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId, 
+            diskFile.length(), diskGS, vol, diskFile.getParentFile());
+        volumeMap.add(diskBlockInfo);
         if (datanode.blockScanner != null) {
-          datanode.blockScanner.addBlock(diskBlock);
+          datanode.blockScanner.addBlock(diskBlockInfo);
         }
-        DataNode.LOG.warn("Added missing block to memory " + diskBlock);
+        DataNode.LOG.warn("Added missing block to memory " + (Block)diskBlockInfo);
         return;
       }
       /*
        * Block exists in volumeMap and the block file exists on the disk
        */
-      // Iterate to get key from volumeMap for the blockId
-      Block memBlock = getBlockKey(blockId);
-
       // Compare block files
-      File memFile = memBlockInfo.getFile();
+      File memFile = memBlockInfo.getBlockFile();
       if (memFile.exists()) {
         if (memFile.compareTo(diskFile) != 0) {
           DataNode.LOG.warn("Block file " + memFile.getAbsolutePath()
@@ -1540,19 +1534,17 @@
             + memFile.getAbsolutePath()
             + " does not exist. Updating it to the file found during scan "
             + diskFile.getAbsolutePath());
-        ReplicaInfo info = volumeMap.remove(memBlock);
-        info.setFile(diskFile);
+        memBlockInfo.setDir(diskFile.getParentFile());
         memFile = diskFile;
 
         DataNode.LOG.warn("Updating generation stamp for block " + blockId
-            + " from " + memBlock.getGenerationStamp() + " to " + diskGS);
-        memBlock.setGenerationStamp(diskGS);
-        volumeMap.put(memBlock, info);
+            + " from " + memBlockInfo.getGenerationStamp() + " to " + diskGS);
+        memBlockInfo.setGenerationStamp(diskGS);
       }
 
       // Compare generation stamp
-      if (memBlock.getGenerationStamp() != diskGS) {
-        File memMetaFile = getMetaFile(diskFile, memBlock);
+      if (memBlockInfo.getGenerationStamp() != diskGS) {
+        File memMetaFile = getMetaFile(diskFile, memBlockInfo);
         if (memMetaFile.exists()) {
           if (memMetaFile.compareTo(diskMetaFile) != 0) {
             DataNode.LOG.warn("Metadata file in memory "
@@ -1569,23 +1561,19 @@
               : Block.GRANDFATHER_GENERATION_STAMP;
 
           DataNode.LOG.warn("Updating generation stamp for block " + blockId
-              + " from " + memBlock.getGenerationStamp() + " to " + gs);
+              + " from " + memBlockInfo.getGenerationStamp() + " to " + gs);
 
-          ReplicaInfo info = volumeMap.remove(memBlock);
-          memBlock.setGenerationStamp(gs);
-          volumeMap.put(memBlock, info);
+          memBlockInfo.setGenerationStamp(gs);
         }
       }
 
       // Compare block size
-      if (memBlock.getNumBytes() != memFile.length()) {
+      if (memBlockInfo.getNumBytes() != memFile.length()) {
         // Update the length based on the block file
-        corruptBlock = new Block(memBlock);
+        corruptBlock = new Block(memBlockInfo);
         DataNode.LOG.warn("Updating size of block " + blockId + " from "
-            + memBlock.getNumBytes() + " to " + memFile.length());
-        ReplicaInfo info = volumeMap.remove(memBlock);
-        memBlock.setNumBytes(memFile.length());
-        volumeMap.put(memBlock, info);
+            + memBlockInfo.getNumBytes() + " to " + memFile.length());
+        memBlockInfo.setNumBytes(memFile.length());
       }
     }
 
@@ -1605,18 +1593,14 @@
   }
 
   /**
-   * Get reference to the key in the volumeMap. To be called from methods that
+   * Get reference to the replica meta info in the replicasMap. 
+   * To be called from methods that
    * are synchronized on {@link FSDataset}
    * @param blockId
-   * @return key from the volumeMap
+   * @return replica's meta information from the replicas map
    */
-  Block getBlockKey(long blockId) {
+  ReplicaInfo getBlock(long blockId) {
     assert(Thread.holdsLock(this));
-    for (Block b : volumeMap.keySet()) {
-      if (b.getBlockId() == blockId) {
-        return b;
-      }
-    }
-    return null;
+    return volumeMap.get(blockId);
   }
 }

Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java?rev=803601&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java Wed Aug 12 16:58:02 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/**
+ * This class describes a replica that has been finalized.
+ */
+class FinalizedReplica extends ReplicaInfo {
+  private boolean detached;      // copy-on-write done for block
+
+  /**
+   * Constructor
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  FinalizedReplica(long blockId, long len, long genStamp,
+      FSVolume vol, File dir) {
+    super(blockId, len, genStamp, vol, dir);
+  }
+  
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  FinalizedReplica(Block block, FSVolume vol, File dir) {
+    super(block, vol, dir);
+  }
+  
+  @Override  // ReplicaInfo
+  ReplicaState getState() {
+    return ReplicaState.FINALIZED;
+  }
+  
+  @Override // ReplicaInfo
+  boolean isDetached() {
+    return detached;
+  }
+
+  @Override  // ReplicaInfo
+  void setDetached() {
+    detached = true;
+  }
+  
+  @Override  // ReplicaInfo
+  long getVisibleLen() throws IOException {
+    return getNumBytes();       // all bytes are visible
+  }
+  
+  @Override  // Object
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+  
+  @Override  // Object
+  public int hashCode() {
+    return super.hashCode();
+  }
+}

Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java?rev=803601&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java Wed Aug 12 16:58:02 2009
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/** This class represents replicas being written. 
+ * Those are the replicas that
+ * are created in a pipeline initiated by a dfs client.
+ */
+class ReplicaBeingWritten extends ReplicaInPipeline {
+  /**
+   * Constructor for a zero length replica
+   * @param blockId block id
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+    ReplicaBeingWritten(long blockId, long genStamp, 
+        FSVolume vol, File dir) {
+    super( blockId, genStamp, vol, dir);
+  }
+  
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param threads a list of threads that are writing to this replica
+   */
+  ReplicaBeingWritten(Block block, 
+      FSVolume vol, File dir, List<Thread> threads) {
+    super( block, vol, dir, threads);
+  }
+
+  /**
+   * Constructor
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param threads a list of threads that are writing to this replica
+   */
+  ReplicaBeingWritten(long blockId, long len, long genStamp,
+      FSVolume vol, File dir, List<Thread> threads ) {
+    super( blockId, len, genStamp, vol, dir, threads);
+  }
+  
+  @Override   //ReplicaInfo
+  long getVisibleLen() throws IOException {
+    return getBytesAcked();       // all acked bytes are visible
+  }
+
+  @Override   //ReplicaInfo
+  ReplicaState getState() {
+    return ReplicaState.RBW;
+  }
+  
+  @Override  // Object
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+  
+  @Override  // Object
+  public int hashCode() {
+    return super.hashCode();
+  }
+}

Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java?rev=803601&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java Wed Aug 12 16:58:02 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/** 
+ * This class defines a replica in a pipeline, which
+ * includes a persistent replica being written to by a dfs client or
+ * a temporary replica being replicated by a source datanode or
+ * being copied for the balancing purpose.
+ * 
+ * The base class implements a temporary replica
+ */
+class ReplicaInPipeline extends ReplicaInfo {
+  private long bytesAcked;
+  private long bytesOnDisk;
+  private List<Thread> threads = new ArrayList<Thread>();
+  
+  /**
+   * Constructor for a zero length replica; delegates to the full
+   * constructor with a length of 0 and no initial thread list
+   * @param blockId block id
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+    ReplicaInPipeline(long blockId, long genStamp, 
+        FSVolume vol, File dir) {
+    this( blockId, 0L, genStamp, vol, dir, null);
+  }
+
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param threads a list of threads that are writing to this replica
+   */
+  ReplicaInPipeline(Block block, 
+      FSVolume vol, File dir, List<Thread> threads) {
+    this( block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
+        vol, dir, threads);
+  }
+
+  /**
+   * Constructor
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param threads a list of threads that are writing to this replica
+   */
+  ReplicaInPipeline(long blockId, long len, long genStamp,
+      FSVolume vol, File dir, List<Thread> threads ) {
+    super( blockId, len, genStamp, vol, dir);
+    this.bytesAcked = len;
+    this.bytesOnDisk = len;
+    setThreads(threads);
+    this.threads.add(Thread.currentThread());
+  }
+
+  @Override  //ReplicaInfo
+  long getVisibleLen() throws IOException {
+    // no bytes are visible
+    throw new IOException("No bytes are visible for temporary replicas");
+  }
+  
+  @Override  //ReplicaInfo
+  ReplicaState getState() {
+    return ReplicaState.TEMPORARY;
+  }
+  
+  /**
+   * Get the number of bytes acked
+   * @return the number of bytes acked
+   */
+  long getBytesAcked() {
+    return bytesAcked;
+  }
+  
+  /**
+   * Set the number of bytes that have been acked
+   * @param bytesAcked the number of bytes acked
+   */
+  void setBytesAcked(long bytesAcked) {
+    this.bytesAcked = bytesAcked;
+  }
+  
+  /**
+   * Get the number of bytes that have been written to disk
+   * @return the number of bytes that have been written to disk
+   */
+  long getBytesOnDisk() {
+    return bytesOnDisk;
+  }
+  
+  /**
+   * Set the number of bytes on disk
+   * @param bytesOnDisk number of bytes on disk
+   */
+  void setBytesOnDisk(long bytesOnDisk) {
+    this.bytesOnDisk = bytesOnDisk;
+  }
+  
+  /**
+   * Add the given threads to those recorded as writing to this replica
+   * @param threads threads writing to this replica; null adds nothing
+   */
+  public void setThreads(List<Thread> threads) {
+    if (threads != null) {
+      this.threads.addAll(threads);
+    }
+  }
+  
+  /**
+   * Get a list of threads writing to this replica 
+   * @return a list of threads writing to this replica
+   */
+  public List<Thread> getThreads() {
+    return threads;
+  }
+  
+  @Override  // Object
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+  
+  @Override  // Object
+  public int hashCode() {
+    return super.hashCode();
+  }
+}

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=803601&r1=803600&r2=803601&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Wed Aug 12 16:58:02 2009
@@ -22,61 +22,137 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FileUtil.HardLink;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 import org.apache.hadoop.io.IOUtils;
 
 /**
- * This class is used by the datanode to maintain the map from a block 
- * to its metadata.
+ * This class is used by datanodes to maintain meta data of its replicas.
+ * It provides a general interface for meta information of a replica.
  */
-class ReplicaInfo {
+abstract class ReplicaInfo extends Block {
+  private FSVolume volume;      // volume where the replica belongs
+  private File     dir;         // directory where block & meta files belong
 
-  private FSVolume volume;       // volume where the block belongs
-  private File     file;         // block file
-  private boolean detached;      // copy-on-write done for block
-
-  ReplicaInfo(FSVolume vol, File file) {
-    this.volume = vol;
-    this.file = file;
-    detached = false;
+  /**
+   * Constructor for a zero length replica
+   * @param blockId block id
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  ReplicaInfo(long blockId, long genStamp, FSVolume vol, File dir) {
+    this( blockId, 0L, genStamp, vol, dir);
   }
-
-  ReplicaInfo(FSVolume vol) {
+  
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  ReplicaInfo(Block block, FSVolume vol, File dir) {
+    this(block.getBlockId(), block.getNumBytes(), 
+        block.getGenerationStamp(), vol, dir);
+  }
+  
+  /**
+   * Constructor
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  ReplicaInfo(long blockId, long len, long genStamp,
+      FSVolume vol, File dir) {
+    super(blockId, len, genStamp);
     this.volume = vol;
-    this.file = null;
-    detached = false;
+    this.dir = dir;
   }
 
+  /**
+   * Get this replica's meta file name
+   * @return this replica's meta file name
+   */
+  private String getMetaFileName() {
+    return getBlockName() + "_" + getGenerationStamp() + METADATA_EXTENSION; 
+  }
+  
+  /**
+   * Get the full path of this replica's data file
+   * @return the full path of this replica's data file
+   */
+  File getBlockFile() {
+    return new File(getDir(), getBlockName());
+  }
+  
+  /**
+   * Get the full path of this replica's meta file
+   * @return the full path of this replica's meta file
+   */
+  File getMetaFile() {
+    return new File(getDir(), getMetaFileName());
+  }
+  
+  /**
+   * Get the volume where this replica is located on disk
+   * @return the volume where this replica is located on disk
+   */
   FSVolume getVolume() {
     return volume;
   }
-
-  File getFile() {
-    return file;
+  
+  /**
+   * Set the volume where this replica is located on disk
+   */
+  void setVolume(FSVolume vol) {
+    this.volume = vol;
+  }
+  
+  /**
+   * Return the parent directory path where this replica is located
+   * @return the parent directory path where this replica is located
+   */
+  File getDir() {
+    return dir;
   }
 
-  void setFile(File f) {
-    file = f;
+  /**
+   * Set the parent directory where this replica is located
+   * @param dir the parent directory where the replica is located
+   */
+  void setDir(File dir) {
+    this.dir = dir;
   }
 
+
   /**
-   * Is this block already detached?
+   * Get the replica state
+   * @return the replica state
+   */
+  abstract ReplicaState getState();
+  
+  /**
+   * check if this replica has already detached.
+   * @return true if the replica has already detached or no need to detach; 
+   *         false otherwise
    */
   boolean isDetached() {
-    return detached;
+    return true;                // no need to be detached
   }
 
   /**
-   *  Block has been successfully detached
+   * set that this replica is detached
    */
   void setDetached() {
-    detached = true;
+    // no need to be detached
   }
-
-  /**
+  
+   /**
    * Copy specified file into a temporary file. Then rename the
    * temporary file to the original name. This will cause any
    * hardlinks to the original file to be removed. The temporary
@@ -84,7 +160,7 @@
    * be recovered (especially on Windows) on datanode restart.
    */
   private void detachFile(File file, Block b) throws IOException {
-    File tmpFile = volume.createDetachFile(b, file.getName());
+    File tmpFile = getVolume().createDetachFile(b, file.getName());
     try {
       FileInputStream in = new FileInputStream(file);
       try {
@@ -114,33 +190,60 @@
   }
 
   /**
-   * Returns true if this block was copied, otherwise returns false.
+   * Remove a hard link by copying the block to a temporary place and 
+   * then moving it back
+   * @param numLinks number of hard links
+   * @return true if copy is successful; 
+   *         false if it is already detached or no need to be detached
+   * @throws IOException if there is any copy error
    */
-  boolean detachBlock(Block block, int numLinks) throws IOException {
+  boolean detachBlock(int numLinks) throws IOException {
     if (isDetached()) {
       return false;
     }
-    if (file == null || volume == null) {
-      throw new IOException("detachBlock:Block not found. " + block);
+    File file = getBlockFile();
+    if (file == null || getVolume() == null) {
+      throw new IOException("detachBlock:Block not found. " + this);
     }
-    File meta = FSDataset.getMetaFile(file, block);
+    File meta = getMetaFile();
     if (meta == null) {
-      throw new IOException("Meta file not found for block " + block);
+      throw new IOException("Meta file not found for block " + this);
     }
 
     if (HardLink.getLinkCount(file) > numLinks) {
-      DataNode.LOG.info("CopyOnWrite for block " + block);
-      detachFile(file, block);
+      DataNode.LOG.info("CopyOnWrite for block " + this);
+      detachFile(file, this);
     }
     if (HardLink.getLinkCount(meta) > numLinks) {
-      detachFile(meta, block);
+      detachFile(meta, this);
     }
     setDetached();
     return true;
   }
+
+  /**
+   * Get the number of bytes that are visible to readers
+   * @return the number of bytes that are visible to readers
+   */
+  abstract long getVisibleLen() throws IOException;
+  
+  /**
+   * Set this replica's generation stamp to be a newer one
+   * @param newGS new generation stamp
+   * @throws IOException if the new generation stamp is not greater than the current one
+   */
+  void setNewerGenerationStamp(long newGS) throws IOException {
+    long curGS = getGenerationStamp();
+    if (newGS <= curGS) {
+      throw new IOException("New generation stamp (" + newGS 
+          + ") must be greater than current one (" + curGS + ")");
+    }
+    setGenerationStamp(newGS);
+  }
   
+  @Override  //Object
   public String toString() {
-    return getClass().getSimpleName() + "(volume=" + volume
-        + ", file=" + file + ", detached=" + detached + ")";
+    return getClass().getSimpleName() + " " + super.toString() + 
+    "(volume=" + volume + ", file=" + getBlockFile() + ")";
   }
 }

Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java?rev=803601&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java Wed Aug 12 16:58:02 2009
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/**
+ * This class represents replicas that are under block recovery
+ * It has a recovery id that is equal to the generation stamp 
+ * that the replica will be bumped to after recovery
+ * The recovery id is used to handle multiple concurrent block recoveries.
+ * A recovery with higher recovery id preempts recoveries with a lower id.
+ *
+ */
+class ReplicaUnderRecovery extends ReplicaInfo {
+  private ReplicaInfo original; // the original replica that needs to be recovered
+  private long recoveryId; // recovery id; it is also the generation stamp 
+                           // that the replica will be bumped to after recovery
+
+  ReplicaUnderRecovery(ReplicaInfo replica, long recoveryId) {
+    super(replica.getBlockId(), replica.getNumBytes(), replica.getGenerationStamp(),
+        replica.getVolume(), replica.getDir());
+    if ( replica.getState() != ReplicaState.FINALIZED &&
+         replica.getState() != ReplicaState.RBW &&
+         replica.getState() != ReplicaState.RWR ) { // reject every other state
+      throw new IllegalArgumentException("Cannot recover replica: " + replica);
+    }
+    this.original = replica;
+    this.recoveryId = recoveryId;
+  }
+
+  /** 
+   * Get the recovery id
+   * @return the generation stamp that the replica will be bumped to 
+   */
+  long getRecoveryID() {
+    return recoveryId;
+  }
+
+  /**
+   * Set the recovery id; IllegalArgumentException unless it is an increase
+   * @param recoveryId the new recoveryId, greater than the current one
+   */
+  void setRecoveryID(long recoveryId) {
+    if (recoveryId > this.recoveryId) {
+      this.recoveryId = recoveryId;
+    } else {
+      throw new IllegalArgumentException("The new recovery id: " + recoveryId
+          + " must be greater than the current one: " + this.recoveryId);
+    }
+  }
+
+  /**
+   * Get the original replica that's under recovery
+   * @return the original replica under recovery
+   */
+  ReplicaInfo getOriginalReplica() {
+    return original;
+  }
+  
+  /**
+   * Get the original replica's state
+   * @return the original replica's state
+   */
+  ReplicaState getOrignalReplicaState() {
+    return original.getState();
+  }
+
+  @Override //ReplicaInfo
+  boolean isDetached() {
+    return original.isDetached();
+  }
+
+  @Override //ReplicaInfo
+  void setDetached() {
+    original.setDetached();
+  }
+  
+  @Override //ReplicaInfo
+  ReplicaState getState() {
+    return ReplicaState.RUR;
+  }
+  
+  @Override
+  long getVisibleLen() throws IOException {
+    return original.getVisibleLen();
+  }
+
+  @Override  //org.apache.hadoop.hdfs.protocol.Block
+  public void setBlockId(long blockId) {
+    super.setBlockId(blockId);
+    original.setBlockId(blockId);
+  }
+
+  @Override //org.apache.hadoop.hdfs.protocol.Block
+  public void setGenerationStamp(long gs) {
+    super.setGenerationStamp(gs);
+    original.setGenerationStamp(gs);
+  }
+  
+  @Override //org.apache.hadoop.hdfs.protocol.Block
+  public void setNumBytes(long numBytes) {
+    super.setNumBytes(numBytes);
+    original.setNumBytes(numBytes);
+  }
+  
+  @Override //ReplicaInfo
+  void setDir(File dir) {
+    super.setDir(dir);
+    original.setDir(dir);
+  }
+  
+  @Override //ReplicaInfo
+  void setVolume(FSVolume vol) {
+    super.setVolume(vol);
+    original.setVolume(vol);
+  }
+  
+  @Override  // Object
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+  
+  @Override  // Object
+  public int hashCode() {
+    return super.hashCode();
+  }
+}

Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java?rev=803601&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java Wed Aug 12 16:58:02 2009
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/**
+ * This class represents a replica that is waiting to be recovered.
+ * After a datanode restart, any replica in "rbw" directory is loaded
+ * as a replica waiting to be recovered.
+ * A replica waiting to be recovered neither serves reads nor
+ * participates in any pipeline recovery. It will become outdated if its
+ * client continues to write, or it will be recovered as a result of
+ * lease recovery.
+ */
+class ReplicaWaitingToBeRecovered extends ReplicaInfo {
+  private boolean detached;      // copy-on-write done for block
+
+  /**
+   * Constructor
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  ReplicaWaitingToBeRecovered(long blockId, long len, long genStamp,
+      FSVolume vol, File dir) {
+    super(blockId, len, genStamp, vol, dir);
+  }
+  
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  ReplicaWaitingToBeRecovered(Block block, FSVolume vol, File dir) {
+    super(block, vol, dir);
+  }
+  
+  @Override //ReplicaInfo
+  ReplicaState getState() {
+    return ReplicaState.RWR;
+  }
+  
+  @Override //ReplicaInfo
+  boolean isDetached() {
+    return detached;
+  }
+
+  @Override //ReplicaInfo
+  void setDetached() {
+    detached = true;
+  }
+  
+  @Override //ReplicaInfo
+  long getVisibleLen() throws IOException {
+    return -1;  //no bytes are visible
+  }
+  
+  @Override  // Object
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+  
+  @Override  // Object
+  public int hashCode() {
+    return super.hashCode();
+  }
+}

Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java?rev=803601&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java Wed Aug 12 16:58:02 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+
+class ReplicasMap {
+  // HashMap: maps a block id to the replica's meta info
+  private HashMap<Long, ReplicaInfo> map = new HashMap<Long, ReplicaInfo>();
+  /**
+   * Get the meta information of the replica that matches both block id 
+   * and generation stamp
+   * @param block block with its id as the key
+   * @return the replica's meta information
+   * @throws IllegalArgumentException if the input block is null
+   */
+  ReplicaInfo get(Block block) {
+    if (block == null) {
+      throw new IllegalArgumentException("Do not expect null block");
+    }
+    ReplicaInfo replicaInfo = get(block.getBlockId());
+    if (replicaInfo != null && 
+        block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
+      return replicaInfo;
+    }
+    return null;
+  }
+  
+  /**
+   * Get the meta information of the replica that matches the block id
+   * @param blockId a block's id
+   * @return the replica's meta information
+   */
+  ReplicaInfo get(long blockId) {
+    return map.get(blockId);
+  }
+  
+  /**
+   * Add a replica's meta information into the map 
+   * 
+   * @param replicaInfo a replica's meta information
+   * @return previous meta information of the replica
+   * @throws IllegalArgumentException if the input parameter is null
+   */
+  ReplicaInfo add(ReplicaInfo replicaInfo) {
+    if (replicaInfo == null) {
+      throw new IllegalArgumentException("Do not expect null block");
+    }
+    return  map.put(replicaInfo.getBlockId(), replicaInfo);
+  }
+  
+  /**
+   * Remove the replica's meta information from the map that matches
+   * the input block's id and generation stamp
+   * @param block block with its id as the key
+   * @return the removed replica's meta information; null if nothing matched
+   * @throws IllegalArgumentException if the input block is null
+   */
+  ReplicaInfo remove(Block block) {
+    if (block == null) {
+      throw new IllegalArgumentException("Do not expect null block");
+    }
+    Long key = Long.valueOf(block.getBlockId());
+    ReplicaInfo replicaInfo = map.get(key);
+    if (replicaInfo != null &&
+        block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
+      return remove(key);
+    }
+    // no replica with this id, or its generation stamp differs from the block's
+    return null;
+  }
+  
+  /**
+   * Remove the replica's meta information from the map if present
+   * @param blockId the block id of the replica to be removed
+   * @return the removed replica's meta information, or null if none was mapped
+   */
+  ReplicaInfo remove(long blockId) {
+    return map.remove(blockId);
+  }
+ 
+  /**
+   * Get the size of the map
+   * @return the number of replicas in the map
+   */
+  int size() {
+    return map.size();
+  }
+  
+  /**
+   * Get a collection of the replicas
+   * @return a collection of the replicas
+   */
+  Collection<ReplicaInfo> replicas() {
+    return map.values();
+  }
+}

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java?rev=803601&r1=803600&r2=803601&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java Wed Aug 12 16:58:02 2009
@@ -23,6 +23,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -103,17 +104,20 @@
       }
       DataNode.LOG.info("newblocksizes = " + Arrays.asList(newblocksizes)); 
 
+      DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
+      cluster.getNameNode().append(filestr, dfs.dfs.clientName);
+
       //update blocks with random block sizes
+      long newGS = cluster.getNameNode().nextGenerationStamp(lastblock);
       Block[] newblocks = new Block[REPLICATION_NUM];
       for(int i = 0; i < REPLICATION_NUM; i++) {
         newblocks[i] = new Block(lastblock.getBlockId(), newblocksizes[i],
-            lastblock.getGenerationStamp());
+            newGS);
         idps[i].updateBlock(lastblock, newblocks[i], false);
         checkMetaInfo(newblocks[i], idps[i]);
       }
-
-      DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
-      cluster.getNameNode().append(filestr, dfs.dfs.clientName);
+      cluster.getNameNode().commitBlockSynchronization(lastblock, newGS, 
+          lastblocksize, false, false, new DatanodeID[]{});
 
       //block synchronization
       final int primarydatanodeindex = AppendTestUtil.nextInt(datanodes.length);

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java?rev=803601&r1=803600&r2=803601&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java Wed Aug 12 16:58:02 2009
@@ -22,7 +22,6 @@
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.util.Random;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -67,17 +66,16 @@
   /** Truncate a block file */
   private long truncateBlockFile() throws IOException {
     synchronized (fds) {
-      for (Entry<Block, ReplicaInfo> entry : fds.volumeMap.entrySet()) {
-        Block b = entry.getKey();
-        File f = entry.getValue().getFile();
-        File mf = FSDataset.getMetaFile(f, b);
+      for (ReplicaInfo b : fds.volumeMap.replicas()) {
+        File f = b.getBlockFile();
+        File mf = b.getMetaFile();
         // Truncate a block file that has a corresponding metadata file
         if (f.exists() && f.length() != 0 && mf.exists()) {
           FileOutputStream s = new FileOutputStream(f);
           FileChannel channel = s.getChannel();
           channel.truncate(0);
           LOG.info("Truncated block file " + f.getAbsolutePath());
-          return entry.getKey().getBlockId();
+          return b.getBlockId();
         }
       }
     }
@@ -87,14 +85,13 @@
   /** Delete a block file */
   private long deleteBlockFile() {
     synchronized(fds) {
-      for (Entry<Block, ReplicaInfo> entry : fds.volumeMap.entrySet()) {
-        Block b = entry.getKey();
-        File f = entry.getValue().getFile();
-        File mf = FSDataset.getMetaFile(f, b);
+      for (ReplicaInfo b : fds.volumeMap.replicas()) {
+        File f = b.getBlockFile();
+        File mf = b.getMetaFile();
         // Delete a block file that has corresponding metadata file
         if (f.exists() && mf.exists() && f.delete()) {
           LOG.info("Deleting block file " + f.getAbsolutePath());
-          return entry.getKey().getBlockId();
+          return b.getBlockId();
         }
       }
     }
@@ -104,16 +101,12 @@
   /** Delete block meta file */
   private long deleteMetaFile() {
     synchronized(fds) {
-      for (Entry<Block, ReplicaInfo> entry : fds.volumeMap.entrySet()) {
-        Block b = entry.getKey();
-        String blkfile = entry.getValue().getFile().getAbsolutePath();
-        long genStamp = b.getGenerationStamp();
-        String metafile = FSDataset.getMetaFileName(blkfile, genStamp);
-        File file = new File(metafile);
+      for (ReplicaInfo b : fds.volumeMap.replicas()) {
+        File file = b.getMetaFile();
         // Delete a metadata file
         if (file.exists() && file.delete()) {
           LOG.info("Deleting metadata file " + file.getAbsolutePath());
-          return entry.getKey().getBlockId();
+          return b.getBlockId();
         }
       }
     }
@@ -324,7 +317,7 @@
   }
 
   private void verifyAddition(long blockId, long genStamp, long size) {
-    Block memBlock = fds.getBlockKey(blockId);
+    Block memBlock = fds.getBlock(blockId);
     assertNotNull(memBlock);
     ReplicaInfo blockInfo;
     synchronized(fds) {
@@ -334,7 +327,7 @@
 
     // Added block has the same file as the one created by the test
     File file = new File(getBlockFile(blockId));
-    assertEquals(file.getName(), blockInfo.getFile().getName());
+    assertEquals(file.getName(), blockInfo.getBlockFile().getName());
 
     // Generation stamp is same as that of created file
     assertEquals(genStamp, memBlock.getGenerationStamp());
@@ -353,7 +346,7 @@
   private void verifyGenStamp(long blockId, long genStamp) {
     Block memBlock;
     synchronized(fds) {
-      memBlock = fds.getBlockKey(blockId);
+      memBlock = fds.getBlock(blockId);
     }
     assertNotNull(memBlock);
     assertEquals(genStamp, memBlock.getGenerationStamp());



Mime
View raw message