hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r547419 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/net/
Date Thu, 14 Jun 2007 21:53:07 GMT
Author: cutting
Date: Thu Jun 14 14:53:06 2007
New Revision: 547419

URL: http://svn.apache.org/viewvc?view=rev&rev=547419
Log:
HADOOP-1269.  Finer grained locking in HDFS namenode.  Contributed by Dhruba.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jun 14 14:53:06 2007
@@ -120,6 +120,9 @@
  38. HADOOP-1139.  Log HDFS block transitions at INFO level, to better
      enable diagnosis of problems.  (Dhruba Borthakur via cutting)
 
+ 39. HADOOP-1269.  Finer grained locking in HDFS namenode.
+     (Dhruba Borthakur via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu Jun 14 14:53:06 2007
@@ -1273,9 +1273,10 @@
                          " seconds");
               }
               try {
-                LOG.debug("NotReplicatedYetException sleeping " + src +
+                LOG.warn("NotReplicatedYetException sleeping " + src +
                           " retries left " + retries);
                 Thread.sleep(sleeptime);
+                sleeptime *= 2;
               } catch (InterruptedException ie) {
               }
             }                

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Thu Jun 14 14:53:06 2007
@@ -289,22 +289,6 @@
       }
     }
     String errorMsg = null;
-    // verify build version
-    if (!nsInfo.getBuildVersion().equals(Storage.getBuildVersion())) {
-      errorMsg = "Incompatible build versions: namenode BV = " 
-        + nsInfo.getBuildVersion() + "; datanode BV = "
-        + Storage.getBuildVersion();
-      LOG.fatal(errorMsg);
-      try {
-        namenode.errorReport(dnRegistration,
-                             DatanodeProtocol.NOTIFY, errorMsg);
-      } catch(SocketTimeoutException e) {  // namenode is busy
-        LOG.info("Problem connecting to server: " + getNameNodeAddr());
-      }
-      throw new IOException(errorMsg);
-    }
-    assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
-      "Data-node and name-node layout versions must be the same.";
     return nsInfo;
   }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Jun 14 14:53:06 2007
@@ -429,28 +429,39 @@
    * @see ClientProtocol#open(String, long, long)
    * @see ClientProtocol#getBlockLocations(String, long, long)
    */
-  synchronized LocatedBlocks  getBlockLocations(String clientMachine,
-                                                String src, 
-                                                long offset, 
-                                                long length
-                                                ) throws IOException {
+  LocatedBlocks getBlockLocations(String clientMachine,
+                                  String src, 
+                                  long offset, 
+                                  long length
+                                  ) throws IOException {
     if (offset < 0) {
       throw new IOException("Negative offset is not supported. File: " + src );
     }
     if (length < 0) {
       throw new IOException("Negative length is not supported. File: " + src );
     }
-    return  getBlockLocations(clientMachine, 
-                              dir.getFileINode(src), 
-                              offset, length, Integer.MAX_VALUE);
+
+    DatanodeDescriptor client = null;
+    LocatedBlocks blocks =  getBlockLocations(dir.getFileINode(src), 
+                                              offset, length, 
+                                              Integer.MAX_VALUE);
+    if (blocks == null) {
+      return null;
+    }
+    client = host2DataNodeMap.getDatanodeByHost(clientMachine);
+    for (Iterator<LocatedBlock> it = blocks.getLocatedBlocks().iterator();
+         it.hasNext();) {
+      LocatedBlock block = (LocatedBlock) it.next();
+      clusterMap.sortByDistance(client, 
+                                (DatanodeDescriptor[])(block.getLocations()));
+    }
+    return blocks;
   }
   
-  private LocatedBlocks getBlockLocations(String clientMachine,
-                                          FSDirectory.INode inode, 
-                                          long offset, 
-                                          long length,
-                                          int nrBlocksToReturn
-                                          ) throws IOException {
+  private synchronized LocatedBlocks getBlockLocations(FSDirectory.INode inode, 
+                                                       long offset, 
+                                                       long length,
+                                                       int nrBlocksToReturn) {
     if(inode == null || inode.isDir()) {
       return null;
     }
@@ -479,8 +490,6 @@
     
     long endOff = offset + length;
     
-    DatanodeDescriptor client;
-    client = host2DataNodeMap.getDatanodeByHost(clientMachine);
     do {
       // get block locations
       int numNodes = blocksMap.numNodes(blocks[curBlk]);
@@ -491,7 +500,6 @@
             blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
           machineSet[numNodes++] = it.next();
         }
-        clusterMap.sortByDistance(client, machineSet);
       }
       results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos));
       curPos += blocks[curBlk].getNumBytes();
@@ -585,7 +593,54 @@
    * @throws IOException if the filename is invalid
    *         {@link FSDirectory#isValidToCreate(UTF8)}.
    */
-  public synchronized LocatedBlock startFile(UTF8 src, 
+  public LocatedBlock startFile(UTF8 src, 
+                                UTF8 holder, 
+                                UTF8 clientMachine, 
+                                boolean overwrite,
+                                short replication,
+                                long blockSize
+                                ) throws IOException {
+
+    //
+    // Create file into pendingCreates and get the first blockId
+    //
+    Block newBlock = startFileInternal(src, holder, clientMachine,
+                                       overwrite, replication,
+                                       blockSize);
+
+    //
+    // Get the array of replication targets
+    //
+    try {
+      DatanodeDescriptor clientNode = 
+        host2DataNodeMap.getDatanodeByHost(clientMachine.toString());
+      DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
+                                                             clientNode, null, blockSize);
+      if (targets.length < this.minReplication) {
+        if (clusterMap.getNumOfLeaves() == 0) {
+          throw new IOException("Failed to create file " + src
+                                + " on client " + clientMachine
+                                + " because this cluster has no datanodes.");
+        }
+        throw new IOException("Failed to create file " + src
+                              + " on client " + clientMachine
+                              + " because there were not enough datanodes available. "
+                              + "Found " + targets.length
+                              + " datanodes but MIN_REPLICATION for the cluster is "
+                              + "configured to be "
+                              + this.minReplication
+                              + ".");
+      }
+      return new LocatedBlock(newBlock, targets, 0L);
+
+    } catch (IOException ie) {
+      NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
+                                   + ie.getMessage());
+      throw ie;
+    }
+  }
+
+  public synchronized Block startFileInternal(UTF8 src, 
                                              UTF8 holder, 
                                              UTF8 clientMachine, 
                                              boolean overwrite,
@@ -666,26 +721,8 @@
         }
       }
 
-      // Get the array of replication targets
       DatanodeDescriptor clientNode = 
         host2DataNodeMap.getDatanodeByHost(clientMachine.toString());
-      DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
-                                                             clientNode, null, blockSize);
-      if (targets.length < this.minReplication) {
-        if (clusterMap.getNumOfLeaves() == 0) {
-          throw new IOException("Failed to create file "+src
-                                + " on client " + clientMachine
-                                + " because this cluster has no datanodes.");
-        }
-        throw new IOException("Failed to create file "+src
-                              + " on client " + clientMachine
-                              + " because there were not enough datanodes available. "
-                              + "Found " + targets.length
-                              + " datanodes but MIN_REPLICATION for the cluster is "
-                              + "configured to be "
-                              + this.minReplication
-                              + ".");
-      }
 
       // Reserve space for this pending file
       pendingCreates.put(src, 
@@ -709,9 +746,9 @@
         }
         lease.startedCreate(src);
       }
-
+      
       // Create first block
-      return new LocatedBlock(allocateBlock(src), targets, 0L);
+      return allocateBlock(src);
     } catch (IOException ie) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
                                    +ie.getMessage());
@@ -730,38 +767,52 @@
    * are replicated.  Will return an empty 2-elt array if we want the
    * client to "try again later".
    */
-  public synchronized LocatedBlock getAdditionalBlock(UTF8 src, 
-                                                      UTF8 clientName
-                                                      ) throws IOException {
+  public LocatedBlock getAdditionalBlock(UTF8 src, 
+                                         UTF8 clientName
+                                         ) throws IOException {
+    long fileLength, blockSize;
+    int replication;
+    DatanodeDescriptor clientNode = null;
+    Block newBlock = null;
+
     NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file "
                                   +src+" for "+clientName);
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot add block to " + src, safeMode);
-    FileUnderConstruction pendingFile = pendingCreates.get(src);
-    // make sure that we still have the lease on this file
-    if (pendingFile == null) {
-      throw new LeaseExpiredException("No lease on " + src);
-    }
-    if (!pendingFile.getClientName().equals(clientName)) {
-      throw new LeaseExpiredException("Lease mismatch on " + src + 
-                                      " owned by " + pendingFile.getClientName() + 
-                                      " and appended by " + clientName);
-    }
 
-    //
-    // If we fail this, bad things happen!
-    //
-    if (!checkFileProgress(pendingFile, false)) {
-      throw new NotReplicatedYetException("Not replicated yet:" + src);
+    synchronized (this) {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot add block to " + src, safeMode);
+      }
+
+      //
+      // make sure that we still have the lease on this file
+      //
+      FileUnderConstruction pendingFile = pendingCreates.get(src);
+      if (pendingFile == null) {
+        throw new LeaseExpiredException("No lease on " + src);
+      }
+      if (!pendingFile.getClientName().equals(clientName)) {
+        throw new LeaseExpiredException("Lease mismatch on " + src + 
+                                        " owned by " + pendingFile.getClientName() + 
+                                        " and appended by " + clientName);
+      }
+
+      //
+      // If we fail this, bad things happen!
+      //
+      if (!checkFileProgress(pendingFile, false)) {
+        throw new NotReplicatedYetException("Not replicated yet:" + src);
+      }
+      fileLength = pendingFile.computeFileLength();
+      blockSize = pendingFile.getBlockSize();
+      clientNode = pendingFile.getClientNode();
+      replication = (int)pendingFile.getReplication();
+      newBlock = allocateBlock(src);
     }
 
-    // Get the array of replication targets
-    DatanodeDescriptor clientNode = pendingFile.getClientNode();
-    DatanodeDescriptor targets[] = replicator.chooseTarget(
-                                                           (int)(pendingFile.getReplication()),
+    DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
                                                            clientNode,
                                                            null,
-                                                           pendingFile.getBlockSize());
+                                                           blockSize);
     if (targets.length < this.minReplication) {
       throw new IOException("File " + src + " could only be replicated to " +
                             targets.length + " nodes, instead of " +
@@ -769,9 +820,7 @@
     }
         
     // Create next block
-    return new LocatedBlock(allocateBlock(src), 
-                            targets, 
-                            pendingFile.computeFileLength());
+    return new LocatedBlock(newBlock, targets, fileLength);
   }
 
   /**
@@ -930,7 +979,7 @@
   /**
    * Allocate a block at the given pending filename
    */
-  synchronized Block allocateBlock(UTF8 src) {
+  private Block allocateBlock(UTF8 src) {
     Block b = null;
     do {
       b = new Block(FSNamesystem.randBlockId.nextLong(), 0);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java Thu Jun 14 14:53:06 2007
@@ -18,26 +18,34 @@
 package org.apache.hadoop.dfs;
 
 import java.util.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 class Host2NodesMap {
   private HashMap<String, DatanodeDescriptor[]> map
     = new HashMap<String, DatanodeDescriptor[]>();
   private Random r = new Random();
+  private ReadWriteLock hostmapLock = new ReentrantReadWriteLock();
                       
   /** Check if node is already in the map. */
-  synchronized boolean contains(DatanodeDescriptor node) {
+  boolean contains(DatanodeDescriptor node) {
     if (node==null) {
       return false;
     }
       
     String host = node.getHost();
-    DatanodeDescriptor[] nodes = map.get(host);
-    if (nodes != null) {
-      for(DatanodeDescriptor containedNode:nodes) {
-        if (node==containedNode) {
-          return true;
+    hostmapLock.readLock().lock();
+    try {
+      DatanodeDescriptor[] nodes = map.get(host);
+      if (nodes != null) {
+        for(DatanodeDescriptor containedNode:nodes) {
+          if (node==containedNode) {
+            return true;
+          }
         }
       }
+    } finally {
+      hostmapLock.readLock().unlock();
     }
     return false;
   }
@@ -45,85 +53,101 @@
   /** add node to the map 
    * return true if the node is added; false otherwise.
    */
-  synchronized boolean add(DatanodeDescriptor node) {
-    if (node==null || contains(node)) {
-      return false;
-    }
+  boolean add(DatanodeDescriptor node) {
+    hostmapLock.writeLock().lock();
+    try {
+      if (node==null || contains(node)) {
+        return false;
+      }
       
-    String host = node.getHost();
-    DatanodeDescriptor[] nodes = map.get(host);
-    DatanodeDescriptor[] newNodes;
-    if (nodes==null) {
-      newNodes = new DatanodeDescriptor[1];
-      newNodes[0]=node;
-    } else { // rare case: more than one datanode on the host
-      newNodes = new DatanodeDescriptor[nodes.length+1];
-      System.arraycopy(nodes, 0, newNodes, 0, nodes.length);
-      newNodes[nodes.length] = node;
+      String host = node.getHost();
+      DatanodeDescriptor[] nodes = map.get(host);
+      DatanodeDescriptor[] newNodes;
+      if (nodes==null) {
+        newNodes = new DatanodeDescriptor[1];
+        newNodes[0]=node;
+      } else { // rare case: more than one datanode on the host
+        newNodes = new DatanodeDescriptor[nodes.length+1];
+        System.arraycopy(nodes, 0, newNodes, 0, nodes.length);
+        newNodes[nodes.length] = node;
+      }
+      map.put(host, newNodes);
+      return true;
+    } finally {
+      hostmapLock.writeLock().unlock();
     }
-    map.put(host, newNodes);
-    return true;
   }
     
   /** remove node from the map 
    * return true if the node is removed; false otherwise.
    */
-  synchronized boolean remove(DatanodeDescriptor node) {
+  boolean remove(DatanodeDescriptor node) {
     if (node==null) {
       return false;
     }
       
     String host = node.getHost();
-    DatanodeDescriptor[] nodes = map.get(host);
-    if (nodes==null) {
-      return false;
-    }
-    if (nodes.length==1) {
-      if (nodes[0]==node) {
-        map.remove(host);
-        return true;
-      } else {
+    hostmapLock.writeLock().lock();
+    try {
+
+      DatanodeDescriptor[] nodes = map.get(host);
+      if (nodes==null) {
         return false;
       }
-    }
-    //rare case
-    int i=0;
-    for(; i<nodes.length; i++) {
-      if (nodes[i]==node) {
-        break;
+      if (nodes.length==1) {
+        if (nodes[0]==node) {
+          map.remove(host);
+          return true;
+        } else {
+          return false;
+        }
       }
-    }
-    if (i==nodes.length) {
-      return false;
-    } else {
-      DatanodeDescriptor[] newNodes;
-      newNodes = new DatanodeDescriptor[nodes.length-1];
-      System.arraycopy(nodes, 0, newNodes, 0, i);
-      System.arraycopy(nodes, i+1, newNodes, i, nodes.length-i-1);
-      map.put(host, newNodes);
-      return true;
+      //rare case
+      int i=0;
+      for(; i<nodes.length; i++) {
+        if (nodes[i]==node) {
+          break;
+        }
+      }
+      if (i==nodes.length) {
+        return false;
+      } else {
+        DatanodeDescriptor[] newNodes;
+        newNodes = new DatanodeDescriptor[nodes.length-1];
+        System.arraycopy(nodes, 0, newNodes, 0, i);
+        System.arraycopy(nodes, i+1, newNodes, i, nodes.length-i-1);
+        map.put(host, newNodes);
+        return true;
+      }
+    } finally {
+      hostmapLock.writeLock().unlock();
     }
   }
     
   /** get a data node by its host.
    * @return DatanodeDescriptor if found; otherwise null.
    */
-  synchronized DatanodeDescriptor getDatanodeByHost(String host) {
+  DatanodeDescriptor getDatanodeByHost(String host) {
     if (host==null) {
       return null;
     }
       
-    DatanodeDescriptor[] nodes = map.get(host);
-    // no entry
-    if (nodes== null) {
-      return null;
-    }
-    // one node
-    if (nodes.length == 1) {
-      return nodes[0];
+    hostmapLock.readLock().lock();
+    try {
+      DatanodeDescriptor[] nodes = map.get(host);
+      // no entry
+      if (nodes== null) {
+        return null;
+      }
+      // one node
+      if (nodes.length == 1) {
+        return nodes[0];
+      }
+      // more than one node
+      return nodes[r.nextInt(nodes.length)];
+    } finally {
+      hostmapLock.readLock().unlock();
     }
-    // more than one node
-    return nodes[r.nextInt(nodes.length)];
   }
     
   /**
@@ -144,16 +168,21 @@
       host = name.substring(0, colon);
     }
 
-    DatanodeDescriptor[] nodes = map.get(host);
-    // no entry
-    if (nodes== null) {
-      return null;
-    }
-    for(DatanodeDescriptor containedNode:nodes) {
-      if (name.equals(containedNode.getName())) {
-        return containedNode;
+    hostmapLock.readLock().lock();
+    try {
+      DatanodeDescriptor[] nodes = map.get(host);
+      // no entry
+      if (nodes== null) {
+        return null;
       }
+      for(DatanodeDescriptor containedNode:nodes) {
+        if (name.equals(containedNode.getName())) {
+          return containedNode;
+        }
+      }
+      return null;
+    } finally {
+      hostmapLock.readLock().unlock();
     }
-    return null;
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java Thu Jun 14 14:53:06 2007
@@ -23,6 +23,8 @@
 import java.util.List;
 import java.util.Random;
 import java.util.Arrays;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -224,7 +226,7 @@
     } // end of remove
         
     /** Given a node's string representation, return a reference to the node */ 
-    Node getLoc(String loc) {
+    private Node getLoc(String loc) {
       if (loc == null || loc.length() == 0) return this;
             
       String[] path = loc.split(PATH_SEPARATOR_STR, 2);
@@ -300,8 +302,10 @@
     
   InnerNode clusterMap = new InnerNode(InnerNode.ROOT); // the root
   private int numOfRacks = 0;  // rack counter
+  private ReadWriteLock netlock;
     
   public NetworkTopology() {
+    netlock = new ReentrantReadWriteLock();
   }
     
   /** Add a data node
@@ -310,21 +314,26 @@
    *          data node to be added
    * @exception IllegalArgumentException if add a data node to a leave
    */
-  public synchronized void add(DatanodeDescriptor node) {
+  public void add(DatanodeDescriptor node) {
     if (node==null) return;
+    netlock.writeLock().lock();
     LOG.info("Adding a new node: "+node.getPath());
-    Node rack = getNode(node.getNetworkLocation());
-    if (rack != null && !(rack instanceof InnerNode)) {
-      throw new IllegalArgumentException("Unexpected data node " 
-                                         + node.toString() 
-                                         + " at an illegal network location");
-    }
-    if (clusterMap.add(node)) {
-      if (rack == null) {
-        numOfRacks++;
+    try {
+      Node rack = getNode(node.getNetworkLocation());
+      if (rack != null && !(rack instanceof InnerNode)) {
+        throw new IllegalArgumentException("Unexpected data node " 
+                                           + node.toString() 
+                                           + " at an illegal network location");
+      }
+      if (clusterMap.add(node)) {
+        if (rack == null) {
+          numOfRacks++;
+        }
       }
+      LOG.debug("NetworkTopology became:\n" + this.toString());
+    } finally {
+      netlock.writeLock().unlock();
     }
-    LOG.debug("NetworkTopology became:\n" + this.toString());
   }
     
   /** Remove a data node
@@ -332,16 +341,21 @@
    * @param node
    *          data node to be removed
    */ 
-  public synchronized void remove(DatanodeDescriptor node) {
+  public void remove(DatanodeDescriptor node) {
     if (node==null) return;
+    netlock.writeLock().lock();
     LOG.info("Removing a node: "+node.getPath());
-    if (clusterMap.remove(node)) {
-      InnerNode rack = (InnerNode)getNode(node.getNetworkLocation());
-      if (rack == null) {
-        numOfRacks--;
+    try {
+      if (clusterMap.remove(node)) {
+        InnerNode rack = (InnerNode)getNode(node.getNetworkLocation());
+        if (rack == null) {
+          numOfRacks--;
+        }
       }
+      LOG.debug("NetworkTopology became:\n" + this.toString());
+    } finally {
+      netlock.writeLock().unlock();
     }
-    LOG.debug("NetworkTopology became:\n" + this.toString());
   }
        
   /** Check if the tree contains data node <i>node</i>
@@ -350,13 +364,18 @@
    *          a data node
    * @return true if <i>node</i> is already in the tree; false otherwise
    */
-  public synchronized boolean contains(DatanodeDescriptor node) {
+  public boolean contains(DatanodeDescriptor node) {
     if (node == null) return false;
-    Node parent = node.getParent();
-    for(int level=node.getLevel(); parent!=null&&level>0;
-        parent=parent.getParent(), level--) {
-      if (parent == clusterMap)
-        return true;
+    netlock.readLock().lock();
+    try {
+      Node parent = node.getParent();
+      for(int level=node.getLevel(); parent!=null&&level>0;
+          parent=parent.getParent(), level--) {
+        if (parent == clusterMap)
+          return true;
+      }
+    } finally {
+      netlock.readLock().unlock();
     }
     return false; 
   }
@@ -367,7 +386,7 @@
    *          a path-like string representation of a node
    * @return a reference to the node; null if the node is not in the tree
    */
-  public synchronized Node getNode(String loc) {
+  private Node getNode(String loc) {
     loc = NodeBase.normalize(loc);
     if (!NodeBase.ROOT.equals(loc))
       loc = loc.substring(1);
@@ -375,13 +394,23 @@
   }
     
   /** Return the total number of racks */
-  public synchronized int getNumOfRacks() {
-    return numOfRacks;
+  public int getNumOfRacks() {
+    netlock.readLock().lock();
+    try {
+      return numOfRacks;
+    } finally {
+      netlock.readLock().unlock();
+    }
   }
     
   /** Return the total number of data nodes */
-  public synchronized int getNumOfLeaves() {
-    return clusterMap.getNumOfLeaves();
+  public int getNumOfLeaves() {
+    netlock.readLock().lock();
+    try {
+      return clusterMap.getNumOfLeaves();
+    } finally {
+      netlock.readLock().unlock();
+    }
   }
     
   /** Return the distance between two data nodes
@@ -397,24 +426,28 @@
     if (node1 == node2) {
       return 0;
     }
-    int i;
     Node n1=node1, n2=node2;
-    int level1=node1.getLevel(), level2=node2.getLevel();
     int dis = 0;
-    while(n1!=null && level1>level2) {
-      n1 = n1.getParent();
-      level1--;
-      dis++;
-    }
-    while(n2!=null && level2>level1) {
-      n2 = n2.getParent();
-      level2--;
-      dis++;
-    }
-    while(n1!=null && n2!=null && n1.getParent()!=n2.getParent()) {
-      n1=n1.getParent();
-      n2=n2.getParent();
-      dis+=2;
+    netlock.readLock().lock();
+    try {
+      int level1=node1.getLevel(), level2=node2.getLevel();
+      while(n1!=null && level1>level2) {
+        n1 = n1.getParent();
+        level1--;
+        dis++;
+      }
+      while(n2!=null && level2>level1) {
+        n2 = n2.getParent();
+        level2--;
+        dis++;
+      }
+      while(n1!=null && n2!=null && n1.getParent()!=n2.getParent()) {
+        n1=n1.getParent();
+        n2=n2.getParent();
+        dis+=2;
+      }
+    } finally {
+      netlock.readLock().unlock();
     }
     if (n1==null) {
       LOG.warn("The cluster does not contain data node: "+node1.getPath());
@@ -440,11 +473,16 @@
       return false;
     }
       
-    if (node1 == node2 || node1.equals(node2)) {
-      return true;
-    }
+    netlock.readLock().lock();
+    try {
+      if (node1 == node2 || node1.equals(node2)) {
+        return true;
+      }
         
-    return node1.getParent()==node2.getParent();
+      return node1.getParent()==node2.getParent();
+    } finally {
+      netlock.readLock().unlock();
+    }
   }
     
   final private static Random r = new Random();
@@ -455,10 +493,15 @@
    * @return the choosen data node
    */
   public DatanodeDescriptor chooseRandom(String scope) {
-    if (scope.startsWith("~")) {
-      return chooseRandom(NodeBase.ROOT, scope.substring(1));
-    } else {
-      return chooseRandom(scope, null);
+    netlock.readLock().lock();
+    try {
+      if (scope.startsWith("~")) {
+        return chooseRandom(NodeBase.ROOT, scope.substring(1));
+      } else {
+        return chooseRandom(scope, null);
+      }
+    } finally {
+      netlock.readLock().unlock();
     }
   }
     
@@ -507,22 +550,27 @@
     }
     scope = NodeBase.normalize(scope);
     int count=0; // the number of nodes in both scope & excludedNodes
-    for(DatanodeDescriptor node:excludedNodes) {
-      if ((node.getPath()+NodeBase.PATH_SEPARATOR_STR).
-          startsWith(scope+NodeBase.PATH_SEPARATOR_STR)) {
-        count++;
+    netlock.readLock().lock();
+    try {
+      for(DatanodeDescriptor node:excludedNodes) {
+        if ((node.getPath()+NodeBase.PATH_SEPARATOR_STR).
+            startsWith(scope+NodeBase.PATH_SEPARATOR_STR)) {
+          count++;
+        }
+      }
+      Node n=getNode(scope);
+      int scopeNodeCount=1;
+      if (n instanceof InnerNode) {
+        scopeNodeCount=((InnerNode)n).getNumOfLeaves();
+      }
+      if (isExcluded) {
+        return clusterMap.getNumOfLeaves()-
+          scopeNodeCount-excludedNodes.size()+count;
+      } else {
+        return scopeNodeCount-count;
       }
-    }
-    Node n=getNode(scope);
-    int scopeNodeCount=1;
-    if (n instanceof InnerNode) {
-      scopeNodeCount=((InnerNode)n).getNumOfLeaves();
-    }
-    if (isExcluded) {
-      return clusterMap.getNumOfLeaves()-
-        scopeNodeCount-excludedNodes.size()+count;
-    } else {
-      return scopeNodeCount-count;
+    } finally {
+      netlock.readLock().unlock();
     }
   }
     
@@ -549,21 +597,22 @@
   /* Set and used only inside sortByDistance. 
    * This saves an allocation each time we sort.
    */
-  private DatanodeDescriptor distFrom = null;
+  private static ThreadLocal<DatanodeDescriptor> distFrom = 
+    new ThreadLocal<DatanodeDescriptor>();
   private final Comparator<DatanodeDescriptor> nodeDistanceComparator = 
     new Comparator<DatanodeDescriptor>() {
       public int compare(DatanodeDescriptor n1, DatanodeDescriptor n2) {
-        return getDistance(distFrom, n1) - getDistance(distFrom, n2);
+        return getDistance(distFrom.get(), n1) - getDistance(distFrom.get(), n2);
       }
     };
       
   /** Sorts nodes array by their distances to <i>reader</i>. */
-  public synchronized void sortByDistance(final DatanodeDescriptor reader,
-                                          DatanodeDescriptor[] nodes) { 
+  public void sortByDistance(final DatanodeDescriptor reader,
+                             DatanodeDescriptor[] nodes) { 
     if (reader != null && contains(reader)) {
-      distFrom = reader;
+      distFrom.set(reader);
       Arrays.sort(nodes, nodeDistanceComparator);
-      distFrom = null;
+      distFrom.set(null);
     }
   }
 }



Mime
View raw message