hadoop-common-commits mailing list archives

From s..@apache.org
Subject svn commit: r753481 [2/3] - in /hadoop/core/trunk: ./ src/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/common/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ s...
Date Sat, 14 Mar 2009 01:20:37 GMT
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Sat Mar 14 01:20:36 2009
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.BufferedInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
@@ -37,9 +36,14 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.permission.*;
@@ -49,7 +53,7 @@
  * 
  */
 public class FSEditLog {
-  private static final byte OP_INVALID = -1;
+  public  static final byte OP_INVALID = -1;
   private static final byte OP_ADD = 0;
   private static final byte OP_RENAME = 1;  // rename
   private static final byte OP_DELETE = 2;  // delete
@@ -68,6 +72,18 @@
   private static final byte OP_CLEAR_NS_QUOTA = 12; // clear namespace quota
   private static final byte OP_TIMES = 13; // sets mod & access time on a file
   private static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
+  /* 
+   * The following operations are used to control remote edit log streams,
+   * and are not logged into file streams.
+   */
+  static final byte OP_JSPOOL_START = // start journal spool
+                                    NamenodeProtocol.JA_JSPOOL_START;
+  static final byte OP_CHECKPOINT_TIME = // incr checkpoint time
+                                    NamenodeProtocol.JA_CHECKPOINT_TIME;
+
+  static final String NO_JOURNAL_STREAMS_WARNING = "!!! WARNING !!!" +
+      " File system changes are not persistent. No journal streams.";
+
   private static int sizeFlushBuffer = 512*1024;
 
   private ArrayList<EditLogOutputStream> editStreams = null;
@@ -111,6 +127,8 @@
    * which stores edits in a local file.
    */
   static private class EditLogFileOutputStream extends EditLogOutputStream {
+    private static int EDITS_FILE_HEADER_SIZE_BYTES = Integer.SIZE/Byte.SIZE;
+
     private File file;
     private FileOutputStream fp;    // file stream for storing edit logs 
     private FileChannel fc;         // channel of the file stream for sync
@@ -129,11 +147,16 @@
       fc.position(fc.size());
     }
 
-    @Override
-    String getName() {
+    @Override // JournalStream
+    public String getName() {
       return file.getPath();
     }
 
+    @Override // JournalStream
+    public JournalType getType() {
+      return JournalType.FILE;
+    }
+
     /** {@inheritDoc} */
     @Override
     public void write(int b) throws IOException {
@@ -213,8 +236,9 @@
      */
     @Override
     long length() throws IOException {
-      // file size + size of both buffers
-      return fc.size() + bufReady.size() + bufCurrent.size();
+      // file size - header size + size of both buffers
+      return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES
+                       + bufReady.size() + bufCurrent.size();
     }
 
     // allocate a big chunk of data
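A note on the length() change above: EDITS_FILE_HEADER_SIZE_BYTES is Integer.SIZE/Byte.SIZE = 4, the size of the layout-version header that create() writes at the front of every edits file, so subtracting it makes an empty log report length 0. A minimal sketch of the arithmetic, with made-up sizes:

    // Hypothetical state: edits file holding the 4-byte header plus
    // 128 flushed bytes, 96 bytes ready to flush, 32 bytes in flight.
    long fcSize = 4 + 128;        // fc.size(): header + flushed edits
    long bufReady = 96, bufCurrent = 32;
    long length = fcSize - 4 + bufReady + bufCurrent;  // 256 bytes of edits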
@@ -231,7 +255,16 @@
                               " at offset " +  newsize);
       }
     }
-    
+
+    /**
+     * Operations like OP_JSPOOL_START and OP_CHECKPOINT_TIME
+     * should not be written into edits file.
+     */
+    @Override
+    boolean isOperationSupported(byte op) {
+      return op < OP_JSPOOL_START - 1;
+    }
+
     /**
      * Returns the file associated with this stream
      */
@@ -240,6 +273,10 @@
     }
   }
 
+  /**
+   * An implementation of the abstract class {@link EditLogInputStream},
+   * which reads edits from a local file.
+   */
   static class EditLogFileInputStream extends EditLogInputStream {
     private File file;
     private FileInputStream fStream;
@@ -249,11 +286,16 @@
       fStream = new FileInputStream(name);
     }
 
-    @Override
-    String getName() {
+    @Override // JournalStream
+    public String getName() {
       return file.getPath();
     }
 
+    @Override // JournalStream
+    public JournalType getType() {
+      return JournalType.FILE;
+    }
+
     @Override
     public int available() throws IOException {
       return fStream.available();
@@ -296,14 +338,10 @@
     return fsimage.getEditNewFile(sd);
   }
   
-  private int getNumStorageDirs() {
- int numStorageDirs = 0;
- for (Iterator<StorageDirectory> it = 
-       fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext(); it.next())
-   numStorageDirs++;
-    return numStorageDirs;
+  private int getNumEditsDirs() {
+   return fsimage.getNumStorageDirs(NameNodeDirType.EDITS);
   }
-  
+
   synchronized int getNumEditStreams() {
     return editStreams == null ? 0 : editStreams.size();
   }
@@ -366,29 +404,43 @@
       } catch (InterruptedException ie) { 
       }
     }
-    if (editStreams == null) {
+    if (editStreams == null || editStreams.isEmpty()) {
       return;
     }
     printStatistics(true);
     numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
 
-    for (int idx = 0; idx < editStreams.size(); idx++) {
-      EditLogOutputStream eStream = editStreams.get(idx);
+    ArrayList<EditLogOutputStream> errorStreams = null;
+    Iterator<EditLogOutputStream> it = getOutputStreamIterator(null);
+    while(it.hasNext()) {
+      EditLogOutputStream eStream = it.next();
       try {
-        eStream.setReadyToFlush();
-        eStream.flush();
-        eStream.close();
+        closeStream(eStream);
       } catch (IOException e) {
         FSNamesystem.LOG.warn("FSEditLog:close - failed to close stream " 
             + eStream.getName());
-        processIOError(idx);
-        idx--;
+        if(errorStreams == null)
+          errorStreams = new ArrayList<EditLogOutputStream>(1);
+        errorStreams.add(eStream);
       }
     }
+    processIOError(errorStreams);
     editStreams.clear();
   }
 
   /**
+   * Close and remove edit log stream.
+   * @param index index of the stream to close and remove
+   */
+  synchronized private void removeStream(int index) {
+    EditLogOutputStream eStream = editStreams.get(index);
+    try {
+      eStream.close();
+    } catch (Exception e) {}
+    editStreams.remove(index);
+  }
+
+  /**
    * If there is an IO Error on any log operations, remove that
    * directory from the list of directories.
    * If no more directories remain, then exit.
@@ -399,19 +451,16 @@
       "Fatal Error : All storage directories are inaccessible."); 
       Runtime.getRuntime().exit(-1);
     }
-    assert(index < getNumStorageDirs());
-    assert(getNumStorageDirs() == editStreams.size());
-    
-    EditLogFileOutputStream eStream = (EditLogFileOutputStream)editStreams.get(index);
-    File parentStorageDir = ((EditLogFileOutputStream)editStreams
-                                      .get(index)).getFile()
-                                      .getParentFile().getParentFile();
-    
-    try {
-      eStream.close();
-    } catch (Exception e) {}
-    
-    editStreams.remove(index);
+    assert(index < getNumEditStreams());
+
+    EditLogOutputStream eStream = editStreams.get(index);
+    removeStream(index);
+
+    if(!(eStream instanceof EditLogFileOutputStream))
+      return; // non file streams don't have associated storage directories
+
+    EditLogFileOutputStream eFStream = (EditLogFileOutputStream)eStream;
+    File parentStorageDir = eFStream.getFile().getParentFile().getParentFile();
     //
     // Invoke the ioerror routine of the fsimage
     //
@@ -436,15 +485,15 @@
                                        .get(idx)).getFile()
                                        .getParentFile().getParentFile();
       if (parentStorageDir.getName().equals(sd.getRoot().getName()))
-        editStreams.remove(idx);
- }
+        removeStream(idx);
+    }
   }
   
   /**
    * The specified streams have IO errors. Remove them from logging
    * new transactions.
    */
-  private void processIOError(ArrayList<EditLogOutputStream> errorStreams) {
+  synchronized void processIOError(ArrayList<EditLogOutputStream> errorStreams) {
     if (errorStreams == null) {
       return;                       // nothing to do
     }
@@ -454,31 +503,25 @@
       int numEditStreams = editStreams.size();
       for (j = 0; j < numEditStreams; j++) {
         if (editStreams.get(j) == eStream) {
+          FSNamesystem.LOG.error("Unable to log edits to " + eStream.getName());
           break;
         }
       }
       if (j == numEditStreams) {
           FSNamesystem.LOG.error("Unable to find sync log on which " +
-                                 " IO error occured. " +
-                                 "Fatal Error.");
-          Runtime.getRuntime().exit(-1);
+                                 " IO error occurred.");
+          continue;
       }
       processIOError(j);
     }
-    fsimage.incrementCheckpointTime();
+    incrementCheckpointTime();
   }
 
   /**
-   * check if ANY edits.new log exists
+   * check if edits.new log exists in the specified storage directory
    */
-  boolean existsNew() {
-    for (Iterator<StorageDirectory> it = 
-           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-      if (getEditNewFile(it.next()).exists()) { 
-        return true;
-      }
-    }
-    return false;
+  boolean existsNew(StorageDirectory sd) {
+    return getEditNewFile(sd).exists(); 
   }
 
   /**
@@ -487,20 +530,20 @@
    * along.
    */
   static int loadFSEdits(EditLogInputStream edits) throws IOException {
-    FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
-    FSDirectory fsDir = fsNamesys.dir;
+    DataInputStream in = edits.getDataInputStream();
+    long startTime = FSNamesystem.now();
+    int numEdits = loadFSEdits(in, true);
+    FSImage.LOG.info("Edits file " + edits.getName() 
+        + " of size " + edits.length() + " edits # " + numEdits 
+        + " loaded in " + (FSNamesystem.now()-startTime)/1000 + " seconds.");
+    return numEdits;
+  }
+
+  static int loadFSEdits(DataInputStream in,
+                         boolean closeOnExit) throws IOException {
     int numEdits = 0;
     int logVersion = 0;
-    String clientName = null;
-    String clientMachine = null;
-    String path = null;
-    int numOpAdd = 0, numOpClose = 0, numOpDelete = 0,
-        numOpRename = 0, numOpSetRepl = 0, numOpMkDir = 0,
-        numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
-        numOpTimes = 0, numOpOther = 0;
-    long startTime = FSNamesystem.now();
 
-    DataInputStream in = new DataInputStream(new BufferedInputStream(edits));
     try {
       // Read log file version. Could be missing. 
       in.mark(4);
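The refactoring above splits edit-log loading in two: loadFSEdits(EditLogInputStream) is now a thin wrapper that times the load and reports it, while loadFSEdits(DataInputStream, boolean) and loadEditRecords(...) below do the version check and record replay, so a backup node can replay records from a stream that is not file-backed. A hedged usage sketch from within the namenode package (the edits path is hypothetical):

    // Replay all edits from a local file; the wrapper logs the elapsed
    // time and closes the stream (it passes closeOnExit = true).
    File edits = new File("/tmp/name/current/edits");   // hypothetical path
    int numEdits = FSEditLog.loadFSEdits(
        new FSEditLog.EditLogFileInputStream(edits));   // throws IOException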
@@ -523,7 +566,29 @@
       }
       assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
                             "Unsupported version " + logVersion;
+      numEdits = loadEditRecords(logVersion, in, false);
+    } finally {
+      if(closeOnExit)
+        in.close();
+    }
+    if (logVersion != FSConstants.LAYOUT_VERSION) // other version
+      numEdits++; // save this image asap
+    return numEdits;
+  }
 
+  static int loadEditRecords(int logVersion, DataInputStream in,
+                             boolean closeOnExit) throws IOException {
+    FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
+    FSDirectory fsDir = fsNamesys.dir;
+    int numEdits = 0;
+    String clientName = null;
+    String clientMachine = null;
+    String path = null;
+    int numOpAdd = 0, numOpClose = 0, numOpDelete = 0,
+        numOpRename = 0, numOpSetRepl = 0, numOpMkDir = 0,
+        numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
+        numOpTimes = 0, numOpOther = 0;
+    try {
       while (true) {
         long timestamp = 0;
         long mtime = 0;
@@ -531,10 +596,10 @@
         long blockSize = 0;
         byte opcode = -1;
         try {
+          in.mark(1);
           opcode = in.readByte();
           if (opcode == OP_INVALID) {
-            FSNamesystem.LOG.info("Invalid opcode, reached end of edit log " +
-                                   "Number of transactions found " + numEdits);
+            in.reset(); // reset back to end of file if somebody reads it again
             break; // no more transactions
           }
         } catch (EOFException e) {
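The new mark(1)/reset() pair relies on standard java.io mark/reset semantics: mark records the position before the opcode byte is read, and reset rewinds to it, so a later reader of the same stream sees OP_INVALID again instead of continuing past it. A small self-contained illustration (ByteArrayInputStream stands in for the buffered edits stream):

    byte[] log = { 13, -1 };                     // one opcode, then OP_INVALID
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(log));
    in.readByte();                               // consume opcode 13
    in.mark(1);                                  // remember position
    byte opcode = in.readByte();                 // -1 == OP_INVALID, stop here
    in.reset();                                  // rewind one byte
    assert in.readByte() == -1;                  // next reader sees it too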
@@ -791,12 +856,9 @@
         }
       }
     } finally {
-      in.close();
+      if(closeOnExit)
+        in.close();
     }
-    FSImage.LOG.info("Edits file " + edits.getName() 
-        + " of size " + edits.length() + " edits # " + numEdits 
-        + " loaded in " + (FSNamesystem.now()-startTime)/1000 + " seconds.");
-
     if (FSImage.LOG.isDebugEnabled()) {
       FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose 
           + " numOpDelete = " + numOpDelete + " numOpRename = " + numOpRename 
@@ -807,9 +869,6 @@
           + " numOpTimes = " + numOpTimes
           + " numOpOther = " + numOpOther);
     }
-
-    if (logVersion != FSConstants.LAYOUT_VERSION) // other version
-      numEdits++; // save this image asap
     return numEdits;
   }
 
@@ -842,20 +901,27 @@
    * store yet.
    */
   synchronized void logEdit(byte op, Writable ... writables) {
-    assert this.getNumEditStreams() > 0 : "no editlog streams";
+    if(getNumEditStreams() == 0)
+      throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
+    ArrayList<EditLogOutputStream> errorStreams = null;
     long start = FSNamesystem.now();
-    for (int idx = 0; idx < editStreams.size(); idx++) {
-      EditLogOutputStream eStream = editStreams.get(idx);
+    for(EditLogOutputStream eStream : editStreams) {
+      if(!eStream.isOperationSupported(op))
+        continue;
       try {
         eStream.write(op, writables);
       } catch (IOException ie) {
         FSImage.LOG.warn("logEdit: removing "+ eStream.getName(), ie);
-        processIOError(idx);         
-        // processIOError will remove the idx's stream 
-        // from the editStreams collection, so we need to update idx
-        idx--; 
+        if(errorStreams == null)
+          errorStreams = new ArrayList<EditLogOutputStream>(1);
+        errorStreams.add(eStream);
       }
     }
+    processIOError(errorStreams);
+    recordTransaction(start);
+  }
+
+  private void recordTransaction(long start) {
     // get a new transactionId
     txid++;
 
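The rewritten logEdit shows the error-handling pattern this patch applies throughout the class: failed streams are collected into errorStreams during iteration and removed afterwards in a single processIOError(errorStreams) call, instead of removing them mid-loop and patching the index. Callers are unaffected; a typical journal-then-sync sequence still looks like the following sketch (opcode and argument are illustrative):

    // Buffer the record in every stream that supports the opcode,
    // then force the buffered edits out to persistent storage.
    long genStamp = 1001L;                       // illustrative value
    editLog.logEdit(OP_SET_GENSTAMP, new LongWritable(genStamp));
    editLog.logSync();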
@@ -883,10 +949,8 @@
     // Fetch the transactionId of this thread. 
     long mytxid = myTransactionId.get().txid;
 
-    final int numEditStreams;
     synchronized (this) {
-      numEditStreams = editStreams.size();
-      assert numEditStreams > 0 : "no editlog streams";
+      assert editStreams.size() > 0 : "no editlog streams";
       printStatistics(false);
 
       // if somebody is already syncing, then wait
@@ -912,14 +976,14 @@
       isSyncRunning = true;   
 
       // swap buffers
-      for (int idx = 0; idx < numEditStreams; idx++) {
-        editStreams.get(idx).setReadyToFlush();
+      for(EditLogOutputStream eStream : editStreams) {
+        eStream.setReadyToFlush();
       }
     }
 
     // do the sync
     long start = FSNamesystem.now();
-    for (int idx = 0; idx < numEditStreams; idx++) {
+    for (int idx = 0; idx < editStreams.size(); idx++) {
       EditLogOutputStream eStream = editStreams.get(idx);
       try {
         eStream.flush();
@@ -1112,14 +1176,16 @@
    * Return the size of the current EditLog
    */
   synchronized long getEditLogSize() throws IOException {
-    assert(getNumStorageDirs() == editStreams.size());
+    assert getNumEditsDirs() <= getNumEditStreams() : 
+        "Number of edits directories should not exceed the number of streams.";
     long size = 0;
-    for (int idx = 0; idx < editStreams.size(); idx++) {
+    for (int idx = 0; idx < getNumEditStreams(); idx++) {
       EditLogOutputStream es = editStreams.get(idx);
       try {
         long curSize = es.length();
-        assert (size == 0 || size == curSize) : "All streams must be the same";
-        size = curSize;
+        assert (size == 0 || size == curSize || curSize == 0) :
+          "All streams should have the same size, or be empty";
+        size = Math.max(size, curSize);
       } catch (IOException e) {
         FSImage.LOG.warn("getEditLogSize: editstream.length failed. removing editlog (" +
             idx + ") " + es.getName());
@@ -1131,93 +1197,127 @@
 
   /**
    * Closes the current edit log and opens edits.new. 
-   * Returns the lastModified time of the edits log.
    */
   synchronized void rollEditLog() throws IOException {
+    Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);
+    if(!it.hasNext()) 
+      return;
     //
     // If edits.new already exists in some directory, verify it
     // exists in all directories.
     //
-    if (existsNew()) {
-      for (Iterator<StorageDirectory> it = 
-               fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-        File editsNew = getEditNewFile(it.next());
-     if (!editsNew.exists()) { 
-          throw new IOException("Inconsistent existance of edits.new " +
-                                editsNew);
-        }
-      }
-      return; // nothing to do, edits.new exists!
+    boolean alreadyExists = existsNew(it.next());
+    while(it.hasNext()) {
+      StorageDirectory sd = it.next();
+      if(alreadyExists != existsNew(sd))
+        throw new IOException(getEditNewFile(sd) 
+              + "should " + (alreadyExists ? "" : "not ") + "exist.");
     }
-
-    close();                     // close existing edit log
+    if(alreadyExists)
+      return; // nothing to do, edits.new exists!
 
     // check if any of failed storage is now available and put it back
     fsimage.attemptRestoreRemovedStorage();
-    
-    //
-    // Open edits.new
-    //
-    boolean failedSd = false;
-    for (Iterator<StorageDirectory> it = 
-           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-      StorageDirectory sd = it.next();
+
+    divertFileStreams(
+        Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName());
+  }
+
+  /**
+   * Divert file streams from file edits to file edits.new.<p>
+   * Close file streams, which are currently writing into edits files.
+   * Create new streams based on file getRoot()/dest.
+   * @param dest new stream path relative to the storage directory root.
+   * @throws IOException
+   */
+  void divertFileStreams(String dest) throws IOException {
+    assert getNumEditStreams() >= getNumEditsDirs() :
+      "Inconsistent number of streams";
+    ArrayList<EditLogOutputStream> errorStreams = null;
+    EditStreamIterator itE = 
+      (EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
+    Iterator<StorageDirectory> itD = 
+      fsimage.dirIterator(NameNodeDirType.EDITS);
+    while(itE.hasNext() && itD.hasNext()) {
+      EditLogOutputStream eStream = itE.next();
+      StorageDirectory sd = itD.next();
+      if(!eStream.getName().startsWith(sd.getRoot().getPath()))
+        throw new IOException("Inconsistent order of edit streams: " + eStream);
       try {
-        EditLogFileOutputStream eStream = 
-             new EditLogFileOutputStream(getEditNewFile(sd));
+        // close old stream
+        closeStream(eStream);
+        // create new stream
+        eStream = new EditLogFileOutputStream(new File(sd.getRoot(), dest));
         eStream.create();
-        editStreams.add(eStream);
+        // replace by the new stream
+        itE.replace(eStream);
       } catch (IOException e) {
-        failedSd = true;
-        // remove stream and this storage directory from list
-        FSImage.LOG.warn("rollEdidLog: removing storage " + sd.getRoot().getPath());
-        sd.unlock();
-        fsimage.removedStorageDirs.add(sd);
-        it.remove();
+        if(errorStreams == null)
+          errorStreams = new ArrayList<EditLogOutputStream>(1);
+        errorStreams.add(eStream);
       }
     }
-    if(failedSd)
-      fsimage.incrementCheckpointTime();  // update time for the valid ones
+    processIOError(errorStreams);
   }
 
   /**
-   * Removes the old edit log and renamed edits.new as edits.
+   * Removes the old edit log and renames edits.new to edits.
    * Reopens the edits file.
    */
   synchronized void purgeEditLog() throws IOException {
-    //
-    // If edits.new does not exists, then return error.
-    //
-    if (!existsNew()) {
-      throw new IOException("Attempt to purge edit log " +
-                            "but edits.new does not exist.");
-    }
-    close();
+    revertFileStreams(
+        Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName());
+  }
 
-    //
-    // Delete edits and rename edits.new to edits.
-    //
-    for (Iterator<StorageDirectory> it = 
-           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      if (!getEditNewFile(sd).renameTo(getEditFile(sd))) {
-        //
-        // renameTo() fails on Windows if the destination
-        // file exists.
-        //
-        getEditFile(sd).delete();
-        if (!getEditNewFile(sd).renameTo(getEditFile(sd))) {
-          // Should we also remove from edits
-          NameNode.LOG.warn("purgeEditLog: removing failed storage " + sd.getRoot().getPath());
-          fsimage.removedStorageDirs.add(sd);
-          it.remove(); 
+  /**
+   * Revert file streams from file edits.new back to file edits.<p>
+   * Close file streams, which are currently writing into getRoot()/source.
+   * Rename getRoot()/source to edits.
+   * Reopen streams so that they start writing into edits files.
+   * @param source source (edits.new) stream path relative to the storage directory root.
+   * @throws IOException
+   */
+  synchronized void revertFileStreams(String source) throws IOException {
+    assert getNumEditStreams() >= getNumEditsDirs() :
+      "Inconsistent number of streams";
+    ArrayList<EditLogOutputStream> errorStreams = null;
+    EditStreamIterator itE = 
+      (EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
+    Iterator<StorageDirectory> itD = 
+      fsimage.dirIterator(NameNodeDirType.EDITS);
+    while(itE.hasNext() && itD.hasNext()) {
+      EditLogOutputStream eStream = itE.next();
+      StorageDirectory sd = itD.next();
+      if(!eStream.getName().startsWith(sd.getRoot().getPath()))
+        throw new IOException("Inconsistent order of edit streams: " + eStream);
+      try {
+        // close old stream
+        closeStream(eStream);
+        // rename edits.new to edits
+        File editFile = getEditFile(sd);
+        File prevEditFile = new File(sd.getRoot(), source);
+        if(prevEditFile.exists()) {
+          if(!prevEditFile.renameTo(editFile)) {
+            //
+            // renameTo() fails on Windows if the destination
+            // file exists.
+            //
+            if(!editFile.delete() || !prevEditFile.renameTo(editFile)) {
+              throw new IOException("Rename failed for " + sd.getRoot());
+            }
+          }
         }
+        // open new stream
+        eStream = new EditLogFileOutputStream(editFile);
+        // replace by the new stream
+        itE.replace(eStream);
+      } catch (IOException e) {
+        if(errorStreams == null)
+          errorStreams = new ArrayList<EditLogOutputStream>(1);
+        errorStreams.add(eStream);
       }
     }
-    //
-    // Reopen all the edits logs.
-    //
-    open();
+    processIOError(errorStreams);
   }
 
   /**
@@ -1309,4 +1409,178 @@
     }
     return blocks;
   }
+
+  boolean isEmpty() throws IOException {
+    return getEditLogSize() <= 0;
+  }
+
+  /**
+   * Create (or find, if it already exists) an edit output stream that
+   * streams journal records (edits) to the specified backup node.<br>
+   * Send a record prescribing the start of the journal spool.<br>
+   * This record is sent via the regular stream of journal records so that
+   * the backup node knows exactly after which record it should start spooling.
+   * 
+   * @param bnReg the backup node registration information.
+   * @param nnReg this (active) name-node registration.
+   * @throws IOException
+   */
+  void logJSpoolStart(NamenodeRegistration bnReg, // backup node
+                      NamenodeRegistration nnReg) // active name-node
+  throws IOException {
+    if(bnReg.isRole(NamenodeRole.CHECKPOINT))
+      return; // checkpoint node does not stream edits
+    if(editStreams == null)
+      editStreams = new ArrayList<EditLogOutputStream>();
+    EditLogOutputStream boStream = null;
+    for(EditLogOutputStream eStream : editStreams) {
+      if(eStream.getName().equals(bnReg.getAddress())) {
+        boStream = eStream; // already there
+        break;
+      }
+    }
+    if(boStream == null) {
+      boStream = new EditLogBackupOutputStream(bnReg, nnReg);
+      editStreams.add(boStream);
+    }
+    logEdit(OP_JSPOOL_START, (Writable[])null);
+  }
+
+  /**
+   * Write an operation to the edit log. Do not sync to persistent
+   * store yet.
+   */
+  synchronized void logEdit(int length, byte[] data) {
+    if(getNumEditStreams() == 0)
+      throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
+    ArrayList<EditLogOutputStream> errorStreams = null;
+    long start = FSNamesystem.now();
+    for(EditLogOutputStream eStream : editStreams) {
+      try {
+        eStream.write(data, 0, length);
+      } catch (IOException ie) {
+        if(errorStreams == null)
+          errorStreams = new ArrayList<EditLogOutputStream>(1);
+        errorStreams.add(eStream);
+      }
+    }
+    processIOError(errorStreams);
+    recordTransaction(start);
+  }
+
+  /**
+   * Iterates over output streams of the specified type.
+   * Type null will iterate over all streams.
+   */
+  private class EditStreamIterator implements Iterator<EditLogOutputStream> {
+    JournalType type;
+    int prevIndex; // for remove()
+    int nextIndex; // for next()
+
+    EditStreamIterator(JournalType streamType) {
+      this.type = streamType;
+      this.nextIndex = 0;
+      this.prevIndex = 0;
+    }
+
+    public boolean hasNext() {
+      if(editStreams == null || 
+         editStreams.isEmpty() || nextIndex >= editStreams.size())
+        return false;
+      while(nextIndex < editStreams.size()
+            && !editStreams.get(nextIndex).getType().isOfType(type))
+        nextIndex++;
+      return nextIndex < editStreams.size();
+    }
+
+    public EditLogOutputStream next() {
+      EditLogOutputStream stream = editStreams.get(nextIndex);
+      prevIndex = nextIndex;
+      nextIndex++;
+      while(nextIndex < editStreams.size()
+            && !editStreams.get(nextIndex).getType().isOfType(type))
+        nextIndex++;
+      return stream;
+    }
+
+    public void remove() {
+      nextIndex = prevIndex; // restore previous state
+      removeStream(prevIndex); // remove last returned element
+      hasNext(); // reset nextIndex to correct place
+    }
+
+    void replace(EditLogOutputStream newStream) {
+      assert 0 <= prevIndex && prevIndex < editStreams.size() :
+                                                        "Index out of bounds.";
+      editStreams.set(prevIndex, newStream);
+    }
+  }
+
+  /**
+   * Get stream iterator for the specified type.
+   */
+  public Iterator<EditLogOutputStream>
+  getOutputStreamIterator(JournalType streamType) {
+    return new EditStreamIterator(streamType);
+  }
+
+  private void closeStream(EditLogOutputStream eStream) throws IOException {
+    eStream.setReadyToFlush();
+    eStream.flush();
+    eStream.close();
+  }
+
+  void incrementCheckpointTime() {
+    fsimage.incrementCheckpointTime();
+    Writable[] args = {new LongWritable(fsimage.getCheckpointTime())};
+    logEdit(OP_CHECKPOINT_TIME, args);
+  }
+
+  synchronized void releaseBackupStream(NamenodeRegistration registration) {
+    Iterator<EditLogOutputStream> it =
+                                  getOutputStreamIterator(JournalType.BACKUP);
+    ArrayList<EditLogOutputStream> errorStreams = null;
+    NamenodeRegistration backupNode = null;
+    while(it.hasNext()) {
+      EditLogBackupOutputStream eStream = (EditLogBackupOutputStream)it.next();
+      backupNode = eStream.getRegistration();
+      if(backupNode.getAddress().equals(registration.getAddress()) &&
+            backupNode.isRole(registration.getRole())) {
+        errorStreams = new ArrayList<EditLogOutputStream>(1);
+        errorStreams.add(eStream);
+        break;
+      }
+    }
+    assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
+      "Not a backup node corresponds to a backup stream";
+    processIOError(errorStreams);
+  }
+
+  synchronized boolean checkBackupRegistration(
+      NamenodeRegistration registration) {
+    Iterator<EditLogOutputStream> it =
+                                  getOutputStreamIterator(JournalType.BACKUP);
+    boolean regAllowed = !it.hasNext();
+    NamenodeRegistration backupNode = null;
+    ArrayList<EditLogOutputStream> errorStreams = null;
+    while(it.hasNext()) {
+      EditLogBackupOutputStream eStream = (EditLogBackupOutputStream)it.next();
+      backupNode = eStream.getRegistration();
+      if(backupNode.getAddress().equals(registration.getAddress()) &&
+          backupNode.isRole(registration.getRole())) {
+        regAllowed = true; // same node re-registers
+        break;
+      }
+      if(!eStream.isAlive()) {
+        if(errorStreams == null)
+          errorStreams = new ArrayList<EditLogOutputStream>(1);
+        errorStreams.add(eStream);
+        regAllowed = true; // previous backup node failed
+      }
+    }
+    assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
+      "Not a backup node corresponds to a backup stream";
+    processIOError(errorStreams);
+    return regAllowed;
+  }
 }
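Taken together, divertFileStreams and revertFileStreams replace the old close-everything-and-reopen approach to rolling the log: rolling diverts every FILE-type stream to edits.new in its storage directory, and purging renames edits.new back to edits and re-points the streams, all without dropping non-file (backup) streams. A hedged sketch of the checkpoint-side lifecycle, assuming it is driven externally as before:

    // 1. Freeze current edits; new records now go to current/edits.new
    //    in every edits directory (divertFileStreams under the hood).
    editLog.rollEditLog();

    // 2. ... checkpointer fetches fsimage + edits and merges them ...

    // 3. Delete old edits, rename edits.new to edits, and point the
    //    file streams back at current/edits (revertFileStreams).
    editLog.purgeEditLog();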

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java Sat Mar 14 01:20:36 2009
@@ -41,7 +41,6 @@
 import java.lang.Math;
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -49,6 +48,7 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.io.UTF8;
@@ -56,6 +56,10 @@
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog.EditLogFileInputStream;
+import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
@@ -109,9 +113,9 @@
       return this == type;
     }
   }
-  
-  protected long checkpointTime = -1L;
-  private FSEditLog editLog = null;
+
+  protected long checkpointTime = -1L;  // The age of the image
+  protected FSEditLog editLog = null;
   private boolean isUpgradeFinalized = false;
 
   /**
@@ -141,7 +145,7 @@
   /**
    * Can fs-image be rolled?
    */
-  volatile private CheckpointStates ckptState = FSImage.CheckpointStates.START; 
+  volatile protected CheckpointStates ckptState = FSImage.CheckpointStates.START; 
 
   /**
    * Used for saving the image to disk
@@ -228,28 +232,70 @@
     return getImageFile(sd, NameNodeFile.EDITS_NEW);
   }
 
-  File[] getFileNames(NameNodeFile type, NameNodeDirType dirType) {
+  Collection<File> getFiles(NameNodeFile type, NameNodeDirType dirType) {
     ArrayList<File> list = new ArrayList<File>();
     Iterator<StorageDirectory> it = (dirType == null) ? dirIterator() :
                                     dirIterator(dirType);
     for ( ;it.hasNext(); ) {
       list.add(getImageFile(it.next(), type));
     }
+    return list;
+  }
+
+  @Deprecated // Use getFiles() instead
+  File[] getFileNames(NameNodeFile type, NameNodeDirType dirType) {
+    Collection<File> list = getFiles(type, dirType);
     return list.toArray(new File[list.size()]);
   }
 
+  @Deprecated
   File[] getImageFiles() {
     return getFileNames(NameNodeFile.IMAGE, NameNodeDirType.IMAGE);
   }
 
+  @Deprecated
   File[] getEditsFiles() {
     return getFileNames(NameNodeFile.EDITS, NameNodeDirType.EDITS);
   }
 
+  @Deprecated // should be removed
   File[] getTimeFiles() {
     return getFileNames(NameNodeFile.TIME, null);
   }
 
+  Collection<File> getDirectories(NameNodeDirType dirType) {
+    ArrayList<File> list = new ArrayList<File>();
+    Iterator<StorageDirectory> it = (dirType == null) ? dirIterator() :
+                                    dirIterator(dirType);
+    for ( ;it.hasNext(); ) {
+      list.add(it.next().getRoot());
+    }
+    return list;
+  }
+
+  Collection<File> getImageDirectories() {
+    return getDirectories(NameNodeDirType.IMAGE);
+  }
+
+  Collection<File> getEditsDirectories() {
+    return getDirectories(NameNodeDirType.EDITS);
+  }
+
+  /**
+   * Return number of storage directories of the given type.
+   * @param dirType directory type
+   * @return number of storage directories of type dirType
+   */
+  int getNumStorageDirs(NameNodeDirType dirType) {
+    if(dirType == null)
+      return getNumStorageDirs();
+    Iterator<StorageDirectory> it = dirIterator(dirType);
+    int numDirs = 0;
+    for(; it.hasNext(); it.next())
+      numDirs++;
+    return numDirs;
+  }
+
   /**
    * Analyze storage directories.
    * Recover from previous transitions if required. 
@@ -269,7 +315,8 @@
       "NameNode formatting should be performed before reading the image";
     
     // none of the data dirs exist
-    if (dataDirs.size() == 0 || editsDirs.size() == 0)  
+    if((dataDirs.size() == 0 || editsDirs.size() == 0) 
+                             && startOpt != StartupOption.IMPORT)  
       throw new IOException(
         "All specified directories are not accessible or do not exist.");
     
@@ -366,7 +413,7 @@
       return false; // upgrade saved image already
     case IMPORT:
       doImportCheckpoint();
-      return true;
+      return false; // import checkpoint saved image already
     case ROLLBACK:
       doRollback();
       break;
@@ -532,9 +579,10 @@
     }
     // return back the real image
     realImage.setStorageInfo(ckptImage);
+    checkpointTime = ckptImage.checkpointTime;
     fsNamesys.dir.fsImage = realImage;
-    // and save it
-    saveFSImage();
+    // and save it but keep the same checkpointTime
+    saveFSImage(false);
   }
 
   void finalizeUpgrade() throws IOException {
@@ -611,7 +659,10 @@
     if (checkpointTime < 0L)
       return; // do not write negative time
     File timeFile = getImageFile(sd, NameNodeFile.TIME);
-    if (timeFile.exists()) { timeFile.delete(); }
+    if (timeFile.exists() && ! timeFile.delete()) {
+        LOG.error("Cannot delete chekpoint time file: "
+                  + timeFile.getCanonicalPath());
+    }
     DataOutputStream out = new DataOutputStream(
                                                 new FileOutputStream(timeFile));
     try {
@@ -628,8 +679,21 @@
    * storage directory is removed from the list.
    */
   void incrementCheckpointTime() {
-    this.checkpointTime++;
-    
+    setCheckpointTime(checkpointTime + 1);
+  }
+
+  /**
+   * The age of the namespace state.<p>
+   * Reflects the latest time the image was saved.
+   * Updated on every image save or checkpoint.
+   * Persisted in VERSION file.
+   */
+  long getCheckpointTime() {
+    return checkpointTime;
+  }
+
+  void setCheckpointTime(long newCpT) {
+    checkpointTime = newCpT;
     // Write new checkpoint time in all storage directories
     for(Iterator<StorageDirectory> it =
                           dirIterator(); it.hasNext();) {
@@ -1057,6 +1121,14 @@
    * and create empty edits.
    */
   public void saveFSImage() throws IOException {
+    saveFSImage(true);
+  }
+
+  public void saveFSImage(boolean renewCheckpointTime) throws IOException {
+    assert editLog != null : "editLog must be initialized";
+    if(!editLog.isOpen())
+      editLog.open();
+
     editLog.createNewIfMissing();
     for (Iterator<StorageDirectory> it = 
                            dirIterator(); it.hasNext();) {
@@ -1072,7 +1144,7 @@
       }
     }
     ckptState = CheckpointStates.UPLOAD_DONE;
-    rollFSImage();
+    rollFSImage(renewCheckpointTime);
   }
 
   /**
@@ -1302,16 +1374,16 @@
    * Reopens the new edits file.
    */
   void rollFSImage() throws IOException {
-    if (ckptState != CheckpointStates.UPLOAD_DONE) {
+    rollFSImage(true);
+  }
+
+  void rollFSImage(boolean renewCheckpointTime) throws IOException {
+    if (ckptState != CheckpointStates.UPLOAD_DONE
+      && !(ckptState == CheckpointStates.ROLLED_EDITS
+      && getNumStorageDirs(NameNodeDirType.IMAGE) == 0)) {
       throw new IOException("Cannot roll fsImage before rolling edits log.");
     }
-    //
-    // First, verify that edits.new and fsimage.ckpt exists in all
-    // checkpoint directories.
-    //
-    if (!editLog.existsNew()) {
-      throw new IOException("New Edits file does not exist");
-    }
+
     for (Iterator<StorageDirectory> it = 
                        dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
       StorageDirectory sd = it.next();
@@ -1326,6 +1398,14 @@
     //
     // Renames new image
     //
+    renameCheckpoint();
+    resetVersion(renewCheckpointTime);
+  }
+
+  /**
+   * Renames new image
+   */
+  void renameCheckpoint() {
     for (Iterator<StorageDirectory> it = 
                        dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
       StorageDirectory sd = it.next();
@@ -1335,8 +1415,7 @@
       // already exists.
       LOG.debug("renaming  " + ckpt.getAbsolutePath() + " to "  + curFile.getAbsolutePath());
       if (!ckpt.renameTo(curFile)) {
-        curFile.delete();
-        if (!ckpt.renameTo(curFile)) {
+        if (!curFile.delete() || !ckpt.renameTo(curFile)) {
           LOG.warn("renaming  " + ckpt.getAbsolutePath() + " to "  + 
               curFile.getAbsolutePath() + " FAILED");
           
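The tightened rename above folds the Windows workaround into a single condition: File.renameTo refuses to overwrite an existing destination on Windows, so the fallback deletes the destination and retries, and a failed delete now counts as a failure rather than being ignored. The idiom in isolation (paths hypothetical):

    File ckpt = new File("current/fsimage.ckpt");    // hypothetical
    File curFile = new File("current/fsimage");      // hypothetical
    if (!ckpt.renameTo(curFile)) {
      // renameTo() fails on Windows if the destination exists:
      // delete it and retry, and fail loudly if either step fails.
      if (!curFile.delete() || !ckpt.renameTo(curFile)) {
        throw new IOException("rename of " + ckpt + " failed");
      }
    }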
@@ -1350,25 +1429,31 @@
         }
       }
     }
+  }
 
-    //
-    // Updates the fstime file on all directories (fsimage and edits)
-    // and write version file
-    //
+  /**
+   * Updates version and fstime files in all directories (fsimage and edits).
+   */
+  void resetVersion(boolean renewCheckpointTime) throws IOException {
     this.layoutVersion = FSConstants.LAYOUT_VERSION;
-    this.checkpointTime = FSNamesystem.now();
+    if(renewCheckpointTime)
+      this.checkpointTime = FSNamesystem.now();
     for (Iterator<StorageDirectory> it = 
                            dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
       // delete old edits if sd is an image-only directory
       if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
         File editsFile = getImageFile(sd, NameNodeFile.EDITS);
-        editsFile.delete();
+        if(editsFile.exists() && !editsFile.delete())
+          throw new IOException("Cannot delete edits file " 
+                                + editsFile.getCanonicalPath());
       }
       // delete old fsimage if sd is an edits-only directory
       if (!sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
         File imageFile = getImageFile(sd, NameNodeFile.IMAGE);
-        imageFile.delete();
+        if(imageFile.exists() && !imageFile.delete())
+          throw new IOException("Cannot delete image file " 
+                                + imageFile.getCanonicalPath());
       }
       try {
         sd.write();
@@ -1388,6 +1473,8 @@
   CheckpointSignature rollEditLog() throws IOException {
     getEditLog().rollEditLog();
     ckptState = CheckpointStates.ROLLED_EDITS;
+    // If the checkpoint fails, this should remain the most recent image; therefore
+    incrementCheckpointTime();
     return new CheckpointSignature(this);
   }
 
@@ -1415,6 +1502,99 @@
   }
 
   /**
+   * Start checkpoint.
+   * <p>
+   * If backup storage contains image that is newer than or incompatible with 
+   * what the active name-node has, then the backup node should shut down.<br>
+   * If the backup image is older than the active one then it should 
+   * be discarded and downloaded from the active node.<br>
+   * If the images are the same then the backup image will be used as current.
+   * 
+   * @param bnReg the backup node registration.
+   * @param nnReg this (active) name-node registration.
+   * @return {@link NamenodeCommand} if the backup node should shut down or
+   * {@link CheckpointCommand} prescribing what backup node should 
+   *         do with its image.
+   * @throws IOException
+   */
+  NamenodeCommand startCheckpoint(NamenodeRegistration bnReg, // backup node
+                                  NamenodeRegistration nnReg) // active name-node
+  throws IOException {
+    String msg = null;
+    // Verify that checkpoint is allowed
+    if(bnReg.getNamespaceID() != this.getNamespaceID())
+      msg = "Name node " + bnReg.getAddress()
+            + " has incompatible namespace id: " + bnReg.getNamespaceID()
+            + " expected: " + getNamespaceID();
+    else if(bnReg.isRole(NamenodeRole.ACTIVE))
+      msg = "Name node " + bnReg.getAddress()
+            + " role " + bnReg.getRole() + ": checkpoint is not allowed.";
+    else if(bnReg.getLayoutVersion() < this.getLayoutVersion()
+        || (bnReg.getLayoutVersion() == this.getLayoutVersion()
+            && bnReg.getCTime() > this.getCTime())
+        || (bnReg.getLayoutVersion() == this.getLayoutVersion()
+            && bnReg.getCTime() == this.getCTime()
+            && bnReg.getCheckpointTime() > this.checkpointTime))
+      // remote node has newer image age
+      msg = "Name node " + bnReg.getAddress()
+            + " has newer image layout version: LV = " +bnReg.getLayoutVersion()
+            + " cTime = " + bnReg.getCTime()
+            + " checkpointTime = " + bnReg.getCheckpointTime()
+            + ". Current version: LV = " + getLayoutVersion()
+            + " cTime = " + getCTime()
+            + " checkpointTime = " + checkpointTime;
+    if(msg != null) {
+      LOG.error(msg);
+      return new NamenodeCommand(NamenodeProtocol.ACT_SHUTDOWN);
+    }
+    boolean isImgObsolete = true;
+    if(bnReg.getLayoutVersion() == this.getLayoutVersion()
+        && bnReg.getCTime() == this.getCTime()
+        && bnReg.getCheckpointTime() == this.checkpointTime)
+      isImgObsolete = false;
+    boolean needToReturnImg = true;
+    if(getNumStorageDirs(NameNodeDirType.IMAGE) == 0)
+      // do not return image if there are no image directories
+      needToReturnImg = false;
+    CheckpointSignature sig = rollEditLog();
+    getEditLog().logJSpoolStart(bnReg, nnReg);
+    return new CheckpointCommand(sig, isImgObsolete, needToReturnImg);
+  }
+
+  /**
+   * End checkpoint.
+   * <p>
+   * Rename uploaded checkpoint to the new image;
+   * purge old edits file;
+   * rename edits.new to edits;
+   * redirect edit log streams to the new edits;
+   * update checkpoint time if the remote node is a checkpoint only node.
+   * 
+   * @param sig
+   * @param remoteNNRole
+   * @throws IOException
+   */
+  void endCheckpoint(CheckpointSignature sig, 
+                     NamenodeRole remoteNNRole) throws IOException {
+    sig.validateStorageInfo(this);
+    // Renew checkpoint time for the active if the other is a checkpoint-node.
+    // The checkpoint-node should have older image for the next checkpoint 
+    // to take effect.
+    // The backup-node always has up-to-date image and will have the same
+    // checkpoint time as the active node.
+    boolean renewCheckpointTime = remoteNNRole.equals(NamenodeRole.CHECKPOINT);
+    rollFSImage(renewCheckpointTime);
+  }
+
+  CheckpointStates getCheckpointState() {
+    return ckptState;
+  }
+
+  void setCheckpointState(CheckpointStates cs) {
+    ckptState = cs;
+  }
+
+  /**
    * This is called when a checkpoint upload finishes successfully.
    */
   synchronized void checkpointUploadDone() {

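The shutdown test in startCheckpoint above amounts to a lexicographic comparison of the (layoutVersion, cTime, checkpointTime) triple; layout versions are negative and decrease over time, so a smaller value means a newer layout. Restated as a single predicate (same names as the method; a sketch, not a replacement):

    // Backup image is newer than the active image if any of these hold.
    boolean backupIsNewer =
        bnReg.getLayoutVersion() < getLayoutVersion()          // newer layout
        || (bnReg.getLayoutVersion() == getLayoutVersion()
            && bnReg.getCTime() > getCTime())                  // newer cTime
        || (bnReg.getLayoutVersion() == getLayoutVersion()
            && bnReg.getCTime() == getCTime()
            && bnReg.getCheckpointTime() > checkpointTime);    // newer checkpoint
    // A newer (or otherwise incompatible) backup image means ACT_SHUTDOWN.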
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Mar 14 01:20:36 2009
@@ -44,6 +44,8 @@
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.fs.ContentSummary;
@@ -274,9 +276,9 @@
   /**
    * FSNamesystem constructor.
    */
-  FSNamesystem(NameNode nn, Configuration conf) throws IOException {
+  FSNamesystem(Configuration conf) throws IOException {
     try {
-      initialize(nn, conf);
+      initialize(conf, null);
     } catch(IOException e) {
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       close();
@@ -287,24 +289,36 @@
   /**
    * Initialize FSNamesystem.
    */
-  private void initialize(NameNode nn, Configuration conf) throws IOException {
+  private void initialize(Configuration conf, FSImage fsImage) throws IOException {
     this.systemStart = now();
     setConfigurationParameters(conf);
 
     this.registerMBean(conf); // register the MBean for the FSNamesystemStutus
-    this.dir = new FSDirectory(this, conf);
-    StartupOption startOpt = NameNode.getStartupOption(conf);
-    this.dir.loadFSImage(getNamespaceDirs(conf),
-                         getNamespaceEditsDirs(conf), startOpt);
-    long timeTakenToLoadFSImage = now() - systemStart;
-    LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
-    NameNode.getNameNodeMetrics().fsImageLoadTime.set(
-                              (int) timeTakenToLoadFSImage);
+    if(fsImage == null) {
+      this.dir = new FSDirectory(this, conf);
+      StartupOption startOpt = NameNode.getStartupOption(conf);
+      this.dir.loadFSImage(getNamespaceDirs(conf),
+                           getNamespaceEditsDirs(conf), startOpt);
+      long timeTakenToLoadFSImage = now() - systemStart;
+      LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
+      NameNode.getNameNodeMetrics().fsImageLoadTime.set(
+                                (int) timeTakenToLoadFSImage);
+    } else {
+      this.dir = new FSDirectory(fsImage, this, conf);
+    }
     this.safeMode = new SafeModeInfo(conf);
-    setBlockTotal();
     pendingReplications = new PendingReplicationBlocks(
                             conf.getInt("dfs.replication.pending.timeout.sec", 
                                         -1) * 1000L);
+    this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
+                        conf.get("dfs.hosts.exclude",""));
+  }
+
+  /**
+   * Activate FSNamesystem daemons.
+   */
+  void activate(Configuration conf) throws IOException {
+    setBlockTotal();
     this.hbthread = new Daemon(new HeartbeatMonitor());
     this.lmthread = new Daemon(leaseManager.new Monitor());
     this.replthread = new Daemon(new ReplicationMonitor());
@@ -312,8 +326,6 @@
     lmthread.start();
     replthread.start();
 
-    this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
-                                           conf.get("dfs.hosts.exclude",""));
     this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
         conf.getInt("dfs.namenode.decommission.interval", 30),
         conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));
@@ -334,8 +346,34 @@
   }
 
   public static Collection<File> getNamespaceDirs(Configuration conf) {
-    Collection<String> dirNames = conf.getStringCollection("dfs.name.dir");
-    if (dirNames.isEmpty())
+    return getStorageDirs(conf, "dfs.name.dir");
+  }
+
+  public static Collection<File> getStorageDirs(Configuration conf,
+                                                String propertyName) {
+    Collection<String> dirNames = conf.getStringCollection(propertyName);
+    StartupOption startOpt = NameNode.getStartupOption(conf);
+    if(startOpt == StartupOption.IMPORT) {
+      // In case of IMPORT this will get rid of default directories 
+      // but will retain directories specified in hdfs-site.xml
+      // When importing image from a checkpoint, the name-node can
+      // start with empty set of storage directories.
+      Configuration cE = new Configuration(false);
+      cE.addResource("core-default.xml");
+      cE.addResource("core-site.xml");
+      cE.addResource("hdfs-default.xml");
+      Collection<String> dirNames2 = cE.getStringCollection(propertyName);
+      dirNames.removeAll(dirNames2);
+      if(dirNames.isEmpty())
+        LOG.warn("!!! WARNING !!!" +
+          "\n\tThe NameNode currently runs without persistent storage." +
+          "\n\tAny changes to the file system meta-data may be lost." +
+          "\n\tRecommended actions:" +
+          "\n\t\t- shutdown and restart NameNode with configured \"" 
+          + propertyName + "\" in hdfs-site.xml;" +
+          "\n\t\t- use Backup Node as a persistent and up-to-date storage " +
+          "of the file system meta-data.");
+    } else if (dirNames.isEmpty())
       dirNames.add("/tmp/hadoop/dfs/name");
     Collection<File> dirs = new ArrayList<File>(dirNames.size());
     for(String name : dirNames) {
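The IMPORT branch above separates defaults from explicit configuration by building a second Configuration (cE) that loads only core-default.xml, core-site.xml and hdfs-default.xml, but not hdfs-site.xml, and subtracting its values; whatever survives was set explicitly in hdfs-site.xml. A hedged illustration of the set arithmetic with made-up values:

    // dfs.name.dir from the full conf (hdfs-site.xml overrides the default)
    Collection<String> dirNames =
        new ArrayList<String>(Arrays.asList("/data/nn"));
    // dfs.name.dir as the defaults alone would resolve it
    Collection<String> dirNames2 =
        new ArrayList<String>(Arrays.asList("/tmp/hadoop/dfs/name"));
    dirNames.removeAll(dirNames2);   // ["/data/nn"]: explicit dir retained
    // Had hdfs-site.xml not set dfs.name.dir, both sets would be equal,
    // removeAll would empty dirNames, and the WARNING above would fire.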
@@ -343,17 +381,9 @@
     }
     return dirs;
   }
-  
+
   public static Collection<File> getNamespaceEditsDirs(Configuration conf) {
-    Collection<String> editsDirNames = 
-            conf.getStringCollection("dfs.name.edits.dir");
-    if (editsDirNames.isEmpty())
-      editsDirNames.add("/tmp/hadoop/dfs/name");
-    Collection<File> dirs = new ArrayList<File>(editsDirNames.size());
-    for(String name : editsDirNames) {
-      dirs.add(new File(name));
-    }
-    return dirs;
+    return getStorageDirs(conf, "dfs.name.edits.dir");
   }
 
   /**
@@ -366,6 +396,25 @@
   }
 
   /**
+   * Create FSNamesystem for {@link BackupNode}.
+   * Should do everything that would be done for the NameNode,
+   * except for loading the image.
+   * 
+   * @param bnImage {@link BackupStorage}
+   * @param conf configuration
+   * @throws IOException
+   */
+  FSNamesystem(Configuration conf, BackupStorage bnImage) throws IOException {
+    try {
+      initialize(conf, bnImage);
+    } catch(IOException e) {
+      LOG.error(getClass().getSimpleName() + " initialization failed.", e);
+      close();
+      throw e;
+    }
+  }
+
+  /**
    * Initializes some of the members from configuration
    */
   private void setConfigurationParameters(Configuration conf) 
@@ -2193,7 +2242,7 @@
         DatanodeDescriptor nodeinfo = null;
         try {
           nodeinfo = getDatanode(nodeReg);
-        } catch(UnregisteredDatanodeException e) {
+        } catch(UnregisteredNodeException e) {
           return new DatanodeCommand[]{DatanodeCommand.REGISTER};
         }
           
@@ -3788,7 +3837,7 @@
    * Returns TRUE if node is registered (including when it is on the 
    * exclude list and is being decommissioned). 
    */
-  private synchronized boolean verifyNodeRegistration(DatanodeRegistration nodeReg, String ipAddr) 
+  private synchronized boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr) 
     throws IOException {
     if (!inHostsList(nodeReg, ipAddr)) {
       return false;    
@@ -3824,12 +3873,12 @@
    * @throws IOException
    */
   public DatanodeDescriptor getDatanode(DatanodeID nodeID) throws IOException {
-    UnregisteredDatanodeException e = null;
+    UnregisteredNodeException e = null;
     DatanodeDescriptor node = datanodeMap.get(nodeID.getStorageID());
     if (node == null) 
       return null;
     if (!node.getName().equals(nodeID.getName())) {
-      e = new UnregisteredDatanodeException(nodeID, node);
+      e = new UnregisteredNodeException(nodeID, node);
       NameNode.stateChangeLog.fatal("BLOCK* NameSystem.getDatanode: "
                                     + e.getLocalizedMessage());
       throw e;
@@ -4310,7 +4359,7 @@
     safeMode.leave(checkForUpgrades);
   }
     
-  String getSafeModeTip() {
+  synchronized String getSafeModeTip() {
     if (!isInSafeMode())
       return "";
     return safeMode.getTurnOffTip();
@@ -4338,6 +4387,24 @@
     getFSImage().rollFSImage();
   }
 
+  NamenodeCommand startCheckpoint(NamenodeRegistration bnReg, // backup node
+                                  NamenodeRegistration nnReg) // active name-node
+  throws IOException {
+    NamenodeCommand cmd;
+    synchronized(this) {
+      cmd = getFSImage().startCheckpoint(bnReg, nnReg);
+    }
+    LOG.info("Start checkpoint for " + bnReg.getAddress());
+    getEditLog().logSync();
+    return cmd;
+  }
+
+  synchronized void endCheckpoint(NamenodeRegistration registration,
+                            CheckpointSignature sig) throws IOException {
+    LOG.info("End checkpoint for " + registration.getAddress());
+    getFSImage().endCheckpoint(sig, registration.getRole());
+  }
+
   /**
    * Returns whether the given block is one pointed-to by a file.
    */
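Note the lock scope in the new startCheckpoint above: the image roll runs inside synchronized(this), but getEditLog().logSync() runs after the monitor is released, because logSync can block on stream I/O and FSEditLog synchronizes internally. The general shape of the pattern:

    NamenodeCommand cmd;
    synchronized (this) {
      // mutate namesystem state under the global lock
      cmd = getFSImage().startCheckpoint(bnReg, nnReg);
    }
    // sync the journal outside the lock: logSync may wait on disk or
    // network streams and must not stall other namesystem operations
    getEditLog().logSync();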
@@ -4649,4 +4716,45 @@
       }
     }
   }
+
+  /**
+   * Register a name-node.
+   * <p>
+   * Registration is allowed if there is no ongoing streaming to
+   * another backup node.
+   * We currently allow only one backup node, but multiple checkpointers
+   * if there are no backups.
+   * 
+   * @param registration
+   * @throws IOException
+   */
+  synchronized void registerBackupNode(NamenodeRegistration registration)
+  throws IOException {
+    if(getFSImage().getNamespaceID() != registration.getNamespaceID())
+      throw new IOException("Incompatible namespaceIDs: " 
+          + " Namenode namespaceID = " + getFSImage().getNamespaceID() 
+          + "; " + registration.getRole() +
+              " node namespaceID = " + registration.getNamespaceID());
+    boolean regAllowed = getEditLog().checkBackupRegistration(registration);
+    if(!regAllowed)
+      throw new IOException("Registration is not allowed. " +
+      		"Another node is registered as a backup.");
+  }
+
+  /**
+   * Release (unregister) backup node.
+   * <p>
+   * Find and remove the backup stream corresponding to the node.
+   * @param registration
+   * @throws IOException
+   */
+  synchronized void releaseBackupNode(NamenodeRegistration registration)
+  throws IOException {
+    if(getFSImage().getNamespaceID() != registration.getNamespaceID())
+      throw new IOException("Incompatible namespaceIDs: " 
+          + " Namenode namespaceID = " + getFSImage().getNamespaceID() 
+          + "; " + registration.getRole() +
+              " node namespaceID = " + registration.getNamespaceID());
+    getEditLog().releaseBackupStream(registration);
+  }
 }
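
The two backup-node methods above share the same namespace-ID guard before they
touch any edit-log state. A minimal, self-contained sketch of that guard
(class, field, and method names here are illustrative, not the actual
FSNamesystem members):

    import java.io.IOException;

    /** Illustrative sketch of the namespace-ID guard used by
     *  registerBackupNode() and releaseBackupNode(); names are hypothetical. */
    class NamespaceGuard {
      private final int localNamespaceID;

      NamespaceGuard(int localNamespaceID) {
        this.localNamespaceID = localNamespaceID;
      }

      /** Reject requests from nodes formatted against a different namespace. */
      void verify(int remoteNamespaceID, String role) throws IOException {
        if (localNamespaceID != remoteNamespaceID)
          throw new IOException("Incompatible namespaceIDs: "
              + "Namenode namespaceID = " + localNamespaceID
              + "; " + role + " node namespaceID = " + remoteNamespaceID);
      }
    }

Both methods are declared synchronized, so the check and the subsequent
edit-log bookkeeping happen atomically with respect to other registrations.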

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JournalStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JournalStream.java?rev=753481&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JournalStream.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JournalStream.java Sat Mar 14 01:20:36 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/**
+ * A generic interface for journal input and output streams.
+ */
+interface JournalStream {
+  /**
+   * Type of the underlying persistent storage the stream is based upon.
+   * <ul>
+   * <li>{@link JournalType#FILE} - streams edits into a local file, see
+   * {@link FSEditLog.EditLogFileOutputStream} and 
+   * {@link FSEditLog.EditLogFileInputStream}</li>
+   * <li>{@link JournalType#BACKUP} - streams edits to a backup node, see
+   * {@link EditLogBackupOutputStream} and {@link EditLogBackupInputStream}</li>
+   * </ul>
+   */
+  static enum JournalType {
+    FILE,
+    BACKUP;
+    boolean isOfType(JournalType other) {
+      return other == null || this == other;
+    }
+  };
+
+  /**
+   * Get this stream's name.
+   * 
+   * @return name of the stream
+   */
+  String getName();
+
+  /**
+   * Get the type of the stream.
+   * Determines the underlying persistent storage type.
+   * @see JournalType
+   * @return type
+   */
+  JournalType getType();
+}
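
Note the null-as-wildcard convention in isOfType(): a null argument matches any
journal type, so the same predicate can select all streams or only those of one
storage type. A standalone sketch of the matching semantics (the enum below
mirrors JournalStream.JournalType and is reproduced only for illustration):

    /** Mirrors JournalStream.JournalType to show the null-as-wildcard match. */
    enum SketchJournalType {
      FILE, BACKUP;

      boolean isOfType(SketchJournalType other) {
        return other == null || this == other;  // null matches any type
      }

      public static void main(String[] args) {
        System.out.println(FILE.isOfType(null));    // true  -- wildcard
        System.out.println(FILE.isOfType(FILE));    // true  -- exact match
        System.out.println(FILE.isOfType(BACKUP));  // false -- different type
      }
    }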

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Sat Mar 14 01:20:36 2009
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.CompleteFileStatus;
@@ -37,7 +38,10 @@
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.http.HttpServer;
@@ -123,18 +127,21 @@
   public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
   public static final Log stateChangeLog = LogFactory.getLog("org.apache.hadoop.hdfs.StateChange");
 
-  private FSNamesystem namesystem; 
+  protected FSNamesystem namesystem; 
+  protected NamenodeRole role;
   /** RPC server */
-  private Server server;
+  protected Server server;
   /** RPC server address */
-  private InetSocketAddress serverAddress = null;
+  protected InetSocketAddress rpcAddress = null;
   /** httpServer */
-  private HttpServer httpServer;
+  protected HttpServer httpServer;
   /** HTTP server address */
-  private InetSocketAddress httpAddress = null;
+  protected InetSocketAddress httpAddress = null;
   private Thread emptier;
   /** only used for testing purposes  */
-  private boolean stopRequested = false;
+  protected boolean stopRequested = false;
+  /** Registration information of this name-node  */
+  protected NamenodeRegistration nodeRegistration;
   /** Is service level authorization enabled? */
   private boolean serviceAuthEnabled = false;
   
@@ -172,12 +179,64 @@
   }
 
   /**
+   * Compose a "host:port" string from the address.
+   */
+  public static String getHostPortString(InetSocketAddress addr) {
+    return addr.getHostName() + ":" + addr.getPort();
+  }
+
+  //
+  // Common NameNode method implementations for the active name-node role.
+  //
+  public NamenodeRole getRole() {
+    return role;
+  }
+
+  boolean isRole(NamenodeRole that) {
+    return role.equals(that);
+  }
+
+  protected InetSocketAddress getRpcServerAddress(Configuration conf) throws IOException {
+    return getAddress(conf);
+  }
+
+  protected void setRpcServerAddress(Configuration conf) {
+    FileSystem.setDefaultUri(conf, getUri(rpcAddress));
+  }
+
+  protected InetSocketAddress getHttpServerAddress(Configuration conf) {
+    String addr = NetUtils.getServerAddress(conf, "dfs.info.bindAddress", 
+                                "dfs.info.port", "dfs.http.address");
+    return NetUtils.createSocketAddr(addr);
+  }
+
+  protected void setHttpServerAddress(Configuration conf){
+    conf.set("dfs.http.address", getHostPortString(httpAddress));
+  }
+
+  protected void loadNamesystem(Configuration conf) throws IOException {
+    this.namesystem = new FSNamesystem(conf);
+  }
+
+  NamenodeRegistration getRegistration() {
+    return nodeRegistration;
+  }
+
+  NamenodeRegistration setRegistration() {
+    nodeRegistration = new NamenodeRegistration(
+        getHostPortString(rpcAddress),
+        getHostPortString(httpAddress),
+        getFSImage(), getRole(), getFSImage().getCheckpointTime());
+    return nodeRegistration;
+  }
+
+  /**
    * Initialize name-node.
    * 
    * @param conf the configuration
    */
-  private void initialize(Configuration conf) throws IOException {
-    InetSocketAddress socAddr = NameNode.getAddress(conf);
+  protected void initialize(Configuration conf) throws IOException {
+    InetSocketAddress socAddr = getRpcServerAddress(conf);
     int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
     
     // set service-level authorization security policy
@@ -197,15 +256,23 @@
                                 handlerCount, false, conf);
 
     // The rpc-server port can be ephemeral... ensure we have the correct info
-    this.serverAddress = this.server.getListenerAddress(); 
-    FileSystem.setDefaultUri(conf, getUri(serverAddress));
-    LOG.info("Namenode up at: " + this.serverAddress);
+    this.rpcAddress = this.server.getListenerAddress(); 
+    setRpcServerAddress(conf);
+    LOG.info(getRole() + " up at: " + rpcAddress);
+
+    myMetrics = new NameNodeMetrics(conf, role);
 
-    myMetrics = new NameNodeMetrics(conf, this);
+    loadNamesystem(conf);
+    activate(conf);
+  }
 
-    this.namesystem = new FSNamesystem(this, conf);
+  /**
+   * Activate name-node servers and threads.
+   */
+  void activate(Configuration conf) throws IOException {
+    namesystem.activate(conf);
     startHttpServer(conf);
-    this.server.start();  //start RPC server   
+    server.start();  //start RPC server
     startTrashEmptier(conf);
   }
 
@@ -219,10 +286,7 @@
   }
 
   private void startHttpServer(Configuration conf) throws IOException {
-    String infoAddr = 
-      NetUtils.getServerAddress(conf, "dfs.info.bindAddress", 
-                                "dfs.info.port", "dfs.http.address");
-    InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
+    InetSocketAddress infoSocAddr = getHttpServerAddress(conf);
     String infoHost = infoSocAddr.getHostName();
     int infoPort = infoSocAddr.getPort();
     this.httpServer = new HttpServer("hdfs", infoHost, infoPort, 
@@ -256,8 +320,8 @@
     // The web-server port can be ephemeral... ensure we have the correct info
     infoPort = this.httpServer.getPort();
     this.httpAddress = new InetSocketAddress(infoHost, infoPort);
-    conf.set("dfs.http.address", infoHost + ":" + infoPort);
-    LOG.info("Web-server up at: " + infoHost + ":" + infoPort);
+    setHttpServerAddress(conf);
+    LOG.info(getRole() + " Web-server up at: " + httpAddress);
   }
 
   /**
@@ -267,10 +331,15 @@
    * <ul> 
    * <li>{@link StartupOption#REGULAR REGULAR} - normal name node startup</li>
    * <li>{@link StartupOption#FORMAT FORMAT} - format name node</li>
+   * <li>{@link StartupOption#BACKUP BACKUP} - start backup node</li>
+   * <li>{@link StartupOption#CHECKPOINT CHECKPOINT} - start checkpoint node</li>
    * <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster  
    * upgrade and create a snapshot of the current file system state</li> 
    * <li>{@link StartupOption#ROLLBACK ROLLBACK} - roll the  
    *            cluster back to the previous state</li>
+   * <li>{@link StartupOption#FINALIZE FINALIZE} - finalize 
+   *            previous upgrade</li>
+   * <li>{@link StartupOption#IMPORT IMPORT} - import checkpoint</li>
    * </ul>
    * The option is passed via configuration field: 
    * <tt>dfs.namenode.startup</tt>
@@ -283,6 +352,11 @@
    * @throws IOException
    */
   public NameNode(Configuration conf) throws IOException {
+    this(conf, NamenodeRole.ACTIVE);
+  }
+
+  protected NameNode(Configuration conf, NamenodeRole role) throws IOException {
+    this.role = role;
     try {
       initialize(conf);
     } catch (IOException e) {
@@ -328,13 +402,7 @@
   /////////////////////////////////////////////////////
   // NamenodeProtocol
   /////////////////////////////////////////////////////
-  /**
-   * return a list of blocks & their locations on <code>datanode</code> whose
-   * total size is <code>size</code>
-   * 
-   * @param datanode on which blocks are located
-   * @param size total size of blocks
-   */
+  @Override // NamenodeProtocol
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
   throws IOException {
     if(size <= 0) {
@@ -344,7 +412,61 @@
 
     return namesystem.getBlocks(datanode, size); 
   }
-  
+
+  @Override // NamenodeProtocol
+  public void errorReport(NamenodeRegistration registration,
+                          int errorCode, 
+                          String msg) throws IOException {
+    verifyRequest(registration);
+    LOG.info("Error report from " + registration + ": " + msg);
+    if(errorCode == FATAL)
+      namesystem.releaseBackupNode(registration);
+  }
+
+  @Override // NamenodeProtocol
+  public NamenodeRegistration register(NamenodeRegistration registration)
+  throws IOException {
+    verifyVersion(registration.getVersion());
+    namesystem.registerBackupNode(registration);
+    return setRegistration();
+  }
+
+  @Override // NamenodeProtocol
+  public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
+  throws IOException {
+    verifyRequest(registration);
+    if(!isRole(NamenodeRole.ACTIVE))
+      throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
+    return namesystem.startCheckpoint(registration, setRegistration());
+  }
+
+  @Override // NamenodeProtocol
+  public void endCheckpoint(NamenodeRegistration registration,
+                            CheckpointSignature sig) throws IOException {
+    verifyRequest(registration);
+    if(!isRole(NamenodeRole.ACTIVE))
+      throw new IOException("Only an ACTIVE node can invoke endCheckpoint.");
+    namesystem.endCheckpoint(registration, sig);
+  }
+
+  @Override // NamenodeProtocol
+  public long journalSize(NamenodeRegistration registration)
+  throws IOException {
+    verifyRequest(registration);
+    return namesystem.getEditLogSize();
+  }
+
+  /*
+   * The active name-node does not accept journal records;
+   * they are streamed to backup nodes, not to the active node.
+   */
+  @Override // NamenodeProtocol
+  public void journal(NamenodeRegistration registration,
+                      int jAction,
+                      int length,
+                      byte[] args) throws IOException {
+    throw new UnsupportedActionException("journal");
+  }
+
   /////////////////////////////////////////////////////
   // ClientProtocol
   /////////////////////////////////////////////////////
@@ -778,10 +900,10 @@
   * @param nodeReg node registration
    * @throws IOException
    */
-  public void verifyRequest(DatanodeRegistration nodeReg) throws IOException {
+  public void verifyRequest(NodeRegistration nodeReg) throws IOException {
     verifyVersion(nodeReg.getVersion());
     if (!namesystem.getRegistrationID().equals(nodeReg.getRegistrationID()))
-      throw new UnregisteredDatanodeException(nodeReg);
+      throw new UnregisteredNodeException(nodeReg);
   }
     
   /**
@@ -819,7 +941,7 @@
    * @return the address on which the NameNodes is listening to.
    */
   public InetSocketAddress getNameNodeAddress() {
-    return serverAddress;
+    return rpcAddress;
   }
 
   /**
@@ -908,6 +1030,8 @@
   private static void printUsage() {
     System.err.println(
       "Usage: java NameNode [" +
+      StartupOption.BACKUP.getName() + "] | [" +
+      StartupOption.CHECKPOINT.getName() + "] | [" +
       StartupOption.FORMAT.getName() + "] | [" +
       StartupOption.UPGRADE.getName() + "] | [" +
       StartupOption.ROLLBACK.getName() + "] | [" +
@@ -924,6 +1048,10 @@
         startOpt = StartupOption.FORMAT;
       } else if (StartupOption.REGULAR.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.REGULAR;
+      } else if (StartupOption.BACKUP.getName().equalsIgnoreCase(cmd)) {
+        startOpt = StartupOption.BACKUP;
+      } else if (StartupOption.CHECKPOINT.getName().equalsIgnoreCase(cmd)) {
+        startOpt = StartupOption.CHECKPOINT;
       } else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.UPGRADE;
       } else if (StartupOption.ROLLBACK.getName().equalsIgnoreCase(cmd)) {
@@ -965,11 +1093,12 @@
       case FINALIZE:
         aborted = finalize(conf, true);
         System.exit(aborted ? 1 : 0);
+      case BACKUP:
+      case CHECKPOINT:
+        return new BackupNode(conf, startOpt.toNodeRole());
       default:
+        return new NameNode(conf);
     }
-
-    NameNode namenode = new NameNode(conf);
-    return namenode;
   }
     
   /**
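
With the new BACKUP and CHECKPOINT options, createNameNode() becomes a role
dispatch: both options construct a BackupNode with the role derived via
StartupOption.toNodeRole(), while the default branch constructs an ordinary
active NameNode. A reduced sketch of that dispatch shape (enum and constructor
names abbreviated; the toNodeRole() mapping below is an assumption consistent
with the patch, not the actual StartupOption code):

    /** Reduced sketch of the createNameNode() startup dispatch. */
    class StartupDispatch {
      enum Role { ACTIVE, BACKUP, CHECKPOINT }

      enum Option {
        REGULAR, BACKUP, CHECKPOINT;

        Role toNodeRole() {  // assumed mapping, mirroring StartupOption.toNodeRole()
          switch (this) {
            case BACKUP:     return Role.BACKUP;
            case CHECKPOINT: return Role.CHECKPOINT;
            default:         return Role.ACTIVE;
          }
        }
      }

      static String create(Option opt) {
        switch (opt) {
          case BACKUP:
          case CHECKPOINT:                 // both roles share the BackupNode class
            return "BackupNode(" + opt.toNodeRole() + ")";
          default:
            return "NameNode(" + Role.ACTIVE + ")";
        }
      }

      public static void main(String[] args) {
        System.out.println(create(Option.CHECKPOINT));  // BackupNode(CHECKPOINT)
        System.out.println(create(Option.REGULAR));     // NameNode(ACTIVE)
      }
    }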

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Sat Mar 14 01:20:36 2009
@@ -55,6 +55,7 @@
  * primary NameNode.
  *
  **********************************************************/
+@Deprecated // use BackupNode with -checkpoint argument instead.
 public class SecondaryNameNode implements Runnable {
     
   public static final Log LOG = 

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnsupportedActionException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnsupportedActionException.java?rev=753481&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnsupportedActionException.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnsupportedActionException.java Sat Mar 14 01:20:36 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+/**
+ * This exception is thrown when an operation is not supported.
+ */
+public class UnsupportedActionException extends IOException {
+
+  public UnsupportedActionException(String action) {
+    super("Action " + action + "() is not supported.");
+  }
+
+}

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java Sat Mar 14 01:20:36 2009
@@ -20,7 +20,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.metrics.*;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
 import org.apache.hadoop.metrics.util.MetricsBase;
@@ -81,18 +81,18 @@
                     new MetricsIntValue("BlocksCorrupted", registry);
 
       
-    public NameNodeMetrics(Configuration conf, NameNode nameNode) {
+    public NameNodeMetrics(Configuration conf, NamenodeRole nameNodeRole) {
       String sessionId = conf.get("session.id");
       // Initiate Java VM metrics
-      JvmMetrics.init("NameNode", sessionId);
+      String processName = nameNodeRole.toString();
+      JvmMetrics.init(processName, sessionId);
 
-      
       // Now the MBean for the name node - this also registers the MBean
       namenodeActivityMBean = new NameNodeActivityMBean(registry);
       
       // Create a record for NameNode metrics
       MetricsContext metricsContext = MetricsUtil.getContext("dfs");
-      metricsRecord = MetricsUtil.createRecord(metricsContext, "namenode");
+      metricsRecord = MetricsUtil.createRecord(metricsContext, processName.toLowerCase());
       metricsRecord.setTag("sessionId", sessionId);
       metricsContext.registerUpdater(this);
       log.info("Initializing NameNodeMeterics using context object:" +

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/CheckpointCommand.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/CheckpointCommand.java?rev=753481&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/CheckpointCommand.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/CheckpointCommand.java Sat Mar 14 01:20:36 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+
+/**
+ * Checkpoint command.
+ * <p>
+ * Returned to the backup node by the name-node as a reply to the
+ * {@link NamenodeProtocol#startCheckpoint(NamenodeRegistration)}
+ * request.<br>
+ * Contains:
+ * <ul>
+ * <li>{@link CheckpointSignature} identifying the particular checkpoint</li>
+ * <li>indicator whether the backup image should be discarded before starting 
+ * the checkpoint</li>
+ * <li>indicator whether the image should be transferred back to the name-node
+ * upon completion of the checkpoint.</li>
+ * </ul>
+ */
+public class CheckpointCommand extends NamenodeCommand {
+  private CheckpointSignature cSig;
+  private boolean isImageObsolete;
+  private boolean needToReturnImage;
+
+  public CheckpointCommand() {
+    this(null, false, false);
+  }
+
+  public CheckpointCommand(CheckpointSignature sig,
+                           boolean isImgObsolete,
+                           boolean needToReturnImg) {
+    super(NamenodeProtocol.ACT_CHECKPOINT);
+    this.cSig = sig;
+    this.isImageObsolete = isImgObsolete;
+    this.needToReturnImage = needToReturnImg;
+  }
+
+  /**
+   * Checkpoint signature is used to ensure 
+   * that nodes are talking about the same checkpoint.
+   */
+  public CheckpointSignature getSignature() {
+    return cSig;
+  }
+
+  /**
+   * Indicates whether the current backup image is obsolete, and therefore
+   * needs to be discarded.
+   * 
+   * @return true if current image should be discarded.
+   */
+  public boolean isImageObsolete() {
+    return isImageObsolete;
+  }
+
+  /**
+   * Indicates whether the new checkpoint image needs to be transferred
+   * back to the name-node after the checkpoint is done.
+   * 
+   * @return true if the checkpoint image should be returned to the name-node.
+   */
+  public boolean needToReturnImage() {
+    return needToReturnImage;
+  }
+
+  ///////////////////////////////////////////
+  // Writable
+  ///////////////////////////////////////////
+  static {
+    WritableFactories.setFactory(CheckpointCommand.class,
+        new WritableFactory() {
+          public Writable newInstance() {return new CheckpointCommand();}
+        });
+  }
+
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    cSig.write(out);
+    out.writeBoolean(isImageObsolete);
+    out.writeBoolean(needToReturnImage);
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    cSig = new CheckpointSignature();
+    cSig.readFields(in);
+    isImageObsolete = in.readBoolean();
+    needToReturnImage = in.readBoolean();
+  }
+}
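
CheckpointCommand follows the usual Writable contract: write() and readFields()
must handle the same fields in the same order -- the superclass action code
first, then the signature, then the two flags. A self-contained round-trip
sketch of that pattern (simplified to the two booleans, plain java.io streams,
no Hadoop dependency):

    import java.io.*;

    /** Simplified stand-in for CheckpointCommand's write/readFields symmetry. */
    class SketchCommand {
      boolean isImageObsolete;
      boolean needToReturnImage;

      void write(DataOutput out) throws IOException {
        out.writeBoolean(isImageObsolete);   // field order must match readFields()
        out.writeBoolean(needToReturnImage);
      }

      void readFields(DataInput in) throws IOException {
        isImageObsolete = in.readBoolean();
        needToReturnImage = in.readBoolean();
      }

      public static void main(String[] args) throws IOException {
        SketchCommand a = new SketchCommand();
        a.isImageObsolete = true;
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        a.write(new DataOutputStream(buf));

        SketchCommand b = new SketchCommand();
        b.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(b.isImageObsolete);  // true -- survives the round trip
      }
    }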

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeCommand.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeCommand.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeCommand.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeCommand.java Sat Mar 14 01:20:36 2009
@@ -17,11 +17,17 @@
  */
 package org.apache.hadoop.hdfs.server.protocol;
 
-import java.io.*;
-
-import org.apache.hadoop.io.*;
-
-public abstract class DatanodeCommand implements Writable {
+import java.io.DataInput;
+import java.io.DataOutput;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.io.WritableFactories;
+
+/**
+ * Base class for data-node commands.
+ * Issued by the name-node to notify data-nodes what should be done.
+ */
+public abstract class DatanodeCommand extends ServerCommand {
   static class Register extends DatanodeCommand {
     private Register() {super(DatanodeProtocol.DNA_REGISTER);}
     public void readFields(DataInput in) {}
@@ -47,29 +53,12 @@
 
   public static final DatanodeCommand REGISTER = new Register();
   public static final DatanodeCommand FINALIZE = new Finalize();
-
-  private int action;
   
   public DatanodeCommand() {
-    this(DatanodeProtocol.DNA_UNKNOWN);
+    super();
   }
   
   DatanodeCommand(int action) {
-    this.action = action;
-  }
-
-  public int getAction() {
-    return this.action;
-  }
-  
-  ///////////////////////////////////////////
-  // Writable
-  ///////////////////////////////////////////
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(this.action);
-  }
-  
-  public void readFields(DataInput in) throws IOException {
-    this.action = in.readInt();
+    super(action);
   }
-}
\ No newline at end of file
+}
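
The substance of this change is that the action code and its Writable plumbing
move out of DatanodeCommand into the shared ServerCommand base (added elsewhere
in this commit), so DatanodeCommand and NamenodeCommand no longer duplicate
them. A minimal sketch of the resulting hierarchy (the field layout below is
illustrative, not the actual ServerCommand source):

    import java.io.*;

    /** Illustrative shape of the ServerCommand hierarchy after the refactoring. */
    abstract class SketchServerCommand {
      private int action;

      SketchServerCommand() { this(0); }          // 0 stands in for an UNKNOWN action
      SketchServerCommand(int action) { this.action = action; }

      public int getAction() { return action; }

      public void write(DataOutput out) throws IOException { out.writeInt(action); }
      public void readFields(DataInput in) throws IOException { action = in.readInt(); }
    }

    /** Data-node commands now add only datanode-specific behavior. */
    class SketchDatanodeCommand extends SketchServerCommand {
      SketchDatanodeCommand(int action) { super(action); }
    }

    /** Name-node commands get the same action plumbing for free. */
    class SketchNamenodeCommand extends SketchServerCommand {
      SketchNamenodeCommand(int action) { super(action); }
    }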

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java Sat Mar 14 01:20:36 2009
@@ -31,12 +31,12 @@
 import org.apache.hadoop.io.WritableFactory;
 
 /** 
- * DatanodeRegistration class conatins all information the Namenode needs
- * to identify and verify a Datanode when it contacts the Namenode.
- * This information is sent by Datanode with each communication request.
- * 
+ * DatanodeRegistration class contains all information the name-node needs
+ * to identify and verify a data-node when it contacts the name-node.
+ * This information is sent by the data-node with each communication request.
  */
-public class DatanodeRegistration extends DatanodeID implements Writable {
+public class DatanodeRegistration extends DatanodeID
+implements Writable, NodeRegistration {
   static {                                      // register a ctor
     WritableFactories.setFactory
       (DatanodeRegistration.class,
@@ -79,18 +79,22 @@
     this.name = name;
   }
 
-  /**
-   */
+  @Override // NodeRegistration
   public int getVersion() {
     return storageInfo.getLayoutVersion();
   }
   
-  /**
-   */
+  @Override // NodeRegistration
   public String getRegistrationID() {
     return Storage.getRegistrationID(storageInfo);
   }
 
+  @Override // NodeRegistration
+  public String getAddress() {
+    return getName();
+  }
+
+  @Override
   public String toString() {
     return getClass().getSimpleName()
       + "(" + name
@@ -99,6 +103,7 @@
       + ", ipcPort=" + ipcPort
       + ")";
   }
+
   /////////////////////////////////////////////////
   // Writable
   /////////////////////////////////////////////////
@@ -109,9 +114,7 @@
     //TODO: move it to DatanodeID once HADOOP-2797 has been committed
     out.writeShort(ipcPort);
 
-    out.writeInt(storageInfo.getLayoutVersion());
-    out.writeInt(storageInfo.getNamespaceID());
-    out.writeLong(storageInfo.getCTime());
+    storageInfo.write(out);
   }
 
   /** {@inheritDoc} */
@@ -121,8 +124,6 @@
     //TODO: move it to DatanodeID once HADOOP-2797 has been committed
     this.ipcPort = in.readShort() & 0x0000ffff;
 
-    storageInfo.layoutVersion = in.readInt();
-    storageInfo.namespaceID = in.readInt();
-    storageInfo.cTime = in.readLong();
+    storageInfo.readFields(in);
   }
 }

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeCommand.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeCommand.java?rev=753481&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeCommand.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeCommand.java Sat Mar 14 01:20:36 2009
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Base class for name-node commands.
+ * Issued by the name-node to notify other name-nodes what should be done.
+ */
+public class NamenodeCommand extends ServerCommand {
+  static {
+    WritableFactories.setFactory(NamenodeCommand.class,
+        new WritableFactory() {
+          public Writable newInstance() {return new NamenodeCommand();}
+        });
+  }
+
+  public NamenodeCommand() {
+    super();
+  }
+
+  public NamenodeCommand(int action) {
+    super(action);
+  }
+}
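
The static block registers a factory so that Hadoop's Writable machinery can
obtain a fresh, empty NamenodeCommand before calling readFields() on it during
RPC deserialization. A toy sketch of the registration pattern (the registry
class below stands in for WritableFactories and is not the real API):

    import java.util.HashMap;
    import java.util.Map;

    /** Toy stand-in for the WritableFactories registration pattern. */
    class SketchFactories {
      interface Factory { Object newInstance(); }

      private static final Map<Class<?>, Factory> FACTORIES =
          new HashMap<Class<?>, Factory>();

      static void setFactory(Class<?> c, Factory f) { FACTORIES.put(c, f); }

      /** Deserialization asks for an empty instance, then fills it in. */
      static Object newInstance(Class<?> c) { return FACTORIES.get(c).newInstance(); }
    }

    class SketchRegisteredCommand {
      static {  // runs once, on class load, mirroring the static block above
        SketchFactories.setFactory(SketchRegisteredCommand.class,
            new SketchFactories.Factory() {
              public Object newInstance() { return new SketchRegisteredCommand(); }
            });
      }

      public static void main(String[] args) {
        Object fresh = SketchFactories.newInstance(SketchRegisteredCommand.class);
        System.out.println(fresh.getClass().getSimpleName());  // SketchRegisteredCommand
      }
    }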


