hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r529410 [9/27] - in /lucene/hadoop/trunk: ./ src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/ src/contrib/abacus/src/java/org/apache/hadoop/abacus/ src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/c...
Date Mon, 16 Apr 2007 21:44:46 GMT
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/InconsistentFSStateException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/InconsistentFSStateException.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/InconsistentFSStateException.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/InconsistentFSStateException.java Mon Apr 16 14:44:35 2007
@@ -31,7 +31,7 @@
 
   public InconsistentFSStateException( File dir, String descr ) {
     super( "Directory " + getFilePath( dir )
-          + " is in an inconsistent state: " + descr );
+           + " is in an inconsistent state: " + descr );
   }
 
   public InconsistentFSStateException( File dir, String descr, Throwable ex ) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/IncorrectVersionException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/IncorrectVersionException.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/IncorrectVersionException.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/IncorrectVersionException.java Mon Apr 16 14:44:35 2007
@@ -35,8 +35,8 @@
                                     String ofWhat,
                                     int versionExpected ) {
     super( "Unexpected version " 
-        + (ofWhat==null ? "" : "of " + ofWhat) + ". Reported: "
-        + versionReported + ". Expecting = " + versionExpected + "." );
+           + (ofWhat==null ? "" : "of " + ofWhat) + ". Reported: "
+           + versionReported + ". Expecting = " + versionExpected + "." );
   }
 
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java Mon Apr 16 14:44:35 2007
@@ -29,220 +29,220 @@
 import org.apache.hadoop.conf.*;
 
 public class JspHelper {
-    static FSNamesystem fsn = null;
-    static InetSocketAddress nameNodeAddr;
-    static Configuration conf = new Configuration();
-
-    static int defaultChunkSizeToView = 
-                        conf.getInt("dfs.default.chunk.view.size",32 * 1024);
-    static Random rand = new Random();
-
-    public JspHelper() {
-      if (DataNode.getDataNode() != null) {
-        nameNodeAddr = DataNode.getDataNode().getNameNodeAddr();
-      }
-      else {
-        fsn = FSNamesystem.getFSNamesystem();
-        nameNodeAddr = new InetSocketAddress(fsn.getDFSNameNodeMachine(),
-                  fsn.getDFSNameNodePort()); 
-      }      
-    }
-    public DatanodeInfo bestNode(LocatedBlock blk) throws IOException {
-      TreeSet deadNodes = new TreeSet();
-      DatanodeInfo chosenNode = null;
-      int failures = 0;
-      Socket s = null;
-      DatanodeInfo [] nodes = blk.getLocations();
-      if (nodes == null || nodes.length == 0) {
-        throw new IOException("No nodes contain this block");
-      }
-      while (s == null) {
-        if (chosenNode == null) {
-          do {
-            chosenNode = nodes[rand.nextInt(nodes.length)];
-          } while (deadNodes.contains(chosenNode));
-        }
-        int index = rand.nextInt(nodes.length);
-        chosenNode = nodes[index];
+  static FSNamesystem fsn = null;
+  static InetSocketAddress nameNodeAddr;
+  static Configuration conf = new Configuration();
+
+  static int defaultChunkSizeToView = 
+    conf.getInt("dfs.default.chunk.view.size",32 * 1024);
+  static Random rand = new Random();
+
+  public JspHelper() {
+    if (DataNode.getDataNode() != null) {
+      nameNodeAddr = DataNode.getDataNode().getNameNodeAddr();
+    }
+    else {
+      fsn = FSNamesystem.getFSNamesystem();
+      nameNodeAddr = new InetSocketAddress(fsn.getDFSNameNodeMachine(),
+                                           fsn.getDFSNameNodePort()); 
+    }      
+  }
+  public DatanodeInfo bestNode(LocatedBlock blk) throws IOException {
+    TreeSet deadNodes = new TreeSet();
+    DatanodeInfo chosenNode = null;
+    int failures = 0;
+    Socket s = null;
+    DatanodeInfo [] nodes = blk.getLocations();
+    if (nodes == null || nodes.length == 0) {
+      throw new IOException("No nodes contain this block");
+    }
+    while (s == null) {
+      if (chosenNode == null) {
+        do {
+          chosenNode = nodes[rand.nextInt(nodes.length)];
+        } while (deadNodes.contains(chosenNode));
+      }
+      int index = rand.nextInt(nodes.length);
+      chosenNode = nodes[index];
 
-        //just ping to check whether the node is alive
-        InetSocketAddress targetAddr = DataNode.createSocketAddr(chosenNode.getHost() + ":" + chosenNode.getInfoPort());
-        
-        try {
-          s = new Socket();
-          s.connect(targetAddr, FSConstants.READ_TIMEOUT);
-          s.setSoTimeout(FSConstants.READ_TIMEOUT);
-        } catch (IOException e) {
-          deadNodes.add(chosenNode);
-          s.close();
-          s = null;
-          failures++;
-        }
-        if (failures == nodes.length)
-          throw new IOException("Could not reach the block containing the data. Please try again");
+      //just ping to check whether the node is alive
+      InetSocketAddress targetAddr = DataNode.createSocketAddr(chosenNode.getHost() + ":" + chosenNode.getInfoPort());
         
+      try {
+        s = new Socket();
+        s.connect(targetAddr, FSConstants.READ_TIMEOUT);
+        s.setSoTimeout(FSConstants.READ_TIMEOUT);
+      } catch (IOException e) {
+        deadNodes.add(chosenNode);
+        s.close();
+        s = null;
+        failures++;
       }
-      s.close();
-      return chosenNode;
+      if (failures == nodes.length)
+        throw new IOException("Could not reach the block containing the data. Please try again");
+        
     }
-    public void streamBlockInAscii(InetSocketAddress addr, long blockId, long blockSize, 
-            long offsetIntoBlock, long chunkSizeToView, JspWriter out) 
-      throws IOException {
-      if (chunkSizeToView == 0) return;
-      Socket s = new Socket();
-      s.connect(addr, FSConstants.READ_TIMEOUT);
-      s.setSoTimeout(FSConstants.READ_TIMEOUT);
-      //
-      // Xmit header info to datanode
-      //
-      DataOutputStream os = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
-      os.write(FSConstants.OP_READSKIP_BLOCK);
-      new Block(blockId, blockSize).write(os);
-      os.writeLong(offsetIntoBlock);
-      os.flush();
-
-      //
-      // Get bytes in block, set streams
-      //
-      DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
-      long curBlockSize = in.readLong();
-      long amtSkipped = in.readLong();
-      if (curBlockSize != blockSize) {
-        throw new IOException("Recorded block size is " + blockSize + ", but datanode reports size of " + curBlockSize);
-      }
-      if (amtSkipped != offsetIntoBlock) {
-        throw new IOException("Asked for offset of " + offsetIntoBlock + ", but only received offset of " + amtSkipped);
-      }
-      
-      long amtToRead = chunkSizeToView;
-      if (amtToRead + offsetIntoBlock > blockSize)
-        amtToRead = blockSize - offsetIntoBlock;
-      byte[] buf = new byte[(int)amtToRead];
-      int readOffset = 0;
-      int retries = 2;
-      while (true) {
-        int numRead;
-        try {
-          numRead = in.read(buf, readOffset, (int)amtToRead);
-        }
-        catch (IOException e) {
-          retries--;
-          if (retries == 0)
-            throw new IOException("Could not read data from datanode");
-          continue;
-        }
-        amtToRead -= numRead;
-        readOffset += numRead;
-        if (amtToRead == 0)
-          break;
-      }
-      s.close();
-      in.close();
-      out.print(new String(buf));
-    }
-    public void DFSNodesStatus( ArrayList<DatanodeDescriptor> live,
-                                ArrayList<DatanodeDescriptor> dead ) {
-        if ( fsn != null )
-            fsn.DFSNodesStatus(live, dead);
-    }
-    public void addTableHeader(JspWriter out) throws IOException {
-      out.print("<table border=\"1\""+
-                " cellpadding=\"2\" cellspacing=\"2\">");
-      out.print("<tbody>");
-    }
-    public void addTableRow(JspWriter out, String[] columns) throws IOException {
-      out.print("<tr>");
-      for (int i = 0; i < columns.length; i++) {
-        out.print("<td style=\"vertical-align: top;\"><B>"+columns[i]+"</B><br></td>");
-      }
-      out.print("</tr>");
+    s.close();
+    return chosenNode;
+  }
+  public void streamBlockInAscii(InetSocketAddress addr, long blockId, long blockSize, 
+                                 long offsetIntoBlock, long chunkSizeToView, JspWriter out) 
+    throws IOException {
+    if (chunkSizeToView == 0) return;
+    Socket s = new Socket();
+    s.connect(addr, FSConstants.READ_TIMEOUT);
+    s.setSoTimeout(FSConstants.READ_TIMEOUT);
+    //
+    // Xmit header info to datanode
+    //
+    DataOutputStream os = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
+    os.write(FSConstants.OP_READSKIP_BLOCK);
+    new Block(blockId, blockSize).write(os);
+    os.writeLong(offsetIntoBlock);
+    os.flush();
+
+    //
+    // Get bytes in block, set streams
+    //
+    DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
+    long curBlockSize = in.readLong();
+    long amtSkipped = in.readLong();
+    if (curBlockSize != blockSize) {
+      throw new IOException("Recorded block size is " + blockSize + ", but datanode reports size of " + curBlockSize);
+    }
+    if (amtSkipped != offsetIntoBlock) {
+      throw new IOException("Asked for offset of " + offsetIntoBlock + ", but only received offset of " + amtSkipped);
     }
-    public void addTableRow(JspWriter out, String[] columns, int row) throws IOException {
-      out.print("<tr>");
       
-      for (int i = 0; i < columns.length; i++) {
-        if( row/2*2 == row ) {//even
-          out.print("<td style=\"vertical-align: top;background-color:LightGrey;\"><B>"+columns[i]+"</B><br></td>");
-        } else {
-          out.print("<td style=\"vertical-align: top;background-color:LightBlue;\"><B>"+columns[i]+"</B><br></td>");
+    long amtToRead = chunkSizeToView;
+    if (amtToRead + offsetIntoBlock > blockSize)
+      amtToRead = blockSize - offsetIntoBlock;
+    byte[] buf = new byte[(int)amtToRead];
+    int readOffset = 0;
+    int retries = 2;
+    while (true) {
+      int numRead;
+      try {
+        numRead = in.read(buf, readOffset, (int)amtToRead);
+      }
+      catch (IOException e) {
+        retries--;
+        if (retries == 0)
+          throw new IOException("Could not read data from datanode");
+        continue;
+      }
+      amtToRead -= numRead;
+      readOffset += numRead;
+      if (amtToRead == 0)
+        break;
+    }
+    s.close();
+    in.close();
+    out.print(new String(buf));
+  }
+  public void DFSNodesStatus( ArrayList<DatanodeDescriptor> live,
+                              ArrayList<DatanodeDescriptor> dead ) {
+    if ( fsn != null )
+      fsn.DFSNodesStatus(live, dead);
+  }
+  public void addTableHeader(JspWriter out) throws IOException {
+    out.print("<table border=\"1\""+
+              " cellpadding=\"2\" cellspacing=\"2\">");
+    out.print("<tbody>");
+  }
+  public void addTableRow(JspWriter out, String[] columns) throws IOException {
+    out.print("<tr>");
+    for (int i = 0; i < columns.length; i++) {
+      out.print("<td style=\"vertical-align: top;\"><B>"+columns[i]+"</B><br></td>");
+    }
+    out.print("</tr>");
+  }
+  public void addTableRow(JspWriter out, String[] columns, int row) throws IOException {
+    out.print("<tr>");
+      
+    for (int i = 0; i < columns.length; i++) {
+      if( row/2*2 == row ) {//even
+        out.print("<td style=\"vertical-align: top;background-color:LightGrey;\"><B>"+columns[i]+"</B><br></td>");
+      } else {
+        out.print("<td style=\"vertical-align: top;background-color:LightBlue;\"><B>"+columns[i]+"</B><br></td>");
           
-        }
       }
-      out.print("</tr>");
-    }
-    public void addTableFooter(JspWriter out) throws IOException {
-      out.print("</tbody></table>");
-    }
-
-    public String getSafeModeText() {
-      if( ! fsn.isInSafeMode() )
-        return "";
-      return "Safe mode is ON. <em>" + fsn.getSafeModeTip() + "</em><br>";
     }
+    out.print("</tr>");
+  }
+  public void addTableFooter(JspWriter out) throws IOException {
+    out.print("</tbody></table>");
+  }
+
+  public String getSafeModeText() {
+    if( ! fsn.isInSafeMode() )
+      return "";
+    return "Safe mode is ON. <em>" + fsn.getSafeModeTip() + "</em><br>";
+  }
     
-    public void sortNodeList(ArrayList<DatanodeDescriptor> nodes,
-                             String field, String order) {
+  public void sortNodeList(ArrayList<DatanodeDescriptor> nodes,
+                           String field, String order) {
         
-        class NodeComapare implements Comparator<DatanodeDescriptor> {
-            static final int 
-                FIELD_NAME              = 1,
-                FIELD_LAST_CONTACT      = 2,
-                FIELD_BLOCKS            = 3,
-                FIELD_SIZE              = 4,
-                FIELD_DISK_USED         = 5,
-                SORT_ORDER_ASC          = 1,
-                SORT_ORDER_DSC          = 2;
+    class NodeComapare implements Comparator<DatanodeDescriptor> {
+      static final int 
+        FIELD_NAME              = 1,
+        FIELD_LAST_CONTACT      = 2,
+        FIELD_BLOCKS            = 3,
+        FIELD_SIZE              = 4,
+        FIELD_DISK_USED         = 5,
+        SORT_ORDER_ASC          = 1,
+        SORT_ORDER_DSC          = 2;
 
-            int sortField = FIELD_NAME;
-            int sortOrder = SORT_ORDER_ASC;
+      int sortField = FIELD_NAME;
+      int sortOrder = SORT_ORDER_ASC;
             
-            public NodeComapare(String field, String order) {
-                if ( field.equals( "lastcontact" ) ) {
-                    sortField = FIELD_LAST_CONTACT;
-                } else if ( field.equals( "size" ) ) {
-                    sortField = FIELD_SIZE;
-                } else if ( field.equals( "blocks" ) ) {
-                    sortField = FIELD_BLOCKS;
-                } else if ( field.equals( "pcused" ) ) {
-                    sortField = FIELD_DISK_USED;
-                } else {
-                    sortField = FIELD_NAME;
-                }
+      public NodeComapare(String field, String order) {
+        if ( field.equals( "lastcontact" ) ) {
+          sortField = FIELD_LAST_CONTACT;
+        } else if ( field.equals( "size" ) ) {
+          sortField = FIELD_SIZE;
+        } else if ( field.equals( "blocks" ) ) {
+          sortField = FIELD_BLOCKS;
+        } else if ( field.equals( "pcused" ) ) {
+          sortField = FIELD_DISK_USED;
+        } else {
+          sortField = FIELD_NAME;
+        }
                 
-                if ( order.equals("DSC") ) {
-                    sortOrder = SORT_ORDER_DSC;
-                } else {
-                    sortOrder = SORT_ORDER_ASC;
-                }
-            }
-
-            public int compare( DatanodeDescriptor d1,
-                                DatanodeDescriptor d2 ) {
-                int ret = 0;
-                switch ( sortField ) {
-                case FIELD_LAST_CONTACT:
-                    ret = (int) (d2.getLastUpdate() - d1.getLastUpdate());
-                    break;
-                case FIELD_BLOCKS:
-                    ret = d1.numBlocks() - d2.numBlocks();
-                    break;
-                case FIELD_SIZE:
-                    long  dlong = d1.getCapacity() - d2.getCapacity();
-                    ret = (dlong < 0) ? -1 : ( (dlong > 0) ? 1 : 0 );
-                    break;
-                case FIELD_DISK_USED:
-                    double ddbl =((d2.getRemaining()*1.0/d2.getCapacity())-
-                                  (d1.getRemaining()*1.0/d1.getCapacity()));
-                    ret = (ddbl < 0) ? -1 : ( (ddbl > 0) ? 1 : 0 );
-                    break;
-                case FIELD_NAME: 
-                    ret = d1.getHostName().compareTo(d2.getHostName());
-                    break;
-                }
-                return ( sortOrder == SORT_ORDER_DSC ) ? -ret : ret;
-            }
+        if ( order.equals("DSC") ) {
+          sortOrder = SORT_ORDER_DSC;
+        } else {
+          sortOrder = SORT_ORDER_ASC;
         }
-        
-        Collections.sort( nodes, new NodeComapare( field, order ) );
+      }
+
+      public int compare( DatanodeDescriptor d1,
+                          DatanodeDescriptor d2 ) {
+        int ret = 0;
+        switch ( sortField ) {
+        case FIELD_LAST_CONTACT:
+          ret = (int) (d2.getLastUpdate() - d1.getLastUpdate());
+          break;
+        case FIELD_BLOCKS:
+          ret = d1.numBlocks() - d2.numBlocks();
+          break;
+        case FIELD_SIZE:
+          long  dlong = d1.getCapacity() - d2.getCapacity();
+          ret = (dlong < 0) ? -1 : ( (dlong > 0) ? 1 : 0 );
+          break;
+        case FIELD_DISK_USED:
+          double ddbl =((d2.getRemaining()*1.0/d2.getCapacity())-
+                        (d1.getRemaining()*1.0/d1.getCapacity()));
+          ret = (ddbl < 0) ? -1 : ( (ddbl > 0) ? 1 : 0 );
+          break;
+        case FIELD_NAME: 
+          ret = d1.getHostName().compareTo(d2.getHostName());
+          break;
+        }
+        return ( sortOrder == SORT_ORDER_DSC ) ? -ret : ret;
+      }
     }
+        
+    Collections.sort( nodes, new NodeComapare( field, order ) );
+  }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/LocatedBlock.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/LocatedBlock.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/LocatedBlock.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/LocatedBlock.java Mon Apr 16 14:44:35 2007
@@ -29,62 +29,62 @@
  ****************************************************/
 class LocatedBlock implements Writable {
 
-    static {                                      // register a ctor
-      WritableFactories.setFactory
-        (LocatedBlock.class,
-         new WritableFactory() {
-           public Writable newInstance() { return new LocatedBlock(); }
-         });
-    }
-
-    Block b;
-    DatanodeInfo locs[];
-
-    /**
-     */
-    public LocatedBlock() {
-        this.b = new Block();
-        this.locs = new DatanodeInfo[0];
-    }
-
-    /**
-     */
-    public LocatedBlock(Block b, DatanodeInfo[] locs) {
-        this.b = b;
-        this.locs = locs;
-    }
-
-    /**
-     */
-    public Block getBlock() {
-        return b;
-    }
-
-    /**
-     */
-    DatanodeInfo[] getLocations() {
-        return locs;
-    }
-
-    ///////////////////////////////////////////
-    // Writable
-    ///////////////////////////////////////////
-    public void write(DataOutput out) throws IOException {
-        b.write(out);
-        out.writeInt(locs.length);
-        for (int i = 0; i < locs.length; i++) {
-            locs[i].write(out);
-        }
-    }
-
-    public void readFields(DataInput in) throws IOException {
-        this.b = new Block();
-        b.readFields(in);
-        int count = in.readInt();
-        this.locs = new DatanodeInfo[count];
-        for (int i = 0; i < locs.length; i++) {
-            locs[i] = new DatanodeInfo();
-            locs[i].readFields(in);
-        }
+  static {                                      // register a ctor
+    WritableFactories.setFactory
+      (LocatedBlock.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new LocatedBlock(); }
+       });
+  }
+
+  Block b;
+  DatanodeInfo locs[];
+
+  /**
+   */
+  public LocatedBlock() {
+    this.b = new Block();
+    this.locs = new DatanodeInfo[0];
+  }
+
+  /**
+   */
+  public LocatedBlock(Block b, DatanodeInfo[] locs) {
+    this.b = b;
+    this.locs = locs;
+  }
+
+  /**
+   */
+  public Block getBlock() {
+    return b;
+  }
+
+  /**
+   */
+  DatanodeInfo[] getLocations() {
+    return locs;
+  }
+
+  ///////////////////////////////////////////
+  // Writable
+  ///////////////////////////////////////////
+  public void write(DataOutput out) throws IOException {
+    b.write(out);
+    out.writeInt(locs.length);
+    for (int i = 0; i < locs.length; i++) {
+      locs[i].write(out);
+    }
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    this.b = new Block();
+    b.readFields(in);
+    int count = in.readInt();
+    this.locs = new DatanodeInfo[count];
+    for (int i = 0; i < locs.length; i++) {
+      locs[i] = new DatanodeInfo();
+      locs[i].readFields(in);
     }
+  }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Mon Apr 16 14:44:35 2007
@@ -69,751 +69,751 @@
  * @author Mike Cafarella
  **********************************************************/
 public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
-    public long getProtocolVersion(String protocol, 
-                                   long clientVersion) throws IOException { 
-      if (protocol.equals(ClientProtocol.class.getName())) {
-        return ClientProtocol.versionID; 
-      } else if (protocol.equals(DatanodeProtocol.class.getName())){
-        return DatanodeProtocol.versionID;
-      } else {
-        throw new IOException("Unknown protocol to name node: " + protocol);
-      }
-    }
-    
-    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.NameNode");
-    public static final Log stateChangeLog = LogFactory.getLog( "org.apache.hadoop.dfs.StateChange");
-
-    private FSNamesystem namesystem;
-    private Server server;
-    private Thread emptier;
-    private int handlerCount = 2;
-    
-    private InetSocketAddress nameNodeAddress = null;
-    
-    /** only used for testing purposes  */
-    private boolean stopRequested = false;
-
-    /** Format a new filesystem.  Destroys any filesystem that may already
-     * exist at this location.  **/
-    public static void format(Configuration conf) throws IOException {
-      format( conf, false );
-    }
-
-    private class NameNodeMetrics implements Updater {
-      private final MetricsRecord metricsRecord;
-      private int numFilesCreated = 0;
-      private int numFilesOpened = 0;
-      private int numFilesRenamed = 0;
-      private int numFilesListed = 0;
+  public long getProtocolVersion(String protocol, 
+                                 long clientVersion) throws IOException { 
+    if (protocol.equals(ClientProtocol.class.getName())) {
+      return ClientProtocol.versionID; 
+    } else if (protocol.equals(DatanodeProtocol.class.getName())){
+      return DatanodeProtocol.versionID;
+    } else {
+      throw new IOException("Unknown protocol to name node: " + protocol);
+    }
+  }
+    
+  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.NameNode");
+  public static final Log stateChangeLog = LogFactory.getLog( "org.apache.hadoop.dfs.StateChange");
+
+  private FSNamesystem namesystem;
+  private Server server;
+  private Thread emptier;
+  private int handlerCount = 2;
+    
+  private InetSocketAddress nameNodeAddress = null;
+    
+  /** only used for testing purposes  */
+  private boolean stopRequested = false;
+
+  /** Format a new filesystem.  Destroys any filesystem that may already
+   * exist at this location.  **/
+  public static void format(Configuration conf) throws IOException {
+    format( conf, false );
+  }
+
+  private class NameNodeMetrics implements Updater {
+    private final MetricsRecord metricsRecord;
+    private int numFilesCreated = 0;
+    private int numFilesOpened = 0;
+    private int numFilesRenamed = 0;
+    private int numFilesListed = 0;
       
-      NameNodeMetrics() {
-        MetricsContext metricsContext = MetricsUtil.getContext("dfs");
-        metricsRecord = MetricsUtil.createRecord(metricsContext, "namenode");
-        metricsContext.registerUpdater(this);
-      }
+    NameNodeMetrics() {
+      MetricsContext metricsContext = MetricsUtil.getContext("dfs");
+      metricsRecord = MetricsUtil.createRecord(metricsContext, "namenode");
+      metricsContext.registerUpdater(this);
+    }
       
-      /**
-       * Since this object is a registered updater, this method will be called
-       * periodically, e.g. every 5 seconds.
-       */
-      public void doUpdates(MetricsContext unused) {
-        synchronized (this) {
-          metricsRecord.incrMetric("files_created", numFilesCreated);
-          metricsRecord.incrMetric("files_opened", numFilesOpened);
-          metricsRecord.incrMetric("files_renamed", numFilesRenamed);
-          metricsRecord.incrMetric("files_listed", numFilesListed);
+    /**
+     * Since this object is a registered updater, this method will be called
+     * periodically, e.g. every 5 seconds.
+     */
+    public void doUpdates(MetricsContext unused) {
+      synchronized (this) {
+        metricsRecord.incrMetric("files_created", numFilesCreated);
+        metricsRecord.incrMetric("files_opened", numFilesOpened);
+        metricsRecord.incrMetric("files_renamed", numFilesRenamed);
+        metricsRecord.incrMetric("files_listed", numFilesListed);
               
-          numFilesCreated = 0;
-          numFilesOpened = 0;
-          numFilesRenamed = 0;
-          numFilesListed = 0;
-        }
-        metricsRecord.update();
-      }
-      
-      synchronized void createFile() {
-        ++numFilesCreated;
+        numFilesCreated = 0;
+        numFilesOpened = 0;
+        numFilesRenamed = 0;
+        numFilesListed = 0;
       }
+      metricsRecord.update();
+    }
       
-      synchronized void openFile() {
-        ++numFilesOpened;
-      }
+    synchronized void createFile() {
+      ++numFilesCreated;
+    }
       
-      synchronized void renameFile() {
-        ++numFilesRenamed;
-      }
+    synchronized void openFile() {
+      ++numFilesOpened;
+    }
       
-      synchronized void listFile(int nfiles) {
-        numFilesListed += nfiles;
-      }
+    synchronized void renameFile() {
+      ++numFilesRenamed;
     }
-    
-    private NameNodeMetrics myMetrics = new NameNodeMetrics();
-    
-    /**
-     * Initialize the server
-     * 
-     * @param hostname which hostname to bind to
-     * @param port the port number to bind to
-     * @param conf the configuration
-     */
-    private void init(String hostname, int port, 
-                      Configuration conf
-                      ) throws IOException {
-      this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
-      this.server = RPC.getServer(this, hostname, port, handlerCount, 
-                                  false, conf);
-
-      // The rpc-server port can be ephemeral... ensure we have the correct info
-      this.nameNodeAddress = this.server.getListenerAddress(); 
-      conf.set("fs.default.name", new String(nameNodeAddress.getHostName() + ":" + nameNodeAddress.getPort()));
-      LOG.info("Namenode up at: " + this.nameNodeAddress);
-
-      try {
-        this.namesystem = new FSNamesystem(this.nameNodeAddress.getHostName(), this.nameNodeAddress.getPort(), this, conf);
-        this.server.start();  //start RPC server   
-  
-        this.emptier = new Thread(new Trash(conf).getEmptier(), "Trash Emptier");
-        this.emptier.setDaemon(true);
-        this.emptier.start();
-      } catch (IOException e) {
-        this.server.stop();
-        throw e;
-      }
       
+    synchronized void listFile(int nfiles) {
+      numFilesListed += nfiles;
     }
+  }
     
-    /**
-     * Start NameNode.
-     * <p>
-     * The name-node can be started with one of the following startup options:
-     * <ul> 
-     * <li>{@link FSConstants.StartupOption#REGULAR REGULAR} - normal startup</li>
-     * <li>{@link FSConstants.StartupOption#FORMAT FORMAT} - format name node</li>
-     * <li>{@link FSConstants.StartupOption#UPGRADE UPGRADE} - start the cluster  
-     * upgrade and create a snapshot of the current file system state</li> 
-     * <li>{@link FSConstants.StartupOption#ROLLBACK ROLLBACK} - roll the  
-     *            cluster back to the previous state</li>
-     * </ul>
-     * The option is passed via configuration field: 
-     * <tt>dfs.namenode.startup</tt>
-     * 
-     * The conf will be modified to reflect the actual ports on which 
-     * the NameNode is up and running if the user passes the port as
-     * <code>zero</code> in the conf.
-     * 
-     * @param conf  confirguration
-     * @throws IOException
-     */
-    public NameNode(Configuration conf) throws IOException {
-      InetSocketAddress addr = 
-        DataNode.createSocketAddr(conf.get("fs.default.name"));
-      init( addr.getHostName(), addr.getPort(), conf );
-    }
-
-    /**
-     * Create a NameNode at the specified location and start it.
-     * 
-     * The conf will be modified to reflect the actual ports on which 
-     * the NameNode is up and running if the user passes the port as
-     * <code>zero</code>.  
-     */
-    public NameNode(String bindAddress, int port, 
+  private NameNodeMetrics myMetrics = new NameNodeMetrics();
+    
+  /**
+   * Initialize the server
+   * 
+   * @param hostname which hostname to bind to
+   * @param port the port number to bind to
+   * @param conf the configuration
+   */
+  private void init(String hostname, int port, 
                     Configuration conf
                     ) throws IOException {
-      init( bindAddress, port, conf );
-    }
-
-    /**
-     * Wait for service to finish.
-     * (Normally, it runs forever.)
-     */
-    public void join() {
-        try {
-            this.server.join();
-        } catch (InterruptedException ie) {
-        }
-    }
-
-    /**
-     * Stop all NameNode threads and wait for all to finish.
-    */
-    public void stop() {
-      if (! stopRequested) {
-        stopRequested = true;
-        namesystem.close();
-        emptier.interrupt();
-        server.stop();
-      }
-    }
-
-    /////////////////////////////////////////////////////
-    // ClientProtocol
-    /////////////////////////////////////////////////////
-    
-    /**
-     */
-    public LocatedBlock[] open(String src) throws IOException {
-        String clientMachine = Server.getRemoteAddress();
-        if ( clientMachine == null ) {
-            clientMachine = "";
-        }
-        Object openResults[] = namesystem.open(clientMachine, new UTF8(src));
-        if (openResults == null) {
-            throw new IOException("Cannot open filename " + src);
-        } else {
-            myMetrics.openFile();
-            Block blocks[] = (Block[]) openResults[0];
-            DatanodeInfo sets[][] = (DatanodeInfo[][]) openResults[1];
-            LocatedBlock results[] = new LocatedBlock[blocks.length];
-            for (int i = 0; i < blocks.length; i++) {
-                results[i] = new LocatedBlock(blocks[i], sets[i]);
-            }
-            return results;
-        }
-    }
-
-    /**
-     */
-    public LocatedBlock create(String src, 
-                               String clientName, 
-                               boolean overwrite,
-                               short replication,
-                               long blockSize
-    ) throws IOException {
-       String clientMachine = Server.getRemoteAddress();
-       if ( clientMachine == null ) {
-           clientMachine = "";
-       }
-       stateChangeLog.debug("*DIR* NameNode.create: file "
-            +src+" for "+clientName+" at "+clientMachine);
-       if (!checkPathLength(src)) {
-           throw new IOException("create: Pathname too long.  Limit " 
-               + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
-       }
-       Object results[] = namesystem.startFile(new UTF8(src), 
-                                                new UTF8(clientName), 
-                                                new UTF8(clientMachine), 
-                                                overwrite,
-                                                replication,
-                                                blockSize);
-       myMetrics.createFile();
-        Block b = (Block) results[0];
-        DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
-        return new LocatedBlock(b, targets);
-    }
-
-    public boolean setReplication( String src, 
-                                short replication
-                              ) throws IOException {
-      return namesystem.setReplication( src, replication );
+    this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
+    this.server = RPC.getServer(this, hostname, port, handlerCount, 
+                                false, conf);
+
+    // The rpc-server port can be ephemeral... ensure we have the correct info
+    this.nameNodeAddress = this.server.getListenerAddress(); 
+    conf.set("fs.default.name", new String(nameNodeAddress.getHostName() + ":" + nameNodeAddress.getPort()));
+    LOG.info("Namenode up at: " + this.nameNodeAddress);
+
+    try {
+      this.namesystem = new FSNamesystem(this.nameNodeAddress.getHostName(), this.nameNodeAddress.getPort(), this, conf);
+      this.server.start();  //start RPC server   
+  
+      this.emptier = new Thread(new Trash(conf).getEmptier(), "Trash Emptier");
+      this.emptier.setDaemon(true);
+      this.emptier.start();
+    } catch (IOException e) {
+      this.server.stop();
+      throw e;
     }
+      
+  }
     
-    /**
-     */
-    public LocatedBlock addBlock(String src, 
-                                 String clientName) throws IOException {
-        stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
-            +src+" for "+clientName);
-        UTF8 src8 = new UTF8(src);
-        UTF8 client8 = new UTF8(clientName);
-        Object[] results = namesystem.getAdditionalBlock(src8, client8);
-        Block b = (Block) results[0];
-        DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
-        return new LocatedBlock(b, targets);            
-    }
-
-    /**
-     * The client needs to give up on the block.
-     */
-    public void abandonBlock(Block b, String src) throws IOException {
-        stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
-                +b.getBlockName()+" of file "+src );
-        if (! namesystem.abandonBlock(b, new UTF8(src))) {
-            throw new IOException("Cannot abandon block during write to " + src);
-        }
-    }
-    /**
-     */
-    public void abandonFileInProgress(String src, 
-                                      String holder) throws IOException {
-        stateChangeLog.debug("*DIR* NameNode.abandonFileInProgress:" + src );
-        namesystem.abandonFileInProgress(new UTF8(src), new UTF8(holder));
-    }
-    /**
-     */
-    public boolean complete(String src, String clientName) throws IOException {
-        stateChangeLog.debug("*DIR* NameNode.complete: " + src + " for " + clientName );
-        int returnCode = namesystem.completeFile(new UTF8(src), new UTF8(clientName));
-        if (returnCode == STILL_WAITING) {
-            return false;
-        } else if (returnCode == COMPLETE_SUCCESS) {
-            return true;
-        } else {
-            throw new IOException("Could not complete write to file " + src + " by " + clientName);
-        }
-    }
-
-    /**
-     * The client has detected an error on the specified located blocks 
-     * and is reporting them to the server.  For now, the namenode will 
-     * delete the blocks from the datanodes.  In the future we might 
-     * check the blocks are actually corrupt. 
-     */
-    public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
-      stateChangeLog.debug("*DIR* NameNode.reportBadBlocks");
+  /**
+   * Start NameNode.
+   * <p>
+   * The name-node can be started with one of the following startup options:
+   * <ul> 
+   * <li>{@link FSConstants.StartupOption#REGULAR REGULAR} - normal startup</li>
+   * <li>{@link FSConstants.StartupOption#FORMAT FORMAT} - format name node</li>
+   * <li>{@link FSConstants.StartupOption#UPGRADE UPGRADE} - start the cluster  
+   * upgrade and create a snapshot of the current file system state</li> 
+   * <li>{@link FSConstants.StartupOption#ROLLBACK ROLLBACK} - roll the  
+   *            cluster back to the previous state</li>
+   * </ul>
+   * The option is passed via configuration field: 
+   * <tt>dfs.namenode.startup</tt>
+   * 
+   * The conf will be modified to reflect the actual ports on which 
+   * the NameNode is up and running if the user passes the port as
+   * <code>zero</code> in the conf.
+   * 
+   * @param conf  configuration
+   * @throws IOException
+   */
+  public NameNode(Configuration conf) throws IOException {
+    InetSocketAddress addr = 
+      DataNode.createSocketAddr(conf.get("fs.default.name"));
+    init( addr.getHostName(), addr.getPort(), conf );
+  }
+
+  /**
+   * Create a NameNode at the specified location and start it.
+   * 
+   * The conf will be modified to reflect the actual ports on which 
+   * the NameNode is up and running if the user passes the port as
+   * <code>zero</code>.  
+   */
+  public NameNode(String bindAddress, int port, 
+                  Configuration conf
+                  ) throws IOException {
+    init( bindAddress, port, conf );
+  }
+
+  /**
+   * Wait for service to finish.
+   * (Normally, it runs forever.)
+   */
+  public void join() {
+    try {
+      this.server.join();
+    } catch (InterruptedException ie) {
+    }
+  }
+
+  /**
+   * Stop all NameNode threads and wait for all to finish.
+   */
+  public void stop() {
+    if (! stopRequested) {
+      stopRequested = true;
+      namesystem.close();
+      emptier.interrupt();
+      server.stop();
+    }
+  }
+
+  /////////////////////////////////////////////////////
+  // ClientProtocol
+  /////////////////////////////////////////////////////
+    
+  /**
+   */
+  public LocatedBlock[] open(String src) throws IOException {
+    String clientMachine = Server.getRemoteAddress();
+    if ( clientMachine == null ) {
+      clientMachine = "";
+    }
+    Object openResults[] = namesystem.open(clientMachine, new UTF8(src));
+    if (openResults == null) {
+      throw new IOException("Cannot open filename " + src);
+    } else {
+      myMetrics.openFile();
+      Block blocks[] = (Block[]) openResults[0];
+      DatanodeInfo sets[][] = (DatanodeInfo[][]) openResults[1];
+      LocatedBlock results[] = new LocatedBlock[blocks.length];
       for (int i = 0; i < blocks.length; i++) {
-        Block blk = blocks[i].getBlock();
-        DatanodeInfo[] nodes = blocks[i].getLocations();
-        for (int j = 0; j < nodes.length; j++) {
-          DatanodeInfo dn = nodes[j];
-          namesystem.invalidateBlock(blk, dn);
-        }
+        results[i] = new LocatedBlock(blocks[i], sets[i]);
       }
+      return results;
     }
+  }
 
-    /**
-     */
-    public String[][] getHints(String src, long start, long len) throws IOException {
-      return namesystem.getDatanodeHints( src, start, len );
-    }
-    
-    public long getBlockSize(String filename) throws IOException {
-      return namesystem.getBlockSize(filename);
-    }
-    
-    /**
-     */
-    public boolean rename(String src, String dst) throws IOException {
-        stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst );
-        if (!checkPathLength(dst)) {
-            throw new IOException("rename: Pathname too long.  Limit " 
-                + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
-        }
-        boolean ret = namesystem.renameTo(new UTF8(src), new UTF8(dst));
-        if (ret) {
-            myMetrics.renameFile();
-        }
-        return ret;
-    }
-
-    /**
-     */
-    public boolean delete(String src) throws IOException {
-        stateChangeLog.debug("*DIR* NameNode.delete: " + src );
-        return namesystem.delete(new UTF8(src));
-    }
-
-    /**
-     */
-    public boolean exists(String src) throws IOException {
-        return namesystem.exists(new UTF8(src));
-    }
-
-    /**
-     */
-    public boolean isDir(String src) throws IOException {
-        return namesystem.isDir(new UTF8(src));
-    }
-
-    /**
-     * Check path length does not exceed maximum.  Returns true if
-     * length and depth are okay.  Returns false if length is too long 
-     * or depth is too great.
-     * 
-     */
-    private boolean checkPathLength(String src) {
-        Path srcPath = new Path(src);
-        return (src.length() <= MAX_PATH_LENGTH &&
-                srcPath.depth() <= MAX_PATH_DEPTH);
-    }
-    
-    /**
-     */
-    public boolean mkdirs(String src) throws IOException {
-        stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src );
-        if (!checkPathLength(src)) {
-            throw new IOException("mkdirs: Pathname too long.  Limit " 
-                + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
-        }
-        return namesystem.mkdirs( src );
+  /**
+   */
+  public LocatedBlock create(String src, 
+                             String clientName, 
+                             boolean overwrite,
+                             short replication,
+                             long blockSize
+                             ) throws IOException {
+    String clientMachine = Server.getRemoteAddress();
+    if ( clientMachine == null ) {
+      clientMachine = "";
+    }
+    stateChangeLog.debug("*DIR* NameNode.create: file "
+                         +src+" for "+clientName+" at "+clientMachine);
+    if (!checkPathLength(src)) {
+      throw new IOException("create: Pathname too long.  Limit " 
+                            + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    Object results[] = namesystem.startFile(new UTF8(src), 
+                                            new UTF8(clientName), 
+                                            new UTF8(clientMachine), 
+                                            overwrite,
+                                            replication,
+                                            blockSize);
+    myMetrics.createFile();
+    Block b = (Block) results[0];
+    DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
+    return new LocatedBlock(b, targets);
+  }
+
+  public boolean setReplication( String src, 
+                                 short replication
+                                 ) throws IOException {
+    return namesystem.setReplication( src, replication );
+  }
+    
+  /**
+   */
+  public LocatedBlock addBlock(String src, 
+                               String clientName) throws IOException {
+    stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
+                         +src+" for "+clientName);
+    UTF8 src8 = new UTF8(src);
+    UTF8 client8 = new UTF8(clientName);
+    Object[] results = namesystem.getAdditionalBlock(src8, client8);
+    Block b = (Block) results[0];
+    DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
+    return new LocatedBlock(b, targets);            
+  }
+
+  /**
+   * The client needs to give up on the block.
+   */
+  public void abandonBlock(Block b, String src) throws IOException {
+    stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
+                         +b.getBlockName()+" of file "+src );
+    if (! namesystem.abandonBlock(b, new UTF8(src))) {
+      throw new IOException("Cannot abandon block during write to " + src);
+    }
+  }
+  /**
+   */
+  public void abandonFileInProgress(String src, 
+                                    String holder) throws IOException {
+    stateChangeLog.debug("*DIR* NameNode.abandonFileInProgress:" + src );
+    namesystem.abandonFileInProgress(new UTF8(src), new UTF8(holder));
+  }
+  /**
+   */
+  public boolean complete(String src, String clientName) throws IOException {
+    stateChangeLog.debug("*DIR* NameNode.complete: " + src + " for " + clientName );
+    int returnCode = namesystem.completeFile(new UTF8(src), new UTF8(clientName));
+    if (returnCode == STILL_WAITING) {
+      return false;
+    } else if (returnCode == COMPLETE_SUCCESS) {
+      return true;
+    } else {
+      throw new IOException("Could not complete write to file " + src + " by " + clientName);
+    }
+  }
+
+  /**
+   * The client has detected an error on the specified located blocks 
+   * and is reporting them to the server.  For now, the namenode will 
+   * delete the blocks from the datanodes.  In the future we might 
+   * check the blocks are actually corrupt. 
+   */
+  public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+    stateChangeLog.debug("*DIR* NameNode.reportBadBlocks");
+    for (int i = 0; i < blocks.length; i++) {
+      Block blk = blocks[i].getBlock();
+      DatanodeInfo[] nodes = blocks[i].getLocations();
+      for (int j = 0; j < nodes.length; j++) {
+        DatanodeInfo dn = nodes[j];
+        namesystem.invalidateBlock(blk, dn);
+      }
+    }
+  }
+
+  /**
+   */
+  public String[][] getHints(String src, long start, long len) throws IOException {
+    return namesystem.getDatanodeHints( src, start, len );
+  }
+    
+  public long getBlockSize(String filename) throws IOException {
+    return namesystem.getBlockSize(filename);
+  }
+    
+  /**
+   */
+  public boolean rename(String src, String dst) throws IOException {
+    stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst );
+    if (!checkPathLength(dst)) {
+      throw new IOException("rename: Pathname too long.  Limit " 
+                            + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    boolean ret = namesystem.renameTo(new UTF8(src), new UTF8(dst));
+    if (ret) {
+      myMetrics.renameFile();
+    }
+    return ret;
+  }
+
+  /**
+   */
+  public boolean delete(String src) throws IOException {
+    stateChangeLog.debug("*DIR* NameNode.delete: " + src );
+    return namesystem.delete(new UTF8(src));
+  }
+
+  /**
+   */
+  public boolean exists(String src) throws IOException {
+    return namesystem.exists(new UTF8(src));
+  }
+
+  /**
+   */
+  public boolean isDir(String src) throws IOException {
+    return namesystem.isDir(new UTF8(src));
+  }
+
+  /**
+   * Check path length does not exceed maximum.  Returns true if
+   * length and depth are okay.  Returns false if length is too long 
+   * or depth is too great.
+   * 
+   */
+  private boolean checkPathLength(String src) {
+    Path srcPath = new Path(src);
+    return (src.length() <= MAX_PATH_LENGTH &&
+            srcPath.depth() <= MAX_PATH_DEPTH);
+  }
+    
+  /**
+   */
+  public boolean mkdirs(String src) throws IOException {
+    stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src );
+    if (!checkPathLength(src)) {
+      throw new IOException("mkdirs: Pathname too long.  Limit " 
+                            + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
+    return namesystem.mkdirs( src );
+  }
 
-    /** @deprecated */ @Deprecated
+  /** @deprecated */ @Deprecated
     public boolean obtainLock(String src, String clientName, boolean exclusive) throws IOException {
-        int returnCode = namesystem.obtainLock(new UTF8(src), new UTF8(clientName), exclusive);
-        if (returnCode == COMPLETE_SUCCESS) {
-            return true;
-        } else if (returnCode == STILL_WAITING) {
-            return false;
-        } else {
-            throw new IOException("Failure when trying to obtain lock on " + src);
-        }
+    int returnCode = namesystem.obtainLock(new UTF8(src), new UTF8(clientName), exclusive);
+    if (returnCode == COMPLETE_SUCCESS) {
+      return true;
+    } else if (returnCode == STILL_WAITING) {
+      return false;
+    } else {
+      throw new IOException("Failure when trying to obtain lock on " + src);
     }
+  }
 
-    /** @deprecated */ @Deprecated
+  /** @deprecated */ @Deprecated
     public boolean releaseLock(String src, String clientName) throws IOException {
-        int returnCode = namesystem.releaseLock(new UTF8(src), new UTF8(clientName));
-        if (returnCode == COMPLETE_SUCCESS) {
-            return true;
-        } else if (returnCode == STILL_WAITING) {
-            return false;
-        } else {
-            throw new IOException("Failure when trying to release lock on " + src);
-        }
-    }
-
-    /**
-     */
-    public void renewLease(String clientName) throws IOException {
-        namesystem.renewLease(new UTF8(clientName));        
-    }
-
-    /**
-     */
-    public DFSFileInfo[] getListing(String src) throws IOException {
-        DFSFileInfo[] files = namesystem.getListing(new UTF8(src));
-        if (files != null) {
-            myMetrics.listFile(files.length);
-        }
-        return files;
-    }
-
-    /**
-     */
-    public long[] getStats() throws IOException {
-        long results[] = new long[2];
-        long totalCapacity = namesystem.totalCapacity();
-        results[0] = totalCapacity;
-        results[1] = totalCapacity - namesystem.totalRemaining();
-        return results;
-    }
-
-    /**
-     */
-    public DatanodeInfo[] getDatanodeReport() throws IOException {
-        DatanodeInfo results[] = namesystem.datanodeReport();
-        if (results == null || results.length == 0) {
-            throw new IOException("Cannot find datanode report");
-        }
-        return results;
-    }
-    
-    /**
-     * @inheritDoc
-     */
-    public boolean setSafeMode( SafeModeAction action ) throws IOException {
-      switch( action ) {
-      case SAFEMODE_LEAVE: // leave safe mode
-        namesystem.leaveSafeMode();
-        break;
-      case SAFEMODE_ENTER: // enter safe mode
-        namesystem.enterSafeMode();
-        break;
-      case SAFEMODE_GET: // get safe mode
-      }
-      return namesystem.isInSafeMode();
-    }
-
-    /**
-     * Is the cluster currently in safe mode?
-     */
-    boolean isInSafeMode() {
-      return namesystem.isInSafeMode();
-    }
-
-    /*
-     * Refresh the list of datanodes that the namenode should allow to  
-     * connect.  Uses the files list in the configuration to update the list. 
-     */
-    public void refreshNodes() throws IOException {
-      namesystem.refreshNodes();
-    }
-
-    /**
-     * Returns the size of the current edit log.
-     */
-    public long getEditLogSize() throws IOException {
-      return namesystem.getEditLogSize();
-    }
-
-    /**
-     * Roll the edit log.
-     */
-    public void rollEditLog() throws IOException {
-      namesystem.rollEditLog();
-    }
-
-    /**
-     * Roll the image 
-     */
-    public void rollFsImage() throws IOException {
-      namesystem.rollFSImage();
-    }
-    
-    public void finalizeUpgrade() throws IOException {
-      getFSImage().finalizeUpgrade();
-    }
-
-    /**
-     * Dumps namenode state into specified file
-     */
-    public void metaSave(String filename) throws IOException {
-      namesystem.metaSave(filename);
+    int returnCode = namesystem.releaseLock(new UTF8(src), new UTF8(clientName));
+    if (returnCode == COMPLETE_SUCCESS) {
+      return true;
+    } else if (returnCode == STILL_WAITING) {
+      return false;
+    } else {
+      throw new IOException("Failure when trying to release lock on " + src);
     }
+  }
 
-    ////////////////////////////////////////////////////////////////
-    // DatanodeProtocol
-    ////////////////////////////////////////////////////////////////
-    /** 
-     */
-    public DatanodeRegistration register( DatanodeRegistration nodeReg,
-                                          String networkLocation
+  /**
+   */
+  public void renewLease(String clientName) throws IOException {
+    namesystem.renewLease(new UTF8(clientName));        
+  }
+
+  /**
+   */
+  public DFSFileInfo[] getListing(String src) throws IOException {
+    DFSFileInfo[] files = namesystem.getListing(new UTF8(src));
+    if (files != null) {
+      myMetrics.listFile(files.length);
+    }
+    return files;
+  }
+
+  /**
+   */
+  public long[] getStats() throws IOException {
+    long results[] = new long[2];
+    long totalCapacity = namesystem.totalCapacity();
+    results[0] = totalCapacity;
+    results[1] = totalCapacity - namesystem.totalRemaining();
+    return results;
+  }
+
+  /**
+   */
+  public DatanodeInfo[] getDatanodeReport() throws IOException {
+    DatanodeInfo results[] = namesystem.datanodeReport();
+    if (results == null || results.length == 0) {
+      throw new IOException("Cannot find datanode report");
+    }
+    return results;
+  }
+    
+  /**
+   * {@inheritDoc}
+   */
+  public boolean setSafeMode( SafeModeAction action ) throws IOException {
+    switch( action ) {
+    case SAFEMODE_LEAVE: // leave safe mode
+      namesystem.leaveSafeMode();
+      break;
+    case SAFEMODE_ENTER: // enter safe mode
+      namesystem.enterSafeMode();
+      break;
+    case SAFEMODE_GET: // get safe mode
+    }
+    return namesystem.isInSafeMode();
+  }
+
+  /**
+   * Is the cluster currently in safe mode?
+   */
+  boolean isInSafeMode() {
+    return namesystem.isInSafeMode();
+  }
+
+  /**
+   * Refresh the list of datanodes that the namenode should allow to  
+   * connect.  Uses the files list in the configuration to update the list. 
+   */
+  public void refreshNodes() throws IOException {
+    namesystem.refreshNodes();
+  }
+
+  /**
+   * Returns the size of the current edit log.
+   */
+  public long getEditLogSize() throws IOException {
+    return namesystem.getEditLogSize();
+  }
+
+  /**
+   * Roll the edit log.
+   */
+  public void rollEditLog() throws IOException {
+    namesystem.rollEditLog();
+  }
+
+  /**
+   * Roll the image 
+   */
+  public void rollFsImage() throws IOException {
+    namesystem.rollFSImage();
+  }
+    
+  public void finalizeUpgrade() throws IOException {
+    getFSImage().finalizeUpgrade();
+  }
+
+  /**
+   * Dumps namenode state into specified file
+   */
+  public void metaSave(String filename) throws IOException {
+    namesystem.metaSave(filename);
+  }
+
+  ////////////////////////////////////////////////////////////////
+  // DatanodeProtocol
+  ////////////////////////////////////////////////////////////////
+  /** 
+   */
+  public DatanodeRegistration register( DatanodeRegistration nodeReg,
+                                        String networkLocation
                                         ) throws IOException {
-      verifyVersion( nodeReg.getVersion() );
-      namesystem.registerDatanode( nodeReg, networkLocation );
+    verifyVersion( nodeReg.getVersion() );
+    namesystem.registerDatanode( nodeReg, networkLocation );
       
-      return nodeReg;
-    }
-    
-    /**
-     * Data node notify the name node that it is alive 
-     * Return a block-oriented command for the datanode to execute.
-     * This will be either a transfer or a delete operation.
-     */
-    public DatanodeCommand sendHeartbeat( DatanodeRegistration nodeReg,
-                                          long capacity, 
-                                          long remaining,
-                                          int xmitsInProgress,
-                                          int xceiverCount) throws IOException {
-        Object xferResults[] = new Object[2];
-        xferResults[0] = xferResults[1] = null;
-        Object deleteList[] = new Object[1];
-        deleteList[0] = null; 
-
-        verifyRequest( nodeReg );
-        if( namesystem.gotHeartbeat( nodeReg, capacity, remaining, 
-                                     xceiverCount, 
-                                     xmitsInProgress,
-                                     xferResults,
-                                     deleteList)) {
-          // request block report from the datanode
-          assert(xferResults[0] == null && deleteList[0] == null);
-          return new DatanodeCommand( DataNodeAction.DNA_REGISTER );
-        }
-        
-        //
-        // Ask to perform pending transfers, if any
-        //
-        if (xferResults[0] != null) {
-            assert(deleteList[0] == null);
-            return new BlockCommand((Block[]) xferResults[0], (DatanodeInfo[][]) xferResults[1]);
-        }
-
-        //
-        // If there are no transfers, check for recently-deleted blocks that
-        // should be removed.  This is not a full-datanode sweep, as is done during
-        // a block report.  This is just a small fast removal of blocks that have
-        // just been removed.
-        //
-        if (deleteList[0] != null) {
-            return new BlockCommand((Block[]) deleteList[0]);
-        }
-        return null;
-    }
-
-    public DatanodeCommand blockReport( DatanodeRegistration nodeReg,
-                                        Block blocks[]) throws IOException {
-        verifyRequest( nodeReg );
-        stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
-                +"from "+nodeReg.getName()+" "+blocks.length+" blocks" );
-
-        Block blocksToDelete[] = namesystem.processReport( nodeReg, blocks );
-        if( blocksToDelete != null && blocksToDelete.length > 0 )
-            return new BlockCommand( blocksToDelete );
-        if( getFSImage().isUpgradeFinalized() )
-          return new DatanodeCommand( DataNodeAction.DNA_FINALIZE );
-        return null;
-    }
-
-    public void blockReceived(DatanodeRegistration nodeReg, 
-                              Block blocks[]) throws IOException {
-        verifyRequest( nodeReg );
-        stateChangeLog.debug("*BLOCK* NameNode.blockReceived: "
-                +"from "+nodeReg.getName()+" "+blocks.length+" blocks." );
-        for (int i = 0; i < blocks.length; i++) {
-            namesystem.blockReceived( nodeReg, blocks[i] );
-        }
-    }
-
-    /**
-     */
-    public void errorReport(DatanodeRegistration nodeReg,
-                            int errorCode, 
-                            String msg) throws IOException {
-      // Log error message from datanode
-      LOG.info("Report from " + nodeReg.getName() + ": " + msg);
-      if( errorCode == DatanodeProtocol.NOTIFY ) {
-        return;
-      }
-      verifyRequest( nodeReg );
-      if( errorCode == DatanodeProtocol.DISK_ERROR ) {
-          namesystem.removeDatanode( nodeReg );            
-      }
-    }
-    
-    public NamespaceInfo versionRequest() throws IOException {
-      return namesystem.getNamespaceInfo();
-    }
-
-    /** 
-     * Verify request.
-     * 
-     * Verifies correctness of the datanode version, registration ID, and 
-     * if the datanode does not need to be shutdown.
-     * 
-     * @param nodeReg data node registration
-     * @throws IOException
-     */
-    public void verifyRequest( DatanodeRegistration nodeReg ) throws IOException {
-      verifyVersion( nodeReg.getVersion() );
-      if( ! namesystem.getRegistrationID().equals( nodeReg.getRegistrationID() ))
-          throw new UnregisteredDatanodeException( nodeReg );
-    }
-    
-    /**
-     * Verify version.
-     * 
-     * @param version
-     * @throws IOException
-     */
-    public void verifyVersion( int version ) throws IOException {
-      if( version != LAYOUT_VERSION )
-        throw new IncorrectVersionException( version, "data node" );
-    }
-
-    /**
-     * Returns the name of the fsImage file
-     */
-    public File getFsImageName() throws IOException {
-      return getFSImage().getFsImageName();
-    }
+    return nodeReg;
+  }
     
-    FSImage getFSImage() {
-      return namesystem.dir.fsImage;
-    }
-
-    /**
-     * Returns the name of the fsImage file uploaded by periodic
-     * checkpointing
-     */
-    public File[] getFsImageNameCheckpoint() throws IOException {
-      return getFSImage().getFsImageNameCheckpoint();
+  /**
+   * Data node notifies the name node that it is alive.
+   * Returns a block-oriented command for the datanode to execute.
+   * This will be either a transfer or a delete operation.
+   */
+  public DatanodeCommand sendHeartbeat( DatanodeRegistration nodeReg,
+                                        long capacity, 
+                                        long remaining,
+                                        int xmitsInProgress,
+                                        int xceiverCount) throws IOException {
+    Object xferResults[] = new Object[2];
+    xferResults[0] = xferResults[1] = null;
+    Object deleteList[] = new Object[1];
+    deleteList[0] = null; 
+
+    verifyRequest( nodeReg );
+    if( namesystem.gotHeartbeat( nodeReg, capacity, remaining, 
+                                 xceiverCount, 
+                                 xmitsInProgress,
+                                 xferResults,
+                                 deleteList)) {
+      // request block report from the datanode
+      assert(xferResults[0] == null && deleteList[0] == null);
+      return new DatanodeCommand( DataNodeAction.DNA_REGISTER );
     }
-
-    /**
-     * Returns the name of the edits file
-     */
-    public File getFsEditName() throws IOException {
-      return namesystem.getFsEditName();
-    }
-
-    /**
-     * Returns the address on which the NameNodes is listening to.
-     * @return the address on which the NameNodes is listening to.
-     */
-    public InetSocketAddress getNameNodeAddress() {
-      return nameNodeAddress;
-    }
-
-    /**
-     * Verify that configured directories exist, then
-     * Interactively confirm that formatting is desired 
-     * for each existing directory and format them.
-     * 
-     * @param conf
-     * @param isConfirmationNeeded
-     * @return true if formatting was aborted, false otherwise
-     * @throws IOException
-     */
-    private static boolean format(Configuration conf,
-                                  boolean isConfirmationNeeded
+        
+    //
+    // Ask to perform pending transfers, if any
+    //
+    if (xferResults[0] != null) {
+      assert(deleteList[0] == null);
+      return new BlockCommand((Block[]) xferResults[0], (DatanodeInfo[][]) xferResults[1]);
+    }
+
+    //
+    // If there are no transfers, check for recently-deleted blocks that
+    // should be removed.  This is not a full-datanode sweep, as is done during
+    // a block report.  This is just a small fast removal of blocks that have
+    // just been removed.
+    //
+    if (deleteList[0] != null) {
+      return new BlockCommand((Block[]) deleteList[0]);
+    }
+    return null;
+  }
+
+  public DatanodeCommand blockReport( DatanodeRegistration nodeReg,
+                                      Block blocks[]) throws IOException {
+    verifyRequest( nodeReg );
+    stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
+                         +"from "+nodeReg.getName()+" "+blocks.length+" blocks" );
+
+    Block blocksToDelete[] = namesystem.processReport( nodeReg, blocks );
+    if( blocksToDelete != null && blocksToDelete.length > 0 )
+      return new BlockCommand( blocksToDelete );
+    if( getFSImage().isUpgradeFinalized() )
+      return new DatanodeCommand( DataNodeAction.DNA_FINALIZE );
+    return null;
+  }
+
+  public void blockReceived(DatanodeRegistration nodeReg, 
+                            Block blocks[]) throws IOException {
+    verifyRequest( nodeReg );
+    stateChangeLog.debug("*BLOCK* NameNode.blockReceived: "
+                         +"from "+nodeReg.getName()+" "+blocks.length+" blocks." );
+    for (int i = 0; i < blocks.length; i++) {
+      namesystem.blockReceived( nodeReg, blocks[i] );
+    }
+  }
+
+  /**
+   */
+  public void errorReport(DatanodeRegistration nodeReg,
+                          int errorCode, 
+                          String msg) throws IOException {
+    // Log error message from datanode
+    LOG.info("Report from " + nodeReg.getName() + ": " + msg);
+    if( errorCode == DatanodeProtocol.NOTIFY ) {
+      return;
+    }
+    verifyRequest( nodeReg );
+    if( errorCode == DatanodeProtocol.DISK_ERROR ) {
+      namesystem.removeDatanode( nodeReg );            
+    }
+  }
+    
+  public NamespaceInfo versionRequest() throws IOException {
+    return namesystem.getNamespaceInfo();
+  }
+
+  /** 
+   * Verify request.
+   * 
+   * Verifies the correctness of the datanode version and registration ID,
+   * and that the datanode does not need to be shut down.
+   * 
+   * @param nodeReg data node registration
+   * @throws IOException
+   */
+  public void verifyRequest( DatanodeRegistration nodeReg ) throws IOException {
+    verifyVersion( nodeReg.getVersion() );
+    if( ! namesystem.getRegistrationID().equals( nodeReg.getRegistrationID() ))
+      throw new UnregisteredDatanodeException( nodeReg );
+  }
+    
+  /**
+   * Verify version.
+   * 
+   * @param version
+   * @throws IOException
+   */
+  public void verifyVersion( int version ) throws IOException {
+    if( version != LAYOUT_VERSION )
+      throw new IncorrectVersionException( version, "data node" );
+  }
+
+  /**
+   * Returns the name of the fsImage file
+   */
+  public File getFsImageName() throws IOException {
+    return getFSImage().getFsImageName();
+  }
+    
+  FSImage getFSImage() {
+    return namesystem.dir.fsImage;
+  }
+
+  /**
+   * Returns the name of the fsImage file uploaded by periodic
+   * checkpointing
+   */
+  public File[] getFsImageNameCheckpoint() throws IOException {
+    return getFSImage().getFsImageNameCheckpoint();
+  }
+
+  /**
+   * Returns the name of the edits file
+   */
+  public File getFsEditName() throws IOException {
+    return namesystem.getFsEditName();
+  }
+
+  /**
+   * Returns the address on which the NameNode is listening.
+   * @return the address on which the NameNode is listening.
+   */
+  public InetSocketAddress getNameNodeAddress() {
+    return nameNodeAddress;
+  }
+
+  /**
+   * Verify that configured directories exist, then
+   * interactively confirm that formatting is desired 
+   * for each existing directory and format them.
+   * 
+   * @param conf
+   * @param isConfirmationNeeded
+   * @return true if formatting was aborted, false otherwise
+   * @throws IOException
+   */
+  private static boolean format(Configuration conf,
+                                boolean isConfirmationNeeded
                                 ) throws IOException {
-      Collection<File> dirsToFormat = FSNamesystem.getNamespaceDirs( conf );
-      for( Iterator<File> it = dirsToFormat.iterator(); it.hasNext(); ) {
-        File curDir = it.next();
-        if( ! curDir.exists() )
-          continue;
-        if( isConfirmationNeeded ) {
-          System.err.print("Re-format filesystem in " + curDir +" ? (Y or N) ");
-          if (!(System.in.read() == 'Y')) {
-            System.err.println("Format aborted in "+ curDir);
-            return true;
-          }
-          while( System.in.read() != '\n' ); // discard the enter-key
-        }
-      }
-
-      FSNamesystem nsys = new FSNamesystem(new FSImage( dirsToFormat ));
-      nsys.dir.fsImage.format();
-      return false;
-    }
-
-    private static void printUsage() {
-      System.err.println(
-      "Usage: java NameNode [-format] | [-upgrade] | [-rollback]");
-    }
-
-    private static StartupOption parseArguments(String args[], 
-                                                Configuration conf ) {
-      int argsLen = (args == null) ? 0 : args.length;
-      StartupOption startOpt = StartupOption.REGULAR;
-      for( int i=0; i < argsLen; i++ ) {
-        String cmd = args[i];
-        if( "-format".equalsIgnoreCase(cmd) ) {
-          startOpt = StartupOption.FORMAT;
-        } else if( "-regular".equalsIgnoreCase(cmd) ) {
-          startOpt = StartupOption.REGULAR;
-        } else if( "-upgrade".equalsIgnoreCase(cmd) ) {
-          startOpt = StartupOption.UPGRADE;
-        } else if( "-rollback".equalsIgnoreCase(cmd) ) {
-          startOpt = StartupOption.ROLLBACK;
-        } else
-          return null;
+    Collection<File> dirsToFormat = FSNamesystem.getNamespaceDirs( conf );
+    for( Iterator<File> it = dirsToFormat.iterator(); it.hasNext(); ) {
+      File curDir = it.next();
+      if( ! curDir.exists() )
+        continue;
+      if( isConfirmationNeeded ) {
+        System.err.print("Re-format filesystem in " + curDir +" ? (Y or N) ");
+        if (!(System.in.read() == 'Y')) {
+          System.err.println("Format aborted in "+ curDir);
+          return true;
+        }
+        while( System.in.read() != '\n' ); // discard the enter-key
       }
-      conf.setObject( "dfs.namenode.startup", startOpt );
-      return startOpt;
     }
 
-    static NameNode createNameNode( String argv[], 
-                                    Configuration conf ) throws IOException {
-      if( conf == null )
-        conf = new Configuration();
-      StartupOption startOpt = parseArguments( argv, conf );
-      if( startOpt == null ) {
-        printUsage();
+    FSNamesystem nsys = new FSNamesystem(new FSImage( dirsToFormat ));
+    nsys.dir.fsImage.format();
+    return false;
+  }
+
+  private static void printUsage() {
+    System.err.println(
+                       "Usage: java NameNode [-format] | [-upgrade] | [-rollback]");
+  }
+
+  private static StartupOption parseArguments(String args[], 
+                                              Configuration conf ) {
+    int argsLen = (args == null) ? 0 : args.length;
+    StartupOption startOpt = StartupOption.REGULAR;
+    for( int i=0; i < argsLen; i++ ) {
+      String cmd = args[i];
+      if( "-format".equalsIgnoreCase(cmd) ) {
+        startOpt = StartupOption.FORMAT;
+      } else if( "-regular".equalsIgnoreCase(cmd) ) {
+        startOpt = StartupOption.REGULAR;
+      } else if( "-upgrade".equalsIgnoreCase(cmd) ) {
+        startOpt = StartupOption.UPGRADE;
+      } else if( "-rollback".equalsIgnoreCase(cmd) ) {
+        startOpt = StartupOption.ROLLBACK;
+      } else
         return null;
-      }
-      
-      if( startOpt == StartupOption.FORMAT ) {
-        boolean aborted = format( conf, true );
-        System.exit(aborted ? 1 : 0);
-      }
+    }
+    conf.setObject( "dfs.namenode.startup", startOpt );
+    return startOpt;
+  }
+
+  static NameNode createNameNode( String argv[], 
+                                  Configuration conf ) throws IOException {
+    if( conf == null )
+      conf = new Configuration();
+    StartupOption startOpt = parseArguments( argv, conf );
+    if( startOpt == null ) {
+      printUsage();
+      return null;
+    }
       
-      NameNode namenode = new NameNode(conf);
-      return namenode;
+    if( startOpt == StartupOption.FORMAT ) {
+      boolean aborted = format( conf, true );
+      System.exit(aborted ? 1 : 0);
     }
-    
-    /**
-     */
-    public static void main(String argv[]) throws Exception {
-      try {
-        NameNode namenode = createNameNode( argv, null );
-        if( namenode != null )
-          namenode.join();
-      } catch ( Throwable e ) {
-        LOG.error( StringUtils.stringifyException( e ) );
-        System.exit(-1);
-      }
+      
+    NameNode namenode = new NameNode(conf);
+    return namenode;
+  }
+    
+  /**
+   */
+  public static void main(String argv[]) throws Exception {
+    try {
+      NameNode namenode = createNameNode( argv, null );
+      if( namenode != null )
+        namenode.join();
+    } catch ( Throwable e ) {
+      LOG.error( StringUtils.stringifyException( e ) );
+      System.exit(-1);
     }
+  }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java Mon Apr 16 14:44:35 2007
@@ -129,7 +129,7 @@
         return null;
       }
       Block[] blockList = timedOutItems.toArray(
-                            new Block[timedOutItems.size()]);
+                                                new Block[timedOutItems.size()]);
       timedOutItems.clear();
       return blockList;
     }



Mime
View raw message