From hdfs-commits-return-1116-apmail-hadoop-hdfs-commits-archive=hadoop.apache.org@hadoop.apache.org Fri Feb 25 22:43:12 2011 Return-Path: Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: (qmail 87050 invoked from network); 25 Feb 2011 22:43:12 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 25 Feb 2011 22:43:12 -0000 Received: (qmail 60436 invoked by uid 500); 25 Feb 2011 22:43:12 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 60411 invoked by uid 500); 25 Feb 2011 22:43:12 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 60403 invoked by uid 99); 25 Feb 2011 22:43:11 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Feb 2011 22:43:11 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Feb 2011 22:43:03 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 751E123889E3; Fri, 25 Feb 2011 22:42:41 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1074727 [1/2] - in /hadoop/hdfs/branches/HDFS-1052: ./ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/security/token/block/ src/java/org/apache/hadoop/hdfs/server/balancer/ src/java/org/apache/hadoop/hdfs/server/... 
Date: Fri, 25 Feb 2011 22:42:40 -0000 To: hdfs-commits@hadoop.apache.org From: suresh@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110225224241.751E123889E3@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: suresh Date: Fri Feb 25 22:42:39 2011 New Revision: 1074727 URL: http://svn.apache.org/viewvc?rev=1074727&view=rev Log: HDFS-1639. Add block pool management to FSDataset. Contributed by Suresh Srinivas. Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java 
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestPipelines.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1074727&r1=1074726&r2=1074727&view=diff 
============================================================================== --- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original) +++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Fri Feb 25 22:42:39 2011 @@ -39,6 +39,8 @@ Trunk (unreleased changes) HDFS-1647. Federation: Multiple namenode configuration. (jitendra) + HDFS-1639. Add block pool management to FSDataset. (suresh) + IMPROVEMENTS HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel) Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java Fri Feb 25 22:42:39 2011 @@ -49,16 +49,6 @@ public class ExtendedBlock implements Wr this(null, 0, 0, 0); } - // TODO:FEDERATION To remove when block pool ID related coding is complete - public ExtendedBlock(final Block b) { - this("TODO", b); - } - - // TODO:FEDERATION To remove when block pool ID related coding is complete - public ExtendedBlock(final long blkId) { - this("TODO", new Block(blkId)); - } - public ExtendedBlock(final ExtendedBlock b) { this(b.poolId, b.block); } Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original) +++ 
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Fri Feb 25 22:42:39 2011 @@ -55,9 +55,8 @@ public class LocatedBlock implements Wri this(new ExtendedBlock(), new DatanodeInfo[0], 0L, false); } - // TODO:FEDERATION To remove when block pool ID related coding is complete - public LocatedBlock(Block b, DatanodeInfo[] locs) { - this(new ExtendedBlock(b), locs, -1, false); // startOffset is unknown + public LocatedBlock(String bpid, Block b, DatanodeInfo[] locs) { + this(new ExtendedBlock(bpid, b), locs, -1, false); // startOffset is unknown } public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) { Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java Fri Feb 25 22:42:39 2011 @@ -195,7 +195,7 @@ public class BlockTokenSecretManager ext */ public void checkAccess(Token blockToken, String userId, Block block, AccessMode mode) throws InvalidToken { - checkAccess(blockToken, userId, new ExtendedBlock(block), mode); + checkAccess(blockToken, userId, new ExtendedBlock("TODO", block), mode); } /** Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- 
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Feb 25 22:42:39 2011 @@ -379,7 +379,7 @@ public class Balancer implements Tool { } // TODO:FEDERATION use ExtendedBlock in BalancerBlock DataTransferProtocol.Sender.opReplaceBlock(out, - new ExtendedBlock(block.getBlock()), source.getStorageID(), + new ExtendedBlock("TODO", block.getBlock()), source.getStorageID(), proxySource.getDatanode(), accessToken); } Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Fri Feb 25 22:42:39 2011 @@ -94,7 +94,8 @@ class BlockSender implements java.io.Clo try { this.block = block; synchronized(datanode.data) { - this.replica = datanode.data.getReplica(block.getBlockId()); + this.replica = datanode.data.getReplica(block.getPoolId(), + block.getBlockId()); if (replica == null) { throw new ReplicaNotFoundException(block); } Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original) +++ 
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Fri Feb 25 22:42:39 2011 @@ -213,8 +213,9 @@ class DataBlockScanner implements Runnab private void init() { + // TODO:FEDERATION block scanner must work one BP at a time // get the list of blocks and arrange them in random order - List arr = dataset.getFinalizedBlocks(); + List arr = dataset.getFinalizedBlocks("TODO"); Collections.shuffle(arr); blockInfoSet = new TreeSet(); @@ -231,6 +232,8 @@ class DataBlockScanner implements Runnab /* Pick the first directory that has any existing scanner log. * otherwise, pick the first directory. */ + // TODO:FEDERATION currently picking only one block pool directory + // This needs to change to include all the block pool directories File dir = null; FSDataset.FSVolume[] volumes = dataset.volumes.volumes; for(FSDataset.FSVolume vol : volumes) { @@ -462,7 +465,7 @@ class DataBlockScanner implements Runnab updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, false); // If the block does not exists anymore, then its not an error - if ( dataset.getFile(block.getLocalBlock()) == null ) { + if ( dataset.getFile(block.getPoolId(), block.getLocalBlock()) == null ) { LOG.info("Verification failed for " + block + ". 
Its ok since " + "it not in datanode dataset anymore."); deleteBlock(block.getPoolId(), block.getLocalBlock()); @@ -506,7 +509,7 @@ class DataBlockScanner implements Runnab if ( block != null ) { // TODO:FEDERATION blockInfoSet should use ExtendedBlock - verifyBlock(new ExtendedBlock(block)); + verifyBlock(new ExtendedBlock("TODO", block)); } } Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Feb 25 22:42:39 2011 @@ -581,7 +581,7 @@ public class DataNode extends Configured bpRegistration.setStorageInfo(storage.getBPStorage(blockPoolId)); } initFsDataSet(conf, dataDirs); - //data.addStorage(blockPoolId, storage); + data.addBlockPool(blockPoolId, conf); } /** @@ -681,9 +681,7 @@ public class DataNode extends Configured // and can be safely GC'ed. 
// long brStartTime = now(); - BlockListAsLongs bReport = data.getBlockReport(/* TODO:FEDERATION pass blockPoolId*/); - - // TODO:FEDERATION add support for pool ID + BlockListAsLongs bReport = data.getBlockReport(blockPoolId); cmd = bpNamenode.blockReport(bpRegistration, blockPoolId, bReport .getBlockListAsLongs()); long brTime = now() - brStartTime; Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Fri Feb 25 22:42:39 2011 @@ -173,8 +173,9 @@ public class DirectoryScanner { void reconcile() { scan(); for (ScanInfo info : diff) { - dataset.checkAndUpdate(info.getBlockId(), info.getBlockFile(), info - .getMetaFile(), info.getVolume()); + // TODO:FEDERATION use right block pool Id + dataset.checkAndUpdate("TODO", info.getBlockId(), info.getBlockFile(), + info.getMetaFile(), info.getVolume()); } } @@ -188,7 +189,8 @@ public class DirectoryScanner { // Hold FSDataset lock to prevent further changes to the block map synchronized(dataset) { - Block[] memReport = dataset.getBlockList(false); + // TODO:FEDERATION use right block pool Id + Block[] memReport = dataset.getBlockList("TODO", false); Arrays.sort(memReport); // Sort based on blockId int d = 0; // index for diskReport Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java URL: 
http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Fri Feb 25 22:42:39 2011 @@ -31,9 +31,13 @@ import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.Set; +import java.util.Map.Entry; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; @@ -128,8 +132,8 @@ public class FSDataset implements FSCons boolean resetIdx) throws IOException { if (numBlocks < maxBlocksPerDir) { File dest = new File(dir, b.getBlockName()); - File metaData = getMetaFile( src, b ); - File newmeta = getMetaFile(dest, b); + File metaData = getMetaFile(src, b.getGenerationStamp()); + File newmeta = getMetaFile(dest, b.getGenerationStamp()); if ( ! metaData.renameTo( newmeta ) || ! 
src.renameTo( dest ) ) { throw new IOException( "could not move files for " + b + @@ -180,16 +184,16 @@ public class FSDataset implements FSCons return children[ lastChildIdx ].addBlock(b, src, true, false); } - void getVolumeMap(ReplicasMap volumeMap, FSVolume volume) + void getVolumeMap(String bpid, ReplicasMap volumeMap, FSVolume volume) throws IOException { if (children != null) { for (int i = 0; i < children.length; i++) { - children[i].getVolumeMap(volumeMap, volume); + children[i].getVolumeMap(bpid, volumeMap, volume); } } recoverTempUnlinkedBlock(); - volume.addToReplicasMap(volumeMap, dir, true); + volume.addToReplicasMap(bpid, volumeMap, dir, true); } /** @@ -298,18 +302,32 @@ public class FSDataset implements FSCons } } - class FSVolume { - private File currentDir; - private FSDir dataDir; // directory store Finalized replica - private File rbwDir; // directory store RBW replica - private File tmpDir; // directory store Temporary replica - private DF usage; - private DU dfsUsage; - private long reserved; - + /** + * Manages block pool directory under {@link FSVolume} + */ + class BlockPool { + private final String bpid; + private final FSVolume volume; // volume to which this BlockPool belongs to + private final File currentDir; // StorageDirectory/current/bpid/current + private final FSDir finalizedDir; // directory store Finalized replica + private final File rbwDir; // directory store RBW replica + private final File tmpDir; // directory store Temporary replica - FSVolume(File currentDir, Configuration conf) throws IOException { - this.reserved = conf.getLong("dfs.datanode.du.reserved", 0); + // TODO:FEDERATION scalability issue - a thread per DU is needed + private final DU dfsUsage; + + /** + * + * @param bpid Block pool Id + * @param volume {@link FSVolume} to which this BlockPool belongs to + * @param currentDir currentDir corresponding to the BlockPool + * @param conf + * @throws IOException + */ + BlockPool(String bpid, FSVolume volume, File 
currentDir, Configuration conf) + throws IOException { + this.bpid = bpid; + this.volume = volume; this.currentDir = currentDir; File parent = currentDir.getParentFile(); final File finalizedDir = new File( @@ -328,7 +346,7 @@ public class FSDataset implements FSCons if (rbwDir.exists() && !supportAppends) { FileUtil.fullyDelete(rbwDir); } - this.dataDir = new FSDir(finalizedDir); + this.finalizedDir = new FSDir(finalizedDir); if (!rbwDir.mkdirs()) { // create rbw directory if not exist if (!rbwDir.isDirectory()) { throw new IOException("Mkdirs failed to create " + rbwDir.toString()); @@ -339,15 +357,26 @@ public class FSDataset implements FSCons throw new IOException("Mkdirs failed to create " + tmpDir.toString()); } } - this.usage = new DF(parent, conf); this.dfsUsage = new DU(parent, conf); this.dfsUsage.start(); } + File getDirectory() { + return currentDir.getParentFile(); + } + File getCurrentDir() { return currentDir; } + File getFinalizedDir() { + return finalizedDir.dir; + } + + File getRbwDir() { + return rbwDir; + } + void decDfsUsed(long value) { // The caller to this method (BlockFileDeleteTask.run()) does // not have locked FSDataset.this yet. @@ -361,42 +390,12 @@ public class FSDataset implements FSCons } /** - * Calculate the capacity of the filesystem, after removing any - * reserved capacity. - * @return the unreserved number of bytes left in this filesystem. May be zero. - */ - long getCapacity() throws IOException { - long remaining = usage.getCapacity() - reserved; - return remaining > 0 ? remaining : 0; - } - - long getAvailable() throws IOException { - long remaining = getCapacity()-getDfsUsed(); - long available = usage.getAvailable(); - if (remaining>available) { - remaining = available; - } - return (remaining > 0) ? remaining : 0; - } - - long getReserved(){ - return reserved; - } - - String getMount() throws IOException { - return usage.getMount(); - } - - File getDir() { - return dataDir.dir; - } - - /** * Temporary files. 
They get moved to the finalized block directory when * the block is finalized. */ File createTmpFile(Block b) throws IOException { File f = new File(tmpDir, b.getBlockName()); + DataNode.LOG.info("SURESH creating temporary file " + f); return FSDataset.createTmpFile(b, f); } @@ -410,21 +409,21 @@ public class FSDataset implements FSCons } File addBlock(Block b, File f) throws IOException { - File blockFile = dataDir.addBlock(b, f); - File metaFile = getMetaFile( blockFile , b); + File blockFile = finalizedDir.addBlock(b, f); + File metaFile = getMetaFile(blockFile , b.getGenerationStamp()); dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length()); return blockFile; } void checkDirs() throws DiskErrorException { - dataDir.checkDirTree(); + finalizedDir.checkDirTree(); DiskChecker.checkDir(tmpDir); DiskChecker.checkDir(rbwDir); } void getVolumeMap(ReplicasMap volumeMap) throws IOException { // add finalized replicas - dataDir.getVolumeMap(volumeMap, this); + finalizedDir.getVolumeMap(bpid, volumeMap, volume); // add rbw replicas addToReplicasMap(volumeMap, rbwDir, false); } @@ -448,14 +447,14 @@ public class FSDataset implements FSCons ReplicaInfo newReplica = null; if (isFinalized) { newReplica = new FinalizedReplica(blockId, - blockFile.length(), genStamp, this, blockFile.getParentFile()); + blockFile.length(), genStamp, volume, blockFile.getParentFile()); } else { newReplica = new ReplicaWaitingToBeRecovered(blockId, validateIntegrity(blockFile, genStamp), - genStamp, this, blockFile.getParentFile()); + genStamp, volume, blockFile.getParentFile()); } - ReplicaInfo oldReplica = volumeMap.add(newReplica); + ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica); if (oldReplica != null) { DataNode.LOG.warn("Two block files with the same block id exist " + "on disk: " + oldReplica.getBlockFile() + @@ -533,11 +532,182 @@ public class FSDataset implements FSCons } void clearPath(File f) { - dataDir.clearPath(f); + finalizedDir.clearPath(f); + } + + public String 
toString() { + return currentDir.getAbsolutePath(); + } + + public void shutdown() { + dfsUsage.shutdown(); + } + } + + class FSVolume { + private final Map map = new HashMap(); + private final File currentDir; // /current + private final DF usage; + private final long reserved; + + FSVolume(File currentDir, Configuration conf) throws IOException { + this.reserved = conf.getLong("dfs.datanode.du.reserved", 0); + this.currentDir = currentDir; + File parent = currentDir.getParentFile(); + this.usage = new DF(parent, conf); + } + + /** Return storage directory corresponding to the volume */ + public File getDir() { + return currentDir.getParentFile(); + } + + public File getCurrentDir() { + return currentDir; + } + + public File getRbwDir(String bpid) throws IOException { + BlockPool bp = getBlockPool(bpid); + return bp.getRbwDir(); + } + + void decDfsUsed(String bpid, long value) { + // The caller to this method (BlockFileDeleteTask.run()) does + // not have locked FSDataset.this yet. + synchronized(FSDataset.this) { + BlockPool bp = map.get(bpid); + if (bp != null) { + bp.decDfsUsed(value); + } + } + } + + long getDfsUsed() throws IOException { + // TODO valid synchronization + long dfsUsed = 0; + Set> set = map.entrySet(); + for (Entry entry : set) { + dfsUsed += entry.getValue().getDfsUsed(); + } + return dfsUsed; + } + + /** + * Calculate the capacity of the filesystem, after removing any + * reserved capacity. + * @return the unreserved number of bytes left in this filesystem. May be zero. + */ + long getCapacity() throws IOException { + long remaining = usage.getCapacity() - reserved; + return remaining > 0 ? remaining : 0; + } + + long getAvailable() throws IOException { + long remaining = getCapacity()-getDfsUsed(); + long available = usage.getAvailable(); + if (remaining>available) { + remaining = available; + } + return (remaining > 0) ? 
remaining : 0; + } + + long getReserved(){ + return reserved; + } + + String getMount() throws IOException { + return usage.getMount(); + } + + private BlockPool getBlockPool(String bpid) throws IOException { + BlockPool bp = map.get(bpid); + if (bp == null) { + // TODO:FEDERATION cleanup this exception + throw new IOException("block pool " + bpid + " not found"); + } + return bp; + } + + /** + * Temporary files. They get moved to the finalized block directory when + * the block is finalized. + */ + File createTmpFile(String bpid, Block b) throws IOException { + BlockPool bp = getBlockPool(bpid); + return bp.createTmpFile(b); + } + + /** + * RBW files. They get moved to the finalized block directory when + * the block is finalized. + */ + File createRbwFile(String bpid, Block b) throws IOException { + BlockPool bp = getBlockPool(bpid); + return bp.createRbwFile(b); + } + + File addBlock(String bpid, Block b, File f) throws IOException { + BlockPool bp = getBlockPool(bpid); + return bp.addBlock(b, f); + } + + void checkDirs() throws DiskErrorException { + // TODO:FEDERATION valid synchronization + Set> set = map.entrySet(); + for (Entry entry : set) { + entry.getValue().checkDirs(); + } + } + + void getVolumeMap(ReplicasMap volumeMap) throws IOException { + Set> set = map.entrySet(); + for (Entry entry : set) { + entry.getValue().getVolumeMap(volumeMap); + } + } + + void getVolumeMap(String bpid, ReplicasMap volumeMap) throws IOException { + BlockPool bp = getBlockPool(bpid); + bp.getVolumeMap(volumeMap); + } + + /** + * Add replicas under the given directory to the volume map + * @param volumeMap the replicas map + * @param dir an input directory + * @param isFinalized true if the directory has finalized replicas; + * false if the directory has rbw replicas + * @throws IOException + */ + private void addToReplicasMap(String bpid, ReplicasMap volumeMap, + File dir, boolean isFinalized) throws IOException { + BlockPool bp = getBlockPool(bpid); + // TODO move this up 
+ // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length()); + bp.addToReplicasMap(volumeMap, dir, isFinalized); + } + + void clearPath(String bpid, File f) throws IOException { + BlockPool bp = getBlockPool(bpid); + bp.clearPath(f); } public String toString() { - return getDir().getAbsolutePath(); + return currentDir.getAbsolutePath(); + } + + public void shutdown() { + Set> set = map.entrySet(); + for (Entry entry : set) { + entry.getValue().shutdown(); + } + } + + public void addBlockPool(String bpid, Configuration conf) + throws IOException { + File bpdir = new File(currentDir, bpid); + BlockPool bp = new BlockPool(bpid, this, bpdir, conf); + map.put(bpid, bp); } } @@ -606,6 +776,13 @@ public class FSDataset implements FSCons volumes[idx].getVolumeMap(volumeMap); } } + + synchronized void getVolumeMap(String bpid, ReplicasMap volumeMap) + throws IOException { + for (int idx = 0; idx < volumes.length; idx++) { + volumes[idx].getVolumeMap(bpid, volumeMap); + } + } /** * goes over all the volumes and checkDir eachone of them @@ -667,6 +844,13 @@ public class FSDataset implements FSCons } return false; } + + public void addBlockPool(String bpid, Configuration conf) + throws IOException { + for (FSVolume v : volumes) { + v.addBlockPool(bpid, conf); + } + } } ////////////////////////////////////////////////////// @@ -699,12 +883,12 @@ public class FSDataset implements FSCons return blockFileName + "_" + genStamp + METADATA_EXTENSION; } - static File getMetaFile(File f , Block b) { - return new File(getMetaFileName(f.getAbsolutePath(), - b.getGenerationStamp())); + static File getMetaFile(File f , long genStamp) { + return new File(getMetaFileName(f.getAbsolutePath(), genStamp)); } - protected File getMetaFile(Block b) throws IOException { - return getMetaFile(getBlockFile(b), b); + + protected File getMetaFile(ExtendedBlock b) throws IOException { + return getMetaFile(getBlockFile(b), b.getGenerationStamp()); } /** Find the metadata file for the specified block file. 
@@ -763,15 +947,15 @@ public class FSDataset implements FSCons } /** Return the block file for the given ID */ - public File findBlockFile(long blockId) { - return getFile(blockId); + public File findBlockFile(String bpid, long blockId) { + return getFile(bpid, blockId); } @Override // FSDatasetInterface public synchronized Block getStoredBlock(String bpid, long blkid) throws IOException { // TODO:FEDERATION use extended block - File blockfile = findBlockFile(blkid); + File blockfile = findBlockFile(bpid, blkid); if (blockfile == null) { return null; } @@ -786,8 +970,8 @@ public class FSDataset implements FSCons * @param blockId * @return */ - synchronized ReplicaInfo fetchReplicaInfo(long blockId) { - ReplicaInfo r = volumeMap.get(blockId); + synchronized ReplicaInfo fetchReplicaInfo(String bpid, long blockId) { + ReplicaInfo r = volumeMap.get(bpid, blockId); if(r == null) return null; switch(r.getState()) { @@ -808,19 +992,19 @@ public class FSDataset implements FSCons @Override // FSDatasetInterface public boolean metaFileExists(ExtendedBlock b) throws IOException { // TODO:FEDERATION use ExtendedBlock - return getMetaFile(b.getLocalBlock()).exists(); + return getMetaFile(b).exists(); } @Override // FSDatasetInterface public long getMetaDataLength(ExtendedBlock b) throws IOException { - File checksumFile = getMetaFile(b.getLocalBlock()); + File checksumFile = getMetaFile(b); return checksumFile.length(); } @Override // FSDatasetInterface public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b) throws IOException { - File checksumFile = getMetaFile(b.getLocalBlock()); + File checksumFile = getMetaFile(b); return new MetaDataInputStream(new FileInputStream(checksumFile), checksumFile.length()); } @@ -933,14 +1117,22 @@ public class FSDataset implements FSCons */ @Override // FSDatasetInterface public long getLength(ExtendedBlock b) throws IOException { - return getBlockFile(b.getLocalBlock()).length(); + return getBlockFile(b).length(); } /** * Get File 
name for a given block. */ - public synchronized File getBlockFile(Block b) throws IOException { - File f = validateBlockFile(b); + public File getBlockFile(ExtendedBlock b) throws IOException { + return getBlockFile(b.getPoolId(), b.getLocalBlock()); + } + + /** + * Get File name for a given block. + */ + public synchronized File getBlockFile(String bpid, Block b) + throws IOException { + File f = validateBlockFile(bpid, b); if(f == null) { if (InterDatanodeProtocol.LOG.isDebugEnabled()) { InterDatanodeProtocol.LOG.debug("b=" + b + ", volumeMap=" + volumeMap); @@ -953,13 +1145,13 @@ public class FSDataset implements FSCons @Override // FSDatasetInterface public synchronized InputStream getBlockInputStream(ExtendedBlock b) throws IOException { - return new FileInputStream(getBlockFile(b.getLocalBlock())); + return new FileInputStream(getBlockFile(b)); } @Override // FSDatasetInterface public synchronized InputStream getBlockInputStream(ExtendedBlock b, long seekOffset) throws IOException { - File blockFile = getBlockFile(b.getLocalBlock()); + File blockFile = getBlockFile(b); RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r"); if (seekOffset > 0) { blockInFile.seek(seekOffset); @@ -971,11 +1163,20 @@ public class FSDataset implements FSCons * Get the meta info of a block stored in volumeMap * @param b block * @return the meta replica information + */ + private ReplicaInfo getReplicaInfo(ExtendedBlock b) { + return volumeMap.get(b.getPoolId(), b.getLocalBlock()); + } + + /** + * Get the meta info of a block stored in volumeMap + * @param b block + * @return the meta replica information * @throws IOException if no entry is in the map or * there is a generation stamp mismatch */ - private ReplicaInfo getReplicaInfo(Block b) throws IOException { - ReplicaInfo info = volumeMap.get(b); + private ReplicaInfo getReplicaInfo(String bpid, Block b) throws IOException { + ReplicaInfo info = volumeMap.get(bpid, b); if (info == null) { throw new 
IOException("Block " + b + " does not exist in volumeMap."); } @@ -988,7 +1189,7 @@ public class FSDataset implements FSCons @Override // FSDatasetInterface public synchronized BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkOffset, long ckoff) throws IOException { - ReplicaInfo info = getReplicaInfo(b.getLocalBlock()); + ReplicaInfo info = getReplicaInfo(b); File blockFile = info.getBlockFile(); RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r"); if (blkOffset > 0) { @@ -1013,7 +1214,7 @@ public class FSDataset implements FSCons * @return - true if the specified block was unlinked or the block * is not in any snapshot. */ - public boolean unlinkBlock(Block block, int numLinks) throws IOException { + public boolean unlinkBlock(ExtendedBlock block, int numLinks) throws IOException { ReplicaInfo info = null; synchronized (this) { @@ -1102,7 +1303,7 @@ public class FSDataset implements FSCons " should be greater than the replica " + b + "'s generation stamp"); } // TODO:FEDERATION use ExtendedBlock - ReplicaInfo replicaInfo = volumeMap.get(b.getLocalBlock()); + ReplicaInfo replicaInfo = getReplicaInfo(b); if (replicaInfo == null) { throw new ReplicaNotFoundException( ReplicaNotFoundException.NON_EXISTENT_REPLICA + b); @@ -1118,13 +1319,15 @@ public class FSDataset implements FSCons " expected length is " + expectedBlockLen); } - return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes()); + return append(b.getPoolId(), (FinalizedReplica)replicaInfo, newGS, + b.getNumBytes()); } /** Append to a finalized replica * Change a finalized replica to be a RBW replica and * bump its generation stamp to be the newGS * + * @param bpid block pool Id * @param replicaInfo a finalized replica * @param newGS new generation stamp * @param estimateBlockLen estimate generation stamp @@ -1132,8 +1335,9 @@ public class FSDataset implements FSCons * @throws IOException if moving the replica from finalized directory * to rbw directory fails */ - 
private synchronized ReplicaBeingWritten append(FinalizedReplica replicaInfo, - long newGS, long estimateBlockLen) throws IOException { + private synchronized ReplicaBeingWritten append(String bpid, + FinalizedReplica replicaInfo, long newGS, long estimateBlockLen) + throws IOException { // unlink the finalized replica replicaInfo.unlinkBlock(1); @@ -1144,7 +1348,7 @@ public class FSDataset implements FSCons throw new DiskOutOfSpaceException("Insufficient space for appending to " + replicaInfo); } - File newBlkFile = new File(v.rbwDir, replicaInfo.getBlockName()); + File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName()); File oldmeta = replicaInfo.getMetaFile(); ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten( replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS, @@ -1177,14 +1381,14 @@ public class FSDataset implements FSCons } // Replace finalized replica by a RBW replica in replicas map - volumeMap.add(newReplicaInfo); + volumeMap.add(bpid, newReplicaInfo); return newReplicaInfo; } - private ReplicaInfo recoverCheck(Block b, long newGS, + private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { - ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId()); + ReplicaInfo replicaInfo = getReplicaInfo(b); if (replicaInfo == null) { throw new ReplicaNotFoundException( ReplicaNotFoundException.NON_EXISTENT_REPLICA + b); @@ -1239,12 +1443,12 @@ public class FSDataset implements FSCons DataNode.LOG.info("Recover failed append to " + b); // TODO:FEDERATION use ExtendedBlock - ReplicaInfo replicaInfo = recoverCheck(b.getLocalBlock(), newGS, - expectedBlockLen); + ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); // change the replica's state/gs etc. 
if (replicaInfo.getState() == ReplicaState.FINALIZED ) { - return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes()); + return append(b.getPoolId(), (FinalizedReplica) replicaInfo, newGS, + b.getNumBytes()); } else { //RBW bumpReplicaGS(replicaInfo, newGS); return (ReplicaBeingWritten)replicaInfo; @@ -1256,13 +1460,13 @@ public class FSDataset implements FSCons long expectedBlockLen) throws IOException { DataNode.LOG.info("Recover failed close " + b); // check replica's state - ReplicaInfo replicaInfo = recoverCheck(b.getLocalBlock(), newGS, + ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); // bump the replica's GS bumpReplicaGS(replicaInfo, newGS); // finalize the replica if RBW if (replicaInfo.getState() == ReplicaState.RBW) { - finalizeReplica(replicaInfo); + finalizeReplica(b.getPoolId(), replicaInfo); } } @@ -1296,7 +1500,7 @@ public class FSDataset implements FSCons @Override // FSDatasetInterface public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b) throws IOException { - ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId()); + ReplicaInfo replicaInfo = volumeMap.get(b.getPoolId(), b.getBlockId()); if (replicaInfo != null) { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + replicaInfo.getState() + @@ -1305,10 +1509,10 @@ public class FSDataset implements FSCons // create a new block FSVolume v = volumes.getNextVolume(b.getNumBytes()); // create a rbw file to hold block in the designated volume - File f = v.createRbwFile(b.getLocalBlock()); + File f = v.createRbwFile(b.getPoolId(), b.getLocalBlock()); ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile()); - volumeMap.add(newReplicaInfo); + volumeMap.add(b.getPoolId(), newReplicaInfo); return newReplicaInfo; } @@ -1318,7 +1522,7 @@ public class FSDataset implements FSCons throws IOException { DataNode.LOG.info("Recover the RBW replica " + b); - 
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId()); + ReplicaInfo replicaInfo = volumeMap.get(b.getPoolId(), b.getBlockId()); if (replicaInfo == null) { throw new ReplicaNotFoundException( ReplicaNotFoundException.NON_EXISTENT_REPLICA + b); @@ -1364,7 +1568,7 @@ public class FSDataset implements FSCons @Override // FSDatasetInterface public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b) throws IOException { - ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId()); + ReplicaInfo replicaInfo = volumeMap.get(b.getPoolId(), b.getBlockId()); if (replicaInfo != null) { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + replicaInfo.getState() + @@ -1373,10 +1577,10 @@ public class FSDataset implements FSCons FSVolume v = volumes.getNextVolume(b.getNumBytes()); // create a temporary file to hold block in the designated volume - File f = v.createTmpFile(b.getLocalBlock()); + File f = v.createTmpFile(b.getPoolId(), b.getLocalBlock()); ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile()); - volumeMap.add(newReplicaInfo); + volumeMap.add(b.getPoolId(), newReplicaInfo); return newReplicaInfo; } @@ -1397,14 +1601,14 @@ public class FSDataset implements FSCons channel.position(newPos); } - synchronized File createTmpFile( FSVolume vol, Block blk ) throws IOException { + synchronized File createTmpFile(FSVolume vol, String bpid, Block blk) throws IOException { if ( vol == null ) { - vol = getReplicaInfo( blk ).getVolume(); + vol = getReplicaInfo(bpid, blk).getVolume(); if ( vol == null ) { throw new IOException("Could not find volume for block " + blk); } } - return vol.createTmpFile(blk); + return vol.createTmpFile(bpid, blk); } // @@ -1420,17 +1624,17 @@ public class FSDataset implements FSCons */ @Override // FSDatasetInterface public synchronized void finalizeBlock(ExtendedBlock b) throws IOException { - ReplicaInfo replicaInfo = 
getReplicaInfo(b.getLocalBlock()); + ReplicaInfo replicaInfo = getReplicaInfo(b); if (replicaInfo.getState() == ReplicaState.FINALIZED) { // this is legal, when recovery happens on a file that has // been opened for append but never modified return; } - finalizeReplica(replicaInfo); + finalizeReplica(b.getPoolId(), replicaInfo); } - private synchronized FinalizedReplica finalizeReplica(ReplicaInfo replicaInfo) - throws IOException { + private synchronized FinalizedReplica finalizeReplica(String bpid, + ReplicaInfo replicaInfo) throws IOException { FinalizedReplica newReplicaInfo = null; if (replicaInfo.getState() == ReplicaState.RUR && ((ReplicaUnderRecovery)replicaInfo).getOrignalReplicaState() == @@ -1445,10 +1649,10 @@ public class FSDataset implements FSCons " for block " + replicaInfo); } - File dest = v.addBlock(replicaInfo, f); + File dest = v.addBlock(bpid, replicaInfo, f); newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile()); } - volumeMap.add(newReplicaInfo); + volumeMap.add(bpid, newReplicaInfo); return newReplicaInfo; } @@ -1457,10 +1661,10 @@ public class FSDataset implements FSCons */ @Override // FSDatasetInterface public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException { - ReplicaInfo replicaInfo = volumeMap.get(b.getLocalBlock()); + ReplicaInfo replicaInfo = getReplicaInfo(b); if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) { // remove from volumeMap - volumeMap.remove(b.getLocalBlock()); + volumeMap.remove(b.getPoolId(), b.getLocalBlock()); // delete the on-disk temp file if (delBlockFromDisk(replicaInfo.getBlockFile(), @@ -1500,12 +1704,17 @@ public class FSDataset implements FSCons * Generates a block report from the in-memory block map. 
*/ @Override // FSDatasetInterface - public BlockListAsLongs getBlockReport() { - ArrayList finalized = - new ArrayList(volumeMap.size()); + public BlockListAsLongs getBlockReport(String bpid) { + // TODO:FEDERATION volumeMap.size() has not been synchronized - old code + int size = volumeMap.size(bpid); + ArrayList finalized = new ArrayList(size); ArrayList uc = new ArrayList(); + if (size == 0) { + return new BlockListAsLongs(finalized, uc); + } + synchronized(this) { - for (ReplicaInfo b : volumeMap.replicas()) { + for (ReplicaInfo b : volumeMap.replicas(bpid)) { switch(b.getState()) { case FINALIZED: finalized.add(b); @@ -1529,13 +1738,15 @@ public class FSDataset implements FSCons } /** - * Get the block list from in-memory blockmap. Note if deepcopy - * is false, reference to the block in the volumeMap is returned. This block - * should not be changed. Suitable synchronization using {@link FSDataset} - * is needed to handle concurrent modification to the block. - */ - synchronized Block[] getBlockList(boolean deepcopy) { - Block[] list = volumeMap.replicas().toArray(new Block[volumeMap.size()]); + * Get the block list from in-memory blockmap for a block pool. + * + * Note if deepcopy is false, reference to the block in the volumeMap is + * returned. This block should not be changed. Suitable synchronization using + * {@link FSDataset} is needed to handle concurrent modification to the block. + */ + synchronized Block[] getBlockList(String bpid, boolean deepcopy) { + Block[] list = volumeMap.replicas(bpid).toArray( + new Block[volumeMap.size(bpid)]); if (deepcopy) { for (int i = 0; i < list.length; i++) { list[i] = new Block(list[i]); @@ -1545,11 +1756,11 @@ public class FSDataset implements FSCons } /** - * Get the list of finalized blocks from in-memory blockmap. + * Get the list of finalized blocks from in-memory blockmap for a block pool.
*/ - synchronized List getFinalizedBlocks() { - ArrayList finalized = new ArrayList(volumeMap.size()); - for (ReplicaInfo b : volumeMap.replicas()) { + synchronized List getFinalizedBlocks(String bpid) { + ArrayList finalized = new ArrayList(volumeMap.size(bpid)); + for (ReplicaInfo b : volumeMap.replicas(bpid)) { if(b.getState() == ReplicaState.FINALIZED) { finalized.add(new Block(b)); } @@ -1563,7 +1774,7 @@ public class FSDataset implements FSCons */ @Override // FSDatasetInterface public boolean isValidBlock(ExtendedBlock b) { - ReplicaInfo replicaInfo = volumeMap.get(b.getLocalBlock()); + ReplicaInfo replicaInfo = getReplicaInfo(b); if (replicaInfo == null || replicaInfo.getState() != ReplicaState.FINALIZED) { return false; @@ -1574,9 +1785,9 @@ public class FSDataset implements FSCons /** * Find the file corresponding to the block and return it if it exists. */ - File validateBlockFile(Block b) throws IOException { + File validateBlockFile(String bpid, Block b) throws IOException { //Should we check for metadata file too? 
- File f = getFile(b); + File f = getFile(bpid, b); if(f != null ) { if(f.exists()) @@ -1606,7 +1817,7 @@ public class FSDataset implements FSCons } //check replica's meta file - final File metafile = getMetaFile(f, r); + final File metafile = getMetaFile(f, r.getGenerationStamp()); if (!metafile.exists()) { throw new IOException("Metafile " + metafile + " does not exist, r=" + r); } @@ -1627,8 +1838,8 @@ public class FSDataset implements FSCons File f = null; FSVolume v; synchronized (this) { - f = getFile(invalidBlks[i]); - ReplicaInfo dinfo = volumeMap.get(invalidBlks[i]); + f = getFile(bpid, invalidBlks[i]); + ReplicaInfo dinfo = volumeMap.get(bpid, invalidBlks[i]); if (dinfo == null || dinfo.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) { DataNode.LOG.warn("Unexpected error trying to delete block " @@ -1667,15 +1878,16 @@ public class FSDataset implements FSCons (replicaState == ReplicaState.RUR && ((ReplicaUnderRecovery)dinfo).getOrignalReplicaState() == ReplicaState.FINALIZED)) { - v.clearPath(parent); + v.clearPath(bpid, parent); } - volumeMap.remove(invalidBlks[i]); + volumeMap.remove(bpid, invalidBlks[i]); } - File metaFile = getMetaFile( f, invalidBlks[i] ); + File metaFile = getMetaFile(f, invalidBlks[i].getGenerationStamp()); long dfsBytes = f.length() + metaFile.length(); // Delete the block asynchronously to make sure we can do it fast enough - asyncDiskService.deleteAsync(v, f, metaFile, dfsBytes, invalidBlks[i].toString()); + asyncDiskService.deleteAsync(v, bpid, f, metaFile, dfsBytes, + invalidBlks[i].toString()); } if (error) { throw new IOException("Error in deleting blocks."); @@ -1685,17 +1897,18 @@ public class FSDataset implements FSCons /** * Turn the block identifier into a filename; ignore generation stamp!!! 
*/ - public synchronized File getFile(Block b) { - return getFile(b.getBlockId()); + public synchronized File getFile(String bpid, Block b) { + return getFile(bpid, b.getBlockId()); } /** * Turn the block identifier into a filename + * @param bpid Block pool Id * @param blockId a block's id * @return on disk data file path; null if the replica does not exist */ - private File getFile(long blockId) { - ReplicaInfo info = volumeMap.get(blockId); + private File getFile(String bpid, long blockId) { + ReplicaInfo info = volumeMap.get(bpid, blockId); if (info != null) { return info.getBlockFile(); } @@ -1720,19 +1933,19 @@ public class FSDataset implements FSCons // remove related blocks long mlsec = System.currentTimeMillis(); synchronized (this) { - Iterator ib = volumeMap.replicas().iterator(); - while(ib.hasNext()) { - ReplicaInfo b = ib.next(); - total_blocks ++; - // check if the volume block belongs to still valid - FSVolume vol = b.getVolume(); - for(FSVolume fv: failed_vols) { - if(vol == fv) { - DataNode.LOG.warn("removing block " + b.getBlockId() + " from vol " - + vol.dataDir.dir.getAbsolutePath()); - ib.remove(); - removed_blocks++; - break; + for(FSVolume fv: failed_vols) { + for (String bpid : fv.map.keySet()) { + Iterator ib = volumeMap.replicas(bpid).iterator(); + while(ib.hasNext()) { + ReplicaInfo b = ib.next(); + total_blocks ++; + // check if the volume block belongs to still valid + if(b.getVolume() == fv) { + DataNode.LOG.warn("removing " + bpid + ":" + b.getBlockId() + + " from vol " + fv.currentDir.getAbsolutePath()); + ib.remove(); + removed_blocks++; + } } } } @@ -1744,7 +1957,7 @@ public class FSDataset implements FSCons // report the error StringBuilder sb = new StringBuilder(); for(FSVolume fv : failed_vols) { - sb.append(fv.dataDir.dir.getAbsolutePath() + ";"); + sb.append(fv.currentDir.getAbsolutePath() + ";"); } throw new DiskErrorException("DataNode failed volumes:" + sb); @@ -1797,7 +2010,7 @@ public class FSDataset implements FSCons 
if(volumes != null) { for (FSVolume volume : volumes.volumes) { if(volume != null) { - volume.dfsUsage.shutdown(); + volume.shutdown(); } } } @@ -1830,13 +2043,13 @@ public class FSDataset implements FSCons * @param diskMetaFile Metadata file from on the disk * @param vol Volume of the block file */ - public void checkAndUpdate(long blockId, File diskFile, + public void checkAndUpdate(String bpid, long blockId, File diskFile, File diskMetaFile, FSVolume vol) { DataNode datanode = DataNode.getDataNode(); Block corruptBlock = null; ReplicaInfo memBlockInfo; synchronized (this) { - memBlockInfo = volumeMap.get(blockId); + memBlockInfo = volumeMap.get(bpid, blockId); if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) { // Block is not finalized - ignore the difference return; @@ -1860,7 +2073,7 @@ public class FSDataset implements FSCons if (!memBlockInfo.getBlockFile().exists()) { // Block is in memory and not on the disk // Remove the block from volumeMap - volumeMap.remove(blockId); + volumeMap.remove(bpid, blockId); if (datanode.blockScanner != null) { // TODO:FEDERATION pass the right bpid datanode.blockScanner.deleteBlock("TODO", new Block(blockId)); @@ -1883,9 +2096,9 @@ public class FSDataset implements FSCons // Block is missing in memory - add the block to volumeMap ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId, diskFile.length(), diskGS, vol, diskFile.getParentFile()); - volumeMap.add(diskBlockInfo); + volumeMap.add(bpid, diskBlockInfo); if (datanode.blockScanner != null) { - datanode.blockScanner.addBlock(new ExtendedBlock(diskBlockInfo)); + datanode.blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo)); } DataNode.LOG.warn("Added missing block to memory " + (Block)diskBlockInfo); return; @@ -1921,7 +2134,8 @@ public class FSDataset implements FSCons // Compare generation stamp if (memBlockInfo.getGenerationStamp() != diskGS) { - File memMetaFile = getMetaFile(diskFile, memBlockInfo); + File memMetaFile = 
getMetaFile(diskFile, + memBlockInfo.getGenerationStamp()); if (memMetaFile.exists()) { if (memMetaFile.compareTo(diskMetaFile) != 0) { DataNode.LOG.warn("Metadata file in memory " @@ -1957,7 +2171,7 @@ public class FSDataset implements FSCons // Send corrupt block report outside the lock if (corruptBlock != null) { DatanodeInfo[] dnArr = { new DatanodeInfo(datanode.dnRegistration) }; - LocatedBlock[] blocks = { new LocatedBlock(corruptBlock, dnArr) }; + LocatedBlock[] blocks = { new LocatedBlock(bpid, corruptBlock, dnArr) }; try { datanode.namenode.reportBadBlocks(blocks); DataNode.LOG.warn("Reporting the block " + corruptBlock @@ -1970,26 +2184,26 @@ public class FSDataset implements FSCons } /** - * @deprecated use {@link #fetchReplicaInfo(long)} instead. + * @deprecated use {@link #fetchReplicaInfo(String, long)} instead. */ @Override // FSDatasetInterface @Deprecated - public ReplicaInfo getReplica(long blockId) { + public ReplicaInfo getReplica(String bpid, long blockId) { assert(Thread.holdsLock(this)); - return volumeMap.get(blockId); + return volumeMap.get(bpid, blockId); } @Override // FSDatasetInterface public synchronized ReplicaRecoveryInfo initReplicaRecovery( RecoveringBlock rBlock) throws IOException { - return initReplicaRecovery( + return initReplicaRecovery(rBlock.getBlock().getPoolId(), volumeMap, rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp()); } /** static version of {@link #initReplicaRecovery(Block, long)}. 
*/ - static ReplicaRecoveryInfo initReplicaRecovery( + static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicasMap map, Block block, long recoveryId) throws IOException { - final ReplicaInfo replica = map.get(block.getBlockId()); + final ReplicaInfo replica = map.get(bpid, block.getBlockId()); DataNode.LOG.info("initReplicaRecovery: block=" + block + ", recoveryId=" + recoveryId + ", replica=" + replica); @@ -2044,7 +2258,7 @@ public class FSDataset implements FSCons } else { rur = new ReplicaUnderRecovery(replica, recoveryId); - map.add(rur); + map.add(bpid, rur); DataNode.LOG.info("initReplicaRecovery: changing replica state for " + block + " from " + replica.getState() + " to " + rur.getState()); @@ -2058,7 +2272,8 @@ public class FSDataset implements FSCons final long recoveryId, final long newlength) throws IOException { //get replica - final ReplicaInfo replica = volumeMap.get(oldBlock.getBlockId()); + final ReplicaInfo replica = volumeMap.get(oldBlock.getPoolId(), + oldBlock.getBlockId()); DataNode.LOG.info("updateReplica: block=" + oldBlock + ", recoveryId=" + recoveryId + ", length=" + newlength @@ -2086,8 +2301,8 @@ public class FSDataset implements FSCons checkReplicaFiles(replica); //update replica - final FinalizedReplica finalized = updateReplicaUnderRecovery( - (ReplicaUnderRecovery)replica, recoveryId, newlength); + final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock + .getPoolId(), (ReplicaUnderRecovery) replica, recoveryId, newlength); //check replica files after update checkReplicaFiles(finalized); @@ -2095,6 +2310,7 @@ public class FSDataset implements FSCons } private FinalizedReplica updateReplicaUnderRecovery( + String bpid, ReplicaUnderRecovery rur, long recoveryId, long newlength) throws IOException { @@ -2121,13 +2337,13 @@ public class FSDataset implements FSCons } // finalize the block - return finalizeReplica(rur); + return finalizeReplica(bpid, rur); } @Override // FSDatasetInterface public synchronized 
long getReplicaVisibleLength(final ExtendedBlock block) throws IOException { - final Replica replica = volumeMap.get(block.getBlockId()); + final Replica replica = volumeMap.get(block.getPoolId(), block.getBlockId()); if (replica == null) { throw new ReplicaNotFoundException(block); } @@ -2138,6 +2354,14 @@ public class FSDataset implements FSCons } return replica.getVisibleLength(); } + + public synchronized void addBlockPool(String bpid, Configuration conf) + throws IOException { + DataNode.LOG.info("Adding block pool " + bpid); + volumes.addBlockPool(bpid, conf); + volumes.getVolumeMap(bpid, volumeMap); + } + /** * Class for representing the Datanode volume information */ Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java Fri Feb 25 22:42:39 2011 @@ -147,12 +147,12 @@ class FSDatasetAsyncDiskService { * Delete the block file and meta file from the disk asynchronously, adjust * dfsUsed statistics accordingly. 
*/ - void deleteAsync(FSDataset.FSVolume volume, File blockFile, + void deleteAsync(FSDataset.FSVolume volume, String bpid, File blockFile, File metaFile, long dfsBytes, String blockName) { DataNode.LOG.info("Scheduling block " + blockName + " file " + blockFile + " for deletion"); ReplicaFileDeleteTask deletionTask = - new ReplicaFileDeleteTask(volume, blockFile, metaFile, dfsBytes, + new ReplicaFileDeleteTask(volume, bpid, blockFile, metaFile, dfsBytes, blockName); execute(volume.getCurrentDir(), deletionTask); } @@ -161,16 +161,17 @@ class FSDatasetAsyncDiskService { * as decrement the dfs usage of the volume. */ static class ReplicaFileDeleteTask implements Runnable { - - FSDataset.FSVolume volume; - File blockFile; - File metaFile; - long dfsBytes; - String blockName; + final FSDataset.FSVolume volume; + final String blockPoolId; + final File blockFile; + final File metaFile; + final long dfsBytes; + final String blockName; - ReplicaFileDeleteTask(FSDataset.FSVolume volume, File blockFile, - File metaFile, long dfsBytes, String blockName) { + ReplicaFileDeleteTask(FSDataset.FSVolume volume, String bpid, + File blockFile, File metaFile, long dfsBytes, String blockName) { this.volume = volume; + this.blockPoolId = bpid; this.blockFile = blockFile; this.metaFile = metaFile; this.dfsBytes = dfsBytes; @@ -184,18 +185,21 @@ class FSDatasetAsyncDiskService { @Override public String toString() { // Called in AsyncDiskService.execute for displaying error messages. - return "deletion of block " + blockName + " with block file " + blockFile - + " and meta file " + metaFile + " from volume " + volume; + return "deletion of block " + blockPoolId + " " + blockName + + " with block file " + blockFile + " and meta file " + metaFile + + " from volume " + volume; } @Override public void run() { if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) { DataNode.LOG.warn("Unexpected error trying to delete block " - + blockName + " at file " + blockFile + ". 
Ignored."); + + blockPoolId + " " + blockName + " at file " + blockFile + + ". Ignored."); } else { - volume.decDfsUsed(dfsBytes); - DataNode.LOG.info("Deleted block " + blockName + " at file " + blockFile); + volume.decDfsUsed(blockPoolId, dfsBytes); + DataNode.LOG.info("Deleted block " + blockPoolId + " " + blockName + + " at file " + blockFile); } } }; Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Fri Feb 25 22:42:39 2011 @@ -25,6 +25,7 @@ import java.io.InputStream; import java.io.OutputStream; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; @@ -103,12 +104,12 @@ public interface FSDatasetInterface exte * @return replica from the replicas map */ @Deprecated - public Replica getReplica(long blockId); + public Replica getReplica(String bpid, long blockId); /** * @return the generation stamp stored with the block. 
*/ - public Block getStoredBlock(String poolId, long blkid) + public Block getStoredBlock(String bpid, long blkid) throws IOException; /** @@ -271,10 +272,12 @@ public interface FSDatasetInterface exte public void unfinalizeBlock(ExtendedBlock b) throws IOException; /** - * Returns the block report - the full list of blocks stored + * Returns the block report - the full list of blocks stored under a + * block pool + * @param bpid Block Pool Id * @return - the block report - the full list of blocks stored */ - public BlockListAsLongs getBlockReport(); + public BlockListAsLongs getBlockReport(String bpid); /** * Is the block valid? @@ -285,10 +288,11 @@ public interface FSDatasetInterface exte /** * Invalidates the specified blocks + * @param bpid Block pool Id * @param invalidBlks - the blocks to be invalidated * @throws IOException */ - public void invalidate(String poolId, Block invalidBlks[]) throws IOException; + public void invalidate(String bpid, Block invalidBlks[]) throws IOException; /** * Check if all the data directories are healthy @@ -330,12 +334,11 @@ public interface FSDatasetInterface exte /** * Initialize a replica recovery. - * * @return actual state of the replica on this data-node or * null if data-node does not have the replica. */ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) - throws IOException; + throws IOException; /** * Update replica's generation stamp and length and finalize it. 
@@ -344,4 +347,10 @@ public interface FSDatasetInterface exte ExtendedBlock oldBlock, long recoveryId, long newLength) throws IOException; + /** + * add new block pool ID + * @param bpid Block pool Id + * @param conf Configuration + */ + public void addBlockPool(String bpid, Configuration conf) throws IOException; } Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java Fri Feb 25 22:42:39 2011 @@ -19,24 +19,39 @@ package org.apache.hadoop.hdfs.server.da import java.util.Collection; import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.hdfs.protocol.Block; class ReplicasMap { - // HashMap: maps a block id to the replica's meta info - private HashMap map = new HashMap(); + // Map of block pool Id to another map of block Id to ReplicaInfo. 
+ private Map<String, Map<Long, ReplicaInfo>> map = + new HashMap<String, Map<Long, ReplicaInfo>>(); + + private void checkBlockPool(String bpid) { + if (bpid == null) { + throw new IllegalArgumentException("Block Pool Id is null"); + } + } + + private void checkBlock(Block b) { + if (b == null) { + throw new IllegalArgumentException("Block is null"); + } + } + /** * Get the meta information of the replica that matches both block id * and generation stamp + * @param bpid block pool id * @param block block with its id as the key * @return the replica's meta information - * @throws IllegalArgumentException if the input block is null + * @throws IllegalArgumentException if the input block or block pool is null */ - ReplicaInfo get(Block block) { - if (block == null) { - throw new IllegalArgumentException("Do not expect null block"); - } - ReplicaInfo replicaInfo = get(block.getBlockId()); + ReplicaInfo get(String bpid, Block block) { + checkBlockPool(bpid); + checkBlock(block); + ReplicaInfo replicaInfo = get(bpid, block.getBlockId()); if (replicaInfo != null && block.getGenerationStamp() == replicaInfo.getGenerationStamp()) { return replicaInfo; @@ -44,72 +59,95 @@ class ReplicasMap { return null; } + /** * Get the meta information of the replica that matches the block id + * @param bpid block pool id * @param blockId a block's id * @return the replica's meta information */ - ReplicaInfo get(long blockId) { - return map.get(blockId); + ReplicaInfo get(String bpid, long blockId) { + checkBlockPool(bpid); + Map m = map.get(bpid); + return m != null ?
m.get(blockId) : null; } /** * Add a replica's meta information into the map * + * @param bpid block pool id * @param replicaInfo a replica's meta information * @return previous meta information of the replica * @throws IllegalArgumentException if the input parameter is null */ - ReplicaInfo add(ReplicaInfo replicaInfo) { - if (replicaInfo == null) { - throw new IllegalArgumentException("Do not expect null block"); + ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) { + checkBlockPool(bpid); + checkBlock(replicaInfo); + Map m = map.get(bpid); + if (m == null) { + // Add an entry for block pool if it does not exist already + m = new HashMap(); + map.put(bpid, m); } - return map.put(replicaInfo.getBlockId(), replicaInfo); + return m.put(replicaInfo.getBlockId(), replicaInfo); } /** * Remove the replica's meta information from the map that matches * the input block's id and generation stamp + * @param bpid block pool id * @param block block with its id as the key * @return the removed replica's meta information * @throws IllegalArgumentException if the input block is null */ - ReplicaInfo remove(Block block) { - if (block == null) { - throw new IllegalArgumentException("Do not expect null block"); - } - Long key = Long.valueOf(block.getBlockId()); - ReplicaInfo replicaInfo = map.get(key); - if (replicaInfo != null && - block.getGenerationStamp() == replicaInfo.getGenerationStamp()) { - return remove(key); - } + ReplicaInfo remove(String bpid, Block block) { + checkBlockPool(bpid); + checkBlock(block); + Map m = map.get(bpid); + if (m != null) { + Long key = Long.valueOf(block.getBlockId()); + ReplicaInfo replicaInfo = m.get(key); + if (replicaInfo != null && + block.getGenerationStamp() == replicaInfo.getGenerationStamp()) { + return m.remove(key); + } + } return null; } /** * Remove the replica's meta information from the map if present + * @param bpid block pool id * @param the block id of the replica to be removed * @return the removed replica's meta 
information */ - ReplicaInfo remove(long blockId) { - return map.remove(blockId); + ReplicaInfo remove(String bpid, long blockId) { + checkBlockPool(bpid); + Map m = map.get(bpid); + if (m != null) { + return m.remove(blockId); + } + return null; } /** - * Get the size of the map + * Get the size of the map for given block pool + * @param bpid block pool id * @return the number of replicas in the map */ - int size() { - return map.size(); + int size(String bpid) { + Map m = map.get(bpid); + return m != null ? m.size() : 0; } /** - * Get a collection of the replicas + * Get a collection of the replicas for given block pool + * @param bpid block pool id * @return a collection of the replicas */ - Collection replicas() { - return map.values(); + Collection replicas(String bpid) { + Map m = map.get(bpid); + return m != null ? m.values() : null; } -} +} \ No newline at end of file Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Fri Feb 25 22:42:39 2011 @@ -354,16 +354,11 @@ public class DatanodeDescriptor extends return replicateBlocks.poll(maxTransfers); } - BlockRecoveryCommand getLeaseRecoveryCommand(int maxTransfers) { + BlockInfoUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) { List blocks = recoverBlocks.poll(maxTransfers); if(blocks == null) return null; - BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.size()); - for(BlockInfoUnderConstruction b : blocks) { - brCommand.add(new 
RecoveringBlock( - new ExtendedBlock(b), b.getExpectedLocations(), b.getBlockRecoveryId())); - } - return brCommand; + return blocks.toArray(new BlockInfoUnderConstruction[blocks.size()]); } /** Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Feb 25 22:42:39 2011 @@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.nam import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ -66,6 +67,7 @@ import org.apache.hadoop.hdfs.server.pro import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -2731,9 +2733,17 @@ public class FSNamesystem implements FSC updateStats(nodeinfo, true); //check lease recovery - cmd = 
nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE); - if (cmd != null) { - return new DatanodeCommand[] {cmd}; + BlockInfoUnderConstruction[] blocks = nodeinfo + .getLeaseRecoveryCommand(Integer.MAX_VALUE); + if (blocks != null) { + BlockRecoveryCommand brCommand = new BlockRecoveryCommand( + blocks.length); + for (BlockInfoUnderConstruction b : blocks) { + brCommand.add(new RecoveringBlock( + new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b + .getBlockRecoveryId())); + } + return new DatanodeCommand[] { brCommand }; } ArrayList cmds = new ArrayList(3); Modified: hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/TestFiHftp.java Fri Feb 25 22:42:39 2011 @@ -161,7 +161,7 @@ public class TestFiHftp { final DataNode dn = cluster.getDataNode(datanodeinfos[0].getIpcPort()); LOG.info("dn=" + dn + ", blk=" + blk + " (length=" + blk.getNumBytes() + ")"); final FSDataset data = (FSDataset)dn.getFSDataset(); - final File blkfile = data.getBlockFile(blk.getLocalBlock()); + final File blkfile = data.getBlockFile(blk); Assert.assertTrue(blkfile.delete()); //read again by hftp, should get an exception Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java 
(original) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Feb 25 22:42:39 2011 @@ -827,6 +827,7 @@ public class MiniDFSCluster { /* * Corrupt a block on a particular datanode + * Types: delete, write bad data, truncate */ boolean corruptBlockOnDataNode(int i, ExtendedBlock blk) throws Exception { Random random = new Random(); @@ -1094,11 +1095,12 @@ public class MiniDFSCluster { * @param dataNodeIndex - data node whose block report is desired - the index is same as for getDataNodes() * @return the block report for the specified data node */ - public Iterable getBlockReport(int dataNodeIndex) { + public Iterable getBlockReport(String bpid, int dataNodeIndex) { if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) { throw new IndexOutOfBoundsException(); } - return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport(); + return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport( + bpid); } @@ -1107,11 +1109,11 @@ public class MiniDFSCluster { * @return block reports from all data nodes * BlockListAsLongs is indexed in the same order as the list of datanodes returned by getDataNodes() */ - public Iterable[] getAllBlockReports() { + public Iterable[] getAllBlockReports(String bpid) { int numDataNodes = dataNodes.size(); Iterable[] result = new BlockListAsLongs[numDataNodes]; for (int i = 0; i < numDataNodes; ++i) { - result[i] = getBlockReport(i); + result[i] = getBlockReport(bpid, i); } return result; } Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java (original) +++ 
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java Fri Feb 25 22:42:39 2011 @@ -45,7 +45,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.shell.Count; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.FSDataset; import org.apache.hadoop.io.IOUtils; @@ -1108,14 +1107,12 @@ public class TestDFSShell extends TestCa static List getBlockFiles(MiniDFSCluster cluster) throws IOException { List files = new ArrayList(); List datanodes = cluster.getDataNodes(); - Iterable[] blocks = cluster.getAllBlockReports(); - ExtendedBlock blk = new ExtendedBlock(); String poolId = cluster.getNamesystem().getPoolId(); + Iterable[] blocks = cluster.getAllBlockReports(poolId); for(int i = 0; i < blocks.length; i++) { FSDataset ds = (FSDataset)datanodes.get(i).getFSDataset(); for(Block b : blocks[i]) { - blk.set(poolId, b); - files.add(ds.getBlockFile(blk.getLocalBlock())); + files.add(ds.getBlockFile(poolId, b)); } } return files; Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Fri Feb 25 22:42:39 2011 @@ -49,7 +49,6 @@ import org.apache.hadoop.hdfs.protocol.D import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 
-import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; @@ -420,7 +419,8 @@ public class TestDataTransferProtocol ex /* Test OP_READ_BLOCK */ - ExtendedBlock blk = new ExtendedBlock(firstBlock.getLocalBlock()); + String bpid = cluster.getNamesystem().getPoolId(); + ExtendedBlock blk = new ExtendedBlock(bpid, firstBlock.getLocalBlock()); long blkid = blk.getBlockId(); // bad block id sendBuf.reset(); Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java Fri Feb 25 22:42:39 2011 @@ -135,7 +135,7 @@ public class TestFileAppend extends Test // for (int i = 0; i < blocks.size(); i = i + 2) { ExtendedBlock b = blocks.get(i).getBlock(); - File f = dataset.getFile(b.getLocalBlock()); + File f = dataset.getFile(b.getPoolId(), b.getLocalBlock()); File link = new File(f.toString() + ".link"); System.out.println("Creating hardlink for File " + f + " to " + link); HardLink.createHardLink(f, link); @@ -148,7 +148,7 @@ public class TestFileAppend extends Test ExtendedBlock b = blocks.get(i).getBlock(); System.out.println("testCopyOnWrite detaching block " + b); assertTrue("Detaching block " + b + " should have returned true", - dataset.unlinkBlock(b.getLocalBlock(), 1)); + dataset.unlinkBlock(b, 1)); } // Since the blocks were already detached earlier, these calls should @@ -158,7 +158,7 
@@ public class TestFileAppend extends Test ExtendedBlock b = blocks.get(i).getBlock(); System.out.println("testCopyOnWrite detaching block " + b); assertTrue("Detaching block " + b + " should have returned false", - !dataset.unlinkBlock(b.getLocalBlock(), 1)); + !dataset.unlinkBlock(b, 1)); } } finally { Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java Fri Feb 25 22:42:39 2011 @@ -199,7 +199,7 @@ public class TestFileAppend3 extends jun assertEquals(repl, datanodeinfos.length); final DataNode dn = cluster.getDataNode(datanodeinfos[0].getIpcPort()); final FSDataset data = (FSDataset)dn.getFSDataset(); - final RandomAccessFile raf = new RandomAccessFile(data.getBlockFile(blk.getLocalBlock()), "rw"); + final RandomAccessFile raf = new RandomAccessFile(data.getBlockFile(blk), "rw"); AppendTestUtil.LOG.info("dn=" + dn + ", blk=" + blk + " (length=" + blk.getNumBytes() + ")"); assertEquals(len1, raf.length()); raf.setLength(0); Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java?rev=1074727&r1=1074726&r2=1074727&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java Fri Feb 25 22:42:39 
2011 @@ -27,12 +27,9 @@ import java.util.EnumSet; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.ParentNotDirectoryException; @@ -768,7 +765,7 @@ public class TestFileCreation extends ju FSDataset dataset = (FSDataset)datanode.data; ExtendedBlock blk = locatedblock.getBlock(); Block b = dataset.getStoredBlock(blk.getPoolId(), blk.getBlockId()); - File blockfile = dataset.findBlockFile(b.getBlockId()); + File blockfile = dataset.findBlockFile(blk.getPoolId(), b.getBlockId()); System.out.println("blockfile=" + blockfile); if (blockfile != null) { BufferedReader in = new BufferedReader(new FileReader(blockfile));