Return-Path: Delivered-To: apmail-lucene-hadoop-commits-archive@locus.apache.org Received: (qmail 33277 invoked from network); 19 Apr 2007 21:35:29 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 19 Apr 2007 21:35:29 -0000 Received: (qmail 13350 invoked by uid 500); 19 Apr 2007 21:35:33 -0000 Delivered-To: apmail-lucene-hadoop-commits-archive@lucene.apache.org Received: (qmail 13331 invoked by uid 500); 19 Apr 2007 21:35:33 -0000 Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hadoop-dev@lucene.apache.org Delivered-To: mailing list hadoop-commits@lucene.apache.org Received: (qmail 13277 invoked by uid 99); 19 Apr 2007 21:35:33 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Apr 2007 14:35:33 -0700 X-ASF-Spam-Status: No, hits=-98.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Apr 2007 14:35:21 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id C44AF1A9851; Thu, 19 Apr 2007 14:35:00 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r530556 [5/12] - in /lucene/hadoop/trunk: ./ src/contrib/abacus/src/java/org/apache/hadoop/abacus/ src/contrib/hbase/src/java/org/apache/hadoop/hbase/ src/contrib/hbase/src/test/org/apache/hadoop/hbase/ src/contrib/streaming/src/java/org/ap... Date: Thu, 19 Apr 2007 21:34:53 -0000 To: hadoop-commits@lucene.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070419213500.C44AF1A9851@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=530556&r1=530555&r2=530556 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Apr 19 14:34:41 2007 @@ -221,21 +221,21 @@ this.defaultReplication = conf.getInt("dfs.replication", 3); this.maxReplication = conf.getInt("dfs.replication.max", 512); this.minReplication = conf.getInt("dfs.replication.min", 1); - if( minReplication <= 0 ) + if (minReplication <= 0) throw new IOException( "Unexpected configuration parameters: dfs.replication.min = " + minReplication - + " must be greater than 0" ); - if( maxReplication >= (int)Short.MAX_VALUE ) + + " must be greater than 0"); + if (maxReplication >= (int)Short.MAX_VALUE) throw new IOException( "Unexpected configuration parameters: dfs.replication.max = " - + maxReplication + " must be less than " + (Short.MAX_VALUE) ); - if( maxReplication < minReplication ) + + maxReplication + " must be less than " + (Short.MAX_VALUE)); + if (maxReplication < minReplication) throw new IOException( "Unexpected configuration parameters: dfs.replication.min = " + minReplication + " must be less than dfs.replication.max = " - + maxReplication ); + + maxReplication); this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2); long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000; this.heartbeatRecheckInterval = 5 * 60 * 1000; // 5 minutes @@ -245,11 +245,11 @@ this.localMachine = hostname; this.port = port; - this.dir = new FSDirectory( this ); - StartupOption startOpt = (StartupOption)conf.get( - "dfs.namenode.startup", StartupOption.REGULAR ); - this.dir.loadFSImage( getNamespaceDirs(conf), startOpt ); - this.safeMode = new SafeModeInfo( conf ); + this.dir = new FSDirectory(this); + StartupOption startOpt = (StartupOption)conf.get( + "dfs.namenode.startup", StartupOption.REGULAR); + this.dir.loadFSImage(getNamespaceDirs(conf), startOpt); + this.safeMode = new SafeModeInfo(conf); setBlockTotal(); pendingReplications = new PendingReplicationBlocks(LOG); this.hbthread = new Daemon(new HeartbeatMonitor()); @@ -268,7 +268,7 @@ this.infoPort = conf.getInt("dfs.info.port", 50070); this.infoBindAddress = conf.get("dfs.info.bindAddress", "0.0.0.0"); - this.infoServer = new StatusHttpServer("dfs",infoBindAddress, infoPort, false); + this.infoServer = new StatusHttpServer("dfs", infoBindAddress, infoPort, false); this.infoServer.setAttribute("name.system", this); this.infoServer.setAttribute("name.node", nn); this.infoServer.setAttribute("name.conf", conf); @@ -286,9 +286,9 @@ String[] dirNames = conf.getStrings("dfs.name.dir"); if (dirNames == null) dirNames = new String[] {"/tmp/hadoop/dfs/name"}; - Collection dirs = new ArrayList( dirNames.length ); - for( int idx = 0; idx < dirNames.length; idx++ ) { - dirs.add( new File(dirNames[idx] )); + Collection dirs = new ArrayList(dirNames.length); + for(int idx = 0; idx < dirNames.length; idx++) { + dirs.add(new File(dirNames[idx])); } return dirs; } @@ -310,8 +310,8 @@ } NamespaceInfo getNamespaceInfo() { - return new NamespaceInfo( dir.fsImage.getNamespaceID(), - dir.fsImage.getCTime() ); + return new NamespaceInfo(dir.fsImage.getNamespaceID(), + dir.fsImage.getCTime()); } /** Close down this filesystem manager. @@ -368,9 +368,9 @@ Block block = it.next(); out.print(block); for (Iterator jt = blocksMap.nodeIterator(block); - jt.hasNext(); ) { + jt.hasNext();) { DatanodeDescriptor node = jt.next(); - out.print(" " + node + " : " ); + out.print(" " + node + " : "); } out.println(""); } @@ -397,9 +397,9 @@ } /* get replication factor of a block */ - private int getReplication( Block block ) { - FSDirectory.INode fileINode = blocksMap.getINode( block ); - if( fileINode == null ) { // block does not belong to any file + private int getReplication(Block block) { + FSDirectory.INode fileINode = blocksMap.getINode(block); + if (fileINode == null) { // block does not belong to any file return 0; } else { return fileINode.getReplication(); @@ -424,7 +424,7 @@ /* Return the total number of under replication blocks */ synchronized int size() { int size = 0; - for( int i=0; i set:priorityQueues) { - if(set.contains(block)) return true; + if (set.contains(block)) return true; } return false; } @@ -447,9 +447,9 @@ int curReplicas, int expectedReplicas) { if (curReplicas<=0 || curReplicas>=expectedReplicas) { return LEVEL; // no need to replicate - } else if(curReplicas==1) { + } else if (curReplicas==1) { return 0; // highest priority - } else if(curReplicas*3= 0 && priLevel < LEVEL - && priorityQueues.get(priLevel).remove(block) ) { + private boolean remove(Block block, int priLevel) { + if (priLevel >= 0 && priLevel < LEVEL + && priorityQueues.get(priLevel).remove(block)) { NameNode.stateChangeLog.debug( "BLOCK* NameSystem.UnderReplicationBlock.remove: " + "Removing block " + block.getBlockName() - + " from priority queue "+ priLevel ); + + " from priority queue "+ priLevel); return true; } else { for(int i=0; i it = - blocksMap.nodeIterator( blocks[i] ); it.hasNext(); ) { + for(Iterator it = + blocksMap.nodeIterator(blocks[i]); it.hasNext();) { machineSets[i][ numNodes++ ] = it.next(); } - clusterMap.sortByDistance( client, machineSets[i] ); + clusterMap.sortByDistance(client, machineSets[i]); } } @@ -653,31 +653,31 @@ public synchronized boolean setReplication(String src, short replication ) throws IOException { - if( isInSafeMode() ) - throw new SafeModeException( "Cannot set replication for " + src, safeMode ); - verifyReplication(src, replication, null ); + if (isInSafeMode()) + throw new SafeModeException("Cannot set replication for " + src, safeMode); + verifyReplication(src, replication, null); Vector oldReplication = new Vector(); Block[] fileBlocks; - fileBlocks = dir.setReplication( src, replication, oldReplication ); - if( fileBlocks == null ) // file not found or is a directory + fileBlocks = dir.setReplication(src, replication, oldReplication); + if (fileBlocks == null) // file not found or is a directory return false; int oldRepl = oldReplication.elementAt(0).intValue(); - if( oldRepl == replication ) // the same replication + if (oldRepl == replication) // the same replication return true; // update needReplication priority queues LOG.info("Increasing replication for file " + src - + ". New replication is " + replication ); - for( int idx = 0; idx < fileBlocks.length; idx++ ) - neededReplications.update( fileBlocks[idx], 0, replication-oldRepl ); + + ". New replication is " + replication); + for(int idx = 0; idx < fileBlocks.length; idx++) + neededReplications.update(fileBlocks[idx], 0, replication-oldRepl); - if( oldRepl > replication ) { + if (oldRepl > replication) { // old replication > the new one; need to remove copies LOG.info("Reducing replication for file " + src - + ". New replication is " + replication ); - for( int idx = 0; idx < fileBlocks.length; idx++ ) - proccessOverReplicatedBlock( fileBlocks[idx], replication ); + + ". New replication is " + replication); + for(int idx = 0; idx < fileBlocks.length; idx++) + proccessOverReplicatedBlock(fileBlocks[idx], replication); } return true; } @@ -690,21 +690,21 @@ * Check whether the replication parameter is within the range * determined by system configuration. */ - private void verifyReplication( String src, - short replication, - UTF8 clientName - ) throws IOException { + private void verifyReplication(String src, + short replication, + UTF8 clientName + ) throws IOException { String text = "file " + src + ((clientName != null) ? " on client " + clientName : "") + ".\n" + "Requested replication " + replication; - if( replication > maxReplication ) - throw new IOException( text + " exceeds maximum " + maxReplication ); + if (replication > maxReplication) + throw new IOException(text + " exceeds maximum " + maxReplication); - if( replication < minReplication ) - throw new IOException( - text + " is less than the required minimum " + minReplication ); + if (replication < minReplication) + throw new IOException( + text + " is less than the required minimum " + minReplication); } /** @@ -718,17 +718,17 @@ * @throws IOException if the filename is invalid * {@link FSDirectory#isValidToCreate(UTF8)}. */ - public synchronized Object[] startFile( UTF8 src, - UTF8 holder, - UTF8 clientMachine, - boolean overwrite, - short replication, - long blockSize - ) throws IOException { + public synchronized Object[] startFile(UTF8 src, + UTF8 holder, + UTF8 clientMachine, + boolean overwrite, + short replication, + long blockSize + ) throws IOException { NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: file " +src+" for "+holder+" at "+clientMachine); - if( isInSafeMode() ) - throw new SafeModeException( "Cannot create file" + src, safeMode ); + if (isInSafeMode()) + throw new SafeModeException("Cannot create file" + src, safeMode); if (!isValidName(src.toString())) { throw new IOException("Invalid file name: " + src); } @@ -785,9 +785,9 @@ } try { - verifyReplication(src.toString(), replication, clientMachine ); - } catch( IOException e) { - throw new IOException( "failed to create "+e.getMessage()); + verifyReplication(src.toString(), replication, clientMachine); + } catch(IOException e) { + throw new IOException("failed to create "+e.getMessage()); } if (!dir.isValidToCreate(src)) { if (overwrite) { @@ -827,8 +827,8 @@ holder, clientMachine, clientNode)); - NameNode.stateChangeLog.debug( "DIR* NameSystem.startFile: " - +"add "+src+" to pendingCreates for "+holder ); + NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: " + +"add "+src+" to pendingCreates for "+holder); synchronized (leases) { Lease lease = leases.get(holder); if (lease == null) { @@ -871,8 +871,8 @@ ) throws IOException { NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file " +src+" for "+clientName); - if( isInSafeMode() ) - throw new SafeModeException( "Cannot add block to " + src, safeMode ); + if (isInSafeMode()) + throw new SafeModeException("Cannot add block to " + src, safeMode); FileUnderConstruction pendingFile = pendingCreates.get(src); // make sure that we still have the lease on this file if (pendingFile == null) { @@ -916,11 +916,11 @@ // Remove the block from the pending creates list // NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " - +b.getBlockName()+"of file "+src ); + +b.getBlockName()+"of file "+src); FileUnderConstruction pendingFile = pendingCreates.get(src); if (pendingFile != null) { Collection pendingVector = pendingFile.getBlocks(); - for (Iterator it = pendingVector.iterator(); it.hasNext(); ) { + for (Iterator it = pendingVector.iterator(); it.hasNext();) { Block cur = it.next(); if (cur.compareTo(b) == 0) { pendingCreateBlocks.remove(cur); @@ -942,7 +942,7 @@ public synchronized void abandonFileInProgress(UTF8 src, UTF8 holder ) throws IOException { - NameNode.stateChangeLog.debug("DIR* NameSystem.abandonFileInProgress:" + src ); + NameNode.stateChangeLog.debug("DIR* NameSystem.abandonFileInProgress:" + src); synchronized (leases) { // find the lease Lease lease = leases.get(holder); @@ -969,20 +969,20 @@ * Before we return, we make sure that all the file's blocks have * been reported by datanodes and are replicated correctly. */ - public synchronized int completeFile( UTF8 src, - UTF8 holder) throws IOException { - NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder ); - if( isInSafeMode() ) - throw new SafeModeException( "Cannot complete file " + src, safeMode ); + public synchronized int completeFile(UTF8 src, + UTF8 holder) throws IOException { + NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder); + if (isInSafeMode()) + throw new SafeModeException("Cannot complete file " + src, safeMode); FileUnderConstruction pendingFile = pendingCreates.get(src); if (dir.getFile(src) != null || pendingFile == null) { - NameNode.stateChangeLog.warn( "DIR* NameSystem.completeFile: " - + "failed to complete " + src - + " because dir.getFile()==" + dir.getFile(src) - + " and " + pendingFile); + NameNode.stateChangeLog.warn("DIR* NameSystem.completeFile: " + + "failed to complete " + src + + " because dir.getFile()==" + dir.getFile(src) + + " and " + pendingFile); return OPERATION_FAILED; - } else if (! checkFileProgress(pendingFile, true)) { + } else if (!checkFileProgress(pendingFile, true)) { return STILL_WAITING; } @@ -998,8 +998,8 @@ // for (int i = 0; i < nrBlocks; i++) { Block b = pendingBlocks[i]; - Block storedBlock = blocksMap.getStoredBlock( b ); - if ( storedBlock != null ) { + Block storedBlock = blocksMap.getStoredBlock(b); + if (storedBlock != null) { pendingBlocks[i] = storedBlock; } } @@ -1007,7 +1007,7 @@ // // Now we can add the (name,blocks) tuple to the filesystem // - if ( ! dir.addFile(src, pendingBlocks, pendingFile.getReplication())) { + if (!dir.addFile(src, pendingBlocks, pendingFile.getReplication())) { return OPERATION_FAILED; } @@ -1024,7 +1024,7 @@ Lease lease = leases.get(holder); if (lease != null) { lease.completedCreate(src); - if (! lease.hasLocks()) { + if (!lease.hasLocks()) { leases.remove(holder); sortedLeases.remove(lease); } @@ -1043,7 +1043,7 @@ int numExpectedReplicas = pendingFile.getReplication(); for (int i = 0; i < nrBlocks; i++) { // filter out containingNodes that are marked for decommission. - int numCurrentReplica = countContainingNodes( pendingBlocks[i] ); + int numCurrentReplica = countContainingNodes(pendingBlocks[i]); if (numCurrentReplica < numExpectedReplicas) { neededReplications.add( pendingBlocks[i], numCurrentReplica, numExpectedReplicas); @@ -1061,13 +1061,13 @@ Block b = null; do { b = new Block(FSNamesystem.randBlockId.nextLong(), 0); - } while ( isValidBlock(b) ); + } while (isValidBlock(b)); FileUnderConstruction v = pendingCreates.get(src); v.getBlocks().add(b); pendingCreateBlocks.add(b); NameNode.stateChangeLog.debug("BLOCK* NameSystem.allocateBlock: " +src+ ". "+b.getBlockName()+ - " is created and added to pendingCreates and pendingCreateBlocks" ); + " is created and added to pendingCreates and pendingCreateBlocks"); return b; } @@ -1081,8 +1081,8 @@ // // check all blocks of the file. // - for (Iterator it = v.getBlocks().iterator(); it.hasNext(); ) { - if ( blocksMap.numNodes(it.next()) < this.minReplication ) { + for (Iterator it = v.getBlocks().iterator(); it.hasNext();) { + if (blocksMap.numNodes(it.next()) < this.minReplication) { return false; } } @@ -1156,9 +1156,9 @@ // Check how many copies we have of the block. If we have at least one // copy on a live node, then we can delete it. - int count = countContainingNodes( blk ); - if ( (count > 1) || ( (count == 1) && ( dn.isDecommissionInProgress() || - dn.isDecommissioned() ))) { + int count = countContainingNodes(blk); + if ((count > 1) || ((count == 1) && (dn.isDecommissionInProgress() || + dn.isDecommissioned()))) { addToInvalidates(blk, dn); removeStoredBlock(blk, getDatanode(dn)); NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: " @@ -1186,9 +1186,9 @@ * Change the indicated filename. */ public synchronized boolean renameTo(UTF8 src, UTF8 dst) throws IOException { - NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst ); - if( isInSafeMode() ) - throw new SafeModeException( "Cannot rename " + src, safeMode ); + NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst); + if (isInSafeMode()) + throw new SafeModeException("Cannot rename " + src, safeMode); if (!isValidName(dst.toString())) { throw new IOException("Invalid name: " + dst); } @@ -1200,21 +1200,21 @@ * invalidate some blocks that make up the file. */ public synchronized boolean delete(UTF8 src) throws IOException { - NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src ); - if( isInSafeMode() ) - throw new SafeModeException( "Cannot delete " + src, safeMode ); + NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src); + if (isInSafeMode()) + throw new SafeModeException("Cannot delete " + src, safeMode); Block deletedBlocks[] = dir.delete(src); if (deletedBlocks != null) { for (int i = 0; i < deletedBlocks.length; i++) { Block b = deletedBlocks[i]; - for ( Iterator it = - blocksMap.nodeIterator( b ); it.hasNext(); ) { + for (Iterator it = + blocksMap.nodeIterator(b); it.hasNext();) { DatanodeDescriptor node = it.next(); addToInvalidates(b, node); NameNode.stateChangeLog.debug("BLOCK* NameSystem.delete: " + b.getBlockName() + " is added to invalidSet of " - + node.getName() ); + + node.getName()); } } } @@ -1253,7 +1253,7 @@ // Check for ".." "." ":" "/" StringTokenizer tokens = new StringTokenizer(src, Path.SEPARATOR); - while( tokens.hasMoreTokens()) { + while(tokens.hasMoreTokens()) { String element = tokens.nextToken(); if (element.equals("..") || element.equals(".") || @@ -1268,11 +1268,11 @@ /** * Create all the necessary directories */ - public synchronized boolean mkdirs( String src ) throws IOException { + public synchronized boolean mkdirs(String src) throws IOException { boolean success; - NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src ); - if( isInSafeMode() ) - throw new SafeModeException( "Cannot create directory " + src, safeMode ); + NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src); + if (isInSafeMode()) + throw new SafeModeException("Cannot create directory " + src, safeMode); if (!isValidName(src)) { throw new IOException("Invalid directory name: " + src); } @@ -1294,7 +1294,7 @@ int startBlock = -1; int endBlock = -1; - Block blocks[] = dir.getFile( new UTF8( src )); + Block blocks[] = dir.getFile(new UTF8(src)); if (blocks == null) { // no blocks return new String[0][]; @@ -1332,9 +1332,9 @@ String hosts[][] = new String[(endBlock - startBlock) + 1][]; for (int i = startBlock; i <= endBlock; i++) { Collection v = new ArrayList(); - for ( Iterator it = - blocksMap.nodeIterator( blocks[i] ); it.hasNext(); ) { - v.add( it.next().getHostName() ); + for (Iterator it = + blocksMap.nodeIterator(blocks[i]); it.hasNext();) { + v.add(it.next().getHostName()); } hosts[i-startBlock] = v.toArray(new String[v.size()]); } @@ -1396,10 +1396,10 @@ return (locks.size() + creates.size()) > 0; } public void releaseLocks() { - for (Iterator it = locks.iterator(); it.hasNext(); ) + for (Iterator it = locks.iterator(); it.hasNext();) internalReleaseLock(it.next(), holder); locks.clear(); - for (Iterator it = creates.iterator(); it.hasNext(); ) + for (Iterator it = creates.iterator(); it.hasNext();) internalReleaseCreate(it.next(), holder); creates.clear(); } @@ -1463,51 +1463,53 @@ /** * Get a lock (perhaps exclusive) on the given file */ - /** @deprecated */ @Deprecated - public synchronized int obtainLock( UTF8 src, - UTF8 holder, - boolean exclusive) throws IOException { - if( isInSafeMode() ) - throw new SafeModeException( "Cannot lock file " + src, safeMode ); - int result = dir.obtainLock(src, holder, exclusive); - if (result == COMPLETE_SUCCESS) { - synchronized (leases) { - Lease lease = leases.get(holder); - if (lease == null) { - lease = new Lease(holder); - leases.put(holder, lease); - sortedLeases.add(lease); - } else { - sortedLeases.remove(lease); - lease.renew(); - sortedLeases.add(lease); - } - lease.obtained(src); + /** @deprecated */ + @Deprecated + public synchronized int obtainLock(UTF8 src, + UTF8 holder, + boolean exclusive) throws IOException { + if (isInSafeMode()) + throw new SafeModeException("Cannot lock file " + src, safeMode); + int result = dir.obtainLock(src, holder, exclusive); + if (result == COMPLETE_SUCCESS) { + synchronized (leases) { + Lease lease = leases.get(holder); + if (lease == null) { + lease = new Lease(holder); + leases.put(holder, lease); + sortedLeases.add(lease); + } else { + sortedLeases.remove(lease); + lease.renew(); + sortedLeases.add(lease); } + lease.obtained(src); } - return result; } + return result; + } /** * Release the lock on the given file */ - /** @deprecated */ @Deprecated - public synchronized int releaseLock(UTF8 src, UTF8 holder) { - int result = internalReleaseLock(src, holder); - if (result == COMPLETE_SUCCESS) { - synchronized (leases) { - Lease lease = leases.get(holder); - if (lease != null) { - lease.released(src); - if (! lease.hasLocks()) { - leases.remove(holder); - sortedLeases.remove(lease); - } + /** @deprecated */ + @Deprecated + public synchronized int releaseLock(UTF8 src, UTF8 holder) { + int result = internalReleaseLock(src, holder); + if (result == COMPLETE_SUCCESS) { + synchronized (leases) { + Lease lease = leases.get(holder); + if (lease != null) { + lease.released(src); + if (!lease.hasLocks()) { + leases.remove(holder); + sortedLeases.remove(lease); } } } - return result; } + return result; + } private int internalReleaseLock(UTF8 src, UTF8 holder) { return dir.releaseLock(src, holder); } @@ -1524,7 +1526,7 @@ "DIR* NameSystem.internalReleaseCreate: " + src + " is removed from pendingCreates for " + holder + " (failure)"); - for (Iterator it2 = v.getBlocks().iterator(); it2.hasNext(); ) { + for (Iterator it2 = v.getBlocks().iterator(); it2.hasNext();) { Block b = it2.next(); pendingCreateBlocks.remove(b); } @@ -1540,8 +1542,8 @@ */ public void renewLease(UTF8 holder) throws IOException { synchronized (leases) { - if( isInSafeMode() ) - throw new SafeModeException( "Cannot renew lease for " + holder, safeMode ); + if (isInSafeMode()) + throw new SafeModeException("Cannot renew lease for " + holder, safeMode); Lease lease = leases.get(holder); if (lease != null) { sortedLeases.remove(lease); @@ -1588,57 +1590,57 @@ * @see DataNode#register() * @author Konstantin Shvachko */ - public synchronized void registerDatanode( DatanodeRegistration nodeReg, - String networkLocation - ) throws IOException { + public synchronized void registerDatanode(DatanodeRegistration nodeReg, + String networkLocation + ) throws IOException { if (!verifyNodeRegistration(nodeReg)) { - throw new DisallowedDatanodeException( nodeReg ); + throw new DisallowedDatanodeException(nodeReg); } String dnAddress = Server.getRemoteAddress(); - if ( dnAddress == null ) { + if (dnAddress == null) { //Mostly not called inside an RPC. - throw new IOException( "Could not find remote address for " + - "registration from " + nodeReg.getName() ); + throw new IOException("Could not find remote address for " + + "registration from " + nodeReg.getName()); } String hostName = nodeReg.getHost(); // update the datanode's name with ip:port - DatanodeID dnReg = new DatanodeID( dnAddress + ":" + nodeReg.getPort(), - nodeReg.getStorageID(), - nodeReg.getInfoPort() ); - nodeReg.updateRegInfo( dnReg ); + DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(), + nodeReg.getStorageID(), + nodeReg.getInfoPort()); + nodeReg.updateRegInfo(dnReg); NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: " + "node registration from " + nodeReg.getName() - + " storage " + nodeReg.getStorageID() ); + + " storage " + nodeReg.getStorageID()); DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID()); - DatanodeDescriptor nodeN = host2DataNodeMap.getDatanodeByName( nodeReg.getName() ); + DatanodeDescriptor nodeN = host2DataNodeMap.getDatanodeByName(nodeReg.getName()); - if( nodeN != null && nodeN != nodeS ) { - NameNode.LOG.info( "BLOCK* NameSystem.registerDatanode: " - + "node from name: " + nodeN.getName() ); + if (nodeN != null && nodeN != nodeS) { + NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: " + + "node from name: " + nodeN.getName()); // nodeN previously served a different data storage, // which is not served by anybody anymore. - removeDatanode( nodeN ); + removeDatanode(nodeN); // physically remove node from datanodeMap - wipeDatanode( nodeN ); + wipeDatanode(nodeN); // and log removal - getEditLog().logRemoveDatanode( nodeN ); + getEditLog().logRemoveDatanode(nodeN); nodeN = null; } - if ( nodeS != null ) { - if( nodeN == nodeS ) { + if (nodeS != null) { + if (nodeN == nodeS) { // The same datanode has been just restarted to serve the same data // storage. We do not need to remove old data blocks, the delta will // be calculated on the next block report from the datanode NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: " - + "node restarted." ); + + "node restarted."); } else { // nodeS is found // The registering datanode is a replacement node for the existing @@ -1646,46 +1648,46 @@ NameNode.stateChangeLog.debug( "BLOCK* NameSystem.registerDatanode: " + "node " + nodeS.getName() - + " is replaced by " + nodeReg.getName() + "." ); + + " is replaced by " + nodeReg.getName() + "."); } - getEditLog().logRemoveDatanode( nodeS ); + getEditLog().logRemoveDatanode(nodeS); // update cluster map - clusterMap.remove( nodeS ); - nodeS.updateRegInfo( nodeReg ); - nodeS.setNetworkLocation( networkLocation ); - clusterMap.add( nodeS ); - nodeS.setHostName( hostName ); - getEditLog().logAddDatanode( nodeS ); + clusterMap.remove(nodeS); + nodeS.updateRegInfo(nodeReg); + nodeS.setNetworkLocation(networkLocation); + clusterMap.add(nodeS); + nodeS.setHostName(hostName); + getEditLog().logAddDatanode(nodeS); // also treat the registration message as a heartbeat - synchronized( heartbeats ) { - heartbeats.add( nodeS ); + synchronized(heartbeats) { + heartbeats.add(nodeS); //update its timestamp - nodeS.updateHeartbeat( 0L, 0L, 0); + nodeS.updateHeartbeat(0L, 0L, 0); nodeS.isAlive = true; } return; } // this is a new datanode serving a new data storage - if( nodeReg.getStorageID().equals("") ) { + if (nodeReg.getStorageID().equals("")) { // this data storage has never been registered // it is either empty or was created by pre-storageID version of DFS nodeReg.storageID = newStorageID(); NameNode.stateChangeLog.debug( "BLOCK* NameSystem.registerDatanode: " - + "new storageID " + nodeReg.getStorageID() + " assigned." ); + + "new storageID " + nodeReg.getStorageID() + " assigned."); } // register new datanode DatanodeDescriptor nodeDescr - = new DatanodeDescriptor( nodeReg, networkLocation, hostName ); - unprotectedAddDatanode( nodeDescr ); + = new DatanodeDescriptor(nodeReg, networkLocation, hostName); + unprotectedAddDatanode(nodeDescr); clusterMap.add(nodeDescr); - getEditLog().logAddDatanode( nodeDescr ); + getEditLog().logAddDatanode(nodeDescr); // also treat the registration message as a heartbeat - synchronized( heartbeats ) { - heartbeats.add( nodeDescr ); + synchronized(heartbeats) { + heartbeats.add(nodeDescr); nodeDescr.isAlive = true; // no need to update its timestamp // because its is done when the descriptor is created @@ -1701,7 +1703,7 @@ * @return registration ID */ public String getRegistrationID() { - return Storage.getRegistrationID( dir.fsImage ); + return Storage.getRegistrationID(dir.fsImage); } /** @@ -1714,9 +1716,9 @@ */ private String newStorageID() { String newID = null; - while( newID == null ) { - newID = "DS" + Integer.toString( r.nextInt() ); - if( datanodeMap.get( newID ) != null ) + while(newID == null) { + newID = "DS" + Integer.toString(r.nextInt()); + if (datanodeMap.get(newID) != null) newID = null; } return newID; @@ -1743,20 +1745,20 @@ * @return true if block report is required or false otherwise. * @throws IOException */ - public boolean gotHeartbeat( DatanodeID nodeID, - long capacity, - long remaining, - int xceiverCount, - int xmitsInProgress, - Object[] xferResults, - Object deleteList[] - ) throws IOException { + public boolean gotHeartbeat(DatanodeID nodeID, + long capacity, + long remaining, + int xceiverCount, + int xmitsInProgress, + Object[] xferResults, + Object deleteList[] + ) throws IOException { synchronized (heartbeats) { synchronized (datanodeMap) { DatanodeDescriptor nodeinfo; try { - nodeinfo = getDatanode( nodeID ); - if (nodeinfo == null ) { + nodeinfo = getDatanode(nodeID); + if (nodeinfo == null) { return true; } } catch(UnregisteredDatanodeException e) { @@ -1769,7 +1771,7 @@ throw new DisallowedDatanodeException(nodeinfo); } - if( !nodeinfo.isAlive ) { + if (!nodeinfo.isAlive) { return true; } else { updateStats(nodeinfo, false); @@ -1968,11 +1970,11 @@ * @param nodeID datanode ID * @author hairong */ - synchronized public void removeDatanode( DatanodeID nodeID ) + synchronized public void removeDatanode(DatanodeID nodeID) throws IOException { - DatanodeDescriptor nodeInfo = getDatanode( nodeID ); + DatanodeDescriptor nodeInfo = getDatanode(nodeID); if (nodeInfo != null) { - removeDatanode( nodeInfo ); + removeDatanode(nodeInfo); } else { NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: " + nodeInfo.getName() + " does not exist"); @@ -1984,21 +1986,21 @@ * @param nodeInfo datanode descriptor * @author hairong */ - private void removeDatanode( DatanodeDescriptor nodeInfo ) { + private void removeDatanode(DatanodeDescriptor nodeInfo) { if (nodeInfo.isAlive) { updateStats(nodeInfo, false); heartbeats.remove(nodeInfo); nodeInfo.isAlive = false; } - for (Iterator it = nodeInfo.getBlockIterator(); it.hasNext(); ) { + for (Iterator it = nodeInfo.getBlockIterator(); it.hasNext();) { removeStoredBlock(it.next(), nodeInfo); } unprotectedRemoveDatanode(nodeInfo); clusterMap.remove(nodeInfo); } - void unprotectedRemoveDatanode( DatanodeDescriptor nodeDescr ) { + void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) { // datanodeMap.remove(nodeDescr.getStorageID()); // deaddatanodeMap.put(nodeDescr.getName(), nodeDescr); nodeDescr.resetBlocks(); @@ -2007,7 +2009,7 @@ + nodeDescr.getName() + " is out of service now."); } - void unprotectedAddDatanode( DatanodeDescriptor nodeDescr ) { + void unprotectedAddDatanode(DatanodeDescriptor nodeDescr) { /* To keep host2DataNodeMap consistent with datanodeMap, remove from host2DataNodeMap the datanodeDescriptor removed from datanodeMap before adding nodeDescr to host2DataNodeMap. @@ -2018,7 +2020,7 @@ NameNode.stateChangeLog.debug( "BLOCK* NameSystem.unprotectedAddDatanode: " - + "node " + nodeDescr.getName() + " is added to datanodeMap." ); + + "node " + nodeDescr.getName() + " is added to datanodeMap."); } @@ -2027,7 +2029,7 @@ * * @param nodeID node */ - void wipeDatanode( DatanodeID nodeID ) throws IOException { + void wipeDatanode(DatanodeID nodeID) throws IOException { String key = nodeID.getStorageID(); host2DataNodeMap.remove(datanodeMap.remove(key)); NameNode.stateChangeLog.debug( @@ -2086,7 +2088,7 @@ } } } - allAlive = ! foundDead; + allAlive = !foundDead; } } @@ -2099,9 +2101,9 @@ ) throws IOException { if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: " - +"from "+nodeID.getName()+" "+newReport.length+" blocks" ); + +"from "+nodeID.getName()+" "+newReport.length+" blocks"); } - DatanodeDescriptor node = getDatanode( nodeID ); + DatanodeDescriptor node = getDatanode(nodeID); if (node == null) { throw new IOException("ProcessReport from unregisterted node: " + nodeID.getName()); @@ -2152,14 +2154,14 @@ } } - for ( Iterator i = toRemove.iterator(); i.hasNext(); ) { + for (Iterator i = toRemove.iterator(); i.hasNext();) { Block b = i.next(); - removeStoredBlock( b, node ); - node.removeBlock( b ); + removeStoredBlock(b, node); + node.removeBlock(b); } - for ( Iterator i = toAdd.iterator(); i.hasNext(); ) { + for (Iterator i = toAdd.iterator(); i.hasNext();) { Block b = i.next(); - node.addBlock( addStoredBlock(b, node) ); + node.addBlock(addStoredBlock(b, node)); } // @@ -2175,7 +2177,7 @@ // should only be invoked infrequently. // Collection obsolete = new ArrayList(); - for (Iterator it = node.getBlockIterator(); it.hasNext(); ) { + for (Iterator it = node.getBlockIterator(); it.hasNext();) { Block b = it.next(); // @@ -2184,14 +2186,14 @@ // they are added to recentInvalidateSets and will be sent out // thorugh succeeding heartbeat responses. // - if (! isValidBlock(b) && ! pendingCreateBlocks.contains(b)) { + if (!isValidBlock(b) && !pendingCreateBlocks.contains(b)) { if (obsolete.size() > FSConstants.BLOCK_INVALIDATE_CHUNK) { addToInvalidates(b, node); } else { obsolete.add(b); } NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: " - +"ask "+nodeID.getName()+" to delete "+b.getBlockName() ); + +"ask "+nodeID.getName()+" to delete "+b.getBlockName()); } } return (Block[]) obsolete.toArray(new Block[obsolete.size()]); @@ -2204,22 +2206,22 @@ */ synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) { - FSDirectory.INode fileINode = blocksMap.getINode( block ); + FSDirectory.INode fileINode = blocksMap.getINode(block); int replication = (fileINode != null) ? fileINode.getReplication() : defaultReplication; - boolean added = blocksMap.addNode( block, node, replication ); + boolean added = blocksMap.addNode(block, node, replication); - Block storedBlock = blocksMap.getStoredBlock( block ); //extra look up! - if ( storedBlock != null && block != storedBlock ) { - if ( block.getNumBytes() > 0 ) { - storedBlock.setNumBytes( block.getNumBytes() ); + Block storedBlock = blocksMap.getStoredBlock(block); //extra look up! + if (storedBlock != null && block != storedBlock) { + if (block.getNumBytes() > 0) { + storedBlock.setNumBytes(block.getNumBytes()); } block = storedBlock; } int curReplicaDelta = 0; - if ( added ) { + if (added) { curReplicaDelta = 1; // // Hairong: I would prefer to set the level of next logrecord @@ -2228,9 +2230,9 @@ // they simply take up all the space in the log file // So I set the level to be trace // - if ( NameNode.stateChangeLog.isTraceEnabled() ) { + if (NameNode.stateChangeLog.isTraceEnabled()) { NameNode.stateChangeLog.trace("BLOCK* NameSystem.addStoredBlock: " - +"blockMap updated: "+node.getName()+" is added to "+block.getBlockName() ); + +"blockMap updated: "+node.getName()+" is added to "+block.getBlockName()); } } else { NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: " @@ -2238,16 +2240,16 @@ + block.getBlockName() + " on " + node.getName()); } - if( fileINode == null ) // block does not belong to any file + if (fileINode == null) // block does not belong to any file return block; // filter out containingNodes that are marked for decommission. - int numCurrentReplica = countContainingNodes( block ) + int numCurrentReplica = countContainingNodes(block) + pendingReplications.getNumReplicas(block); // check whether safe replication is reached for the block // only if it is a part of a files - incrementSafeBlockCount( numCurrentReplica ); + incrementSafeBlockCount(numCurrentReplica); // handle underReplication/overReplication short fileReplication = fileINode.getReplication(); @@ -2256,8 +2258,8 @@ } else { neededReplications.update(block, curReplicaDelta, 0); } - if ( numCurrentReplica > fileReplication ) { - proccessOverReplicatedBlock( block, fileReplication ); + if (numCurrentReplica > fileReplication) { + proccessOverReplicatedBlock(block, fileReplication); } return block; } @@ -2267,13 +2269,13 @@ * If there are any extras, call chooseExcessReplicates() to * mark them in the excessReplicateMap. */ - private void proccessOverReplicatedBlock( Block block, short replication ) { + private void proccessOverReplicatedBlock(Block block, short replication) { Collection nonExcess = new ArrayList(); - for (Iterator it = blocksMap.nodeIterator( block ); - it.hasNext(); ) { + for (Iterator it = blocksMap.nodeIterator(block); + it.hasNext();) { DatanodeDescriptor cur = it.next(); Collection excessBlocks = excessReplicateMap.get(cur.getStorageID()); - if (excessBlocks == null || ! excessBlocks.contains(block)) { + if (excessBlocks == null || !excessBlocks.contains(block)) { if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { nonExcess.add(cur); } @@ -2302,7 +2304,7 @@ DatanodeInfo node = iter.next(); long free = node.getRemaining(); - if(minSpace > free) { + if (minSpace > free) { minSpace = free; cur = node; } @@ -2317,7 +2319,7 @@ } excessBlocks.add(b); NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: " - +"("+cur.getName()+", "+b.getBlockName()+") is added to excessReplicateMap" ); + +"("+cur.getName()+", "+b.getBlockName()+") is added to excessReplicateMap"); // // The 'excessblocks' tracks blocks until we get confirmation @@ -2335,7 +2337,7 @@ } invalidateSet.add(b); NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: " - +"("+cur.getName()+", "+b.getBlockName()+") is added to recentInvalidateSets" ); + +"("+cur.getName()+", "+b.getBlockName()+") is added to recentInvalidateSets"); } } @@ -2345,22 +2347,22 @@ */ synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) { NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: " - +block.getBlockName() + " from "+node.getName() ); - if ( !blocksMap.removeNode( block, node ) ) { + +block.getBlockName() + " from "+node.getName()); + if (!blocksMap.removeNode(block, node)) { NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: " - +block.getBlockName()+" has already been removed from node "+node ); + +block.getBlockName()+" has already been removed from node "+node); return; } - decrementSafeBlockCount( block ); + decrementSafeBlockCount(block); // // It's possible that the block was removed because of a datanode // failure. If the block is still valid, check if replication is // necessary. In that case, put block on a possibly-will- // be-replicated list. // - FSDirectory.INode fileINode = blocksMap.getINode( block ); - if( fileINode != null ) { + FSDirectory.INode fileINode = blocksMap.getINode(block); + if (fileINode != null) { neededReplications.update(block, -1, 0); } @@ -2372,7 +2374,7 @@ if (excessBlocks != null) { excessBlocks.remove(block); NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: " - +block.getBlockName()+" is removed from excessBlocks" ); + +block.getBlockName()+" is removed from excessBlocks"); if (excessBlocks.size() == 0) { excessReplicateMap.remove(node.getStorageID()); } @@ -2382,22 +2384,22 @@ /** * The given node is reporting that it received a certain block. */ - public synchronized void blockReceived( DatanodeID nodeID, - Block block - ) throws IOException { - DatanodeDescriptor node = getDatanode( nodeID ); + public synchronized void blockReceived(DatanodeID nodeID, + Block block + ) throws IOException { + DatanodeDescriptor node = getDatanode(nodeID); if (node == null) { NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block.getBlockName() + " is received from an unrecorded node " - + nodeID.getName() ); + + nodeID.getName()); throw new IllegalArgumentException( "Unexpected exception. Got blockReceived message from node " + block.getBlockName() + ", but there is no info for it"); } - if ( NameNode.stateChangeLog.isDebugEnabled() ) { + if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: " - +block.getBlockName()+" is received from " + nodeID.getName() ); + +block.getBlockName()+" is received from " + nodeID.getName()); } // Check if this datanode should actually be shutdown instead. @@ -2409,7 +2411,7 @@ // // Modify the blocks->datanode map and node's map. // - node.addBlock( addStoredBlock(block, node) ); + node.addBlock(addStoredBlock(block, node)); pendingReplications.remove(block); } @@ -2446,23 +2448,23 @@ synchronized (datanodeMap) { results = new DatanodeInfo[datanodeMap.size()]; int i = 0; - for(Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) - results[i++] = new DatanodeInfo( it.next() ); + for(Iterator it = datanodeMap.values().iterator(); it.hasNext();) + results[i++] = new DatanodeInfo(it.next()); } return results; } /** */ - public synchronized void DFSNodesStatus( ArrayList live, - ArrayList dead ) { + public synchronized void DFSNodesStatus(ArrayList live, + ArrayList dead) { synchronized (datanodeMap) { - for(Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) { + for(Iterator it = datanodeMap.values().iterator(); it.hasNext();) { DatanodeDescriptor node = it.next(); - if( isDatanodeDead(node)) - dead.add( node ); + if (isDatanodeDead(node)) + dead.add(node); else - live.add( node ); + live.add(node); } } } @@ -2473,7 +2475,7 @@ private synchronized void datanodeDump(PrintWriter out) { synchronized (datanodeMap) { out.println("Metasave: Number of datanodes: " + datanodeMap.size()); - for(Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) { + for(Iterator it = datanodeMap.values().iterator(); it.hasNext();) { DatanodeDescriptor node = it.next(); out.println(node.dumpDatanode()); } @@ -2521,7 +2523,7 @@ for (int i = 0; i < nodes.length; i++) { boolean found = false; for (Iterator it = datanodeMap.values().iterator(); - it.hasNext(); ) { + it.hasNext();) { DatanodeDescriptor node = it.next(); // @@ -2583,14 +2585,14 @@ /** * Check if there are any recently-deleted blocks a datanode should remove. */ - public synchronized Block[] blocksToInvalidate( DatanodeID nodeID ) { + public synchronized Block[] blocksToInvalidate(DatanodeID nodeID) { // Ask datanodes to perform block delete // only if safe mode is off. - if( isInSafeMode() ) + if (isInSafeMode()) return null; - Collection invalidateSet = recentInvalidateSets.remove( - nodeID.getStorageID() ); + Collection invalidateSet = recentInvalidateSets.remove( + nodeID.getStorageID()); if (invalidateSet == null) { return null; @@ -2633,7 +2635,7 @@ blockList.append(block.getBlockName()); } NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockToInvalidate: " - +"ask "+nodeID.getName()+" to delete " + blockList ); + +"ask "+nodeID.getName()+" to delete " + blockList); } return sendBlock.toArray(new Block[sendBlock.size()]); } @@ -2644,7 +2646,7 @@ */ private int countContainingNodes(Iterator nodeIter) { int count = 0; - while ( nodeIter.hasNext() ) { + while (nodeIter.hasNext()) { DatanodeDescriptor node = nodeIter.next(); if (!node.isDecommissionInProgress() && !node.isDecommissioned()) { count++; @@ -2653,17 +2655,17 @@ return count; } - /** wrapper for countContainingNodes( Iterator ). */ - private int countContainingNodes( Block b ) { - return countContainingNodes( blocksMap.nodeIterator( b ) ); + /** wrapper for countContainingNodes(Iterator). */ + private int countContainingNodes(Block b) { + return countContainingNodes(blocksMap.nodeIterator(b)); } /** Reeturns a newly allocated list exluding the decommisioned nodes. */ - ArrayList containingNodeList( Block b ) { + ArrayList containingNodeList(Block b) { ArrayList nonCommissionedNodeList = new ArrayList(); - for( Iterator it = blocksMap.nodeIterator( b ); - it.hasNext(); ) { + for(Iterator it = blocksMap.nodeIterator(b); + it.hasNext();) { DatanodeDescriptor node = it.next(); if (!node.isDecommissionInProgress() && !node.isDecommissioned()) { nonCommissionedNodeList.add(node); @@ -2679,9 +2681,9 @@ Block decommissionBlocks[] = srcNode.getBlocks(); for (int i = 0; i < decommissionBlocks.length; i++) { Block block = decommissionBlocks[i]; - FSDirectory.INode fileINode = blocksMap.getINode( block ); - if ( fileINode != null && - fileINode.getReplication() > countContainingNodes(block) ) { + FSDirectory.INode fileINode = blocksMap.getINode(block); + if (fileINode != null && + fileINode.getReplication() > countContainingNodes(block)) { return true; } } @@ -2735,7 +2737,7 @@ int needed) { // Ask datanodes to perform block replication // only if safe mode is off. - if( isInSafeMode() ) + if (isInSafeMode()) return null; synchronized (neededReplications) { @@ -2757,27 +2759,27 @@ } Block block = it.next(); long blockSize = block.getNumBytes(); - FSDirectory.INode fileINode = blocksMap.getINode( block ); + FSDirectory.INode fileINode = blocksMap.getINode(block); if (fileINode == null) { // block does not belong to any file it.remove(); } else { List containingNodes = containingNodeList(block); - Collection excessBlocks = excessReplicateMap.get( - srcNode.getStorageID() ); + Collection excessBlocks = excessReplicateMap.get( + srcNode.getStorageID()); // srcNode must contain the block, and the block must // not be scheduled for removal on that node if (containingNodes.contains(srcNode) - && (excessBlocks == null || ! excessBlocks.contains(block))) { + && (excessBlocks == null || !excessBlocks.contains(block))) { int numCurrentReplica = containingNodes.size() + pendingReplications.getNumReplicas(block); if (numCurrentReplica >= fileINode.getReplication()) { it.remove(); } else { DatanodeDescriptor targets[] = replicator.chooseTarget( - Math.min( fileINode.getReplication() - numCurrentReplica, - needed), + Math.min(fileINode.getReplication() - numCurrentReplica, + needed), datanodeMap.get(srcNode.getStorageID()), containingNodes, null, blockSize); if (targets.length > 0) { @@ -2828,7 +2830,7 @@ + block.getBlockName() + " to " + targetList); NameNode.stateChangeLog.debug( "BLOCK* neededReplications = " + neededReplications.size() - + " pendingReplications = " + pendingReplications.size() ); + + " pendingReplications = " + pendingReplications.size()); } } @@ -2863,13 +2865,13 @@ class ReplicationTargetChooser { final boolean considerLoad; - ReplicationTargetChooser( boolean considerLoad ) { + ReplicationTargetChooser(boolean considerLoad) { this.considerLoad = considerLoad; } private class NotEnoughReplicasException extends Exception { - NotEnoughReplicasException( String msg ) { - super( msg ); + NotEnoughReplicasException(String msg) { + super(msg); } } @@ -2888,8 +2890,8 @@ DatanodeDescriptor[] chooseTarget(int numOfReplicas, DatanodeDescriptor writer, List excludedNodes, - long blocksize ) { - if( excludedNodes == null) { + long blocksize) { + if (excludedNodes == null) { excludedNodes = new ArrayList(); } @@ -2914,18 +2916,18 @@ DatanodeDescriptor writer, List choosenNodes, List excludedNodes, - long blocksize ) { - if( numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0 ) { + long blocksize) { + if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) { return new DatanodeDescriptor[0]; } - if( excludedNodes == null) { + if (excludedNodes == null) { excludedNodes = new ArrayList(); } int clusterSize = clusterMap.getNumOfLeaves(); int totalNumOfReplicas = choosenNodes.size()+numOfReplicas; - if( totalNumOfReplicas > clusterSize) { + if (totalNumOfReplicas > clusterSize) { numOfReplicas -= (totalNumOfReplicas-clusterSize); totalNumOfReplicas = clusterSize; } @@ -2937,11 +2939,11 @@ new ArrayList(choosenNodes); excludedNodes.addAll(choosenNodes); - if(!clusterMap.contains(writer)) + if (!clusterMap.contains(writer)) writer=null; DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, - excludedNodes, blocksize, maxNodesPerRack, results ); + excludedNodes, blocksize, maxNodesPerRack, results); results.removeAll(choosenNodes); @@ -2958,41 +2960,41 @@ int maxNodesPerRack, List results) { - if( numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0 ) { + if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) { return writer; } int numOfResults = results.size(); - if(writer == null && (numOfResults==1 || numOfResults==2) ) { + if (writer == null && (numOfResults==1 || numOfResults==2)) { writer = results.get(0); } try { - switch( numOfResults ) { + switch(numOfResults) { case 0: writer = chooseLocalNode(writer, excludedNodes, blocksize, maxNodesPerRack, results); - if(--numOfReplicas == 0) break; + if (--numOfReplicas == 0) break; case 1: chooseRemoteRack(1, writer, excludedNodes, blocksize, maxNodesPerRack, results); - if(--numOfReplicas == 0) break; + if (--numOfReplicas == 0) break; case 2: - if(clusterMap.isOnSameRack(results.get(0), results.get(1))) { + if (clusterMap.isOnSameRack(results.get(0), results.get(1))) { chooseRemoteRack(1, writer, excludedNodes, blocksize, maxNodesPerRack, results); } else { chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack, results); } - if(--numOfReplicas == 0) break; + if (--numOfReplicas == 0) break; default: chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results); } } catch (NotEnoughReplicasException e) { LOG.warn("Not able to place enough replicas, still in need of " - + numOfReplicas ); + + numOfReplicas); } return writer; } @@ -3010,14 +3012,14 @@ List results) throws NotEnoughReplicasException { // if no local machine, randomly choose one node - if(localMachine == null) + if (localMachine == null) return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results); // otherwise try local machine first - if(!excludedNodes.contains(localMachine)) { + if (!excludedNodes.contains(localMachine)) { excludedNodes.add(localMachine); - if( isGoodTarget(localMachine, blocksize, + if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false, results)) { results.add(localMachine); return localMachine; @@ -3044,9 +3046,9 @@ List results) throws NotEnoughReplicasException { // no local machine, so choose a random machine - if( localMachine == null ) { + if (localMachine == null) { return chooseRandom(NodeBase.ROOT, excludedNodes, - blocksize, maxNodesPerRack, results ); + blocksize, maxNodesPerRack, results); } // choose one from the local rack @@ -3060,17 +3062,17 @@ for(Iterator iter=results.iterator(); iter.hasNext();) { DatanodeDescriptor nextNode = iter.next(); - if(nextNode != localMachine) { + if (nextNode != localMachine) { newLocal = nextNode; break; } } - if( newLocal != null ) { + if (newLocal != null) { try { return chooseRandom( newLocal.getNetworkLocation(), excludedNodes, blocksize, maxNodesPerRack, results); - } catch( NotEnoughReplicasException e2 ) { + } catch(NotEnoughReplicasException e2) { //otherwise randomly choose one from the network return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results); @@ -3089,22 +3091,22 @@ * from the local rack */ - private void chooseRemoteRack( int numOfReplicas, - DatanodeDescriptor localMachine, - List excludedNodes, - long blocksize, - int maxReplicasPerRack, - List results) + private void chooseRemoteRack(int numOfReplicas, + DatanodeDescriptor localMachine, + List excludedNodes, + long blocksize, + int maxReplicasPerRack, + List results) throws NotEnoughReplicasException { int oldNumOfReplicas = results.size(); // randomly choose one node from remote racks try { - chooseRandom( numOfReplicas, "~"+localMachine.getNetworkLocation(), - excludedNodes, blocksize, maxReplicasPerRack, results ); + chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(), + excludedNodes, blocksize, maxReplicasPerRack, results); } catch (NotEnoughReplicasException e) { - chooseRandom( numOfReplicas-(results.size()-oldNumOfReplicas), - localMachine.getNetworkLocation(), excludedNodes, blocksize, - maxReplicasPerRack, results); + chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas), + localMachine.getNetworkLocation(), excludedNodes, blocksize, + maxReplicasPerRack, results); } } @@ -3122,12 +3124,12 @@ do { DatanodeDescriptor[] selectedNodes = chooseRandom(1, nodes, excludedNodes); - if(selectedNodes.length == 0 ) { - throw new NotEnoughReplicasException( - "Not able to place enough replicas" ); + if (selectedNodes.length == 0) { + throw new NotEnoughReplicasException( + "Not able to place enough replicas"); } result = (DatanodeDescriptor)(selectedNodes[0]); - } while( !isGoodTarget( result, blocksize, maxNodesPerRack, results)); + } while(!isGoodTarget(result, blocksize, maxNodesPerRack, results)); results.add(result); return result; } @@ -3145,20 +3147,20 @@ do { DatanodeDescriptor[] selectedNodes = chooseRandom(numOfReplicas, nodes, excludedNodes); - if(selectedNodes.length < numOfReplicas) { + if (selectedNodes.length < numOfReplicas) { toContinue = false; } for(int i=0; i0 && toContinue ); + } while (numOfReplicas>0 && toContinue); - if(numOfReplicas>0) { - throw new NotEnoughReplicasException( + if (numOfReplicas>0) { + throw new NotEnoughReplicasException( "Not able to place enough replicas"); } } @@ -3175,10 +3177,10 @@ clusterMap.countNumOfAvailableNodes(nodes, excludedNodes); numOfReplicas = (numOfAvailableNodes 0 ) { + while(numOfReplicas > 0) { DatanodeDescriptor choosenNode = clusterMap.chooseRandom(nodes); - if(!excludedNodes.contains(choosenNode)) { - results.add( choosenNode ); + if (!excludedNodes.contains(choosenNode)) { + results.add(choosenNode); excludedNodes.add(choosenNode); numOfReplicas--; } @@ -3191,40 +3193,40 @@ * return true if node has enough space, * does not have too much load, and the rack does not have too many nodes */ - private boolean isGoodTarget( DatanodeDescriptor node, - long blockSize, int maxTargetPerLoc, - List results) { + private boolean isGoodTarget(DatanodeDescriptor node, + long blockSize, int maxTargetPerLoc, + List results) { return isGoodTarget(node, blockSize, maxTargetPerLoc, this.considerLoad, results); } - private boolean isGoodTarget( DatanodeDescriptor node, - long blockSize, int maxTargetPerLoc, - boolean considerLoad, - List results) { + private boolean isGoodTarget(DatanodeDescriptor node, + long blockSize, int maxTargetPerLoc, + boolean considerLoad, + List results) { // check if the node is (being) decommissed - if(node.isDecommissionInProgress() || node.isDecommissioned()) { + if (node.isDecommissionInProgress() || node.isDecommissioned()) { LOG.debug("Node "+node.getPath()+ " is not chosen because the node is (being) decommissioned"); return false; } // check the remaining capacity of the target machine - if(blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>node.getRemaining() ) { + if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>node.getRemaining()) { LOG.debug("Node "+node.getPath()+ " is not chosen because the node does not have enough space"); return false; } // check the communication traffic of the target machine - if(considerLoad) { + if (considerLoad) { double avgLoad = 0; int size = clusterMap.getNumOfLeaves(); - if( size != 0 ) { + if (size != 0) { avgLoad = (double)totalLoad()/size; } - if(node.getXceiverCount() > (2.0 * avgLoad)) { + if (node.getXceiverCount() > (2.0 * avgLoad)) { LOG.debug("Node "+node.getPath()+ " is not chosen because the node is too busy"); return false; @@ -3234,14 +3236,14 @@ // check if the target rack has chosen too many nodes String rackname = node.getNetworkLocation(); int counter=1; - for( Iterator iter = results.iterator(); - iter.hasNext(); ) { + for(Iterator iter = results.iterator(); + iter.hasNext();) { DatanodeDescriptor result = iter.next(); - if(rackname.equals(result.getNetworkLocation())) { + if (rackname.equals(result.getNetworkLocation())) { counter++; } } - if(counter>maxTargetPerLoc) { + if (counter>maxTargetPerLoc) { LOG.debug("Node "+node.getPath()+ " is not chosen because the rack has too many chosen nodes"); return false; @@ -3256,29 +3258,29 @@ */ private DatanodeDescriptor[] getPipeline( DatanodeDescriptor writer, - DatanodeDescriptor[] nodes ) { - if( nodes.length==0 ) return nodes; + DatanodeDescriptor[] nodes) { + if (nodes.length==0) return nodes; - synchronized( clusterMap ) { + synchronized(clusterMap) { int index=0; - if(writer == null || !clusterMap.contains(writer)) { + if (writer == null || !clusterMap.contains(writer)) { writer = nodes[0]; } - for( ;indexcurrentDistance ) { + int currentDistance = clusterMap.getDistance(writer, currentNode); + if (shortestDistance>currentDistance) { shortestDistance = currentDistance; shortestNode = currentNode; shortestIndex = i; } } //switch position index & shortestIndex - if( index != shortestIndex ) { + if (index != shortestIndex) { nodes[shortestIndex] = nodes[index]; nodes[index] = shortestNode; } @@ -3321,7 +3323,7 @@ hostsReader.refresh(); synchronized (this) { for (Iterator it = datanodeMap.values().iterator(); - it.hasNext(); ) { + it.hasNext();) { DatanodeDescriptor node = it.next(); // Check if not include. if (!inHostsList(node)) { @@ -3384,7 +3386,7 @@ */ public synchronized void decommissionedDatanodeCheck() { for (Iterator it = datanodeMap.values().iterator(); - it.hasNext(); ) { + it.hasNext();) { DatanodeDescriptor node = it.next(); checkDecommissionStateInternal(node); } @@ -3483,15 +3485,15 @@ * @return DatanodeDescriptor or null if the node is not found. * @throws IOException */ - public DatanodeDescriptor getDatanode( DatanodeID nodeID ) throws IOException { + public DatanodeDescriptor getDatanode(DatanodeID nodeID) throws IOException { UnregisteredDatanodeException e = null; DatanodeDescriptor node = datanodeMap.get(nodeID.getStorageID()); if (node == null) return null; if (!node.getName().equals(nodeID.getName())) { - e = new UnregisteredDatanodeException( nodeID, node ); + e = new UnregisteredDatanodeException(nodeID, node); NameNode.stateChangeLog.fatal("BLOCK* NameSystem.getDatanode: " - + e.getLocalizedMessage() ); + + e.getLocalizedMessage()); throw e; } return node; @@ -3504,13 +3506,13 @@ /** Check if node is already in the map */ synchronized boolean contains(DatanodeDescriptor node) { - if( node==null ) return false; + if (node==null) return false; String host = node.getHost(); DatanodeDescriptor[] nodes = map.get(host); - if( nodes != null ) { + if (nodes != null) { for(DatanodeDescriptor containedNode:nodes) { - if(node==containedNode) + if (node==containedNode) return true; } } @@ -3521,12 +3523,12 @@ * return true if the node is added; false otherwise */ synchronized boolean add(DatanodeDescriptor node) { - if(node==null || contains(node)) return false; + if (node==null || contains(node)) return false; String host = node.getHost(); DatanodeDescriptor[] nodes = map.get(host); DatanodeDescriptor[] newNodes; - if(nodes==null) { + if (nodes==null) { newNodes = new DatanodeDescriptor[1]; newNodes[0]=node; } else { // rare case: more than one datanode on the host @@ -3542,15 +3544,15 @@ * return true if the node is removed; false otherwise */ synchronized boolean remove(DatanodeDescriptor node) { - if(node==null) return false; + if (node==null) return false; String host = node.getHost(); DatanodeDescriptor[] nodes = map.get(host); - if(nodes==null) { + if (nodes==null) { return false; } - if( nodes.length==1 ) { - if( nodes[0]==node ) { + if (nodes.length==1) { + if (nodes[0]==node) { map.remove(host); return true; } else { @@ -3560,11 +3562,11 @@ //rare case int i=0; for(; i= 0; } @@ -3755,10 +3757,10 @@ * Enter safe mode. */ void enter() { - if( reached != 0 ) + if (reached != 0) NameNode.stateChangeLog.info( "STATE* SafeModeInfo.enter: " + "Safe mode is ON.\n" - + getTurnOffTip() ); + + getTurnOffTip()); this.reached = 0; } @@ -3766,16 +3768,16 @@ * Leave safe mode. */ synchronized void leave() { - if( reached >= 0 ) + if (reached >= 0) NameNode.stateChangeLog.info( - "STATE* SafeModeInfo.leave: " + "Safe mode is OFF." ); + "STATE* SafeModeInfo.leave: " + "Safe mode is OFF."); reached = -1; safeMode = null; NameNode.stateChangeLog.info("STATE* Network topology has " +clusterMap.getNumOfRacks()+" racks and " +clusterMap.getNumOfLeaves()+ " datanodes"); NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has " - +neededReplications.size()+" blocks" ); + +neededReplications.size()+" blocks"); } /** @@ -3785,11 +3787,11 @@ * @return true if can leave or false otherwise. */ synchronized boolean canLeave() { - if( reached == 0 ) + if (reached == 0) return false; - if( now() - reached < extension ) + if (now() - reached < extension) return false; - return ! needEnter(); + return !needEnter(); } /** @@ -3805,24 +3807,24 @@ * to be compared with the threshold. */ private float getSafeBlockRatio() { - return ( blockTotal == 0 ? 1 : (float)blockSafe/blockTotal ); + return (blockTotal == 0 ? 1 : (float)blockSafe/blockTotal); } /** * Check and trigger safe mode if needed. */ private void checkMode() { - if( needEnter() ) { + if (needEnter()) { enter(); return; } // the threshold is reached - if( ! isOn() || // safe mode is off - extension <= 0 || threshold <= 0 ) { // don't need to wait + if (!isOn() || // safe mode is off + extension <= 0 || threshold <= 0) { // don't need to wait this.leave(); // just leave safe mode return; } - if( reached > 0 ) // threshold has already been reached before + if (reached > 0) // threshold has already been reached before return; // start monitor reached = now(); @@ -3833,7 +3835,7 @@ /** * Set total number of blocks. */ - synchronized void setBlockTotal( int total) { + synchronized void setBlockTotal(int total) { this.blockTotal = total; checkMode(); } @@ -3843,8 +3845,8 @@ * reached minimal replication. * @param replication current replication */ - synchronized void incrementSafeBlockCount( short replication ) { [... 351 lines stripped ...]