hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r508595 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/
Date: Fri, 16 Feb 2007 21:37:04 GMT
Author: cutting
Date: Fri Feb 16 13:37:03 2007
New Revision: 508595

URL: http://svn.apache.org/viewvc?view=rev&rev=508595
Log:
HADOOP-803.  Reduce memory use by HDFS namenode, phase I.  Contributed by Raghu.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Feb 16 13:37:03 2007
@@ -56,6 +56,9 @@
 16. HADOOP-649.  Fix so that jobs with no tasks are not lost.
     (Thomas Friol via cutting)
 
+17. HADOOP-803.  Reduce memory use by HDFS namenode, phase I.
+    (Raghu Angadi via cutting)
+
 
 Branch 0.11 (unreleased)
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java Fri Feb 16 13:37:03 2007
@@ -122,17 +122,16 @@
     /////////////////////////////////////
     public int compareTo(Object o) {
         Block b = (Block) o;
-        if (getBlockId() < b.getBlockId()) {
+        if ( blkid < b.blkid ) {
             return -1;
-        } else if (getBlockId() == b.getBlockId()) {
+        } else if ( blkid == b.blkid ) {
             return 0;
         } else {
             return 1;
         }
     }
     public boolean equals(Object o) {
-        Block b = (Block) o;
-        return (this.compareTo(b) == 0);
+        return (this.compareTo(o) == 0);
     }
     
     public int hashCode() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java Fri Feb 16 13:37:03 2007
@@ -37,7 +37,7 @@
  **************************************************/
 public class DatanodeDescriptor extends DatanodeInfo {
 
-  private volatile Collection<Block> blocks = new TreeSet<Block>();
+  private volatile SortedMap<Block, Block> blocks = new TreeMap<Block, Block>();
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   protected boolean isAlive = false;
@@ -118,17 +118,12 @@
 
   /**
    */
-  void updateBlocks(Block newBlocks[]) {
-    blocks.clear();
-    for (int i = 0; i < newBlocks.length; i++) {
-      blocks.add(newBlocks[i]);
-    }
-  }
-
-  /**
-   */
   void addBlock(Block b) {
-    blocks.add(b);
+      blocks.put(b, b);
+  }
+  
+  void removeBlock(Block b) {
+      blocks.remove(b);
   }
 
   void resetBlocks() {
@@ -152,13 +147,21 @@
   }
   
   Block[] getBlocks() {
-    return (Block[]) blocks.toArray(new Block[blocks.size()]);
+    return (Block[]) blocks.keySet().toArray(new Block[blocks.size()]);
   }
 
   Iterator<Block> getBlockIterator() {
-    return blocks.iterator();
+    return blocks.keySet().iterator();
   }
-
+  
+  Block getBlock(long blockId) {
+    return blocks.get( new Block(blockId, 0) );
+  }
+  
+  Block getBlock(Block b) {
+    return blocks.get(b);
+  }
+  
   /*
    * Store block replication work.
    */

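The DatanodeDescriptor change above swaps the TreeSet<Block> for a TreeMap<Block, Block>: put(b, b) keeps the set semantics, while get() hands back the Block instance the descriptor already stores, so other namenode structures can share that instance instead of holding duplicate copies. Because Block ordering depends only on the block id, a zero-length Block works as a lookup key, which is what the new getBlock(long) accessor relies on. A minimal sketch of that map-as-set idiom, using hypothetical stand-in classes rather than the real dfs.Block and DatanodeDescriptor:

import java.util.TreeMap;

// Hedged sketch of the map-as-canonicalizing-set idiom used in DatanodeDescriptor.
// BlockStub is a hypothetical stand-in for org.apache.hadoop.dfs.Block.
class BlockStub implements Comparable<BlockStub> {
    final long blkid;   // only the id participates in ordering
    long numBytes;      // length is ignored by compareTo, as in the patched Block

    BlockStub(long blkid, long numBytes) { this.blkid = blkid; this.numBytes = numBytes; }

    public int compareTo(BlockStub b) {
        return (blkid < b.blkid) ? -1 : (blkid == b.blkid ? 0 : 1);
    }
}

class DatanodeBlocksSketch {
    // put(b, b) makes the map behave like a set whose get() returns the stored instance.
    private final TreeMap<BlockStub, BlockStub> blocks = new TreeMap<BlockStub, BlockStub>();

    void addBlock(BlockStub b)        { blocks.put(b, b); }
    void removeBlock(BlockStub b)     { blocks.remove(b); }
    BlockStub getBlock(long blockId)  { return blocks.get(new BlockStub(blockId, 0)); }

    public static void main(String[] args) {
        DatanodeBlocksSketch d = new DatanodeBlocksSketch();
        BlockStub stored = new BlockStub(42L, 64 * 1024 * 1024);
        d.addBlock(stored);
        // A zero-length key with the same id finds the stored instance,
        // so callers can reuse it rather than keep their own copy.
        System.out.println(d.getBlock(42L) == stored);   // prints: true
    }
}

In the FSNamesystem changes below, addStoredBlock calls getBlock() on the first containing node to fetch that shared instance and returns it, so each block ends up represented by one object rather than one per replica.
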
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Fri Feb 16 13:37:03 2007
@@ -49,7 +49,7 @@
     class INode {
         private String name;
         private INode parent;
-        private TreeMap children = new TreeMap();
+        private TreeMap<String, INode> children = null;
         private Block blocks[];
         private short blockReplication;
 
@@ -111,11 +111,19 @@
         }
 
         /**
-         * Get children 
-         * @return TreeMap of children
+         * Get children iterator
+         * @return Iterator of children
          */
-        TreeMap getChildren() {
-          return this.children;
+        Iterator<INode> getChildIterator() {
+          return ( children != null ) ?  children.values().iterator() : null;
+            // instead of null, we could return a static empty iterator.
+        }
+        
+        void addChild(String name, INode node) {
+          if ( children == null ) {
+            children = new TreeMap<String, INode>();
+          }
+          children.put(name, node);
         }
 
         /**
@@ -162,7 +170,7 @@
         }
         
         INode getChild( String name) {
-          return (INode) children.get( name );
+          return (children == null) ? null : children.get( name );
         }
 
         /**
@@ -197,7 +205,7 @@
             return null;
           }
           // insert into the parent children list
-          parentNode.children.put(name, newNode);
+          parentNode.addChild(name, newNode);
           newNode.parent = parentNode;
           return newNode;
         }
@@ -225,9 +233,9 @@
                 }
             }
             incrDeletedFileCount();
-            for (Iterator it = children.values().iterator(); it.hasNext(); ) {
-                INode child = (INode) it.next();
-                child.collectSubtreeBlocks(v);
+            for (Iterator<INode> it = getChildIterator(); it != null &&
+                                                          it.hasNext(); ) {
+                it.next().collectSubtreeBlocks(v);
             }
         }
 
@@ -235,9 +243,9 @@
          */
         int numItemsInTree() {
             int total = 0;
-            for (Iterator it = children.values().iterator(); it.hasNext(); ) {
-                INode child = (INode) it.next();
-                total += child.numItemsInTree();
+            for (Iterator<INode> it = getChildIterator(); it != null && 
+                                                          it.hasNext(); ) {
+                total += it.next().numItemsInTree();
             }
             return total + 1;
         }
@@ -268,9 +276,9 @@
          */
         long computeContentsLength() {
             long total = computeFileLength();
-            for (Iterator it = children.values().iterator(); it.hasNext(); ) {
-                INode child = (INode) it.next();
-                total += child.computeContentsLength();
+            for (Iterator<INode> it = getChildIterator(); it != null && 
+                                                          it.hasNext(); ) {
+                total += it.next().computeContentsLength();
             }
             return total;
         }
@@ -294,9 +302,9 @@
                 v.add(this);
             }
 
-            for (Iterator it = children.values().iterator(); it.hasNext(); ) {
-                INode child = (INode) it.next();
-                v.add(child);
+            for (Iterator<INode> it = getChildIterator(); it != null && 
+                                                          it.hasNext(); ) {
+                v.add(it.next());
             }
         }
     }

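In FSDirectory, the per-INode children map is now created lazily: a leaf INode (a plain file, the common case in the namespace) keeps children == null instead of carrying an empty TreeMap, and the traversal loops tolerate a null iterator. A small sketch of that pattern, with a simplified stand-in class rather than the real FSDirectory.INode:

import java.util.Iterator;
import java.util.TreeMap;

// Hedged sketch of lazy child-map allocation as applied to FSDirectory.INode.
class LazyNodeSketch {
    private final String name;
    // null for leaves; only allocated on the first addChild call.
    private TreeMap<String, LazyNodeSketch> children = null;

    LazyNodeSketch(String name) { this.name = name; }

    void addChild(String childName, LazyNodeSketch node) {
        if (children == null) {
            children = new TreeMap<String, LazyNodeSketch>();
        }
        children.put(childName, node);
    }

    LazyNodeSketch getChild(String childName) {
        return (children == null) ? null : children.get(childName);
    }

    // Callers must tolerate a null iterator, as in the patched traversal loops.
    Iterator<LazyNodeSketch> getChildIterator() {
        return (children != null) ? children.values().iterator() : null;
    }

    int numItemsInTree() {
        int total = 1;
        for (Iterator<LazyNodeSketch> it = getChildIterator(); it != null && it.hasNext(); ) {
            total += it.next().numItemsInTree();
        }
        return total;
    }

    public static void main(String[] args) {
        LazyNodeSketch root = new LazyNodeSketch("");
        root.addChild("user", new LazyNodeSketch("user"));
        System.out.println(root.numItemsInTree());       // prints: 2
        System.out.println(root.getChild("missing"));    // prints: null
    }
}

As the patch comment notes, a shared static empty iterator could be returned instead of null; the null convention simply keeps every leaf from paying for an extra object.
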
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Fri Feb 16 13:37:03 2007
@@ -405,9 +405,9 @@
           root.getBlocks()[i].write(out);
       }
     }
-    for(Iterator it = root.getChildren().values().iterator(); it.hasNext(); ) {
-      INode child = (INode) it.next();
-      saveImage( fullName, child, out );
+    for(Iterator<INode> it = root.getChildIterator(); it != null &&
+                                                      it.hasNext(); ) {
+      saveImage( fullName, it.next(), out );
     }
   }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Fri Feb 16 13:37:03 2007
@@ -61,8 +61,8 @@
     // to client-sent information.
     // Mapping: Block -> TreeSet<DatanodeDescriptor>
     //
-    Map<Block, SortedSet<DatanodeDescriptor>> blocksMap = 
-                              new HashMap<Block, SortedSet<DatanodeDescriptor>>();
+    Map<Block, List<DatanodeDescriptor>> blocksMap = 
+                              new HashMap<Block, List<DatanodeDescriptor>>();
 
     /**
      * Stores the datanode -> block map.  
@@ -182,6 +182,8 @@
     private int maxReplicationStreams;
     // MIN_REPLICATION is how many copies we need in place or else we disallow the write
     private int minReplication;
+    // Default replication
+    private int defaultReplication;
     // heartbeatRecheckInterval is how often namenode checks for expired datanodes
     private long heartbeatRecheckInterval;
     // heartbeatExpireInterval is how long namenode waits for datanode to report
@@ -211,6 +213,7 @@
                         int port,
                         NameNode nn, Configuration conf) throws IOException {
         fsNamesystemObject = this;
+        this.defaultReplication = conf.getInt("dfs.replication", 3);
         this.maxReplication = conf.getInt("dfs.replication.max", 512);
         this.minReplication = conf.getInt("dfs.replication.min", 1);
         if( minReplication <= 0 )
@@ -524,7 +527,7 @@
             DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
 
             for (int i = 0; i < blocks.length; i++) {
-              SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
+                Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
                 if (containingNodes == null) {
                     machineSets[i] = new DatanodeDescriptor[0];
                 } else {
@@ -889,22 +892,16 @@
         //
         // We have the pending blocks, but they won't have
         // length info in them (as they were allocated before
-        // data-write took place).  So we need to add the correct
-        // length info to each
-        //
-        // REMIND - mjc - this is very inefficient!  We should
-        // improve this!
+        // data-write took place). Find the block stored in
+        // node descriptor.
         //
         for (int i = 0; i < nrBlocks; i++) {
             Block b = pendingBlocks[i];
-            SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
-            DatanodeDescriptor node = containingNodes.first();
-            for (Iterator<Block> it = node.getBlockIterator(); it.hasNext(); ) {
-                Block cur = it.next();
-                if (b.getBlockId() == cur.getBlockId()) {
-                    b.setNumBytes(cur.getNumBytes());
-                    break;
-                }
+            List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+            Block storedBlock = 
+                containingNodes.get(0).getBlock(b);
+            if ( storedBlock != null ) {
+                pendingBlocks[i] = storedBlock;
             }
         }
         
@@ -946,7 +943,7 @@
         // the blocks.
         int numExpectedReplicas = pendingFile.getReplication();
         for (int i = 0; i < nrBlocks; i++) {
-          SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
+          Collection<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
           // filter out containingNodes that are marked for decommission.
           int numCurrentReplica = countContainingNodes(containingNodes);
 
@@ -986,7 +983,7 @@
 
         for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
             Block b = it.next();
-            SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+            Collection<DatanodeDescriptor> containingNodes = blocksMap.get(b);
             if (containingNodes == null || containingNodes.size() < this.minReplication) {
                 return false;
             }
@@ -1077,7 +1074,7 @@
             for (int i = 0; i < deletedBlocks.length; i++) {
                 Block b = deletedBlocks[i];
 
-                SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+                Collection<DatanodeDescriptor> containingNodes = blocksMap.get(b);
                 if (containingNodes != null) {
                    for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
                         DatanodeDescriptor node = it.next();
@@ -1201,7 +1198,7 @@
         } else {
           String hosts[][] = new String[(endBlock - startBlock) + 1][];
             for (int i = startBlock; i <= endBlock; i++) {
-              SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
+                Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
                 Collection<String> v = new ArrayList<String>();
                 if (containingNodes != null) {
                  for (Iterator<DatanodeDescriptor> it =containingNodes.iterator(); it.hasNext();) {
@@ -1924,12 +1921,16 @@
         // between the old and new block report.
         //
         int newPos = 0;
-        boolean modified = false;
         Iterator<Block> iter = node.getBlockIterator();
         Block oldblk = iter.hasNext() ? iter.next() : null;
         Block newblk = (newReport != null && newReport.length > 0) ? 
                         newReport[0]	: null;
 
+        // common case is that most of the blocks from the datanode
+        // matches blocks in datanode descriptor.                
+        Collection<Block> toRemove = new LinkedList<Block>();
+        Collection<Block> toAdd = new LinkedList<Block>();
+        
         while (oldblk != null || newblk != null) {
            
             int cmp = (oldblk == null) ? 1 : 
@@ -1943,25 +1944,27 @@
                          ? newReport[newPos] : null;
             } else if (cmp < 0) {
                 // The old report has a block the new one does not
-                removeStoredBlock(oldblk, node);
-                modified = true;
+                toRemove.add(oldblk);
                 oldblk = iter.hasNext() ? iter.next() : null;
             } else {
                 // The new report has a block the old one does not
-                addStoredBlock(newblk, node);
-                modified = true;
+                toAdd.add(newblk);
                 newPos++;
                 newblk = (newPos < newReport.length)
                          ? newReport[newPos] : null;
             }
         }
-        //
-        // Modify node so it has the new blockreport
-        //
-        if (modified) {
-            node.updateBlocks(newReport);
+        
+        for ( Iterator<Block> i = toRemove.iterator(); i.hasNext(); ) {
+            Block b = i.next();
+            removeStoredBlock( b, node );
+            node.removeBlock( b );
+        }
+        for ( Iterator<Block> i = toAdd.iterator(); i.hasNext(); ) {
+            Block b = i.next();
+            node.addBlock( addStoredBlock(b, node) );
         }
-
+        
         //
         // We've now completely updated the node's block report profile.
         // We now go through all its blocks and find which ones are invalid,
@@ -1990,12 +1993,27 @@
     /**
      * Modify (block-->datanode) map.  Remove block from set of 
      * needed replications if this takes care of the problem.
+     * @return the block that is stored in blockMap.
      */
-    synchronized void addStoredBlock(Block block, DatanodeDescriptor node) {
-      SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+    synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) {
+        List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
         if (containingNodes == null) {
-            containingNodes = new TreeSet<DatanodeDescriptor>();
+            //Create an arraylist with the current replication factor
+            FSDirectory.INode inode = dir.getFileByBlock(block);
+            int replication = (inode != null) ? 
+                              inode.getReplication() : defaultReplication;
+            containingNodes = new ArrayList<DatanodeDescriptor>(replication);
             blocksMap.put(block, containingNodes);
+        } else {
+            Block storedBlock = 
+                containingNodes.get(0).getBlock(block);
+            // update stored block's length.
+            if ( storedBlock != null ) {
+                if ( block.getNumBytes() > 0 ) {
+                    storedBlock.setNumBytes( block.getNumBytes() );
+                }
+                block = storedBlock;
+            }
         }
         int curReplicaDelta = 0;
         if (! containingNodes.contains(node)) {
@@ -2018,7 +2036,7 @@
 
         FSDirectory.INode fileINode = dir.getFileByBlock(block);
         if( fileINode == null )  // block does not belong to any file
-            return;
+            return block;
         
         // filter out containingNodes that are marked for decommission.
         int numCurrentReplica = countContainingNodes(containingNodes);
@@ -2036,6 +2054,7 @@
             pendingReplications.remove(block);
         }        
         proccessOverReplicatedBlock( block, fileReplication );
+        return block;
     }
     
     /**
@@ -2044,7 +2063,7 @@
      * mark them in the excessReplicateMap.
      */
     private void proccessOverReplicatedBlock( Block block, short replication ) {
-      SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+      Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
       if( containingNodes == null )
         return;
       Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
@@ -2124,7 +2143,7 @@
     synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
                 +block.getBlockName() + " from "+node.getName() );
-        SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+        Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
         if (containingNodes == null || ! containingNodes.contains(node)) {
           NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
             +block.getBlockName()+" has already been removed from node "+node );
@@ -2182,14 +2201,9 @@
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
                 +block.getBlockName()+" is received from " + nodeID.getName() );
         //
-        // Modify the blocks->datanode map
+        // Modify the blocks->datanode map and node's map.
         // 
-        addStoredBlock(block, node);
-
-        //
-        // Supplement node's blockreport
-        //
-        node.addBlock(block);
+        node.addBlock( addStoredBlock(block, node) );
     }
 
     /**

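The processReport rewrite above replaces the clear-and-rebuild in the old updateBlocks call with a single merge pass over the two id-sorted block lists, recording only the differences in toRemove/toAdd. In the common case where the report matches the descriptor, the descriptor keeps its existing (shared) Block objects and nothing is allocated beyond two empty lists. A hedged sketch of that merge-diff, over plain block ids rather than Block objects:

import java.util.ArrayList;
import java.util.List;

// Hedged sketch of the block-report reconciliation in processReport:
// a single merge pass over two id-sorted sequences, collecting only the deltas.
class ReportDiffSketch {
    static void diff(long[] oldBlocks, long[] newBlocks, List<Long> toRemove, List<Long> toAdd) {
        int oldPos = 0, newPos = 0;
        while (oldPos < oldBlocks.length || newPos < newBlocks.length) {
            if (newPos == newBlocks.length ||
                (oldPos < oldBlocks.length && oldBlocks[oldPos] < newBlocks[newPos])) {
                toRemove.add(oldBlocks[oldPos++]);        // only in the old report
            } else if (oldPos == oldBlocks.length || oldBlocks[oldPos] > newBlocks[newPos]) {
                toAdd.add(newBlocks[newPos++]);           // only in the new report
            } else {
                oldPos++; newPos++;                       // present in both: nothing to do
            }
        }
    }

    public static void main(String[] args) {
        List<Long> toRemove = new ArrayList<Long>();
        List<Long> toAdd = new ArrayList<Long>();
        diff(new long[] {1, 2, 5}, new long[] {2, 3, 5}, toRemove, toAdd);
        System.out.println("remove=" + toRemove + " add=" + toAdd);
        // prints: remove=[1] add=[3]
    }
}

Deferring the actual removeStoredBlock/addStoredBlock calls until after the merge also lets the add path store the canonical block returned by addStoredBlock, as the loop at the end of that hunk shows.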


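One more memory-oriented detail from the FSNamesystem diff: the blocksMap values change from SortedSet<DatanodeDescriptor> to a List, and new entries are ArrayLists presized to the block's replication factor, falling back to the dfs.replication default. A rough, hypothetical sketch of that sizing choice; the names here are stand-ins, not the real FSNamesystem API:

import java.util.ArrayList;
import java.util.List;

// Hedged sketch of the blocksMap value change: replica holders kept in an
// ArrayList presized to the replication factor instead of a TreeSet.
class ReplicaListSketch {
    static List<String> newReplicaList(int fileReplication, int defaultReplication) {
        // Use the file's replication if known, otherwise dfs.replication,
        // mirroring the inode lookup in the patched addStoredBlock.
        int expected = (fileReplication > 0) ? fileReplication : defaultReplication;
        return new ArrayList<String>(expected);
    }

    public static void main(String[] args) {
        List<String> containingNodes = newReplicaList(0, 3);
        containingNodes.add("datanode-a:50010");
        // Membership checks are a linear scan, which stays cheap because a
        // block rarely has more than a handful of replicas.
        System.out.println(containingNodes.contains("datanode-a:50010"));  // prints: true
    }
}

An ArrayList holding three references is far smaller than a TreeSet holding the same three nodes, so for a cluster with millions of blocks the per-block saving adds up.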