Subject: svn commit: r814751 - in /hadoop/hdfs/branches/HDFS-265: ./ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/
Date: Mon, 14 Sep 2009 17:44:05 -0000
To: hdfs-commits@hadoop.apache.org
From: shv@apache.org
Reply-To: hdfs-dev@hadoop.apache.org

Author: shv
Date: Mon Sep 14 17:44:04 2009
New Revision: 814751

URL: http://svn.apache.org/viewvc?rev=814751&view=rev
Log:
HDFS-604. Block report processing for append. Contributed by Konstantin Shvachko.

Modified:
    hadoop/hdfs/branches/HDFS-265/CHANGES.txt
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java

Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=814751&r1=814750&r2=814751&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Mon Sep 14 17:44:04 2009
@@ -25,6 +25,8 @@
     HDFS-585. Datanode should serve up to visible length of a replica for read
     requests. (szetszwo)
 
+    HDFS-604. Block report processing for append. (shv)
+
   IMPROVEMENTS
 
     HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.
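For context before the diffs: prior to this change the name-node treated every
reported replica as finalized and looked blocks up by (id, generation stamp).
With append, a block report entry also carries a replica state, and the stored
block must be found by id alone (see findStoredBlock below), since a data-node
may legitimately report a stale generation stamp. The following is a minimal
sketch of the information one report entry now conveys; these are hypothetical
stand-in types, not the packed-longs BlockListAsLongs wire format actually used.

    // Hypothetical stand-ins for illustration only; the real report travels
    // as packed longs (BlockListAsLongs) for compactness.
    enum ReplicaState { FINALIZED, RBW, RWR, RUR, TEMPORARY }

    final class ReportedReplica {
      final long blockId;          // used alone for the blocksMap lookup
      final long numBytes;         // compared against the stored length
      final long generationStamp;  // compared against the stored genstamp
      final ReplicaState state;    // drives the classification below

      ReportedReplica(long blockId, long numBytes,
                      long generationStamp, ReplicaState state) {
        this.blockId = blockId;
        this.numBytes = numBytes;
        this.generationStamp = generationStamp;
        this.state = state;
      }
    }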
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java?rev=814751&r1=814750&r2=814751&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java Mon Sep 14 17:44:04 2009
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
@@ -35,7 +37,7 @@
    * Block replicas as assigned when the block was allocated.
    * This defines the pipeline order.
    */
-  private ReplicaUnderConstruction[] replicas;
+  private List<ReplicaUnderConstruction> replicas;
 
   /** A data-node responsible for block recovery. */
   private int primaryNodeIndex = -1;
@@ -81,6 +83,13 @@
     }
 
     /**
+     * Set replica state.
+     */
+    void setState(ReplicaState s) {
+      state = s;
+    }
+
+    /**
      * Is data-node the replica belongs to alive.
      */
     boolean isAlive() {
@@ -137,10 +146,10 @@
 
   void setLocations(DatanodeDescriptor[] targets) {
     int numLocations = targets == null ? 0 : targets.length;
-    this.replicas = new ReplicaUnderConstruction[numLocations];
+    this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
     for(int i = 0; i < numLocations; i++)
-      replicas[i] =
-        new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW);
+      replicas.add(
+        new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW));
   }
 
   /**
@@ -148,15 +157,15 @@
    * (as has been assigned by chooseTargets()).
    */
   private DatanodeDescriptor[] getExpectedLocations() {
-    int numLocations = replicas == null ? 0 : replicas.length;
+    int numLocations = replicas == null ? 0 : replicas.size();
     DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocations];
     for(int i = 0; i < numLocations; i++)
-      locations[i] = replicas[i].getExpectedLocation();
+      locations[i] = replicas.get(i).getExpectedLocation();
     return locations;
   }
 
   int getNumLocations() {
-    return replicas == null ? 0 : replicas.length;
+    return replicas == null ? 0 : replicas.size();
   }
 
   /**
@@ -191,18 +200,18 @@
    * Find the first alive data-node starting from the previous primary.
   */
  void assignPrimaryDatanode() {
-    if (replicas.length == 0) {
+    if (replicas.size() == 0) {
       NameNode.stateChangeLog.warn("BLOCK*"
        + " INodeFileUnderConstruction.initLeaseRecovery:"
        + " No blocks found, lease removed.");
     }
 
     int previous = primaryNodeIndex;
-    for(int i = 1; i <= replicas.length; i++) {
-      int j = (previous + i)%replicas.length;
-      if (replicas[j].isAlive()) {
+    for(int i = 1; i <= replicas.size(); i++) {
+      int j = (previous + i)%replicas.size();
+      if (replicas.get(j).isAlive()) {
         primaryNodeIndex = j;
-        DatanodeDescriptor primary = replicas[j].getExpectedLocation();
+        DatanodeDescriptor primary = replicas.get(j).getExpectedLocation();
         primary.addBlockToBeRecovered(this, getExpectedLocations());
         NameNode.stateChangeLog.info("BLOCK* " + this
          + " recovery started, primary=" + primary);
@@ -223,6 +232,15 @@
     return expired;
   }
 
+  void addReplicaIfNotPresent(DatanodeDescriptor dn,
+                              Block block,
+                              ReplicaState rState) {
+    for(ReplicaUnderConstruction r : replicas)
+      if(r.getExpectedLocation() == dn)
+        return;
+    replicas.add(new ReplicaUnderConstruction(block, dn, rState));
+  }
+
   @Override // BlockInfo
   // BlockInfoUnderConstruction participates in maps the same way as BlockInfo
   public int hashCode() {
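The field change from ReplicaUnderConstruction[] to a List is what makes the
new addReplicaIfNotPresent workable: with append, block reports can introduce
replicas the name-node did not hand out when it allocated the block, so the
expected-replica set must be able to grow. A minimal sketch of that
add-if-not-present pattern, with hypothetical stand-in types (the real method
compares DatanodeDescriptor references, as in the diff above):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-ins; not the HDFS classes.
    class Replica {
      final Object datanode; // identity of the reporting data-node
      Replica(Object datanode) { this.datanode = datanode; }
    }

    class GrowableReplicaSet {
      private final List<Replica> replicas = new ArrayList<Replica>();

      // Record a reported replica unless the node is already listed;
      // reference equality mirrors the DatanodeDescriptor comparison above.
      void addReplicaIfNotPresent(Object datanode) {
        for (Replica r : replicas)
          if (r.datanode == datanode)
            return;
        replicas.add(new Replica(datanode));
      }
    }

A fixed-size array sized at allocation time could not accommodate these
late-arriving replicas without reallocation, hence the List.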
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=814751&r1=814750&r2=814751&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Mon Sep 14 17:44:04 2009
@@ -37,6 +37,9 @@
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
 import org.apache.hadoop.security.AccessTokenHandler;
@@ -248,13 +251,15 @@
                     Block commitBlock) throws IOException {
     if(commitBlock == null)
       return; // not committing, this is a block allocation retry
-    BlockInfoUnderConstruction lastBlock = fileINode.getLastBlock();
+    BlockInfo lastBlock = fileINode.getLastBlock();
     if(lastBlock == null)
       return; // no blocks in file yet
+    if(!lastBlock.isUnderConstruction())
+      return; // already completed (e.g. by syncBlock)
     assert lastBlock.getNumBytes() <= commitBlock.getNumBytes() :
       "commitBlock length is less than the stored one "
       + commitBlock.getNumBytes() + " vs. " + lastBlock.getNumBytes();
-    lastBlock.commitBlock(commitBlock);
+    ((BlockInfoUnderConstruction)lastBlock).commitBlock(commitBlock);
   }
 
   /**
@@ -264,12 +269,13 @@
    * @throws IOException if the block does not have at least a minimal number
    * of replicas reported from data-nodes.
    */
-  void completeBlock(INodeFile fileINode, int blkIndex) throws IOException {
+  BlockInfo completeBlock(INodeFile fileINode, int blkIndex)
+  throws IOException {
     if(blkIndex < 0)
-      return;
+      return null;
     BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
     if(!curBlock.isUnderConstruction())
-      return;
+      return curBlock;
     BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
     if(ucBlock.numNodes() < minReplication)
       throw new IOException("Cannot complete block: " +
@@ -278,7 +284,17 @@
     // replace penultimate block in file
     fileINode.setBlock(blkIndex, completeBlock);
     // replace block in the blocksMap
-    blocksMap.replaceBlock(completeBlock);
+    return blocksMap.replaceBlock(completeBlock);
+  }
+
+  BlockInfo completeBlock(INodeFile fileINode, BlockInfo block)
+  throws IOException {
+    BlockInfo[] fileBlocks = fileINode.getBlocks();
+    for(int idx = 0; idx < fileBlocks.length; idx++)
+      if(fileBlocks[idx] == block) {
+        return completeBlock(fileINode, idx);
+      }
+    return block;
   }
 
   /**
@@ -430,7 +446,7 @@
       pendingDeletionBlocksCount++;
       if (log) {
         NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
-            + b.getBlockName() + " to " + dn.getName());
+            + b + " to " + dn.getName());
       }
     }
   }
@@ -460,7 +476,7 @@
     }
     if (datanodes.length() != 0) {
       NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
-          + b.getBlockName() + " to " + datanodes.toString());
+          + b + " to " + datanodes.toString());
     }
   }
 
@@ -943,7 +959,8 @@
     Collection<Block> toAdd = new LinkedList<Block>();
     Collection<Block> toRemove = new LinkedList<Block>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
-    node.reportDiff(blocksMap, report, toAdd, toRemove, toInvalidate);
+    Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+    node.reportDiff(this, report, toAdd, toRemove, toInvalidate, toCorrupt);
 
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
@@ -957,6 +974,9 @@
           + " does not belong to any file.");
       addToInvalidates(b, node);
     }
+    for (BlockInfo b : toCorrupt) {
+      markBlockAsCorrupt(b, node);
+    }
   }
 
   /**
@@ -966,7 +986,8 @@
    */
   private Block addStoredBlock(final Block block,
                                DatanodeDescriptor node,
-                               DatanodeDescriptor delNodeHint) {
+                               DatanodeDescriptor delNodeHint)
+  throws IOException {
     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
     if (storedBlock == null || storedBlock.getINode() == null) {
       // If this block does not belong to anyfile, then we are done.
@@ -1081,11 +1102,12 @@
     // check whether safe replication is reached for the block
     namesystem.incrementSafeBlockCount(numCurrentReplica);
 
-    //
-    // if file is being actively written to, then do not check
-    // replication-factor here. It will be checked when the file is closed.
-    //
+    // if file is under construction, then check whether the block
+    // can be completed
     if (fileINode.isUnderConstruction()) {
+      if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
+         numLiveReplicas >= minReplication)
+        storedBlock = completeBlock(fileINode, storedBlock);
       return storedBlock;
     }
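The new branch in addStoredBlock above ties the append lifecycle together:
closing a file marks its last block COMMITTED (commitOrCompleteLastBlock),
and the block flips to COMPLETE only once at least minReplication finalized
replicas have been reported. A hedged sketch of that transition, using the
real BlockUCState values but simplified counters in place of the actual
replica bookkeeping (all other names are hypothetical):

    // Hypothetical illustration of the commit/complete transition; the enum
    // values mirror HdfsConstants.BlockUCState, the rest is a stand-in.
    enum BlockUCState { UNDER_CONSTRUCTION, UNDER_RECOVERY, COMMITTED, COMPLETE }

    class BlockLifecycle {
      BlockUCState state = BlockUCState.UNDER_CONSTRUCTION;
      int finalizedReplicas = 0;
      final int minReplication = 1; // illustrative value

      // Client closes the file: the block's length/genstamp are now fixed.
      void commit() {
        state = BlockUCState.COMMITTED;
      }

      // A data-node reports a finalized replica; once enough arrive,
      // the committed block can be completed.
      void replicaReported() {
        finalizedReplicas++;
        if (state == BlockUCState.COMMITTED
            && finalizedReplicas >= minReplication) {
          state = BlockUCState.COMPLETE;
        }
      }
    }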
@@ -1311,7 +1333,30 @@
     // Modify the blocks->datanode map and node's map.
     //
     pendingReplications.remove(block);
-    addStoredBlock(block, node, delHintNode);
+
+    // blockReceived reports a finalized block
+    Collection<Block> toAdd = new LinkedList<Block>();
+    Collection<Block> toInvalidate = new LinkedList<Block>();
+    Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+    node.processReportedBlock(this, block, ReplicaState.FINALIZED,
+                              toAdd, toInvalidate, toCorrupt);
+    // the block is only in one of the lists
+    // if it is in none then data-node already has it
+    assert toAdd.size() + toInvalidate.size() <= 1 :
+      "The block should be only in one of the lists.";
+
+    for (Block b : toAdd) {
+      addStoredBlock(b, node, delHintNode);
+    }
+    for (Block b : toInvalidate) {
+      NameNode.stateChangeLog.info("BLOCK* NameSystem.addBlock: block "
+          + b + " on " + node.getName() + " size " + b.getNumBytes()
+          + " does not belong to any file.");
+      addToInvalidates(b, node);
+    }
+    for (BlockInfo b : toCorrupt) {
+      markBlockAsCorrupt(b, node);
+    }
   }
 
   /**
@@ -1409,6 +1454,14 @@
     return blocksMap.getStoredBlock(block);
   }
 
+  /**
+   * Find the block by block ID.
+   */
+  BlockInfo findStoredBlock(long blockId) {
+    Block wildcardBlock = new Block(blockId, 0, GenerationStamp.WILDCARD_STAMP);
+    return blocksMap.getStoredBlock(wildcardBlock);
+  }
+
   /* updates a block in under replication queue */
   void updateNeededReplications(Block block, int curReplicasDelta,
                                 int expectedReplicasDelta) {

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=814751&r1=814750&r2=814751&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Mon Sep 14 17:44:04 2009
@@ -378,11 +378,12 @@
     return blockarray;
   }
 
-  void reportDiff(BlocksMap blocksMap,
+  void reportDiff(BlockManager blockManager,
                   BlockListAsLongs newReport,
                   Collection<Block> toAdd,    // add to DatanodeDescriptor
                   Collection<Block> toRemove, // remove from DatanodeDescriptor
-                  Collection<Block> toInvalidate) { // should be removed from DN
+                  Collection<Block> toInvalidate, // should be removed from DN
+                  Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
     // place a delimiter in the list which separates blocks
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), 1);
     boolean added = this.addBlock(delimiter);
     assert added : "Delimiting block cannot be present in the node";
     if(newReport == null)
       newReport = new BlockListAsLongs();
-    // scan the report and collect newly reported blocks
-    // Note we are taking special precaution to limit tmp blocks allocated
-    // as part this block report - which why block list is stored as longs
+    // scan the report and process newly reported blocks
     BlockReportIterator itBR = newReport.getBlockReportIterator();
-    Block iblk = null;
-    ReplicaState iState;
     while(itBR.hasNext()) {
-      iblk = itBR.next();
-      iState = itBR.getCurrentReplicaState();
-      BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
-      if(storedBlock == null) {
-        // If block is not in blocksMap it does not belong to any file
-        toInvalidate.add(new Block(iblk));
-        continue;
-      }
-      switch(iState) {
-      case FINALIZED:
-      case RWR:
-        break;
-      case RBW:  // ignore these replicas for now to provide
-      case RUR:  // compatibility with current block report processing
-      case TEMPORARY:
-      default:
-        continue;
-      }
-      if(storedBlock.findDatanode(this) < 0) {// Known block, but not on the DN
-        // if the size differs from what is in the blockmap, then return
-        // the new block. addStoredBlock will then pick up the right size of this
-        // block and will update the block object in the BlocksMap
-        if (storedBlock.getNumBytes() != iblk.getNumBytes()) {
-          toAdd.add(new Block(iblk));
-        } else {
-          toAdd.add(storedBlock);
-        }
-        continue;
-      }
+      Block iblk = itBR.next();
+      ReplicaState iState = itBR.getCurrentReplicaState();
+      BlockInfo storedBlock = processReportedBlock(blockManager, iblk, iState,
+                                  toAdd, toInvalidate, toCorrupt);
       // move block to the head of the list
-      this.moveBlockToHead(storedBlock);
+      if(storedBlock != null && storedBlock.findDatanode(this) >= 0)
+        this.moveBlockToHead(storedBlock);
     }
     // collect blocks that have not been reported
     // all of them are next to the delimiter
@@ -437,6 +410,105 @@
     this.removeBlock(delimiter);
   }
 
+  /**
+   * Process a block replica reported by the data-node.
+   * <p>
+   * <ol>
+   * <li>If the block is not known to the system (not in blocksMap) then the
+   * data-node should be notified to invalidate this block.</li>
+   * <li>If the reported replica is valid, that is, it has the same generation
+   * stamp and length as recorded on the name-node, then the replica location
+   * is added to the name-node.</li>
+   * <li>If the reported replica is not valid, then it is marked as corrupt,
+   * which triggers replication of the existing valid replicas.
+   * Corrupt replicas are removed from the system when the block
+   * is fully replicated.</li>
+   * </ol>
+   *
+   * @param blockManager the block manager that owns the blocksMap
+   * @param block reported block replica
+   * @param rState reported replica state
+   * @param toAdd add to DatanodeDescriptor
+   * @param toInvalidate missing blocks (not in the blocks map)
+   *        should be removed from the data-node
+   * @param toCorrupt replicas with unexpected length or generation stamp;
+   *        add to corrupt replicas
+   * @return the stored block from the blocksMap, or null if the reported
+   *         block is not in the blocksMap
+   */
+  BlockInfo processReportedBlock(
+                  BlockManager blockManager,
+                  Block block,                    // reported block replica
+                  ReplicaState rState,            // reported replica state
+                  Collection<Block> toAdd,        // add to DatanodeDescriptor
+                  Collection<Block> toInvalidate, // should be removed from DN
+                  Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
+    FSNamesystem.LOG.debug("Reported block " + block
+        + " on " + getName() + " size " + block.getNumBytes()
+        + " replicaState = " + rState);
+
+    // find block by blockId
+    BlockInfo storedBlock = blockManager.findStoredBlock(block.getBlockId());
+    if(storedBlock == null) {
+      // If blocksMap does not contain reported block id,
+      // the replica should be removed from the data-node.
+      toInvalidate.add(new Block(block));
+      return null;
+    }
+
+    FSNamesystem.LOG.debug("In memory blockUCState = "
+        + storedBlock.getBlockUCState());
+
+    // Block is on the DN
+    boolean isCorrupt = false;
+    switch(rState) {
+    case FINALIZED:
+      switch(storedBlock.getBlockUCState()) {
+      case COMPLETE:
+      case COMMITTED:
+        // This is a temporary hack until Block.equals() and compareTo()
+        // are changed not to take into account the generation stamp
+        // for searching in blocksMap
+        if(storedBlock.getGenerationStamp() != block.getGenerationStamp()) {
+          toInvalidate.add(new Block(block));
+          return storedBlock;
+        }
+
+        if(storedBlock.getGenerationStamp() != block.getGenerationStamp()
+            || storedBlock.getNumBytes() != block.getNumBytes())
+          isCorrupt = true;
+        break;
+      case UNDER_CONSTRUCTION:
+      case UNDER_RECOVERY:
+        ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+            this, block, rState);
+      }
+      if(!isCorrupt && storedBlock.findDatanode(this) < 0)
+        if (storedBlock.getNumBytes() != block.getNumBytes()) {
+          toAdd.add(new Block(block));
+        } else {
+          toAdd.add(storedBlock);
+        }
+      break;
+    case RBW:
+    case RWR:
+      if(storedBlock.isUnderConstruction())
+        ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+            this, block, rState);
+      else
+        isCorrupt = true;
+      break;
+    case RUR:       // should not be reported
+    case TEMPORARY: // should not be reported
+    default:
+      FSNamesystem.LOG.warn("Unexpected replica state " + rState
+          + " for block: " + storedBlock
+          + " on " + getName() + " size " + storedBlock.getNumBytes());
+      break;
+    }
+    if(isCorrupt)
+      toCorrupt.add(storedBlock);
+    return storedBlock;
+  }
+
   /** Serialization for FSEditLog */
   void readFieldsFromFSEditLog(DataInput in) throws IOException {
     this.name = DeprecatedUTF8.readString(in);
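The nested switch above implements the three-way classification described in
the javadoc. Reduced to its decision structure, it looks like the following
hedged sketch; these are hypothetical stand-in types, not the HDFS API, and
the genstamp-invalidation "temporary hack" for stale finalized replicas is
folded into the mismatch branch.

    // Hypothetical summary of the processReportedBlock decision; outcomes
    // mirror the three collections the real method fills.
    enum Action { ADD, INVALIDATE, CORRUPT, TRACK_UC, IGNORE }

    final class ReportClassifier {
      enum RState { FINALIZED, RBW, RWR, RUR, TEMPORARY }

      static Action classify(boolean knownBlock, boolean underConstruction,
                             boolean matchesStored, RState reported) {
        if (!knownBlock) {
          return Action.INVALIDATE;          // not in blocksMap: delete on DN
        }
        switch (reported) {
        case FINALIZED:
          if (underConstruction) {
            // the real method also records the location via toAdd
            return Action.TRACK_UC;          // remember as expected replica
          }
          return matchesStored ? Action.ADD  // valid: add replica location
                               : Action.CORRUPT; // wrong length/genstamp
        case RBW:
        case RWR:
          return underConstruction ? Action.TRACK_UC : Action.CORRUPT;
        default: // RUR and TEMPORARY should never appear in a report
          return Action.IGNORE;
        }
      }
    }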
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java?rev=814751&r1=814750&r2=814751&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java Mon Sep 14 17:44:04 2009
@@ -30,6 +30,8 @@
 import org.apache.log4j.Level;
 
 public class TestRenameWhileOpen extends junit.framework.TestCase {
+  private static final long LEASE_PERIOD = 500L;
+
   {
     ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
@@ -52,7 +54,7 @@
     conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME);
     conf.setInt("heartbeat.recheck.interval", 1000);
     conf.setInt("dfs.heartbeat.interval", 1);
-    conf.setInt("dfs.safemode.threshold.pct", 1);
+    conf.setFloat("dfs.safemode.threshold.pct", 0.5f);
     conf.setBoolean("dfs.support.append", true);
 
     // create cluster
@@ -104,7 +106,7 @@
       try {Thread.sleep(2*MAX_IDLE_TIME);} catch (InterruptedException e) {}
       cluster = new MiniDFSCluster(nnport, conf, 1, false, true,
                                    null, null, null);
-      cluster.waitActive();
+      waitLeaseRecovery(cluster);
 
       // restart cluster yet again. This triggers the code to read in
       // persistent leases from fsimage.
@@ -136,7 +138,7 @@
     conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME);
     conf.setInt("heartbeat.recheck.interval", 1000);
     conf.setInt("dfs.heartbeat.interval", 1);
-    conf.setInt("dfs.safemode.threshold.pct", 1);
+    conf.setFloat("dfs.safemode.threshold.pct", 0.5f);
     conf.setBoolean("dfs.support.append", true);
     System.out.println("Test 2************************************");
 
@@ -176,7 +178,7 @@
       try {Thread.sleep(2*MAX_IDLE_TIME);} catch (InterruptedException e) {}
       cluster = new MiniDFSCluster(nnport, conf, 1, false, true,
                                    null, null, null);
-      cluster.waitActive();
+      waitLeaseRecovery(cluster);
 
       // restart cluster yet again. This triggers the code to read in
       // persistent leases from fsimage.
@@ -209,7 +211,7 @@
     conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME);
     conf.setInt("heartbeat.recheck.interval", 1000);
     conf.setInt("dfs.heartbeat.interval", 1);
-    conf.setInt("dfs.safemode.threshold.pct", 1);
+    conf.setFloat("dfs.safemode.threshold.pct", 0.5f);
     conf.setBoolean("dfs.support.append", true);
     System.out.println("Test 3************************************");
 
@@ -241,7 +243,7 @@
      try {Thread.sleep(2*MAX_IDLE_TIME);} catch (InterruptedException e) {}
       cluster = new MiniDFSCluster(nnport, conf, 1, false, true,
                                    null, null, null);
-      cluster.waitActive();
+      waitLeaseRecovery(cluster);
 
       // restart cluster yet again. This triggers the code to read in
       // persistent leases from fsimage.
@@ -272,7 +274,7 @@
     conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME);
     conf.setInt("heartbeat.recheck.interval", 1000);
     conf.setInt("dfs.heartbeat.interval", 1);
-    conf.setInt("dfs.safemode.threshold.pct", 1);
+    conf.setFloat("dfs.safemode.threshold.pct", 0.5f);
     conf.setBoolean("dfs.support.append", true);
     System.out.println("Test 4************************************");
 
@@ -303,7 +305,7 @@
       try {Thread.sleep(2*MAX_IDLE_TIME);} catch (InterruptedException e) {}
       cluster = new MiniDFSCluster(nnport, conf, 1, false, true,
                                    null, null, null);
-      cluster.waitActive();
+      waitLeaseRecovery(cluster);
 
       // restart cluster yet again. This triggers the code to read in
       // persistent leases from fsimage.
@@ -323,4 +325,13 @@
       cluster.shutdown();
     }
   }
+
+  void waitLeaseRecovery(MiniDFSCluster cluster) {
+    cluster.setLeasePeriod(LEASE_PERIOD, LEASE_PERIOD);
+    // wait for the lease to expire
+    try {
+      Thread.sleep(5 * LEASE_PERIOD);
+    } catch (InterruptedException e) {
+    }
+  }
 }