Return-Path: 
Delivered-To: apmail-lucene-hadoop-commits-archive@locus.apache.org
Received: (qmail 21124 invoked from network); 16 Feb 2007 21:37:26 -0000
Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2)
  by minotaur.apache.org with SMTP; 16 Feb 2007 21:37:26 -0000
Received: (qmail 37220 invoked by uid 500); 16 Feb 2007 21:37:33 -0000
Delivered-To: apmail-lucene-hadoop-commits-archive@lucene.apache.org
Received: (qmail 37196 invoked by uid 500); 16 Feb 2007 21:37:33 -0000
Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm
Precedence: bulk
List-Help: 
List-Unsubscribe: 
List-Post: 
List-Id: 
Reply-To: hadoop-dev@lucene.apache.org
Delivered-To: mailing list hadoop-commits@lucene.apache.org
Received: (qmail 37184 invoked by uid 99); 16 Feb 2007 21:37:33 -0000
Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133)
  by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Feb 2007 13:37:33 -0800
X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME
X-Spam-Check-By: apache.org
Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3)
  by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Feb 2007 13:37:25 -0800
Received: by eris.apache.org (Postfix, from userid 65534)
  id 5E5481A981A; Fri, 16 Feb 2007 13:37:04 -0800 (PST)
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: svn commit: r508595 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/
Date: Fri, 16 Feb 2007 21:37:04 -0000
To: hadoop-commits@lucene.apache.org
From: cutting@apache.org
X-Mailer: svnmailer-1.1.0
Message-Id: <20070216213704.5E5481A981A@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: cutting
Date: Fri Feb 16 13:37:03 2007
New Revision: 508595

URL: http://svn.apache.org/viewvc?view=rev&rev=508595
Log:
HADOOP-803. Reduce memory use by HDFS namenode, phase I. Contributed by Raghu.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Feb 16 13:37:03 2007
@@ -56,6 +56,9 @@
 16. HADOOP-649. Fix so that jobs with no tasks are not lost.
     (Thomas Friol via cutting)
 
+17. HADOOP-803. Reduce memory use by HDFS namenode, phase I.
+    (Raghu Angadi via cutting)
+
 
 Branch 0.11 (unreleased)

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java Fri Feb 16 13:37:03 2007
@@ -122,17 +122,16 @@
     /////////////////////////////////////
     public int compareTo(Object o) {
         Block b = (Block) o;
-        if (getBlockId() < b.getBlockId()) {
+        if ( blkid < b.blkid ) {
             return -1;
-        } else if (getBlockId() == b.getBlockId()) {
+        } else if ( blkid == b.blkid ) {
             return 0;
         } else {
             return 1;
         }
     }
     public boolean equals(Object o) {
-        Block b = (Block) o;
-        return (this.compareTo(b) == 0);
+        return (this.compareTo(o) == 0);
     }
     
     public int hashCode() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java Fri Feb 16 13:37:03 2007
@@ -37,7 +37,7 @@
  **************************************************/
 public class DatanodeDescriptor extends DatanodeInfo {
 
-    private volatile Collection blocks = new TreeSet();
+    private volatile SortedMap<Block, Block> blocks = new TreeMap<Block, Block>();
     // isAlive == heartbeats.contains(this)
     // This is an optimization, because contains takes O(n) time on Arraylist
     protected boolean isAlive = false;
@@ -118,17 +118,12 @@
 
     /**
      */
-    void updateBlocks(Block newBlocks[]) {
-        blocks.clear();
-        for (int i = 0; i < newBlocks.length; i++) {
-            blocks.add(newBlocks[i]);
-        }
-    }
-
-    /**
-     */
     void addBlock(Block b) {
-        blocks.add(b);
+        blocks.put(b, b);
+    }
+    
+    void removeBlock(Block b) {
+        blocks.remove(b);
     }
 
     void resetBlocks() {
@@ -152,13 +147,21 @@
     }
 
     Block[] getBlocks() {
-        return (Block[]) blocks.toArray(new Block[blocks.size()]);
+        return (Block[]) blocks.keySet().toArray(new Block[blocks.size()]);
     }
 
     Iterator getBlockIterator() {
-        return blocks.iterator();
+        return blocks.keySet().iterator();
     }
-    
+
+    Block getBlock(long blockId) {
+        return blocks.get( new Block(blockId, 0) );
+    }
+
+    Block getBlock(Block b) {
+        return blocks.get(b);
+    }
+
     /*
      * Store block replication work.
     */
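
The DatanodeDescriptor change above replaces the per-datanode TreeSet of blocks with a map from each block to itself, so the descriptor can hand back the one stored Block instance (getBlock) instead of leaving callers to keep duplicate copies. Below is a minimal, self-contained sketch of that map-as-set pattern; the Block class here is a simplified stand-in, not the actual Hadoop class.

import java.util.SortedMap;
import java.util.TreeMap;

/**
 * Sketch of the map-as-set pattern: keep blocks in a TreeMap keyed by
 * themselves, so membership tests and retrieval of the stored (canonical)
 * instance share one structure.
 */
public class CanonicalSetSketch {

    static class Block implements Comparable<Block> {
        final long id;
        long numBytes;
        Block(long id, long numBytes) { this.id = id; this.numBytes = numBytes; }
        public int compareTo(Block other) {
            return (id < other.id) ? -1 : (id == other.id) ? 0 : 1;
        }
    }

    // Map each block to itself: the key set acts as the block set, and
    // get() hands back the single stored instance so callers can share it.
    private final SortedMap<Block, Block> blocks = new TreeMap<Block, Block>();

    void addBlock(Block b)       { blocks.put(b, b); }
    void removeBlock(Block b)    { blocks.remove(b); }
    Block getBlock(long blockId) { return blocks.get(new Block(blockId, 0)); } // length ignored by compareTo

    public static void main(String[] args) {
        CanonicalSetSketch s = new CanonicalSetSketch();
        Block stored = new Block(42, 1024);
        s.addBlock(stored);
        // A lookup key with the right id but no length finds the stored instance.
        System.out.println(s.getBlock(42) == stored);   // true
    }
}

Because the ordering looks only at the block id, a throwaway key such as new Block(blockId, 0) is enough to locate the stored instance.
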
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Fri Feb 16 13:37:03 2007
@@ -49,7 +49,7 @@
     class INode {
         private String name;
         private INode parent;
-        private TreeMap children = new TreeMap();
+        private TreeMap<String, INode> children = null;
         private Block blocks[];
         private short blockReplication;
 
@@ -111,11 +111,19 @@
         }
 
         /**
-         * Get children
-         * @return TreeMap of children
+         * Get children iterator
+         * @return Iterator of children
          */
-        TreeMap getChildren() {
-            return this.children;
+        Iterator<INode> getChildIterator() {
+            return ( children != null ) ? children.values().iterator() : null;
+            // instead of null, we could return a static empty iterator.
+        }
+        
+        void addChild(String name, INode node) {
+            if ( children == null ) {
+                children = new TreeMap<String, INode>();
+            }
+            children.put(name, node);
         }
 
         /**
@@ -162,7 +170,7 @@
         }
 
         INode getChild( String name) {
-            return (INode) children.get( name );
+            return (children == null) ? null : children.get( name );
         }
 
         /**
@@ -197,7 +205,7 @@
                 return null;
             }
             // insert into the parent children list
-            parentNode.children.put(name, newNode);
+            parentNode.addChild(name, newNode);
             newNode.parent = parentNode;
             return newNode;
         }
@@ -225,9 +233,9 @@
                 }
             }
             incrDeletedFileCount();
-            for (Iterator it = children.values().iterator(); it.hasNext(); ) {
-                INode child = (INode) it.next();
-                child.collectSubtreeBlocks(v);
+            for (Iterator<INode> it = getChildIterator(); it != null &&
+                     it.hasNext(); ) {
+                it.next().collectSubtreeBlocks(v);
             }
         }
 
@@ -235,9 +243,9 @@
          */
         int numItemsInTree() {
             int total = 0;
-            for (Iterator it = children.values().iterator(); it.hasNext(); ) {
-                INode child = (INode) it.next();
-                total += child.numItemsInTree();
+            for (Iterator<INode> it = getChildIterator(); it != null &&
+                     it.hasNext(); ) {
+                total += it.next().numItemsInTree();
             }
             return total + 1;
         }
@@ -268,9 +276,9 @@
          */
         long computeContentsLength() {
             long total = computeFileLength();
-            for (Iterator it = children.values().iterator(); it.hasNext(); ) {
-                INode child = (INode) it.next();
-                total += child.computeContentsLength();
+            for (Iterator<INode> it = getChildIterator(); it != null &&
+                     it.hasNext(); ) {
+                total += it.next().computeContentsLength();
             }
             return total;
         }
@@ -294,9 +302,9 @@
                 v.add(this);
             }
 
-            for (Iterator it = children.values().iterator(); it.hasNext(); ) {
-                INode child = (INode) it.next();
-                v.add(child);
+            for (Iterator<INode> it = getChildIterator(); it != null &&
+                     it.hasNext(); ) {
+                v.add(it.next());
             }
         }
     }
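
The FSDirectory change above makes each INode's children map lazy: leaf INodes (files) never allocate a TreeMap at all, which is where much of the per-file overhead went. Below is a self-contained sketch of that pattern under the same null-tolerant iterator convention; LazyNode is a hypothetical stand-in for INode, not Hadoop code.

import java.util.Iterator;
import java.util.TreeMap;

/**
 * Sketch of the lazy-children pattern: the child map is only allocated
 * when the first child is added, so plain files pay nothing for it.
 */
public class LazyNode {

    private final String name;
    private TreeMap<String, LazyNode> children = null;  // allocated on demand

    LazyNode(String name) { this.name = name; }

    void addChild(LazyNode child) {
        if (children == null) {
            children = new TreeMap<String, LazyNode>();  // first child triggers allocation
        }
        children.put(child.name, child);
    }

    LazyNode getChild(String childName) {
        return (children == null) ? null : children.get(childName);
    }

    // Callers must tolerate null, as in the patched for-loops:
    // for (Iterator<LazyNode> it = n.childIterator(); it != null && it.hasNext(); ) ...
    Iterator<LazyNode> childIterator() {
        return (children == null) ? null : children.values().iterator();
    }

    int countNodes() {
        int total = 1;
        for (Iterator<LazyNode> it = childIterator(); it != null && it.hasNext(); ) {
            total += it.next().countNodes();
        }
        return total;
    }

    public static void main(String[] args) {
        LazyNode root = new LazyNode("/");
        root.addChild(new LazyNode("file1"));   // only the root allocates a TreeMap
        System.out.println(root.countNodes());  // 2
    }
}
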
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Fri Feb 16 13:37:03 2007
@@ -405,9 +405,9 @@
                 root.getBlocks()[i].write(out);
             }
         }
-        for(Iterator it = root.getChildren().values().iterator(); it.hasNext(); ) {
-            INode child = (INode) it.next();
-            saveImage( fullName, child, out );
+        for(Iterator<INode> it = root.getChildIterator(); it != null &&
+                it.hasNext(); ) {
+            saveImage( fullName, it.next(), out );
         }
     }
 
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Fri Feb 16 13:37:03 2007
@@ -61,8 +61,8 @@
     // to client-sent information.
     // Mapping: Block -> TreeSet
     //
-    Map<Block, SortedSet<DatanodeDescriptor>> blocksMap =
-        new HashMap<Block, SortedSet<DatanodeDescriptor>>();
+    Map<Block, List<DatanodeDescriptor>> blocksMap =
+        new HashMap<Block, List<DatanodeDescriptor>>();
 
     /**
      * Stores the datanode -> block map.
@@ -182,6 +182,8 @@
     private int maxReplicationStreams;
     // MIN_REPLICATION is how many copies we need in place or else we disallow the write
     private int minReplication;
+    // Default replication
+    private int defaultReplication;
     // heartbeatRecheckInterval is how often namenode checks for expired datanodes
     private long heartbeatRecheckInterval;
     // heartbeatExpireInterval is how long namenode waits for datanode to report
@@ -211,6 +213,7 @@
                         int port, NameNode nn, Configuration conf) throws IOException {
         fsNamesystemObject = this;
+        this.defaultReplication = conf.getInt("dfs.replication", 3);
         this.maxReplication = conf.getInt("dfs.replication.max", 512);
         this.minReplication = conf.getInt("dfs.replication.min", 1);
         if( minReplication <= 0 )
@@ -524,7 +527,7 @@
         DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
 
         for (int i = 0; i < blocks.length; i++) {
-            SortedSet containingNodes = blocksMap.get(blocks[i]);
+            Collection containingNodes = blocksMap.get(blocks[i]);
             if (containingNodes == null) {
                 machineSets[i] = new DatanodeDescriptor[0];
             } else {
@@ -889,22 +892,16 @@
         //
         // We have the pending blocks, but they won't have
         // length info in them (as they were allocated before
-        // data-write took place). So we need to add the correct
-        // length info to each
-        //
-        // REMIND - mjc - this is very inefficient! We should
-        // improve this!
+        // data-write took place). Find the block stored in
+        // node descriptor.
         //
         for (int i = 0; i < nrBlocks; i++) {
             Block b = pendingBlocks[i];
-            SortedSet containingNodes = blocksMap.get(b);
-            DatanodeDescriptor node = containingNodes.first();
-            for (Iterator it = node.getBlockIterator(); it.hasNext(); ) {
-                Block cur = it.next();
-                if (b.getBlockId() == cur.getBlockId()) {
-                    b.setNumBytes(cur.getNumBytes());
-                    break;
-                }
+            List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+            Block storedBlock = 
+                containingNodes.get(0).getBlock(b);
+            if ( storedBlock != null ) {
+                pendingBlocks[i] = storedBlock;
             }
         }
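
The FSNamesystem hunks above retype blocksMap from Block -> SortedSet to Block -> List and read dfs.replication so that, later in addStoredBlock, each location list can be an ArrayList presized to the block's replication factor. A rough, self-contained sketch of that shape follows; Block and DatanodeRef here are simplified stand-ins, not the Hadoop classes.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of the blocksMap reshaping: each block maps to a small ArrayList
 * of locations instead of a TreeSet. For the common case of ~3 replicas,
 * a presized ArrayList carries far less per-entry overhead than a
 * red-black tree of entries.
 */
public class BlockLocationsSketch {

    static class Block {
        final long id;
        Block(long id) { this.id = id; }
        public int hashCode() { return (int) (id ^ (id >>> 32)); }
        public boolean equals(Object o) { return o instanceof Block && ((Block) o).id == id; }
    }

    static class DatanodeRef {
        final String name;
        DatanodeRef(String name) { this.name = name; }
    }

    private final Map<Block, List<DatanodeRef>> blocksMap =
        new HashMap<Block, List<DatanodeRef>>();

    void addLocation(Block b, DatanodeRef node, int expectedReplication) {
        List<DatanodeRef> nodes = blocksMap.get(b);
        if (nodes == null) {
            // Presize to the expected replication so the list never has to
            // grow for a healthily replicated block.
            nodes = new ArrayList<DatanodeRef>(expectedReplication);
            blocksMap.put(b, nodes);
        }
        if (!nodes.contains(node)) {   // lists, unlike sets, need an explicit check
            nodes.add(node);
        }
    }

    public static void main(String[] args) {
        BlockLocationsSketch s = new BlockLocationsSketch();
        Block b = new Block(7);
        s.addLocation(b, new DatanodeRef("dn1"), 3);
        s.addLocation(b, new DatanodeRef("dn2"), 3);
        System.out.println(s.blocksMap.get(b).size());  // 2
    }
}
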
@@ -946,7 +943,7 @@
         // the blocks.
         int numExpectedReplicas = pendingFile.getReplication();
         for (int i = 0; i < nrBlocks; i++) {
-            SortedSet containingNodes = blocksMap.get(pendingBlocks[i]);
+            Collection containingNodes = blocksMap.get(pendingBlocks[i]);
             // filter out containingNodes that are marked for decommission.
             int numCurrentReplica = countContainingNodes(containingNodes);
@@ -986,7 +983,7 @@
 
         for (Iterator it = v.getBlocks().iterator(); it.hasNext(); ) {
             Block b = it.next();
-            SortedSet containingNodes = blocksMap.get(b);
+            Collection containingNodes = blocksMap.get(b);
             if (containingNodes == null || containingNodes.size() < this.minReplication) {
                 return false;
             }
@@ -1077,7 +1074,7 @@
 
         for (int i = 0; i < deletedBlocks.length; i++) {
             Block b = deletedBlocks[i];
-            SortedSet containingNodes = blocksMap.get(b);
+            Collection containingNodes = blocksMap.get(b);
             if (containingNodes != null) {
                 for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {
                     DatanodeDescriptor node = it.next();
@@ -1201,7 +1198,7 @@
         } else {
             String hosts[][] = new String[(endBlock - startBlock) + 1][];
             for (int i = startBlock; i <= endBlock; i++) {
-                SortedSet containingNodes = blocksMap.get(blocks[i]);
+                Collection containingNodes = blocksMap.get(blocks[i]);
                 Collection v = new ArrayList();
                 if (containingNodes != null) {
                     for (Iterator it =containingNodes.iterator(); it.hasNext();) {
@@ -1924,12 +1921,16 @@
         // between the old and new block report.
         //
         int newPos = 0;
-        boolean modified = false;
         Iterator iter = node.getBlockIterator();
         Block oldblk = iter.hasNext() ? iter.next() : null;
         Block newblk = (newReport != null && newReport.length > 0) ?
                         newReport[0] : null;
 
+        // common case is that most of the blocks from the datanode
+        // matches blocks in datanode descriptor.
+        Collection<Block> toRemove = new LinkedList<Block>();
+        Collection<Block> toAdd = new LinkedList<Block>();
+        
         while (oldblk != null || newblk != null) {
            
             int cmp = (oldblk == null) ? 1 :
@@ -1943,25 +1944,27 @@
                       ? newReport[newPos] : null;
             } else if (cmp < 0) {
                 // The old report has a block the new one does not
-                removeStoredBlock(oldblk, node);
-                modified = true;
+                toRemove.add(oldblk);
                 oldblk = iter.hasNext() ? iter.next() : null;
             } else {
                 // The new report has a block the old one does not
-                addStoredBlock(newblk, node);
-                modified = true;
+                toAdd.add(newblk);
                 newPos++;
                 newblk = (newPos < newReport.length)
                           ? newReport[newPos] : null;
             }
         }
-        //
-        // Modify node so it has the new blockreport
-        //
-        if (modified) {
-            node.updateBlocks(newReport);
+        
+        for ( Iterator<Block> i = toRemove.iterator(); i.hasNext(); ) {
+            Block b = i.next();
+            removeStoredBlock( b, node );
+            node.removeBlock( b );
+        }
+        for ( Iterator<Block> i = toAdd.iterator(); i.hasNext(); ) {
+            Block b = i.next();
+            node.addBlock( addStoredBlock(b, node) );
         }
-        
+
         //
         // We've now completely updated the node's block report profile.
         // We now go through all its blocks and find which ones are invalid,
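
The block-report hunks above stop rebuilding the descriptor's block set on every report; instead the old (sorted) set and the new (sorted) report are merged in one pass and only the differences are applied. Below is a small standalone sketch of that merge, using longs in place of Blocks; the real code compares by block id.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;

/**
 * Sketch of the block-report merge: walk two sorted sequences in lockstep
 * and collect only the additions and removals, rather than clearing and
 * re-adding every block.
 */
public class ReportDiffSketch {

    static void diff(TreeSet<Long> oldBlocks, long[] newReport,
                     List<Long> toAdd, List<Long> toRemove) {
        Iterator<Long> iter = oldBlocks.iterator();
        Long oldblk = iter.hasNext() ? iter.next() : null;
        int newPos = 0;
        Long newblk = (newReport.length > 0) ? Long.valueOf(newReport[0]) : null;

        while (oldblk != null || newblk != null) {
            int cmp = (oldblk == null) ? 1
                    : (newblk == null) ? -1
                    : oldblk.compareTo(newblk);
            if (cmp == 0) {                        // present in both: the common case, nothing to do
                oldblk = iter.hasNext() ? iter.next() : null;
                newPos++;
                newblk = (newPos < newReport.length) ? Long.valueOf(newReport[newPos]) : null;
            } else if (cmp < 0) {                  // only in the old set
                toRemove.add(oldblk);
                oldblk = iter.hasNext() ? iter.next() : null;
            } else {                               // only in the new report
                toAdd.add(newblk);
                newPos++;
                newblk = (newPos < newReport.length) ? Long.valueOf(newReport[newPos]) : null;
            }
        }
    }

    public static void main(String[] args) {
        TreeSet<Long> old = new TreeSet<Long>();
        old.add(1L); old.add(2L); old.add(5L);
        List<Long> toAdd = new ArrayList<Long>(), toRemove = new ArrayList<Long>();
        diff(old, new long[] {2L, 3L, 5L}, toAdd, toRemove);
        System.out.println("add=" + toAdd + " remove=" + toRemove);  // add=[3] remove=[1]
    }
}
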
@@ -1990,12 +1993,27 @@
     /**
      * Modify (block-->datanode) map. Remove block from set of
      * needed replications if this takes care of the problem.
+     * @return the block that is stored in blockMap.
      */
-    synchronized void addStoredBlock(Block block, DatanodeDescriptor node) {
-        SortedSet containingNodes = blocksMap.get(block);
+    synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) {
+        List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
         if (containingNodes == null) {
-            containingNodes = new TreeSet();
+            //Create an arraylist with the current replication factor
+            FSDirectory.INode inode = dir.getFileByBlock(block);
+            int replication = (inode != null) ?
+                              inode.getReplication() : defaultReplication;
+            containingNodes = new ArrayList<DatanodeDescriptor>(replication);
             blocksMap.put(block, containingNodes);
+        } else {
+            Block storedBlock =
+                containingNodes.get(0).getBlock(block);
+            // update stored block's length.
+            if ( storedBlock != null ) {
+                if ( block.getNumBytes() > 0 ) {
+                    storedBlock.setNumBytes( block.getNumBytes() );
+                }
+                block = storedBlock;
+            }
         }
         int curReplicaDelta = 0;
         if (! containingNodes.contains(node)) {
@@ -2018,7 +2036,7 @@
 
         FSDirectory.INode fileINode = dir.getFileByBlock(block);
         if( fileINode == null )  // block does not belong to any file
-            return;
+            return block;
 
         // filter out containingNodes that are marked for decommission.
         int numCurrentReplica = countContainingNodes(containingNodes);
@@ -2036,6 +2054,7 @@
             pendingReplications.remove(block);
         }
         proccessOverReplicatedBlock( block, fileReplication );
+        return block;
     }
 
     /**
@@ -2044,7 +2063,7 @@
      * mark them in the excessReplicateMap.
      */
     private void proccessOverReplicatedBlock( Block block, short replication ) {
-        SortedSet containingNodes = blocksMap.get(block);
+        Collection containingNodes = blocksMap.get(block);
         if( containingNodes == null )
             return;
         Collection nonExcess = new ArrayList();
@@ -2124,7 +2143,7 @@
     synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
                 +block.getBlockName() + " from "+node.getName() );
-        SortedSet containingNodes = blocksMap.get(block);
+        Collection containingNodes = blocksMap.get(block);
         if (containingNodes == null || ! containingNodes.contains(node)) {
             NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
                     +block.getBlockName()+" has already been removed from node "+node );
@@ -2182,14 +2201,9 @@
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
                 +block.getBlockName()+" is received from " + nodeID.getName() );
         //
-        // Modify the blocks->datanode map
+        // Modify the blocks->datanode map and node's map.
        //
-        addStoredBlock(block, node);
-
-        //
-        // Supplement node's blockreport
-        //
-        node.addBlock(block);
+        node.addBlock( addStoredBlock(block, node) );
     }
 
     /**
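
In the hunks above, addStoredBlock now returns the Block instance already held in blocksMap, and blockReceived stores that same instance in the datanode descriptor via node.addBlock( addStoredBlock(block, node) ), so one Block object is shared rather than duplicated per structure. Below is a hedged sketch of that return-the-stored-instance pattern; the class and the simplified Block are illustrative only, not the Hadoop implementation.

import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the sharing pattern: the registry returns the canonical stored
 * instance, and callers keep that reference instead of their own copy.
 */
public class SharedInstanceSketch {

    static class Block {
        final long id;
        long numBytes;
        Block(long id, long numBytes) { this.id = id; this.numBytes = numBytes; }
        public int hashCode() { return (int) (id ^ (id >>> 32)); }
        public boolean equals(Object o) { return o instanceof Block && ((Block) o).id == id; }
    }

    private final Map<Block, Block> stored = new HashMap<Block, Block>();

    /** Returns the canonical stored instance, registering the block if new. */
    Block addStoredBlock(Block reported) {
        Block canonical = stored.get(reported);
        if (canonical == null) {
            stored.put(reported, reported);
            return reported;
        }
        if (reported.numBytes > 0) {
            canonical.numBytes = reported.numBytes;  // refresh length on the shared copy
        }
        return canonical;
    }

    public static void main(String[] args) {
        SharedInstanceSketch s = new SharedInstanceSketch();
        Block first  = s.addStoredBlock(new Block(9, 0));
        Block second = s.addStoredBlock(new Block(9, 2048));  // same id, now with a length
        System.out.println(first == second);  // true: one shared instance
        System.out.println(first.numBytes);   // 2048
    }
}
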