Return-Path: Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: (qmail 61035 invoked from network); 22 Feb 2011 20:31:47 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 22 Feb 2011 20:31:47 -0000 Received: (qmail 61765 invoked by uid 500); 22 Feb 2011 20:31:47 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 61737 invoked by uid 500); 22 Feb 2011 20:31:46 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 61729 invoked by uid 99); 22 Feb 2011 20:31:46 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Feb 2011 20:31:46 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Feb 2011 20:31:38 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id EA2B2238896F; Tue, 22 Feb 2011 20:31:15 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1073489 [1/2] - in /hadoop/hdfs/branches/HDFS-1052: ./ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/java/org/apache/hadoop/hdfs/server/proto... Date: Tue, 22 Feb 2011 20:31:15 -0000 To: hdfs-commits@hadoop.apache.org From: suresh@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110222203115.EA2B2238896F@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: suresh Date: Tue Feb 22 20:31:14 2011 New Revision: 1073489 URL: http://svn.apache.org/viewvc?rev=1073489&view=rev Log: HDFS-1450. Federation: Introduce block pool ID into FSDatasetInterface. (suresh) Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDatanodeDescriptor.java hadoop/hdfs/branches/HDFS-1052/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original) +++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Tue Feb 22 20:31:14 2011 @@ -4,6 +4,9 @@ Trunk (unreleased changes) NEW FEATURES + HDFS-1450. Federation: Introduce block pool ID into FSDatasetInterface. + (suresh) + IMPROVEMENTS HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel) Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java Tue Feb 22 20:31:14 2011 @@ -54,6 +54,11 @@ public class ExtendedBlock implements Wr this("TODO", b); } + // TODO:FEDERATION To remove when block pool ID related coding is complete + public ExtendedBlock(final long blkId) { + this("TODO", new Block(blkId)); + } + public ExtendedBlock(final ExtendedBlock b) { this(b.poolId, b.block); } @@ -128,9 +133,9 @@ public class ExtendedBlock implements Wr block.setNumBytes(len); } - public void set(String poolId, long blkid, long gs, long len) { + public void set(String poolId, Block blk) { this.poolId = poolId; - block.set(blkid, gs, len); + this.block = blk; } public static Block getLocalBlock(final ExtendedBlock b) { Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Feb 22 20:31:14 2011 @@ -35,7 +35,6 @@ import java.util.zip.Checksum; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.FSInputChecker; import org.apache.hadoop.fs.FSOutputSummer; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.FSConstants; @@ -98,30 +97,31 @@ class BlockReceiver implements java.io.C // // Open local disk out // - // TODO:FEDERATION use ExtendedBlock in the following method calls if (clientName.length() == 0) { //replication or move - replicaInfo = datanode.data.createTemporary(block.getLocalBlock()); + replicaInfo = datanode.data.createTemporary(block); } else { switch (stage) { case PIPELINE_SETUP_CREATE: - replicaInfo = datanode.data.createRbw(block.getLocalBlock()); + replicaInfo = datanode.data.createRbw(block); break; case PIPELINE_SETUP_STREAMING_RECOVERY: replicaInfo = datanode.data.recoverRbw( - block.getLocalBlock(), newGs, minBytesRcvd, maxBytesRcvd); + block, newGs, minBytesRcvd, maxBytesRcvd); block.setGenerationStamp(newGs); break; case PIPELINE_SETUP_APPEND: - replicaInfo = datanode.data.append(block.getLocalBlock(), newGs, minBytesRcvd); + replicaInfo = datanode.data.append(block, newGs, minBytesRcvd); if (datanode.blockScanner != null) { // remove from block scanner - datanode.blockScanner.deleteBlock(block.getLocalBlock()); + datanode.blockScanner.deleteBlock(block.getPoolId(), + block.getLocalBlock()); } block.setGenerationStamp(newGs); break; case PIPELINE_SETUP_APPEND_RECOVERY: - replicaInfo = datanode.data.recoverAppend(block.getLocalBlock(), newGs, minBytesRcvd); + replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd); if (datanode.blockScanner != null) { // remove from block scanner - datanode.blockScanner.deleteBlock(block.getLocalBlock()); + datanode.blockScanner.deleteBlock(block.getPoolId(), + block.getLocalBlock()); } block.setGenerationStamp(newGs); break; @@ -615,7 +615,7 @@ class BlockReceiver implements java.io.C try { if (clientName.length() > 0) { responder = new Daemon(datanode.threadGroup, - new PacketResponder(this, block.getLocalBlock(), mirrIn, replyOut, + new PacketResponder(this, block, mirrIn, replyOut, numTargets, Thread.currentThread())); responder.start(); // start thread to processes reponses } @@ -642,8 +642,7 @@ class BlockReceiver implements java.io.C // Finalize the block. Does this fsync()? block.setNumBytes(replicaInfo.getNumBytes()); - // TODO:FEDERATION use ExtendedBlock - datanode.data.finalizeBlock(block.getLocalBlock()); + datanode.data.finalizeBlock(block); datanode.myMetrics.blocksWritten.inc(); } @@ -675,8 +674,7 @@ class BlockReceiver implements java.io.C */ private void cleanupBlock() throws IOException { if (clientName.length() == 0) { // not client write - // TODO:FEDERATION use ExtendedBlock - datanode.data.unfinalizeBlock(block.getLocalBlock()); + datanode.data.unfinalizeBlock(block); } } @@ -693,8 +691,7 @@ class BlockReceiver implements java.io.C } // rollback the position of the meta file - // TODO:FEDERATION use ExtendedBlock - datanode.data.adjustCrcChannelPosition(block.getLocalBlock(), streams, checksumSize); + datanode.data.adjustCrcChannelPosition(block, streams, checksumSize); } /** @@ -722,8 +719,7 @@ class BlockReceiver implements java.io.C byte[] crcbuf = new byte[checksumSize]; FSDataset.BlockInputStreams instr = null; try { - // TODO:FEDERATION use ExtendedBlock - instr = datanode.data.getTmpInputStreams(block.getLocalBlock(), blkoff, ckoff); + instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff); IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk); // open meta file and read in crc value computer earlier @@ -758,7 +754,7 @@ class BlockReceiver implements java.io.C //packet waiting for ack private LinkedList ackQueue = new LinkedList(); private volatile boolean running = true; - private Block block; + private ExtendedBlock block; DataInputStream mirrorIn; // input from downstream datanode DataOutputStream replyOut; // output to upstream datanode private int numTargets; // number of downstream datanodes including myself @@ -769,7 +765,7 @@ class BlockReceiver implements java.io.C return "PacketResponder " + numTargets + " for Block " + this.block; } - PacketResponder(BlockReceiver receiver, Block b, DataInputStream in, + PacketResponder(BlockReceiver receiver, ExtendedBlock b, DataInputStream in, DataOutputStream out, int numTargets, Thread receiverThread) { this.receiverThread = receiverThread; Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Feb 22 20:31:14 2011 @@ -145,10 +145,9 @@ class BlockSender implements java.io.Clo this.transferToAllowed = datanode.transferToAllowed; this.clientTraceFmt = clientTraceFmt; - // TODO:FEDERATION metaFileExists and getMetaDataInputStream should take ExtendedBlock - if ( !corruptChecksumOk || datanode.data.metaFileExists(block.getLocalBlock()) ) { + if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) { checksumIn = new DataInputStream(new BufferedInputStream(datanode.data - .getMetaDataInputStream(block.getLocalBlock()), BUFFER_SIZE)); + .getMetaDataInputStream(block), BUFFER_SIZE)); // read and handle the common header here. For now just a version BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn); @@ -230,8 +229,7 @@ class BlockSender implements java.io.Clo DataNode.LOG.debug("replica=" + replica); } - // TODO:FEDERATION getBlockInputStream must acccept ExtendedBlock - blockIn = datanode.data.getBlockInputStream(block.getLocalBlock(), offset); // seek to offset + blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset } catch (IOException ioe) { IOUtils.closeStream(this); IOUtils.closeStream(blockIn); Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Tue Feb 22 20:31:14 2011 @@ -267,18 +267,20 @@ class DataBlockScanner implements Runnab } /** Adds block to list of blocks */ - synchronized void addBlock(Block block) { + synchronized void addBlock(ExtendedBlock block) { if (!isInitialized()) { return; } - BlockScanInfo info = blockMap.get(block); + // TODO:FEDERATION use ExtendedBlock + BlockScanInfo info = blockMap.get(block.getLocalBlock()); if ( info != null ) { LOG.warn("Adding an already existing block " + block); delBlockInfo(info); } - info = new BlockScanInfo(block); + // TODO:FEDERATION use ExtendedBlock + info = new BlockScanInfo(block.getLocalBlock()); info.lastScanTime = getNewBlockScanTime(); addBlockInfo(info); @@ -286,10 +288,11 @@ class DataBlockScanner implements Runnab } /** Deletes the block from internal structures */ - synchronized void deleteBlock(Block block) { + synchronized void deleteBlock(String bpid, Block block) { if (!isInitialized()) { return; } + // FEDERATION:TODO use bpid BlockScanInfo info = blockMap.get(block); if ( info != null ) { delBlockInfo(info); @@ -306,9 +309,9 @@ class DataBlockScanner implements Runnab } /** Deletes blocks from internal structures */ - void deleteBlocks(Block[] blocks) { + void deleteBlocks(String bpid, Block[] blocks) { for ( Block b : blocks ) { - deleteBlock(b); + deleteBlock(bpid, b); } } @@ -359,7 +362,7 @@ class DataBlockScanner implements Runnab } } - private void handleScanFailure(Block block) { + private void handleScanFailure(ExtendedBlock block) { LOG.info("Reporting bad block " + block + " to namenode."); @@ -422,8 +425,7 @@ class DataBlockScanner implements Runnab throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE)); } - private void verifyBlock(Block block) { - + private void verifyBlock(ExtendedBlock block) { BlockSender blockSender = null; /* In case of failure, attempt to read second time to reduce @@ -436,9 +438,8 @@ class DataBlockScanner implements Runnab try { adjustThrottler(); - // TODO:FEDERATION use ExtendedBlock - blockSender = new BlockSender(new ExtendedBlock(block), 0, -1, false, - false, true, datanode); + blockSender = new BlockSender(block, 0, -1, false, false, true, + datanode); DataOutputStream out = new DataOutputStream(new IOUtils.NullOutputStream()); @@ -452,18 +453,19 @@ class DataBlockScanner implements Runnab totalTransientErrors++; } - updateScanStatus(block, ScanType.VERIFICATION_SCAN, true); + // TODO:FEDERATION use Extended block + updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, true); return; } catch (IOException e) { - - updateScanStatus(block, ScanType.VERIFICATION_SCAN, false); + // TODO:FEDERATION use Extended block + updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, false); // If the block does not exists anymore, then its not an error - if ( dataset.getFile(block) == null ) { + if ( dataset.getFile(block.getLocalBlock()) == null ) { LOG.info("Verification failed for " + block + ". Its ok since " + "it not in datanode dataset anymore."); - deleteBlock(block); + deleteBlock(block.getPoolId(), block.getLocalBlock()); return; } @@ -503,7 +505,8 @@ class DataBlockScanner implements Runnab } if ( block != null ) { - verifyBlock(block); + // TODO:FEDERATION blockInfoSet should use ExtendedBlock + verifyBlock(new ExtendedBlock(block)); } } Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Feb 22 20:31:14 2011 @@ -1008,7 +1008,7 @@ public class DataNode extends Configured switch(cmd.getAction()) { case DatanodeProtocol.DNA_TRANSFER: // Send a copy of a block to another datanode - transferBlocks(bcmd.getBlocks(), bcmd.getTargets()); + transferBlocks(bcmd.getPoolId(), bcmd.getBlocks(), bcmd.getTargets()); myMetrics.blocksReplicated.inc(bcmd.getBlocks().length); break; case DatanodeProtocol.DNA_INVALIDATE: @@ -1019,9 +1019,9 @@ public class DataNode extends Configured Block toDelete[] = bcmd.getBlocks(); try { if (blockScanner != null) { - blockScanner.deleteBlocks(toDelete); + blockScanner.deleteBlocks(bcmd.getPoolId(), toDelete); } - data.invalidate(toDelete); + data.invalidate(bcmd.getPoolId(), toDelete); } catch(IOException e) { checkDiskError(); throw e; @@ -1171,8 +1171,7 @@ public class DataNode extends Configured private void transferBlock( ExtendedBlock block, DatanodeInfo xferTargets[] ) throws IOException { - // TODO:FEDERATION use ExtendedBlock - if (!data.isValidBlock(block.getLocalBlock())) { + if (!data.isValidBlock(block)) { // block does not exist or is under-construction String errStr = "Can't send invalid block " + block; LOG.info(errStr); @@ -1183,8 +1182,7 @@ public class DataNode extends Configured } // Check if NN recorded length matches on-disk length - // TODO:FEDERATION use ExtendedBlock - long onDiskLength = data.getLength(block.getLocalBlock()); + long onDiskLength = data.getLength(block); if (block.getNumBytes() > onDiskLength) { // Shorter on-disk len indicates corruption so report NN the corrupt block namenode.reportBadBlocks(new LocatedBlock[]{ @@ -1212,13 +1210,11 @@ public class DataNode extends Configured } } - private void transferBlocks( Block blocks[], - DatanodeInfo xferTargets[][] - ) { + private void transferBlocks(String poolId, Block blocks[], + DatanodeInfo xferTargets[][]) { for (int i = 0; i < blocks.length; i++) { try { - // TODO:FEDERATION cleanup - transferBlock(new ExtendedBlock(blocks[i]), xferTargets[i]); + transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]); } catch (IOException ie) { LOG.warn("Failed to transfer block " + blocks[i], ie); } @@ -1230,13 +1226,15 @@ public class DataNode extends Configured * till namenode is informed before responding with success to the * client? For now we don't. */ - protected void notifyNamenodeReceivedBlock(Block block, String delHint) { + protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) { if(block==null || delHint==null) { throw new IllegalArgumentException(block==null?"Block is null":"delHint is null"); } synchronized (receivedBlockList) { synchronized (delHints) { - receivedBlockList.add(block); + // TODO:FEDERATION receivedBlockList should be per block pool + // TODO:FEDERATION use ExtendedBlock + receivedBlockList.add(block.getLocalBlock()); delHints.add(delHint); receivedBlockList.notifyAll(); } @@ -1413,7 +1411,7 @@ public class DataNode extends Configured * @param block * @param delHint */ - void closeBlock(Block block, String delHint) { + void closeBlock(ExtendedBlock block, String delHint) { myMetrics.blocksWritten.inc(); notifyNamenodeReceivedBlock(block, delHint); if (blockScanner != null) { @@ -1736,7 +1734,7 @@ public class DataNode extends Configured public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId, long newLength) throws IOException { - ReplicaInfo r = data.updateReplicaUnderRecovery(oldBlock.getLocalBlock(), + ReplicaInfo r = data.updateReplicaUnderRecovery(oldBlock, recoveryId, newLength); return new ExtendedBlock(oldBlock.getPoolId(), r); } @@ -1953,7 +1951,7 @@ public class DataNode extends Configured } } - return data.getReplicaVisibleLength(block.getLocalBlock()); + return data.getReplicaVisibleLength(block); } // Determine a Datanode's streaming address Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Feb 22 20:31:14 2011 @@ -274,7 +274,7 @@ class DataXceiver extends DataTransferPr client, srcDataNode, datanode); } else { // TODO:FEDERATION use ExtendedBlock - datanode.data.recoverClose(block.getLocalBlock(), newGs, minBytesRcvd); + datanode.data.recoverClose(block, newGs, minBytesRcvd); } // @@ -378,7 +378,7 @@ class DataXceiver extends DataTransferPr if (client.length() == 0 || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { // TODO:FEDERATION use ExtendedBlock - datanode.closeBlock(block.getLocalBlock(), DataNode.EMPTY_DEL_HINT); + datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT); LOG.info("Received block " + block + " src: " + remoteAddress + " dest: " + localAddress + @@ -431,9 +431,8 @@ class DataXceiver extends DataTransferPr } } - // TODO:FEDERATION use ExtendedBlock final MetaDataInputStream metadataIn = - datanode.data.getMetaDataInputStream(block.getLocalBlock()); + datanode.data.getMetaDataInputStream(block); final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream( metadataIn, BUFFER_SIZE)); @@ -620,7 +619,7 @@ class DataXceiver extends DataTransferPr // notify name node // TODO:FEDERATION use ExtendedBlock - datanode.notifyNamenodeReceivedBlock(block.getLocalBlock(), sourceID); + datanode.notifyNamenodeReceivedBlock(block, sourceID); LOG.info("Moved block " + block + " from " + s.getRemoteSocketAddress()); Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Feb 22 20:31:14 2011 @@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.DFSConfigK import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; @@ -767,7 +768,9 @@ public class FSDataset implements FSCons } @Override // FSDatasetInterface - public synchronized Block getStoredBlock(long blkid) throws IOException { + public synchronized Block getStoredBlock(String bpid, long blkid) + throws IOException { + // TODO:FEDERATION use extended block File blockfile = findBlockFile(blkid); if (blockfile == null) { return null; @@ -803,20 +806,21 @@ public class FSDataset implements FSCons } @Override // FSDatasetInterface - public boolean metaFileExists(Block b) throws IOException { - return getMetaFile(b).exists(); + public boolean metaFileExists(ExtendedBlock b) throws IOException { + // TODO:FEDERATION use ExtendedBlock + return getMetaFile(b.getLocalBlock()).exists(); } @Override // FSDatasetInterface - public long getMetaDataLength(Block b) throws IOException { - File checksumFile = getMetaFile( b ); + public long getMetaDataLength(ExtendedBlock b) throws IOException { + File checksumFile = getMetaFile(b.getLocalBlock()); return checksumFile.length(); } @Override // FSDatasetInterface - public MetaDataInputStream getMetaDataInputStream(Block b) + public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b) throws IOException { - File checksumFile = getMetaFile( b ); + File checksumFile = getMetaFile(b.getLocalBlock()); return new MetaDataInputStream(new FileInputStream(checksumFile), checksumFile.length()); } @@ -924,8 +928,8 @@ public class FSDataset implements FSCons * Find the block's on-disk length */ @Override // FSDatasetInterface - public long getLength(Block b) throws IOException { - return getBlockFile(b).length(); + public long getLength(ExtendedBlock b) throws IOException { + return getBlockFile(b.getLocalBlock()).length(); } /** @@ -943,14 +947,15 @@ public class FSDataset implements FSCons } @Override // FSDatasetInterface - public synchronized InputStream getBlockInputStream(Block b) throws IOException { - return new FileInputStream(getBlockFile(b)); + public synchronized InputStream getBlockInputStream(ExtendedBlock b) + throws IOException { + return new FileInputStream(getBlockFile(b.getLocalBlock())); } @Override // FSDatasetInterface - public synchronized InputStream getBlockInputStream(Block b, long seekOffset) throws IOException { - - File blockFile = getBlockFile(b); + public synchronized InputStream getBlockInputStream(ExtendedBlock b, + long seekOffset) throws IOException { + File blockFile = getBlockFile(b.getLocalBlock()); RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r"); if (seekOffset > 0) { blockInFile.seek(seekOffset); @@ -977,10 +982,9 @@ public class FSDataset implements FSCons * Returns handles to the block file and its metadata file */ @Override // FSDatasetInterface - public synchronized BlockInputStreams getTmpInputStreams(Block b, + public synchronized BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkOffset, long ckoff) throws IOException { - - ReplicaInfo info = getReplicaInfo(b); + ReplicaInfo info = getReplicaInfo(b.getLocalBlock()); File blockFile = info.getBlockFile(); RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r"); if (blkOffset > 0) { @@ -1080,7 +1084,7 @@ public class FSDataset implements FSCons } @Override // FSDatasetInterface - public synchronized ReplicaInPipelineInterface append(Block b, + public synchronized ReplicaInPipelineInterface append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { // If the block was successfully finalized because all packets // were successfully processed at the Datanode but the ack for @@ -1093,7 +1097,8 @@ public class FSDataset implements FSCons throw new IOException("The new generation stamp " + newGS + " should be greater than the replica " + b + "'s generation stamp"); } - ReplicaInfo replicaInfo = volumeMap.get(b); + // TODO:FEDERATION use ExtendedBlock + ReplicaInfo replicaInfo = volumeMap.get(b.getLocalBlock()); if (replicaInfo == null) { throw new ReplicaNotFoundException( ReplicaNotFoundException.NON_EXISTENT_REPLICA + b); @@ -1225,11 +1230,13 @@ public class FSDataset implements FSCons return replicaInfo; } @Override // FSDatasetInterface - public synchronized ReplicaInPipelineInterface recoverAppend(Block b, + public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { DataNode.LOG.info("Recover failed append to " + b); - ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); + // TODO:FEDERATION use ExtendedBlock + ReplicaInfo replicaInfo = recoverCheck(b.getLocalBlock(), newGS, + expectedBlockLen); // change the replica's state/gs etc. if (replicaInfo.getState() == ReplicaState.FINALIZED ) { @@ -1241,16 +1248,17 @@ public class FSDataset implements FSCons } @Override // FSDatasetInterface - public void recoverClose(Block b, long newGS, + public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { DataNode.LOG.info("Recover failed close " + b); // check replica's state - ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); + ReplicaInfo replicaInfo = recoverCheck(b.getLocalBlock(), newGS, + expectedBlockLen); // bump the replica's GS bumpReplicaGS(replicaInfo, newGS); // finalize the replica if RBW if (replicaInfo.getState() == ReplicaState.RBW) { - finalizeBlock(replicaInfo); + finalizeReplica(replicaInfo); } } @@ -1282,7 +1290,7 @@ public class FSDataset implements FSCons } @Override // FSDatasetInterface - public synchronized ReplicaInPipelineInterface createRbw(Block b) + public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId()); if (replicaInfo != null) { @@ -1293,7 +1301,7 @@ public class FSDataset implements FSCons // create a new block FSVolume v = volumes.getNextVolume(b.getNumBytes()); // create a rbw file to hold block in the designated volume - File f = v.createRbwFile(b); + File f = v.createRbwFile(b.getLocalBlock()); ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile()); volumeMap.add(newReplicaInfo); @@ -1301,7 +1309,7 @@ public class FSDataset implements FSCons } @Override // FSDatasetInterface - public synchronized ReplicaInPipelineInterface recoverRbw(Block b, + public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { DataNode.LOG.info("Recover the RBW replica " + b); @@ -1350,7 +1358,7 @@ public class FSDataset implements FSCons } @Override // FSDatasetInterface - public synchronized ReplicaInPipelineInterface createTemporary(Block b) + public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId()); if (replicaInfo != null) { @@ -1361,7 +1369,7 @@ public class FSDataset implements FSCons FSVolume v = volumes.getNextVolume(b.getNumBytes()); // create a temporary file to hold block in the designated volume - File f = v.createTmpFile(b); + File f = v.createTmpFile(b.getLocalBlock()); ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile()); volumeMap.add(newReplicaInfo); @@ -1374,7 +1382,7 @@ public class FSDataset implements FSCons * last checksum will be overwritten. */ @Override // FSDatasetInterface - public void adjustCrcChannelPosition(Block b, BlockWriteStreams streams, + public void adjustCrcChannelPosition(ExtendedBlock b, BlockWriteStreams streams, int checksumSize) throws IOException { FileOutputStream file = (FileOutputStream) streams.checksumOut; FileChannel channel = file.getChannel(); @@ -1407,8 +1415,8 @@ public class FSDataset implements FSCons * Complete the block write! */ @Override // FSDatasetInterface - public synchronized void finalizeBlock(Block b) throws IOException { - ReplicaInfo replicaInfo = getReplicaInfo(b); + public synchronized void finalizeBlock(ExtendedBlock b) throws IOException { + ReplicaInfo replicaInfo = getReplicaInfo(b.getLocalBlock()); if (replicaInfo.getState() == ReplicaState.FINALIZED) { // this is legal, when recovery happens on a file that has // been opened for append but never modified @@ -1444,15 +1452,15 @@ public class FSDataset implements FSCons * Remove the temporary block file (if any) */ @Override // FSDatasetInterface - public synchronized void unfinalizeBlock(Block b) throws IOException { - ReplicaInfo replicaInfo = volumeMap.get(b); + public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException { + ReplicaInfo replicaInfo = volumeMap.get(b.getLocalBlock()); if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) { // remove from volumeMap - volumeMap.remove(b); + volumeMap.remove(b.getLocalBlock()); // delete the on-disk temp file if (delBlockFromDisk(replicaInfo.getBlockFile(), - replicaInfo.getMetaFile(), b)) { + replicaInfo.getMetaFile(), b.getLocalBlock())) { DataNode.LOG.warn("Block " + b + " unfinalized and removed. " ); } } @@ -1550,8 +1558,8 @@ public class FSDataset implements FSCons * valid means finalized */ @Override // FSDatasetInterface - public boolean isValidBlock(Block b) { - ReplicaInfo replicaInfo = volumeMap.get(b); + public boolean isValidBlock(ExtendedBlock b) { + ReplicaInfo replicaInfo = volumeMap.get(b.getLocalBlock()); if (replicaInfo == null || replicaInfo.getState() != ReplicaState.FINALIZED) { return false; @@ -1609,7 +1617,7 @@ public class FSDataset implements FSCons * just get rid of it. */ @Override // FSDatasetInterface - public void invalidate(Block invalidBlks[]) throws IOException { + public void invalidate(String bpid, Block invalidBlks[]) throws IOException { boolean error = false; for (int i = 0; i < invalidBlks.length; i++) { File f = null; @@ -1850,7 +1858,8 @@ public class FSDataset implements FSCons // Remove the block from volumeMap volumeMap.remove(blockId); if (datanode.blockScanner != null) { - datanode.blockScanner.deleteBlock(new Block(blockId)); + // TODO:FEDERATION pass the right bpid + datanode.blockScanner.deleteBlock("TODO", new Block(blockId)); } DataNode.LOG.warn("Removed block " + blockId + " from memory with missing block file on the disk"); @@ -1872,7 +1881,7 @@ public class FSDataset implements FSCons diskFile.length(), diskGS, vol, diskFile.getParentFile()); volumeMap.add(diskBlockInfo); if (datanode.blockScanner != null) { - datanode.blockScanner.addBlock(diskBlockInfo); + datanode.blockScanner.addBlock(new ExtendedBlock(diskBlockInfo)); } DataNode.LOG.warn("Added missing block to memory " + (Block)diskBlockInfo); return; @@ -2041,7 +2050,7 @@ public class FSDataset implements FSCons @Override // FSDatasetInterface public synchronized ReplicaInfo updateReplicaUnderRecovery( - final Block oldBlock, + final ExtendedBlock oldBlock, final long recoveryId, final long newlength) throws IOException { //get replica @@ -2112,7 +2121,7 @@ public class FSDataset implements FSCons } @Override // FSDatasetInterface - public synchronized long getReplicaVisibleLength(final Block block) + public synchronized long getReplicaVisibleLength(final ExtendedBlock block) throws IOException { final Replica replica = volumeMap.get(block.getBlockId()); if (replica == null) { Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Tue Feb 22 20:31:14 2011 @@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.server.pro import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DiskChecker.DiskErrorException; @@ -50,7 +51,7 @@ public interface FSDatasetInterface exte * @return the length of the metadata file for the specified block. * @throws IOException */ - public long getMetaDataLength(Block b) throws IOException; + public long getMetaDataLength(ExtendedBlock b) throws IOException; /** * This class provides the input stream and length of the metadata @@ -75,7 +76,7 @@ public interface FSDatasetInterface exte * @return the metadata input stream; * @throws IOException */ - public MetaDataInputStream getMetaDataInputStream(Block b) + public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b) throws IOException; /** @@ -84,7 +85,7 @@ public interface FSDatasetInterface exte * @return true of the metafile for specified block exits * @throws IOException */ - public boolean metaFileExists(Block b) throws IOException; + public boolean metaFileExists(ExtendedBlock b) throws IOException; /** @@ -93,7 +94,7 @@ public interface FSDatasetInterface exte * @return the specified block's on-disk length (excluding metadta) * @throws IOException */ - public long getLength(Block b) throws IOException; + public long getLength(ExtendedBlock b) throws IOException; /** * Get reference to the replica meta info in the replicasMap. @@ -107,7 +108,8 @@ public interface FSDatasetInterface exte /** * @return the generation stamp stored with the block. */ - public Block getStoredBlock(long blkid) throws IOException; + public Block getStoredBlock(String poolId, long blkid) + throws IOException; /** * Returns an input stream to read the contents of the specified block @@ -115,7 +117,7 @@ public interface FSDatasetInterface exte * @return an input stream to read the contents of the specified block * @throws IOException */ - public InputStream getBlockInputStream(Block b) throws IOException; + public InputStream getBlockInputStream(ExtendedBlock b) throws IOException; /** * Returns an input stream at specified offset of the specified block @@ -125,7 +127,7 @@ public interface FSDatasetInterface exte * starting at the offset * @throws IOException */ - public InputStream getBlockInputStream(Block b, long seekOffset) + public InputStream getBlockInputStream(ExtendedBlock b, long seekOffset) throws IOException; /** @@ -138,8 +140,8 @@ public interface FSDatasetInterface exte * starting at the offset * @throws IOException */ - public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff) - throws IOException; + public BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, + long ckoff) throws IOException; /** * @@ -188,7 +190,7 @@ public interface FSDatasetInterface exte * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - public ReplicaInPipelineInterface createTemporary(Block b) + public ReplicaInPipelineInterface createTemporary(ExtendedBlock b) throws IOException; /** @@ -198,7 +200,7 @@ public interface FSDatasetInterface exte * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - public ReplicaInPipelineInterface createRbw(Block b) throws IOException; + public ReplicaInPipelineInterface createRbw(ExtendedBlock b) throws IOException; /** * Recovers a RBW replica and returns the meta info of the replica @@ -210,7 +212,7 @@ public interface FSDatasetInterface exte * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - public ReplicaInPipelineInterface recoverRbw(Block b, + public ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException; @@ -223,7 +225,7 @@ public interface FSDatasetInterface exte * @return the meata info of the replica which is being written to * @throws IOException */ - public ReplicaInPipelineInterface append(Block b, + public ReplicaInPipelineInterface append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException; /** @@ -236,7 +238,7 @@ public interface FSDatasetInterface exte * @return the meta info of the replica which is being written to * @throws IOException */ - public ReplicaInPipelineInterface recoverAppend(Block b, + public ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException; /** @@ -248,7 +250,7 @@ public interface FSDatasetInterface exte * @param expectedBlockLen the number of bytes the replica is expected to have * @throws IOException */ - public void recoverClose(Block b, + public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException; /** @@ -258,7 +260,7 @@ public interface FSDatasetInterface exte * @param b * @throws IOException */ - public void finalizeBlock(Block b) throws IOException; + public void finalizeBlock(ExtendedBlock b) throws IOException; /** * Unfinalizes the block previously opened for writing using writeToBlock. @@ -266,7 +268,7 @@ public interface FSDatasetInterface exte * @param b * @throws IOException */ - public void unfinalizeBlock(Block b) throws IOException; + public void unfinalizeBlock(ExtendedBlock b) throws IOException; /** * Returns the block report - the full list of blocks stored @@ -279,14 +281,14 @@ public interface FSDatasetInterface exte * @param b * @return - true if the specified block is valid */ - public boolean isValidBlock(Block b); + public boolean isValidBlock(ExtendedBlock b); /** * Invalidates the specified blocks * @param invalidBlks - the blocks to be invalidated * @throws IOException */ - public void invalidate(Block invalidBlks[]) throws IOException; + public void invalidate(String poolId, Block invalidBlks[]) throws IOException; /** * Check if all the data directories are healthy @@ -312,7 +314,7 @@ public interface FSDatasetInterface exte * @param checksumSize number of bytes each checksum has * @throws IOException */ - public void adjustCrcChannelPosition(Block b, BlockWriteStreams stream, + public void adjustCrcChannelPosition(ExtendedBlock b, BlockWriteStreams stream, int checksumSize) throws IOException; /** @@ -324,7 +326,7 @@ public interface FSDatasetInterface exte /** * Get visible length of the specified replica. */ - long getReplicaVisibleLength(final Block block) throws IOException; + long getReplicaVisibleLength(final ExtendedBlock block) throws IOException; /** * Initialize a replica recovery. @@ -339,7 +341,7 @@ public interface FSDatasetInterface exte * Update replica's generation stamp and length and finalize it. */ public ReplicaInfo updateReplicaUnderRecovery( - Block oldBlock, + ExtendedBlock oldBlock, long recoveryId, long newLength) throws IOException; } Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Tue Feb 22 20:31:14 2011 @@ -350,10 +350,8 @@ public class DatanodeDescriptor extends } } - BlockCommand getReplicationCommand(int maxTransfers) { - List blocktargetlist = replicateBlocks.poll(maxTransfers); - return blocktargetlist == null? null: - new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist); + List getReplicationCommand(int maxTransfers) { + return replicateBlocks.poll(maxTransfers); } BlockRecoveryCommand getLeaseRecoveryCommand(int maxTransfers) { @@ -371,10 +369,8 @@ public class DatanodeDescriptor extends /** * Remove the specified number of blocks to be invalidated */ - BlockCommand getInvalidateBlocks(int maxblocks) { - Block[] deleteList = getBlockArray(invalidateBlocks, maxblocks); - return deleteList == null? - null: new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, deleteList); + Block[] getInvalidateBlocks(int maxblocks) { + return getBlockArray(invalidateBlocks, maxblocks); } static private Block[] getBlockArray(Collection blocks, int max) { Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Feb 22 20:31:14 2011 @@ -52,10 +52,13 @@ import org.apache.hadoop.net.NetworkTopo import org.apache.hadoop.net.Node; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.ScriptBasedMapping; +import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor.BlockTargetPair; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator; +import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; @@ -187,10 +190,10 @@ public class FSNamesystem implements FSC // Stores the correct file name hierarchy // public FSDirectory dir; - - // TODO:FEDERATION initialize from the persisted information - String poolId = "TODO"; BlockManager blockManager; + + // Block pool ID used by this namenode + String blockPoolId; /** * Stores the datanode -> block map. @@ -1525,11 +1528,11 @@ public class FSNamesystem implements FSC } ExtendedBlock getExtendedBlock(Block blk) { - return new ExtendedBlock(poolId, blk); + return new ExtendedBlock(blockPoolId, blk); } - + void setBlockPoolId(String bpid) { - poolId = bpid; + blockPoolId = bpid; } /** @@ -1773,7 +1776,7 @@ public class FSNamesystem implements FSC b.setGenerationStamp(getGenerationStamp()); b = dir.addBlock(src, inodes, b, targets); NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: " - +src+ ". "+b); + +src+ ". " + blockPoolId + " "+ b); return b; } @@ -2734,14 +2737,17 @@ public class FSNamesystem implements FSC ArrayList cmds = new ArrayList(3); //check pending replication - cmd = nodeinfo.getReplicationCommand( + List pendingList = nodeinfo.getReplicationCommand( blockManager.maxReplicationStreams - xmitsInProgress); - if (cmd != null) { + if (pendingList != null) { + cmd = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId, + pendingList); cmds.add(cmd); } //check block invalidation - cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit); - if (cmd != null) { + Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit); + if (blks != null) { + cmd = new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId, blks); cmds.add(cmd); } // check access key update @@ -3210,17 +3216,10 @@ public class FSNamesystem implements FSC } } - private void checkPoolId(String thatPoolId) throws IOException { - if (!this.poolId.equals(thatPoolId)) { - throw new IOException("PoolId " + thatPoolId - + " does not belong to expected pool " + poolId); - } - } - private void checkBlock(ExtendedBlock block) throws IOException { - if (block != null && !this.poolId.equals(block.getPoolId())) { - throw new IOException("Block " + block - + " does not belong to expected pool " + poolId); + if (block != null && !this.blockPoolId.equals(block.getPoolId())) { + throw new IOException("Unexpected BlockPoolId " + block.getPoolId() + + " - expected " + blockPoolId); } } @@ -5234,6 +5233,6 @@ public class FSNamesystem implements FSC } public String getPoolId() { - return poolId; + return blockPoolId; } } Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java Tue Feb 22 20:31:14 2011 @@ -39,6 +39,7 @@ import org.apache.hadoop.io.*; @InterfaceAudience.Private @InterfaceStability.Evolving public class BlockCommand extends DatanodeCommand { + String poolId; Block blocks[]; DatanodeInfo targets[][]; @@ -48,9 +49,11 @@ public class BlockCommand extends Datano * Create BlockCommand for transferring blocks to another datanode * @param blocktargetlist blocks to be transferred */ - public BlockCommand(int action, List blocktargetlist) { + public BlockCommand(int action, String poolId, + List blocktargetlist) { super(action); + this.poolId = poolId; blocks = new Block[blocktargetlist.size()]; targets = new DatanodeInfo[blocks.length][]; for(int i = 0; i < blocks.length; i++) { @@ -66,12 +69,17 @@ public class BlockCommand extends Datano * Create BlockCommand for the given action * @param blocks blocks related to the action */ - public BlockCommand(int action, Block blocks[]) { + public BlockCommand(int action, String poolId, Block blocks[]) { super(action); + this.poolId = poolId; this.blocks = blocks; this.targets = EMPTY_TARGET; } + public String getPoolId() { + return poolId; + } + public Block[] getBlocks() { return blocks; } @@ -93,6 +101,7 @@ public class BlockCommand extends Datano public void write(DataOutput out) throws IOException { super.write(out); + Text.writeString(out, poolId); out.writeInt(blocks.length); for (int i = 0; i < blocks.length; i++) { blocks[i].write(out); @@ -108,6 +117,7 @@ public class BlockCommand extends Datano public void readFields(DataInput in) throws IOException { super.readFields(in); + this.poolId = Text.readString(in); this.blocks = new Block[in.readInt()]; for (int i = 0; i < blocks.length; i++) { blocks[i] = new Block(); Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Tue Feb 22 20:31:14 2011 @@ -1134,8 +1134,9 @@ public class MiniDFSCluster { if (!(dataSet instanceof SimulatedFSDataset)) { throw new IOException("injectBlocks is valid only for SimilatedFSDataset"); } + String bpid = getNamesystem().getPoolId(); SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet; - sdataset.injectBlocks(blocksToInject); + sdataset.injectBlocks(bpid, blocksToInject); dataNodes.get(dataNodeIndex).datanode.scheduleBlockReport(0); } @@ -1148,7 +1149,8 @@ public class MiniDFSCluster { * if any of blocks already exist in the data nodes * Note the rest of the blocks are not injected. */ - public void injectBlocks(Iterable[] blocksToInject) throws IOException { + public void injectBlocks(Iterable[] blocksToInject) + throws IOException { if (blocksToInject.length > dataNodes.size()) { throw new IndexOutOfBoundsException(); } Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java Tue Feb 22 20:31:14 2011 @@ -45,6 +45,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.shell.Count; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.FSDataset; import org.apache.hadoop.io.IOUtils; @@ -1108,10 +1109,13 @@ public class TestDFSShell extends TestCa List files = new ArrayList(); List datanodes = cluster.getDataNodes(); Iterable[] blocks = cluster.getAllBlockReports(); + ExtendedBlock blk = new ExtendedBlock(); + String poolId = cluster.getNamesystem().getPoolId(); for(int i = 0; i < blocks.length; i++) { FSDataset ds = (FSDataset)datanodes.get(i).getFSDataset(); for(Block b : blocks[i]) { - files.add(ds.getBlockFile(b)); + blk.set(poolId, b); + files.add(ds.getBlockFile(blk.getLocalBlock())); } } return files; Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java Tue Feb 22 20:31:14 2011 @@ -268,7 +268,8 @@ public class TestFileAppend3 extends jun } for(DatanodeInfo datanodeinfo : lb.getLocations()) { final DataNode dn = cluster.getDataNode(datanodeinfo.getIpcPort()); - final Block metainfo = dn.data.getStoredBlock(blk.getBlockId()); + final Block metainfo = dn.data.getStoredBlock(blk.getPoolId(), + blk.getBlockId()); assertEquals(size, metainfo.getNumBytes()); } } Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java Tue Feb 22 20:31:14 2011 @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; @@ -765,7 +766,8 @@ public class TestFileCreation extends ju for(DatanodeInfo datanodeinfo: locatedblock.getLocations()) { DataNode datanode = cluster.getDataNode(datanodeinfo.ipcPort); FSDataset dataset = (FSDataset)datanode.data; - Block b = dataset.getStoredBlock(locatedblock.getBlock().getBlockId()); + ExtendedBlock blk = locatedblock.getBlock(); + Block b = dataset.getStoredBlock(blk.getPoolId(), blk.getBlockId()); File blockfile = dataset.findBlockFile(b.getBlockId()); System.out.println("blockfile=" + blockfile); if (blockfile != null) { Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java Tue Feb 22 20:31:14 2011 @@ -116,8 +116,8 @@ public class TestLeaseRecovery extends j dfs.dfs.getNamenode(), filestr).getBlock(); long currentGS = lastblock.getGenerationStamp(); for(int i = 0; i < REPLICATION_NUM; i++) { - updatedmetainfo[i] = - datanodes[i].data.getStoredBlock(lastblock.getBlockId()); + updatedmetainfo[i] = datanodes[i].data.getStoredBlock(lastblock + .getPoolId(), lastblock.getBlockId()); assertEquals(lastblock.getBlockId(), updatedmetainfo[i].getBlockId()); assertEquals(oldSize, updatedmetainfo[i].getNumBytes()); assertEquals(currentGS, updatedmetainfo[i].getGenerationStamp()); Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Tue Feb 22 20:31:14 2011 @@ -323,8 +323,9 @@ public class SimulatedFSDataset impleme blockMap = new HashMap(); } - public synchronized void injectBlocks(Iterable injectBlocks) - throws IOException { + public synchronized void injectBlocks(String poolId, + Iterable injectBlocks) throws IOException { + ExtendedBlock blk = new ExtendedBlock(); if (injectBlocks != null) { int numInjectedBlocks = 0; for (Block b: injectBlocks) { // if any blocks in list is bad, reject list @@ -332,7 +333,8 @@ public class SimulatedFSDataset impleme if (b == null) { throw new NullPointerException("Null blocks in block list"); } - if (isValidBlock(b)) { + blk.set(poolId, b); + if (isValidBlock(blk)) { throw new IOException("Block already exists in block list"); } } @@ -347,9 +349,10 @@ public class SimulatedFSDataset impleme } } - @Override - public synchronized void finalizeBlock(Block b) throws IOException { - BInfo binfo = blockMap.get(b); + @Override // FSDatasetInterface + public synchronized void finalizeBlock(ExtendedBlock b) throws IOException { + // TODO:FEDERATION use ExtendedBlock + BInfo binfo = blockMap.get(b.getLocalBlock()); if (binfo == null) { throw new IOException("Finalizing a non existing block " + b); } @@ -357,10 +360,10 @@ public class SimulatedFSDataset impleme } - @Override - public synchronized void unfinalizeBlock(Block b) throws IOException { + @Override // FSDatasetInterface + public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException { if (isBeingWritten(b)) { - blockMap.remove(b); + blockMap.remove(b.getLocalBlock()); } } @@ -392,9 +395,9 @@ public class SimulatedFSDataset impleme return storage.getFree(); } - @Override - public synchronized long getLength(Block b) throws IOException { - BInfo binfo = blockMap.get(b); + @Override // FSDatasetInterface + public synchronized long getLength(ExtendedBlock b) throws IOException { + BInfo binfo = blockMap.get(b.getLocalBlock()); if (binfo == null) { throw new IOException("Finalizing a non existing block " + b); } @@ -407,20 +410,22 @@ public class SimulatedFSDataset impleme return blockMap.get(new Block(blockId)); } - @Override - public Block getStoredBlock(long blkid) throws IOException { - Block b = new Block(blkid); - BInfo binfo = blockMap.get(b); + @Override // FSDatasetInterface + public Block getStoredBlock(String poolId, long blkid) throws IOException { + ExtendedBlock b = new ExtendedBlock(poolId, blkid); + // TODO:FEDERATION use ExtendedBlock + BInfo binfo = blockMap.get(b.getLocalBlock()); if (binfo == null) { return null; } b.setGenerationStamp(binfo.getGenerationStamp()); b.setNumBytes(binfo.getNumBytes()); - return b; + return b.getLocalBlock(); } - @Override - public synchronized void invalidate(Block[] invalidBlks) throws IOException { + @Override // FSDatasetInterface + public synchronized void invalidate(String poolId, Block[] invalidBlks) + throws IOException { boolean error = false; if (invalidBlks == null) { return; @@ -443,10 +448,11 @@ public class SimulatedFSDataset impleme } } - @Override - public synchronized boolean isValidBlock(Block b) { + @Override // FSDatasetInterface + public synchronized boolean isValidBlock(ExtendedBlock b) { // return (blockMap.containsKey(b)); - BInfo binfo = blockMap.get(b); + // TODO:FEDERATION use ExtendedBlock + BInfo binfo = blockMap.get(b.getLocalBlock()); if (binfo == null) { return false; } @@ -454,8 +460,9 @@ public class SimulatedFSDataset impleme } /* check if a block is created but not finalized */ - private synchronized boolean isBeingWritten(Block b) { - BInfo binfo = blockMap.get(b); + private synchronized boolean isBeingWritten(ExtendedBlock b) { + // TODO:FEDERATION use ExtendedBlock + BInfo binfo = blockMap.get(b.getLocalBlock()); if (binfo == null) { return false; } @@ -466,10 +473,10 @@ public class SimulatedFSDataset impleme return getStorageInfo(); } - @Override - public synchronized ReplicaInPipelineInterface append(Block b, + @Override // FSDatasetInterface + public synchronized ReplicaInPipelineInterface append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { - BInfo binfo = blockMap.get(b); + BInfo binfo = blockMap.get(b.getLocalBlock()); if (binfo == null || !binfo.isFinalized()) { throw new ReplicaNotFoundException("Block " + b + " is not valid, and cannot be appended to."); @@ -478,10 +485,10 @@ public class SimulatedFSDataset impleme return binfo; } - @Override - public synchronized ReplicaInPipelineInterface recoverAppend(Block b, + @Override // FSDatasetInterface + public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { - BInfo binfo = blockMap.get(b); + BInfo binfo = blockMap.get(b.getLocalBlock()); if (binfo == null) { throw new ReplicaNotFoundException("Block " + b + " is not valid, and cannot be appended to."); @@ -495,10 +502,10 @@ public class SimulatedFSDataset impleme return binfo; } - @Override - public void recoverClose(Block b, long newGS, - long expectedBlockLen) throws IOException { - BInfo binfo = blockMap.get(b); + @Override // FSDatasetInterface + public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) + throws IOException { + BInfo binfo = blockMap.get(b.getLocalBlock()); if (binfo == null) { throw new ReplicaNotFoundException("Block " + b + " is not valid, and cannot be appended to."); @@ -506,15 +513,15 @@ public class SimulatedFSDataset impleme if (!binfo.isFinalized()) { binfo.finalizeBlock(binfo.getNumBytes()); } - blockMap.remove(b); + blockMap.remove(b.getLocalBlock()); binfo.theBlock.setGenerationStamp(newGS); blockMap.put(binfo.theBlock, binfo); } - @Override - public synchronized ReplicaInPipelineInterface recoverRbw(Block b, + @Override // FSDatasetInterface + public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { - BInfo binfo = blockMap.get(b); + BInfo binfo = blockMap.get(b.getLocalBlock()); if ( binfo == null) { throw new ReplicaNotFoundException("Block " + b + " does not exist, and cannot be appended to."); @@ -529,14 +536,14 @@ public class SimulatedFSDataset impleme return binfo; } - @Override - public synchronized ReplicaInPipelineInterface createRbw(Block b) + @Override // FSDatasetInterface + public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b) throws IOException { return createTemporary(b); } - @Override - public synchronized ReplicaInPipelineInterface createTemporary(Block b) + @Override // FSDatasetInterface + public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b) throws IOException { if (isValidBlock(b)) { throw new ReplicaAlreadyExistsException("Block " + b + @@ -546,15 +553,17 @@ public class SimulatedFSDataset impleme throw new ReplicaAlreadyExistsException("Block " + b + " is being written, and cannot be written to."); } - BInfo binfo = new BInfo(b, true); + // TODO:FEDERATION use ExtendedBlock + BInfo binfo = new BInfo(b.getLocalBlock(), true); blockMap.put(binfo.theBlock, binfo); return binfo; } - @Override - public synchronized InputStream getBlockInputStream(Block b) - throws IOException { - BInfo binfo = blockMap.get(b); + @Override // FSDatasetInterface + public synchronized InputStream getBlockInputStream(ExtendedBlock b) + throws IOException { + // TODO:FEDERATION use ExtendedBlock + BInfo binfo = blockMap.get(b.getLocalBlock()); if (binfo == null) { throw new IOException("No such Block " + b ); } @@ -563,18 +572,18 @@ public class SimulatedFSDataset impleme return binfo.getIStream(); } - @Override - public synchronized InputStream getBlockInputStream(Block b, long seekOffset) - throws IOException { + @Override // FSDatasetInterface + public synchronized InputStream getBlockInputStream(ExtendedBlock b, + long seekOffset) throws IOException { InputStream result = getBlockInputStream(b); result.skip(seekOffset); return result; } /** Not supported */ - @Override - public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff - ) throws IOException { + @Override // FSDatasetInterface + public BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, + long ckoff) throws IOException { throw new IOException("Not supported"); } @@ -585,9 +594,10 @@ public class SimulatedFSDataset impleme * @throws IOException - block does not exist or problems accessing * the meta file */ - private synchronized InputStream getMetaDataInStream(Block b) + private synchronized InputStream getMetaDataInStream(ExtendedBlock b) throws IOException { - BInfo binfo = blockMap.get(b); + // TODO:FEDERATION use ExtendedBlock + BInfo binfo = blockMap.get(b.getLocalBlock()); if (binfo == null) { throw new IOException("No such Block " + b ); } @@ -598,9 +608,11 @@ public class SimulatedFSDataset impleme return binfo.getMetaIStream(); } - @Override - public synchronized long getMetaDataLength(Block b) throws IOException { - BInfo binfo = blockMap.get(b); + @Override // FSDatasetInterface + public synchronized long getMetaDataLength(ExtendedBlock b) + throws IOException { + // TODO:FEDERATION use ExtendedBlock + BInfo binfo = blockMap.get(b.getLocalBlock()); if (binfo == null) { throw new IOException("No such Block " + b ); } @@ -611,16 +623,15 @@ public class SimulatedFSDataset impleme return binfo.getMetaIStream().getLength(); } - @Override - public MetaDataInputStream getMetaDataInputStream(Block b) - throws IOException { - - return new MetaDataInputStream(getMetaDataInStream(b), - getMetaDataLength(b)); + @Override // FSDatasetInterface + public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b) + throws IOException { + return new MetaDataInputStream(getMetaDataInStream(b), + getMetaDataLength(b)); } - @Override - public synchronized boolean metaFileExists(Block b) throws IOException { + @Override // FSDatasetInterface + public synchronized boolean metaFileExists(ExtendedBlock b) throws IOException { if (!isValidBlock(b)) { throw new IOException("Block " + b + " is valid, and cannot be written to."); @@ -632,8 +643,8 @@ public class SimulatedFSDataset impleme // nothing to check for simulated data set } - @Override - public synchronized void adjustCrcChannelPosition(Block b, + @Override // FSDatasetInterface + public synchronized void adjustCrcChannelPosition(ExtendedBlock b, BlockWriteStreams stream, int checksumSize) throws IOException { @@ -818,15 +829,15 @@ public class SimulatedFSDataset impleme } @Override // FSDatasetInterface - public FinalizedReplica updateReplicaUnderRecovery(Block oldBlock, + public FinalizedReplica updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId, long newlength) throws IOException { return new FinalizedReplica( oldBlock.getBlockId(), newlength, recoveryId, null, null); } - @Override - public long getReplicaVisibleLength(Block block) throws IOException { + @Override // FSDatasetInterface + public long getReplicaVisibleLength(ExtendedBlock block) throws IOException { return block.getNumBytes(); } } Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Tue Feb 22 20:31:14 2011 @@ -68,6 +68,7 @@ public class TestBlockReport { static final int BLOCK_SIZE = 1024; static final int NUM_BLOCKS = 10; static final int FILE_SIZE = NUM_BLOCKS * BLOCK_SIZE + 1; + static String bpid; private MiniDFSCluster cluster; private DistributedFileSystem fs; @@ -86,6 +87,7 @@ public class TestBlockReport { REPL_FACTOR = 1; //Reset if case a test has modified the value cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build(); fs = (DistributedFileSystem) cluster.getFileSystem(); + bpid = cluster.getNamesystem().getPoolId(); } @After @@ -173,7 +175,7 @@ public class TestBlockReport { File dataDir = new File(cluster.getDataDirectory()); assertTrue(dataDir.isDirectory()); - List blocks2Remove = new ArrayList(); + List blocks2Remove = new ArrayList(); List removedIndex = new ArrayList(); List lBlocks = cluster.getNameNode().getBlockLocations( filePath.toString(), FILE_START, @@ -186,7 +188,7 @@ public class TestBlockReport { } for (Integer aRemovedIndex : removedIndex) { - blocks2Remove.add(lBlocks.get(aRemovedIndex).getBlock().getLocalBlock()); + blocks2Remove.add(lBlocks.get(aRemovedIndex).getBlock()); } ArrayList blocks = locatedToBlocks(lBlocks, removedIndex); @@ -194,7 +196,7 @@ public class TestBlockReport { LOG.debug("Number of blocks allocated " + lBlocks.size()); } - for (Block b : blocks2Remove) { + for (ExtendedBlock b : blocks2Remove) { if(LOG.isDebugEnabled()) { LOG.debug("Removing the block " + b.getBlockName()); } @@ -694,7 +696,8 @@ public class TestBlockReport { // Get block from the first DN ret = cluster.getDataNodes().get(DN_N0). - data.getStoredBlock(lb.getBlock().getBlockId()); + data.getStoredBlock(lb.getBlock() + .getPoolId(), lb.getBlock().getBlockId()); return ret; } Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java Tue Feb 22 20:31:14 2011 @@ -121,7 +121,8 @@ public class TestDatanodeRestart { } else { Assert.assertEquals(fileLen, replica.getNumBytes()); } - dn.data.invalidate(new Block[]{replica}); + String bpid = cluster.getNamesystem().getPoolId(); + dn.data.invalidate(bpid, new Block[]{replica}); } finally { IOUtils.closeStream(out); if (fs.exists(src)) { Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java Tue Feb 22 20:31:14 2011 @@ -48,7 +48,7 @@ import org.junit.Test; */ public class TestInterDatanodeProtocol { public static void checkMetaInfo(ExtendedBlock b, DataNode dn) throws IOException { - Block metainfo = dn.data.getStoredBlock(b.getBlockId()); + Block metainfo = dn.data.getStoredBlock(b.getPoolId(), b.getBlockId()); Assert.assertEquals(b.getBlockId(), metainfo.getBlockId()); Assert.assertEquals(b.getNumBytes(), metainfo.getNumBytes()); } @@ -209,7 +209,10 @@ public class TestInterDatanodeProtocol { } } - /** Test {@link FSDataset#updateReplicaUnderRecovery(Block, long, long)} */ + /** + * Test for + * {@link FSDataset#updateReplicaUnderRecovery(ExtendedBlock, long, long)} + * */ @Test public void testUpdateReplicaUnderRecovery() throws IOException { final Configuration conf = new HdfsConfiguration(); @@ -255,8 +258,8 @@ public class TestInterDatanodeProtocol { //with (block length) != (stored replica's on disk length). { //create a block with same id and gs but different length. - final Block tmp = new Block(rri.getBlockId(), rri.getNumBytes() - 1, - rri.getGenerationStamp()); + final ExtendedBlock tmp = new ExtendedBlock(b.getPoolId(), rri + .getBlockId(), rri.getNumBytes() - 1, rri.getGenerationStamp()); try { //update should fail fsdataset.updateReplicaUnderRecovery(tmp, recoveryid, newlength); @@ -268,7 +271,7 @@ public class TestInterDatanodeProtocol { //update final ReplicaInfo finalized = fsdataset.updateReplicaUnderRecovery( - rri, recoveryid, newlength); + new ExtendedBlock(b.getPoolId(), rri), recoveryid, newlength); //check meta data after update FSDataset.checkReplicaFiles(finalized); Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java?rev=1073489&r1=1073488&r2=1073489&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java Tue Feb 22 20:31:14 2011 @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams; @@ -38,11 +39,10 @@ import org.apache.hadoop.util.DataChecks */ public class TestSimulatedFSDataset extends TestCase { - Configuration conf = null; - - + // TODO:FEDERATION initialize this + static String bpid; static final int NUMBLOCKS = 20; static final int BLOCK_LENGTH_MULTIPLIER = 79; @@ -61,10 +61,13 @@ public class TestSimulatedFSDataset exte return blkid*BLOCK_LENGTH_MULTIPLIER; } - int addSomeBlocks(FSDatasetInterface fsdataset, int startingBlockId) throws IOException { + int addSomeBlocks(FSDatasetInterface fsdataset, int startingBlockId) + throws IOException { int bytesAdded = 0; for (int i = startingBlockId; i < startingBlockId+NUMBLOCKS; ++i) { - Block b = new Block(i, 0, 0); // we pass expected len as zero, - fsdataset should use the sizeof actual data written + ExtendedBlock b = new ExtendedBlock(bpid, i, 0, 0); + // we pass expected len as zero, - fsdataset should use the sizeof actual + // data written ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b); BlockWriteStreams out = bInfo.createStreams(true, 512, 4); try { @@ -90,7 +93,7 @@ public class TestSimulatedFSDataset exte public void testGetMetaData() throws IOException { FSDatasetInterface fsdataset = new SimulatedFSDataset(conf); - Block b = new Block(1, 5, 0); + ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0); try { assertFalse(fsdataset.metaFileExists(b)); assertTrue("Expected an IO exception", false); @@ -98,7 +101,7 @@ public class TestSimulatedFSDataset exte // ok - as expected } addSomeBlocks(fsdataset); // Only need to add one but .... - b = new Block(1, 0, 0); + b = new ExtendedBlock(bpid, 1, 0, 0); InputStream metaInput = fsdataset.getMetaDataInputStream(b); DataInputStream metaDataInput = new DataInputStream(metaInput); short version = metaDataInput.readShort(); @@ -122,7 +125,7 @@ public class TestSimulatedFSDataset exte void checkBlockDataAndSize(FSDatasetInterface fsdataset, - Block b, long expectedLen) throws IOException { + ExtendedBlock b, long expectedLen) throws IOException { InputStream input = fsdataset.getBlockInputStream(b); long lengthRead = 0; int data; @@ -137,7 +140,7 @@ public class TestSimulatedFSDataset exte FSDatasetInterface fsdataset = new SimulatedFSDataset(conf); addSomeBlocks(fsdataset); for (int i=1; i <= NUMBLOCKS; ++i) { - Block b = new Block(i, 0, 0); + ExtendedBlock b = new ExtendedBlock(bpid, i, 0, 0); assertTrue(fsdataset.isValidBlock(b)); assertEquals(blockIdToLen(i), fsdataset.getLength(b)); checkBlockDataAndSize(fsdataset, b, blockIdToLen(i)); @@ -175,13 +178,14 @@ public class TestSimulatedFSDataset exte SimulatedFSDataset sfsdataset = new SimulatedFSDataset(conf); - sfsdataset.injectBlocks(blockReport); + sfsdataset.injectBlocks(bpid, blockReport); blockReport = sfsdataset.getBlockReport(); assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks()); for (Block b: blockReport) { assertNotNull(b); assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes()); - assertEquals(blockIdToLen(b.getBlockId()), sfsdataset.getLength(b)); + assertEquals(blockIdToLen(b.getBlockId()), sfsdataset + .getLength(new ExtendedBlock(bpid, b))); } assertEquals(bytesAdded, sfsdataset.getDfsUsed()); assertEquals(sfsdataset.getCapacity()-bytesAdded, sfsdataset.getRemaining()); @@ -213,13 +217,14 @@ public class TestSimulatedFSDataset exte assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks()); blockReport2 = sfsdataset.getBlockReport(); assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks()); - sfsdataset.injectBlocks(blockReport); + sfsdataset.injectBlocks(bpid, blockReport); blockReport = sfsdataset.getBlockReport(); assertEquals(NUMBLOCKS*2, blockReport.getNumberOfBlocks()); for (Block b: blockReport) { assertNotNull(b); assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes()); - assertEquals(blockIdToLen(b.getBlockId()), sfsdataset.getLength(b)); + assertEquals(blockIdToLen(b.getBlockId()), sfsdataset + .getLength(new ExtendedBlock(bpid, b))); } assertEquals(bytesAdded, sfsdataset.getDfsUsed()); assertEquals(sfsdataset.getCapacity()-bytesAdded, sfsdataset.getRemaining()); @@ -231,7 +236,7 @@ public class TestSimulatedFSDataset exte try { sfsdataset = new SimulatedFSDataset(conf); - sfsdataset.injectBlocks(blockReport); + sfsdataset.injectBlocks(bpid, blockReport); assertTrue("Expected an IO exception", false); } catch (IOException e) { // ok - as expected @@ -239,7 +244,7 @@ public class TestSimulatedFSDataset exte } - public void checkInvalidBlock(Block b) throws IOException { + public void checkInvalidBlock(ExtendedBlock b) throws IOException { FSDatasetInterface fsdataset = new SimulatedFSDataset(conf); assertFalse(fsdataset.isValidBlock(b)); try { @@ -267,12 +272,12 @@ public class TestSimulatedFSDataset exte public void testInValidBlocks() throws IOException { FSDatasetInterface fsdataset = new SimulatedFSDataset(conf); - Block b = new Block(1, 5, 0); + ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0); checkInvalidBlock(b); // Now check invlaid after adding some blocks addSomeBlocks(fsdataset); - b = new Block(NUMBLOCKS + 99, 5, 0); + b = new ExtendedBlock(bpid, NUMBLOCKS + 99, 5, 0); checkInvalidBlock(b); } @@ -283,9 +288,9 @@ public class TestSimulatedFSDataset exte Block[] deleteBlocks = new Block[2]; deleteBlocks[0] = new Block(1, 0, 0); deleteBlocks[1] = new Block(2, 0, 0); - fsdataset.invalidate(deleteBlocks); - checkInvalidBlock(deleteBlocks[0]); - checkInvalidBlock(deleteBlocks[1]); + fsdataset.invalidate(bpid, deleteBlocks); + checkInvalidBlock(new ExtendedBlock(bpid, deleteBlocks[0])); + checkInvalidBlock(new ExtendedBlock(deleteBlocks[1])); long sizeDeleted = blockIdToLen(1) + blockIdToLen(2); assertEquals(bytesAdded-sizeDeleted, fsdataset.getDfsUsed()); assertEquals(fsdataset.getCapacity()-bytesAdded+sizeDeleted, fsdataset.getRemaining()); @@ -295,7 +300,7 @@ public class TestSimulatedFSDataset exte // Now make sure the rest of the blocks are valid for (int i=3; i <= NUMBLOCKS; ++i) { Block b = new Block(i, 0, 0); - assertTrue(fsdataset.isValidBlock(b)); + assertTrue(fsdataset.isValidBlock(new ExtendedBlock(bpid, b))); } }