From: stevel@apache.org
To: hdfs-commits@hadoop.apache.org
Reply-To: hdfs-dev@hadoop.apache.org
Subject: svn commit: r885143 [13/18] - in /hadoop/hdfs/branches/HDFS-326: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/ant/org/apache/hadoop/ant/ src/ant/org/apache/hadoop/ant/condition/ src/c++/ src/c++/libhdfs/ src/c++/libhdfs/docs...
Date: Sat, 28 Nov 2009 20:06:08 -0000
Message-Id: <20091128200617.00E782388A78@eris.apache.org>

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Nov 28 20:05:56 2009
@@ -22,15 +22,16 @@
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.AccessTokenHandler;
-import org.apache.hadoop.security.ExportedAccessKeys;
 import org.apache.hadoop.security.PermissionChecker;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -39,6 +40,7 @@
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
@@ -52,10 +54,16 @@
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.io.IOUtils;
@@ -89,7 +97,7 @@
  * 4) machine --> blocklist (inverted #2)
  * 5) LRU cache of updated-heartbeat machines
  ***************************************************/
-public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterStats {
   public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
   public static final String AUDIT_FORMAT =
     "ugi=%s\t" + // ugi
@@ -123,6 +131,7 @@
   public static final Log auditLog = LogFactory.getLog(
       FSNamesystem.class.getName() + ".audit");

+  static int BLOCK_DELETION_INCREMENT = 1000;
   private boolean isPermissionEnabled;
   private UserGroupInformation fsOwner;
   private String supergroup;
@@ -199,8 +208,7 @@
   private long heartbeatExpireInterval;
   //replicationRecheckInterval is how often namenode checks for new replication work
   private long replicationRecheckInterval;
-  // default block size of a file
-  private long defaultBlockSize = 0;
+  private FsServerDefaults serverDefaults;

   // allow appending to hdfs files
   private boolean supportAppends = true;
@@ -288,7 +296,8 @@
     dnthread.start();

     this.dnsToSwitchMapping = ReflectionUtils.newInstance(
-        conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
+        conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+            ScriptBasedMapping.class,
             DNSToSwitchMapping.class), conf);

     /* If the dns to switch mapping supports cache, resolve network
@@ -302,7 +311,7 @@
   }

   public static Collection getNamespaceDirs(Configuration conf) {
-    return getStorageDirs(conf, "dfs.name.dir");
+    return getStorageDirs(conf, DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
   }

   public static Collection getStorageDirs(Configuration conf,
@@ -314,7 +323,7 @@
     // but will retain directories specified in hdfs-site.xml
     // When importing image from a checkpoint, the name-node can
     // start with empty set of storage directories.
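A recurring change in these hunks is the move from bare configuration-key strings to shared constants in DFSConfigKeys. A minimal sketch of the pattern, assuming only the constant names visible in this diff; the wrapper class and printed output are invented for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    // Sketch: resolve a namenode setting through the shared key constant
    // rather than a string literal repeated at every call site.
    public class ConfigKeySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Before this commit: conf.get("dfs.name.dir")
        String nameDirs = conf.get(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
        System.out.println("namenode name dirs = " + nameDirs);
      }
    }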
- Configuration cE = new Configuration(false); + Configuration cE = new HdfsConfiguration(false); cE.addResource("core-default.xml"); cE.addResource("core-site.xml"); cE.addResource("hdfs-default.xml"); @@ -353,7 +362,7 @@ } public static Collection getNamespaceEditsDirs(Configuration conf) { - return getStorageDirs(conf, "dfs.name.edits.dir"); + return getStorageDirs(conf, DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY); } /** @@ -397,34 +406,48 @@ } LOG.info("fsOwner=" + fsOwner); - this.supergroup = conf.get("dfs.permissions.supergroup", "supergroup"); - this.isPermissionEnabled = conf.getBoolean("dfs.permissions", true); + this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, + DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); + this.isPermissionEnabled = conf.getBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, + DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT); LOG.info("supergroup=" + supergroup); LOG.info("isPermissionEnabled=" + isPermissionEnabled); - short filePermission = (short)conf.getInt("dfs.upgrade.permission", 00777); + short filePermission = (short)conf.getInt(DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY, + DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT); this.defaultPermission = PermissionStatus.createImmutable( fsOwner.getUserName(), supergroup, new FsPermission(filePermission)); long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000; this.heartbeatRecheckInterval = conf.getInt( - "heartbeat.recheck.interval", 5 * 60 * 1000); // 5 minutes + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval + 10 * heartbeatInterval; this.replicationRecheckInterval = - conf.getInt("dfs.replication.interval", 3) * 1000L; - this.defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE); - this.maxFsObjects = conf.getLong("dfs.max.objects", 0); + conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, + DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L; + this.serverDefaults = new FsServerDefaults( + conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE), + conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BYTES_PER_CHECKSUM), + conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DEFAULT_WRITE_PACKET_SIZE), + (short) conf.getInt("dfs.replication", DEFAULT_REPLICATION_FACTOR), + conf.getInt("io.file.buffer.size", DEFAULT_FILE_BUFFER_SIZE)); + this.maxFsObjects = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY, + DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT); this.blockInvalidateLimit = Math.max(this.blockInvalidateLimit, 20*(int)(heartbeatInterval/1000)); - this.accessTimePrecision = conf.getLong("dfs.access.time.precision", 0); + this.accessTimePrecision = conf.getLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0); this.supportAppends = conf.getBoolean("dfs.support.append", false); this.isAccessTokenEnabled = conf.getBoolean( - AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false); + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT); if (isAccessTokenEnabled) { this.accessKeyUpdateInterval = conf.getLong( - AccessTokenHandler.STRING_ACCESS_KEY_UPDATE_INTERVAL, 600) * 60 * 1000L; // 10 hrs + DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY, + DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT) * 60 * 1000L; // 10 hrs this.accessTokenLifetime 
= conf.getLong( - AccessTokenHandler.STRING_ACCESS_TOKEN_LIFETIME, 600) * 60 * 1000L; // 10 hrs + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY, + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT) * 60 * 1000L; // 10 hrs } LOG.info("isAccessTokenEnabled=" + isAccessTokenEnabled + " accessKeyUpdateInterval=" + accessKeyUpdateInterval / (60 * 1000) @@ -530,11 +553,22 @@ */ synchronized void metaSave(String filename) throws IOException { checkSuperuserPrivilege(); - File file = new File(System.getProperty("hadoop.log.dir"), - filename); - PrintWriter out = new PrintWriter(new BufferedWriter( - new FileWriter(file, true))); - + File file = new File(System.getProperty("hadoop.log.dir"), filename); + PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file, + true))); + + long totalInodes = this.dir.totalInodes(); + long totalBlocks = this.getBlocksTotal(); + + ArrayList live = new ArrayList(); + ArrayList dead = new ArrayList(); + this.DFSNodesStatus(live, dead); + + String str = totalInodes + " files and directories, " + totalBlocks + + " blocks = " + (totalInodes + totalBlocks) + " total"; + out.println(str); + out.println("Live Datanodes: "+live.size()); + out.println("Dead Datanodes: "+dead.size()); blockManager.metaSave(out); // @@ -547,7 +581,11 @@ } long getDefaultBlockSize() { - return defaultBlockSize; + return serverDefaults.getBlockSize(); + } + + FsServerDefaults getServerDefaults() { + return serverDefaults; } long getAccessTimePrecision() { @@ -595,13 +633,18 @@ } List results = new ArrayList(); long totalSize = 0; + BlockInfo curBlock; while(totalSize si = new HashSet(); + + // we put the following prerequisite for the operation + // replication and blocks sizes should be the same for ALL the blocks + // check the target + INode inode = dir.getFileINode(target); + + if(inode == null) { + throw new IllegalArgumentException("concat: trg file doesn't exist"); + } + if(inode.isUnderConstruction()) { + throw new IllegalArgumentException("concat: trg file is uner construction"); + } + + INodeFile trgInode = (INodeFile) inode; + + // per design trg shouldn't be empty and all the blocks same size + if(trgInode.blocks.length == 0) { + throw new IllegalArgumentException("concat: "+ target + " file is empty"); + } + + long blockSize = trgInode.preferredBlockSize; + + // check the end block to be full + if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) { + throw new IllegalArgumentException(target + " blocks size should be the same"); + } + + si.add(trgInode); + short repl = trgInode.blockReplication; + + // now check the srcs + boolean endSrc = false; // final src file doesn't have to have full end block + for(int i=0; i= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) { + throw new IllegalArgumentException("concat: blocks sizes of " + + src + " and " + target + " should all be the same"); + } + + si.add(srcInode); + } + + // make sure no two files are the same + if(si.size() < srcs.length+1) { // trg + srcs + // it means at least two files are the same + throw new IllegalArgumentException("at least two files are the same"); + } + + NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " + + Arrays.toString(srcs) + " to " + target); + + dir.concatInternal(target,srcs); } + getEditLog().logSync(); + - List results = blockManager.getBlockLocations(blocks, - offset, length, nrBlocksToReturn); - return inode.createLocatedBlocks(results); + if (auditLog.isInfoEnabled()) { + final FileStatus stat = dir.getFileInfo(target); + 
logAuditEvent(UserGroupInformation.getCurrentUGI(), + Server.getRemoteIp(), + "concat", Arrays.toString(srcs), target, stat); + } + } /** @@ -863,6 +1060,24 @@ return dir.getPreferredBlockSize(filename); } + /* + * Verify that parent dir exists + */ + private void verifyParentDir(String src) throws FileAlreadyExistsException, + FileNotFoundException { + Path parent = new Path(src).getParent(); + if (parent != null) { + INode[] pathINodes = dir.getExistingPathINodes(parent.toString()); + if (pathINodes[pathINodes.length - 1] == null) { + throw new FileNotFoundException("Parent directory doesn't exist: " + + parent.toString()); + } else if (!pathINodes[pathINodes.length - 1].isDirectory()) { + throw new FileAlreadyExistsException("Parent path is not a directory: " + + parent.toString()); + } + } + } + /** * Create a new file entry in the namespace. * @@ -873,10 +1088,11 @@ */ void startFile(String src, PermissionStatus permissions, String holder, String clientMachine, - EnumSet flag, short replication, long blockSize + EnumSet flag, boolean createParent, + short replication, long blockSize ) throws IOException { startFileInternal(src, permissions, holder, clientMachine, flag, - replication, blockSize); + createParent, replication, blockSize); getEditLog().logSync(); if (auditLog.isInfoEnabled()) { final FileStatus stat = dir.getFileInfo(src); @@ -891,6 +1107,7 @@ String holder, String clientMachine, EnumSet flag, + boolean createParent, short replication, long blockSize ) throws IOException { @@ -902,6 +1119,7 @@ NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src + ", holder=" + holder + ", clientMachine=" + clientMachine + + ", createParent=" + createParent + ", replication=" + replication + ", overwrite=" + overwrite + ", append=" + append); @@ -928,6 +1146,10 @@ } } + if (!createParent) { + verifyParentDir(src); + } + try { INode myFile = dir.getFileINode(src); if (myFile != null && myFile.isUnderConstruction()) { @@ -936,40 +1158,45 @@ // If the file is under construction , then it must be in our // leases. Find the appropriate lease record. // - Lease lease = leaseManager.getLease(holder); - // - // We found the lease for this file. And surprisingly the original - // holder is trying to recreate this file. This should never occur. - // - if (lease != null) { + Lease lease = leaseManager.getLeaseByPath(src); + if (lease == null) { throw new AlreadyBeingCreatedException( - "failed to create file " + src + " for " + holder + - " on client " + clientMachine + - " because current leaseholder is trying to recreate file."); + "failed to create file " + src + " for " + holder + + " on client " + clientMachine + + " because pendingCreates is non-null but no leases found."); } // - // Find the original holder. + // We found the lease for this file. And surprisingly the original + // holder is trying to recreate this file. This should never occur. 
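The reworked checks above decide what happens when a create() arrives for a file that is already under construction. A hedged sketch of the decision table, with the lease situation reduced to an enum; the class and exception wiring here are stand-ins, not the real LeaseManager types:

    import java.io.IOException;

    // Sketch of the guard logic in startFileInternal() above.
    class CreateGuardSketch {
      enum LeaseState { NONE, HELD_BY_CALLER, OTHER_SOFT_EXPIRED, OTHER_ACTIVE }

      static void checkRecreate(LeaseState state, boolean recoveryClosedFile)
          throws IOException {
        switch (state) {
          case NONE:              // under construction but no lease: inconsistent
          case HELD_BY_CALLER:    // original holder re-creating its own open file
          case OTHER_ACTIVE:      // another client still holds a live lease
            throw new IOException("AlreadyBeingCreatedException in the real code");
          case OTHER_SOFT_EXPIRED:
            if (!recoveryClosedFile) { // internalReleaseLease() could not close
              throw new IOException("RecoveryInProgressException in the real code");
            }                       // file closed by recovery: create proceeds
        }
      }
    }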
// - lease = leaseManager.getLease(pendingFile.clientName); - if (lease == null) { + if (lease.getHolder().equals(holder)) { throw new AlreadyBeingCreatedException( - "failed to create file " + src + " for " + holder + - " on client " + clientMachine + - " because pendingCreates is non-null but no leases found."); + "failed to create file " + src + " for " + holder + + " on client " + clientMachine + + " because current leaseholder is trying to recreate file."); } + assert lease.getHolder().equals(pendingFile.getClientName()) : + "Current lease holder " + lease.getHolder() + + " does not match file creator " + pendingFile.getClientName(); // + // Current lease holder is different from the requester. // If the original holder has not renewed in the last SOFTLIMIT - // period, then start lease recovery. + // period, then start lease recovery, otherwise fail. // if (lease.expiredSoftLimit()) { LOG.info("startFile: recover lease " + lease + ", src=" + src); - internalReleaseLease(lease, src); - } - throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder + - " on client " + clientMachine + - ", because this file is already being created by " + - pendingFile.getClientName() + - " on " + pendingFile.getClientMachine()); + boolean isClosed = internalReleaseLease(lease, src, null); + if(!isClosed) + throw new RecoveryInProgressException( + "Failed to close file " + src + + ". Lease recovery is in progress. Try again later."); + + } else + throw new AlreadyBeingCreatedException("failed to create file " + + src + " for " + holder + " on client " + clientMachine + + ", because this file is already being created by " + + pendingFile.getClientName() + + " on " + pendingFile.getClientMachine()); } try { @@ -985,7 +1212,7 @@ else { //append & create a nonexist file equals to overwrite this.startFileInternal(src, permissions, holder, clientMachine, - EnumSet.of(CreateFlag.OVERWRITE), replication, blockSize); + EnumSet.of(CreateFlag.OVERWRITE), createParent, replication, blockSize); return; } } else if (myFile.isDirectory()) { @@ -1022,7 +1249,7 @@ clientMachine, clientNode); dir.replaceNode(src, node, cons); - leaseManager.addLease(cons.clientName, src); + leaseManager.addLease(cons.getClientName(), src); } else { // Now we can add the name to the filesystem. 
This file has no @@ -1038,7 +1265,7 @@ throw new IOException("DIR* NameSystem.startFile: " + "Unable to add file to namespace."); } - leaseManager.addLease(newNode.clientName, src); + leaseManager.addLease(newNode.getClientName(), src); if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: " +"add "+src+" to namespace for "+holder); @@ -1061,7 +1288,7 @@ " Please refer to dfs.support.append configuration parameter."); } startFileInternal(src, null, holder, clientMachine, EnumSet.of(CreateFlag.APPEND), - (short)blockManager.maxReplication, (long)0); + false, (short)blockManager.maxReplication, (long)0); getEditLog().logSync(); // @@ -1072,40 +1299,36 @@ LocatedBlock lb = null; synchronized (this) { INodeFileUnderConstruction file = (INodeFileUnderConstruction)dir.getFileINode(src); - - BlockInfo[] blocks = file.getBlocks(); - if (blocks != null && blocks.length > 0) { - BlockInfo last = blocks[blocks.length-1]; - // this is a redundant search in blocksMap - // should be resolved by the new implementation of append - BlockInfo storedBlock = blockManager.getStoredBlock(last); - assert last == storedBlock : "last block should be in the blocksMap"; - if (file.getPreferredBlockSize() > storedBlock.getNumBytes()) { + BlockInfo lastBlock = file.getLastBlock(); + if (lastBlock != null) { + assert lastBlock == blockManager.getStoredBlock(lastBlock) : + "last block of the file is not in blocksMap"; + if (file.getPreferredBlockSize() > lastBlock.getNumBytes()) { long fileLength = file.computeContentSummary().getLength(); - DatanodeDescriptor[] targets = blockManager.getNodes(storedBlock); + DatanodeDescriptor[] targets = blockManager.getNodes(lastBlock); // remove the replica locations of this block from the node for (int i = 0; i < targets.length; i++) { - targets[i].removeBlock(storedBlock); + targets[i].removeBlock(lastBlock); } - // set the locations of the last block in the lease record - file.setLastBlock(storedBlock, targets); + // convert last block to under-construction and set its locations + blockManager.convertLastBlockToUnderConstruction(file, targets); - lb = new LocatedBlock(last, targets, - fileLength-storedBlock.getNumBytes()); + lb = new LocatedBlock(lastBlock, targets, + fileLength-lastBlock.getNumBytes()); if (isAccessTokenEnabled) { lb.setAccessToken(accessTokenHandler.generateToken(lb.getBlock() .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE))); } // Remove block from replication queue. - blockManager.updateNeededReplications(last, 0, 0); + blockManager.updateNeededReplications(lastBlock, 0, 0); // remove this block from the list of pending blocks to be deleted. // This reduces the possibility of triggering HADOOP-1349. // for (DatanodeDescriptor dd : targets) { String datanodeId = dd.getStorageID(); - blockManager.removeFromInvalidates(datanodeId, last); + blockManager.removeFromInvalidates(datanodeId, lastBlock); } } } @@ -1138,8 +1361,17 @@ * are replicated. Will return an empty 2-elt array if we want the * client to "try again later". 
*/ - public LocatedBlock getAdditionalBlock(String src, - String clientName + public LocatedBlock getAdditionalBlock(String src, + String clientName, + Block previous + ) throws IOException { + return getAdditionalBlock(src, clientName, previous, null); + } + + public LocatedBlock getAdditionalBlock(String src, + String clientName, + Block previous, + HashMap excludedNodes ) throws IOException { long fileLength, blockSize; int replication; @@ -1159,6 +1391,9 @@ INodeFileUnderConstruction pendingFile = checkLease(src, clientName); + // commit the last block + blockManager.commitLastBlock(pendingFile, previous); + // // If we fail this, bad things happen! // @@ -1173,7 +1408,7 @@ // choose targets for the new block to be allocated. DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget( - replication, clientNode, null, blockSize); + src, replication, clientNode, excludedNodes, blockSize); if (targets.length < blockManager.minReplication) { throw new IOException("File " + src + " could only be replicated to " + targets.length + " nodes, instead of " + @@ -1195,9 +1430,11 @@ throw new NotReplicatedYetException("Not replicated yet:" + src); } + // complete the penultimate block + blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2); + // allocate new block record block locations in INode. - newBlock = allocateBlock(src, pathINodes); - pendingFile.setTargets(targets); + newBlock = allocateBlock(src, pathINodes, targets); for (DatanodeDescriptor dn : targets) { dn.incBlocksScheduled(); @@ -1277,15 +1514,18 @@ COMPLETE_SUCCESS } - public CompleteFileStatus completeFile(String src, String holder) throws IOException { - CompleteFileStatus status = completeFileInternal(src, holder); + public CompleteFileStatus completeFile(String src, + String holder, + Block last) throws IOException { + CompleteFileStatus status = completeFileInternal(src, holder, last); getEditLog().logSync(); return status; } - - private synchronized CompleteFileStatus completeFileInternal(String src, - String holder) throws IOException { + private synchronized CompleteFileStatus completeFileInternal( + String src, + String holder, + Block last) throws IOException { NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder); if (isInSafeMode()) throw new SafeModeException("Cannot complete file " + src, safeMode); @@ -1306,7 +1546,12 @@ ("from " + pendingFile.getClientMachine())) ); return CompleteFileStatus.OPERATION_FAILED; - } else if (!checkFileProgress(pendingFile, true)) { + } + + // commit the last block + blockManager.commitLastBlock(pendingFile, last); + + if (!checkFileProgress(pendingFile, true)) { return CompleteFileStatus.STILL_WAITING; } @@ -1339,13 +1584,15 @@ * @param inodes INode representing each of the components of src. * inodes[inodes.length-1] is the INode for the file. */ - private Block allocateBlock(String src, INode[] inodes) throws IOException { + private Block allocateBlock(String src, + INode[] inodes, + DatanodeDescriptor targets[]) throws IOException { Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0); while(isValidBlock(b)) { b.setBlockId(FSNamesystem.randBlockId.nextLong()); } b.setGenerationStamp(getGenerationStamp()); - b = dir.addBlock(src, inodes, b); + b = dir.addBlock(src, inodes, b, targets); NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: " +src+ ". "+b); return b; @@ -1356,13 +1603,16 @@ * replicated. If not, return false. If checkall is true, then check * all blocks, otherwise check only penultimate block. 
*/ - synchronized boolean checkFileProgress(INodeFile v, boolean checkall) { + synchronized boolean checkFileProgress(INodeFile v, boolean checkall) throws IOException { if (checkall) { // // check all blocks of the file. // - for (Block block: v.getBlocks()) { + for (BlockInfo block: v.getBlocks()) { if (!blockManager.checkMinReplication(block)) { + LOG.info("BLOCK* NameSystem.checkFileProgress: " + + "block " + block + " has not reached minimal replication " + + blockManager.minReplication); return false; } } @@ -1370,8 +1620,11 @@ // // check the penultimate block of this file // - Block b = v.getPenultimateBlock(); + BlockInfo b = v.getPenultimateBlock(); if (b != null && !blockManager.checkMinReplication(b)) { + LOG.info("BLOCK* NameSystem.checkFileProgress: " + + "block " + b + " has not reached minimal replication " + + blockManager.minReplication); return false; } } @@ -1401,8 +1654,12 @@ // are made, edit namespace and return to client. //////////////////////////////////////////////////////////////// - /** Change the indicated filename. */ - public boolean renameTo(String src, String dst) throws IOException { + /** + * Change the indicated filename. + * @deprecated Use {@link #renameTo(String, String, Options.Rename...)} instead. + */ + @Deprecated + boolean renameTo(String src, String dst) throws IOException { boolean status = renameToInternal(src, dst); getEditLog().logSync(); if (status && auditLog.isInfoEnabled()) { @@ -1414,6 +1671,8 @@ return status; } + /** @deprecated See {@link #renameTo(String, String)} */ + @Deprecated private synchronized boolean renameToInternal(String src, String dst ) throws IOException { NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst); @@ -1439,7 +1698,46 @@ } return false; } + + + /** Rename src to dst */ + void renameTo(String src, String dst, Options.Rename... options) + throws IOException { + renameToInternal(src, dst, options); + getEditLog().logSync(); + if (auditLog.isInfoEnabled()) { + StringBuilder cmd = new StringBuilder("rename options="); + for (Rename option : options) { + cmd.append(option.value()).append(" "); + } + final FileStatus stat = dir.getFileInfo(dst); + logAuditEvent(UserGroupInformation.getCurrentUGI(), Server.getRemoteIp(), + cmd.toString(), src, dst, stat); + } + } + + private synchronized void renameToInternal(String src, String dst, + Options.Rename... options) throws IOException { + if (NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - " + + src + " to " + dst); + } + if (isInSafeMode()) { + throw new SafeModeException("Cannot rename " + src, safeMode); + } + if (!DFSUtil.isValidName(dst)) { + throw new IOException("Invalid name: " + dst); + } + if (isPermissionEnabled) { + checkParentAccess(src, FsAction.WRITE); + checkAncestorAccess(dst, FsAction.WRITE); + } + FileStatus dinfo = dir.getFileInfo(dst); + dir.renameTo(src, dst, options); + changeLease(src, dst, dinfo); // update lease with new filename + } + /** * Remove the indicated filename from namespace. If the filename * is a directory (non empty) and recursive is set to false then throw exception. 
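The delete path reworked in the next hunk no longer removes every block while holding the namesystem lock; blocks are collected first and then released in chunks of BLOCK_DELETION_INCREMENT (defined earlier as 1000). A sketch of that chunked loop, mirroring removeBlocks() below; the lock object and element type are generic stand-ins:

    import java.util.List;

    // Sketch of incremental block deletion: drop the lock between chunks so a
    // very large directory delete cannot stall other namesystem operations.
    class IncrementalDeleteSketch {
      static final int BLOCK_DELETION_INCREMENT = 1000;

      static <B> void removeBlocks(Object namesystemLock, List<B> collectedBlocks) {
        int start = 0;
        while (start < collectedBlocks.size()) {
          int end = Math.min(start + BLOCK_DELETION_INCREMENT,
                             collectedBlocks.size());
          synchronized (namesystemLock) {  // held only for one chunk at a time
            for (int i = start; i < end; i++) {
              // blockManager.removeBlock(collectedBlocks.get(i)) in the real code
            }
          }
          start = end;
        }
      }
    }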
@@ -1448,8 +1746,10 @@ if ((!recursive) && (!dir.isDirEmpty(src))) { throw new IOException(src + " is non empty"); } + if (NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src); + } boolean status = deleteInternal(src, true); - getEditLog().logSync(); if (status && auditLog.isInfoEnabled()) { logAuditEvent(UserGroupInformation.getCurrentUGI(), Server.getRemoteIp(), @@ -1459,25 +1759,68 @@ } /** - * Remove the indicated filename from the namespace. This may - * invalidate some blocks that make up the file. + * Remove a file/directory from the namespace. + *
<p>
+ For large directories, deletion is incremental. The blocks under + the directory are collected and deleted a small number at a time holding + the {@link FSNamesystem} lock. + *<p>
+ * For small directory or file the deletion is done in one shot. */ - synchronized boolean deleteInternal(String src, + private boolean deleteInternal(String src, boolean enforcePermission) throws IOException { - if (NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src); + boolean deleteNow = false; + ArrayList collectedBlocks = new ArrayList(); + synchronized(this) { + if (isInSafeMode()) { + throw new SafeModeException("Cannot delete " + src, safeMode); + } + if (enforcePermission && isPermissionEnabled) { + checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL); + } + // Unlink the target directory from directory tree + if (!dir.delete(src, collectedBlocks)) { + return false; + } + deleteNow = collectedBlocks.size() <= BLOCK_DELETION_INCREMENT; + if (deleteNow) { // Perform small deletes right away + removeBlocks(collectedBlocks); + } } - if (isInSafeMode()) - throw new SafeModeException("Cannot delete " + src, safeMode); - if (enforcePermission && isPermissionEnabled) { - checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL); + // Log directory deletion to editlog + getEditLog().logSync(); + if (!deleteNow) { + removeBlocks(collectedBlocks); // Incremental deletion of blocks } - - return dir.delete(src) != null; + collectedBlocks.clear(); + if (NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug("DIR* Namesystem.delete: " + + src +" is removed"); + } + return true; } + /** From the given list, incrementally remove the blocks from blockManager */ + private void removeBlocks(List blocks) { + int start = 0; + int end = 0; + while (start < blocks.size()) { + end = BLOCK_DELETION_INCREMENT + start; + end = end > blocks.size() ? blocks.size() : end; + synchronized(this) { + for (int i=start; i blocks) { leaseManager.removeLeaseWithPrefixPath(src); + if (blocks == null) { + return; + } for(Block b : blocks) { blockManager.removeBlock(b); } @@ -1502,9 +1845,9 @@ /** * Create all the necessary directories */ - public boolean mkdirs(String src, PermissionStatus permissions - ) throws IOException { - boolean status = mkdirsInternal(src, permissions); + public boolean mkdirs(String src, PermissionStatus permissions, + boolean createParent) throws IOException { + boolean status = mkdirsInternal(src, permissions, createParent); getEditLog().logSync(); if (status && auditLog.isInfoEnabled()) { final FileStatus stat = dir.getFileInfo(src); @@ -1519,7 +1862,7 @@ * Create all the necessary directories */ private synchronized boolean mkdirsInternal(String src, - PermissionStatus permissions) throws IOException { + PermissionStatus permissions, boolean createParent) throws IOException { NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src); if (isPermissionEnabled) { checkTraverse(src); @@ -1538,6 +1881,10 @@ checkAncestorAccess(src, FsAction.WRITE); } + if (!createParent) { + verifyParentDir(src); + } + // validate that we have enough inodes. This is, at best, a // heuristic because the mkdirs() operation migth need to // create multiple inodes. @@ -1592,20 +1939,31 @@ * Move a file that is being written to be immutable. * @param src The filename * @param lease The lease for the client creating the file - */ - void internalReleaseLease(Lease lease, String src) throws IOException { + * @param recoveryLeaseHolder reassign lease to this holder if the last block + * needs recovery; keep current holder if null. 
+ * @throws AlreadyBeingCreatedException if file is waiting to achieve minimal + * replication;<br>
+ * RecoveryInProgressException if lease recovery is in progress.<br>
+ * IOException in case of an error. + * @return true if file has been successfully finalized and closed or + * false if block recovery has been initiated + */ + boolean internalReleaseLease( + Lease lease, String src, String recoveryLeaseHolder) + throws AlreadyBeingCreatedException, + IOException { LOG.info("Recovering lease=" + lease + ", src=" + src); INodeFile iFile = dir.getFileINode(src); if (iFile == null) { - final String message = "DIR* NameSystem.internalReleaseCreate: " + final String message = "DIR* NameSystem.internalReleaseLease: " + "attempt to release a create lock on " + src + " file does not exist."; NameNode.stateChangeLog.warn(message); throw new IOException(message); } if (!iFile.isUnderConstruction()) { - final String message = "DIR* NameSystem.internalReleaseCreate: " + final String message = "DIR* NameSystem.internalReleaseLease: " + "attempt to release a create lock on " + src + " but file is already closed."; NameNode.stateChangeLog.warn(message); @@ -1613,39 +1971,123 @@ } INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile; + int nrBlocks = pendingFile.numBlocks(); + BlockInfo[] blocks = pendingFile.getBlocks(); + + int nrCompleteBlocks; + BlockInfo curBlock = null; + for(nrCompleteBlocks = 0; nrCompleteBlocks < nrBlocks; nrCompleteBlocks++) { + curBlock = blocks[nrCompleteBlocks]; + if(!curBlock.isComplete()) + break; + assert blockManager.checkMinReplication(curBlock) : + "A COMPLETE block is not minimally replicated in " + src; + } + + // If there are no incomplete blocks associated with this file, + // then reap lease immediately and close the file. + if(nrCompleteBlocks == nrBlocks) { + finalizeINodeFileUnderConstruction(src, pendingFile); + NameNode.stateChangeLog.warn("BLOCK*" + + " internalReleaseLease: All existing blocks are COMPLETE," + + " lease removed, file closed."); + return true; // closed! + } + + // Only the last and the penultimate blocks may be in non COMPLETE state. + // If the penultimate block is not COMPLETE, then it must be COMMITTED. + if(nrCompleteBlocks < nrBlocks - 2 || + nrCompleteBlocks == nrBlocks - 2 && + curBlock.getBlockUCState() != BlockUCState.COMMITTED) { + final String message = "DIR* NameSystem.internalReleaseLease: " + + "attempt to release a create lock on " + + src + " but file is already closed."; + NameNode.stateChangeLog.warn(message); + throw new IOException(message); + } - // Initialize lease recovery for pendingFile. If there are no blocks - // associated with this file, then reap lease immediately. Otherwise - // renew the lease and trigger lease recovery. - if (pendingFile.getTargets() == null || - pendingFile.getTargets().length == 0) { - if (pendingFile.getBlocks().length == 0) { + // no we know that the last block is not COMPLETE, and + // that the penultimate block if exists is either COMPLETE or COMMITTED + BlockInfoUnderConstruction lastBlock = pendingFile.getLastBlock(); + BlockUCState lastBlockState = lastBlock.getBlockUCState(); + BlockInfo penultimateBlock = pendingFile.getPenultimateBlock(); + BlockUCState penultimateBlockState = (penultimateBlock == null ? 
+ BlockUCState.COMPLETE : penultimateBlock.getBlockUCState()); + assert penultimateBlockState == BlockUCState.COMPLETE || + penultimateBlockState == BlockUCState.COMMITTED : + "Unexpected state of penultimate block in " + src; + + switch(lastBlockState) { + case COMPLETE: + assert false : "Already checked that the last block is incomplete"; + break; + case COMMITTED: + // Close file if committed blocks are minimally replicated + if(blockManager.checkMinReplication(penultimateBlock) && + blockManager.checkMinReplication(lastBlock)) { finalizeINodeFileUnderConstruction(src, pendingFile); NameNode.stateChangeLog.warn("BLOCK*" - + " internalReleaseLease: No blocks found, lease removed."); - return; - } - // setup the Inode.targets for the last block from the blockManager - // - BlockInfo[] blocks = pendingFile.getBlocks(); - BlockInfo last = blocks[blocks.length-1]; - DatanodeDescriptor[] targets = blockManager.getNodes(last); - pendingFile.setTargets(targets); + + " internalReleaseLease: Committed blocks are minimally replicated," + + " lease removed, file closed."); + return true; // closed! + } + // Cannot close file right now, since some blocks + // are not yet minimally replicated. + // This may potentially cause infinite loop in lease recovery + // if there are no valid replicas on data-nodes. + String message = "DIR* NameSystem.internalReleaseLease: " + + "Failed to release lease for file " + src + + ". Committed blocks are waiting to be minimally replicated." + + " Try again later."; + NameNode.stateChangeLog.warn(message); + throw new AlreadyBeingCreatedException(message); + case UNDER_CONSTRUCTION: + case UNDER_RECOVERY: + // setup the last block locations from the blockManager if not known + if(lastBlock.getNumExpectedLocations() == 0) + lastBlock.setExpectedLocations(blockManager.getNodes(lastBlock)); + // start recovery of the last block for this file + long blockRecoveryId = nextGenerationStamp(); + lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile); + lastBlock.initializeBlockRecovery(blockRecoveryId); + leaseManager.renewLease(lease); + // Cannot close file right now, since the last block requires recovery. + // This may potentially cause infinite loop in lease recovery + // if there are no valid replicas on data-nodes. + NameNode.stateChangeLog.warn( + "DIR* NameSystem.internalReleaseLease: " + + "File " + src + " has not been closed." + + " Lease recovery is in progress. " + + "RecoveryId = " + blockRecoveryId + " for block " + lastBlock); + break; } - // start lease recovery of the last block for this file. - pendingFile.assignPrimaryDatanode(); - leaseManager.renewLease(lease); + return false; + } + + Lease reassignLease(Lease lease, String src, String newHolder, + INodeFileUnderConstruction pendingFile) { + if(newHolder == null) + return lease; + pendingFile.setClientName(newHolder); + return leaseManager.reassignLease(lease, src, newHolder); } - private void finalizeINodeFileUnderConstruction(String src, + + private void finalizeINodeFileUnderConstruction( + String src, INodeFileUnderConstruction pendingFile) throws IOException { - leaseManager.removeLease(pendingFile.clientName, src); + leaseManager.removeLease(pendingFile.getClientName(), src); + + // complete the penultimate block + blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2); // The file is no longer pending. 
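The switch above is the heart of the new lease recovery: the outcome of internalReleaseLease() depends on the state of the file's last block. A hedged summary as a small state table; the state names follow the BlockUCState import at the top of this diff, everything else is illustrative:

    import java.io.IOException;

    // Sketch of the per-state outcome of internalReleaseLease().
    class ReleaseLeaseOutcomeSketch {
      enum UCState { COMPLETE, COMMITTED, UNDER_CONSTRUCTION, UNDER_RECOVERY }

      static boolean outcome(UCState lastBlock, boolean minimallyReplicated)
          throws IOException {
        switch (lastBlock) {
          case COMPLETE:
            throw new AssertionError("handled earlier: file already closed");
          case COMMITTED:
            if (minimallyReplicated) {
              return true;                 // finalize and close the file
            }
            // AlreadyBeingCreatedException in the real code:
            throw new IOException("committed blocks not minimally replicated");
          default:                         // UNDER_CONSTRUCTION, UNDER_RECOVERY
            return false;                  // block recovery has been initiated
        }
      }
    }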
- // Create permanent INode, update blockmap + // Create permanent INode, update blocks INodeFile newFile = pendingFile.convertToInodeFile(); dir.replaceNode(src, pendingFile, newFile); + // complete last block of the file + blockManager.completeBlock(newFile, newFile.numBlocks()-1); // close file and persist block allocations for this file dir.closeFile(src, newFile); @@ -1663,30 +2105,35 @@ + ", closeFile=" + closeFile + ", deleteBlock=" + deleteblock + ")"); - final BlockInfo oldblockinfo = blockManager.getStoredBlock(lastblock); - if (oldblockinfo == null) { + final BlockInfo storedBlock = blockManager.getStoredBlock(lastblock); + if (storedBlock == null) { throw new IOException("Block (=" + lastblock + ") not found"); } - INodeFile iFile = oldblockinfo.getINode(); - if (!iFile.isUnderConstruction()) { + INodeFile iFile = storedBlock.getINode(); + if (!iFile.isUnderConstruction() || storedBlock.isComplete()) { throw new IOException("Unexpected block (=" + lastblock + ") since the file (=" + iFile.getLocalName() + ") is not under construction"); } - INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile; - - // Remove old block from blocks map. This always have to be done - // because the generation stamp of this block is changing. - blockManager.removeBlockFromMap(oldblockinfo); + long recoveryId = + ((BlockInfoUnderConstruction)storedBlock).getBlockRecoveryId(); + if(recoveryId != newgenerationstamp) { + throw new IOException("The recovery id " + newgenerationstamp + + " does not match current recovery id " + + recoveryId + " for block " + lastblock); + } + + INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile; if (deleteblock) { - pendingFile.removeBlock(lastblock); + pendingFile.removeLastBlock(lastblock); + blockManager.removeBlockFromMap(storedBlock); } else { - // update last block, construct newblockinfo and add it to the blocks map - lastblock.set(lastblock.getBlockId(), newlength, newgenerationstamp); - final BlockInfo newblockinfo = blockManager.addINode(lastblock, pendingFile); + // update last block + storedBlock.setGenerationStamp(newgenerationstamp); + storedBlock.setNumBytes(newlength); // find the DatanodeDescriptor objects // There should be no locations in the blockManager till now because the @@ -1703,13 +2150,11 @@ // Otherwise fsck will report these blocks as MISSING, especially if the // blocksReceived from Datanodes take a long time to arrive. 
for (int i = 0; i < descriptors.length; i++) { - descriptors[i].addBlock(newblockinfo); + descriptors[i].addBlock(storedBlock); } - pendingFile.setLastBlock(newblockinfo, null); - } else { - // add locations into the INodeUnderConstruction - pendingFile.setLastBlock(newblockinfo, descriptors); } + // add pipeline locations into the INodeUnderConstruction + pendingFile.setLastBlock(storedBlock, descriptors); } // If this commit does not want to close the file, persist @@ -1723,7 +2168,10 @@ LOG.info("commitBlockSynchronization(" + lastblock + ") successful"); return; } - + + // commit the last block + blockManager.commitLastBlock(pendingFile, storedBlock); + //remove lease, close file finalizeINodeFileUnderConstruction(src, pendingFile); getEditLog().logSync(); @@ -2352,8 +2800,10 @@ void chooseExcessReplicates(Collection nonExcess, Block b, short replication, DatanodeDescriptor addedNode, - DatanodeDescriptor delNodeHint) { + DatanodeDescriptor delNodeHint, + BlockPlacementPolicy replicator) { // first form a rack to datanodes map and + INodeFile inode = blockManager.getINode(b); HashMap> rackMap = new HashMap>(); for (Iterator iter = nonExcess.iterator(); @@ -2390,24 +2840,13 @@ boolean firstOne = true; while (nonExcess.size() - replication > 0) { DatanodeInfo cur = null; - long minSpace = Long.MAX_VALUE; // check if we can del delNodeHint if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) && (priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) { cur = delNodeHint; } else { // regular excessive replica removal - Iterator iter = - priSet.isEmpty() ? remains.iterator() : priSet.iterator(); - while( iter.hasNext() ) { - DatanodeDescriptor node = iter.next(); - long free = node.getRemaining(); - - if (minSpace > free) { - minSpace = free; - cur = node; - } - } + cur = replicator.chooseReplicaToDelete(inode, b, replication, priSet, remains); } firstOne = false; @@ -2701,14 +3140,11 @@ if (!node.isDecommissionInProgress() && !node.isDecommissioned()) { LOG.info("Start Decommissioning node " + node.getName()); node.startDecommission(); + node.decommissioningStatus.setStartTime(now()); // // all the blocks that reside on this node have to be // replicated. - Iterator decommissionBlocks = node.getBlockIterator(); - while(decommissionBlocks.hasNext()) { - Block block = decommissionBlocks.next(); - blockManager.updateNeededReplications(block, -1, 0); - } + checkDecommissionStateInternal(node); } } @@ -2831,7 +3267,7 @@ // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames. 
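Earlier in this hunk, chooseExcessReplicates stops picking the excess replica inline and delegates to BlockPlacementPolicy.chooseReplicaToDelete. A sketch of the selection the deleted loop performed, which a default policy would presumably reproduce; Node here is a stand-in for DatanodeDescriptor:

    import java.util.Collection;

    // Sketch: prefer the priority set, then delete the replica sitting on the
    // datanode with the least remaining space (the heuristic of the old loop).
    class ReplicaToDeleteSketch {
      interface Node { long getRemaining(); }

      static <N extends Node> N chooseReplicaToDelete(Collection<N> priSet,
                                                      Collection<N> remains) {
        Collection<N> candidates = priSet.isEmpty() ? remains : priSet;
        N chosen = null;
        long minSpace = Long.MAX_VALUE;
        for (N node : candidates) {
          long free = node.getRemaining();
          if (free < minSpace) {
            minSpace = free;
            chosen = node;
          }
        }
        return chosen;
      }
    }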
// Update the file names and refresh internal includes and excludes list if (conf == null) - conf = new Configuration(); + conf = new HdfsConfiguration(); hostsReader.updateFileNames(conf.get("dfs.hosts",""), conf.get("dfs.hosts.exclude", "")); hostsReader.refresh(); @@ -2984,9 +3420,10 @@ * @param conf configuration */ SafeModeInfo(Configuration conf) { - this.threshold = conf.getFloat("dfs.safemode.threshold.pct", 0.95f); - this.extension = conf.getInt("dfs.safemode.extension", 0); - this.safeReplication = conf.getInt("dfs.replication.min", 1); + this.threshold = conf.getFloat(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, 0.95f); + this.extension = conf.getInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0); + this.safeReplication = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT); this.blockTotal = 0; this.blockSafe = 0; } @@ -3329,7 +3766,7 @@ void setBlockTotal() { if (safeMode == null) return; - safeMode.setBlockTotal((int)getBlocksTotal()); + safeMode.setBlockTotal((int)getCompleteBlocksTotal()); } /** @@ -3340,6 +3777,33 @@ } /** + * Get the total number of COMPLETE blocks in the system. + * For safe mode only complete blocks are counted. + */ + long getCompleteBlocksTotal() { + // Calculate number of blocks under construction + long numUCBlocks = 0; + for (Lease lease : leaseManager.getSortedLeases()) { + for(String path : lease.getPaths()) { + INode node = dir.getFileINode(path); + assert node != null : "Found a lease for nonexisting file."; + assert node.isUnderConstruction() : + "Found a lease for file that is not under construction."; + INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node; + BlockInfo[] blocks = cons.getBlocks(); + if(blocks == null) + continue; + for(BlockInfo b : blocks) { + if(!b.isComplete()) + numUCBlocks++; + } + } + } + LOG.info("Number of blocks under construction: " + numUCBlocks); + return getBlocksTotal() - numUCBlocks; + } + + /** * Enter safe mode manually. * @throws IOException */ @@ -3658,29 +4122,124 @@ return gs; } + private INodeFileUnderConstruction checkUCBlock(Block block, String clientName) + throws IOException { + // check safe mode + if (isInSafeMode()) + throw new SafeModeException("Cannot get a new generation stamp and an " + + "access token for block " + block, safeMode); + + // check stored block state + BlockInfo storedBlock = blockManager.getStoredBlock(block); + if (storedBlock == null || + storedBlock.getBlockUCState() != BlockUCState.UNDER_CONSTRUCTION) { + throw new IOException(block + + " does not exist or is not under Construction" + storedBlock); + } + + // check file inode + INodeFile file = storedBlock.getINode(); + if (file==null || !file.isUnderConstruction()) { + throw new IOException("The file " + storedBlock + + " is belonged to does not exist or it is not under construction."); + } + + // check lease + INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file; + if (clientName == null || !clientName.equals(pendingFile.getClientName())) { + throw new LeaseExpiredException("Lease mismatch: " + block + + " is accessed by a non lease holder " + clientName); + } + + return pendingFile; + } + /** - * Verifies that the block is associated with a file that has a lease. 
- * Increments, logs and then returns the stamp + * Get a new generation stamp together with an access token for + * a block under construction + * + * This method is called for recovering a failed pipeline or setting up + * a pipeline to append to a block. + * + * @param block a block + * @param clientName the name of a client + * @return a located block with a new generation stamp and an access token + * @throws IOException if any error occurs + */ + synchronized LocatedBlock updateBlockForPipeline(Block block, + String clientName) throws IOException { + // check vadility of parameters + checkUCBlock(block, clientName); + + // get a new generation stamp and an access token + block.setGenerationStamp(nextGenerationStamp()); + LocatedBlock locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]); + if (isAccessTokenEnabled) { + locatedBlock.setAccessToken(accessTokenHandler.generateToken( + block.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE))); + } + return locatedBlock; + } + + + /** + * Update a pipeline for a block under construction + * + * @param clientName the name of the client + * @param oldblock and old block + * @param newBlock a new block with a new generation stamp and length + * @param newNodes datanodes in the pipeline + * @throws IOException if any error occurs */ - synchronized long nextGenerationStampForBlock(Block block) throws IOException { - BlockInfo storedBlock = blockManager.getStoredBlock(block); - if (storedBlock == null) { - String msg = block + " is already commited, storedBlock == null."; - LOG.info(msg); + synchronized void updatePipeline(String clientName, Block oldBlock, + Block newBlock, DatanodeID[] newNodes) + throws IOException { + assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and " + + oldBlock + " has different block identifier"; + LOG.info("updatePipeline(block=" + oldBlock + + ", newGenerationStamp=" + newBlock.getGenerationStamp() + + ", newLength=" + newBlock.getNumBytes() + + ", newNodes=" + Arrays.asList(newNodes) + + ", clientName=" + clientName + + ")"); + + // check the vadility of the block and lease holder name + final INodeFileUnderConstruction pendingFile = + checkUCBlock(oldBlock, clientName); + final BlockInfoUnderConstruction blockinfo = pendingFile.getLastBlock(); + + // check new GS & length: this is not expected + if (newBlock.getGenerationStamp() <= blockinfo.getGenerationStamp() || + newBlock.getNumBytes() < blockinfo.getNumBytes()) { + String msg = "Update " + oldBlock + " (len = " + + blockinfo.getNumBytes() + ") to an older state: " + newBlock + + " (len = " + newBlock.getNumBytes() +")"; + LOG.warn(msg); throw new IOException(msg); } - INodeFile fileINode = storedBlock.getINode(); - if (!fileINode.isUnderConstruction()) { - String msg = block + " is already commited, !fileINode.isUnderConstruction()."; - LOG.info(msg); - throw new IOException(msg); + + // Update old block with the new generation stamp and new length + blockinfo.setGenerationStamp(newBlock.getGenerationStamp()); + blockinfo.setNumBytes(newBlock.getNumBytes()); + + // find the DatanodeDescriptor objects + DatanodeDescriptor[] descriptors = null; + if (newNodes.length > 0) { + descriptors = new DatanodeDescriptor[newNodes.length]; + for(int i = 0; i < newNodes.length; i++) { + descriptors[i] = getDatanode(newNodes[i]); + } } - if (!((INodeFileUnderConstruction)fileINode).setLastRecoveryTime(now())) { - String msg = block + " is beening recovered, ignoring this request."; - LOG.info(msg); - throw new IOException(msg); + 
blockinfo.setExpectedLocations(descriptors); + + // persist blocks only if append is supported + String src = leaseManager.findPath(pendingFile); + if (supportAppends) { + dir.persistBlocks(src, pendingFile); + getEditLog().logSync(); } - return nextGenerationStamp(); + LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock); + return; } // rename was successful. If any part of the renamed subtree had @@ -3783,4 +4342,38 @@ DatanodeDescriptor getDatanode(String nodeID) { return datanodeMap.get(nodeID); } + + /** + * Return a range of corrupt replica block ids. Up to numExpectedBlocks + * blocks starting at the next block after startingBlockId are returned + * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId + * is null, up to numExpectedBlocks blocks are returned from the beginning. + * If startingBlockId cannot be found, null is returned. + * + * @param numExpectedBlocks Number of block ids to return. + * 0 <= numExpectedBlocks <= 100 + * @param startingBlockId Block id from which to start. If null, start at + * beginning. + * @return Up to numExpectedBlocks blocks from startingBlockId if it exists + * + */ + long[] getCorruptReplicaBlockIds(int numExpectedBlocks, + Long startingBlockId) { + return blockManager.getCorruptReplicaBlockIds(numExpectedBlocks, + startingBlockId); + } + + public synchronized ArrayList getDecommissioningNodes() { + ArrayList decommissioningNodes = + new ArrayList(); + ArrayList results = + getDatanodeListForReport(DatanodeReportType.LIVE); + for (Iterator it = results.iterator(); it.hasNext();) { + DatanodeDescriptor node = it.next(); + if (node.isDecommissionInProgress()) { + decommissioningNodes.add(node); + } + } + return decommissioningNodes; + } } Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=885143&r1=885142&r2=885143&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original) +++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Sat Nov 28 20:05:56 2009 @@ -35,6 +35,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.server.common.HdfsConstants; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UnixUserGroupInformation; @@ -81,8 +83,8 @@ final XMLOutputter xml = new XMLOutputter(out, "UTF-8"); xml.declaration(); - final Configuration conf = new Configuration(DataNode.getDataNode().getConf()); - final int socketTimeout = conf.getInt("dfs.socket.timeout", HdfsConstants.READ_TIMEOUT); + final Configuration conf = new HdfsConfiguration(DataNode.getDataNode().getConf()); + final int socketTimeout = conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT); final SocketFactory socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); UnixUserGroupInformation.saveToConf(conf, UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi); @@ -99,4 +101,4 @@ xml.endDocument(); } } -} \ No newline at end of file +} Modified: 
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java?rev=885143&r1=885142&r2=885143&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java (original) +++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java Sat Nov 28 20:05:56 2009 @@ -26,6 +26,7 @@ import javax.servlet.http.HttpServletResponse; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType; import org.apache.hadoop.security.UnixUserGroupInformation; import org.apache.hadoop.security.UserGroupInformation; @@ -48,7 +49,7 @@ UserGroupInformation.setCurrentUser(ugi); final ServletContext context = getServletContext(); - final Configuration conf = new Configuration((Configuration) context.getAttribute("name.conf")); + final Configuration conf = new HdfsConfiguration((Configuration) context.getAttribute("name.conf")); UnixUserGroupInformation.saveToConf(conf, UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi); Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=885143&r1=885142&r2=885143&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original) +++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Sat Nov 28 20:05:56 2009 @@ -25,15 +25,13 @@ import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.permission.*; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlocks; /** * We keep an in-memory representation of the file/block hierarchy. * This is a base INode class containing common fields for file and * directory inodes. */ -abstract class INode implements Comparable { +abstract class INode implements Comparable, FSInodeInfo { protected byte[] name; protected INodeDirectory parent; protected long modificationTime; @@ -247,6 +245,12 @@ } /** {@inheritDoc} */ + public String getFullPathName() { + // Get the full path name of this inode. 
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Sat Nov 28 20:05:56 2009
@@ -25,15 +25,13 @@
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

 /**
  * We keep an in-memory representation of the file/block hierarchy.
  * This is a base INode class containing common fields for file and
  * directory inodes.
  */
-abstract class INode implements Comparable<byte[]> {
+abstract class INode implements Comparable<byte[]>, FSInodeInfo {
   protected byte[] name;
   protected INodeDirectory parent;
   protected long modificationTime;
@@ -247,6 +245,12 @@
   }

   /** {@inheritDoc} */
+  public String getFullPathName() {
+    // Get the full path name of this inode.
+    return FSDirectory.getFullPathName(this);
+  }
+
+  /** {@inheritDoc} */
   public String toString() {
     return "\"" + getLocalName() + "\":" + getPermissionStatus();
   }
@@ -417,10 +421,4 @@
     }
     return null;
   }
-
-
-  LocatedBlocks createLocatedBlocks(List<LocatedBlock> blocks) {
-    return new LocatedBlocks(computeContentSummary().getLength(), blocks,
-        isUnderConstruction());
-  }
 }

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Sat Nov 28 20:05:56 2009
@@ -88,7 +88,7 @@
    * @param dsQuota diskspace quota to be set
    *
    */
-  void setQuota(long newNsQuota, long newDsQuota) throws QuotaExceededException {
+  void setQuota(long newNsQuota, long newDsQuota) {
     nsQuota = newNsQuota;
     dsQuota = newDsQuota;
   }
@@ -116,18 +116,20 @@
    *
    * @param nsDelta the change of the tree size
    * @param dsDelta change to disk space occupied
-   * @throws QuotaExceededException if the changed size is greater
-   *                                than the quota
    */
-  void updateNumItemsInTree(long nsDelta, long dsDelta) throws
-                            QuotaExceededException {
-    long newCount = nsCount + nsDelta;
-    long newDiskspace = diskspace + dsDelta;
-    if (nsDelta>0 || dsDelta>0) {
-      verifyQuota(nsQuota, newCount, dsQuota, newDiskspace);
-    }
-    nsCount = newCount;
-    diskspace = newDiskspace;
+  void updateNumItemsInTree(long nsDelta, long dsDelta) {
+    nsCount += nsDelta;
+    diskspace += dsDelta;
+  }
+
+  /** Update the size of the tree
+   *
+   * @param nsDelta the change of the tree size
+   * @param dsDelta change to disk space occupied
+   **/
+  void unprotectedUpdateNumItemsInTree(long nsDelta, long dsDelta) {
+    nsCount = nsCount + nsDelta;
+    diskspace = diskspace + dsDelta;
   }

   /**
@@ -146,14 +148,16 @@
   /** Verify if the namespace count and disk space satisfy the quota restriction
   * @throws QuotaExceededException if the given quota is less than the count
   */
-  private static void verifyQuota(long nsQuota, long nsCount,
-                                  long dsQuota, long diskspace)
-                                  throws QuotaExceededException {
-    if (nsQuota >= 0 && nsQuota < nsCount) {
-      throw new NSQuotaExceededException(nsQuota, nsCount);
-    }
-    if (dsQuota >= 0 && dsQuota < diskspace) {
-      throw new DSQuotaExceededException(dsQuota, diskspace);
+  void verifyQuota(long nsDelta, long dsDelta) throws QuotaExceededException {
+    long newCount = nsCount + nsDelta;
+    long newDiskspace = diskspace + dsDelta;
+    if (nsDelta>0 || dsDelta>0) {
+      if (nsQuota >= 0 && nsQuota < newCount) {
+        throw new NSQuotaExceededException(nsQuota, newCount);
+      }
+      if (dsQuota >= 0 && dsQuota < newDiskspace) {
+        throw new DSQuotaExceededException(dsQuota, newDiskspace);
+      }
     }
   }
 }
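
The INodeDirectoryWithQuota rewrite separates verification from mutation: the new verifyQuota(nsDelta, dsDelta) throws before any counter changes, while updateNumItemsInTree now applies deltas unconditionally. A standalone sketch of that check-then-mutate split, with all names invented rather than taken from HDFS:

    // Hypothetical illustration; QuotaTracker and its fields are not HDFS classes.
    public class QuotaTracker {
      private final long quota;  // negative means "no quota"
      private long count;

      QuotaTracker(long quota) { this.quota = quota; }

      // Pure check: throws before any state is touched, so a caller can
      // verify a whole batch of deltas and only then apply them.
      void verify(long delta) {
        long newCount = count + delta;
        if (delta > 0 && quota >= 0 && quota < newCount) {
          throw new IllegalStateException("quota " + quota + " exceeded: " + newCount);
        }
      }

      // Unconditional update: assumes verify() already ran for this delta.
      void update(long delta) { count += delta; }

      public static void main(String[] args) {
        QuotaTracker t = new QuotaTracker(2);
        t.verify(1); t.update(1);   // ok
        t.verify(1); t.update(1);   // ok, now at quota
        try { t.verify(1); } catch (IllegalStateException e) {
          System.out.println(e.getMessage());   // quota 2 exceeded: 3
        }
      }
    }
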
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Sat Nov 28 20:05:56 2009
@@ -88,6 +88,26 @@
   }

   /**
+   * append array of blocks to this.blocks
+   */
+  void appendBlocks(INodeFile [] inodes, int totalAddedBlocks) {
+    int size = this.blocks.length;
+
+    BlockInfo[] newlist = new BlockInfo[size + totalAddedBlocks];
+    System.arraycopy(this.blocks, 0, newlist, 0, size);
+
+    for(INodeFile in: inodes) {
+      System.arraycopy(in.blocks, 0, newlist, size, in.blocks.length);
+      size += in.blocks.length;
+    }
+
+    for(BlockInfo bi: newlist) {
+      bi.setINode(this);
+    }
+    this.blocks = newlist;
+  }
+
+  /**
    * add a block to the block list
    */
   void addBlock(BlockInfo newblock) {
@@ -112,8 +132,11 @@
   int collectSubtreeBlocksAndClear(List<Block> v) {
     parent = null;
-    for (Block blk : blocks) {
-      v.add(blk);
+    if(blocks != null && v != null) {
+      for (BlockInfo blk : blocks) {
+        v.add(blk);
+        blk.setINode(null);
+      }
     }
     blocks = null;
     return 1;
@@ -121,16 +144,29 @@

   /** {@inheritDoc} */
   long[] computeContentSummary(long[] summary) {
-    long bytes = 0;
-    for(Block blk : blocks) {
-      bytes += blk.getNumBytes();
-    }
-    summary[0] += bytes;
+    summary[0] += computeFileSize(true);
     summary[1]++;
     summary[3] += diskspaceConsumed();
     return summary;
   }

+  /** Compute file size.
+   * May or may not include BlockInfoUnderConstruction.
+   */
+  long computeFileSize(boolean includesBlockInfoUnderConstruction) {
+    if (blocks == null || blocks.length == 0) {
+      return 0;
+    }
+    final int last = blocks.length - 1;
+    //check if the last block is BlockInfoUnderConstruction
+    long bytes = blocks[last] instanceof BlockInfoUnderConstruction
+        && !includesBlockInfoUnderConstruction?
+            0: blocks[last].getNumBytes();
+    for(int i = 0; i < last; i++) {
+      bytes += blocks[i].getNumBytes();
+    }
+    return bytes;
+  }

 @Override
@@ -146,6 +182,9 @@
   long diskspaceConsumed(Block[] blkArr) {
     long size = 0;
+    if(blkArr == null)
+      return 0;
+
     for (Block blk : blkArr) {
       if (blk != null) {
         size += blk.getNumBytes();
@@ -172,22 +211,33 @@
   /**
    * Return the penultimate allocated block for this file.
    */
-  Block getPenultimateBlock() {
+  BlockInfo getPenultimateBlock() {
     if (blocks == null || blocks.length <= 1) {
       return null;
     }
     return blocks[blocks.length - 2];
   }

-  INodeFileUnderConstruction toINodeFileUnderConstruction(
-      String clientName, String clientMachine, DatanodeDescriptor clientNode
-      ) throws IOException {
-    if (isUnderConstruction()) {
-      return (INodeFileUnderConstruction)this;
-    }
-    return new INodeFileUnderConstruction(name,
-        blockReplication, modificationTime, preferredBlockSize,
-        blocks, getPermissionStatus(),
-        clientName, clientMachine, clientNode);
+  /**
+   * Get the last block of the file.
+   * Make sure it has the right type.
+   */
+  <T extends BlockInfo> T getLastBlock() throws IOException {
+    if (blocks == null || blocks.length == 0)
+      return null;
+    T returnBlock = null;
+    try {
+      @SuppressWarnings("unchecked") // ClassCastException is caught below
+      T tBlock = (T)blocks[blocks.length - 1];
+      returnBlock = tBlock;
+    } catch(ClassCastException cce) {
+      throw new IOException("Unexpected last block type: "
+          + blocks[blocks.length - 1].getClass().getSimpleName());
+    }
+    return returnBlock;
+  }
+
+  int numBlocks() {
+    return blocks == null ? 0 : blocks.length;
+  }
 }
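
getLastBlock() above hands back the last block cast to whatever BlockInfo subtype the caller expects. A standalone sketch of the same idea using an explicit Class token; note that with type erasure the ClassCastException from the original unchecked cast actually surfaces at the call site rather than inside the method, which the Class-token variant avoids (Base, Derived and last are invented names):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class Base {}
    class Derived extends Base {}

    public class LastElement {
      // Return the last element as the expected subtype, or fail with a
      // descriptive IOException if it is something else.
      static <T extends Base> T last(List<Base> list, Class<T> type)
          throws IOException {
        if (list == null || list.isEmpty())
          return null;
        Base tail = list.get(list.size() - 1);
        if (!type.isInstance(tail)) {
          throw new IOException("Unexpected last element type: "
              + tail.getClass().getSimpleName());
        }
        return type.cast(tail);
      }

      public static void main(String[] args) throws IOException {
        List<Base> list = new ArrayList<Base>();
        list.add(new Base());
        list.add(new Derived());
        System.out.println(last(list, Derived.class) != null);  // true
      }
    }
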
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Sat Nov 28 20:05:56 2009
@@ -21,16 +21,13 @@
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;

 class INodeFileUnderConstruction extends INodeFile {
-  final String clientName; // lease holder
+  private String clientName; // lease holder
   private final String clientMachine;
   private final DatanodeDescriptor clientNode; // if client is a cluster node too.
-
-  private int primaryNodeIndex = -1; //the node working on lease recovery
-  private DatanodeDescriptor[] targets = null;   //locations for last block
-  private long lastRecoveryTime = 0;

   INodeFileUnderConstruction(PermissionStatus permissions,
                              short replication,
@@ -67,6 +64,10 @@
     return clientName;
   }

+  void setClientName(String clientName) {
+    this.clientName = clientName;
+  }
+
   String getClientMachine() {
     return clientMachine;
   }
@@ -83,15 +84,6 @@
     return true;
   }

-  DatanodeDescriptor[] getTargets() {
-    return targets;
-  }
-
-  void setTargets(DatanodeDescriptor[] targets) {
-    this.targets = targets;
-    this.primaryNodeIndex = -1;
-  }
-
   //
   // converts a INodeFileUnderConstruction into a INodeFile
   // use the modification time as the access time
@@ -108,10 +100,10 @@
   }

   /**
-   * remove a block from the block list. This block should be
+   * Remove a block from the block list. This block should be
    * the last one on the list.
    */
-  void removeBlock(Block oldblock) throws IOException {
+  void removeLastBlock(Block oldblock) throws IOException {
     if (blocks == null) {
       throw new IOException("Trying to delete non-existent block " + oldblock);
     }
@@ -124,57 +116,24 @@
     BlockInfo[] newlist = new BlockInfo[size_1];
     System.arraycopy(blocks, 0, newlist, 0, size_1);
     blocks = newlist;
-
-    // Remove the block locations for the last block.
-    targets = null;
-  }
-
-  synchronized void setLastBlock(BlockInfo newblock, DatanodeDescriptor[] newtargets
-  ) throws IOException {
-    if (blocks == null) {
-      throw new IOException("Trying to update non-existent block (newblock="
-          + newblock + ")");
-    }
-    blocks[blocks.length - 1] = newblock;
-    setTargets(newtargets);
-    lastRecoveryTime = 0;
   }

   /**
-   * Initialize lease recovery for this object
+   * Convert the last block of the file to an under-construction block.
+   * Set its locations.
    */
-  void assignPrimaryDatanode() {
-    //assign the first alive datanode as the primary datanode
-
-    if (targets.length == 0) {
-      NameNode.stateChangeLog.warn("BLOCK*"
-        + " INodeFileUnderConstruction.initLeaseRecovery:"
-        + " No blocks found, lease removed.");
-    }
-
-    int previous = primaryNodeIndex;
-    //find an alive datanode beginning from previous
-    for(int i = 1; i <= targets.length; i++) {
-      int j = (previous + i)%targets.length;
-      if (targets[j].isAlive) {
-        DatanodeDescriptor primary = targets[primaryNodeIndex = j];
-        primary.addBlockToBeRecovered(blocks[blocks.length - 1], targets);
-        NameNode.stateChangeLog.info("BLOCK* " + blocks[blocks.length - 1]
-          + " recovery started, primary=" + primary);
-        return;
-      }
-    }
-  }
-
-  /**
-   * Update lastRecoveryTime if expired.
-   * @return true if lastRecoveryTime is updated.
-   */
-  synchronized boolean setLastRecoveryTime(long now) {
-    boolean expired = now - lastRecoveryTime > NameNode.LEASE_RECOVER_PERIOD;
-    if (expired) {
-      lastRecoveryTime = now;
-    }
-    return expired;
+  BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
+                                          DatanodeDescriptor[] targets)
+      throws IOException {
+    if (blocks == null || blocks.length == 0) {
+      throw new IOException("Trying to update non-existent block. "
+          + "File is empty.");
+    }
+    BlockInfoUnderConstruction ucBlock =
+      lastBlock.convertToBlockUnderConstruction(
+          BlockUCState.UNDER_CONSTRUCTION, targets);
+    ucBlock.setINode(this);
+    setBlock(numBlocks()-1, ucBlock);
+    return ucBlock;
   }
 }
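
Where the old setLastBlock() swapped in a caller-supplied block and tracked recovery targets on the inode, the new version converts the existing tail block in place into an under-construction variant that carries its own expected locations. A standalone sketch of that in-place tail conversion (Block, UcBlock and convertLast are invented stand-ins for BlockInfo and BlockInfoUnderConstruction):

    import java.util.Arrays;

    class Block {
      final long id;
      Block(long id) { this.id = id; }
    }

    class UcBlock extends Block {
      final String[] locations;  // expected replica locations
      UcBlock(Block b, String[] locations) {
        super(b.id);
        this.locations = locations;
      }
    }

    public class ConvertLastBlock {
      // Swap the completed tail block for an under-construction view of the
      // same block, leaving every other entry untouched.
      static UcBlock convertLast(Block[] blocks, String[] locations) {
        if (blocks == null || blocks.length == 0) {
          throw new IllegalStateException("file is empty");
        }
        UcBlock uc = new UcBlock(blocks[blocks.length - 1], locations);
        blocks[blocks.length - 1] = uc;
        return uc;
      }

      public static void main(String[] args) {
        Block[] blocks = { new Block(1), new Block(2) };
        UcBlock uc = convertLast(blocks, new String[] {"dn1", "dn2"});
        System.out.println(uc.id + " -> " + Arrays.toString(uc.locations)); // 2 -> [dn1, dn2]
      }
    }
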
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Sat Nov 28 20:05:56 2009
@@ -102,7 +102,7 @@
   /**
    * Adds (or re-adds) the lease for the specified file.
    */
-  synchronized void addLease(String holder, String src) {
+  synchronized Lease addLease(String holder, String src) {
     Lease lease = getLease(holder);
     if (lease == null) {
       lease = new Lease(holder);
@@ -113,6 +113,7 @@
     }
     sortedLeasesByPath.put(src, lease);
     lease.paths.add(src);
+    return lease;
   }

   /**
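
addLease() now returns the Lease it created or refreshed, and the reassignLease() hunk that follows chains it into a remove-then-add performed under the manager's lock. A standalone sketch of that pattern (LeaseTable and its single map are invented stand-ins for LeaseManager's internal state):

    import java.util.HashMap;
    import java.util.Map;

    public class LeaseTable {
      private final Map<String, String> holderByPath = new HashMap<String, String>();

      synchronized String addLease(String holder, String path) {
        holderByPath.put(path, holder);
        return holder;  // returned so callers can chain
      }

      synchronized void removeLease(String path) {
        holderByPath.remove(path);
      }

      // Reassignment is just remove-then-add under the same lock, so no
      // intermediate state is visible to other threads.
      synchronized String reassignLease(String path, String newHolder) {
        assert newHolder != null : "new lease holder is null";
        removeLease(path);
        return addLease(newHolder, path);
      }

      public static void main(String[] args) {
        LeaseTable t = new LeaseTable();
        t.addLease("clientA", "/f");
        System.out.println(t.reassignLease("/f", "HDFS_NameNode"));  // HDFS_NameNode
      }
    }
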
@@ -143,11 +144,22 @@
   }

   /**
+   * Reassign lease for file src to the new holder.
+   */
+  synchronized Lease reassignLease(Lease lease, String src, String newHolder) {
+    assert newHolder != null : "new lease holder is null";
+    if (lease != null) {
+      removeLease(lease, src);
+    }
+    return addLease(newHolder, src);
+  }
+
+  /**
    * Finds the pathname for the specified pendingFile
    */
   synchronized String findPath(INodeFileUnderConstruction pendingFile
                                ) throws IOException {
-    Lease lease = getLease(pendingFile.clientName);
+    Lease lease = getLease(pendingFile.getClientName());
     if (lease != null) {
       String src = lease.findPath(pendingFile);
       if (src != null) {
@@ -265,7 +277,11 @@
     Collection<String> getPaths() {
       return paths;
     }
-
+
+    String getHolder() {
+      return holder;
+    }
+
     void replacePath(String oldpath, String newpath) {
       paths.remove(oldpath);
       paths.add(newpath);
@@ -376,7 +392,13 @@
       oldest.getPaths().toArray(leasePaths);
       for(String p : leasePaths) {
         try {
-          fsnamesystem.internalReleaseLease(oldest, p);
+          if(fsnamesystem.internalReleaseLease(oldest, p, "HDFS_NameNode")) {
+            LOG.info("Lease recovery for file " + p
+                + " is complete. File closed.");
+            removing.add(p);
+          } else
+            LOG.info("Started block recovery for file " + p
+                + " lease " + oldest);
         } catch (IOException e) {
           LOG.error("Cannot release the path "+p+" in the lease "+oldest, e);
           removing.add(p);