hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r497541 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/Block.java src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java src/java/org/apache/hadoop/dfs/FSNamesystem.java
Date Thu, 18 Jan 2007 19:19:20 GMT
Author: cutting
Date: Thu Jan 18 11:19:19 2007
New Revision: 497541

URL: http://svn.apache.org/viewvc?view=rev&rev=497541
Log:
HADOOP-898.  Revert HADOOP-803, since it was causing problems (svn merge -r 496845:496844,
plus one dependent change).  Contributed by Nigel.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=497541&r1=497540&r2=497541
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jan 18 11:19:19 2007
@@ -17,9 +17,7 @@
  4. HADOOP-757.  Fix "Bad File Descriptor" exception in HDFS client
     when an output file is closed twice.  (Raghu Angadi via cutting)
 
- 5. HADOOP-803.  Reduce memory footprint of HDFS namenode by replacing
-    the TreeSet of block locations with an ArrayList.
-    (Raghu Angadi via cutting)
+ 5. [ intentionally blank ]
 
  6. HADOOP-890.  Replace dashes in metric names with underscores,
     for better compatibility with some monitoring systems.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java?view=diff&rev=497541&r1=497540&r2=497541
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java Thu Jan 18 11:19:19 2007
@@ -121,11 +121,18 @@
     // Comparable
     /////////////////////////////////////
     public int compareTo(Object o) {
-        long diff = getBlockId() - ((Block)o).getBlockId();
-        return ( diff < 0 ) ? -1 : ( ( diff > 0 ) ? 1 : 0 );
+        Block b = (Block) o;
+        if (getBlockId() < b.getBlockId()) {
+            return -1;
+        } else if (getBlockId() == b.getBlockId()) {
+            return 0;
+        } else {
+            return 1;
+        }
     }
     public boolean equals(Object o) {
-        return (this.compareTo(o) == 0);
+        Block b = (Block) o;
+        return (this.compareTo(b) == 0);
     }
     
     public int hashCode() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?view=diff&rev=497541&r1=497540&r2=497541
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java Thu Jan 18
11:19:19 2007
@@ -34,7 +34,7 @@
  **************************************************/
 class DatanodeDescriptor extends DatanodeInfo {
 
-  private volatile SortedMap<Block, Block> blocks = new TreeMap<Block, Block>();
+  private volatile Collection<Block> blocks = new TreeSet<Block>();
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   protected boolean isAlive = false;
@@ -60,12 +60,17 @@
 
   /**
    */
-  void addBlock(Block b) {
-      blocks.put(b, b);
+  void updateBlocks(Block newBlocks[]) {
+    blocks.clear();
+    for (int i = 0; i < newBlocks.length; i++) {
+      blocks.add(newBlocks[i]);
+    }
   }
-  
-  void removeBlock(Block b) {
-      blocks.remove(b);
+
+  /**
+   */
+  void addBlock(Block b) {
+    blocks.add(b);
   }
 
   void resetBlocks() {
@@ -89,14 +94,10 @@
   }
   
   Block[] getBlocks() {
-    return blocks.keySet().toArray(new Block[blocks.size()]);
+    return (Block[]) blocks.toArray(new Block[blocks.size()]);
   }
 
   Iterator<Block> getBlockIterator() {
-    return blocks.keySet().iterator();
-  }
-  
-  Block getBlock(long blockId) {
-      return blocks.get( new Block(blockId, 0) );
+    return blocks.iterator();
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=497541&r1=497540&r2=497541
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Jan 18 11:19:19
2007
@@ -59,8 +59,8 @@
     // to client-sent information.
     // Mapping: Block -> TreeSet<DatanodeDescriptor>
     //
-    Map<Block, List<DatanodeDescriptor>> blocksMap = 
-                              new HashMap<Block, List<DatanodeDescriptor>>();
+    Map<Block, SortedSet<DatanodeDescriptor>> blocksMap = 
+                              new HashMap<Block, SortedSet<DatanodeDescriptor>>();
 
     /**
      * Stores the datanode -> block map.  
@@ -179,8 +179,6 @@
     private int maxReplicationStreams;
     // MIN_REPLICATION is how many copies we need in place or else we disallow the write
     private int minReplication;
-    // Default replication
-    private int defaultReplication;
     // heartbeatRecheckInterval is how often namenode checks for expired datanodes
     private long heartbeatRecheckInterval;
     // heartbeatExpireInterval is how long namenode waits for datanode to report
@@ -201,7 +199,6 @@
                         int port,
                         NameNode nn, Configuration conf) throws IOException {
         fsNamesystemObject = this;
-        this.defaultReplication = conf.getInt("dfs.replication", 3);
         this.maxReplication = conf.getInt("dfs.replication.max", 512);
         this.minReplication = conf.getInt("dfs.replication.min", 1);
         if( minReplication <= 0 )
@@ -302,7 +299,7 @@
             DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
 
             for (int i = 0; i < blocks.length; i++) {
-                List<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
+              SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
                 if (containingNodes == null) {
                     machineSets[i] = new DatanodeDescriptor[0];
                 } else {
@@ -663,16 +660,22 @@
         //
         // We have the pending blocks, but they won't have
         // length info in them (as they were allocated before
-        // data-write took place). Find the block stored in
-        // node descriptor.
+        // data-write took place).  So we need to add the correct
+        // length info to each
+        //
+        // REMIND - mjc - this is very inefficient!  We should
+        // improve this!
         //
         for (int i = 0; i < nrBlocks; i++) {
             Block b = pendingBlocks[i];
-            List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
-            Block storedBlock = 
-                containingNodes.get(0).getBlock(b.getBlockId());
-            if ( storedBlock != null ) {
-                pendingBlocks[i] = storedBlock;
+            SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+            DatanodeDescriptor node = containingNodes.first();
+            for (Iterator<Block> it = node.getBlockIterator(); it.hasNext(); ) {
+                Block cur = it.next();
+                if (b.getBlockId() == cur.getBlockId()) {
+                    b.setNumBytes(cur.getNumBytes());
+                    break;
+                }
             }
         }
         
@@ -713,7 +716,7 @@
         // Now that the file is real, we need to be sure to replicate
         // the blocks.
         for (int i = 0; i < nrBlocks; i++) {
-          List<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
+          SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
           // filter out containingNodes that are marked for decommission.
           int numCurrentReplica = countContainingNodes(containingNodes);
 
@@ -758,7 +761,7 @@
 
         for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
             Block b = it.next();
-            List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+            SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
             if (containingNodes == null || containingNodes.size() < this.minReplication)
{
                 return false;
             }
@@ -791,7 +794,7 @@
         throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
       }
 
-      List<DatanodeDescriptor> containingNodes = blocksMap.get(blk);
+      Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blk);
 
       // Check how many copies we have of the block.  If we have at least one
       // copy on a live node, then we can delete it. 
@@ -849,7 +852,7 @@
             for (int i = 0; i < deletedBlocks.length; i++) {
                 Block b = deletedBlocks[i];
 
-                List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+                SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
                 if (containingNodes != null) {
                     for (Iterator<DatanodeDescriptor> it = containingNodes.iterator();
it.hasNext(); ) {
                         DatanodeDescriptor node = it.next();
@@ -973,7 +976,7 @@
         } else {
           String hosts[][] = new String[(endBlock - startBlock) + 1][];
             for (int i = startBlock; i <= endBlock; i++) {
-                List<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
+              SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
                 Collection<String> v = new ArrayList<String>();
                 if (containingNodes != null) {
                   for (Iterator<DatanodeDescriptor> it =containingNodes.iterator();
it.hasNext();) {
@@ -1532,16 +1535,12 @@
         // between the old and new block report.
         //
         int newPos = 0;
+        boolean modified = false;
         Iterator<Block> iter = node.getBlockIterator();
         Block oldblk = iter.hasNext() ? iter.next() : null;
         Block newblk = (newReport != null && newReport.length > 0) ? 
                         newReport[0]	: null;
 
-        // common case is that most of the blocks from the datanode
-        // matches blocks in datanode descriptor.                
-        Collection<Block> toRemove = new LinkedList<Block>();
-        Collection<Block> toAdd = new LinkedList<Block>();
-        
         while (oldblk != null || newblk != null) {
            
             int cmp = (oldblk == null) ? 1 : 
@@ -1555,25 +1554,25 @@
                          ? newReport[newPos] : null;
             } else if (cmp < 0) {
                 // The old report has a block the new one does not
-                toRemove.add(oldblk);
                 removeStoredBlock(oldblk, node);
+                modified = true;
                 oldblk = iter.hasNext() ? iter.next() : null;
             } else {
                 // The new report has a block the old one does not
-                toAdd.add(addStoredBlock(newblk, node));
+                addStoredBlock(newblk, node);
+                modified = true;
                 newPos++;
                 newblk = (newPos < newReport.length)
                          ? newReport[newPos] : null;
             }
         }
-        
-        for ( Iterator<Block> i = toRemove.iterator(); i.hasNext(); ) {
-            node.removeBlock( i.next() );
-        }
-        for ( Iterator<Block> i = toAdd.iterator(); i.hasNext(); ) {
-            node.addBlock( i.next() );
+        //
+        // Modify node so it has the new blockreport
+        //
+        if (modified) {
+            node.updateBlocks(newReport);
         }
-        
+
         //
         // We've now completely updated the node's block report profile.
         // We now go through all its blocks and find which ones are invalid,
@@ -1602,25 +1601,12 @@
     /**
      * Modify (block-->datanode) map.  Remove block from set of 
      * needed replications if this takes care of the problem.
-     * @return the block that is stored in blockMap.
      */
-    synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) {
-        List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+    synchronized void addStoredBlock(Block block, DatanodeDescriptor node) {
+      SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
         if (containingNodes == null) {
-            //Create an arraylist with the current replication factor
-            FSDirectory.INode inode = dir.getFileByBlock(block);
-            int replication = (inode != null) ? 
-                              inode.getReplication() : defaultReplication;
-            containingNodes = new ArrayList<DatanodeDescriptor>(replication);
+            containingNodes = new TreeSet<DatanodeDescriptor>();
             blocksMap.put(block, containingNodes);
-        } else {
-            Block storedBlock = 
-                containingNodes.get(0).getBlock(block.getBlockId());
-            // update stored block's length.
-            if ( block.getNumBytes() > 0 ) {
-                storedBlock.setNumBytes( block.getNumBytes() );
-            }
-            block = storedBlock;
         }
         if (! containingNodes.contains(node)) {
             containingNodes.add(node);
@@ -1642,7 +1628,7 @@
         synchronized (neededReplications) {
             FSDirectory.INode fileINode = dir.getFileByBlock(block);
             if( fileINode == null )  // block does not belong to any file
-                return block;
+                return;
 
             // filter out containingNodes that are marked for decommission.
             int numCurrentReplica = countContainingNodes(containingNodes);
@@ -1667,7 +1653,6 @@
 
             proccessOverReplicatedBlock( block, fileReplication );
         }
-        return block;
     }
     
     /**
@@ -1676,7 +1661,7 @@
      * mark them in the excessReplicateMap.
      */
     private void proccessOverReplicatedBlock( Block block, short replication ) {
-      List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+      SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
       if( containingNodes == null )
         return;
       Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
@@ -1756,7 +1741,7 @@
     synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
                 +block.getBlockName() + " from "+node.getName() );
-        List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+        SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
         if (containingNodes == null || ! containingNodes.contains(node)) {
           NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
             +block.getBlockName()+" has already been removed from node "+node );
@@ -1815,9 +1800,14 @@
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
                 +block.getBlockName()+" is received from " + nodeID.getName() );
         //
-        // Modify the blocks->datanode map and node's map.
+        // Modify the blocks->datanode map
         // 
-        node.addBlock( addStoredBlock(block, node) );
+        addStoredBlock(block, node);
+
+        //
+        // Supplement node's blockreport
+        //
+        node.addBlock(block);
     }
 
     /**



Mime
View raw message