Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 971072F3B for ; Fri, 29 Apr 2011 18:17:37 +0000 (UTC) Received: (qmail 21664 invoked by uid 500); 29 Apr 2011 18:17:37 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 21639 invoked by uid 500); 29 Apr 2011 18:17:37 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 21631 invoked by uid 99); 29 Apr 2011 18:17:37 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Apr 2011 18:17:37 +0000 X-ASF-Spam-Status: No, hits=-1996.5 required=5.0 tests=ALL_TRUSTED,URIBL_BLACK X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Apr 2011 18:17:33 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id CC33A2388C1C; Fri, 29 Apr 2011 18:16:41 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1097905 [12/14] - in /hadoop/hdfs/trunk: ./ bin/ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/j... Date: Fri, 29 Apr 2011 18:16:38 -0000 To: hdfs-commits@hadoop.apache.org From: suresh@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110429181641.CC33A2388C1C@eris.apache.org> Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1097905&r1=1097904&r2=1097905&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Fri Apr 29 18:16:32 2011 @@ -23,6 +23,7 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.Map; import java.util.Random; import javax.management.NotCompliantMBeanException; @@ -34,8 +35,11 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.datanode.FSDataset.BlockPoolSlice; +import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolumeSet; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; @@ -91,13 +95,14 @@ public class SimulatedFSDataset impleme SimulatedOutputStream oStream = null; private long bytesAcked; private long bytesRcvd; - BInfo(Block b, boolean forWriting) throws IOException { + BInfo(String bpid, Block b, boolean forWriting) throws IOException { theBlock = new Block(b); if (theBlock.getNumBytes() < 0) { theBlock.setNumBytes(0); } - if (!storage.alloc(theBlock.getNumBytes())) { // expected length - actual length may - // be more - we find out at finalize + if (!storage.alloc(bpid, theBlock.getNumBytes())) { + // expected length - actual length may + // be more - we find out at finalize DataNode.LOG.warn("Lack of free storage on a block alloc"); throw new IOException("Creating block, no free space available"); } @@ -140,7 +145,8 @@ public class SimulatedFSDataset impleme } } - synchronized void finalizeBlock(long finalSize) throws IOException { + synchronized void finalizeBlock(String bpid, long finalSize) + throws IOException { if (finalized) { throw new IOException( "Finalizing a block that has already been finalized" + @@ -161,12 +167,12 @@ public class SimulatedFSDataset impleme // adjust if necessary long extraLen = finalSize - theBlock.getNumBytes(); if (extraLen > 0) { - if (!storage.alloc(extraLen)) { + if (!storage.alloc(bpid,extraLen)) { DataNode.LOG.warn("Lack of free storage on a block alloc"); throw new IOException("Creating block, no free space available"); } } else { - storage.free(-extraLen); + storage.free(bpid, -extraLen); } theBlock.setNumBytes(finalSize); @@ -259,12 +265,41 @@ public class SimulatedFSDataset impleme } } - static private class SimulatedStorage { - private long capacity; // in bytes + /** + * Class is used for tracking block pool storage utilization similar + * to {@link BlockPoolSlice} + */ + private static class SimulatedBPStorage { private long used; // in bytes + long getUsed() { + return used; + } + + void alloc(long amount) { + used += amount; + } + + void free(long amount) { + used -= amount; + } + + SimulatedBPStorage() { + used = 0; + } + } + + /** + * Class used for tracking datanode level storage utilization similar + * to {@link FSVolumeSet} + */ + private static class SimulatedStorage { + private Map map = + new HashMap(); + private long capacity; // in bytes + synchronized long getFree() { - return capacity - used; + return capacity - getUsed(); } synchronized long getCapacity() { @@ -272,29 +307,55 @@ public class SimulatedFSDataset impleme } synchronized long getUsed() { + long used = 0; + for (SimulatedBPStorage bpStorage : map.values()) { + used += bpStorage.getUsed(); + } return used; } - synchronized boolean alloc(long amount) { + synchronized long getBlockPoolUsed(String bpid) throws IOException { + return getBPStorage(bpid).getUsed(); + } + + synchronized boolean alloc(String bpid, long amount) throws IOException { if (getFree() >= amount) { - used += amount; + getBPStorage(bpid).alloc(amount); return true; - } else { - return false; } + return false; } - synchronized void free(long amount) { - used -= amount; + synchronized void free(String bpid, long amount) throws IOException { + getBPStorage(bpid).free(amount); } SimulatedStorage(long cap) { capacity = cap; - used = 0; + } + + synchronized void addBlockPool(String bpid) { + SimulatedBPStorage bpStorage = map.get(bpid); + if (bpStorage != null) { + return; + } + map.put(bpid, new SimulatedBPStorage()); + } + + synchronized void removeBlockPool(String bpid) { + map.remove(bpid); + } + + private SimulatedBPStorage getBPStorage(String bpid) throws IOException { + SimulatedBPStorage bpStorage = map.get(bpid); + if (bpStorage == null) { + throw new IOException("block pool " + bpid + " not found"); + } + return bpStorage; } } - private HashMap blockMap = null; + private Map> blockMap = null; private SimulatedStorage storage = null; private String storageId; @@ -302,7 +363,9 @@ public class SimulatedFSDataset impleme setConf(conf); } - private SimulatedFSDataset() { // real construction when setConf called.. Uggg + // Constructor used for constructing the object using reflection + @SuppressWarnings("unused") + private SimulatedFSDataset() { // real construction when setConf called.. } public Configuration getConf() { @@ -316,14 +379,12 @@ public class SimulatedFSDataset impleme registerMBean(storageId); storage = new SimulatedStorage( conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY)); - //DataNode.LOG.info("Starting Simulated storage; Capacity = " + getCapacity() + - // "Used = " + getDfsUsed() + "Free =" + getRemaining()); - - blockMap = new HashMap(); + blockMap = new HashMap>(); } - public synchronized void injectBlocks(Iterable injectBlocks) - throws IOException { + public synchronized void injectBlocks(String bpid, + Iterable injectBlocks) throws IOException { + ExtendedBlock blk = new ExtendedBlock(); if (injectBlocks != null) { int numInjectedBlocks = 0; for (Block b: injectBlocks) { // if any blocks in list is bad, reject list @@ -331,69 +392,95 @@ public class SimulatedFSDataset impleme if (b == null) { throw new NullPointerException("Null blocks in block list"); } - if (isValidBlock(b)) { + blk.set(bpid, b); + if (isValidBlock(blk)) { throw new IOException("Block already exists in block list"); } } - HashMap oldBlockMap = blockMap; - blockMap = new HashMap( - numInjectedBlocks + oldBlockMap.size()); - blockMap.putAll(oldBlockMap); + Map map = blockMap.get(bpid); + if (map == null) { + map = new HashMap(); + blockMap.put(bpid, map); + } + for (Block b: injectBlocks) { - BInfo binfo = new BInfo(b, false); - blockMap.put(binfo.theBlock, binfo); + BInfo binfo = new BInfo(bpid, b, false); + map.put(binfo.theBlock, binfo); } } } + + /** Get a map for a given block pool Id */ + private Map getMap(String bpid) throws IOException { + final Map map = blockMap.get(bpid); + if (map == null) { + throw new IOException("Non existent blockpool " + bpid); + } + return map; + } - @Override - public synchronized void finalizeBlock(Block b) throws IOException { - BInfo binfo = blockMap.get(b); + @Override // FSDatasetInterface + public synchronized void finalizeBlock(ExtendedBlock b) throws IOException { + final Map map = getMap(b.getBlockPoolId()); + BInfo binfo = map.get(b.getLocalBlock()); if (binfo == null) { throw new IOException("Finalizing a non existing block " + b); } - binfo.finalizeBlock(b.getNumBytes()); - + binfo.finalizeBlock(b.getBlockPoolId(), b.getNumBytes()); } - @Override - public synchronized void unfinalizeBlock(Block b) throws IOException { + @Override // FSDatasetInterface + public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException { if (isValidRbw(b)) { - blockMap.remove(b); + blockMap.remove(b.getLocalBlock()); } } @Override - public synchronized BlockListAsLongs getBlockReport() { - Block[] blockTable = new Block[blockMap.size()]; - int count = 0; - for (BInfo b : blockMap.values()) { - if (b.isFinalized()) { - blockTable[count++] = b.theBlock; + public synchronized BlockListAsLongs getBlockReport(String bpid) { + final Map map = blockMap.get(bpid); + Block[] blockTable = new Block[map.size()]; + if (map != null) { + int count = 0; + for (BInfo b : map.values()) { + if (b.isFinalized()) { + blockTable[count++] = b.theBlock; + } } - } - if (count != blockTable.length) { - blockTable = Arrays.copyOf(blockTable, count); + if (count != blockTable.length) { + blockTable = Arrays.copyOf(blockTable, count); + } + } else { + blockTable = new Block[0]; } return new BlockListAsLongs( new ArrayList(Arrays.asList(blockTable)), null); } + @Override // FSDatasetMBean public long getCapacity() throws IOException { return storage.getCapacity(); } + @Override // FSDatasetMBean public long getDfsUsed() throws IOException { return storage.getUsed(); } + @Override // FSDatasetMBean + public long getBlockPoolUsed(String bpid) throws IOException { + return storage.getBlockPoolUsed(bpid); + } + + @Override // FSDatasetMBean public long getRemaining() throws IOException { return storage.getFree(); } - @Override - public synchronized long getLength(Block b) throws IOException { - BInfo binfo = blockMap.get(b); + @Override // FSDatasetInterface + public synchronized long getLength(ExtendedBlock b) throws IOException { + final Map map = getMap(b.getBlockPoolId()); + BInfo binfo = map.get(b.getLocalBlock()); if (binfo == null) { throw new IOException("Finalizing a non existing block " + b); } @@ -402,65 +489,84 @@ public class SimulatedFSDataset impleme @Override @Deprecated - public Replica getReplica(long blockId) { - return blockMap.get(new Block(blockId)); + public Replica getReplica(String bpid, long blockId) { + final Map map = blockMap.get(bpid); + if (map != null) { + return map.get(new Block(blockId)); + } + return null; } @Override - public synchronized String getReplicaString(long blockId) { - final Replica r = blockMap.get(new Block(blockId)); + public synchronized String getReplicaString(String bpid, long blockId) { + Replica r = null; + final Map map = blockMap.get(bpid); + if (map != null) { + r = map.get(new Block(blockId)); + } return r == null? "null": r.toString(); } - @Override - public Block getStoredBlock(long blkid) throws IOException { - Block b = new Block(blkid); - BInfo binfo = blockMap.get(b); - if (binfo == null) { - return null; + @Override // FSDatasetInterface + public Block getStoredBlock(String bpid, long blkid) throws IOException { + final Map map = blockMap.get(bpid); + if (map != null) { + BInfo binfo = map.get(new Block(blkid)); + if (binfo == null) { + return null; + } + return new Block(blkid, binfo.getGenerationStamp(), binfo.getNumBytes()); } - b.setGenerationStamp(binfo.getGenerationStamp()); - b.setNumBytes(binfo.getNumBytes()); - return b; + return null; } - @Override - public synchronized void invalidate(Block[] invalidBlks) throws IOException { + @Override // FSDatasetInterface + public synchronized void invalidate(String bpid, Block[] invalidBlks) + throws IOException { boolean error = false; if (invalidBlks == null) { return; } + final Map map = getMap(bpid); for (Block b: invalidBlks) { if (b == null) { continue; } - BInfo binfo = blockMap.get(b); + BInfo binfo = map.get(b); if (binfo == null) { error = true; DataNode.LOG.warn("Invalidate: Missing block"); continue; } - storage.free(binfo.getNumBytes()); + storage.free(bpid, binfo.getNumBytes()); blockMap.remove(b); } - if (error) { - throw new IOException("Invalidate: Missing blocks."); - } + if (error) { + throw new IOException("Invalidate: Missing blocks."); + } } - @Override - public synchronized boolean isValidBlock(Block b) { - // return (blockMap.containsKey(b)); - BInfo binfo = blockMap.get(b); + @Override // FSDatasetInterface + public synchronized boolean isValidBlock(ExtendedBlock b) { + final Map map = blockMap.get(b.getBlockPoolId()); + if (map == null) { + return false; + } + BInfo binfo = map.get(b.getLocalBlock()); if (binfo == null) { return false; } return binfo.isFinalized(); } + /* check if a block is created but not finalized */ @Override - public synchronized boolean isValidRbw(Block b) { - BInfo binfo = blockMap.get(b); + public synchronized boolean isValidRbw(ExtendedBlock b) { + final Map map = blockMap.get(b.getBlockPoolId()); + if (map == null) { + return false; + } + BInfo binfo = map.get(b.getLocalBlock()); if (binfo == null) { return false; } @@ -472,10 +578,11 @@ public class SimulatedFSDataset impleme return getStorageInfo(); } - @Override - public synchronized ReplicaInPipelineInterface append(Block b, + @Override // FSDatasetInterface + public synchronized ReplicaInPipelineInterface append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { - BInfo binfo = blockMap.get(b); + final Map map = getMap(b.getBlockPoolId()); + BInfo binfo = map.get(b.getLocalBlock()); if (binfo == null || !binfo.isFinalized()) { throw new ReplicaNotFoundException("Block " + b + " is not valid, and cannot be appended to."); @@ -484,10 +591,11 @@ public class SimulatedFSDataset impleme return binfo; } - @Override - public synchronized ReplicaInPipelineInterface recoverAppend(Block b, + @Override // FSDatasetInterface + public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { - BInfo binfo = blockMap.get(b); + final Map map = getMap(b.getBlockPoolId()); + BInfo binfo = map.get(b.getLocalBlock()); if (binfo == null) { throw new ReplicaNotFoundException("Block " + b + " is not valid, and cannot be appended to."); @@ -495,32 +603,34 @@ public class SimulatedFSDataset impleme if (binfo.isFinalized()) { binfo.unfinalizeBlock(); } - blockMap.remove(b); + map.remove(b); binfo.theBlock.setGenerationStamp(newGS); - blockMap.put(binfo.theBlock, binfo); + map.put(binfo.theBlock, binfo); return binfo; } - @Override - public void recoverClose(Block b, long newGS, - long expectedBlockLen) throws IOException { - BInfo binfo = blockMap.get(b); + @Override // FSDatasetInterface + public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) + throws IOException { + final Map map = getMap(b.getBlockPoolId()); + BInfo binfo = map.get(b.getLocalBlock()); if (binfo == null) { throw new ReplicaNotFoundException("Block " + b + " is not valid, and cannot be appended to."); } if (!binfo.isFinalized()) { - binfo.finalizeBlock(binfo.getNumBytes()); + binfo.finalizeBlock(b.getBlockPoolId(), binfo.getNumBytes()); } - blockMap.remove(b); + map.remove(b.getLocalBlock()); binfo.theBlock.setGenerationStamp(newGS); - blockMap.put(binfo.theBlock, binfo); + map.put(binfo.theBlock, binfo); } - @Override - public synchronized ReplicaInPipelineInterface recoverRbw(Block b, + @Override // FSDatasetInterface + public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { - BInfo binfo = blockMap.get(b); + final Map map = getMap(b.getBlockPoolId()); + BInfo binfo = map.get(b.getLocalBlock()); if ( binfo == null) { throw new ReplicaNotFoundException("Block " + b + " does not exist, and cannot be appended to."); @@ -529,20 +639,20 @@ public class SimulatedFSDataset impleme throw new ReplicaAlreadyExistsException("Block " + b + " is valid, and cannot be written to."); } - blockMap.remove(b); + map.remove(b); binfo.theBlock.setGenerationStamp(newGS); - blockMap.put(binfo.theBlock, binfo); + map.put(binfo.theBlock, binfo); return binfo; } - @Override - public synchronized ReplicaInPipelineInterface createRbw(Block b) + @Override // FSDatasetInterface + public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b) throws IOException { return createTemporary(b); } - @Override - public synchronized ReplicaInPipelineInterface createTemporary(Block b) + @Override // FSDatasetInterface + public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b) throws IOException { if (isValidBlock(b)) { throw new ReplicaAlreadyExistsException("Block " + b + @@ -552,35 +662,36 @@ public class SimulatedFSDataset impleme throw new ReplicaAlreadyExistsException("Block " + b + " is being written, and cannot be written to."); } - BInfo binfo = new BInfo(b, true); - blockMap.put(binfo.theBlock, binfo); + final Map map = getMap(b.getBlockPoolId()); + BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true); + map.put(binfo.theBlock, binfo); return binfo; } - @Override - public synchronized InputStream getBlockInputStream(Block b) - throws IOException { - BInfo binfo = blockMap.get(b); + @Override // FSDatasetInterface + public synchronized InputStream getBlockInputStream(ExtendedBlock b) + throws IOException { + final Map map = getMap(b.getBlockPoolId()); + BInfo binfo = map.get(b.getLocalBlock()); if (binfo == null) { throw new IOException("No such Block " + b ); } - //DataNode.LOG.info("Opening block(" + b.blkid + ") of length " + b.len); return binfo.getIStream(); } - @Override - public synchronized InputStream getBlockInputStream(Block b, long seekOffset) - throws IOException { + @Override // FSDatasetInterface + public synchronized InputStream getBlockInputStream(ExtendedBlock b, + long seekOffset) throws IOException { InputStream result = getBlockInputStream(b); result.skip(seekOffset); return result; } /** Not supported */ - @Override - public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff - ) throws IOException { + @Override // FSDatasetInterface + public BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, + long ckoff) throws IOException { throw new IOException("Not supported"); } @@ -591,9 +702,10 @@ public class SimulatedFSDataset impleme * @throws IOException - block does not exist or problems accessing * the meta file */ - private synchronized InputStream getMetaDataInStream(Block b) + private synchronized InputStream getMetaDataInStream(ExtendedBlock b) throws IOException { - BInfo binfo = blockMap.get(b); + final Map map = getMap(b.getBlockPoolId()); + BInfo binfo = map.get(b.getLocalBlock()); if (binfo == null) { throw new IOException("No such Block " + b ); } @@ -604,9 +716,11 @@ public class SimulatedFSDataset impleme return binfo.getMetaIStream(); } - @Override - public synchronized long getMetaDataLength(Block b) throws IOException { - BInfo binfo = blockMap.get(b); + @Override // FSDatasetInterface + public synchronized long getMetaDataLength(ExtendedBlock b) + throws IOException { + final Map map = getMap(b.getBlockPoolId()); + BInfo binfo = map.get(b.getLocalBlock()); if (binfo == null) { throw new IOException("No such Block " + b ); } @@ -617,16 +731,15 @@ public class SimulatedFSDataset impleme return binfo.getMetaIStream().getLength(); } - @Override - public MetaDataInputStream getMetaDataInputStream(Block b) - throws IOException { - - return new MetaDataInputStream(getMetaDataInStream(b), - getMetaDataLength(b)); + @Override // FSDatasetInterface + public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b) + throws IOException { + return new MetaDataInputStream(getMetaDataInStream(b), + getMetaDataLength(b)); } - @Override - public synchronized boolean metaFileExists(Block b) throws IOException { + @Override // FSDatasetInterface + public synchronized boolean metaFileExists(ExtendedBlock b) throws IOException { if (!isValidBlock(b)) { throw new IOException("Block " + b + " is valid, and cannot be written to."); @@ -638,8 +751,8 @@ public class SimulatedFSDataset impleme // nothing to check for simulated data set } - @Override - public synchronized void adjustCrcChannelPosition(Block b, + @Override // FSDatasetInterface + public synchronized void adjustCrcChannelPosition(ExtendedBlock b, BlockWriteStreams stream, int checksumSize) throws IOException { @@ -812,8 +925,9 @@ public class SimulatedFSDataset impleme @Override public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) throws IOException { - Block b = rBlock.getBlock(); - BInfo binfo = blockMap.get(b); + ExtendedBlock b = rBlock.getBlock(); + final Map map = getMap(b.getBlockPoolId()); + BInfo binfo = map.get(b.getLocalBlock()); if (binfo == null) { throw new IOException("No such Block " + b ); } @@ -824,22 +938,44 @@ public class SimulatedFSDataset impleme } @Override // FSDatasetInterface - public FinalizedReplica updateReplicaUnderRecovery(Block oldBlock, + public FinalizedReplica updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId, long newlength) throws IOException { return new FinalizedReplica( oldBlock.getBlockId(), newlength, recoveryId, null, null); } - @Override - public long getReplicaVisibleLength(Block block) throws IOException { + @Override // FSDatasetInterface + public long getReplicaVisibleLength(ExtendedBlock block) throws IOException { return block.getNumBytes(); } + @Override // FSDatasetInterface + public void addBlockPool(String bpid, Configuration conf) { + Map map = new HashMap(); + blockMap.put(bpid, map); + storage.addBlockPool(bpid); + } + + @Override // FSDatasetInterface + public void shutdownBlockPool(String bpid) { + blockMap.remove(bpid); + storage.removeBlockPool(bpid); + } + + @Override // FSDatasetInterface + public void deleteBlockPool(String bpid, boolean force) { + return; + } + @Override - public ReplicaInPipelineInterface convertTemporaryToRbw(Block temporary) + public ReplicaInPipelineInterface convertTemporaryToRbw(ExtendedBlock temporary) throws IOException { - final BInfo r = blockMap.get(temporary); + final Map map = blockMap.get(temporary.getBlockPoolId()); + if (map == null) { + throw new IOException("Block pool not found, temporary=" + temporary); + } + final BInfo r = map.get(temporary.getLocalBlock()); if (r == null) { throw new IOException("Block not found, temporary=" + temporary); } else if (r.isFinalized()) { Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1097905&r1=1097904&r2=1097905&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Fri Apr 29 18:16:32 2011 @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.REPLACE_BLOCK; import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.*; import java.io.DataInputStream; @@ -41,9 +40,9 @@ import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DataTransferProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; @@ -51,7 +50,6 @@ import org.apache.hadoop.hdfs.server.com import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; /** * This class tests if block replacement request to data nodes work correctly. @@ -120,7 +118,7 @@ public class TestBlockReplacement extend LocatedBlock block = locatedBlocks.get(0); DatanodeInfo[] oldNodes = block.getLocations(); assertEquals(oldNodes.length, 3); - Block b = block.getBlock(); + ExtendedBlock b = block.getBlock(); // add a new datanode to the cluster cluster.startDataNodes(CONF, 1, true, null, NEW_RACKS); @@ -161,11 +159,11 @@ public class TestBlockReplacement extend // start to replace the block // case 1: proxySource does not contain the block LOG.info("Testcase 1: Proxy " + newNode.getName() - + " does not contain the block " + b.getBlockName() ); + + " does not contain the block " + b); assertFalse(replaceBlock(b, source, newNode, proxies.get(0))); // case 2: destination contains the block LOG.info("Testcase 2: Destination " + proxies.get(1).getName() - + " contains the block " + b.getBlockName() ); + + " contains the block " + b); assertFalse(replaceBlock(b, source, proxies.get(0), proxies.get(1))); // case 3: correct case LOG.info("Testcase 3: Proxy=" + source.getName() + " source=" + @@ -224,7 +222,7 @@ public class TestBlockReplacement extend * * Return true if a block is successfully copied; otherwise false. */ - private boolean replaceBlock( Block block, DatanodeInfo source, + private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException { Socket sock = new Socket(); sock.connect(NetUtils.createSocketAddr( @@ -232,13 +230,8 @@ public class TestBlockReplacement extend sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); - out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); - REPLACE_BLOCK.write(out); - out.writeLong(block.getBlockId()); - out.writeLong(block.getGenerationStamp()); - Text.writeString(out, source.getStorageID()); - sourceProxy.write(out); - BlockTokenSecretManager.DUMMY_TOKEN.write(out); + DataTransferProtocol.Sender.opReplaceBlock(out, block, source + .getStorageID(), sourceProxy, BlockTokenSecretManager.DUMMY_TOKEN); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1097905&r1=1097904&r2=1097905&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Fri Apr 29 18:16:32 2011 @@ -28,12 +28,14 @@ import org.apache.hadoop.hdfs.Distribute import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.common.HdfsConstants; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Level; import org.junit.After; @@ -67,6 +69,7 @@ public class TestBlockReport { static final int BLOCK_SIZE = 1024; static final int NUM_BLOCKS = 10; static final int FILE_SIZE = NUM_BLOCKS * BLOCK_SIZE + 1; + static String bpid; private MiniDFSCluster cluster; private DistributedFileSystem fs; @@ -85,6 +88,7 @@ public class TestBlockReport { REPL_FACTOR = 1; //Reset if case a test has modified the value cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build(); fs = (DistributedFileSystem) cluster.getFileSystem(); + bpid = cluster.getNamesystem().getBlockPoolId(); } @After @@ -130,8 +134,11 @@ public class TestBlockReport { b.getNumBytes()); } } - cluster.getNameNode().blockReport( - cluster.getDataNodes().get(DN_N0).dnRegistration, + // all blocks belong to the same file, hence same BP + DataNode dn = cluster.getDataNodes().get(DN_N0); + String poolId = cluster.getNamesystem().getBlockPoolId(); + DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); + cluster.getNameNode().blockReport(dnR, poolId, new BlockListAsLongs(blocks, null).getBlockListAsLongs()); List blocksAfterReport = @@ -143,7 +150,7 @@ public class TestBlockReport { } for (int i = 0; i < blocksAfterReport.size(); i++) { - Block b = blocksAfterReport.get(i).getBlock(); + ExtendedBlock b = blocksAfterReport.get(i).getBlock(); assertEquals("Length of " + i + "th block is incorrect", oldLengths[i], b.getNumBytes()); } @@ -171,7 +178,7 @@ public class TestBlockReport { File dataDir = new File(cluster.getDataDirectory()); assertTrue(dataDir.isDirectory()); - List blocks2Remove = new ArrayList(); + List blocks2Remove = new ArrayList(); List removedIndex = new ArrayList(); List lBlocks = cluster.getNameNode().getBlockLocations( filePath.toString(), FILE_START, @@ -192,7 +199,7 @@ public class TestBlockReport { LOG.debug("Number of blocks allocated " + lBlocks.size()); } - for (Block b : blocks2Remove) { + for (ExtendedBlock b : blocks2Remove) { if(LOG.isDebugEnabled()) { LOG.debug("Removing the block " + b.getBlockName()); } @@ -206,8 +213,11 @@ public class TestBlockReport { waitTil(DN_RESCAN_EXTRA_WAIT); - cluster.getNameNode().blockReport( - cluster.getDataNodes().get(DN_N0).dnRegistration, + // all blocks belong to the same file, hence same BP + DataNode dn = cluster.getDataNodes().get(DN_N0); + String poolId = cluster.getNamesystem().getBlockPoolId(); + DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); + cluster.getNameNode().blockReport(dnR, poolId, new BlockListAsLongs(blocks, null).getBlockListAsLongs()); cluster.getNamesystem().computeDatanodeWork(); @@ -241,9 +251,12 @@ public class TestBlockReport { blocks.get(0).setGenerationStamp(rand.nextLong()); // This new block is unknown to NN and will be mark for deletion. blocks.add(new Block()); - DatanodeCommand dnCmd = - cluster.getNameNode().blockReport( - cluster.getDataNodes().get(DN_N0).dnRegistration, + + // all blocks belong to the same file, hence same BP + DataNode dn = cluster.getDataNodes().get(DN_N0); + String poolId = cluster.getNamesystem().getBlockPoolId(); + DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); + DatanodeCommand dnCmd = cluster.getNameNode().blockReport(dnR, poolId, new BlockListAsLongs(blocks, null).getBlockListAsLongs()); if(LOG.isDebugEnabled()) { LOG.debug("Got the command: " + dnCmd); @@ -291,9 +304,12 @@ public class TestBlockReport { ArrayList blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath); startDNandWait(filePath, true); - cluster.getNameNode().blockReport( - cluster.getDataNodes().get(DN_N1).dnRegistration, - new BlockListAsLongs(blocks, null).getBlockListAsLongs()); + // all blocks belong to the same file, hence same BP + DataNode dn = cluster.getDataNodes().get(DN_N1); + String poolId = cluster.getNamesystem().getBlockPoolId(); + DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); + cluster.getNameNode().blockReport(dnR, poolId, + new BlockListAsLongs(blocks, null).getBlockListAsLongs()); printStats(); assertEquals("Wrong number of PendingReplication Blocks", 0, cluster.getNamesystem().getUnderReplicatedBlocks()); @@ -327,8 +343,7 @@ public class TestBlockReport { int randIndex = rand.nextInt(blocks.size()); // Get a block and screw its GS Block corruptedBlock = blocks.get(randIndex); - String secondNode = cluster.getDataNodes().get(DN_N1). - getDatanodeRegistration().getStorageID(); + String secondNode = cluster.getDataNodes().get(DN_N1).getStorageId(); if(LOG.isDebugEnabled()) { LOG.debug("Working with " + secondNode); LOG.debug("BlockGS before " + blocks.get(randIndex).getGenerationStamp()); @@ -338,9 +353,12 @@ public class TestBlockReport { LOG.debug("BlockGS after " + blocks.get(randIndex).getGenerationStamp()); LOG.debug("Done corrupting GS of " + corruptedBlock.getBlockName()); } - cluster.getNameNode().blockReport( - cluster.getDataNodes().get(DN_N1).dnRegistration, - new BlockListAsLongs(blocks, null).getBlockListAsLongs()); + // all blocks belong to the same file, hence same BP + DataNode dn = cluster.getDataNodes().get(DN_N1); + String poolId = cluster.getNamesystem().getBlockPoolId(); + DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); + cluster.getNameNode().blockReport(dnR, poolId, + new BlockListAsLongs(blocks, null).getBlockListAsLongs()); printStats(); assertEquals("Wrong number of Corrupted blocks", 1, cluster.getNamesystem().getCorruptReplicaBlocks() + @@ -360,9 +378,9 @@ public class TestBlockReport { if(LOG.isDebugEnabled()) { LOG.debug("Done corrupting length of " + corruptedBlock.getBlockName()); } - cluster.getNameNode().blockReport( - cluster.getDataNodes().get(DN_N1).dnRegistration, - new BlockListAsLongs(blocks, null).getBlockListAsLongs()); + + cluster.getNameNode().blockReport(dnR, poolId, + new BlockListAsLongs(blocks, null).getBlockListAsLongs()); printStats(); assertEquals("Wrong number of Corrupted blocks", @@ -406,10 +424,13 @@ public class TestBlockReport { bc.start(); waitForTempReplica(bl, DN_N1); - - cluster.getNameNode().blockReport( - cluster.getDataNodes().get(DN_N1).dnRegistration, - new BlockListAsLongs(blocks, null).getBlockListAsLongs()); + + // all blocks belong to the same file, hence same BP + DataNode dn = cluster.getDataNodes().get(DN_N1); + String poolId = cluster.getNamesystem().getBlockPoolId(); + DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); + cluster.getNameNode().blockReport(dnR, poolId, + new BlockListAsLongs(blocks, null).getBlockListAsLongs()); printStats(); assertEquals("Wrong number of PendingReplication blocks", blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks()); @@ -450,9 +471,12 @@ public class TestBlockReport { waitForTempReplica(bl, DN_N1); - cluster.getNameNode().blockReport( - cluster.getDataNodes().get(DN_N1).dnRegistration, - new BlockListAsLongs(blocks, null).getBlockListAsLongs()); + // all blocks belong to the same file, hence same BP + DataNode dn = cluster.getDataNodes().get(DN_N1); + String poolId = cluster.getNamesystem().getBlockPoolId(); + DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); + cluster.getNameNode().blockReport(dnR, poolId, + new BlockListAsLongs(blocks, null).getBlockListAsLongs()); printStats(); assertEquals("Wrong number of PendingReplication blocks", 2, cluster.getNamesystem().getPendingReplicationBlocks()); @@ -465,7 +489,7 @@ public class TestBlockReport { } } - private void waitForTempReplica(Block bl, int DN_N1) { + private void waitForTempReplica(Block bl, int DN_N1) throws IOException { final boolean tooLongWait = false; final int TIMEOUT = 40000; @@ -478,16 +502,18 @@ public class TestBlockReport { if(LOG.isDebugEnabled()) { LOG.debug("Total number of DNs " + cluster.getDataNodes().size()); } + cluster.waitActive(); + // Look about specified DN for the replica of the block from 1st DN - Replica r; - r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()). - fetchReplicaInfo(bl.getBlockId()); + String bpid = cluster.getNamesystem().getBlockPoolId(); + Replica r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()). + fetchReplicaInfo(bpid, bl.getBlockId()); long start = System.currentTimeMillis(); int count = 0; while (r == null) { waitTil(5); r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()). - fetchReplicaInfo(bl.getBlockId()); + fetchReplicaInfo(bpid, bl.getBlockId()); long waiting_period = System.currentTimeMillis() - start; if (count++ % 100 == 0) if(LOG.isDebugEnabled()) { @@ -548,8 +574,8 @@ public class TestBlockReport { if(LOG.isDebugEnabled()) { LOG.debug("New datanode " - + cluster.getDataNodes().get(datanodes.size() - 1) - .getDatanodeRegistration() + " has been started"); + + cluster.getDataNodes().get(datanodes.size() - 1).getMachineName() + + " has been started"); } if (waitReplicas) DFSTestUtil.waitReplication(fs, filePath, REPL_FACTOR); } @@ -593,7 +619,7 @@ public class TestBlockReport { } continue; } - newList.add(new Block(locatedBlks.get(i).getBlock())); + newList.add(new Block(locatedBlks.get(i).getBlock().getLocalBlock())); } return newList; } @@ -685,7 +711,8 @@ public class TestBlockReport { // Get block from the first DN ret = cluster.getDataNodes().get(DN_N0). - data.getStoredBlock(lb.getBlock().getBlockId()); + data.getStoredBlock(lb.getBlock() + .getBlockPoolId(), lb.getBlock().getBlockId()); return ret; } Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java?rev=1097905&r1=1097904&r2=1097905&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java Fri Apr 29 18:16:32 2011 @@ -44,6 +44,9 @@ public class TestDataNodeMXBean { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); ObjectName mxbeanName = new ObjectName("HadoopInfo:type=DataNodeInfo"); + // get attribute "ClusterId" + String clusterId = (String) mbs.getAttribute(mxbeanName, "ClusterId"); + Assert.assertEquals(datanode.getClusterId(), clusterId); // get attribute "Version" String version = (String)mbs.getAttribute(mxbeanName, "Version"); Assert.assertEquals(datanode.getVersion(),version); @@ -53,10 +56,10 @@ public class TestDataNodeMXBean { // get attribute "HttpPort" String httpPort = (String)mbs.getAttribute(mxbeanName, "HttpPort"); Assert.assertEquals(datanode.getHttpPort(),httpPort); - // get attribute "NamenodeAddress" - String namenodeAddress = (String)mbs.getAttribute(mxbeanName, - "NamenodeAddress"); - Assert.assertEquals(datanode.getNamenodeAddress(),namenodeAddress); + // get attribute "NamenodeAddresses" + String namenodeAddresses = (String)mbs.getAttribute(mxbeanName, + "NamenodeAddresses"); + Assert.assertEquals(datanode.getNamenodeAddresses(),namenodeAddresses); // get attribute "getVolumeInfo" String volumeInfo = (String)mbs.getAttribute(mxbeanName, "VolumeInfo"); Assert.assertEquals(replaceDigits(datanode.getVolumeInfo()), Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1097905&r1=1097904&r2=1097905&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Fri Apr 29 18:16:32 2011 @@ -33,12 +33,13 @@ import org.apache.hadoop.hdfs.BlockReade import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.common.HdfsConstants; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.net.NetUtils; @@ -119,7 +120,8 @@ public class TestDataNodeVolumeFailure { // fail the volume // delete/make non-writable one of the directories (failed volume) data_fail = new File(dataDir, "data3"); - failedDir = new File(data_fail, MiniDFSCluster.FINALIZED_DIR_NAME); + failedDir = MiniDFSCluster.getFinalizedDir(dataDir, + cluster.getNamesystem().getBlockPoolId()); if (failedDir.exists() && //!FileUtil.fullyDelete(failedDir) !deteteBlocks(failedDir) @@ -137,8 +139,10 @@ public class TestDataNodeVolumeFailure { // make sure a block report is sent DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3 - long[] bReport = dn.getFSDataset().getBlockReport().getBlockListAsLongs(); - cluster.getNameNode().blockReport(dn.dnRegistration, bReport); + String bpid = cluster.getNamesystem().getBlockPoolId(); + DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid); + long[] bReport = dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs(); + cluster.getNameNode().blockReport(dnR, bpid, bReport); // verify number of blocks and files... verify(filename, filesize); @@ -216,7 +220,7 @@ public class TestDataNodeVolumeFailure { for (LocatedBlock lb : locatedBlocks) { DatanodeInfo dinfo = lb.getLocations()[1]; - Block b = lb.getBlock(); + ExtendedBlock b = lb.getBlock(); try { accessBlock(dinfo, lb); } catch (IOException e) { @@ -254,8 +258,7 @@ public class TestDataNodeVolumeFailure { throws IOException { InetSocketAddress targetAddr = null; Socket s = null; - BlockReader blockReader = null; - Block block = lblock.getBlock(); + ExtendedBlock block = lblock.getBlock(); targetAddr = NetUtils.createSocketAddr(datanode.getName()); @@ -263,8 +266,10 @@ public class TestDataNodeVolumeFailure { s.connect(targetAddr, HdfsConstants.READ_TIMEOUT); s.setSoTimeout(HdfsConstants.READ_TIMEOUT); - String file = BlockReader.getFileName(targetAddr, block.getBlockId()); - blockReader = + String file = BlockReader.getFileName(targetAddr, + "test-blockpoolid", + block.getBlockId()); + BlockReader blockReader = BlockReader.newBlockReader(s, file, block, lblock .getBlockToken(), 0, -1, 4096); @@ -314,9 +319,11 @@ public class TestDataNodeVolumeFailure { */ private int countRealBlocks(Map map) { int total = 0; + final String bpid = cluster.getNamesystem().getBlockPoolId(); for(int i=0; i replicasItor = - ((FSDataset)dn.data).volumeMap.replicas().iterator(); + ((FSDataset)dn.data).volumeMap.replicas(bpid).iterator(); ReplicaInfo replica = replicasItor.next(); createUnlinkTmpFile(replica, true, true); // rename block file createUnlinkTmpFile(replica, false, true); // rename meta file @@ -165,7 +167,7 @@ public class TestDatanodeRestart { // check volumeMap: 4 finalized replica Collection replicas = - ((FSDataset)(dn.data)).volumeMap.replicas(); + ((FSDataset)(dn.data)).volumeMap.replicas(bpid); Assert.assertEquals(4, replicas.size()); replicasItor = replicas.iterator(); while (replicasItor.hasNext()) { Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java?rev=1097905&r1=1097904&r2=1097905&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java Fri Apr 29 18:16:32 2011 @@ -21,6 +21,8 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.nio.channels.FileChannel; +import java.util.LinkedList; +import java.util.List; import java.util.Random; import org.apache.commons.logging.Log; @@ -48,6 +50,7 @@ public class TestDirectoryScanner extend private static final int DEFAULT_GEN_STAMP = 9999; private MiniDFSCluster cluster; + private String bpid; private FSDataset fds = null; private DirectoryScanner scanner = null; private Random rand = new Random(); @@ -69,7 +72,7 @@ public class TestDirectoryScanner extend /** Truncate a block file */ private long truncateBlockFile() throws IOException { synchronized (fds) { - for (ReplicaInfo b : fds.volumeMap.replicas()) { + for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) { File f = b.getBlockFile(); File mf = b.getMetaFile(); // Truncate a block file that has a corresponding metadata file @@ -88,7 +91,7 @@ public class TestDirectoryScanner extend /** Delete a block file */ private long deleteBlockFile() { synchronized(fds) { - for (ReplicaInfo b : fds.volumeMap.replicas()) { + for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) { File f = b.getBlockFile(); File mf = b.getMetaFile(); // Delete a block file that has corresponding metadata file @@ -104,7 +107,7 @@ public class TestDirectoryScanner extend /** Delete block meta file */ private long deleteMetaFile() { synchronized(fds) { - for (ReplicaInfo b : fds.volumeMap.replicas()) { + for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) { File file = b.getMetaFile(); // Delete a metadata file if (file.exists() && file.delete()) { @@ -121,7 +124,7 @@ public class TestDirectoryScanner extend long id = rand.nextLong(); while (true) { id = rand.nextLong(); - if (fds.fetchReplicaInfo(id) == null) { + if (fds.fetchReplicaInfo(bpid, id) == null) { break; } } @@ -139,10 +142,11 @@ public class TestDirectoryScanner extend /** Create a block file in a random volume*/ private long createBlockFile() throws IOException { - FSVolume[] volumes = fds.volumes.volumes; - int index = rand.nextInt(volumes.length - 1); + List volumes = fds.volumes.getVolumes(); + int index = rand.nextInt(volumes.size() - 1); long id = getFreeBlockId(); - File file = new File(volumes[index].getDir().getPath(), getBlockFile(id)); + File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir(); + File file = new File(finalizedDir, getBlockFile(id)); if (file.createNewFile()) { LOG.info("Created block file " + file.getName()); } @@ -151,10 +155,11 @@ public class TestDirectoryScanner extend /** Create a metafile in a random volume*/ private long createMetaFile() throws IOException { - FSVolume[] volumes = fds.volumes.volumes; - int index = rand.nextInt(volumes.length - 1); + List volumes = fds.volumes.getVolumes(); + int index = rand.nextInt(volumes.size() - 1); long id = getFreeBlockId(); - File file = new File(volumes[index].getDir().getPath(), getMetaFile(id)); + File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir(); + File file = new File(finalizedDir, getMetaFile(id)); if (file.createNewFile()) { LOG.info("Created metafile " + file.getName()); } @@ -163,10 +168,11 @@ public class TestDirectoryScanner extend /** Create block file and corresponding metafile in a rondom volume */ private long createBlockMetaFile() throws IOException { - FSVolume[] volumes = fds.volumes.volumes; - int index = rand.nextInt(volumes.length - 1); + List volumes = fds.volumes.getVolumes(); + int index = rand.nextInt(volumes.size() - 1); long id = getFreeBlockId(); - File file = new File(volumes[index].getDir().getPath(), getBlockFile(id)); + File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir(); + File file = new File(finalizedDir, getBlockFile(id)); if (file.createNewFile()) { LOG.info("Created block file " + file.getName()); @@ -185,7 +191,7 @@ public class TestDirectoryScanner extend LOG.info("Created extraneous file " + name2); } - file = new File(volumes[index].getDir().getPath(), getMetaFile(id)); + file = new File(finalizedDir, getMetaFile(id)); if (file.createNewFile()) { LOG.info("Created metafile " + file.getName()); } @@ -196,12 +202,18 @@ public class TestDirectoryScanner extend private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile, long missingMemoryBlocks, long mismatchBlocks) { scanner.reconcile(); - assertEquals(totalBlocks, scanner.totalBlocks); - assertEquals(diffsize, scanner.diff.size()); - assertEquals(missingMetaFile, scanner.missingMetaFile); - assertEquals(missingBlockFile, scanner.missingBlockFile); - assertEquals(missingMemoryBlocks, scanner.missingMemoryBlocks); - assertEquals(mismatchBlocks, scanner.mismatchBlocks); + + assertTrue(scanner.diffs.containsKey(bpid)); + LinkedList diff = scanner.diffs.get(bpid); + assertTrue(scanner.stats.containsKey(bpid)); + DirectoryScanner.Stats stats = scanner.stats.get(bpid); + + assertEquals(diffsize, diff.size()); + assertEquals(totalBlocks, stats.totalBlocks); + assertEquals(missingMetaFile, stats.missingMetaFile); + assertEquals(missingBlockFile, stats.missingBlockFile); + assertEquals(missingMemoryBlocks, stats.missingMemoryBlocks); + assertEquals(mismatchBlocks, stats.mismatchBlocks); } public void testDirectoryScanner() throws Exception { @@ -215,10 +227,12 @@ public class TestDirectoryScanner extend cluster = new MiniDFSCluster.Builder(CONF).build(); try { cluster.waitActive(); + bpid = cluster.getNamesystem().getBlockPoolId(); fds = (FSDataset) cluster.getDataNodes().get(0).getFSDataset(); CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, parallelism); scanner = new DirectoryScanner(fds, CONF); + scanner.setRetainDiffs(true); // Add files with 100 blocks createFile("/tmp/t1", 10000); @@ -318,19 +332,26 @@ public class TestDirectoryScanner extend truncateBlockFile(); scan(totalBlocks+3, 6, 2, 2, 3, 2); scan(totalBlocks+1, 0, 0, 0, 0, 0); + + // Test14: validate clean shutdown of DirectoryScanner + ////assertTrue(scanner.getRunStatus()); //assumes "real" FSDataset, not sim + scanner.shutdown(); + assertFalse(scanner.getRunStatus()); + } finally { + scanner.shutdown(); cluster.shutdown(); } } private void verifyAddition(long blockId, long genStamp, long size) { final ReplicaInfo replicainfo; - replicainfo = fds.fetchReplicaInfo(blockId); + replicainfo = fds.fetchReplicaInfo(bpid, blockId); assertNotNull(replicainfo); // Added block has the same file as the one created by the test File file = new File(getBlockFile(blockId)); - assertEquals(file.getName(), fds.findBlockFile(blockId).getName()); + assertEquals(file.getName(), fds.findBlockFile(bpid, blockId).getName()); // Generation stamp is same as that of created file assertEquals(genStamp, replicainfo.getGenerationStamp()); @@ -341,12 +362,12 @@ public class TestDirectoryScanner extend private void verifyDeletion(long blockId) { // Ensure block does not exist in memory - assertNull(fds.fetchReplicaInfo(blockId)); + assertNull(fds.fetchReplicaInfo(bpid, blockId)); } private void verifyGenStamp(long blockId, long genStamp) { final ReplicaInfo memBlock; - memBlock = fds.fetchReplicaInfo(blockId); + memBlock = fds.fetchReplicaInfo(bpid, blockId); assertNotNull(memBlock); assertEquals(genStamp, memBlock.getGenerationStamp()); } Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=1097905&r1=1097904&r2=1097905&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Fri Apr 29 18:16:32 2011 @@ -51,7 +51,6 @@ public class TestDiskError { private FileSystem fs; private MiniDFSCluster cluster; private Configuration conf; - private String dataDir; @Before public void setUp() throws Exception { @@ -60,7 +59,6 @@ public class TestDiskError { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); fs = cluster.getFileSystem(); - dataDir = cluster.getDataDirectory(); } @After @@ -86,8 +84,11 @@ public class TestDiskError { cluster.startDataNodes(conf, 2, true, null, null); cluster.waitActive(); final int dnIndex = 0; - File dir1 = new File(new File(dataDir, "data"+(2*dnIndex+1)), "current/rbw"); - File dir2 = new File(new File(dataDir, "data"+(2*dnIndex+2)), "current/rbw"); + String bpid = cluster.getNamesystem().getBlockPoolId(); + File storageDir = MiniDFSCluster.getStorageDir(dnIndex, 0); + File dir1 = MiniDFSCluster.getRbwDir(storageDir, bpid); + storageDir = MiniDFSCluster.getStorageDir(dnIndex, 1); + File dir2 = MiniDFSCluster.getRbwDir(storageDir, bpid); try { // make the data directory of the first datanode to be readonly assertTrue("Couldn't chmod local vol", dir1.setReadOnly()); @@ -95,7 +96,7 @@ public class TestDiskError { // create files and make sure that first datanode will be down DataNode dn = cluster.getDataNodes().get(dnIndex); - for (int i=0; DataNode.isDatanodeUp(dn); i++) { + for (int i=0; dn.isDatanodeUp(); i++) { Path fileName = new Path("/test.txt"+i); DFSTestUtil.createFile(fs, fileName, 1024, (short)2, 1L); DFSTestUtil.waitReplication(fs, fileName, (short)2); @@ -152,9 +153,11 @@ public class TestDiskError { out.close(); // the temporary block & meta files should be deleted - String dataDir = cluster.getDataDirectory(); - File dir1 = new File(new File(dataDir, "data"+(2*sndNode+1)), "current/rbw"); - File dir2 = new File(new File(dataDir, "data"+(2*sndNode+2)), "current/rbw"); + String bpid = cluster.getNamesystem().getBlockPoolId(); + File storageDir = MiniDFSCluster.getStorageDir(sndNode, 0); + File dir1 = MiniDFSCluster.getRbwDir(storageDir, bpid); + storageDir = MiniDFSCluster.getStorageDir(sndNode, 1); + File dir2 = MiniDFSCluster.getRbwDir(storageDir, bpid); while (dir1.listFiles().length != 0 || dir2.listFiles().length != 0) { Thread.sleep(100); } Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=1097905&r1=1097904&r2=1097905&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java Fri Apr 29 18:16:32 2011 @@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.MiniDFSClu import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; @@ -46,8 +47,8 @@ import org.junit.Test; * This tests InterDataNodeProtocol for block handling. */ public class TestInterDatanodeProtocol { - public static void checkMetaInfo(Block b, DataNode dn) throws IOException { - Block metainfo = dn.data.getStoredBlock(b.getBlockId()); + public static void checkMetaInfo(ExtendedBlock b, DataNode dn) throws IOException { + Block metainfo = dn.data.getStoredBlock(b.getBlockPoolId(), b.getBlockId()); Assert.assertEquals(b.getBlockId(), metainfo.getBlockId()); Assert.assertEquals(b.getNumBytes(), metainfo.getNumBytes()); } @@ -97,10 +98,12 @@ public class TestInterDatanodeProtocol { assertTrue(datanode != null); //stop block scanner, so we could compare lastScanTime - datanode.blockScannerThread.interrupt(); + if (datanode.blockScanner != null) { + datanode.blockScanner.shutdown(); + } //verify BlockMetaDataInfo - Block b = locatedblock.getBlock(); + ExtendedBlock b = locatedblock.getBlock(); InterDatanodeProtocol.LOG.info("b=" + b + ", " + b.getClass()); checkMetaInfo(b, datanode); long recoveryId = b.getGenerationStamp() + 1; @@ -108,7 +111,7 @@ public class TestInterDatanodeProtocol { new RecoveringBlock(b, locatedblock.getLocations(), recoveryId)); //verify updateBlock - Block newblock = new Block( + ExtendedBlock newblock = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(), b.getNumBytes()/2, b.getGenerationStamp()+1); idp.updateReplicaUnderRecovery(b, recoveryId, newblock.getNumBytes()); checkMetaInfo(newblock, datanode); @@ -129,44 +132,47 @@ public class TestInterDatanodeProtocol { Assert.assertEquals(originalInfo.getState(), recoveryInfo.getOriginalReplicaState()); } - /** Test {@link FSDataset#initReplicaRecovery(ReplicasMap, Block, long)} */ + /** Test + * {@link FSDataset#initReplicaRecovery(String, ReplicasMap, Block, long)} + */ @Test public void testInitReplicaRecovery() throws IOException { final long firstblockid = 10000L; final long gs = 7777L; final long length = 22L; - final ReplicasMap map = new ReplicasMap(); + final ReplicasMap map = new ReplicasMap(this); + String bpid = "BP-TEST"; final Block[] blocks = new Block[5]; for(int i = 0; i < blocks.length; i++) { blocks[i] = new Block(firstblockid + i, length, gs); - map.add(createReplicaInfo(blocks[i])); + map.add(bpid, createReplicaInfo(blocks[i])); } { //normal case final Block b = blocks[0]; - final ReplicaInfo originalInfo = map.get(b); + final ReplicaInfo originalInfo = map.get(bpid, b); final long recoveryid = gs + 1; - final ReplicaRecoveryInfo recoveryInfo = FSDataset.initReplicaRecovery(map, blocks[0], recoveryid); + final ReplicaRecoveryInfo recoveryInfo = FSDataset.initReplicaRecovery(bpid, map, blocks[0], recoveryid); assertEquals(originalInfo, recoveryInfo); - final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(b); + final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(bpid, b); Assert.assertEquals(originalInfo.getBlockId(), updatedInfo.getBlockId()); Assert.assertEquals(recoveryid, updatedInfo.getRecoveryID()); //recover one more time final long recoveryid2 = gs + 2; - final ReplicaRecoveryInfo recoveryInfo2 = FSDataset.initReplicaRecovery(map, blocks[0], recoveryid2); + final ReplicaRecoveryInfo recoveryInfo2 = FSDataset.initReplicaRecovery(bpid, map, blocks[0], recoveryid2); assertEquals(originalInfo, recoveryInfo2); - final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(b); + final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(bpid, b); Assert.assertEquals(originalInfo.getBlockId(), updatedInfo2.getBlockId()); Assert.assertEquals(recoveryid2, updatedInfo2.getRecoveryID()); //case RecoveryInProgressException try { - FSDataset.initReplicaRecovery(map, b, recoveryid); + FSDataset.initReplicaRecovery(bpid, map, b, recoveryid); Assert.fail(); } catch(RecoveryInProgressException ripe) { @@ -177,7 +183,7 @@ public class TestInterDatanodeProtocol { { // BlockRecoveryFI_01: replica not found final long recoveryid = gs + 1; final Block b = new Block(firstblockid - 1, length, gs); - ReplicaRecoveryInfo r = FSDataset.initReplicaRecovery(map, b, recoveryid); + ReplicaRecoveryInfo r = FSDataset.initReplicaRecovery(bpid, map, b, recoveryid); Assert.assertNull("Data-node should not have this replica.", r); } @@ -185,7 +191,7 @@ public class TestInterDatanodeProtocol { final long recoveryid = gs - 1; final Block b = new Block(firstblockid + 1, length, gs); try { - FSDataset.initReplicaRecovery(map, b, recoveryid); + FSDataset.initReplicaRecovery(bpid, map, b, recoveryid); Assert.fail(); } catch(IOException ioe) { @@ -198,7 +204,7 @@ public class TestInterDatanodeProtocol { final long recoveryid = gs + 1; final Block b = new Block(firstblockid, length, gs+1); try { - FSDataset.initReplicaRecovery(map, b, recoveryid); + FSDataset.initReplicaRecovery(bpid, map, b, recoveryid); fail("InitReplicaRecovery should fail because replica's " + "gs is less than the block's gs"); } catch (IOException e) { @@ -208,7 +214,10 @@ public class TestInterDatanodeProtocol { } } - /** Test {@link FSDataset#updateReplicaUnderRecovery(ReplicaUnderRecovery, long, long)} */ + /** + * Test for + * {@link FSDataset#updateReplicaUnderRecovery(ExtendedBlock, long, long)} + * */ @Test public void testUpdateReplicaUnderRecovery() throws IOException { final Configuration conf = new HdfsConfiguration(); @@ -217,6 +226,7 @@ public class TestInterDatanodeProtocol { try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); cluster.waitActive(); + String bpid = cluster.getNamesystem().getBlockPoolId(); //create a file DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem(); @@ -237,14 +247,14 @@ public class TestInterDatanodeProtocol { final FSDataset fsdataset = (FSDataset)datanode.data; //initReplicaRecovery - final Block b = locatedblock.getBlock(); + final ExtendedBlock b = locatedblock.getBlock(); final long recoveryid = b.getGenerationStamp() + 1; final long newlength = b.getNumBytes() - 1; final ReplicaRecoveryInfo rri = fsdataset.initReplicaRecovery( new RecoveringBlock(b, null, recoveryid)); //check replica - final ReplicaInfo replica = fsdataset.fetchReplicaInfo(b.getBlockId()); + final ReplicaInfo replica = fsdataset.fetchReplicaInfo(bpid, b.getBlockId()); Assert.assertEquals(ReplicaState.RUR, replica.getState()); //check meta data before update @@ -254,8 +264,8 @@ public class TestInterDatanodeProtocol { //with (block length) != (stored replica's on disk length). { //create a block with same id and gs but different length. - final Block tmp = new Block(rri.getBlockId(), rri.getNumBytes() - 1, - rri.getGenerationStamp()); + final ExtendedBlock tmp = new ExtendedBlock(b.getBlockPoolId(), rri + .getBlockId(), rri.getNumBytes() - 1, rri.getGenerationStamp()); try { //update should fail fsdataset.updateReplicaUnderRecovery(tmp, recoveryid, newlength); @@ -267,7 +277,7 @@ public class TestInterDatanodeProtocol { //update final ReplicaInfo finalized = fsdataset.updateReplicaUnderRecovery( - rri, recoveryid, newlength); + new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid, newlength); //check meta data after update FSDataset.checkReplicaFiles(finalized); Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java?rev=1097905&r1=1097904&r2=1097905&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java Fri Apr 29 18:16:32 2011 @@ -26,12 +26,13 @@ import org.junit.Test; * Unit test for ReplicasMap class */ public class TestReplicasMap { - private static final ReplicasMap map = new ReplicasMap(); + private static final ReplicasMap map = new ReplicasMap(TestReplicasMap.class); + private static final String bpid = "BP-TEST"; private static final Block block = new Block(1234, 1234, 1234); @BeforeClass public static void setup() { - map.add(new FinalizedReplica(block, null, null)); + map.add(bpid, new FinalizedReplica(block, null, null)); } /** @@ -41,35 +42,35 @@ public class TestReplicasMap { public void testGet() { // Test 1: null argument throws invalid argument exception try { - map.get(null); + map.get(bpid, null); fail("Expected exception not thrown"); } catch (IllegalArgumentException expected) { } // Test 2: successful lookup based on block - assertNotNull(map.get(block)); + assertNotNull(map.get(bpid, block)); // Test 3: Lookup failure - generation stamp mismatch Block b = new Block(block); b.setGenerationStamp(0); - assertNull(map.get(b)); + assertNull(map.get(bpid, b)); // Test 4: Lookup failure - blockID mismatch b.setGenerationStamp(block.getGenerationStamp()); b.setBlockId(0); - assertNull(map.get(b)); + assertNull(map.get(bpid, b)); // Test 5: successful lookup based on block ID - assertNotNull(map.get(block.getBlockId())); + assertNotNull(map.get(bpid, block.getBlockId())); // Test 6: failed lookup for invalid block ID - assertNull(map.get(0)); + assertNull(map.get(bpid, 0)); } @Test public void testAdd() { // Test 1: null argument throws invalid argument exception try { - map.add(null); + map.add(bpid, null); fail("Expected exception not thrown"); } catch (IllegalArgumentException expected) { } } @@ -78,28 +79,28 @@ public class TestReplicasMap { public void testRemove() { // Test 1: null argument throws invalid argument exception try { - map.remove(null); + map.remove(bpid, null); fail("Expected exception not thrown"); } catch (IllegalArgumentException expected) { } // Test 2: remove failure - generation stamp mismatch Block b = new Block(block); b.setGenerationStamp(0); - assertNull(map.remove(b)); + assertNull(map.remove(bpid, b)); // Test 3: remove failure - blockID mismatch b.setGenerationStamp(block.getGenerationStamp()); b.setBlockId(0); - assertNull(map.remove(b)); + assertNull(map.remove(bpid, b)); // Test 4: remove success - assertNotNull(map.remove(block)); + assertNotNull(map.remove(bpid, block)); // Test 5: remove failure - invalid blockID - assertNull(map.remove(0)); + assertNull(map.remove(bpid, 0)); // Test 6: remove success - map.add(new FinalizedReplica(block, null, null)); - assertNotNull(map.remove(block.getBlockId())); + map.add(bpid, new FinalizedReplica(block, null, null)); + assertNotNull(map.remove(bpid, block.getBlockId())); } } Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java?rev=1097905&r1=1097904&r2=1097905&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java Fri Apr 29 18:16:32 2011 @@ -18,6 +18,9 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import junit.framework.Assert; import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume; import org.apache.hadoop.util.ReflectionUtils; @@ -29,28 +32,28 @@ public class TestRoundRobinVolumesPolicy // Test the Round-Robin block-volume choosing algorithm. @Test public void testRR() throws Exception { - FSVolume[] volumes = new FSVolume[2]; + final List volumes = new ArrayList(); // First volume, with 100 bytes of space. - volumes[0] = Mockito.mock(FSVolume.class); - Mockito.when(volumes[0].getAvailable()).thenReturn(100L); + volumes.add(Mockito.mock(FSVolume.class)); + Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L); // Second volume, with 200 bytes of space. - volumes[1] = Mockito.mock(FSVolume.class); - Mockito.when(volumes[1].getAvailable()).thenReturn(200L); + volumes.add(Mockito.mock(FSVolume.class)); + Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L); RoundRobinVolumesPolicy policy = ReflectionUtils.newInstance( RoundRobinVolumesPolicy.class, null); // Test two rounds of round-robin choosing - Assert.assertEquals(volumes[0], policy.chooseVolume(volumes, 0)); - Assert.assertEquals(volumes[1], policy.chooseVolume(volumes, 0)); - Assert.assertEquals(volumes[0], policy.chooseVolume(volumes, 0)); - Assert.assertEquals(volumes[1], policy.chooseVolume(volumes, 0)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0)); // The first volume has only 100L space, so the policy should // wisely choose the second one in case we ask for more. - Assert.assertEquals(volumes[1], policy.chooseVolume(volumes, 150)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150)); // Fail if no volume can be chosen? try {