Author: shv
Date: Sat Mar 14 01:20:36 2009
New Revision: 753481
URL: http://svn.apache.org/viewvc?rev=753481&view=rev
Log:
HADOOP-4539. Introduce backup node and checkpoint node. Contributed by Konstantin Shvachko.
Added:
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JournalStream.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnsupportedActionException.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/CheckpointCommand.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeCommand.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeRegistration.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NodeRegistration.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/ServerCommand.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
Removed:
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredDatanodeException.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/hdfs/hdfs-default.xml
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeCommand.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/UpgradeUtilities.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
hadoop/core/trunk/src/webapps/hdfs/dfshealth.jsp
hadoop/core/trunk/src/webapps/hdfs/dfsnodelist.jsp
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sat Mar 14 01:20:36 2009
@@ -58,6 +58,8 @@
HADOOP-4756. A command line tool to access JMX properties on NameNode
and DataNode. (Boris Shkolnik via rangadi)
+ HADOOP-4539. Introduce backup node and checkpoint node. (shv)
+
IMPROVEMENTS
HADOOP-4565. Added CombineFileInputFormat to use data locality information
@@ -941,7 +943,7 @@
formula linear where blowUp = Total-Output/Total-Input. (Sharad Agarwal via ddas)
HADOOP-5142. Fix MapWritable#putAll to store key/value classes.
- (Doğacan Güney via enis)
+ (Doğacan Güney via enis)
HADOOP-4744. Workaround for jetty6 returning -1 when getLocalPort is invoked on
the connector. The workaround patch retries a few times before failing.
Modified: hadoop/core/trunk/src/hdfs/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/hdfs-default.xml?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/hdfs-default.xml (original)
+++ hadoop/core/trunk/src/hdfs/hdfs-default.xml Sat Mar 14 01:20:36 2009
@@ -123,8 +123,24 @@
</description>
</property>
+ <property>
+ <name>dfs.backup.address</name>
+ <value>0.0.0.0:50100</value>
+ <description>
+ The backup node server address and port.
+ If the port is 0 then the server will start on a free port.
+ </description>
+</property>
-
+ <property>
+ <name>dfs.backup.http.address</name>
+ <value>0.0.0.0:50105</value>
+ <description>
+ The backup node http server address and port.
+ If the port is 0 then the server will start on a free port.
+ </description>
+</property>
+
<property>
<name>dfs.replication.considerLoad</name>
<value>true</value>
Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java?rev=753481&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java Sat Mar 14 01:20:36 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
+
+/**
+ * This exception is thrown when a node that has not previously
+ * registered is trying to access the name node.
+ */
+public class UnregisteredNodeException extends IOException {
+ private static final long serialVersionUID = -5620209396945970810L;
+
+ public UnregisteredNodeException(NodeRegistration nodeReg) {
+ super("Unregistered server: " + nodeReg.toString());
+ }
+
+ /**
+ * The exception is thrown if a different data-node claims the same
+ * storage id as the existing one.
+ *
+ * @param nodeID unregistered data-node
+ * @param storedNode data-node stored in the system with this storage id
+ */
+ public UnregisteredNodeException(DatanodeID nodeID, DatanodeInfo storedNode) {
+ super("Data node " + nodeID.getName()
+ + " is attempting to report storage ID "
+ + nodeID.getStorageID() + ". Node "
+ + storedNode.getName() + " is expected to serve this storage.");
+ }
+}
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Sat Mar 14 01:20:36 2009
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.common;
-
/************************************
* Some handy internal HDFS constants
*
@@ -32,10 +31,12 @@
DATA_NODE;
}
- // Startup options
+ /** Startup options */
static public enum StartupOption{
FORMAT ("-format"),
REGULAR ("-regular"),
+ BACKUP ("-backup"),
+ CHECKPOINT("-checkpoint"),
UPGRADE ("-upgrade"),
ROLLBACK("-rollback"),
FINALIZE("-finalize"),
@@ -44,6 +45,17 @@
private String name = null;
private StartupOption(String arg) {this.name = arg;}
public String getName() {return name;}
+ /**
+  * Maps this startup option to a {@link NamenodeRole}.
+  * BACKUP and CHECKPOINT map to the roles of the same name;
+  * every other option maps to ACTIVE.
+  */
+ public NamenodeRole toNodeRole() {
+   if (this == BACKUP) {
+     return NamenodeRole.BACKUP;
+   }
+   if (this == CHECKPOINT) {
+     return NamenodeRole.CHECKPOINT;
+   }
+   return NamenodeRole.ACTIVE;
+ }
+
}
// Timeouts for communicating with DataNode for streaming writes/reads
@@ -51,5 +63,21 @@
public static int WRITE_TIMEOUT = 8 * 60 * 1000;
public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline
+ /**
+  * Defines the NameNode role.
+  * Each role carries a fixed human-readable description, returned by
+  * {@link #toString()}.
+  */
+ static public enum NamenodeRole {
+   ACTIVE    ("NameNode"),
+   BACKUP    ("Backup Node"),
+   CHECKPOINT("Checkpoint Node"),
+   STANDBY   ("Standby Node");
+
+   /** Human-readable name of the role; set once per constant. */
+   private final String description;
+   private NamenodeRole(String arg) {this.description = arg;}
+
+   @Override
+   public String toString() {
+     return description;
+   }
+ }
}
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java Sat Mar 14 01:20:36 2009
@@ -1,15 +1,21 @@
package org.apache.hadoop.hdfs.server.common;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
/**
* Common class for storage information.
*
* TODO namespaceID should be long and computed as hash(address + port)
*/
-public class StorageInfo {
- public int layoutVersion; // Version read from the stored file.
- public int namespaceID; // namespace id of the storage
- public long cTime; // creation timestamp
+public class StorageInfo implements Writable {
+ public int layoutVersion; // layout version of the storage data
+ public int namespaceID; // id of the file system
+ public long cTime; // creation time of the file system state
public StorageInfo () {
this(0, 0, 0L);
@@ -25,8 +31,22 @@
setStorageInfo(from);
}
+ /**
+ * Layout version of the storage data.
+ */
public int getLayoutVersion(){ return layoutVersion; }
+
+ /**
+ * Namespace id of the file system.<p>
+ * Assigned to the file system at formatting and never changes after that.
+ * Shared by all file system components.
+ */
public int getNamespaceID() { return namespaceID; }
+
+ /**
+ * Creation time of the file system state.<p>
+ * Modified during upgrades.
+ */
public long getCTime() { return cTime; }
public void setStorageInfo(StorageInfo from) {
@@ -34,4 +54,19 @@
namespaceID = from.namespaceID;
cTime = from.cTime;
}
-}
\ No newline at end of file
+
+ /////////////////////////////////////////////////
+ // Writable
+ /////////////////////////////////////////////////
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(getLayoutVersion());
+ out.writeInt(getNamespaceID());
+ out.writeLong(getCTime());
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ layoutVersion = in.readInt();
+ namespaceID = in.readInt();
+ cTime = in.readLong();
+ }
+}
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Mar 14 01:20:36 2009
@@ -54,7 +54,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
@@ -803,7 +803,7 @@
} // synchronized
} catch(RemoteException re) {
String reClass = re.getClassName();
- if (UnregisteredDatanodeException.class.getName().equals(reClass) ||
+ if (UnregisteredNodeException.class.getName().equals(reClass) ||
DisallowedDatanodeException.class.getName().equals(reClass) ||
IncorrectVersionException.class.getName().equals(reClass)) {
LOG.warn("DataNode is shutting down: " +
Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=753481&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Sat Mar 14 01:20:36 2009
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.Daemon;
+
+/**
+ * BackupNode.
+ * <p>
+ * Backup node can play two roles.
+ * <ol>
+ * <li>{@link NamenodeRole#CHECKPOINT} node periodically creates checkpoints,
+ * that is downloads image and edits from the active node, merges them, and
+ * uploads the new image back to the active.</li>
+ * <li>{@link NamenodeRole#BACKUP} node keeps its namespace in sync with the
+ * active node, and periodically creates checkpoints by simply saving the
+ * namespace image to local disk(s).</li>
+ * </ol>
+ */
+class BackupNode extends NameNode {
+  private static final String BN_ADDRESS_NAME_KEY = "dfs.backup.address";
+  // Only the port of this address is used (see getRpcServerAddress); the
+  // host part mirrors the dfs.backup.address default in hdfs-default.xml.
+  private static final String BN_ADDRESS_DEFAULT = "0.0.0.0:50100";
+  private static final String BN_HTTP_ADDRESS_NAME_KEY = "dfs.backup.http.address";
+  private static final String BN_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50105";
+
+  /** Name-node proxy */
+  NamenodeProtocol namenode;
+  /** Name-node RPC address */
+  String nnRpcAddress;
+  /** Name-node HTTP address */
+  String nnHttpAddress;
+  /** Checkpoint manager */
+  Checkpointer checkpointManager;
+  /** Checkpoint daemon */
+  private Daemon cpDaemon;
+
+  /**
+   * Creates a backup node acting in the given role.
+   * @param conf configuration
+   * @param role {@link NamenodeRole#BACKUP} or {@link NamenodeRole#CHECKPOINT}
+   * @throws IOException if the node fails to initialize
+   */
+  BackupNode(Configuration conf, NamenodeRole role) throws IOException {
+    super(conf, role);
+  }
+
+  /////////////////////////////////////////////////////
+  // Common NameNode methods implementation for backup node.
+  /////////////////////////////////////////////////////
+
+  /**
+   * RPC address of this node: the host returned by
+   * DNS.getDefaultHost("default") with the port of dfs.backup.address.
+   */
+  @Override // NameNode
+  protected InetSocketAddress getRpcServerAddress(Configuration conf) throws IOException {
+    String addr = conf.get(BN_ADDRESS_NAME_KEY, BN_ADDRESS_DEFAULT);
+    int port = NetUtils.createSocketAddr(addr).getPort();
+    String hostName = DNS.getDefaultHost("default");
+    return new InetSocketAddress(hostName, port);
+  }
+
+  /** Writes the actual RPC address back into dfs.backup.address. */
+  @Override // NameNode
+  protected void setRpcServerAddress(Configuration conf) {
+    conf.set(BN_ADDRESS_NAME_KEY, getHostPortString(rpcAddress));
+  }
+
+  /**
+   * HTTP address of this node: the RPC host name with the port of
+   * dfs.backup.http.address. rpcAddress must already be set.
+   */
+  @Override // NameNode
+  protected InetSocketAddress getHttpServerAddress(Configuration conf) {
+    assert rpcAddress != null : "rpcAddress should be calculated first";
+    String addr = conf.get(BN_HTTP_ADDRESS_NAME_KEY, BN_HTTP_ADDRESS_DEFAULT);
+    int port = NetUtils.createSocketAddr(addr).getPort();
+    String hostName = rpcAddress.getHostName();
+    return new InetSocketAddress(hostName, port);
+  }
+
+  /** Writes the actual HTTP address back into dfs.backup.http.address. */
+  @Override // NameNode
+  protected void setHttpServerAddress(Configuration conf){
+    conf.set(BN_HTTP_ADDRESS_NAME_KEY, getHostPortString(httpAddress));
+  }
+
+  /**
+   * Builds the namesystem on a {@link BackupStorage} image and then runs
+   * recoverCreateRead over the configured name and edits directories.
+   */
+  @Override // NameNode
+  protected void loadNamesystem(Configuration conf) throws IOException {
+    BackupStorage bnImage = new BackupStorage();
+    this.namesystem = new FSNamesystem(conf, bnImage);
+    bnImage.recoverCreateRead(FSNamesystem.getNamespaceDirs(conf),
+                              FSNamesystem.getNamespaceEditsDirs(conf));
+  }
+
+  /**
+   * Handshakes with the active name-node, runs the common name-node
+   * initialization, registers with the active node and finally starts
+   * the checkpoint daemon.
+   */
+  @Override // NameNode
+  protected void initialize(Configuration conf) throws IOException {
+    // Trash is disabled in BackupNameNode,
+    // but should be turned back on if it ever becomes active.
+    conf.setLong("fs.trash.interval", 0L);
+    NamespaceInfo nsInfo = handshake(conf);
+    super.initialize(conf);
+    // Backup node should never do lease recovery,
+    // therefore lease hard limit should never expire.
+    namesystem.leaseManager.setLeasePeriod(
+        FSConstants.LEASE_SOFTLIMIT_PERIOD, Long.MAX_VALUE);
+    // register with the active name-node
+    registerWith(nsInfo);
+    // Checkpoint daemon should start after the rpc server started
+    runCheckpointDaemon(conf);
+  }
+
+  /**
+   * Stops the checkpoint daemon, reports the shutdown to the active
+   * name-node as a FATAL error report (failures are only logged), stops
+   * the name-node proxy and then this node.
+   */
+  @Override // NameNode
+  public void stop() {
+    if(checkpointManager != null) checkpointManager.shouldRun = false;
+    if(cpDaemon != null) cpDaemon.interrupt();
+    if(namenode != null && getRegistration() != null) {
+      try {
+        namenode.errorReport(getRegistration(), NamenodeProtocol.FATAL,
+            "Shutting down.");
+      } catch(IOException e) {
+        LOG.error("Failed to report to name-node.", e);
+      }
+    }
+    RPC.stopProxy(namenode); // stop the RPC threads
+    super.stop();
+  }
+
+  /////////////////////////////////////////////////////
+  // NamenodeProtocol implementation for backup node.
+  /////////////////////////////////////////////////////
+  @Override // NamenodeProtocol
+  public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
+  throws IOException {
+    throw new UnsupportedActionException("getBlocks");
+  }
+
+  // Only active name-node can register other nodes.
+  @Override // NamenodeProtocol
+  public NamenodeRegistration register(NamenodeRegistration registration
+  ) throws IOException {
+    throw new UnsupportedActionException("register");
+  }
+
+  @Override // NamenodeProtocol
+  public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
+  throws IOException {
+    throw new UnsupportedActionException("startCheckpoint");
+  }
+
+  @Override // NamenodeProtocol
+  public void endCheckpoint(NamenodeRegistration registration,
+                            CheckpointSignature sig) throws IOException {
+    throw new UnsupportedActionException("endCheckpoint");
+  }
+
+  /**
+   * Handles a journal request. After verifyRequest, the request is accepted
+   * only from the name-node at nnRpcAddress; jAction selects how length and
+   * args are applied to the backup image.
+   * @throws IOException for a request from another address or an unknown
+   *         action
+   */
+  @Override // NamenodeProtocol
+  public void journal(NamenodeRegistration nnReg,
+                      int jAction,
+                      int length,
+                      byte[] args) throws IOException {
+    verifyRequest(nnReg);
+    if(!nnRpcAddress.equals(nnReg.getAddress()))
+      throw new IOException("Journal request from unexpected name-node: "
+          + nnReg.getAddress() + " expecting " + nnRpcAddress);
+    BackupStorage bnImage = (BackupStorage)getFSImage();
+    switch(jAction) {
+      case (int)JA_IS_ALIVE:  // liveness probe, nothing to do
+        return;
+      case (int)JA_JOURNAL:
+        bnImage.journal(length, args);
+        return;
+      case (int)JA_JSPOOL_START:
+        bnImage.startJournalSpool(nnReg);
+        return;
+      case (int)JA_CHECKPOINT_TIME:
+        bnImage.setCheckpointTime(length, args);
+        setRegistration(); // keep registration up to date
+        return;
+      default:
+        throw new IOException("Unexpected journal action: " + jAction);
+    }
+  }
+
+  /**
+   * Tells whether a checkpoint should be made right after startup.
+   * A node without a loaded namesystem or image always needs one. A
+   * checkpoint node needs one when its first storage directory has no
+   * version file; otherwise one is needed when the edit log has no streams.
+   */
+  boolean shouldCheckpointAtStartup() {
+    // Guard against a missing namesystem before calling getFSImage(), and
+    // against a missing image before dereferencing it in either branch.
+    if(namesystem == null || namesystem.dir == null)
+      return true;
+    FSImage fsImage = getFSImage();
+    if(fsImage == null)
+      return true;
+    if(isRole(NamenodeRole.CHECKPOINT)) {
+      assert fsImage.getNumStorageDirs() > 0;
+      return ! fsImage.getStorageDir(0).getVersionFile().exists();
+    }
+    return fsImage.getEditLog().getNumEditStreams() == 0;
+  }
+
+  /**
+   * Connects to the active name-node and performs the version handshake.
+   * On a socket timeout it retries once per second until it succeeds or a
+   * stop is requested.
+   * @return namespace info reported by the active name-node, never null
+   * @throws IOException if the handshake fails, or if a stop was requested
+   *         before it completed
+   */
+  private NamespaceInfo handshake(Configuration conf) throws IOException {
+    // connect to name node
+    InetSocketAddress nnAddress = super.getRpcServerAddress(conf);
+    this.namenode =
+      (NamenodeProtocol) RPC.waitForProxy(NamenodeProtocol.class,
+        NamenodeProtocol.versionID, nnAddress, conf);
+    this.nnRpcAddress = getHostPortString(nnAddress);
+    this.nnHttpAddress = getHostPortString(super.getHttpServerAddress(conf));
+    // get version and id info from the name-node
+    NamespaceInfo nsInfo = null;
+    while(!stopRequested) {
+      try {
+        nsInfo = handshake(namenode);
+        break;
+      } catch(SocketTimeoutException e) {  // name-node is busy
+        LOG.info("Problem connecting to server: " + nnAddress);
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+          // ignored: retrying is controlled by stopRequested, not interrupts
+        }
+      }
+    }
+    // Leaving the loop without a result means a stop was requested; fail
+    // here instead of handing null to registerWith().
+    if(nsInfo == null)
+      throw new IOException("Stop requested before handshake with name-node "
+          + nnAddress + " completed.");
+    return nsInfo;
+  }
+
+  /**
+   * Creates the checkpoint manager and starts it in a daemon thread.
+   */
+  private void runCheckpointDaemon(Configuration conf) throws IOException {
+    checkpointManager = new Checkpointer(conf, this);
+    cpDaemon = new Daemon(checkpointManager);
+    cpDaemon.start();
+  }
+
+  /**
+   * Checkpoint.<br>
+   * Tests may use it to initiate a checkpoint process.
+   * @throws IOException
+   */
+  void doCheckpoint() throws IOException {
+    checkpointManager.doCheckpoint();
+  }
+
+  /** @return the checkpoint state of this node's image */
+  CheckpointStates getCheckpointState() {
+    return getFSImage().getCheckpointState();
+  }
+
+  /** Sets the checkpoint state of this node's image. */
+  void setCheckpointState(CheckpointStates cs) {
+    getFSImage().setCheckpointState(cs);
+  }
+
+  /**
+   * Register this backup node with the active name-node.
+   * Adopts the active node's storage info if local storage is new; otherwise
+   * namespace ids must match. Registration is retried once per second on
+   * socket timeouts until it succeeds or a stop is requested.
+   * @param nsInfo namespace info returned by the handshake
+   * @throws IOException on namespace id mismatch, rejection, or if the
+   *         registered-with node is not ACTIVE
+   */
+  private void registerWith(NamespaceInfo nsInfo) throws IOException {
+    BackupStorage bnImage = (BackupStorage)getFSImage();
+    // verify namespaceID
+    if(bnImage.getNamespaceID() == 0) // new backup storage
+      bnImage.setStorageInfo(nsInfo);
+    else if(bnImage.getNamespaceID() != nsInfo.getNamespaceID())
+      throw new IOException("Incompatible namespaceIDs"
+          + ": active node namespaceID = " + nsInfo.getNamespaceID()
+          + "; backup node namespaceID = " + bnImage.getNamespaceID());
+
+    setRegistration();
+    NamenodeRegistration nnReg = null;
+    while(!stopRequested) {
+      try {
+        nnReg = namenode.register(getRegistration());
+        break;
+      } catch(SocketTimeoutException e) {  // name-node is busy
+        LOG.info("Problem connecting to name-node: " + nnRpcAddress);
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+          // ignored: retrying is controlled by stopRequested, not interrupts
+        }
+      }
+    }
+
+    String msg = null;
+    if(nnReg == null) // consider as a rejection
+      msg = "Registration rejected by " + nnRpcAddress;
+    else if(!nnReg.isRole(NamenodeRole.ACTIVE)) {
+      msg = "Name-node " + nnRpcAddress + " is not active";
+    }
+    if(msg != null) {
+      msg += ". Shutting down.";
+      LOG.error(msg);
+      throw new IOException(msg); // stop the node
+    }
+    nnRpcAddress = nnReg.getAddress();
+  }
+
+  /**
+   * Reset node namespace state in memory and in storage directories.
+   * @throws IOException
+   */
+  void resetNamespace() throws IOException {
+    ((BackupStorage)getFSImage()).reset();
+  }
+
+  /**
+   * Get size of the local journal (edit log).
+   * @return size of the current journal
+   * @throws IOException
+   */
+  long journalSize() throws IOException {
+    return namesystem.getEditLogSize();
+  }
+
+  // TODO: move to a common with DataNode util class
+  /**
+   * Requests namespace info from the name-node and verifies that its build
+   * version equals this node's. Layout versions are checked only by an
+   * assertion.
+   * @throws SocketTimeoutException if versionRequest times out
+   * @throws IOException on build version mismatch or other RPC failure
+   */
+  private static NamespaceInfo handshake(NamenodeProtocol namenode)
+  throws IOException, SocketTimeoutException {
+    NamespaceInfo nsInfo = namenode.versionRequest(); // throws SocketTimeoutException
+    // verify build version
+    if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion())) {
+      String errorMsg = "Incompatible build versions: active name-node BV = "
+          + nsInfo.getBuildVersion() + "; backup node BV = "
+          + Storage.getBuildVersion();
+      LOG.fatal(errorMsg);
+      throw new IOException(errorMsg);
+    }
+    assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
+      "Active and backup node layout versions must be the same. Expected: "
+      + FSConstants.LAYOUT_VERSION + " actual "+ nsInfo.getLayoutVersion();
+    return nsInfo;
+  }
+}
Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java?rev=753481&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java Sat Mar 14 01:20:36 2009
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog.EditLogFileInputStream;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.io.LongWritable;
+
+public class BackupStorage extends FSImage {
+  // Names of the journal spool directory and the spool file
+  private static final String STORAGE_JSPOOL_DIR = "jspool";
+  private static final String STORAGE_JSPOOL_FILE =
+    NameNodeFile.EDITS_NEW.getName();
+
+  /** Backup input stream for loading edits into memory */
+  private EditLogBackupInputStream backupInputStream;
+  /** Current journal spool state, see {@link JSpoolState}. */
+  volatile JSpoolState jsState;
+
+  /**
+   * Journal spool states, as used by {@link #journal(int, byte[])}:
+   * <ul>
+   * <li>OFF - journal records are applied to the in-memory state and
+   * written to the edits file(s);</li>
+   * <li>INPROGRESS - journal records are only written (spooled) to the
+   * spool file;</li>
+   * <li>WAIT - the spool reader is finishing the merge of the spool,
+   * journal writers block until the state becomes OFF.</li>
+   * </ul>
+   */
+  static enum JSpoolState {
+    OFF,
+    INPROGRESS,
+    WAIT;
+  }
+
+  /**
+   * Create backup storage with journal spooling turned off.
+   */
+  BackupStorage() {
+    super();
+    jsState = JSpoolState.OFF;
+  }
+
+  /** Backup storage directories never require layout conversion. */
+  @Override
+  public boolean isConversionNeeded(StorageDirectory sd) {
+    return false;
+  }
+
+  /**
+   * Analyze backup storage directories for consistency.<br>
+   * Recover from incomplete checkpoints if required.<br>
+   * Read VERSION and fstime files if they exist.<br>
+   * Do not load image or edits.
+   *
+   * @param imageDirs list of image directories.
+   * @param editsDirs list of edits directories.
+   * @throws IOException if the node should shutdown.
+   */
+  void recoverCreateRead(Collection<File> imageDirs,
+                         Collection<File> editsDirs) throws IOException {
+    setStorageDirectories(imageDirs, editsDirs);
+    this.checkpointTime = 0L;
+    for(Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      StorageState curState;
+      try {
+        curState = sd.analyzeStorage(HdfsConstants.StartupOption.REGULAR);
+        // sd is locked but not opened
+        switch(curState) {
+        case NON_EXISTENT:
+          // fail if any of the configured storage dirs are inaccessible
+          throw new InconsistentFSStateException(sd.getRoot(),
+                "checkpoint directory does not exist or is not accessible.");
+        case NOT_FORMATTED:
+          // for backup node all directories may be unformatted initially
+          LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
+          LOG.info("Formatting ...");
+          sd.clearDirectory(); // create empty current
+          break;
+        case NORMAL:
+          break;
+        default:  // recovery is possible
+          sd.doRecover(curState);
+        }
+        if(curState != StorageState.NOT_FORMATTED) {
+          sd.read(); // read and verify consistency with other directories
+        }
+      } catch(IOException ioe) {
+        sd.unlock();
+        throw ioe;
+      }
+    }
+  }
+
+  /**
+   * Reset the in-memory namespace tree and the storage directories.
+   * <p>
+   * Unlock the storage, re-analyze it (recovering from an unsuccessful
+   * checkpoint if necessary), then, in every directory that has a VERSION
+   * file, rename <code>current</code> to <code>lastcheckpoint.tmp</code>
+   * and recreate an empty <code>current</code>.
+   * @throws IOException if a directory cannot be renamed or created
+   */
+  synchronized void reset() throws IOException {
+    // reset NameSpace tree
+    FSDirectory fsDir = getFSNamesystem().dir;
+    fsDir.reset();
+
+    // unlock, close and rename storage directories
+    unlockAll();
+    // recover from unsuccessful checkpoint if necessary
+    recoverCreateRead(getImageDirectories(), getEditsDirectories());
+    // rename and recreate
+    for(StorageDirectory sd : storageDirs) {
+      File curDir = sd.getCurrentDir();
+      File tmpCkptDir = sd.getLastCheckpointTmp();
+      assert !tmpCkptDir.exists() : 
+        tmpCkptDir.getName() + " directory must not exist.";
+      if(!sd.getVersionFile().exists())
+        continue;
+      // rename current to lastcheckpoint.tmp
+      rename(curDir, tmpCkptDir);
+      if(!curDir.mkdir())
+        throw new IOException("Cannot create directory " + curDir);
+    }
+  }
+
+  /**
+   * Load checkpoint from local files only if the memory state is empty.<br>
+   * Open the edit log if it is not open yet.<br>
+   * Set storage info and the new checkpoint time received from the
+   * name-node in the signature.
+   * @param sig checkpoint signature received from the name-node
+   * @throws IOException if no image/edits directory is configured or
+   *         loading fails
+   */
+  void loadCheckpoint(CheckpointSignature sig) throws IOException {
+    // load current image and journal if it is not in memory already
+    if(!editLog.isOpen())
+      editLog.open();
+
+    FSDirectory fsDir = getFSNamesystem().dir;
+    if(fsDir.isEmpty()) {
+      Iterator<StorageDirectory> itImage = dirIterator(NameNodeDirType.IMAGE);
+      Iterator<StorageDirectory> itEdits = dirIterator(NameNodeDirType.EDITS);
+      if(!itImage.hasNext() || ! itEdits.hasNext())
+        throw new IOException("Could not locate checkpoint directories");
+      StorageDirectory sdName = itImage.next();
+      StorageDirectory sdEdits = itEdits.next();
+      synchronized(getFSDirectoryRootLock()) { // load image under rootDir lock
+        loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
+      }
+      loadFSEdits(sdEdits);
+    }
+
+    // set storage fields
+    setStorageInfo(sig);
+    checkpointTime = sig.checkpointTime;
+  }
+
+  /**
+   * Save meta-data into fsimage.ckpt in every image directory and create
+   * an empty edits file in every edits directory, then mark the upload as
+   * done and rename the new checkpoint into place.
+   * @throws IOException if saving or renaming fails
+   */
+  void saveCheckpoint() throws IOException {
+    // save image into fsimage.ckpt and purge edits file
+    for (Iterator<StorageDirectory> it = 
+                           dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      NameNodeDirType dirType = (NameNodeDirType)sd.getStorageDirType();
+      if (dirType.isOfType(NameNodeDirType.IMAGE))
+        saveFSImage(getImageFile(sd, NameNodeFile.IMAGE_NEW));
+      if (dirType.isOfType(NameNodeDirType.EDITS))
+        editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
+    }
+
+    ckptState = CheckpointStates.UPLOAD_DONE;
+    renameCheckpoint();
+  }
+
+  private FSNamesystem getFSNamesystem() {
+    // HADOOP-5119 should get rid of this.
+    return FSNamesystem.getFSNamesystem();
+  }
+
+  private Object getFSDirectoryRootLock() {
+    return getFSNamesystem().dir.rootDir;
+  }
+
+  /** @return the journal spool directory <code>jspool</code> under sd root */
+  static File getJSpoolDir(StorageDirectory sd) {
+    return new File(sd.getRoot(), STORAGE_JSPOOL_DIR);
+  }
+
+  /** @return the journal spool file inside {@link #getJSpoolDir} */
+  static File getJSpoolFile(StorageDirectory sd) {
+    return new File(getJSpoolDir(sd), STORAGE_JSPOOL_FILE);
+  }
+
+  /**
+   * Journal writer journals new meta-data state.
+   * <ol>
+   * <li> If Journal Spool state is OFF then journal records (edits)
+   * are applied directly to meta-data state in memory and are written
+   * to the edits file(s).</li>
+   * <li> If Journal Spool state is INPROGRESS then records are only
+   * written to edits.new file, which is called Spooling.</li>
+   * <li> Journal Spool state WAIT blocks journaling until the
+   * Journal Spool reader finalizes merging of the spooled data and
+   * switches to applying journal to memory.</li>
+   * </ol>
+   * @param length length of data.
+   * @param data serialized journal records.
+   * @throws IOException
+   * @see #convergeJournalSpool()
+   */
+  synchronized void journal(int length, byte[] data) throws IOException {
+    assert backupInputStream.length() == 0 : "backup input stream is not empty";
+    try {
+      switch(jsState) {
+        case WAIT:
+        case OFF:
+          // wait until spooling is off
+          waitSpoolEnd();
+          // update NameSpace in memory
+          backupInputStream.setBytes(data);
+          FSEditLog.loadEditRecords(getLayoutVersion(),
+                    backupInputStream.getDataInputStream(), true);
+          getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
+          break;
+        case INPROGRESS:
+          break;
+      }
+      // write to files
+      editLog.logEdit(length, data);
+      editLog.logSync();
+    } finally {
+      backupInputStream.clear();
+    }
+  }
+
+  /**
+   * Block while the journal spool is in WAIT state.
+   * The monitor is released while waiting; the waiter is woken by
+   * {@link #convergeJournalSpool()} once it switches the state to OFF.
+   */
+  private synchronized void waitSpoolEnd() {
+    while(jsState == JSpoolState.WAIT) {
+      try {
+        wait();
+      } catch (InterruptedException  e) {
+        // NOTE(review): interrupts are ignored and the interrupt status is
+        // not restored. Callers write to the edit log right after this
+        // returns, so confirm the impact before changing this.
+      }
+    }
+    // now spooling should be off, verifying just in case
+    assert jsState == JSpoolState.OFF : "Unexpected JSpool state: " + jsState;
+  }
+
+  /**
+   * Start journal spool.
+   * Switch to writing into edits.new instead of edits.
+   *
+   * edits.new for spooling is in a separate directory "jspool" rather than
+   * in "current" because the two directories should be independent.
+   * A checkpoint can happen while spooling: current will first
+   * move to lastcheckpoint.tmp and then to previous.checkpoint, while
+   * jspool/edits.new remains in place.
+   * @param nnReg registration of the node that sends the journal
+   * @throws IOException if the spool directory cannot be created
+   */
+  synchronized void startJournalSpool(NamenodeRegistration nnReg)
+  throws IOException {
+    switch(jsState) {
+      case OFF:
+        break;
+      case INPROGRESS:
+        return;
+      case WAIT:
+        waitSpoolEnd();
+    }
+
+    // create journal spool directories
+    for(Iterator<StorageDirectory> it = 
+                          dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      File jsDir = getJSpoolDir(sd);
+      if (!jsDir.exists() && !jsDir.mkdirs()) {
+        throw new IOException("Mkdirs failed to create "
+                              + jsDir.getCanonicalPath());
+      }
+      // create edit file if missing
+      File eFile = getEditFile(sd);
+      if(!eFile.exists()) {
+        editLog.createEditLogFile(eFile);
+      }
+    }
+
+    if(!editLog.isOpen())
+      editLog.open();
+
+    // create streams pointing to the journal spool files
+    // subsequent journal records will go directly to the spool
+    editLog.divertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
+    setCheckpointState(CheckpointStates.ROLLED_EDITS);
+
+    // set up spooling
+    if(backupInputStream == null)
+      backupInputStream = new EditLogBackupInputStream(nnReg.getAddress());
+    jsState = JSpoolState.INPROGRESS;
+  }
+
+  /**
+   * Unpack a JA_CHECKPOINT_TIME journal record and set the checkpoint time
+   * it carries.
+   * @param length length of data (not used here)
+   * @param data serialized record: op byte followed by a LongWritable
+   * @throws IOException if the record cannot be read
+   */
+  synchronized void setCheckpointTime(int length, byte[] data)
+  throws IOException {
+    assert backupInputStream.length() == 0 : "backup input stream is not empty";
+    try {
+      // unpack new checkpoint time
+      backupInputStream.setBytes(data);
+      DataInputStream in = backupInputStream.getDataInputStream();
+      byte op = in.readByte();
+      assert op == NamenodeProtocol.JA_CHECKPOINT_TIME;
+      LongWritable lw = new LongWritable();
+      lw.readFields(in);
+      setCheckpointTime(lw.get());
+    } finally {
+      backupInputStream.clear();
+    }
+  }
+
+  /**
+   * Merge Journal Spool to memory.<p>
+   * Journal Spool reader reads journal records from edits.new.
+   * When it reaches the end of the file it sets {@link JSpoolState} to WAIT.
+   * This blocks journaling (see {@link #journal(int,byte[])}).
+   * The reader
+   * <ul>
+   *   <li> reads remaining journal records if any,</li>
+   *   <li> renames edits.new to edits,</li>
+   *   <li> sets {@link JSpoolState} to OFF,</li> 
+   *   <li> and notifies the journaling thread.</li>
+   * </ul>
+   * Journaling resumes with applying new journal records to the memory state,
+   * and writing them into edits file(s).
+   * Finally lastcheckpoint.tmp is renamed to previous.checkpoint in every
+   * storage directory.
+   * @throws IOException if no edits directory is configured or loading fails
+   */
+  void convergeJournalSpool() throws IOException {
+    Iterator<StorageDirectory> itEdits = dirIterator(NameNodeDirType.EDITS);
+    if(! itEdits.hasNext())
+      throw new IOException("Could not locate checkpoint directories");
+    StorageDirectory sdEdits = itEdits.next();
+    int numEdits = 0;
+    File jSpoolFile = getJSpoolFile(sdEdits);
+    long startTime = FSNamesystem.now();
+    if(jSpoolFile.exists()) {
+      // load edits.new
+      EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
+      try {
+        DataInputStream in = edits.getDataInputStream();
+        numEdits += FSEditLog.loadFSEdits(in, false);
+
+        // first time reached the end of spool
+        jsState = JSpoolState.WAIT;
+        numEdits += FSEditLog.loadEditRecords(getLayoutVersion(), in, true);
+        getFSNamesystem().dir.updateCountForINodeWithQuota();
+      } finally {
+        // close the spool file even if loading fails
+        edits.close();
+      }
+    }
+
+    FSImage.LOG.info("Edits file " + jSpoolFile.getCanonicalPath() 
+        + " of size " + jSpoolFile.length() + " edits # " + numEdits 
+        + " loaded in " + (FSNamesystem.now()-startTime)/1000 + " seconds.");
+
+    // rename spool edits.new to edits making it in sync with the active node
+    // subsequent journal records will go directly to edits
+    editLog.revertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
+
+    // write version file
+    resetVersion(false);
+
+    // wake up journal writer
+    synchronized(this) {
+      jsState = JSpoolState.OFF;
+      notifyAll();
+    }
+
+    // Rename lastcheckpoint.tmp to previous.checkpoint
+    for(StorageDirectory sd : storageDirs) {
+      File tmpCkptDir = sd.getLastCheckpointTmp();
+      File prevCkptDir = sd.getPreviousCheckpoint();
+      // delete previous directory
+      if (prevCkptDir.exists())
+        deleteDir(prevCkptDir);
+      // rename tmp to previous
+      if (tmpCkptDir.exists())
+        rename(tmpCkptDir, prevCkptDir);
+    }
+  }
+}
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java Sat Mar 14 01:20:36 2009
@@ -22,6 +22,7 @@
import java.io.IOException;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.io.WritableComparable;
/**
@@ -33,12 +34,12 @@
long editsTime = -1L;
long checkpointTime = -1L;
- CheckpointSignature() {}
+ public CheckpointSignature() {}
CheckpointSignature(FSImage fsImage) {
super(fsImage);
editsTime = fsImage.getEditLog().getFsEditTime();
- checkpointTime = fsImage.checkpointTime;
+ checkpointTime = fsImage.getCheckpointTime();
}
CheckpointSignature(String str) {
@@ -59,14 +60,17 @@
+ String.valueOf(checkpointTime);
}
- void validateStorageInfo(StorageInfo si) throws IOException {
+ void validateStorageInfo(FSImage si) throws IOException {
if(layoutVersion != si.layoutVersion
- || namespaceID != si.namespaceID || cTime != si.cTime) {
+ || namespaceID != si.namespaceID || cTime != si.cTime
+ || checkpointTime != si.checkpointTime) {
- // checkpointTime can change when the image is saved - do not compare
- throw new IOException("Inconsistent checkpoint fileds. "
+ // all four fields, including checkpointTime, must match the local image
+ throw new IOException("Inconsistent checkpoint fields.\n"
+ "LV = " + layoutVersion + " namespaceID = " + namespaceID
- + " cTime = " + cTime + ". Expecting respectively: "
- + si.layoutVersion + "; " + si.namespaceID + "; " + si.cTime);
+ + " cTime = " + cTime + "; checkpointTime = " + checkpointTime
+ + ".\nExpecting respectively: "
+ + si.layoutVersion + "; " + si.namespaceID + "; " + si.cTime
+ + "; " + si.checkpointTime);
}
}
@@ -100,17 +104,13 @@
// Writable
/////////////////////////////////////////////////
public void write(DataOutput out) throws IOException {
- out.writeInt(getLayoutVersion());
- out.writeInt(getNamespaceID());
- out.writeLong(getCTime());
+ super.write(out);
out.writeLong(editsTime);
out.writeLong(checkpointTime);
}
public void readFields(DataInput in) throws IOException {
- layoutVersion = in.readInt();
- namespaceID = in.readInt();
- cTime = in.readLong();
+ super.readFields(in);
editsTime = in.readLong();
checkpointTime = in.readLong();
}
Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=753481&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Sat Mar 14 01:20:36 2009
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates;
+import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer;
+
+/**
+ * The Checkpointer is responsible for supporting periodic checkpoints 
+ * of the HDFS metadata.
+ *
+ * The Checkpointer is a daemon that periodically wakes up
+ * (determined by the schedule specified in the configuration),
+ * triggers a periodic checkpoint and then goes back to sleep.
+ * 
+ * The start of a checkpoint is triggered by one of the two factors:
+ * (1) time or (2) the size of the edits file.
+ */
+class Checkpointer implements Runnable {
+  public static final Log LOG = 
+    LogFactory.getLog(Checkpointer.class.getName());
+
+  private BackupNode backupNode;
+  volatile boolean shouldRun;
+  private long checkpointPeriod;    // in seconds
+  private long checkpointSize;      // size (in bytes) of current Edit Log
+
+  private BackupStorage getFSImage() {
+    return (BackupStorage)backupNode.getFSImage();
+  }
+
+  private NamenodeProtocol getNamenode(){
+    return backupNode.namenode;
+  }
+
+  /**
+   * Create a checkpointer for the given backup node.
+   * If initialization fails the checkpointer is shut down (which also
+   * stops the backup node) and the exception is rethrown.
+   */
+  Checkpointer(Configuration conf, BackupNode bnNode)  throws IOException {
+    this.backupNode = bnNode;
+    try {
+      initialize(conf);
+    } catch(IOException e) {
+      shutdown();
+      throw e;
+    }
+  }
+
+  /**
+   * Initialize the checkpointer: read the checkpoint schedule from the
+   * configuration and register the image servlet on the node's HTTP server.
+   */
+  private void initialize(Configuration conf) throws IOException {
+    shouldRun = true;
+
+    // Initialize other scheduling parameters from the configuration
+    checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600);
+    checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);
+
+    HttpServer httpServer = backupNode.httpServer;
+    httpServer.setAttribute("name.system.image", getFSImage());
+    httpServer.setAttribute("name.conf", conf);
+    httpServer.addServlet("getimage", "/getimage", GetImageServlet.class);
+
+    LOG.info("Checkpoint Period : " + checkpointPeriod + " secs " +
+             "(" + checkpointPeriod/60 + " min)");
+    LOG.info("Log Size Trigger  : " + checkpointSize + " bytes " +
+             "(" + checkpointSize/1024 + " KB)");
+  }
+
+  /**
+   * Shut down the checkpointer and stop the backup node.
+   */
+  void shutdown() {
+    shouldRun = false;
+    backupNode.stop();
+  }
+
+  //
+  // The main work loop
+  //
+  public void run() {
+    // Check the size of the edit log once every 5 minutes.
+    long periodMSec = 5 * 60;   // 5 minutes
+    if(checkpointPeriod < periodMSec) {
+      periodMSec = checkpointPeriod;
+    }
+    periodMSec *= 1000;
+    // The time trigger fires once every checkpointPeriod seconds.
+    // periodMSec above is only the polling interval (at most 5 minutes);
+    // comparing elapsed time against it would checkpoint on every poll.
+    long checkpointPeriodMSec = checkpointPeriod * 1000;
+
+    long lastCheckpointTime = 0;
+    if(!backupNode.shouldCheckpointAtStartup())
+      lastCheckpointTime = FSNamesystem.now();
+    while(shouldRun) {
+      try {
+        long now = FSNamesystem.now();
+        boolean shouldCheckpoint = false;
+        if(now >= lastCheckpointTime + checkpointPeriodMSec) {
+          shouldCheckpoint = true;
+        } else {
+          long size = getJournalSize();
+          if(size >= checkpointSize)
+            shouldCheckpoint = true;
+        }
+        if(shouldCheckpoint) {
+          doCheckpoint();
+          lastCheckpointTime = now;
+        }
+      } catch(IOException e) {
+        LOG.error("Exception in doCheckpoint: ", e);
+      } catch(Throwable e) {
+        LOG.error("Throwable Exception in doCheckpoint: ", e);
+        Runtime.getRuntime().exit(-1);
+      }
+      try {
+        Thread.sleep(periodMSec);
+      } catch(InterruptedException ie) {
+        // do nothing
+      }
+    }
+  }
+
+  /**
+   * Size of the journal to compare with the size trigger.
+   * Read locally when this is a BACKUP node with an open edit log,
+   * otherwise requested from the active name-node.
+   */
+  private long getJournalSize() throws IOException {
+    // If BACKUP node has been loaded
+    // get edits size from the local file. ACTIVE has the same.
+    if(backupNode.isRole(NamenodeRole.BACKUP)
+        && getFSImage().getEditLog().isOpen())
+      return backupNode.journalSize();
+    // Go to the ACTIVE node for its size
+    return getNamenode().journalSize(backupNode.getRegistration());
+  }
+
+  /**
+   * Download <code>fsimage</code> and <code>edits</code>
+   * files from the remote name-node.
+   */
+  private void downloadCheckpoint(CheckpointSignature sig) throws IOException {
+    // Retrieve image file
+    String fileid = "getimage=1";
+    File[] files = getFSImage().getImageFiles();
+    assert files.length > 0 : "No checkpoint targets.";
+    String nnHttpAddr = backupNode.nnHttpAddress;
+    TransferFsImage.getFileClient(nnHttpAddr, fileid, files);
+    LOG.info("Downloaded file " + files[0].getName() + " size " +
+             files[0].length() + " bytes.");
+
+    // Retrieve edits file
+    fileid = "getedit=1";
+    files = getFSImage().getEditsFiles();
+    assert files.length > 0 : "No checkpoint targets.";
+    TransferFsImage.getFileClient(nnHttpAddr, fileid, files);
+    LOG.info("Downloaded file " + files[0].getName() + " size " +
+        files[0].length() + " bytes.");
+  }
+
+  /**
+   * Ask the remote name-node to fetch the new image from this node's
+   * HTTP server.
+   */
+  private void uploadCheckpoint(CheckpointSignature sig) throws IOException {
+    InetSocketAddress httpSocAddr = backupNode.getHttpAddress();
+    int httpPort = httpSocAddr.getPort();
+    String fileid = "putimage=1&port=" + httpPort +
+      "&machine=" +
+      InetAddress.getLocalHost().getHostAddress() +
+      "&token=" + sig.toString();
+    LOG.info("Posted URL " + backupNode.nnHttpAddress + fileid);
+    TransferFsImage.getFileClient(backupNode.nnHttpAddress, fileid, (File[])null);
+  }
+
+  /**
+   * Create a new checkpoint: start it on the name-node, download the image
+   * if the local one is obsolete, load and save the checkpoint locally,
+   * upload it if requested, end it on the name-node and merge the
+   * journal spool.
+   * @throws IOException if the name-node requests shutdown, returns an
+   *         unsupported command, or any step fails
+   */
+  void doCheckpoint() throws IOException {
+    long startTime = FSNamesystem.now();
+    NamenodeCommand cmd = 
+      getNamenode().startCheckpoint(backupNode.getRegistration());
+    CheckpointCommand cpCmd = null;
+    switch(cmd.getAction()) {
+      case NamenodeProtocol.ACT_SHUTDOWN:
+        shutdown();
+        throw new IOException("Name-node " + backupNode.nnRpcAddress
+                                           + " requested shutdown.");
+      case NamenodeProtocol.ACT_CHECKPOINT:
+        cpCmd = (CheckpointCommand)cmd;
+        break;
+      default:
+        throw new IOException("Unsupported NamenodeCommand: "+cmd.getAction());
+    }
+
+    CheckpointSignature sig = cpCmd.getSignature();
+    assert FSConstants.LAYOUT_VERSION == sig.getLayoutVersion() :
+      "Signature should have current layout version. Expected: "
+      + FSConstants.LAYOUT_VERSION + " actual "+ sig.getLayoutVersion();
+    assert !backupNode.isRole(NamenodeRole.CHECKPOINT) ||
+      cpCmd.isImageObsolete() : "checkpoint node should always download image.";
+    backupNode.setCheckpointState(CheckpointStates.UPLOAD_START);
+    if(cpCmd.isImageObsolete()) {
+      // First reset storage on disk and memory state
+      backupNode.resetNamespace();
+      downloadCheckpoint(sig);
+    }
+
+    BackupStorage bnImage = getFSImage();
+    bnImage.loadCheckpoint(sig);
+    sig.validateStorageInfo(bnImage);
+    bnImage.saveCheckpoint();
+
+    if(cpCmd.needToReturnImage())
+      uploadCheckpoint(sig);
+
+    getNamenode().endCheckpoint(backupNode.getRegistration(), sig);
+
+    bnImage.convergeJournalSpool();
+    backupNode.setRegistration(); // keep registration up to date
+    if(backupNode.isRole(NamenodeRole.CHECKPOINT))
+        getFSImage().getEditLog().close();
+    LOG.info("Checkpoint completed in "
+        + (FSNamesystem.now() - startTime)/1000 + " seconds."
+        + " New Image Size: " + bnImage.getFsImageName().length());
+  }
+}
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Sat Mar 14 01:20:36 2009
@@ -367,9 +367,9 @@
void reportDiff(BlocksMap blocksMap,
BlockListAsLongs newReport,
- Collection<Block> toAdd,
- Collection<Block> toRemove,
- Collection<Block> toInvalidate) {
+ Collection<Block> toAdd, // add to DatanodeDescriptor
+ Collection<Block> toRemove, // remove from DatanodeDescriptor
+ Collection<Block> toInvalidate) { // should be removed from DN
// place a deilimiter in the list which separates blocks
// that have been reported from those that have not
BlockInfo delimiter = new BlockInfo(new Block(), 1);
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java Sat Mar 14 01:20:36 2009
@@ -64,7 +64,7 @@
try {
Thread.sleep(recheckInterval);
} catch (InterruptedException ie) {
- LOG.info("Interrupted " + this.getClass().getSimpleName(), ie);
+ LOG.warn(this.getClass().getSimpleName() + " interrupted: " + ie);
}
}
}
Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java?rev=753481&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java Sat Mar 14 01:20:36 2009
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+/**
+ * An implementation of the abstract class {@link EditLogInputStream},
+ * which is used to update HDFS meta-data state on a backup node.
+ * It reads journal records from an in-memory byte buffer that is replaced
+ * via {@link #setBytes(byte[])} for every batch of records.
+ *
+ * @see org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol#journal
+ * (org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration,
+ *  int, int, byte[])
+ */
+class EditLogBackupInputStream extends EditLogInputStream {
+  String address; // sender address 
+  private ByteBufferInputStream inner;
+  private DataInputStream in;
+
+  /**
+   * A ByteArrayInputStream, which lets modify the underlying byte array.
+   */
+  private static class ByteBufferInputStream extends ByteArrayInputStream {
+    /** Empty buffer used in place of a null array; never written to. */
+    private static final byte[] EMPTY = new byte[0];
+
+    ByteBufferInputStream() {
+      super(EMPTY);
+    }
+
+    byte[] getData() {
+      return super.buf;
+    }
+
+    /**
+     * Replace the underlying buffer and rewind to its beginning.
+     * A null argument is treated as an empty buffer so that the stream
+     * never holds a null array (some ByteArrayInputStream methods, e.g.
+     * readAllBytes in newer JDKs, copy from buf directly).
+     */
+    void setData(byte[] newBytes) {
+      super.buf = (newBytes == null) ? EMPTY : newBytes;
+      super.count = super.buf.length;
+      super.mark = 0;
+      reset();
+    }
+
+    /**
+     * Number of valid bytes in the underlying buffer, i.e. the length of
+     * the array last passed to {@link #setData(byte[])} (0 if none).
+     * This is not the number of bytes read so far.
+     */
+    int length() {
+      return count;
+    }
+  }
+
+  EditLogBackupInputStream(String name) throws IOException {
+    address = name;
+    inner = new ByteBufferInputStream();
+    in = new DataInputStream(inner);
+  }
+
+  @Override // JournalStream
+  public String getName() {
+    return address;
+  }
+
+  @Override // JournalStream
+  public JournalType getType() {
+    return JournalType.BACKUP;
+  }
+
+  @Override
+  public int available() throws IOException {
+    return in.available();
+  }
+
+  @Override
+  public int read() throws IOException {
+    return in.read();
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    return in.read(b, off, len);
+  }
+
+  @Override
+  public void close() throws IOException {
+    in.close();
+  }
+
+  @Override
+  long length() throws IOException {
+    // size of the currently loaded record buffer; 0 after clear()
+    return inner.length();
+  }
+
+  DataInputStream getDataInputStream() {
+    return in;
+  }
+
+  /** Load a new batch of serialized journal records and rewind. */
+  void setBytes(byte[] newBytes) throws IOException {
+    inner.setData(newBytes);
+    in.reset();
+  }
+
+  /** Drop the current record buffer; {@link #length()} becomes 0. */
+  void clear() throws IOException {
+    setBytes(null);
+  }
+}
Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=753481&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Sat Mar 14 01:20:36 2009
@@ -0,0 +1,190 @@
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+
+/**
+ * An implementation of the abstract class {@link EditLogOutputStream},
+ * which streams edits to a backup node.
+ * <p>
+ * Nothing is persisted locally. Records passed to
+ * {@code write(byte, Writable...)} are collected in {@code bufCurrent};
+ * {@code setReadyToFlush()} swaps that list into {@code bufReady}, and
+ * {@code flushAndSync()} serializes the ready records and sends them to the
+ * backup node over RPC.
+ *
+ * @see NamenodeProtocol#journal(NamenodeRegistration, int, int, byte[])
+ */
+class EditLogBackupOutputStream extends EditLogOutputStream {
+  /** Initial capacity, in bytes, of the serialization buffer {@code out}. */
+  static int DEFAULT_BUFFER_SIZE = 256;
+
+  private NamenodeProtocol backupNode;          // RPC proxy to backup node
+  private NamenodeRegistration bnRegistration;  // backup node registration
+  private NamenodeRegistration nnRegistration;  // active node registration
+  private ArrayList<JournalRecord> bufCurrent;  // current buffer for writing
+  private ArrayList<JournalRecord> bufReady;    // buffer ready for flushing
+  private DataOutputBuffer out;  // serialized output sent to backup node
+
+  /** A buffered journal operation: an op code and its arguments. */
+  static class JournalRecord {
+    byte op;
+    Writable[] args;
+
+    JournalRecord(byte op, Writable ... writables) {
+      this.op = op;
+      this.args = writables;
+    }
+
+    /**
+     * Serialize this record: the op code byte, then each argument in order.
+     * A {@code null} argument array writes only the op code.
+     */
+    void write(DataOutputStream out) throws IOException {
+      out.write(op);
+      if (args == null) {
+        return;
+      }
+      for (Writable w : args) {
+        w.write(out);
+      }
+    }
+  }
+
+  /**
+   * Create a stream that journals edits to the given backup node.
+   *
+   * @param bnReg registration of the backup node; its address is used to
+   *        create the RPC proxy
+   * @param nnReg registration of the active name-node, passed with every
+   *        journal call
+   * @throws IOException if the RPC proxy cannot be created (also logged)
+   */
+  EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
+                            NamenodeRegistration nnReg) // active name-node
+  throws IOException {
+    super();
+    this.bnRegistration = bnReg;
+    this.nnRegistration = nnReg;
+    InetSocketAddress bnAddress =
+      NetUtils.createSocketAddr(bnRegistration.getAddress());
+    Storage.LOG.info("EditLogBackupOutputStream connects to: " + bnAddress);
+    try {
+      this.backupNode =
+        (NamenodeProtocol) RPC.getProxy(NamenodeProtocol.class,
+            NamenodeProtocol.versionID, bnAddress, new Configuration());
+    } catch(IOException e) {
+      Storage.LOG.error("Error connecting to: " + bnAddress, e);
+      throw e;
+    }
+    this.bufCurrent = new ArrayList<JournalRecord>();
+    this.bufReady = new ArrayList<JournalRecord>();
+    this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
+  }
+
+  /** @return the address of the backup node this stream writes to */
+  @Override // JournalStream
+  public String getName() {
+    return bnRegistration.getAddress();
+  }
+
+  @Override // JournalStream
+  public JournalType getType() {
+    return JournalType.BACKUP;
+  }
+
+  /**
+   * Raw byte output is not supported; edits must be written as whole
+   * records through {@link #write(byte, Writable...)}.
+   *
+   * @throws IOException always
+   */
+  @Override // EditLogOutputStream
+  public void write(int b) throws IOException {
+    throw new IOException("Not implemented");
+  }
+
+  /**
+   * Buffer one record in {@code bufCurrent}. It is sent only after
+   * {@code setReadyToFlush()} and {@code flushAndSync()}.
+   */
+  @Override // EditLogOutputStream
+  void write(byte op, Writable ... writables) throws IOException {
+    bufCurrent.add(new JournalRecord(op, writables));
+  }
+
+  /**
+   * There is no persistent storage to create. Discard records buffered for
+   * writing; the ready buffer is expected to be empty already.
+   */
+  @Override // EditLogOutputStream
+  void create() throws IOException {
+    bufCurrent.clear();
+    assert bufReady.size() == 0 : "previous data is not flushed yet";
+  }
+
+  /**
+   * Stop the RPC proxy to the backup node and release the buffers.
+   * Closing an already-closed stream has no effect, as required by
+   * {@link java.io.Closeable#close()}.
+   *
+   * @throws IOException if records written since the last
+   *         {@code setReadyToFlush()} have not been flushed yet
+   */
+  @Override // EditLogOutputStream
+  public void close() throws IOException {
+    if (bufCurrent == null) {
+      return; // already closed: the buffers are nulled below
+    }
+    // close should have been called after all pending transactions
+    // have been flushed & synced.
+    int size = bufCurrent.size();
+    if (size != 0) {
+      throw new IOException("BackupEditStream has " + size +
+          " records still to be flushed and cannot be closed.");
+    }
+    RPC.stopProxy(backupNode); // stop the RPC threads
+    bufCurrent = bufReady = null;
+  }
+
+  /**
+   * Swap the buffers: records written so far become ready for flushing,
+   * and the (empty) former ready list receives subsequent writes.
+   */
+  @Override // EditLogOutputStream
+  void setReadyToFlush() throws IOException {
+    assert bufReady.size() == 0 : "previous data is not flushed yet";
+    ArrayList<JournalRecord> tmp = bufReady;
+    bufReady = bufCurrent;
+    bufCurrent = tmp;
+  }
+
+  /**
+   * Send every record in {@code bufReady} to the backup node, then clear it.
+   * Runs of ordinary records are batched into one
+   * {@code NamenodeProtocol.JA_JOURNAL} call. Each record whose op code is
+   * {@code >= FSEditLog.OP_JSPOOL_START} is sent by itself, with its op
+   * code as the journal action, after any preceding batch has been sent.
+   */
+  @Override // EditLogOutputStream
+  protected void flushAndSync() throws IOException {
+    assert out.size() == 0 : "Output buffer is not empty";
+    int bufReadySize = bufReady.size();
+    for(int idx = 0; idx < bufReadySize; idx++) {
+      JournalRecord jRec = null;
+      // Serialize ordinary records until a special one or the end is hit;
+      // this inner loop shares idx with the outer loop.
+      for(; idx < bufReadySize; idx++) {
+        jRec = bufReady.get(idx);
+        if(jRec.op >= FSEditLog.OP_JSPOOL_START)
+          break; // special operation should be sent in a separate call to BN
+        jRec.write(out);
+      }
+      if(out.size() > 0)
+        send(NamenodeProtocol.JA_JOURNAL);
+      if(idx == bufReadySize)
+        break;
+      // operation like start journal spool or increment checkpoint time
+      // is a separate call to BN
+      jRec.write(out);
+      send(jRec.op);
+    }
+    bufReady.clear(); // erase all data in the buffer
+    out.reset(); // reset buffer to the start position
+  }
+
+  /**
+   * There is no persistent storage, so the length is always 0.<p>
+   * Length is used to decide when a checkpoint should start. That
+   * criterion does not apply to backup streams.
+   */
+  @Override // EditLogOutputStream
+  long length() throws IOException {
+    return 0;
+  }
+
+  /**
+   * Append an {@code OP_INVALID} terminator to the serialized records in
+   * {@code out} and send the buffer to the backup node as journal action
+   * {@code ja}. The length argument excludes the terminator. {@code out} is
+   * reset whether or not the RPC succeeds.
+   */
+  private void send(int ja) throws IOException {
+    try {
+      int length = out.getLength();
+      out.write(FSEditLog.OP_INVALID);
+      backupNode.journal(nnRegistration, ja, length, out.getData());
+    } finally {
+      out.reset();
+    }
+  }
+
+  /**
+   * Get backup node registration.
+   */
+  NamenodeRegistration getRegistration() {
+    return bnRegistration;
+  }
+
+  /**
+   * Check that the backup node responds to a
+   * {@code NamenodeProtocol.JA_IS_ALIVE} journal call.
+   *
+   * @return {@code false} if the call fails with an IOException (logged at
+   *         INFO), {@code true} otherwise
+   */
+  boolean isAlive() {
+    try {
+      send(NamenodeProtocol.JA_IS_ALIVE);
+    } catch(IOException ei) {
+      Storage.LOG.info(bnRegistration.getRole() + " "
+          + bnRegistration.getAddress() + " is not alive. ", ei);
+      return false;
+    }
+    return true;
+  }
+}
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java Sat Mar 14 01:20:36 2009
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -27,14 +29,8 @@
* It should stream bytes from the storage exactly as they were written
* into the #{@link EditLogOutputStream}.
*/
-abstract class EditLogInputStream extends InputStream {
- /**
- * Get this stream name.
- *
- * @return name of the stream
- */
- abstract String getName();
-
+abstract class EditLogInputStream extends InputStream
+implements JournalStream {
/** {@inheritDoc} */
public abstract int available() throws IOException;
@@ -51,4 +47,11 @@
* Return the size of the current edits log.
*/
abstract long length() throws IOException;
+
+  /**
+   * Wrap this edit stream in a buffered DataInputStream.
+   * <p>
+   * A new wrapper is created on every call. The BufferedInputStream may read
+   * ahead from this stream, so callers should keep using the instance they
+   * got rather than calling this method repeatedly.
+   */
+  DataInputStream getDataInputStream() {
+    BufferedInputStream buffered = new BufferedInputStream(this);
+    return new DataInputStream(buffered);
+  }
}
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Sat Mar 14 01:20:36 2009
@@ -26,7 +26,8 @@
* A generic abstract class to support journaling of edits logs into
* a persistent storage.
*/
-abstract class EditLogOutputStream extends OutputStream {
+abstract class EditLogOutputStream extends OutputStream
+implements JournalStream {
// these are statistics counters
private long numSync; // number of sync(s) to disk
private long totalTimeSync; // total time to sync
@@ -35,13 +36,6 @@
numSync = totalTimeSync = 0;
}
- /**
- * Get this stream name.
- *
- * @return name of the stream
- */
- abstract String getName();
-
/** {@inheritDoc} */
abstract public void write(int b) throws IOException;
@@ -57,7 +51,7 @@
abstract void write(byte op, Writable ... writables) throws IOException;
/**
- * Create and initialize new edits log storage.
+ * Create and initialize underlying persistent edits log storage.
*
* @throws IOException
*/
@@ -97,6 +91,10 @@
*/
abstract long length() throws IOException;
+  /**
+   * Whether this stream can journal the given op code. This base
+   * implementation accepts every op code; subclasses may override.
+   *
+   * @param op journal op code
+   * @return {@code true}
+   */
+  boolean isOperationSupported(byte op) {
+    return true;
+  }
+
/**
* Return total time spent in {@link #flushAndSync()}
*/
@@ -110,4 +108,9 @@
long getNumSync() {
return numSync;
}
+
+  /** @return the stream name, as reported by {@code getName()} */
+  @Override // Object
+  public String toString() {
+    return this.getName();
+  }
}
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=753481&r1=753480&r2=753481&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Sat Mar 14 01:20:36 2009
@@ -46,7 +46,7 @@
class FSDirectory implements Closeable {
final FSNamesystem namesystem;
- final INodeDirectoryWithQuota rootDir;
+ INodeDirectoryWithQuota rootDir;
FSImage fsImage;
private boolean ready = false;
// Metrics record
@@ -93,8 +93,6 @@
}
FSEditLog editLog = fsImage.getEditLog();
assert editLog != null : "editLog must be initialized";
- if (!editLog.isOpen())
- editLog.open();
fsImage.setCheckpointDirectories(null, null);
} catch(IOException e) {
fsImage.close();
@@ -594,7 +592,11 @@
}
return dirNotEmpty;
}
-
+
+ boolean isEmpty() {
+ return isDirEmpty("/");
+ }
+
/**
* Delete a path from the name space
* Update the count at each ancestor directory with quota
@@ -1251,7 +1253,16 @@
}
return status;
}
-
+
+  /**
+   * Reset the entire namespace tree by replacing {@code rootDir} with a
+   * fresh root directory named {@code INodeDirectory.ROOT_NAME}, whose
+   * permissions come from {@code namesystem.createFsOwnerPermissions} with
+   * mode 0755 and whose quota arguments are {@code Integer.MAX_VALUE} and
+   * {@code -1} (presumably namespace and disk-space quotas; verify).
+   * <p>
+   * NOTE(review): {@code rootDir} is no longer final. If other code
+   * synchronizes on {@code rootDir}, reassigning it here can leave threads
+   * locking different objects; confirm the caller holds the proper lock.
+   */
+  void reset() {
+    FsPermission rootMode = new FsPermission((short)0755);
+    rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
+        namesystem.createFsOwnerPermissions(rootMode),
+        Integer.MAX_VALUE, -1);
+  }
+
/**
* Create FileStatus by file INode
*/
|