hadoop-hdfs-commits mailing list archives

From: a..@apache.org
Subject: svn commit: r1296534 [3/11] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/ src...
Date: Sat, 03 Mar 2012 00:43:00 GMT
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Sat Mar  3 00:42:49 2012
@@ -21,38 +21,25 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
+import java.net.URI;
 import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
-import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 
@@ -64,7 +51,7 @@ class NameNodeConnector {
   private static final Log LOG = Balancer.LOG;
   private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
 
-  final InetSocketAddress namenodeAddress;
+  final URI nameNodeUri;
   final String blockpoolID;
 
   final NamenodeProtocol namenode;
@@ -78,12 +65,17 @@ class NameNodeConnector {
   private BlockTokenSecretManager blockTokenSecretManager;
   private Daemon keyupdaterthread; // AccessKeyUpdater thread
 
-  NameNodeConnector(InetSocketAddress namenodeAddress, Configuration conf
-      ) throws IOException {
-    this.namenodeAddress = namenodeAddress;
-    this.namenode = createNamenode(namenodeAddress, conf);
-    this.client = DFSUtil.createNamenode(conf);
-    this.fs = FileSystem.get(NameNode.getUri(namenodeAddress), conf);
+  NameNodeConnector(URI nameNodeUri,
+      Configuration conf) throws IOException {
+    this.nameNodeUri = nameNodeUri;
+    
+    this.namenode =
+      NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class)
+        .getProxy();
+    this.client =
+      NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class)
+        .getProxy();
+    this.fs = FileSystem.get(nameNodeUri, conf);
 
     final NamespaceInfo namespaceinfo = namenode.versionRequest();
     this.blockpoolID = namespaceinfo.getBlockPoolID();
@@ -188,38 +180,11 @@ class NameNodeConnector {
 
   @Override
   public String toString() {
-    return getClass().getSimpleName() + "[namenodeAddress=" + namenodeAddress
+    return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri
         + ", id=" + blockpoolID
         + "]";
   }
 
-  /** Build a NamenodeProtocol connection to the namenode and
-   * set up the retry policy
-   */ 
-  private static NamenodeProtocol createNamenode(InetSocketAddress address,
-      Configuration conf) throws IOException {
-    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
-        5, 200, TimeUnit.MILLISECONDS);
-    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
-        new HashMap<Class<? extends Exception>, RetryPolicy>();
-    RetryPolicy methodPolicy = RetryPolicies.retryByException(
-        timeoutPolicy, exceptionToPolicyMap);
-    Map<String,RetryPolicy> methodNameToPolicyMap =
-        new HashMap<String, RetryPolicy>();
-    methodNameToPolicyMap.put("getBlocks", methodPolicy);
-    methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
-
-    RPC.setProtocolEngine(conf, NamenodeProtocolPB.class,
-        ProtobufRpcEngine.class);
-    NamenodeProtocolPB proxy = RPC.getProxy(NamenodeProtocolPB.class,
-            RPC.getProtocolVersion(NamenodeProtocolPB.class), address,
-            UserGroupInformation.getCurrentUser(), conf,
-            NetUtils.getDefaultSocketFactory(conf));
-    NamenodeProtocolPB retryProxy = (NamenodeProtocolPB) RetryProxy.create(
-        NamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
-    return new NamenodeProtocolTranslatorPB(retryProxy);
-  }
-
   /**
    * Periodically updates access keys.
    */
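The NameNodeConnector hunks above drop the hand-rolled createNamenode() helper: the retry policies, protobuf RPC engine setup, and proxy construction now live behind the NameNodeProxies factory, and the connector is keyed by a URI rather than a resolved socket address. A minimal sketch of the new call pattern, following the constructor above; the URI value is illustrative, not from this diff:

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.NameNodeProxies;
    import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;

    // One factory call per protocol; the factory now owns retry and
    // RPC-engine configuration that the old createNamenode() built by hand.
    static NamenodeProtocol connect(Configuration conf, URI nameNodeUri)
        throws IOException {
      // e.g. nameNodeUri = URI.create("hdfs://nn.example.com:8020")  (illustrative)
      return NameNodeProxies.createProxy(conf, nameNodeUri,
          NamenodeProtocol.class).getProxy();
    }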

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Sat Mar  3 00:42:49 2012
@@ -183,7 +183,7 @@ public class BlockInfo extends Block imp
   /**
    * Count the number of data-nodes the block belongs to.
    */
-  int numNodes() {
+  public int numNodes() {
     assert this.triplets != null : "BlockInfo is not initialized";
     assert triplets.length % 3 == 0 : "Malformed BlockInfo";
     for(int idx = getCapacity()-1; idx >= 0; idx--) {

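The only change to BlockInfo is visibility: numNodes() widens from package-private to public, presumably so callers outside the blockmanagement package (the standby-side edit-log handling added elsewhere in this commit) can inspect live replica counts; the concrete caller is not in this chunk. A hypothetical use, with illustrative names:

    // Hypothetical caller enabled by the widened visibility.
    int replicas = blockInfo.numNodes();
    boolean canComplete = replicas >= minReplication;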
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Sat Mar  3 00:42:49 2012
@@ -28,6 +28,8 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
 import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.U
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.Util;
@@ -68,6 +71,7 @@ import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.Daemon;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
@@ -80,17 +84,27 @@ public class BlockManager {
   /** Default load factor of map */
   public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
 
+  private static final String QUEUE_REASON_CORRUPT_STATE =
+    "it has the wrong state or generation stamp";
+
+  private static final String QUEUE_REASON_FUTURE_GENSTAMP =
+    "generation stamp is in the future";
+
   private final Namesystem namesystem;
 
   private final DatanodeManager datanodeManager;
   private final HeartbeatManager heartbeatManager;
   private final BlockTokenSecretManager blockTokenSecretManager;
+  
+  private final PendingDataNodeMessages pendingDNMessages =
+    new PendingDataNodeMessages();
 
   private volatile long pendingReplicationBlocksCount = 0L;
   private volatile long corruptReplicaBlocksCount = 0L;
   private volatile long underReplicatedBlocksCount = 0L;
   private volatile long scheduledReplicationBlocksCount = 0L;
   private volatile long excessBlocksCount = 0L;
+  private volatile long postponedMisreplicatedBlocksCount = 0L;
   
   /** Used by metrics */
   public long getPendingReplicationBlocksCount() {
@@ -116,6 +130,14 @@ public class BlockManager {
   public long getExcessBlocksCount() {
     return excessBlocksCount;
   }
+  /** Used by metrics */
+  public long getPostponedMisreplicatedBlocksCount() {
+    return postponedMisreplicatedBlocksCount;
+  }
+  /** Used by metrics */
+  public int getPendingDataNodeMessageCount() {
+    return pendingDNMessages.count();
+  }
 
  /** replicationRecheckInterval is how often namenode checks for new replication work */
   private final long replicationRecheckInterval;
@@ -134,6 +156,15 @@ public class BlockManager {
 
   /** Blocks to be invalidated. */
   private final InvalidateBlocks invalidateBlocks;
+  
+  /**
+   * After a failover, over-replicated blocks may not be handled
+   * until all of the replicas have done a block report to the
+   * new active. This is to make sure that this NameNode has been
+   * notified of all block deletions that might have been pending
+   * when the failover happened.
+   */
+  private final Set<Block> postponedMisreplicatedBlocks = Sets.newHashSet();
 
   //
   // Keeps a TreeSet for every named node. Each treeset contains
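The comment above states the HA-safety invariant this file enforces: after a failover the new active must not act on apparent over-replication until every replica holder has re-reported, because the old active may already have scheduled deletions it never heard back about. Condensed from the invalidateBlock() and processMisReplicatedBlock() hunks below (not a literal excerpt):

    NumberReplicas nr = countNodes(blk);
    if (nr.replicasOnStaleNodes() > 0) {
      // Some replica lives on a node that has not block-reported since
      // the failover: defer, and let rescanPostponedMisreplicatedBlocks()
      // revisit this block once fresh reports arrive.
      postponeBlock(blk);
    } else if (nr.liveReplicas() >= 1) {
      addToInvalidates(blk, dn); // safe: at least one verified live copy
    }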
@@ -316,49 +347,15 @@ public class BlockManager {
       out.println("Metasave: Blocks waiting for replication: " + 
                   neededReplications.size());
       for (Block block : neededReplications) {
-        List<DatanodeDescriptor> containingNodes =
-                                          new ArrayList<DatanodeDescriptor>();
-        List<DatanodeDescriptor> containingLiveReplicasNodes =
-          new ArrayList<DatanodeDescriptor>();
-        
-        NumberReplicas numReplicas = new NumberReplicas();
-        // source node returned is not used
-        chooseSourceDatanode(block, containingNodes,
-            containingLiveReplicasNodes, numReplicas);
-        assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
-        int usableReplicas = numReplicas.liveReplicas() +
-                             numReplicas.decommissionedReplicas();
-       
-        if (block instanceof BlockInfo) {
-          String fileName = ((BlockInfo)block).getINode().getFullPathName();
-          out.print(fileName + ": ");
-        }
-        // l: == live:, d: == decommissioned c: == corrupt e: == excess
-        out.print(block + ((usableReplicas > 0)? "" : " MISSING") + 
-                  " (replicas:" +
-                  " l: " + numReplicas.liveReplicas() +
-                  " d: " + numReplicas.decommissionedReplicas() +
-                  " c: " + numReplicas.corruptReplicas() +
-                  " e: " + numReplicas.excessReplicas() + ") "); 
-
-        Collection<DatanodeDescriptor> corruptNodes = 
-                                      corruptReplicas.getNodes(block);
-        
-        for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
-             jt.hasNext();) {
-          DatanodeDescriptor node = jt.next();
-          String state = "";
-          if (corruptNodes != null && corruptNodes.contains(node)) {
-            state = "(corrupt)";
-          } else if (node.isDecommissioned() || 
-              node.isDecommissionInProgress()) {
-            state = "(decommissioned)";
-          }          
-          out.print(" " + node + state + " : ");
-        }
-        out.println("");
+        dumpBlockMeta(block, out);
       }
     }
+    
+    // Dump any postponed over-replicated blocks
+    out.println("Mis-replicated blocks that have been postponed:");
+    for (Block block : postponedMisreplicatedBlocks) {
+      dumpBlockMeta(block, out);
+    }
 
     // Dump blocks from pendingReplication
     pendingReplications.metaSave(out);
@@ -369,6 +366,58 @@ public class BlockManager {
     // Dump all datanodes
     getDatanodeManager().datanodeDump(out);
   }
+  
+  /**
+   * Dump the metadata for the given block in a human-readable
+   * form.
+   */
+  private void dumpBlockMeta(Block block, PrintWriter out) {
+    List<DatanodeDescriptor> containingNodes =
+                                      new ArrayList<DatanodeDescriptor>();
+    List<DatanodeDescriptor> containingLiveReplicasNodes =
+      new ArrayList<DatanodeDescriptor>();
+    
+    NumberReplicas numReplicas = new NumberReplicas();
+    // source node returned is not used
+    chooseSourceDatanode(block, containingNodes,
+        containingLiveReplicasNodes, numReplicas);
+    assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
+    int usableReplicas = numReplicas.liveReplicas() +
+                         numReplicas.decommissionedReplicas();
+    
+    if (block instanceof BlockInfo) {
+      String fileName = ((BlockInfo)block).getINode().getFullPathName();
+      out.print(fileName + ": ");
+    }
+    // l: == live, d: == decommissioned, c: == corrupt, e: == excess
+    out.print(block + ((usableReplicas > 0)? "" : " MISSING") + 
+              " (replicas:" +
+              " l: " + numReplicas.liveReplicas() +
+              " d: " + numReplicas.decommissionedReplicas() +
+              " c: " + numReplicas.corruptReplicas() +
+              " e: " + numReplicas.excessReplicas() + ") "); 
+
+    Collection<DatanodeDescriptor> corruptNodes = 
+                                  corruptReplicas.getNodes(block);
+    
+    for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
+         jt.hasNext();) {
+      DatanodeDescriptor node = jt.next();
+      String state = "";
+      if (corruptNodes != null && corruptNodes.contains(node)) {
+        state = "(corrupt)";
+      } else if (node.isDecommissioned() || 
+          node.isDecommissionInProgress()) {
+        state = "(decommissioned)";
+      }
+      
+      if (node.areBlockContentsStale()) {
+        state += " (block deletions may be out of date)";
+      }
+      out.print(" " + node + state + " : ");
+    }
+    out.println("");
+  }
 
   /** @return maxReplicationStreams */
   public int getMaxReplicationStreams() {
@@ -425,7 +474,7 @@ public class BlockManager {
     
     final boolean b = commitBlock((BlockInfoUnderConstruction)lastBlock, commitBlock);
     if(countNodes(lastBlock).liveReplicas() >= minReplication)
-      completeBlock(fileINode,fileINode.numBlocks()-1);
+      completeBlock(fileINode,fileINode.numBlocks()-1, false);
     return b;
   }
 
@@ -437,19 +486,15 @@ public class BlockManager {
    * of replicas reported from data-nodes.
    */
   private BlockInfo completeBlock(final INodeFile fileINode,
-      final int blkIndex) throws IOException {
-    return completeBlock(fileINode, blkIndex, false);
-  }
-
-  public BlockInfo completeBlock(final INodeFile fileINode, 
-      final int blkIndex, final boolean force) throws IOException {
+      final int blkIndex, boolean force) throws IOException {
     if(blkIndex < 0)
       return null;
     BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
     if(curBlock.isComplete())
       return curBlock;
     BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
-    if(!force && ucBlock.numNodes() < minReplication)
+    int numNodes = ucBlock.numNodes();
+    if (!force && numNodes < minReplication)
       throw new IOException("Cannot complete block: " +
           "block does not satisfy minimal replication requirement.");
     if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED)
@@ -458,20 +503,43 @@ public class BlockManager {
     BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
     // replace penultimate block in file
     fileINode.setBlock(blkIndex, completeBlock);
+    
+    // Since safe-mode only counts complete blocks, and we now have
+    // one more complete block, we need to adjust the total up, and
+    // also count it as safe, if we have at least the minimum replica
+    // count. (We may not have the minimum replica count yet if this is
+    // a "forced" completion when a file is getting closed by an
+    // OP_CLOSE edit on the standby).
+    namesystem.adjustSafeModeBlockTotals(0, 1);
+    namesystem.incrementSafeBlockCount(
+        Math.min(numNodes, minReplication));
+    
     // replace block in the blocksMap
     return blocksMap.replaceBlock(completeBlock);
   }
 
   private BlockInfo completeBlock(final INodeFile fileINode,
-      final BlockInfo block) throws IOException {
+      final BlockInfo block, boolean force) throws IOException {
     BlockInfo[] fileBlocks = fileINode.getBlocks();
     for(int idx = 0; idx < fileBlocks.length; idx++)
       if(fileBlocks[idx] == block) {
-        return completeBlock(fileINode, idx);
+        return completeBlock(fileINode, idx, force);
       }
     return block;
   }
+  
+  /**
+   * Force the given block in the given file to be marked as complete,
+   * regardless of whether enough replicas are present. This is necessary
+   * when tailing edit logs as a Standby.
+   */
+  public BlockInfo forceCompleteBlock(final INodeFile fileINode,
+      final BlockInfoUnderConstruction block) throws IOException {
+    block.commitBlock(block);
+    return completeBlock(fileINode, block, true);
+  }
 
+  
   /**
    * Convert the last block of the file to an under construction block.<p>
    * The block is converted only if the file has blocks and the last one
@@ -508,6 +576,14 @@ public class BlockManager {
       String datanodeId = dd.getStorageID();
       invalidateBlocks.remove(datanodeId, oldBlock);
     }
+    
+    // Adjust safe-mode totals, since under-construction blocks don't
+    // count in safe-mode.
+    namesystem.adjustSafeModeBlockTotals(
+        // decrement safe if we had enough
+        targets.length >= minReplication ? -1 : 0,
+        // always decrement total blocks
+        -1);
 
     final long fileLength = fileINode.computeContentSummary().getLength();
     final long pos = fileLength - ucBlock.getNumBytes();
@@ -598,8 +674,8 @@ public class BlockManager {
     final boolean isCorrupt = numCorruptNodes == numNodes;
     final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
     final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
+    int j = 0;
     if (numMachines > 0) {
-      int j = 0;
       for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blk);
           it.hasNext();) {
         final DatanodeDescriptor d = it.next();
@@ -608,6 +684,12 @@ public class BlockManager {
           machines[j++] = d;
       }
     }
+    assert j == machines.length :
+      "isCorrupt: " + isCorrupt + 
+      " numMachines: " + numMachines +
+      " numNodes: " + numNodes +
+      " numCorrupt: " + numCorruptNodes +
+      " numCorruptRepls: " + numCorruptReplicas;
     final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
     return new LocatedBlock(eb, machines, pos, isCorrupt);
   }
@@ -772,6 +854,14 @@ public class BlockManager {
 
     node.resetBlocks();
     invalidateBlocks.remove(node.getStorageID());
+    
+    // If the DN hasn't block-reported since the most recent
+    // failover, then we may have been holding up on processing
+    // over-replicated blocks because of it. But we can now
+    // process those blocks.
+    if (node.areBlockContentsStale()) {
+      rescanPostponedMisreplicatedBlocks();
+    }
   }
 
   /**
@@ -809,22 +899,18 @@ public class BlockManager {
    */
   public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
       final DatanodeInfo dn, String reason) throws IOException {
-    namesystem.writeLock();
-    try {
-      final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
-      if (storedBlock == null) {
-        // Check if the replica is in the blockMap, if not
-        // ignore the request for now. This could happen when BlockScanner
-        // thread of Datanode reports bad block before Block reports are sent
-        // by the Datanode on startup
-        NameNode.stateChangeLog.info("BLOCK* findAndMarkBlockAsCorrupt: "
-            + blk + " not found.");
-        return;
-      }
-      markBlockAsCorrupt(storedBlock, dn, reason);
-    } finally {
-      namesystem.writeUnlock();
+    assert namesystem.hasWriteLock();
+    final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
+    if (storedBlock == null) {
+      // Check if the replica is in the blockMap, if not
+      // ignore the request for now. This could happen when BlockScanner
+      // thread of Datanode reports bad block before Block reports are sent
+      // by the Datanode on startup
+      NameNode.stateChangeLog.info("BLOCK* findAndMarkBlockAsCorrupt: "
+          + blk + " not found.");
+      return;
     }
+    markBlockAsCorrupt(storedBlock, dn, reason);
   }
 
   private void markBlockAsCorrupt(BlockInfo storedBlock,
@@ -876,10 +962,17 @@ public class BlockManager {
           + " because datanode " + dn.getName() + " does not exist.");
     }
 
-    // Check how many copies we have of the block. If we have at least one
-    // copy on a live node, then we can delete it.
-    int count = countNodes(blk).liveReplicas();
-    if (count >= 1) {
+    // Check how many copies we have of the block
+    NumberReplicas nr = countNodes(blk);
+    if (nr.replicasOnStaleNodes() > 0) {
+      NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: postponing " +
+          "invalidation of block " + blk + " on " + dn + " because " +
+          nr.replicasOnStaleNodes() + " replica(s) are located on nodes " +
+          "with potentially out-of-date block reports.");
+      postponeBlock(blk);
+
+    } else if (nr.liveReplicas() >= 1) {
+      // If we have at least one copy on a live node, then we can delete it.
       addToInvalidates(blk, dn);
       removeStoredBlock(blk, node);
       if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -892,6 +985,13 @@ public class BlockManager {
     }
   }
 
+  private void postponeBlock(Block blk) {
+    if (postponedMisreplicatedBlocks.add(blk)) {
+      postponedMisreplicatedBlocksCount++;
+    }
+  }
+  
+  
   void updateState() {
     pendingReplicationBlocksCount = pendingReplications.size();
     underReplicatedBlocksCount = neededReplications.size();
@@ -930,7 +1030,7 @@ public class BlockManager {
    *
    * @return number of blocks scheduled for replication during this iteration.
    */
-  private int computeReplicationWork(int blocksToProcess) throws IOException {
+  int computeReplicationWork(int blocksToProcess) throws IOException {
     List<List<Block>> blocksToReplicate = null;
     namesystem.writeLock();
     try {
@@ -981,8 +1081,10 @@ public class BlockManager {
             NumberReplicas numReplicas = new NumberReplicas();
             srcNode = chooseSourceDatanode(
                 block, containingNodes, liveReplicaNodes, numReplicas);
-            if(srcNode == null) // block can not be replicated from any node
+            if(srcNode == null) { // block cannot be replicated from any node
+              LOG.debug("Block " + block + " cannot be replicated from any node");
              continue;
+            }
 
             assert liveReplicaNodes.size() == numReplicas.liveReplicas();
             // do not schedule more if enough replicas is already pending
@@ -1232,7 +1334,7 @@ public class BlockManager {
         srcNode = node;
     }
     if(numReplicas != null)
-      numReplicas.initialize(live, decommissioned, corrupt, excess);
+      numReplicas.initialize(live, decommissioned, corrupt, excess, 0);
     return srcNode;
   }
 
@@ -1314,7 +1416,7 @@ public class BlockManager {
 
       // To minimize startup time, we discard any second (or later) block reports
       // that we receive while still in startup phase.
-      if (namesystem.isInStartupSafeMode() && node.numBlocks() > 0) {
+      if (namesystem.isInStartupSafeMode() && !node.isFirstBlockReport()) {
         NameNode.stateChangeLog.info("BLOCK* processReport: "
             + "discarded non-initial block report from " + nodeID.getName()
             + " because namenode still in startup phase");
@@ -1328,6 +1430,19 @@ public class BlockManager {
       } else {
         processReport(node, newReport);
       }
+      
+      // Now that we have an up-to-date block report, we know that any
+      // deletions from a previous NN iteration have been accounted for.
+      boolean staleBefore = node.areBlockContentsStale();
+      node.receivedBlockReport();
+      if (staleBefore && !node.areBlockContentsStale()) {
+        LOG.info("BLOCK* processReport: " +
+            "Received first block report from " + node +
+            " after becoming active. Its block contents are no longer" +
+            " considered stale.");
+        rescanPostponedMisreplicatedBlocks();
+      }
+      
     } finally {
       endTime = Util.now();
       namesystem.writeUnlock();
@@ -1340,6 +1455,37 @@ public class BlockManager {
         + ", processing time: " + (endTime - startTime) + " msecs");
   }
 
+  /**
+   * Rescan the list of blocks which were previously postponed.
+   */
+  private void rescanPostponedMisreplicatedBlocks() {
+    for (Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
+         it.hasNext();) {
+      Block b = it.next();
+      
+      BlockInfo bi = blocksMap.getStoredBlock(b);
+      if (bi == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
+              "Postponed mis-replicated block " + b + " no longer found " +
+              "in block map.");
+        }
+        it.remove();
+        postponedMisreplicatedBlocksCount--;
+        continue;
+      }
+      MisReplicationResult res = processMisReplicatedBlock(bi);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
+            "Re-scanned block " + b + ", result is " + res);
+      }
+      if (res != MisReplicationResult.POSTPONE) {
+        it.remove();
+        postponedMisreplicatedBlocksCount--;
+      }
+    }
+  }
+  
   private void processReport(final DatanodeDescriptor node,
       final BlockListAsLongs report) throws IOException {
     // Normal case:
@@ -1392,9 +1538,19 @@ public class BlockManager {
     assert (node.numBlocks() == 0);
     BlockReportIterator itBR = report.getBlockReportIterator();
 
+    boolean isStandby = namesystem.isInStandbyState();
+    
     while(itBR.hasNext()) {
       Block iblk = itBR.next();
       ReplicaState reportedState = itBR.getCurrentReplicaState();
+      
+      if (isStandby &&
+          namesystem.isGenStampInFuture(iblk.getGenerationStamp())) {
+        queueReportedBlock(node, iblk, reportedState,
+            QUEUE_REASON_FUTURE_GENSTAMP);
+        continue;
+      }
+      
       BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
       // If block does not belong to any file, we are done.
       if (storedBlock == null) continue;
@@ -1404,7 +1560,14 @@ public class BlockManager {
       BlockToMarkCorrupt c = checkReplicaCorrupt(
           iblk, reportedState, storedBlock, ucState, node);
       if (c != null) {
-        markBlockAsCorrupt(c.blockInfo, node, c.reason);
+        if (namesystem.isInStandbyState()) {
+          // In the Standby, we may receive a block report for a file that we
+          // just have an out-of-date gen-stamp or state for, for example.
+          queueReportedBlock(node, iblk, reportedState,
+              QUEUE_REASON_CORRUPT_STATE);
+        } else {
+          markBlockAsCorrupt(c.blockInfo, node, c.reason);
+        }
         continue;
       }
       
@@ -1487,7 +1650,8 @@ public class BlockManager {
    * @param toCorrupt replicas with unexpected length or generation stamp;
    *        add to corrupt replicas
    * @param toUC replicas of blocks currently under construction
-   * @return
+   * @return the up-to-date stored block, if it should be kept.
+   *         Otherwise, null.
    */
   private BlockInfo processReportedBlock(final DatanodeDescriptor dn, 
       final Block block, final ReplicaState reportedState, 
@@ -1502,6 +1666,13 @@ public class BlockManager {
           + " replicaState = " + reportedState);
     }
   
+    if (namesystem.isInStandbyState() &&
+        namesystem.isGenStampInFuture(block.getGenerationStamp())) {
+      queueReportedBlock(dn, block, reportedState,
+          QUEUE_REASON_FUTURE_GENSTAMP);
+      return null;
+    }
+    
     // find block by blockId
     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
     if(storedBlock == null) {
@@ -1519,15 +1690,24 @@ public class BlockManager {
 
     // Ignore replicas already scheduled to be removed from the DN
     if(invalidateBlocks.contains(dn.getStorageID(), block)) {
-      assert storedBlock.findDatanode(dn) < 0 : "Block " + block
-        + " in invalidated blocks set should not appear in DN " + dn;
+      /* TODO: the following assertion is incorrect, see HDFS-2668
+      assert storedBlock.findDatanode(dn) < 0 : "Block " + block
+        + " in recentInvalidatesSet should not appear in DN " + dn; */
       return storedBlock;
     }
 
     BlockToMarkCorrupt c = checkReplicaCorrupt(
         block, reportedState, storedBlock, ucState, dn);
     if (c != null) {
-      toCorrupt.add(c);
+      if (namesystem.isInStandbyState()) {
+        // If the block is an out-of-date generation stamp or state,
+        // but we're the standby, we shouldn't treat it as corrupt,
+        // but instead just queue it for later processing.
+        queueReportedBlock(dn, storedBlock, reportedState,
+            QUEUE_REASON_CORRUPT_STATE);
+      } else {
+        toCorrupt.add(c);
+      }
       return storedBlock;
     }
 
@@ -1545,6 +1725,68 @@ public class BlockManager {
     return storedBlock;
   }
 
+  /**
+   * Queue the given reported block for later processing in the
+   * standby node. {@link PendingDataNodeMessages}.
+   * @param reason a textual reason to report in the debug logs
+   */
+  private void queueReportedBlock(DatanodeDescriptor dn, Block block,
+      ReplicaState reportedState, String reason) {
+    assert namesystem.isInStandbyState();
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Queueing reported block " + block +
+          " in state " + reportedState + 
+          " from datanode " + dn + " for later processing " +
+          "because " + reason + ".");
+    }
+    pendingDNMessages.enqueueReportedBlock(dn, block, reportedState);
+  }
+
+  /**
+   * Try to process any messages that were previously queued for the given
+   * block. This is called from FSEditLogLoader whenever a block's state
+   * in the namespace has changed or a new block has been created.
+   */
+  public void processQueuedMessagesForBlock(Block b) throws IOException {
+    Queue<ReportedBlockInfo> queue = pendingDNMessages.takeBlockQueue(b);
+    if (queue == null) {
+      // Nothing to re-process
+      return;
+    }
+    processQueuedMessages(queue);
+  }
+  
+  private void processQueuedMessages(Iterable<ReportedBlockInfo> rbis)
+      throws IOException {
+    for (ReportedBlockInfo rbi : rbis) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Processing previouly queued message " + rbi);
+      }
+      processAndHandleReportedBlock(
+          rbi.getNode(), rbi.getBlock(), rbi.getReportedState(), null);
+    }
+  }
+  
+  /**
+   * Process any remaining queued datanode messages after entering
+   * active state. At this point they will not be re-queued since
+   * we are the definitive master node and thus should be up-to-date
+   * with the namespace information.
+   */
+  public void processAllPendingDNMessages() throws IOException {
+    assert !namesystem.isInStandbyState() :
+      "processAllPendingDNMessages() should be called after exiting " +
+      "standby state!";
+    int count = pendingDNMessages.count();
+    if (count > 0) {
+      LOG.info("Processing " + count + " messages from DataNodes " +
+          "that were previously queued during standby state.");
+    }
+    processQueuedMessages(pendingDNMessages.takeAll());
+    assert pendingDNMessages.count() == 0;
+  }
+
   /*
    * The next two methods test the various cases under which we must conclude
    * the replica is corrupt, or under construction.  These are laid out
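queueReportedBlock() and processQueuedMessages() above lean on the new PendingDataNodeMessages class, which is added in another part of this commit and not visible in this chunk. Reconstructing its shape from the call sites here (enqueueReportedBlock, takeBlockQueue, takeAll, count), a hypothetical minimal version might look like the sketch below; the real signatures may differ:

    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.Map;
    import java.util.Queue;

    // Hypothetical sketch only: a per-block queue of reported-block
    // messages, drained either per block (takeBlockQueue) or wholesale
    // when leaving standby state (takeAll).
    class PendingMessagesSketch<B, M> {
      private final Map<B, Queue<M>> queues = new HashMap<B, Queue<M>>();
      private int count = 0;

      synchronized void enqueue(B block, M msg) {
        Queue<M> q = queues.get(block);
        if (q == null) {
          q = new LinkedList<M>();
          queues.put(block, q);
        }
        q.add(msg);
        count++;
      }

      /** Remove and return the queue for one block, or null if none. */
      synchronized Queue<M> takeBlockQueue(B block) {
        Queue<M> q = queues.remove(block);
        if (q != null) {
          count -= q.size();
        }
        return q;
      }

      /** Drain every queued message, e.g. on transition to active. */
      synchronized Queue<M> takeAll() {
        Queue<M> all = new LinkedList<M>();
        for (Queue<M> q : queues.values()) {
          all.addAll(q);
        }
        queues.clear();
        count = 0;
        return all;
      }

      synchronized int count() {
        return count;
      }
    }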
@@ -1675,13 +1917,15 @@ public class BlockManager {
     // Now check for completion of blocks and safe block count
     int numCurrentReplica = countLiveNodes(storedBlock);
     if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
-        && numCurrentReplica >= minReplication)
-      storedBlock = completeBlock(storedBlock.getINode(), storedBlock);
-
-    // check whether safe replication is reached for the block
-    // only complete blocks are counted towards that
-    if(storedBlock.isComplete())
+        && numCurrentReplica >= minReplication) {
+      completeBlock(storedBlock.getINode(), storedBlock, false);
+    } else if (storedBlock.isComplete()) {
+      // check whether safe replication is reached for the block
+      // only complete blocks are counted towards that.
+      // In the case that the block just became complete above, completeBlock()
+      // handles the safe block count maintenance.
       namesystem.incrementSafeBlockCount(numCurrentReplica);
+    }
   }
 
   /**
@@ -1738,15 +1982,17 @@ public class BlockManager {
       + pendingReplications.getNumReplicas(storedBlock);
 
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
-        numLiveReplicas >= minReplication)
-      storedBlock = completeBlock(fileINode, storedBlock);
-
-    // check whether safe replication is reached for the block
-    // only complete blocks are counted towards that
-    // Is no-op if not in safe mode.
-    if(storedBlock.isComplete())
+        numLiveReplicas >= minReplication) {
+      storedBlock = completeBlock(fileINode, storedBlock, false);
+    } else if (storedBlock.isComplete()) {
+      // check whether safe replication is reached for the block
+      // only complete blocks are counted towards that
+      // Is no-op if not in safe mode.
+      // In the case that the block just became complete above, completeBlock()
+      // handles the safe block count maintenance.
       namesystem.incrementSafeBlockCount(numCurrentReplica);
-
+    }
+    
     // if file is under construction, then done for now
     if (fileINode.isUnderConstruction()) {
       return storedBlock;
@@ -1839,49 +2085,93 @@ public class BlockManager {
   public void processMisReplicatedBlocks() {
     assert namesystem.hasWriteLock();
 
-    long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0,
+    long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0, nrPostponed = 0,
          nrUnderConstruction = 0;
     neededReplications.clear();
     for (BlockInfo block : blocksMap.getBlocks()) {
-      INodeFile fileINode = block.getINode();
-      if (fileINode == null) {
-        // block does not belong to any file
+      MisReplicationResult res = processMisReplicatedBlock(block);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("block " + block + ": " + res);
+      }
+      switch (res) {
+      case UNDER_REPLICATED:
+        nrUnderReplicated++;
+        break;
+      case OVER_REPLICATED:
+        nrOverReplicated++;
+        break;
+      case INVALID:
         nrInvalid++;
-        addToInvalidates(block);
-        continue;
-      }
-      if (!block.isComplete()) {
-        // Incomplete blocks are never considered mis-replicated --
-        // they'll be reached when they are completed or recovered.
+        break;
+      case POSTPONE:
+        nrPostponed++;
+        postponeBlock(block);
+        break;
+      case UNDER_CONSTRUCTION:
         nrUnderConstruction++;
-        continue;
-      }
-      // calculate current replication
-      short expectedReplication = fileINode.getReplication();
-      NumberReplicas num = countNodes(block);
-      int numCurrentReplica = num.liveReplicas();
-      // add to under-replicated queue if need to be
-      if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
-        if (neededReplications.add(block, numCurrentReplica, num
-            .decommissionedReplicas(), expectedReplication)) {
-          nrUnderReplicated++;
-        }
-      }
-
-      if (numCurrentReplica > expectedReplication) {
-        // over-replicated block
-        nrOverReplicated++;
-        processOverReplicatedBlock(block, expectedReplication, null, null);
+        break;
+      case OK:
+        break;
+      default:
+        throw new AssertionError("Invalid enum value: " + res);
       }
     }
-
+    
     LOG.info("Total number of blocks            = " + blocksMap.size());
     LOG.info("Number of invalid blocks          = " + nrInvalid);
     LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
-    LOG.info("Number of  over-replicated blocks = " + nrOverReplicated);
+    LOG.info("Number of  over-replicated blocks = " + nrOverReplicated +
+        ((nrPostponed > 0) ? ( " (" + nrPostponed + " postponed)") : ""));
     LOG.info("Number of blocks being written    = " + nrUnderConstruction);
   }
 
+  /**
+   * Process a single possibly misreplicated block. This adds it to the
+   * appropriate queues if necessary, and returns a result code indicating
+   * what happened with it.
+   */
+  private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
+    INodeFile fileINode = block.getINode();
+    if (fileINode == null) {
+      // block does not belong to any file
+      addToInvalidates(block);
+      return MisReplicationResult.INVALID;
+    }
+    if (!block.isComplete()) {
+      // Incomplete blocks are never considered mis-replicated --
+      // they'll be reached when they are completed or recovered.
+      return MisReplicationResult.UNDER_CONSTRUCTION;
+    }
+    // calculate current replication
+    short expectedReplication = fileINode.getReplication();
+    NumberReplicas num = countNodes(block);
+    int numCurrentReplica = num.liveReplicas();
+    // add to under-replicated queue if need to be
+    if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
+      if (neededReplications.add(block, numCurrentReplica, num
+          .decommissionedReplicas(), expectedReplication)) {
+        return MisReplicationResult.UNDER_REPLICATED;
+      }
+    }
+
+    if (numCurrentReplica > expectedReplication) {
+      if (num.replicasOnStaleNodes() > 0) {
+        // If any of the replicas of this block are on nodes that are
+        // considered "stale", then these replicas may in fact have
+        // already been deleted. So, we cannot safely act on the
+        // over-replication until a later point in time, when
+        // the "stale" nodes have block reported.
+        return MisReplicationResult.POSTPONE;
+      }
+      
+      // over-replicated block
+      processOverReplicatedBlock(block, expectedReplication, null, null);
+      return MisReplicationResult.OVER_REPLICATED;
+    }
+    
+    return MisReplicationResult.OK;
+  }
+  
   /** Set replication for the blocks. */
   public void setReplication(final short oldRepl, final short newRepl,
       final String src, final Block... blocks) throws IOException {
@@ -1925,6 +2215,14 @@ public class BlockManager {
     for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
          it.hasNext();) {
       DatanodeDescriptor cur = it.next();
+      if (cur.areBlockContentsStale()) {
+        LOG.info("BLOCK* processOverReplicatedBlock: " +
+            "Postponing processing of over-replicated block " +
+            block + " since datanode " + cur + " does not yet have up-to-date " +
+            "block information.");
+        postponeBlock(block);
+        return;
+      }
       LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
           .getStorageID());
       if (excessBlocks == null || !excessBlocks.contains(block)) {
@@ -2151,13 +2449,19 @@ public class BlockManager {
     // Modify the blocks->datanode map and node's map.
     //
     pendingReplications.remove(block);
-
+    processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
+        delHintNode);
+  }
+  
+  private void processAndHandleReportedBlock(DatanodeDescriptor node, Block block,
+      ReplicaState reportedState, DatanodeDescriptor delHintNode)
+      throws IOException {
     // blockReceived reports a finalized block
     Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    processReportedBlock(node, block, ReplicaState.FINALIZED,
+    processReportedBlock(node, block, reportedState,
                               toAdd, toInvalidate, toCorrupt, toUC);
     // the block is only in one of the to-do lists
     // if it is in none then data-node already has it
@@ -2181,59 +2485,80 @@ public class BlockManager {
     }
   }
 
-  /** The given node is reporting that it received/deleted certain blocks. */
-  public void blockReceivedAndDeleted(final DatanodeID nodeID, 
+  /**
+   * The given node is reporting incremental information about some blocks.
+   * This includes blocks that are starting to be received, completed being
+   * received, or deleted.
+   */
+  public void processIncrementalBlockReport(final DatanodeID nodeID, 
      final String poolId, 
-     final ReceivedDeletedBlockInfo receivedAndDeletedBlocks[]
+     final ReceivedDeletedBlockInfo blockInfos[]
   ) throws IOException {
     namesystem.writeLock();
     int received = 0;
     int deleted = 0;
+    int receiving = 0;
     try {
       final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
       if (node == null || !node.isAlive) {
         NameNode.stateChangeLog
-            .warn("BLOCK* blockReceivedDeleted"
+            .warn("BLOCK* processIncrementalBlockReport"
                 + " is received from dead or unregistered node "
                 + nodeID.getName());
         throw new IOException(
-            "Got blockReceivedDeleted message from unregistered or dead node");
+            "Got incremental block report from unregistered or dead node");
       }
 
-      for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
-        if (receivedAndDeletedBlocks[i].isDeletedBlock()) {
-          removeStoredBlock(
-              receivedAndDeletedBlocks[i].getBlock(), node);
+      for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
+        switch (rdbi.getStatus()) {
+        case DELETED_BLOCK:
+          removeStoredBlock(rdbi.getBlock(), node);
           deleted++;
-        } else {
-          addBlock(node, receivedAndDeletedBlocks[i].getBlock(),
-              receivedAndDeletedBlocks[i].getDelHints());
+          break;
+        case RECEIVED_BLOCK:
+          addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
           received++;
+          break;
+        case RECEIVING_BLOCK:
+          receiving++;
+          processAndHandleReportedBlock(node, rdbi.getBlock(),
+              ReplicaState.RBW, null);
+          break;
+        default:
+          String msg = 
+            "Unknown block status code reported by " + nodeID.getName() +
+            ": " + rdbi;
+          NameNode.stateChangeLog.warn(msg);
+          assert false : msg; // if assertions are enabled, throw.
+          break;
         }
         if (NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("BLOCK* block"
-              + (receivedAndDeletedBlocks[i].isDeletedBlock() ? "Deleted"
-                  : "Received") + ": " + receivedAndDeletedBlocks[i].getBlock()
+          NameNode.stateChangeLog.debug("BLOCK* block "
+              + (rdbi.getStatus()) + ": " + rdbi.getBlock()
               + " is received from " + nodeID.getName());
         }
       }
     } finally {
       namesystem.writeUnlock();
       NameNode.stateChangeLog
-          .debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from "
-              + nodeID.getName() + " received: " + received + ", "
+          .debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
+              + nodeID.getName()
+              +  " receiving: " + receiving + ", "
+              + " received: " + received + ", "
               + " deleted: " + deleted);
     }
   }
 
   /**
-   * Return the number of nodes that are live and decommissioned.
+   * Return the number of nodes hosting a given block, grouped
+   * by the state of those replicas.
    */
   public NumberReplicas countNodes(Block b) {
-    int count = 0;
+    int decommissioned = 0;
     int live = 0;
     int corrupt = 0;
     int excess = 0;
+    int stale = 0;
     Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
     while (nodeIter.hasNext()) {
@@ -2241,7 +2566,7 @@ public class BlockManager {
       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
         corrupt++;
       } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-        count++;
+        decommissioned++;
       } else {
         LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
             .getStorageID());
@@ -2251,8 +2576,11 @@ public class BlockManager {
           live++;
         }
       }
+      if (node.areBlockContentsStale()) {
+        stale++;
+      }
     }
-    return new NumberReplicas(live, count, corrupt, excess);
+    return new NumberReplicas(live, decommissioned, corrupt, excess, stale);
   }
 
   /** 
@@ -2379,7 +2707,7 @@ public class BlockManager {
   }
 
   public int getActiveBlockCount() {
-    return blocksMap.size() - (int)invalidateBlocks.numBlocks();
+    return blocksMap.size();
   }
 
   public DatanodeDescriptor[] getNodes(BlockInfo block) {
@@ -2397,10 +2725,17 @@ public class BlockManager {
   }
 
   public void removeBlock(Block block) {
+    assert namesystem.hasWriteLock();
+    // No need to ACK blocks that are being removed entirely
+    // from the namespace, since the removal of the associated
+    // file already removes them from the block map below.
     block.setNumBytes(BlockCommand.NO_ACK);
     addToInvalidates(block);
     corruptReplicas.removeFromCorruptReplicasMap(block);
     blocksMap.removeBlock(block);
+    if (postponedMisreplicatedBlocks.remove(block)) {
+      postponedMisreplicatedBlocksCount--;
+    }
   }
 
   public BlockInfo getStoredBlock(Block block) {
@@ -2412,6 +2747,9 @@ public class BlockManager {
       final int curReplicasDelta, int expectedReplicasDelta) {
     namesystem.writeLock();
     try {
+      if (!namesystem.isPopulatingReplQueues()) {
+        return;
+      }
       NumberReplicas repl = countNodes(block);
       int curExpectedReplicas = getReplication(block);
       if (isNeededReplication(block, curExpectedReplicas, repl.liveReplicas())) {
@@ -2461,8 +2799,10 @@ public class BlockManager {
     namesystem.writeLock();
     try {
       // blocks should not be replicated or removed if safe mode is on
-      if (namesystem.isInSafeMode())
+      if (namesystem.isInSafeMode()) {
+        LOG.debug("In safemode, not computing replication work");
         return 0;
+      }
       // get blocks to invalidate for the nodeId
       assert nodeId != null;
       return invalidateBlocks.invalidateWork(nodeId);
@@ -2645,6 +2985,19 @@ public class BlockManager {
     return workFound;
   }
 
+  /**
+   * Clear all queues that hold decisions previously made by
+   * this NameNode.
+   */
+  public void clearQueues() {
+    neededReplications.clear();
+    pendingReplications.clear();
+    excessReplicateMap.clear();
+    invalidateBlocks.clear();
+    datanodeManager.clearPendingQueues();
+  }
+  
+
   private static class ReplicationWork {
 
     private Block block;
@@ -2675,4 +3028,24 @@ public class BlockManager {
       this.targets = null;
     }
   }
+
+  /**
+   * A simple result enum for the result of
+   * {@link BlockManager#processMisReplicatedBlock(BlockInfo)}.
+   */
+  enum MisReplicationResult {
+    /** The block should be invalidated since it belongs to a deleted file. */
+    INVALID,
+    /** The block is currently under-replicated. */
+    UNDER_REPLICATED,
+    /** The block is currently over-replicated. */
+    OVER_REPLICATED,
+    /** A decision can't currently be made about this block. */
+    POSTPONE,
+    /** The block is under construction, so should be ignored */
+    UNDER_CONSTRUCTION,
+    /** The block is properly replicated */
+    OK
+  }
+
 }
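Stepping back, the BlockManager changes in this file compose into a failover protocol: mark every DataNode stale, discard the previous active's queued decisions, replay the reports queued while in standby, and re-derive replication state while postponing anything that still has a replica on a stale node. The actual call sites live in the HA state-transition code outside this diff, so the caller name and ordering below are assumptions:

    // Hypothetical composition of the hooks added above.
    void onBecomeActive(BlockManager bm, DatanodeManager dm)
        throws IOException {
      dm.markAllDatanodesStale();       // replicas unverified until re-reported
      bm.clearQueues();                 // drop the old active's pending work
      bm.processAllPendingDNMessages(); // replay standby-queued block reports
      bm.processMisReplicatedBlocks();  // recompute; POSTPONE on stale replicas
    }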

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Sat Mar  3 00:42:49 2012
@@ -63,7 +63,7 @@ public class BlockPlacementPolicyDefault
     initialize(conf, stats, clusterMap);
   }
 
-  BlockPlacementPolicyDefault() {
+  protected BlockPlacementPolicyDefault() {
   }
     
   @Override

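The only change here is constructor visibility, package-private to protected, which opens BlockPlacementPolicyDefault to subclassing from other packages. A purely illustrative subclass:

    // Hypothetical: a policy reusing the default's machinery. A real
    // subclass would override the chooseTarget() variants; this diff
    // only changes what the constructor permits.
    public class CustomPlacementPolicy extends BlockPlacementPolicyDefault {
      protected CustomPlacementPolicy() {
        super(); // configuration still arrives via initialize(conf, stats, clusterMap)
      }
    }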
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Sat Mar  3 00:42:49 2012
@@ -94,6 +94,10 @@ public class DatanodeDescriptor extends 
     boolean contains(E e) {
       return blockq.contains(e);
     }
+
+    synchronized void clear() {
+      blockq.clear();
+    }
   }
 
   private volatile BlockInfo blockList = null;
@@ -103,6 +107,24 @@ public class DatanodeDescriptor extends 
   public boolean isAlive = false;
   public boolean needKeyUpdate = false;
 
+  /**
+   * Set to false on any NN failover, and reset to true
+   * whenever a block report is received.
+   */
+  private boolean heartbeatedSinceFailover = false;
+  
+  /**
+   * At startup or at any failover, the DNs in the cluster may
+   * have pending block deletions from a previous incarnation
+   * of the NameNode. Thus, we consider their block contents
+   * stale until we have received a block report. When a DN
+   * is considered stale, any replicas on it are transitively
+   * considered stale. If any block has at least one stale replica,
+   * then no invalidations will be processed for this block.
+   * See HDFS-1972.
+   */
+  private boolean blockContentsStale = true;
+  
   // A system administrator can tune the balancer bandwidth parameter
   // (dfs.balance.bandwidthPerSec) dynamically by calling
   // "dfsadmin -setBalanacerBandwidth <newbandwidth>", at which point the
@@ -129,6 +151,10 @@ public class DatanodeDescriptor extends 
   private long lastBlocksScheduledRollTime = 0;
   private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
   private int volumeFailures = 0;
+  
+  /** Set to false after processing first block report */
+  private boolean firstBlockReport = true;
+  
   /** 
    * When set to true, the node is not in include list and is not allowed
    * to communicate with the namenode
@@ -281,6 +307,14 @@ public class DatanodeDescriptor extends 
     this.invalidateBlocks.clear();
     this.volumeFailures = 0;
   }
+  
+  public void clearBlockQueues() {
+    synchronized (invalidateBlocks) {
+      this.invalidateBlocks.clear();
+      this.recoverBlocks.clear();
+      this.replicateBlocks.clear();
+    }
+  }
 
   public int numBlocks() {
     return numBlocks;
@@ -298,6 +332,7 @@ public class DatanodeDescriptor extends 
     this.lastUpdate = System.currentTimeMillis();
     this.xceiverCount = xceiverCount;
     this.volumeFailures = volFailures;
+    this.heartbeatedSinceFailover = true;
     rollBlocksScheduled(lastUpdate);
   }
 
@@ -564,5 +599,41 @@ public class DatanodeDescriptor extends 
     this.bandwidth = bandwidth;
   }
 
+  public boolean areBlockContentsStale() {
+    return blockContentsStale;
+  }
+
+  public void markStaleAfterFailover() {
+    heartbeatedSinceFailover = false;
+    blockContentsStale = true;
+  }
+
+  public void receivedBlockReport() {
+    if (heartbeatedSinceFailover) {
+      blockContentsStale = false;
+    }
+    firstBlockReport = false;
+  }
+  
+  boolean isFirstBlockReport() {
+    return firstBlockReport;
+  }
 
+  @Override
+  public String dumpDatanode() {
+    StringBuilder sb = new StringBuilder(super.dumpDatanode());
+    int repl = replicateBlocks.size();
+    if (repl > 0) {
+      sb.append(" ").append(repl).append(" blocks to be replicated;");
+    }
+    int inval = invalidateBlocks.size();
+    if (inval > 0) {
+      sb.append(" ").append(inval).append(" blocks to be invalidated;");      
+    }
+    int recover = recoverBlocks.size();
+    if (recover > 0) {
+      sb.append(" ").append(recover).append(" blocks to be recovered;");
+    }
+    return sb.toString();
+  }
 }
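The two new flags work as a pair: after a failover a descriptor is stale and stays stale until it has first heartbeated and then block-reported to the new active, since receivedBlockReport() only clears staleness when heartbeatedSinceFailover is already true. The intended sequence, using only the methods shown above:

    // Lifecycle sketch; the heartbeat-update method's exact name and
    // signature are not shown in this chunk (the @@ -298 hunk above
    // sets heartbeatedSinceFailover = true when a heartbeat arrives).
    dn.markStaleAfterFailover();   // after failover: contents presumed stale
    // ... a heartbeat arrives: heartbeatedSinceFailover becomes true ...
    dn.receivedBlockReport();      // first report after that heartbeat
    assert !dn.areBlockContentsStale();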

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Sat Mar  3 00:42:49 2012
@@ -923,7 +923,7 @@ public class DatanodeManager {
       }
     }
 
-    return null;
+    return new DatanodeCommand[0];
   }
 
   /**
@@ -947,4 +947,27 @@ public class DatanodeManager {
       }
     }
   }
+  
+  public void markAllDatanodesStale() {
+    LOG.info("Marking all datandoes as stale");
+    synchronized (datanodeMap) {
+      for (DatanodeDescriptor dn : datanodeMap.values()) {
+        dn.markStaleAfterFailover();
+      }
+    }
+  }
+
+  /**
+   * Clear any actions that are queued up to be sent to the DNs
+   * on their next heartbeats. This includes block invalidations,
+   * recoveries, and replication requests.
+   */
+  public void clearPendingQueues() {
+    synchronized (datanodeMap) {
+      for (DatanodeDescriptor dn : datanodeMap.values()) {
+        dn.clearBlockQueues();
+      }
+    }
+  }
+
 }
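
markAllDatanodesStale() and clearPendingQueues() are intended to run together
when a NameNode takes over as active. The call site is not in this hunk, so
the wrapper below, including its class and method names, is only a sketch of
the intended usage:

    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;

    class FailoverHook {
      static void onBecomeActive(DatanodeManager dm) {
        // Discard work queued under the previous active NameNode: acting on
        // a leftover invalidation could delete the last valid replica.
        dm.clearPendingQueues();
        // Require a fresh heartbeat and block report from every DN before
        // its replicas are trusted again (see the HDFS-1972 javadoc above).
        dm.markAllDatanodesStale();
      }
    }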

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Sat Mar  3 00:42:49 2012
@@ -160,4 +160,9 @@ class InvalidateBlocks {
     numBlocks -= toInvalidate.size();
     return toInvalidate;
   }
+  
+  synchronized void clear() {
+    node2blocks.clear();
+    numBlocks = 0;
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java Sat Mar  3 00:42:49 2012
@@ -26,20 +26,22 @@ public class NumberReplicas {
   private int decommissionedReplicas;
   private int corruptReplicas;
   private int excessReplicas;
+  private int replicasOnStaleNodes;
 
   NumberReplicas() {
-    initialize(0, 0, 0, 0);
+    initialize(0, 0, 0, 0, 0);
   }
 
-  NumberReplicas(int live, int decommissioned, int corrupt, int excess) {
-    initialize(live, decommissioned, corrupt, excess);
+  NumberReplicas(int live, int decommissioned, int corrupt, int excess, int stale) {
+    initialize(live, decommissioned, corrupt, excess, stale);
   }
 
-  void initialize(int live, int decommissioned, int corrupt, int excess) {
+  void initialize(int live, int decommissioned, int corrupt, int excess, int stale) {
     liveReplicas = live;
     decommissionedReplicas = decommissioned;
     corruptReplicas = corrupt;
     excessReplicas = excess;
+    replicasOnStaleNodes = stale;
   }
 
   public int liveReplicas() {
@@ -54,4 +56,13 @@ public class NumberReplicas {
   public int excessReplicas() {
     return excessReplicas;
   }
+  
+  /**
+   * @return the number of replicas that are on stale nodes.
+   * This count is not mutually exclusive with the other counts; i.e.,
+   * a replica may be counted as both "live" and "stale".
+   */
+  public int replicasOnStaleNodes() {
+    return replicasOnStaleNodes;
+  }
 } 
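
The new replicasOnStaleNodes() count feeds the invalidation-postponing logic
described in the DatanodeDescriptor javadoc: because a replica can be counted
as both live and stale, callers must check staleness separately. A sketch of
such a guard follows; the class, method, and parameter names are illustrative
and not from this commit:

    import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;

    class InvalidationGuard {
      static boolean mayInvalidateExcess(NumberReplicas num, int expected) {
        // While any copy sits on a node that has not yet re-reported its
        // blocks, hold off on invalidation entirely (HDFS-1972).
        if (num.replicasOnStaleNodes() > 0) {
          return false;
        }
        return num.liveReplicas() > expected;
      }
    }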

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java Sat Mar  3 00:42:49 2012
@@ -104,6 +104,14 @@ class PendingReplicationBlocks {
     }
   }
 
+
+  public void clear() {
+    synchronized (pendingReplications) {
+      pendingReplications.clear();
+      timedOutItems.clear();
+    }
+  }
+
   /**
    * The total number of blocks that are undergoing replication
    */

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Sat Mar  3 00:42:49 2012
@@ -568,7 +568,7 @@ public abstract class Storage extends St
      * <p> Locking is not supported by all file systems.
      * E.g., NFS does not consistently support exclusive locks.
      * 
-     * <p> If locking is supported we guarantee exculsive access to the
+     * <p> If locking is supported we guarantee exclusive access to the
      * storage directory. Otherwise, no guarantee is given.
      * 
      * @throws IOException if locking fails

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java?rev=1296534&r1=1296533&r2=1296534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java Sat Mar  3 00:42:49 2012
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -97,9 +98,9 @@ public final class Util {
    * @param names collection of strings to convert to URIs
    * @return collection of URIs
    */
-  public static Collection<URI> stringCollectionAsURIs(
+  public static List<URI> stringCollectionAsURIs(
                                   Collection<String> names) {
-    Collection<URI> uris = new ArrayList<URI>(names.size());
+    List<URI> uris = new ArrayList<URI>(names.size());
     for(String name : names) {
       try {
         uris.add(stringAsURI(name));
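
Narrowing the return type from Collection<URI> to List<URI> makes the input
ordering part of the contract, so callers can rely on positional access. A
small illustrative example; the directory paths and class name are
placeholders, not from the commit:

    import java.net.URI;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hdfs.server.common.Util;

    public class UriOrderExample {
      public static void main(String[] args) {
        List<URI> nameDirs = Util.stringCollectionAsURIs(
            Arrays.asList("file:///data/1/dfs/name", "file:///data/2/dfs/name"));
        // With a List, get(0) is well-defined; Collection offered no such
        // positional guarantee.
        System.out.println("first name dir: " + nameDirs.get(0));
      }
    }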


