hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject svn commit: r814751 - in /hadoop/hdfs/branches/HDFS-265: ./ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/
Date Mon, 14 Sep 2009 17:44:05 GMT
Author: shv
Date: Mon Sep 14 17:44:04 2009
New Revision: 814751

URL: http://svn.apache.org/viewvc?rev=814751&view=rev
Log:
HDFS-604. Block report processing for append. Contributed by Konstantin Shvachko.

Modified:
    hadoop/hdfs/branches/HDFS-265/CHANGES.txt
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java

Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=814751&r1=814750&r2=814751&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Mon Sep 14 17:44:04 2009
@@ -25,6 +25,8 @@
     HDFS-585. Datanode should serve up to visible length of a replica for read
     requests.  (szetszwo)
 
+    HDFS-604. Block report processing for append. (shv)
+
   IMPROVEMENTS
 
     HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java?rev=814751&r1=814750&r2=814751&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java Mon Sep 14 17:44:04 2009
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
@@ -35,7 +37,7 @@
    * Block replicas as assigned when the block was allocated.
    * This defines the pipeline order.
    */
-  private ReplicaUnderConstruction[] replicas;
+  private List<ReplicaUnderConstruction> replicas;
 
   /** A data-node responsible for block recovery. */
   private int primaryNodeIndex = -1;
@@ -81,6 +83,13 @@
     }
 
      /**
+     * Set the state of this replica, replacing its current state.
+     */
+    void setState(ReplicaState s) {
+      state = s;
+    }
+
+    /**
      * Is data-node the replica belongs to alive.
      */
     boolean isAlive() {
@@ -137,10 +146,10 @@
 
   void setLocations(DatanodeDescriptor[] targets) {
     int numLocations = targets == null ? 0 : targets.length;
-    this.replicas = new ReplicaUnderConstruction[numLocations];
+    this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
     for(int i = 0; i < numLocations; i++)
-      replicas[i] =
-        new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW);
+      replicas.add(
+        new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW));
   }
 
   /**
@@ -148,15 +157,15 @@
    * (as has been assigned by chooseTargets()).
    */
   private DatanodeDescriptor[] getExpectedLocations() {
-    int numLocations = replicas == null ? 0 : replicas.length;
+    int numLocations = replicas == null ? 0 : replicas.size();
     DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocations];
     for(int i = 0; i < numLocations; i++)
-      locations[i] = replicas[i].getExpectedLocation();
+      locations[i] = replicas.get(i).getExpectedLocation();
     return locations;
   }
 
   int getNumLocations() {
-    return replicas == null ? 0 : replicas.length;
+    return replicas == null ? 0 : replicas.size();
   }
 
   /**
@@ -191,18 +200,18 @@
    * Find the first alive data-node starting from the previous primary.
    */
   void assignPrimaryDatanode() {
-    if (replicas.length == 0) {
+    if (replicas.size() == 0) {
       NameNode.stateChangeLog.warn("BLOCK*"
         + " INodeFileUnderConstruction.initLeaseRecovery:"
         + " No blocks found, lease removed.");
     }
 
     int previous = primaryNodeIndex;
-    for(int i = 1; i <= replicas.length; i++) {
-      int j = (previous + i)%replicas.length;
-      if (replicas[j].isAlive()) {
+    for(int i = 1; i <= replicas.size(); i++) {
+      int j = (previous + i)%replicas.size();
+      if (replicas.get(j).isAlive()) {
         primaryNodeIndex = j;
-        DatanodeDescriptor primary = replicas[j].getExpectedLocation(); 
+        DatanodeDescriptor primary = replicas.get(j).getExpectedLocation(); 
         primary.addBlockToBeRecovered(this, getExpectedLocations());
         NameNode.stateChangeLog.info("BLOCK* " + this
           + " recovery started, primary=" + primary);
@@ -223,6 +232,15 @@
     return expired;
   }
 
+  void addReplicaIfNotPresent(DatanodeDescriptor dn, // node holding the replica
+                     Block block,                    // reported replica
+                     ReplicaState rState) {          // reported replica state
+    for(ReplicaUnderConstruction r : replicas) // NOTE(review): other methods null-check replicas; confirm non-null here
+      if(r.getExpectedLocation() == dn) // identity comparison of data-nodes
+        return; // already present: the existing entry and its state are kept
+    replicas.add(new ReplicaUnderConstruction(block, dn, rState)); // built from the reported block, not this
+  }
+
   @Override // BlockInfo
   // BlockInfoUnderConstruction participates in maps the same way as BlockInfo
   public int hashCode() {

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=814751&r1=814750&r2=814751&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Mon Sep 14 17:44:04 2009
@@ -37,6 +37,9 @@
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
 import org.apache.hadoop.security.AccessTokenHandler;
@@ -248,13 +251,15 @@
                        Block commitBlock) throws IOException {
     if(commitBlock == null)
       return; // not committing, this is a block allocation retry
-    BlockInfoUnderConstruction lastBlock = fileINode.getLastBlock();
+    BlockInfo lastBlock = fileINode.getLastBlock();
     if(lastBlock == null)
       return; // no blocks in file yet
+    if(!lastBlock.isUnderConstruction())
+      return; // already completed (e.g. by syncBlock)
     assert lastBlock.getNumBytes() <= commitBlock.getNumBytes() :
       "commitBlock length is less than the stored one "
       + commitBlock.getNumBytes() + " vs. " + lastBlock.getNumBytes();
-    lastBlock.commitBlock(commitBlock);
+    ((BlockInfoUnderConstruction)lastBlock).commitBlock(commitBlock);
   }
 
   /**
@@ -264,12 +269,13 @@
    * @throws IOException if the block does not have at least a minimal number
    * of replicas reported from data-nodes.
    */
-  void completeBlock(INodeFile fileINode, int blkIndex) throws IOException {
+  BlockInfo completeBlock(INodeFile fileINode, int blkIndex)
+  throws IOException {
     if(blkIndex < 0)
-      return;
+      return null;
     BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
     if(!curBlock.isUnderConstruction())
-      return;
+      return curBlock;
     BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
     if(ucBlock.numNodes() < minReplication)
       throw new IOException("Cannot complete block: " +
@@ -278,7 +284,17 @@
     // replace penultimate block in file
     fileINode.setBlock(blkIndex, completeBlock);
     // replace block in the blocksMap
-    blocksMap.replaceBlock(completeBlock);
+    return blocksMap.replaceBlock(completeBlock);
+  }
+
+  BlockInfo completeBlock(INodeFile fileINode, BlockInfo block)
+  throws IOException {
+    BlockInfo[] fileBlocks = fileINode.getBlocks();
+    for(int idx = 0; idx < fileBlocks.length; idx++) // locate block by identity
+      if(fileBlocks[idx] == block) {
+        return completeBlock(fileINode, idx); // throws if below minReplication
+      }
+    return block; // not one of this file's blocks: returned unchanged
   }
 
   /**
@@ -430,7 +446,7 @@
       pendingDeletionBlocksCount++;
       if (log) {
         NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
-            + b.getBlockName() + " to " + dn.getName());
+            + b + " to " + dn.getName());
       }
     }
   }
@@ -460,7 +476,7 @@
     }
     if (datanodes.length() != 0) {
       NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
-          + b.getBlockName() + " to " + datanodes.toString());
+          + b + " to " + datanodes.toString());
     }
   }
 
@@ -943,7 +959,8 @@
     Collection<Block> toAdd = new LinkedList<Block>();
     Collection<Block> toRemove = new LinkedList<Block>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
-    node.reportDiff(blocksMap, report, toAdd, toRemove, toInvalidate);
+    Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+    node.reportDiff(this, report, toAdd, toRemove, toInvalidate, toCorrupt);
 
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
@@ -957,6 +974,9 @@
           + " does not belong to any file.");
       addToInvalidates(b, node);
     }
+    for (BlockInfo b : toCorrupt) {
+      markBlockAsCorrupt(b, node);
+    }
   }
 
   /**
@@ -966,7 +986,8 @@
    */
   private Block addStoredBlock(final Block block,
                                DatanodeDescriptor node,
-                               DatanodeDescriptor delNodeHint) {
+                               DatanodeDescriptor delNodeHint)
+  throws IOException {
     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
     if (storedBlock == null || storedBlock.getINode() == null) {
       // If this block does not belong to anyfile, then we are done.
@@ -1081,11 +1102,12 @@
     // check whether safe replication is reached for the block
     namesystem.incrementSafeBlockCount(numCurrentReplica);
 
-    //
-    // if file is being actively written to, then do not check
-    // replication-factor here. It will be checked when the file is closed.
-    //
+    // if file is under construction, then check whether the block
+    // can be completed
     if (fileINode.isUnderConstruction()) {
+      if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
+          numLiveReplicas >= minReplication)
+        storedBlock = completeBlock(fileINode, storedBlock);
       return storedBlock;
     }
 
@@ -1311,7 +1333,30 @@
     // Modify the blocks->datanode map and node's map.
     //
     pendingReplications.remove(block);
-    addStoredBlock(block, node, delHintNode);
+
+    // blockReceived reports a finalized block
+    Collection<Block> toAdd = new LinkedList<Block>();
+    Collection<Block> toInvalidate = new LinkedList<Block>();
+    Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+    node.processReportedBlock(this, block, ReplicaState.FINALIZED,
+                              toAdd, toInvalidate, toCorrupt);
+    // the block is only in one of the lists
+    // if it is in none then data-node already has it
+    assert toAdd.size() + toInvalidate.size() <= 1 :
+      "The block should be only in one of the lists.";
+
+    for (Block b : toAdd) {
+      addStoredBlock(b, node, delHintNode);
+    }
+    for (Block b : toInvalidate) {
+      NameNode.stateChangeLog.info("BLOCK* NameSystem.addBlock: block "
+          + b + " on " + node.getName() + " size " + b.getNumBytes()
+          + " does not belong to any file.");
+      addToInvalidates(b, node);
+    }
+    for (BlockInfo b : toCorrupt) {
+      markBlockAsCorrupt(b, node);
+    }
   }
 
   /**
@@ -1409,6 +1454,14 @@
     return blocksMap.getStoredBlock(block);
   }
 
+  /**
+   * Look up a stored block by block ID using a wildcard generation stamp.
+   */
+  BlockInfo findStoredBlock(long blockId) {
+    return blocksMap.getStoredBlock(
+        new Block(blockId, 0, GenerationStamp.WILDCARD_STAMP));
+  }
+
   /* updates a block in under replication queue */
   void updateNeededReplications(Block block, int curReplicasDelta,
       int expectedReplicasDelta) {

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=814751&r1=814750&r2=814751&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Mon Sep 14 17:44:04 2009
@@ -378,11 +378,12 @@
     return blockarray;
   }
 
-  void reportDiff(BlocksMap blocksMap,
+  void reportDiff(BlockManager blockManager,
                   BlockListAsLongs newReport,
                   Collection<Block> toAdd,    // add to DatanodeDescriptor
                   Collection<Block> toRemove, // remove from DatanodeDescriptor
-                  Collection<Block> toInvalidate) { // should be removed from DN
+                  Collection<Block> toInvalidate, // should be removed from DN
+                  Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
     // place a delimiter in the list which separates blocks 
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), 1);
@@ -390,44 +391,16 @@
     assert added : "Delimiting block cannot be present in the node";
     if(newReport == null)
       newReport = new BlockListAsLongs();
-    // scan the report and collect newly reported blocks
-    // Note we are taking special precaution to limit tmp blocks allocated
-    // as part this block report - which why block list is stored as longs
+    // scan the report and process newly reported blocks
     BlockReportIterator itBR = newReport.getBlockReportIterator();
-    Block iblk = null;
-    ReplicaState iState;
     while(itBR.hasNext()) {
-      iblk = itBR.next();
-      iState = itBR.getCurrentReplicaState();
-      BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
-      if(storedBlock == null) {
-        // If block is not in blocksMap it does not belong to any file
-        toInvalidate.add(new Block(iblk));
-        continue;
-      }
-      switch(iState) {
-      case FINALIZED:
-      case RWR:
-        break;
-      case RBW: // ignore these replicas for now to provide
-      case RUR: // compatibility with current block report processing
-      case TEMPORARY:
-      default:
-        continue;
-      }
-      if(storedBlock.findDatanode(this) < 0) {// Known block, but not on the DN
-        // if the size differs from what is in the blockmap, then return
-        // the new block. addStoredBlock will then pick up the right size of this
-        // block and will update the block object in the BlocksMap
-        if (storedBlock.getNumBytes() != iblk.getNumBytes()) {
-          toAdd.add(new Block(iblk));
-        } else {
-          toAdd.add(storedBlock);
-        }
-        continue;
-      }
+      Block iblk = itBR.next();
+      ReplicaState iState = itBR.getCurrentReplicaState();
+      BlockInfo storedBlock = processReportedBlock(blockManager, iblk, iState,
+                                               toAdd, toInvalidate, toCorrupt);
       // move block to the head of the list
-      this.moveBlockToHead(storedBlock);
+      if(storedBlock != null && storedBlock.findDatanode(this) >= 0)
+        this.moveBlockToHead(storedBlock);
     }
     // collect blocks that have not been reported
     // all of them are next to the delimiter
@@ -437,6 +410,105 @@
     this.removeBlock(delimiter);
   }
 
+  /**
+   * Process a block replica reported by the data-node.
+   * 
+   * <ol>
+   * <li>If the block is not known to the system (not in blocksMap) then the
+   * data-node should be notified to invalidate this block.</li>
+   * <li>If the reported replica is valid, that is, has the same generation
+   * stamp and length as recorded on the name-node, then the replica location
+   * is added to the name-node.</li>
+   * <li>If the reported replica is not valid, then it is marked as corrupt,
+   * which triggers replication of the existing valid replicas.
+   * Corrupt replicas are removed from the system when the block
+   * is fully replicated.</li>
+   * </ol>
+   * 
+   * @param blockManager used to look up the stored block by block ID
+   * @param block reported block replica
+   * @param rState reported replica state
+   * @param toAdd add to DatanodeDescriptor
+   * @param toInvalidate missing blocks (not in the blocks map)
+   *        should be removed from the data-node
+   * @param toCorrupt replicas with unexpected length or replica state;
+   *        add to corrupt replicas
+   * @return the stored block with the reported block ID, or null if none
+   */
+  BlockInfo processReportedBlock(
+                  BlockManager blockManager,
+                  Block block,                // reported block replica
+                  ReplicaState rState,        // reported replica state
+                  Collection<Block> toAdd,    // add to DatanodeDescriptor
+                  Collection<Block> toInvalidate, // should be removed from DN
+                  Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
+    if(FSNamesystem.LOG.isDebugEnabled())
+      FSNamesystem.LOG.debug("Reported block " + block + " on " + getName()
+          + " size " + block.getNumBytes() + " replicaState = " + rState);
+
+    // find block by blockId
+    BlockInfo storedBlock = blockManager.findStoredBlock(block.getBlockId());
+    if(storedBlock == null) {
+      // If blocksMap does not contain reported block id,
+      // the replica should be removed from the data-node.
+      toInvalidate.add(new Block(block));
+      return null;
+    }
+
+    if(FSNamesystem.LOG.isDebugEnabled())
+      FSNamesystem.LOG.debug("In memory blockUCState = "
+          + storedBlock.getBlockUCState());
+    boolean isCorrupt = false; // Block is on the DN
+    switch(rState) {
+    case FINALIZED:
+      switch(storedBlock.getBlockUCState()) {
+      case COMPLETE:
+      case COMMITTED:
+        // This is a temporary hack until Block.equals() and compareTo()
+        // are changed not to take into account the generation stamp for searching
+        // in blocksMap. Replicas with a different stamp are invalidated.
+        if(storedBlock.getGenerationStamp() != block.getGenerationStamp()) {
+          toInvalidate.add(new Block(block));
+          return storedBlock;
+        }
+
+        // generation stamps are equal here, so only the length can differ
+        if(storedBlock.getNumBytes() != block.getNumBytes())
+          isCorrupt = true;
+        break;
+      case UNDER_CONSTRUCTION:
+      case UNDER_RECOVERY:
+        ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+            this, block, rState);
+      }
+      if(!isCorrupt && storedBlock.findDatanode(this) < 0)
+        if (storedBlock.getNumBytes() != block.getNumBytes()) {
+          toAdd.add(new Block(block));
+        } else {
+          toAdd.add(storedBlock);
+        }
+      break;
+    case RBW:
+    case RWR:
+      if(storedBlock.isUnderConstruction())
+        ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+                                                      this, block, rState);
+      else
+        isCorrupt = true;
+      break;
+    case RUR:       // should not be reported
+    case TEMPORARY: // should not be reported
+    default:
+      FSNamesystem.LOG.warn("Unexpected replica state " + rState
+          + " for block: " + storedBlock + 
+          " on " + getName() + " size " + storedBlock.getNumBytes());
+      break;
+    }
+    if(isCorrupt)
+      toCorrupt.add(storedBlock);
+    return storedBlock;
+  }
+
   /** Serialization for FSEditLog */
   void readFieldsFromFSEditLog(DataInput in) throws IOException {
     this.name = DeprecatedUTF8.readString(in);

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java?rev=814751&r1=814750&r2=814751&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java Mon Sep 14 17:44:04 2009
@@ -30,6 +30,8 @@
 import org.apache.log4j.Level;
 
 public class TestRenameWhileOpen extends junit.framework.TestCase {
+  private static final long LEASE_PERIOD = 500L;
+
   {
     ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
@@ -52,7 +54,7 @@
     conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME);
     conf.setInt("heartbeat.recheck.interval", 1000);
     conf.setInt("dfs.heartbeat.interval", 1);
-    conf.setInt("dfs.safemode.threshold.pct", 1);
+    conf.setFloat("dfs.safemode.threshold.pct", 0.5f);
     conf.setBoolean("dfs.support.append", true);
 
     // create cluster
@@ -104,7 +106,7 @@
       try {Thread.sleep(2*MAX_IDLE_TIME);} catch (InterruptedException e) {}
       cluster = new MiniDFSCluster(nnport, conf, 1, false, true, 
                                    null, null, null);
-      cluster.waitActive();
+      waitLeaseRecovery(cluster);
 
       // restart cluster yet again. This triggers the code to read in
       // persistent leases from fsimage.
@@ -136,7 +138,7 @@
     conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME);
     conf.setInt("heartbeat.recheck.interval", 1000);
     conf.setInt("dfs.heartbeat.interval", 1);
-    conf.setInt("dfs.safemode.threshold.pct", 1);
+    conf.setFloat("dfs.safemode.threshold.pct", 0.5f);
     conf.setBoolean("dfs.support.append", true);
     System.out.println("Test 2************************************");
 
@@ -176,7 +178,7 @@
       try {Thread.sleep(2*MAX_IDLE_TIME);} catch (InterruptedException e) {}
       cluster = new MiniDFSCluster(nnport, conf, 1, false, true, 
                                    null, null, null);
-      cluster.waitActive();
+      waitLeaseRecovery(cluster);
 
       // restart cluster yet again. This triggers the code to read in
       // persistent leases from fsimage.
@@ -209,7 +211,7 @@
     conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME);
     conf.setInt("heartbeat.recheck.interval", 1000);
     conf.setInt("dfs.heartbeat.interval", 1);
-    conf.setInt("dfs.safemode.threshold.pct", 1);
+    conf.setFloat("dfs.safemode.threshold.pct", 0.5f);
     conf.setBoolean("dfs.support.append", true);
     System.out.println("Test 3************************************");
 
@@ -241,7 +243,7 @@
       try {Thread.sleep(2*MAX_IDLE_TIME);} catch (InterruptedException e) {}
       cluster = new MiniDFSCluster(nnport, conf, 1, false, true, 
                                    null, null, null);
-      cluster.waitActive();
+      waitLeaseRecovery(cluster);
 
       // restart cluster yet again. This triggers the code to read in
       // persistent leases from fsimage.
@@ -272,7 +274,7 @@
     conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME);
     conf.setInt("heartbeat.recheck.interval", 1000);
     conf.setInt("dfs.heartbeat.interval", 1);
-    conf.setInt("dfs.safemode.threshold.pct", 1);
+    conf.setFloat("dfs.safemode.threshold.pct", 0.5f);
     conf.setBoolean("dfs.support.append", true);
     System.out.println("Test 4************************************");
 
@@ -303,7 +305,7 @@
       try {Thread.sleep(2*MAX_IDLE_TIME);} catch (InterruptedException e) {}
       cluster = new MiniDFSCluster(nnport, conf, 1, false, true, 
                                    null, null, null);
-      cluster.waitActive();
+      waitLeaseRecovery(cluster);
 
       // restart cluster yet again. This triggers the code to read in
       // persistent leases from fsimage.
@@ -323,4 +325,13 @@
       cluster.shutdown();
     }
   }
+
+  void waitLeaseRecovery(MiniDFSCluster cluster) {
+    cluster.setLeasePeriod(LEASE_PERIOD, LEASE_PERIOD);
+    try {
+      Thread.sleep(5 * LEASE_PERIOD); // wait for the lease to expire
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // preserve the interrupt status
+    }
+  }
 }



Mime
View raw message