hadoop-common-commits mailing list archives

From: s..@apache.org
Subject: svn commit: r753481 [1/3] - in /hadoop/core/trunk: ./ src/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/common/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ s...
Date: Sat, 14 Mar 2009 01:20:37 GMT
Author: shv
Date: Sat Mar 14 01:20:36 2009
New Revision: 753481

URL: http://svn.apache.org/viewvc?rev=753481&view=rev
Log:
HADOOP-4539. Introduce backup node and checkpoint node. Contributed by Konstantin Shvachko.

Added:
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JournalStream.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnsupportedActionException.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/CheckpointCommand.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeCommand.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeRegistration.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NodeRegistration.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/ServerCommand.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
Removed:
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredDatanodeException.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/hdfs-default.xml
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeCommand.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/UpgradeUtilities.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
    hadoop/core/trunk/src/webapps/hdfs/dfshealth.jsp
    hadoop/core/trunk/src/webapps/hdfs/dfsnodelist.jsp

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sat Mar 14 01:20:36 2009
@@ -58,6 +58,8 @@
     HADOOP-4756. A command line tool to access JMX properties on NameNode
     and DataNode. (Boris Shkolnik via rangadi)
 
+    HADOOP-4539. Introduce backup node and checkpoint node. (shv)
+
   IMPROVEMENTS
 
     HADOOP-4565. Added CombineFileInputFormat to use data locality information
@@ -941,7 +943,7 @@
     formula linear where blowUp = Total-Output/Total-Input. (Sharad Agarwal via ddas)
 
     HADOOP-5142. Fix MapWritable#putAll to store key/value classes. 
-    (Doğacan Güney via enis)
+    (Do??acan G??ney via enis)
 
     HADOOP-4744. Workaround for jetty6 returning -1 when getLocalPort is invoked on
     the connector. The workaround patch retries a few times before failing.

Modified: hadoop/core/trunk/src/hdfs/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/hdfs-default.xml?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/hdfs-default.xml (original)
+++ hadoop/core/trunk/src/hdfs/hdfs-default.xml Sat Mar 14 01:20:36 2009
@@ -123,8 +123,24 @@
   </description>
  </property>
  
+ <property>
+  <name>dfs.backup.address</name>
+  <value>0.0.0.0:50100</value>
+  <description>
+    The backup node server address and port.
+    If the port is 0 then the server will start on a free port.
+  </description>
+</property>
  
- 
+ <property>
+  <name>dfs.backup.http.address</name>
+  <value>0.0.0.0:50105</value>
+  <description>
+    The backup node http server address and port.
+    If the port is 0 then the server will start on a free port.
+  </description>
+</property>
+
 <property>
   <name>dfs.replication.considerLoad</name>
   <value>true</value>
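
The two keys added above are ordinary host:port values read through the standard Configuration API; a port of 0 asks for a free port. The following is a minimal sketch, not part of this commit, showing how such a value could be resolved (the class name and printed output are illustrative):

    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.net.NetUtils;

    public class BackupAddressExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Fall back to the defaults introduced in hdfs-default.xml above.
        String rpcAddr  = conf.get("dfs.backup.address", "0.0.0.0:50100");
        String httpAddr = conf.get("dfs.backup.http.address", "0.0.0.0:50105");
        // NetUtils.createSocketAddr parses a host:port string, as BackupNode does.
        InetSocketAddress rpc  = NetUtils.createSocketAddr(rpcAddr);
        InetSocketAddress http = NetUtils.createSocketAddr(httpAddr);
        System.out.println("backup RPC port: " + rpc.getPort()
            + ", HTTP port: " + http.getPort());
      }
    }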

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java?rev=753481&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java Sat Mar 14 01:20:36 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
+
+/**
+ * This exception is thrown when a node that has not previously 
+ * registered is trying to access the name node.
+ */
+public class UnregisteredNodeException extends IOException {
+  private static final long serialVersionUID = -5620209396945970810L;
+
+  public UnregisteredNodeException(NodeRegistration nodeReg) {
+    super("Unregistered server: " + nodeReg.toString());
+  }
+
+  /**
+   * The exception is thrown if a different data-node claims the same
+   * storage id as the existing one.
+   *  
+   * @param nodeID unregistered data-node
+   * @param storedNode data-node stored in the system with this storage id
+   */
+  public UnregisteredNodeException(DatanodeID nodeID, DatanodeInfo storedNode) {
+    super("Data node " + nodeID.getName() 
+          + " is attempting to report storage ID "
+          + nodeID.getStorageID() + ". Node " 
+          + storedNode.getName() + " is expected to serve this storage.");
+  }
+}
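
For illustration only (this is not code from the commit), the name-node side check that the two-argument constructor documents could look roughly like the following; the helper method and its caller are assumptions:

    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;

    public class RegistrationCheckSketch {
      // Hypothetical helper: reject a data-node that reports a storage ID already
      // owned by a different node, as the constructor javadoc above describes.
      static void verifyStorageOwner(DatanodeID reporting, DatanodeInfo stored)
          throws UnregisteredNodeException {
        if (stored != null && !stored.getName().equals(reporting.getName())) {
          throw new UnregisteredNodeException(reporting, stored);
        }
      }
    }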

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Sat Mar 14 01:20:36 2009
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.common;
 
-
 /************************************
  * Some handy internal HDFS constants
  *
@@ -32,10 +31,12 @@
     DATA_NODE;
   }
 
-  // Startup options
+  /** Startup options */
   static public enum StartupOption{
     FORMAT  ("-format"),
     REGULAR ("-regular"),
+    BACKUP  ("-backup"),
+    CHECKPOINT("-checkpoint"),
     UPGRADE ("-upgrade"),
     ROLLBACK("-rollback"),
     FINALIZE("-finalize"),
@@ -44,6 +45,17 @@
     private String name = null;
     private StartupOption(String arg) {this.name = arg;}
     public String getName() {return name;}
+    public NamenodeRole toNodeRole() {
+      switch(this) {
+      case BACKUP: 
+        return NamenodeRole.BACKUP;
+      case CHECKPOINT: 
+        return NamenodeRole.CHECKPOINT;
+      default:
+        return NamenodeRole.ACTIVE;
+      }
+    }
+
   }
 
   // Timeouts for communicating with DataNode for streaming writes/reads
@@ -51,5 +63,21 @@
   public static int WRITE_TIMEOUT = 8 * 60 * 1000;
   public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline
 
+  /**
+   * Defines the NameNode role.
+   */
+  static public enum NamenodeRole {
+    ACTIVE    ("NameNode"),
+    BACKUP    ("Backup Node"),
+    CHECKPOINT("Checkpoint Node"),
+    STANDBY   ("Standby Node");
+
+    private String description = null;
+    private NamenodeRole(String arg) {this.description = arg;}
+  
+    public String toString() {
+      return description;
+    }
+  }
 }
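
The new StartupOption values and the NamenodeRole enum tie a command-line flag to the role a node assumes. A hedged sketch of using that mapping follows; only the two enums come from the commit, the parsing helper around them is illustrative:

    import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
    import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;

    public class StartupRoleExample {
      // Map a startup flag such as "-backup" to the role the node will assume.
      static NamenodeRole roleFor(String flag) {
        for (StartupOption opt : StartupOption.values()) {
          if (opt.getName().equalsIgnoreCase(flag)) {
            return opt.toNodeRole();    // "-backup" -> BACKUP, "-checkpoint" -> CHECKPOINT
          }
        }
        return NamenodeRole.ACTIVE;     // no recognized flag: regular active name-node
      }

      public static void main(String[] args) {
        System.out.println(roleFor("-backup"));      // prints "Backup Node"
        System.out.println(roleFor("-checkpoint"));  // prints "Checkpoint Node"
      }
    }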
 

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java Sat Mar 14 01:20:36 2009
@@ -1,15 +1,21 @@
 package org.apache.hadoop.hdfs.server.common;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
 
 /**
  * Common class for storage information.
  * 
  * TODO namespaceID should be long and computed as hash(address + port)
  */
-public class StorageInfo {
-  public int   layoutVersion;  // Version read from the stored file.
-  public int   namespaceID;    // namespace id of the storage
-  public long  cTime;          // creation timestamp
+public class StorageInfo implements Writable {
+  public int   layoutVersion;   // layout version of the storage data
+  public int   namespaceID;     // id of the file system
+  public long  cTime;           // creation time of the file system state
   
   public StorageInfo () {
     this(0, 0, 0L);
@@ -25,8 +31,22 @@
     setStorageInfo(from);
   }
 
+  /**
+   * Layout version of the storage data.
+   */
   public int    getLayoutVersion(){ return layoutVersion; }
+
+  /**
+   * Namespace id of the file system.<p>
+   * Assigned to the file system at formatting and never changes after that.
+   * Shared by all file system components.
+   */
   public int    getNamespaceID()  { return namespaceID; }
+
+  /**
+   * Creation time of the file system state.<p>
+   * Modified during upgrades.
+   */
   public long   getCTime()        { return cTime; }
 
   public void   setStorageInfo(StorageInfo from) {
@@ -34,4 +54,19 @@
     namespaceID = from.namespaceID;
     cTime = from.cTime;
   }
-}
\ No newline at end of file
+
+  /////////////////////////////////////////////////
+  // Writable
+  /////////////////////////////////////////////////
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(getLayoutVersion());
+    out.writeInt(getNamespaceID());
+    out.writeLong(getCTime());
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    layoutVersion = in.readInt();
+    namespaceID = in.readInt();
+    cTime = in.readLong();
+  }
+}
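
With StorageInfo now implementing Writable, the three common fields can travel inside RPC payloads such as the new NamenodeRegistration. A small round-trip sketch, assuming only the public fields and methods shown above (the sample values are made up):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.hdfs.server.common.StorageInfo;

    public class StorageInfoRoundTrip {
      public static void main(String[] args) throws Exception {
        StorageInfo src = new StorageInfo();
        src.layoutVersion = -18;        // sample values, illustrative only
        src.namespaceID   = 463031076;
        src.cTime         = 0L;

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        src.write(new DataOutputStream(buf));   // writes layoutVersion, namespaceID, cTime

        StorageInfo copy = new StorageInfo();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(copy.getLayoutVersion() == src.getLayoutVersion()
            && copy.getNamespaceID() == src.getNamespaceID()
            && copy.getCTime() == src.getCTime());   // true
      }
    }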

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Mar 14 01:20:36 2009
@@ -54,7 +54,7 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
@@ -803,7 +803,7 @@
         } // synchronized
       } catch(RemoteException re) {
         String reClass = re.getClassName();
-        if (UnregisteredDatanodeException.class.getName().equals(reClass) ||
+        if (UnregisteredNodeException.class.getName().equals(reClass) ||
             DisallowedDatanodeException.class.getName().equals(reClass) ||
             IncorrectVersionException.class.getName().equals(reClass)) {
           LOG.warn("DataNode is shutting down: " + 

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=753481&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Sat Mar 14 01:20:36 2009
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.Daemon;
+
+/**
+ * BackupNode.
+ * <p>
+ * Backup node can play two roles.
+ * <ol>
+ * <li>{@link NamenodeRole#CHECKPOINT} node periodically creates checkpoints, 
+ * that is downloads image and edits from the active node, merges them, and
+ * uploads the new image back to the active.</li>
+ * <li>{@link NamenodeRole#BACKUP} node keeps its namespace in sync with the
+ * active node, and periodically creates checkpoints by simply saving the
+ * namespace image to local disk(s).</li>
+ * </ol>
+ */
+class BackupNode extends NameNode {
+  private static final String BN_ADDRESS_NAME_KEY = "dfs.backup.address";
+  private static final String BN_ADDRESS_DEFAULT = "localhost:50100";
+  private static final String BN_HTTP_ADDRESS_NAME_KEY = "dfs.backup.http.address";
+  private static final String BN_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50105";
+
+  /** Name-node proxy */
+  NamenodeProtocol namenode;
+  /** Name-node RPC address */
+  String nnRpcAddress;
+  /** Name-node HTTP address */
+  String nnHttpAddress;
+  /** Checkpoint manager */
+  Checkpointer checkpointManager;
+  /** Checkpoint daemon */
+  private Daemon cpDaemon;
+
+  BackupNode(Configuration conf, NamenodeRole role) throws IOException {
+    super(conf, role);
+  }
+
+  /////////////////////////////////////////////////////
+  // Common NameNode methods implementation for backup node.
+  /////////////////////////////////////////////////////
+  @Override // NameNode
+  protected InetSocketAddress getRpcServerAddress(Configuration conf) throws IOException {
+    String addr = conf.get(BN_ADDRESS_NAME_KEY, BN_ADDRESS_DEFAULT);
+    int port = NetUtils.createSocketAddr(addr).getPort();
+    String hostName = DNS.getDefaultHost("default");
+    return new InetSocketAddress(hostName, port);
+  }
+
+  @Override // NameNode
+  protected void setRpcServerAddress(Configuration conf) {
+    conf.set(BN_ADDRESS_NAME_KEY, getHostPortString(rpcAddress));
+  }
+
+  @Override // NameNode
+  protected InetSocketAddress getHttpServerAddress(Configuration conf) {
+    assert rpcAddress != null : "rpcAddress should be calculated first";
+    String addr = conf.get(BN_HTTP_ADDRESS_NAME_KEY, BN_HTTP_ADDRESS_DEFAULT);
+    int port = NetUtils.createSocketAddr(addr).getPort();
+    String hostName = rpcAddress.getHostName();
+    return new InetSocketAddress(hostName, port);
+  }
+
+  @Override // NameNode
+  protected void setHttpServerAddress(Configuration conf){
+    conf.set(BN_HTTP_ADDRESS_NAME_KEY, getHostPortString(httpAddress));
+  }
+
+  @Override // NameNode
+  protected void loadNamesystem(Configuration conf) throws IOException {
+    BackupStorage bnImage = new BackupStorage();
+    this.namesystem = new FSNamesystem(conf, bnImage);
+    bnImage.recoverCreateRead(FSNamesystem.getNamespaceDirs(conf),
+                              FSNamesystem.getNamespaceEditsDirs(conf));
+  }
+
+  @Override // NameNode
+  protected void initialize(Configuration conf) throws IOException {
+    // Trash is disabled in BackupNameNode,
+    // but should be turned back on if it ever becomes active.
+    conf.setLong("fs.trash.interval", 0L);
+    NamespaceInfo nsInfo = handshake(conf);
+    super.initialize(conf);
+    // Backup node should never do lease recovery,
+    // therefore lease hard limit should never expire.
+    namesystem.leaseManager.setLeasePeriod(
+        FSConstants.LEASE_SOFTLIMIT_PERIOD, Long.MAX_VALUE);
+    // register with the active name-node 
+    registerWith(nsInfo);
+    // Checkpoint daemon should start after the rpc server started
+    runCheckpointDaemon(conf);
+  }
+
+  @Override // NameNode
+  public void stop() {
+    if(checkpointManager != null) checkpointManager.shouldRun = false;
+    if(cpDaemon != null) cpDaemon.interrupt();
+    if(namenode != null && getRegistration() != null) {
+      try {
+        namenode.errorReport(getRegistration(), NamenodeProtocol.FATAL,
+            "Shutting down.");
+      } catch(IOException e) {
+        LOG.error("Failed to report to name-node.", e);
+      }
+    }
+    RPC.stopProxy(namenode); // stop the RPC threads
+    super.stop();
+  }
+
+  /////////////////////////////////////////////////////
+  // NamenodeProtocol implementation for backup node.
+  /////////////////////////////////////////////////////
+  @Override // NamenodeProtocol
+  public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
+  throws IOException {
+    throw new UnsupportedActionException("getBlocks");
+  }
+
+  // Only active name-node can register other nodes.
+  @Override // NamenodeProtocol
+  public NamenodeRegistration register(NamenodeRegistration registration
+  ) throws IOException {
+    throw new UnsupportedActionException("journal");
+  }
+
+  @Override // NamenodeProtocol
+  public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
+  throws IOException {
+    throw new UnsupportedActionException("startCheckpoint");
+  }
+
+  @Override // NamenodeProtocol
+  public void endCheckpoint(NamenodeRegistration registration,
+                            CheckpointSignature sig) throws IOException {
+    throw new UnsupportedActionException("endCheckpoint");
+  }
+
+  @Override // NamenodeProtocol
+  public void journal(NamenodeRegistration nnReg,
+                      int jAction,
+                      int length,
+                      byte[] args) throws IOException {
+    verifyRequest(nnReg);
+    if(!nnRpcAddress.equals(nnReg.getAddress()))
+      throw new IOException("Journal request from unexpected name-node: "
+          + nnReg.getAddress() + " expecting " + nnRpcAddress);
+    BackupStorage bnImage = (BackupStorage)getFSImage();
+    switch(jAction) {
+      case (int)JA_IS_ALIVE:
+        return;
+      case (int)JA_JOURNAL:
+        bnImage.journal(length, args);
+        return;
+      case (int)JA_JSPOOL_START:
+        bnImage.startJournalSpool(nnReg);
+        return;
+      case (int)JA_CHECKPOINT_TIME:
+        bnImage.setCheckpointTime(length, args);
+        setRegistration(); // keep registration up to date
+        return;
+      default:
+        throw new IOException("Unexpected journal action: " + jAction);
+    }
+  }
+
+  boolean shouldCheckpointAtStartup() {
+    FSImage fsImage = getFSImage();
+    if(isRole(NamenodeRole.CHECKPOINT)) {
+      assert fsImage.getNumStorageDirs() > 0;
+      return ! fsImage.getStorageDir(0).getVersionFile().exists();
+    }
+    if(namesystem == null || namesystem.dir == null || getFSImage() == null)
+      return true;
+    return fsImage.getEditLog().getNumEditStreams() == 0;
+  }
+
+  private NamespaceInfo handshake(Configuration conf) throws IOException {
+    // connect to name node
+    InetSocketAddress nnAddress = super.getRpcServerAddress(conf);
+    this.namenode =
+      (NamenodeProtocol) RPC.waitForProxy(NamenodeProtocol.class,
+          NamenodeProtocol.versionID, nnAddress, conf);
+    this.nnRpcAddress = getHostPortString(nnAddress);
+    this.nnHttpAddress = getHostPortString(super.getHttpServerAddress(conf));
+    // get version and id info from the name-node
+    NamespaceInfo nsInfo = null;
+    while(!stopRequested) {
+      try {
+        nsInfo = handshake(namenode);
+        break;
+      } catch(SocketTimeoutException e) {  // name-node is busy
+        LOG.info("Problem connecting to server: " + nnAddress);
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {}
+      }
+    }
+    return nsInfo;
+  }
+
+  /**
+   * Start a backup node daemon.
+   */
+  private void runCheckpointDaemon(Configuration conf) throws IOException {
+    checkpointManager = new Checkpointer(conf, this);
+    cpDaemon = new Daemon(checkpointManager);
+    cpDaemon.start();
+  }
+
+  /**
+   * Checkpoint.<br>
+   * Tests may use it to initiate a checkpoint process.
+   * @throws IOException
+   */
+  void doCheckpoint() throws IOException {
+    checkpointManager.doCheckpoint();
+  }
+
+  CheckpointStates getCheckpointState() {
+    return getFSImage().getCheckpointState();
+  }
+
+  void setCheckpointState(CheckpointStates cs) {
+    getFSImage().setCheckpointState(cs);
+  }
+
+  /**
+   * Register this backup node with the active name-node.
+   * @param nsInfo
+   * @throws IOException
+   */
+  private void registerWith(NamespaceInfo nsInfo) throws IOException {
+    BackupStorage bnImage = (BackupStorage)getFSImage();
+    // verify namespaceID
+    if(bnImage.getNamespaceID() == 0) // new backup storage
+      bnImage.setStorageInfo(nsInfo);
+    else if(bnImage.getNamespaceID() != nsInfo.getNamespaceID())
+      throw new IOException("Incompatible namespaceIDs"
+          + ": active node namespaceID = " + nsInfo.getNamespaceID() 
+          + "; backup node namespaceID = " + bnImage.getNamespaceID());
+
+    setRegistration();
+    NamenodeRegistration nnReg = null;
+    while(!stopRequested) {
+      try {
+        nnReg = namenode.register(getRegistration());
+        break;
+      } catch(SocketTimeoutException e) {  // name-node is busy
+        LOG.info("Problem connecting to name-node: " + nnRpcAddress);
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {}
+      }
+    }
+
+    String msg = null;
+    if(nnReg == null) // consider as a rejection
+      msg = "Registration rejected by " + nnRpcAddress;
+    else if(!nnReg.isRole(NamenodeRole.ACTIVE)) {
+      msg = "Name-node " + nnRpcAddress + " is not active";
+    }
+    if(msg != null) {
+      msg += ". Shutting down.";
+      LOG.error(msg);
+      throw new IOException(msg); // stop the node
+    }
+    nnRpcAddress = nnReg.getAddress();
+  }
+
+  /**
+   * Reset node namespace state in memory and in storage directories.
+   * @throws IOException
+   */
+  void resetNamespace() throws IOException {
+    ((BackupStorage)getFSImage()).reset();
+  }
+
+  /**
+   * Get size of the local journal (edit log).
+   * @return size of the current journal
+   * @throws IOException
+   */
+  long journalSize() throws IOException {
+    return namesystem.getEditLogSize();
+  }
+
+  // TODO: move to a common with DataNode util class
+  private static NamespaceInfo handshake(NamenodeProtocol namenode)
+  throws IOException, SocketTimeoutException {
+    NamespaceInfo nsInfo;
+    nsInfo = namenode.versionRequest();  // throws SocketTimeoutException 
+    String errorMsg = null;
+    // verify build version
+    if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion())) {
+      errorMsg = "Incompatible build versions: active name-node BV = " 
+        + nsInfo.getBuildVersion() + "; backup node BV = "
+        + Storage.getBuildVersion();
+      LOG.fatal(errorMsg);
+      throw new IOException(errorMsg);
+    }
+    assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
+      "Active and backup node layout versions must be the same. Expected: "
+      + FSConstants.LAYOUT_VERSION + " actual "+ nsInfo.getLayoutVersion();
+    return nsInfo;
+  }
+}
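
BackupNode and its constructor are package-private, so outside of tests it is normally launched through the name-node startup options rather than instantiated directly. A hedged sketch of bringing one up in-process, assuming code that lives in the org.apache.hadoop.hdfs.server.namenode package and an active name-node already running at the address the configuration points to:

    package org.apache.hadoop.hdfs.server.namenode;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;

    public class BackupNodeStartSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Port 0 means "pick a free port", per the hdfs-default.xml descriptions above.
        conf.set("dfs.backup.address", "localhost:0");
        conf.set("dfs.backup.http.address", "localhost:0");

        // The constructor registers with the active name-node and starts the
        // checkpoint daemon, as shown in BackupNode.initialize() above.
        BackupNode backup = new BackupNode(conf, NamenodeRole.BACKUP);
        try {
          backup.doCheckpoint();   // tests may trigger a checkpoint explicitly
        } finally {
          backup.stop();
        }
      }
    }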

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java?rev=753481&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java Sat Mar 14 01:20:36 2009
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog.EditLogFileInputStream;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.io.LongWritable;
+
+public class BackupStorage extends FSImage {
+  // Names of the journal spool directory and the spool file
+  private static final String STORAGE_JSPOOL_DIR = "jspool";
+  private static final String STORAGE_JSPOOL_FILE = 
+                                              NameNodeFile.EDITS_NEW.getName();
+
+  /** Backup input stream for loading edits into memory */
+  private EditLogBackupInputStream backupInputStream;
+  /** Is journal spooling in progress */
+  volatile JSpoolState jsState;
+
+  static enum JSpoolState {
+    OFF,
+    INPROGRESS,
+    WAIT;
+  }
+
+  /**
+   */
+  BackupStorage() {
+    super();
+    jsState = JSpoolState.OFF;
+  }
+
+  @Override
+  public boolean isConversionNeeded(StorageDirectory sd) {
+    return false;
+  }
+
+  /**
+   * Analyze backup storage directories for consistency.<br>
+   * Recover from incomplete checkpoints if required.<br>
+   * Read VERSION and fstime files if exist.<br>
+   * Do not load image or edits.
+   * 
+   * @param imageDirs list of image directories.
+   * @param editsDirs list of edits directories.
+   * @throws IOException if the node should shutdown.
+   */
+  void recoverCreateRead(Collection<File> imageDirs,
+                         Collection<File> editsDirs) throws IOException {
+    setStorageDirectories(imageDirs, editsDirs);
+    this.checkpointTime = 0L;
+    for(Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      StorageState curState;
+      try {
+        curState = sd.analyzeStorage(HdfsConstants.StartupOption.REGULAR);
+        // sd is locked but not opened
+        switch(curState) {
+        case NON_EXISTENT:
+          // fail if any of the configured storage dirs are inaccessible 
+          throw new InconsistentFSStateException(sd.getRoot(),
+                "checkpoint directory does not exist or is not accessible.");
+        case NOT_FORMATTED:
+          // for backup node all directories may be unformatted initially
+          LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
+          LOG.info("Formatting ...");
+          sd.clearDirectory(); // create empty current
+          break;
+        case NORMAL:
+          break;
+        default:  // recovery is possible
+          sd.doRecover(curState);
+        }
+        if(curState != StorageState.NOT_FORMATTED) {
+          sd.read(); // read and verify consistency with other directories
+        }
+      } catch(IOException ioe) {
+        sd.unlock();
+        throw ioe;
+      }
+    }
+  }
+
+  /**
+   * Reset storage directories.
+   * <p>
+   * Unlock the storage.
+   * Rename <code>current</code> to <code>lastcheckpoint.tmp</code>
+   * and recreate empty <code>current</code>.
+   * @throws IOException
+   */
+  synchronized void reset() throws IOException {
+    // reset NameSpace tree
+    FSDirectory fsDir = getFSNamesystem().dir;
+    fsDir.reset();
+
+    // unlock, close and rename storage directories
+    unlockAll();
+    // recover from unsuccessful checkpoint if necessary
+    recoverCreateRead(getImageDirectories(), getEditsDirectories());
+    // rename and recreate
+    for(StorageDirectory sd : storageDirs) {
+      File curDir = sd.getCurrentDir();
+      File tmpCkptDir = sd.getLastCheckpointTmp();
+      assert !tmpCkptDir.exists() : 
+        tmpCkptDir.getName() + " directory must not exist.";
+      if(!sd.getVersionFile().exists())
+        continue;
+      // rename current to lastcheckpoint.tmp
+      rename(curDir, tmpCkptDir);
+      if(!curDir.mkdir())
+        throw new IOException("Cannot create directory " + curDir);
+    }
+  }
+
+  /**
+   * Load checkpoint from local files only if the memory state is empty.<br>
+   * Set new checkpoint time received from the name-node.<br>
+   * Move <code>lastcheckpoint.tmp</code> to <code>previous.checkpoint</code>.
+   * @throws IOException
+   */
+  void loadCheckpoint(CheckpointSignature sig) throws IOException {
+    // load current image and journal if it is not in memory already
+    if(!editLog.isOpen())
+      editLog.open();
+
+    FSDirectory fsDir = getFSNamesystem().dir;
+    if(fsDir.isEmpty()) {
+      Iterator<StorageDirectory> itImage = dirIterator(NameNodeDirType.IMAGE);
+      Iterator<StorageDirectory> itEdits = dirIterator(NameNodeDirType.EDITS);
+      if(!itImage.hasNext() || ! itEdits.hasNext())
+        throw new IOException("Could not locate checkpoint directories");
+      StorageDirectory sdName = itImage.next();
+      StorageDirectory sdEdits = itEdits.next();
+      synchronized(getFSDirectoryRootLock()) { // load image under rootDir lock
+        loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
+      }
+      loadFSEdits(sdEdits);
+    }
+
+    // set storage fields
+    setStorageInfo(sig);
+    checkpointTime = sig.checkpointTime;
+  }
+
+  /**
+   * Save meta-data into fsimage files.
+   * and create empty edits.
+   */
+  void saveCheckpoint() throws IOException {
+    // save image into fsimage.ckpt and purge edits file
+    for (Iterator<StorageDirectory> it = 
+                           dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      NameNodeDirType dirType = (NameNodeDirType)sd.getStorageDirType();
+      if (dirType.isOfType(NameNodeDirType.IMAGE))
+        saveFSImage(getImageFile(sd, NameNodeFile.IMAGE_NEW));
+      if (dirType.isOfType(NameNodeDirType.EDITS))
+        editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
+    }
+
+    ckptState = CheckpointStates.UPLOAD_DONE;
+    renameCheckpoint();
+  }
+
+  private FSNamesystem getFSNamesystem() {
+    // HADOOP-5119 should get rid of this.
+    return FSNamesystem.getFSNamesystem();
+  }
+
+  private Object getFSDirectoryRootLock() {
+    return getFSNamesystem().dir.rootDir;
+  }
+
+  static File getJSpoolDir(StorageDirectory sd) {
+    return new File(sd.getRoot(), STORAGE_JSPOOL_DIR);
+  }
+
+  static File getJSpoolFile(StorageDirectory sd) {
+    return new File(getJSpoolDir(sd), STORAGE_JSPOOL_FILE);
+  }
+
+  /**
+   * Journal writer journals new meta-data state.
+   * <ol>
+   * <li> If Journal Spool state is OFF then journal records (edits)
+   * are applied directly to meta-data state in memory and are written 
+   * to the edits file(s).</li>
+   * <li> If Journal Spool state is INPROGRESS then records are only 
+   * written to edits.new file, which is called Spooling.</li>
+   * <li> Journal Spool state WAIT blocks journaling until the
+   * Journal Spool reader finalizes merging of the spooled data and
+   * switches to applying journal to memory.</li>
+   * </ol>
+   * @param length length of data.
+   * @param data serialized journal records.
+   * @throws IOException
+   * @see #convergeJournalSpool()
+   */
+  synchronized void journal(int length, byte[] data) throws IOException {
+    assert backupInputStream.length() == 0 : "backup input stream is not empty";
+    try {
+      switch(jsState) {
+        case WAIT:
+        case OFF:
+          // wait until spooling is off
+          waitSpoolEnd();
+          // update NameSpace in memory
+          backupInputStream.setBytes(data);
+          FSEditLog.loadEditRecords(getLayoutVersion(),
+                    backupInputStream.getDataInputStream(), true);
+          getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
+          break;
+        case INPROGRESS:
+          break;
+      }
+      // write to files
+      editLog.logEdit(length, data);
+      editLog.logSync();
+    } finally {
+      backupInputStream.clear();
+    }
+  }
+
+  private synchronized void waitSpoolEnd() {
+    while(jsState == JSpoolState.WAIT) {
+      try {
+        wait();
+      } catch (InterruptedException  e) {}
+    }
+    // now spooling should be off, verifying just in case
+    assert jsState == JSpoolState.OFF : "Unexpected JSpool state: " + jsState;
+  }
+
+  /**
+   * Start journal spool.
+   * Switch to writing into edits.new instead of edits.
+   * 
+   * edits.new for spooling is in separate directory "spool" rather than in
+   * "current" because the two directories should be independent.
+   * While spooling a checkpoint can happen and current will first
+   * move to lastcheckpoint.tmp and then to previous.checkpoint
+   * spool/edits.new will remain in place during that.
+   */
+  synchronized void startJournalSpool(NamenodeRegistration nnReg)
+  throws IOException {
+    switch(jsState) {
+      case OFF:
+        break;
+      case INPROGRESS:
+        return;
+      case WAIT:
+        waitSpoolEnd();
+    }
+
+    // create journal spool directories
+    for(Iterator<StorageDirectory> it = 
+                          dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      File jsDir = getJSpoolDir(sd);
+      if (!jsDir.exists() && !jsDir.mkdirs()) {
+        throw new IOException("Mkdirs failed to create "
+                              + jsDir.getCanonicalPath());
+      }
+      // create edit file if missing
+      File eFile = getEditFile(sd);
+      if(!eFile.exists()) {
+        editLog.createEditLogFile(eFile);
+      }
+    }
+
+    if(!editLog.isOpen())
+      editLog.open();
+
+    // create streams pointing to the journal spool files
+    // subsequent journal records will go directly to the spool
+    editLog.divertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
+    setCheckpointState(CheckpointStates.ROLLED_EDITS);
+
+    // set up spooling
+    if(backupInputStream == null)
+      backupInputStream = new EditLogBackupInputStream(nnReg.getAddress());
+    jsState = JSpoolState.INPROGRESS;
+  }
+
+  synchronized void setCheckpointTime(int length, byte[] data)
+  throws IOException {
+    assert backupInputStream.length() == 0 : "backup input stream is not empty";
+    try {
+      // unpack new checkpoint time
+      backupInputStream.setBytes(data);
+      DataInputStream in = backupInputStream.getDataInputStream();
+      byte op = in.readByte();
+      assert op == NamenodeProtocol.JA_CHECKPOINT_TIME;
+      LongWritable lw = new LongWritable();
+      lw.readFields(in);
+      setCheckpointTime(lw.get());
+    } finally {
+      backupInputStream.clear();
+    }
+  }
+
+  /**
+   * Merge Journal Spool to memory.<p>
+   * Journal Spool reader reads journal records from edits.new.
+   * When it reaches the end of the file it sets {@link JSpoolState} to WAIT.
+   * This blocks journaling (see {@link #journal(int,byte[])}.
+   * The reader
+   * <ul>
+   * <li> reads remaining journal records if any,</li>
+   * <li> renames edits.new to edits,</li>
+   * <li> sets {@link JSpoolState} to OFF,</li> 
+   * <li> and notifies the journaling thread.</li>
+   * </ul>
+   * Journaling resumes with applying new journal records to the memory state,
+   * and writing them into edits file(s).
+   */
+  void convergeJournalSpool() throws IOException {
+    Iterator<StorageDirectory> itEdits = dirIterator(NameNodeDirType.EDITS);
+    if(! itEdits.hasNext())
+      throw new IOException("Could not locate checkpoint directories");
+    StorageDirectory sdEdits = itEdits.next();
+    int numEdits = 0;
+    File jSpoolFile = getJSpoolFile(sdEdits);
+    long startTime = FSNamesystem.now();
+    if(jSpoolFile.exists()) {
+      // load edits.new
+      EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
+      DataInputStream in = edits.getDataInputStream();
+      numEdits += FSEditLog.loadFSEdits(in, false);
+  
+      // first time reached the end of spool
+      jsState = JSpoolState.WAIT;
+      numEdits += FSEditLog.loadEditRecords(getLayoutVersion(), in, true);
+      getFSNamesystem().dir.updateCountForINodeWithQuota();
+      edits.close();
+    }
+
+    FSImage.LOG.info("Edits file " + jSpoolFile.getCanonicalPath() 
+        + " of size " + jSpoolFile.length() + " edits # " + numEdits 
+        + " loaded in " + (FSNamesystem.now()-startTime)/1000 + " seconds.");
+
+    // rename spool edits.new to edits making it in sync with the active node
+    // subsequent journal records will go directly to edits
+    editLog.revertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
+
+    // write version file
+    resetVersion(false);
+
+    // wake up journal writer
+    synchronized(this) {
+      jsState = JSpoolState.OFF;
+      notifyAll();
+    }
+
+    // Rename lastcheckpoint.tmp to previous.checkpoint
+    for(StorageDirectory sd : storageDirs) {
+      File tmpCkptDir = sd.getLastCheckpointTmp();
+      File prevCkptDir = sd.getPreviousCheckpoint();
+      // delete previous directory
+      if (prevCkptDir.exists())
+        deleteDir(prevCkptDir);
+      // rename tmp to previous
+      if (tmpCkptDir.exists())
+        rename(tmpCkptDir, prevCkptDir);
+    }
+  }
+}
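
The journal spool above is essentially a three-state handoff between the RPC journaling thread and the checkpoint thread. The following self-contained sketch mirrors that wait/notifyAll pattern; the class, method names, and bodies are illustrative, not the commit's:

    public class SpoolHandoffSketch {
      enum JSpoolState { OFF, INPROGRESS, WAIT }

      private volatile JSpoolState jsState = JSpoolState.OFF;

      // Writer side, analogous to BackupStorage.journal(int, byte[]).
      synchronized void journal(byte[] records) throws InterruptedException {
        while (jsState == JSpoolState.WAIT) {
          wait();                       // block while the reader merges the spool
        }
        if (jsState == JSpoolState.OFF) {
          applyToNamespace(records);    // apply edits to the in-memory state
        }                               // INPROGRESS: records only reach the spool file
        writeToEditsFile(records);
      }

      // Reader side, analogous to convergeJournalSpool().
      void convergeSpool() {
        jsState = JSpoolState.WAIT;     // stop writers from applying edits to memory
        mergeSpoolIntoNamespace();      // read the remaining spooled records
        synchronized (this) {
          jsState = JSpoolState.OFF;    // journaling resumes applying to memory
          notifyAll();                  // wake up the blocked writer
        }
      }

      private void applyToNamespace(byte[] records) { /* placeholder */ }
      private void writeToEditsFile(byte[] records) { /* placeholder */ }
      private void mergeSpoolIntoNamespace()        { /* placeholder */ }
    }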

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java Sat Mar 14 01:20:36 2009
@@ -22,6 +22,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.io.WritableComparable;
 
 /**
@@ -33,12 +34,12 @@
   long editsTime = -1L;
   long checkpointTime = -1L;
 
-  CheckpointSignature() {}
+  public CheckpointSignature() {}
 
   CheckpointSignature(FSImage fsImage) {
     super(fsImage);
     editsTime = fsImage.getEditLog().getFsEditTime();
-    checkpointTime = fsImage.checkpointTime;
+    checkpointTime = fsImage.getCheckpointTime();
   }
 
   CheckpointSignature(String str) {
@@ -59,14 +60,17 @@
          + String.valueOf(checkpointTime);
   }
 
-  void validateStorageInfo(StorageInfo si) throws IOException {
+  void validateStorageInfo(FSImage si) throws IOException {
     if(layoutVersion != si.layoutVersion
-        || namespaceID != si.namespaceID || cTime != si.cTime) {
+        || namespaceID != si.namespaceID || cTime != si.cTime
+        || checkpointTime != si.checkpointTime) {
       // checkpointTime can change when the image is saved - do not compare
-      throw new IOException("Inconsistent checkpoint fileds. "
+      throw new IOException("Inconsistent checkpoint fields.\n"
           + "LV = " + layoutVersion + " namespaceID = " + namespaceID
-          + " cTime = " + cTime + ". Expecting respectively: "
-          + si.layoutVersion + "; " + si.namespaceID + "; " + si.cTime);
+          + " cTime = " + cTime + "; checkpointTime = " + checkpointTime 
+          + ".\nExpecting respectively: "
+          + si.layoutVersion + "; " + si.namespaceID + "; " + si.cTime
+          + "; " + si.checkpointTime);
     }
   }
 
@@ -100,17 +104,13 @@
   // Writable
   /////////////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
-    out.writeInt(getLayoutVersion());
-    out.writeInt(getNamespaceID());
-    out.writeLong(getCTime());
+    super.write(out);
     out.writeLong(editsTime);
     out.writeLong(checkpointTime);
   }
 
   public void readFields(DataInput in) throws IOException {
-    layoutVersion = in.readInt();
-    namespaceID = in.readInt();
-    cTime = in.readLong();
+    super.readFields(in);
     editsTime = in.readLong();
     checkpointTime = in.readLong();
   }

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=753481&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Sat Mar 14 01:20:36 2009
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates;
+import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer;
+
+/**
+ * The Checkpointer is responsible for supporting periodic checkpoints 
+ * of the HDFS metadata.
+ *
+ * The Checkpointer is a daemon that periodically wakes up
+ * (determined by the schedule specified in the configuration),
+ * triggers a periodic checkpoint and then goes back to sleep.
+ * 
+ * The start of a checkpoint is triggered by one of the two factors:
+ * (1) time or (2) the size of the edits file.
+ */
+class Checkpointer implements Runnable {
+  public static final Log LOG = 
+    LogFactory.getLog(Checkpointer.class.getName());
+
+  private BackupNode backupNode;
+  volatile boolean shouldRun;
+  private long checkpointPeriod;	// in seconds
+  private long checkpointSize;    // size (in MB) of current Edit Log
+
+  private BackupStorage getFSImage() {
+    return (BackupStorage)backupNode.getFSImage();
+  }
+
+  private NamenodeProtocol getNamenode(){
+    return backupNode.namenode;
+  }
+
+  /**
+   * Create a connection to the primary namenode.
+   */
+  Checkpointer(Configuration conf, BackupNode bnNode)  throws IOException {
+    this.backupNode = bnNode;
+    try {
+      initialize(conf);
+    } catch(IOException e) {
+      shutdown();
+      throw e;
+    }
+  }
+
+  /**
+   * Initialize checkpoint.
+   */
+  private void initialize(Configuration conf) throws IOException {
+    // Create connection to the namenode.
+    shouldRun = true;
+
+    // Initialize other scheduling parameters from the configuration
+    checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600);
+    checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);
+
+    HttpServer httpServer = backupNode.httpServer;
+    httpServer.setAttribute("name.system.image", getFSImage());
+    httpServer.setAttribute("name.conf", conf);
+    httpServer.addServlet("getimage", "/getimage", GetImageServlet.class);
+
+    LOG.info("Checkpoint Period : " + checkpointPeriod + " secs " +
+             "(" + checkpointPeriod/60 + " min)");
+    LOG.info("Log Size Trigger  : " + checkpointSize + " bytes " +
+             "(" + checkpointSize/1024 + " KB)");
+  }
+
+  /**
+   * Shut down the checkpointer.
+   */
+  void shutdown() {
+    shouldRun = false;
+    backupNode.stop();
+  }
+
+  //
+  // The main work loop
+  //
+  public void run() {
+    // Check the size of the edit log once every 5 minutes.
+    long periodMSec = 5 * 60;   // 5 minutes
+    if(checkpointPeriod < periodMSec) {
+      periodMSec = checkpointPeriod;
+    }
+    periodMSec *= 1000;
+
+    long lastCheckpointTime = 0;
+    if(!backupNode.shouldCheckpointAtStartup())
+      lastCheckpointTime = FSNamesystem.now();
+    while(shouldRun) {
+      try {
+        long now = FSNamesystem.now();
+        boolean shouldCheckpoint = false;
+        if(now >= lastCheckpointTime + periodMSec) {
+          shouldCheckpoint = true;
+        } else {
+          long size = getJournalSize();
+          if(size >= checkpointSize)
+            shouldCheckpoint = true;
+        }
+        if(shouldCheckpoint) {
+          doCheckpoint();
+          lastCheckpointTime = now;
+        }
+      } catch(IOException e) {
+        LOG.error("Exception in doCheckpoint: ", e);
+      } catch(Throwable e) {
+        LOG.error("Throwable Exception in doCheckpoint: ", e);
+        Runtime.getRuntime().exit(-1);
+      }
+      try {
+        Thread.sleep(periodMSec);
+      } catch(InterruptedException ie) {
+        // do nothing
+      }
+    }
+  }
+
+  private long getJournalSize() throws IOException {
+    // If BACKUP node has been loaded
+    // get edits size from the local file. ACTIVE has the same.
+    if(backupNode.isRole(NamenodeRole.BACKUP)
+        && getFSImage().getEditLog().isOpen())
+      return backupNode.journalSize();
+    // Go to the ACTIVE node for its size
+    return getNamenode().journalSize(backupNode.getRegistration());
+  }
+
+  /**
+   * Download <code>fsimage</code> and <code>edits</code>
+   * files from the remote name-node.
+   */
+  private void downloadCheckpoint(CheckpointSignature sig) throws IOException {
+    // Retrieve image file
+    String fileid = "getimage=1";
+    File[] files = getFSImage().getImageFiles();
+    assert files.length > 0 : "No checkpoint targets.";
+    String nnHttpAddr = backupNode.nnHttpAddress;
+    TransferFsImage.getFileClient(nnHttpAddr, fileid, files);
+    LOG.info("Downloaded file " + files[0].getName() + " size " +
+             files[0].length() + " bytes.");
+
+    // Retrieve edits file
+    fileid = "getedit=1";
+    files = getFSImage().getEditsFiles();
+    assert files.length > 0 : "No checkpoint targets.";
+    TransferFsImage.getFileClient(nnHttpAddr, fileid, files);
+    LOG.info("Downloaded file " + files[0].getName() + " size " +
+        files[0].length() + " bytes.");
+  }
+
+  /**
+   * Copy the new image into remote name-node.
+   */
+  private void uploadCheckpoint(CheckpointSignature sig) throws IOException {
+    InetSocketAddress httpSocAddr = backupNode.getHttpAddress();
+    int httpPort = httpSocAddr.getPort();
+    String fileid = "putimage=1&port=" + httpPort +
+      "&machine=" +
+      InetAddress.getLocalHost().getHostAddress() +
+      "&token=" + sig.toString();
+    LOG.info("Posted URL " + backupNode.nnHttpAddress + fileid);
+    TransferFsImage.getFileClient(backupNode.nnHttpAddress, fileid, (File[])null);
+  }
+
+  /**
+   * Create a new checkpoint
+   */
+  void doCheckpoint() throws IOException {
+    long startTime = FSNamesystem.now();
+    NamenodeCommand cmd = 
+      getNamenode().startCheckpoint(backupNode.getRegistration());
+    CheckpointCommand cpCmd = null;
+    switch(cmd.getAction()) {
+      case NamenodeProtocol.ACT_SHUTDOWN:
+        shutdown();
+        throw new IOException("Name-node " + backupNode.nnRpcAddress
+                                           + " requested shutdown.");
+      case NamenodeProtocol.ACT_CHECKPOINT:
+        cpCmd = (CheckpointCommand)cmd;
+        break;
+      default:
+        throw new IOException("Unsupported NamenodeCommand: "+cmd.getAction());
+    }
+
+    CheckpointSignature sig = cpCmd.getSignature();
+    assert FSConstants.LAYOUT_VERSION == sig.getLayoutVersion() :
+      "Signature should have current layout version. Expected: "
+      + FSConstants.LAYOUT_VERSION + " actual "+ sig.getLayoutVersion();
+    assert !backupNode.isRole(NamenodeRole.CHECKPOINT) ||
+      cpCmd.isImageObsolete() : "checkpoint node should always download image.";
+    backupNode.setCheckpointState(CheckpointStates.UPLOAD_START);
+    if(cpCmd.isImageObsolete()) {
+      // First reset storage on disk and memory state
+      backupNode.resetNamespace();
+      downloadCheckpoint(sig);
+    }
+
+    BackupStorage bnImage = getFSImage();
+    bnImage.loadCheckpoint(sig);
+    sig.validateStorageInfo(bnImage);
+    bnImage.saveCheckpoint();
+
+    if(cpCmd.needToReturnImage())
+      uploadCheckpoint(sig);
+
+    getNamenode().endCheckpoint(backupNode.getRegistration(), sig);
+
+    bnImage.convergeJournalSpool();
+    backupNode.setRegistration(); // keep registration up to date
+    if(backupNode.isRole(NamenodeRole.CHECKPOINT))
+        getFSImage().getEditLog().close();
+    LOG.info("Checkpoint completed in "
+        + (FSNamesystem.now() - startTime)/1000 + " seconds."
+        +	" New Image Size: " + bnImage.getFsImageName().length());
+  }
+}
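
The run() loop above reduces to a simple trigger rule: checkpoint when the configured period has elapsed or when the journal has grown past the configured size. A worked sketch of that decision with the default values quoted in initialize() (the surrounding class is illustrative):

    public class CheckpointTriggerSketch {
      public static void main(String[] args) {
        long checkpointPeriodSec = 3600L;      // fs.checkpoint.period default
        long checkpointSizeBytes = 4194304L;   // fs.checkpoint.size default (4 MB)

        long lastCheckpointMs = 0L;
        long nowMs            = 30L * 60 * 1000;  // 30 minutes after the last checkpoint
        long journalSizeBytes = 5000000L;          // edits have grown past 4 MB

        boolean timeDue = nowMs >= lastCheckpointMs + checkpointPeriodSec * 1000;
        boolean sizeDue = journalSizeBytes >= checkpointSizeBytes;
        System.out.println("checkpoint now? " + (timeDue || sizeDue));  // true: size trigger fires
      }
    }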

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Sat Mar 14 01:20:36 2009
@@ -367,9 +367,9 @@
 
   void reportDiff(BlocksMap blocksMap,
                   BlockListAsLongs newReport,
-                  Collection<Block> toAdd,
-                  Collection<Block> toRemove,
-                  Collection<Block> toInvalidate) {
+                  Collection<Block> toAdd,    // add to DatanodeDescriptor
+                  Collection<Block> toRemove, // remove from DatanodeDescriptor
+                  Collection<Block> toInvalidate) { // should be removed from DN
     // place a deilimiter in the list which separates blocks 
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), 1);

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java Sat Mar 14 01:20:36 2009
@@ -64,7 +64,7 @@
         try {
           Thread.sleep(recheckInterval);
         } catch (InterruptedException ie) {
-          LOG.info("Interrupted " + this.getClass().getSimpleName(), ie);
+          LOG.warn(this.getClass().getSimpleName() + " interrupted: " + ie);
         }
       }
     }

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java?rev=753481&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java Sat Mar 14 01:20:36 2009
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+/**
+ * An implementation of the abstract class {@link EditLogInputStream},
+ * which is used to update HDFS meta-data state on a backup node.
+ * 
+ * @see org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol#journal
+ * (org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration,
+ *  int, int, byte[])
+ */
+class EditLogBackupInputStream extends EditLogInputStream {
+  String address; // sender address 
+  private ByteBufferInputStream inner;
+  private DataInputStream in;
+
+  /**
+   * A ByteArrayInputStream that allows modifying the underlying byte array.
+   */
+  private static class ByteBufferInputStream extends ByteArrayInputStream {
+    ByteBufferInputStream() {
+      super(new byte[0]);
+    }
+
+    byte[] getData() {
+      return super.buf;
+    }
+
+    void setData(byte[] newBytes) {
+      super.buf = newBytes;
+      super.count = newBytes == null ? 0 : newBytes.length;
+      super.mark = 0;
+      reset();
+    }
+
+    /**
+     * Number of valid bytes in the underlying buffer.
+     */
+    int length() {
+      return count;
+    }
+  }
+
+  EditLogBackupInputStream(String name) throws IOException {
+    address = name;
+    inner = new ByteBufferInputStream();
+    in = new DataInputStream(inner);
+  }
+
+  @Override // JournalStream
+  public String getName() {
+    return address;
+  }
+
+  @Override // JournalStream
+  public JournalType getType() {
+    return JournalType.BACKUP;
+  }
+
+  @Override
+  public int available() throws IOException {
+    return in.available();
+  }
+
+  @Override
+  public int read() throws IOException {
+    return in.read();
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    return in.read(b, off, len);
+  }
+
+  @Override
+  public void close() throws IOException {
+    in.close();
+  }
+
+  @Override
+  long length() throws IOException {
+    // total number of bytes currently held in the backing buffer
+    return inner.length();
+  }
+
+  DataInputStream getDataInputStream() {
+    return in;
+  }
+
+  void setBytes(byte[] newBytes) throws IOException {
+    inner.setData(newBytes);
+    in.reset();
+  }
+
+  void clear() throws IOException {
+    setBytes(null);
+  }
+}
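
A minimal usage sketch (not part of this patch) of the backup-node side: it assumes a hypothetical helper class living in org.apache.hadoop.hdfs.server.namenode (so it can reach the package-private API above), and that 'receivedEdits' is the byte[] delivered by NamenodeProtocol#journal.

package org.apache.hadoop.hdfs.server.namenode;

import java.io.DataInputStream;
import java.io.IOException;

class BackupInputStreamSketch {
  static void applyReceivedEdits(byte[] receivedEdits) throws IOException {
    // Name the stream after the sender; the address is used for identification only.
    EditLogBackupInputStream backupIn =
        new EditLogBackupInputStream("active-nn:8020");
    backupIn.setBytes(receivedEdits);       // point the stream at the new buffer
    DataInputStream in = backupIn.getDataInputStream();
    // ... replay the edit records from 'in', as FSEditLog does when loading edits ...
    backupIn.clear();                       // release the buffer once applied
  }
}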

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=753481&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Sat Mar 14 01:20:36 2009
@@ -0,0 +1,190 @@
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+
+/**
+ * An implementation of the abstract class {@link EditLogOutputStream},
+ * which streams edits to a backup node.
+ * 
+ * @see org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol#journal
+ * (org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration,
+ *  int, int, byte[])
+ */
+class EditLogBackupOutputStream extends EditLogOutputStream {
+  static int DEFAULT_BUFFER_SIZE = 256;
+
+  private NamenodeProtocol backupNode;          // RPC proxy to backup node
+  private NamenodeRegistration bnRegistration;  // backup node registration
+  private NamenodeRegistration nnRegistration;  // active node registration
+  private ArrayList<JournalRecord> bufCurrent;  // current buffer for writing
+  private ArrayList<JournalRecord> bufReady;    // buffer ready for flushing
+  private DataOutputBuffer out;     // serialized output sent to backup node
+
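+  /**
+   * One buffered journal transaction: an edit-log opcode together with its
+   * Writable arguments, kept in memory until the stream is flushed to the
+   * backup node.
+   */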
+  static class JournalRecord {
+    byte op;
+    Writable[] args;
+
+    JournalRecord(byte op, Writable ... writables) {
+      this.op = op;
+      this.args = writables;
+    }
+
+    void write(DataOutputStream out) throws IOException {
+      out.write(op);
+      if(args == null)
+        return;
+      for(Writable w : args)
+        w.write(out);
+    }
+  }
+
+  EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
+                            NamenodeRegistration nnReg) // active name-node
+  throws IOException {
+    super();
+    this.bnRegistration = bnReg;
+    this.nnRegistration = nnReg;
+    InetSocketAddress bnAddress =
+      NetUtils.createSocketAddr(bnRegistration.getAddress());
+    Storage.LOG.info("EditLogBackupOutputStream connects to: " + bnAddress);
+    try {
+      this.backupNode =
+        (NamenodeProtocol) RPC.getProxy(NamenodeProtocol.class,
+            NamenodeProtocol.versionID, bnAddress, new Configuration());
+    } catch(IOException e) {
+      Storage.LOG.error("Error connecting to: " + bnAddress, e);
+      throw e;
+    }
+    this.bufCurrent = new ArrayList<JournalRecord>();
+    this.bufReady = new ArrayList<JournalRecord>();
+    this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
+  }
+
+  @Override // JournalStream
+  public String getName() {
+    return bnRegistration.getAddress();
+  }
+
+  @Override // JournalStream
+  public JournalType getType() {
+    return JournalType.BACKUP;
+  }
+
+  @Override // EditLogOutputStream
+  public void write(int b) throws IOException {
+    throw new IOException("Not implemented");
+  }
+
+  @Override // EditLogOutputStream
+  void write(byte op, Writable ... writables) throws IOException {
+    bufCurrent.add(new JournalRecord(op, writables));
+  }
+
+  /**
+   * There is no persistent storage. Just clear the buffers.
+   */
+  @Override // EditLogOutputStream
+  void create() throws IOException {
+    bufCurrent.clear();
+    assert bufReady.size() == 0 : "previous data is not flushed yet";
+  }
+
+  @Override // EditLogOutputStream
+  public void close() throws IOException {
+    // close should have been called after all pending transactions 
+    // have been flushed & synced.
+    int size = bufCurrent.size();
+    if (size != 0) {
+      throw new IOException("BackupEditStream has " + size +
+                          " records still to be flushed and cannot be closed.");
+    } 
+    RPC.stopProxy(backupNode); // stop the RPC threads
+    bufCurrent = bufReady = null;
+  }
+
+  @Override // EditLogOutputStream
+  void setReadyToFlush() throws IOException {
+    assert bufReady.size() == 0 : "previous data is not flushed yet";
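+    // Swap the buffers: records accumulated so far become ready for flushing,
+    // while new records go into the buffer that was just emptied.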
+    ArrayList<JournalRecord>  tmp = bufReady;
+    bufReady = bufCurrent;
+    bufCurrent = tmp;
+  }
+
+  @Override // EditLogOutputStream
+  protected void flushAndSync() throws IOException {
+    assert out.size() == 0 : "Output buffer is not empty";
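+    // Send the ready buffer to the backup node in batches: ordinary records
+    // are grouped into a single JA_JOURNAL call, while records at or above
+    // OP_JSPOOL_START are each sent as a separate call.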
+    int bufReadySize = bufReady.size();
+    for(int idx = 0; idx < bufReadySize; idx++) {
+      JournalRecord jRec = null;
+      for(; idx < bufReadySize; idx++) {
+        jRec = bufReady.get(idx);
+        if(jRec.op >= FSEditLog.OP_JSPOOL_START)
+          break;  // special operation should be sent in a separate call to BN
+        jRec.write(out);
+      }
+      if(out.size() > 0)
+        send(NamenodeProtocol.JA_JOURNAL);
+      if(idx == bufReadySize)
+        break;
+      // operation like start journal spool or increment checkpoint time
+      // is a separate call to BN
+      jRec.write(out);
+      send(jRec.op);
+    }
+    bufReady.clear();         // erase all data in the buffer
+    out.reset();              // reset buffer to the start position
+  }
+
+  /**
+   * There is no persistent storage, so the length is always 0.<p>
+   * The length is normally used to decide when the edits log is large
+   * enough to start a checkpoint; that criterion does not apply to backup streams.
+   */
+  @Override // EditLogOutputStream
+  long length() throws IOException {
+    return 0;
+  }
+
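+  /**
+   * Ship the contents of the output buffer to the backup node in one
+   * journal RPC of the given type. OP_INVALID is appended as an end-of-data
+   * marker; the reported length excludes it. The buffer is reset afterwards.
+   */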
+  private void send(int ja) throws IOException {
+    try {
+      int length = out.getLength();
+      out.write(FSEditLog.OP_INVALID);
+      backupNode.journal(nnRegistration, ja, length, out.getData());
+    } finally {
+      out.reset();
+    }
+  }
+
+  /**
+   * Get backup node registration.
+   */
+  NamenodeRegistration getRegistration() {
+    return bnRegistration;
+  }
+
+  /**
+   * Verify that the backup node is alive.
+   */
+  boolean isAlive() {
+    try {
+      send(NamenodeProtocol.JA_IS_ALIVE);
+    } catch(IOException ei) {
+      Storage.LOG.info(bnRegistration.getRole() + " "
+                      + bnRegistration.getAddress() + " is not alive. ", ei);
+      return false;
+    }
+    return true;
+  }
+}
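
Similarly, a minimal sketch (not part of this patch) of how the active name-node side might push one edit record through this stream. It assumes a hypothetical helper class in org.apache.hadoop.hdfs.server.namenode, with 'bnReg'/'nnReg' being the registrations exchanged when the backup node registered, and 'op'/'args' an already-encoded edit record.

package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;

import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.io.Writable;

class BackupOutputStreamSketch {
  static void journalOneRecord(NamenodeRegistration bnReg,
                               NamenodeRegistration nnReg,
                               byte op, Writable... args) throws IOException {
    // Opens an RPC proxy to the backup node named in bnReg.
    EditLogBackupOutputStream backupOut =
        new EditLogBackupOutputStream(bnReg, nnReg);
    backupOut.write(op, args);    // buffer the record in bufCurrent
    backupOut.setReadyToFlush();  // swap bufCurrent and bufReady
    backupOut.flushAndSync();     // serialize bufReady and send it via journal()
    backupOut.close();            // stop the RPC proxy once everything is flushed
  }
}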

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java Sat Mar 14 01:20:36 2009
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -27,14 +29,8 @@
  * It should stream bytes from the storage exactly as they were written
  * into the {@link EditLogOutputStream}.
  */
-abstract class EditLogInputStream extends InputStream {
-  /**
-   * Get this stream name.
-   * 
-   * @return name of the stream
-   */
-  abstract String getName();
-
+abstract class EditLogInputStream extends InputStream
+implements JournalStream {
   /** {@inheritDoc} */
   public abstract int available() throws IOException;
 
@@ -51,4 +47,11 @@
    * Return the size of the current edits log.
    */
   abstract long length() throws IOException;
+
+  /**
+   * Return DataInputStream based on this edit stream.
+   */
+  DataInputStream getDataInputStream() {
+    return new DataInputStream(new BufferedInputStream(this));
+  }
 }

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Sat Mar 14 01:20:36 2009
@@ -26,7 +26,8 @@
  * A generic abstract class to support journaling of edits logs into 
  * a persistent storage.
  */
-abstract class EditLogOutputStream extends OutputStream {
+abstract class EditLogOutputStream extends OutputStream 
+implements JournalStream {
   // these are statistics counters
   private long numSync;        // number of sync(s) to disk
   private long totalTimeSync;  // total time to sync
@@ -35,13 +36,6 @@
     numSync = totalTimeSync = 0;
   }
 
-  /**
-   * Get this stream name.
-   * 
-   * @return name of the stream
-   */
-  abstract String getName();
-
   /** {@inheritDoc} */
   abstract public void write(int b) throws IOException;
 
@@ -57,7 +51,7 @@
   abstract void write(byte op, Writable ... writables) throws IOException;
 
   /**
-   * Create and initialize new edits log storage.
+   * Create and initialize underlying persistent edits log storage.
    * 
    * @throws IOException
    */
@@ -97,6 +91,10 @@
    */
   abstract long length() throws IOException;
 
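+  /**
+   * Whether this stream can record the given operation code.
+   * By default all operations are supported; a subclass may override this
+   * to reject operations it cannot journal.
+   */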
+  boolean isOperationSupported(byte op) {
+    return true;
+  }
+
   /**
    * Return total time spent in {@link #flushAndSync()}
    */
@@ -110,4 +108,9 @@
   long getNumSync() {
     return numSync;
   }
+
+  @Override // Object
+  public String toString() {
+    return getName();
+  }
 }

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Sat Mar 14 01:20:36 2009
@@ -46,7 +46,7 @@
 class FSDirectory implements Closeable {
 
   final FSNamesystem namesystem;
-  final INodeDirectoryWithQuota rootDir;
+  INodeDirectoryWithQuota rootDir;
   FSImage fsImage;  
   private boolean ready = false;
   // Metrics record
@@ -93,8 +93,6 @@
       }
       FSEditLog editLog = fsImage.getEditLog();
       assert editLog != null : "editLog must be initialized";
-      if (!editLog.isOpen())
-        editLog.open();
       fsImage.setCheckpointDirectories(null, null);
     } catch(IOException e) {
       fsImage.close();
@@ -594,7 +592,11 @@
     }
     return dirNotEmpty;
   }
-  
+
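+  /**
+   * @return true if the name space ("/") contains no files or directories
+   */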
+  boolean isEmpty() {
+    return isDirEmpty("/");
+  }
+
   /**
    * Delete a path from the name space
    * Update the count at each ancestor directory with quota
@@ -1251,7 +1253,16 @@
     } 
     return status;
   }
-  
+
+  /**
+   * Reset the entire namespace tree.
+   */
+  void reset() {
+    rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
+        namesystem.createFsOwnerPermissions(new FsPermission((short)0755)),
+        Integer.MAX_VALUE, -1);
+  }
+
   /**
    * Create FileStatus by file INode 
    */


