Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm
Precedence: bulk
Reply-To: core-dev@hadoop.apache.org
Delivered-To: mailing list core-commits@hadoop.apache.org
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Subject: svn commit: r753481 [1/3] - in /hadoop/core/trunk: ./ src/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/common/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ s...
Date: Sat, 14 Mar 2009 01:20:37 -0000
To: core-commits@hadoop.apache.org
From: shv@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20090314012039.DFCF32388A50@eris.apache.org>

Author: shv
Date: Sat Mar 14 01:20:36 2009
New Revision: 753481

URL: http://svn.apache.org/viewvc?rev=753481&view=rev
Log:
HADOOP-4539. Introduce backup node and checkpoint node. Contributed by Konstantin Shvachko.
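For context, the patch wires the new -backup and -checkpoint startup flags to the NamenodeRole values added in HdfsConstants (see the HdfsConstants hunk further down). Below is a minimal sketch of that mapping, assuming the StartupOption and NamenodeRole enums exactly as they appear in this revision; the demo class name is hypothetical and not part of the commit.

import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;

// Hypothetical demo class; not part of r753481.
public class StartupOptionRoleDemo {
  public static void main(String[] args) {
    // "-backup": the node keeps its namespace in sync with the active
    // name-node and checkpoints by saving the image to local disk(s).
    NamenodeRole backup = StartupOption.BACKUP.toNodeRole();         // NamenodeRole.BACKUP
    // "-checkpoint": the node only creates periodic checkpoints.
    NamenodeRole checkpoint = StartupOption.CHECKPOINT.toNodeRole(); // NamenodeRole.CHECKPOINT
    // Any other startup option keeps the regular active role.
    NamenodeRole active = StartupOption.REGULAR.toNodeRole();        // NamenodeRole.ACTIVE

    System.out.println(StartupOption.BACKUP.getName() + " -> " + backup);         // -backup -> Backup Node
    System.out.println(StartupOption.CHECKPOINT.getName() + " -> " + checkpoint); // -checkpoint -> Checkpoint Node
    System.out.println(StartupOption.REGULAR.getName() + " -> " + active);        // -regular -> NameNode
  }
}

The printed role names come from NamenodeRole.toString(), which returns the human-readable descriptions ("Backup Node", "Checkpoint Node", "NameNode") defined in the same hunk.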
Added:
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JournalStream.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnsupportedActionException.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/CheckpointCommand.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeCommand.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeRegistration.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NodeRegistration.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/ServerCommand.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
Removed:
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredDatanodeException.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/hdfs-default.xml
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeCommand.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/UpgradeUtilities.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
    hadoop/core/trunk/src/webapps/hdfs/dfshealth.jsp
    hadoop/core/trunk/src/webapps/hdfs/dfsnodelist.jsp

Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=753481&r1=753480&r2=753481&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Sat Mar 14 01:20:36 2009 @@ -58,6 +58,8 @@ HADOOP-4756. A command line tool to access JMX properties on NameNode and DataNode. (Boris Shkolnik via rangadi) + HADOOP-4539. Introduce backup node and checkpoint node. (shv) + IMPROVEMENTS HADOOP-4565. Added CombineFileInputFormat to use data locality information @@ -941,7 +943,7 @@ formula linear where blowUp = Total-Output/Total-Input. (Sharad Agarwal via ddas) HADOOP-5142. Fix MapWritable#putAll to store key/value classes. - (Doğacan Güney via enis) + (Do??acan G??ney via enis) HADOOP-4744. Workaround for jetty6 returning -1 when getLocalPort is invoked on the connector. The workaround patch retries a few times before failing. Modified: hadoop/core/trunk/src/hdfs/hdfs-default.xml URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/hdfs-default.xml?rev=753481&r1=753480&r2=753481&view=diff ============================================================================== --- hadoop/core/trunk/src/hdfs/hdfs-default.xml (original) +++ hadoop/core/trunk/src/hdfs/hdfs-default.xml Sat Mar 14 01:20:36 2009 @@ -123,8 +123,24 @@ + + dfs.backup.address + 0.0.0.0:50100 + + The backup node server address and port. + If the port is 0 then the server will start on a free port. + + - + + dfs.backup.http.address + 0.0.0.0:50105 + + The backup node http server address and port. + If the port is 0 then the server will start on a free port. + + + dfs.replication.considerLoad true Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java?rev=753481&view=auto ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java (added) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java Sat Mar 14 01:20:36 2009 @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocol; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.server.protocol.NodeRegistration; + +/** + * This exception is thrown when a node that has not previously + * registered is trying to access the name node. 
+ */ +public class UnregisteredNodeException extends IOException { + private static final long serialVersionUID = -5620209396945970810L; + + public UnregisteredNodeException(NodeRegistration nodeReg) { + super("Unregistered server: " + nodeReg.toString()); + } + + /** + * The exception is thrown if a different data-node claims the same + * storage id as the existing one. + * + * @param nodeID unregistered data-node + * @param storedNode data-node stored in the system with this storage id + */ + public UnregisteredNodeException(DatanodeID nodeID, DatanodeInfo storedNode) { + super("Data node " + nodeID.getName() + + " is attempting to report storage ID " + + nodeID.getStorageID() + ". Node " + + storedNode.getName() + " is expected to serve this storage."); + } +} Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=753481&r1=753480&r2=753481&view=diff ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Sat Mar 14 01:20:36 2009 @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hdfs.server.common; - /************************************ * Some handy internal HDFS constants * @@ -32,10 +31,12 @@ DATA_NODE; } - // Startup options + /** Startup options */ static public enum StartupOption{ FORMAT ("-format"), REGULAR ("-regular"), + BACKUP ("-backup"), + CHECKPOINT("-checkpoint"), UPGRADE ("-upgrade"), ROLLBACK("-rollback"), FINALIZE("-finalize"), @@ -44,6 +45,17 @@ private String name = null; private StartupOption(String arg) {this.name = arg;} public String getName() {return name;} + public NamenodeRole toNodeRole() { + switch(this) { + case BACKUP: + return NamenodeRole.BACKUP; + case CHECKPOINT: + return NamenodeRole.CHECKPOINT; + default: + return NamenodeRole.ACTIVE; + } + } + } // Timeouts for communicating with DataNode for streaming writes/reads @@ -51,5 +63,21 @@ public static int WRITE_TIMEOUT = 8 * 60 * 1000; public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline + /** + * Defines the NameNode role. + */ + static public enum NamenodeRole { + ACTIVE ("NameNode"), + BACKUP ("Backup Node"), + CHECKPOINT("Checkpoint Node"), + STANDBY ("Standby Node"); + + private String description = null; + private NamenodeRole(String arg) {this.description = arg;} + + public String toString() { + return description; + } + } } Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java?rev=753481&r1=753480&r2=753481&view=diff ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java (original) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java Sat Mar 14 01:20:36 2009 @@ -1,15 +1,21 @@ package org.apache.hadoop.hdfs.server.common; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + /** * Common class for storage information. 
* * TODO namespaceID should be long and computed as hash(address + port) */ -public class StorageInfo { - public int layoutVersion; // Version read from the stored file. - public int namespaceID; // namespace id of the storage - public long cTime; // creation timestamp +public class StorageInfo implements Writable { + public int layoutVersion; // layout version of the storage data + public int namespaceID; // id of the file system + public long cTime; // creation time of the file system state public StorageInfo () { this(0, 0, 0L); @@ -25,8 +31,22 @@ setStorageInfo(from); } + /** + * Layout version of the storage data. + */ public int getLayoutVersion(){ return layoutVersion; } + + /** + * Namespace id of the file system.

+ * Assigned to the file system at formatting and never changes after that. + * Shared by all file system components. + */ public int getNamespaceID() { return namespaceID; } + + /** + * Creation time of the file system state.

+ * Modified during upgrades. + */ public long getCTime() { return cTime; } public void setStorageInfo(StorageInfo from) { @@ -34,4 +54,19 @@ namespaceID = from.namespaceID; cTime = from.cTime; } -} \ No newline at end of file + + ///////////////////////////////////////////////// + // Writable + ///////////////////////////////////////////////// + public void write(DataOutput out) throws IOException { + out.writeInt(getLayoutVersion()); + out.writeInt(getNamespaceID()); + out.writeLong(getCTime()); + } + + public void readFields(DataInput in) throws IOException { + layoutVersion = in.readInt(); + namespaceID = in.readInt(); + cTime = in.readLong(); + } +} Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=753481&r1=753480&r2=753481&view=diff ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Mar 14 01:20:36 2009 @@ -54,7 +54,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException; +import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.HdfsConstants; import org.apache.hadoop.hdfs.server.common.GenerationStamp; @@ -803,7 +803,7 @@ } // synchronized } catch(RemoteException re) { String reClass = re.getClassName(); - if (UnregisteredDatanodeException.class.getName().equals(reClass) || + if (UnregisteredNodeException.class.getName().equals(reClass) || DisallowedDatanodeException.class.getName().equals(reClass) || IncorrectVersionException.class.getName().equals(reClass)) { LOG.warn("DataNode is shutting down: " + Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=753481&view=auto ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (added) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Sat Mar 14 01:20:36 2009 @@ -0,0 +1,342 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; + +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.FSConstants; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole; +import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; +import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.DNS; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.Daemon; + +/** + * BackupNode. + *

+ * Backup node can play two roles.
+ * <ol>
+ * <li>{@link NamenodeRole#CHECKPOINT} node periodically creates checkpoints,
+ * that is downloads image and edits from the active node, merges them, and
+ * uploads the new image back to the active.</li>
+ * <li>{@link NamenodeRole#BACKUP} node keeps its namespace in sync with the
+ * active node, and periodically creates checkpoints by simply saving the
+ * namespace image to local disk(s).</li>
+ * </ol>
+ */ +class BackupNode extends NameNode { + private static final String BN_ADDRESS_NAME_KEY = "dfs.backup.address"; + private static final String BN_ADDRESS_DEFAULT = "localhost:50100"; + private static final String BN_HTTP_ADDRESS_NAME_KEY = "dfs.backup.http.address"; + private static final String BN_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50105"; + + /** Name-node proxy */ + NamenodeProtocol namenode; + /** Name-node RPC address */ + String nnRpcAddress; + /** Name-node HTTP address */ + String nnHttpAddress; + /** Checkpoint manager */ + Checkpointer checkpointManager; + /** Checkpoint daemon */ + private Daemon cpDaemon; + + BackupNode(Configuration conf, NamenodeRole role) throws IOException { + super(conf, role); + } + + ///////////////////////////////////////////////////// + // Common NameNode methods implementation for backup node. + ///////////////////////////////////////////////////// + @Override // NameNode + protected InetSocketAddress getRpcServerAddress(Configuration conf) throws IOException { + String addr = conf.get(BN_ADDRESS_NAME_KEY, BN_ADDRESS_DEFAULT); + int port = NetUtils.createSocketAddr(addr).getPort(); + String hostName = DNS.getDefaultHost("default"); + return new InetSocketAddress(hostName, port); + } + + @Override // NameNode + protected void setRpcServerAddress(Configuration conf) { + conf.set(BN_ADDRESS_NAME_KEY, getHostPortString(rpcAddress)); + } + + @Override // NameNode + protected InetSocketAddress getHttpServerAddress(Configuration conf) { + assert rpcAddress != null : "rpcAddress should be calculated first"; + String addr = conf.get(BN_HTTP_ADDRESS_NAME_KEY, BN_HTTP_ADDRESS_DEFAULT); + int port = NetUtils.createSocketAddr(addr).getPort(); + String hostName = rpcAddress.getHostName(); + return new InetSocketAddress(hostName, port); + } + + @Override // NameNode + protected void setHttpServerAddress(Configuration conf){ + conf.set(BN_HTTP_ADDRESS_NAME_KEY, getHostPortString(httpAddress)); + } + + @Override // NameNode + protected void loadNamesystem(Configuration conf) throws IOException { + BackupStorage bnImage = new BackupStorage(); + this.namesystem = new FSNamesystem(conf, bnImage); + bnImage.recoverCreateRead(FSNamesystem.getNamespaceDirs(conf), + FSNamesystem.getNamespaceEditsDirs(conf)); + } + + @Override // NameNode + protected void initialize(Configuration conf) throws IOException { + // Trash is disabled in BackupNameNode, + // but should be turned back on if it ever becomes active. + conf.setLong("fs.trash.interval", 0L); + NamespaceInfo nsInfo = handshake(conf); + super.initialize(conf); + // Backup node should never do lease recovery, + // therefore lease hard limit should never expire. + namesystem.leaseManager.setLeasePeriod( + FSConstants.LEASE_SOFTLIMIT_PERIOD, Long.MAX_VALUE); + // register with the active name-node + registerWith(nsInfo); + // Checkpoint daemon should start after the rpc server started + runCheckpointDaemon(conf); + } + + @Override // NameNode + public void stop() { + if(checkpointManager != null) checkpointManager.shouldRun = false; + if(cpDaemon != null) cpDaemon.interrupt(); + if(namenode != null && getRegistration() != null) { + try { + namenode.errorReport(getRegistration(), NamenodeProtocol.FATAL, + "Shutting down."); + } catch(IOException e) { + LOG.error("Failed to report to name-node.", e); + } + } + RPC.stopProxy(namenode); // stop the RPC threads + super.stop(); + } + + ///////////////////////////////////////////////////// + // NamenodeProtocol implementation for backup node. 
+ ///////////////////////////////////////////////////// + @Override // NamenodeProtocol + public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) + throws IOException { + throw new UnsupportedActionException("getBlocks"); + } + + // Only active name-node can register other nodes. + @Override // NamenodeProtocol + public NamenodeRegistration register(NamenodeRegistration registration + ) throws IOException { + throw new UnsupportedActionException("journal"); + } + + @Override // NamenodeProtocol + public NamenodeCommand startCheckpoint(NamenodeRegistration registration) + throws IOException { + throw new UnsupportedActionException("startCheckpoint"); + } + + @Override // NamenodeProtocol + public void endCheckpoint(NamenodeRegistration registration, + CheckpointSignature sig) throws IOException { + throw new UnsupportedActionException("endCheckpoint"); + } + + @Override // NamenodeProtocol + public void journal(NamenodeRegistration nnReg, + int jAction, + int length, + byte[] args) throws IOException { + verifyRequest(nnReg); + if(!nnRpcAddress.equals(nnReg.getAddress())) + throw new IOException("Journal request from unexpected name-node: " + + nnReg.getAddress() + " expecting " + nnRpcAddress); + BackupStorage bnImage = (BackupStorage)getFSImage(); + switch(jAction) { + case (int)JA_IS_ALIVE: + return; + case (int)JA_JOURNAL: + bnImage.journal(length, args); + return; + case (int)JA_JSPOOL_START: + bnImage.startJournalSpool(nnReg); + return; + case (int)JA_CHECKPOINT_TIME: + bnImage.setCheckpointTime(length, args); + setRegistration(); // keep registration up to date + return; + default: + throw new IOException("Unexpected journal action: " + jAction); + } + } + + boolean shouldCheckpointAtStartup() { + FSImage fsImage = getFSImage(); + if(isRole(NamenodeRole.CHECKPOINT)) { + assert fsImage.getNumStorageDirs() > 0; + return ! fsImage.getStorageDir(0).getVersionFile().exists(); + } + if(namesystem == null || namesystem.dir == null || getFSImage() == null) + return true; + return fsImage.getEditLog().getNumEditStreams() == 0; + } + + private NamespaceInfo handshake(Configuration conf) throws IOException { + // connect to name node + InetSocketAddress nnAddress = super.getRpcServerAddress(conf); + this.namenode = + (NamenodeProtocol) RPC.waitForProxy(NamenodeProtocol.class, + NamenodeProtocol.versionID, nnAddress, conf); + this.nnRpcAddress = getHostPortString(nnAddress); + this.nnHttpAddress = getHostPortString(super.getHttpServerAddress(conf)); + // get version and id info from the name-node + NamespaceInfo nsInfo = null; + while(!stopRequested) { + try { + nsInfo = handshake(namenode); + break; + } catch(SocketTimeoutException e) { // name-node is busy + LOG.info("Problem connecting to server: " + nnAddress); + try { + Thread.sleep(1000); + } catch (InterruptedException ie) {} + } + } + return nsInfo; + } + + /** + * Start a backup node daemon. + */ + private void runCheckpointDaemon(Configuration conf) throws IOException { + checkpointManager = new Checkpointer(conf, this); + cpDaemon = new Daemon(checkpointManager); + cpDaemon.start(); + } + + /** + * Checkpoint.
+ * Tests may use it to initiate a checkpoint process. + * @throws IOException + */ + void doCheckpoint() throws IOException { + checkpointManager.doCheckpoint(); + } + + CheckpointStates getCheckpointState() { + return getFSImage().getCheckpointState(); + } + + void setCheckpointState(CheckpointStates cs) { + getFSImage().setCheckpointState(cs); + } + + /** + * Register this backup node with the active name-node. + * @param nsInfo + * @throws IOException + */ + private void registerWith(NamespaceInfo nsInfo) throws IOException { + BackupStorage bnImage = (BackupStorage)getFSImage(); + // verify namespaceID + if(bnImage.getNamespaceID() == 0) // new backup storage + bnImage.setStorageInfo(nsInfo); + else if(bnImage.getNamespaceID() != nsInfo.getNamespaceID()) + throw new IOException("Incompatible namespaceIDs" + + ": active node namespaceID = " + nsInfo.getNamespaceID() + + "; backup node namespaceID = " + bnImage.getNamespaceID()); + + setRegistration(); + NamenodeRegistration nnReg = null; + while(!stopRequested) { + try { + nnReg = namenode.register(getRegistration()); + break; + } catch(SocketTimeoutException e) { // name-node is busy + LOG.info("Problem connecting to name-node: " + nnRpcAddress); + try { + Thread.sleep(1000); + } catch (InterruptedException ie) {} + } + } + + String msg = null; + if(nnReg == null) // consider as a rejection + msg = "Registration rejected by " + nnRpcAddress; + else if(!nnReg.isRole(NamenodeRole.ACTIVE)) { + msg = "Name-node " + nnRpcAddress + " is not active"; + } + if(msg != null) { + msg += ". Shutting down."; + LOG.error(msg); + throw new IOException(msg); // stop the node + } + nnRpcAddress = nnReg.getAddress(); + } + + /** + * Reset node namespace state in memory and in storage directories. + * @throws IOException + */ + void resetNamespace() throws IOException { + ((BackupStorage)getFSImage()).reset(); + } + + /** + * Get size of the local journal (edit log). + * @return size of the current journal + * @throws IOException + */ + long journalSize() throws IOException { + return namesystem.getEditLogSize(); + } + + // TODO: move to a common with DataNode util class + private static NamespaceInfo handshake(NamenodeProtocol namenode) + throws IOException, SocketTimeoutException { + NamespaceInfo nsInfo; + nsInfo = namenode.versionRequest(); // throws SocketTimeoutException + String errorMsg = null; + // verify build version + if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion())) { + errorMsg = "Incompatible build versions: active name-node BV = " + + nsInfo.getBuildVersion() + "; backup node BV = " + + Storage.getBuildVersion(); + LOG.fatal(errorMsg); + throw new IOException(errorMsg); + } + assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() : + "Active and backup node layout versions must be the same. 
Expected: " + + FSConstants.LAYOUT_VERSION + " actual "+ nsInfo.getLayoutVersion(); + return nsInfo; + } +} Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java?rev=753481&view=auto ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java (added) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java Sat Mar 14 01:20:36 2009 @@ -0,0 +1,393 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.DataInputStream; +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; + +import org.apache.hadoop.hdfs.server.common.HdfsConstants; +import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException; +import org.apache.hadoop.hdfs.server.namenode.FSImage; +import org.apache.hadoop.hdfs.server.namenode.FSEditLog.EditLogFileInputStream; +import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.io.LongWritable; + +public class BackupStorage extends FSImage { + // Names of the journal spool directory and the spool file + private static final String STORAGE_JSPOOL_DIR = "jspool"; + private static final String STORAGE_JSPOOL_FILE = + NameNodeFile.EDITS_NEW.getName(); + + /** Backup input stream for loading edits into memory */ + private EditLogBackupInputStream backupInputStream; + /** Is journal spooling in progress */ + volatile JSpoolState jsState; + + static enum JSpoolState { + OFF, + INPROGRESS, + WAIT; + } + + /** + */ + BackupStorage() { + super(); + jsState = JSpoolState.OFF; + } + + @Override + public boolean isConversionNeeded(StorageDirectory sd) { + return false; + } + + /** + * Analyze backup storage directories for consistency.
+ * Recover from incomplete checkpoints if required.
+ * Read VERSION and fstime files if exist.
+ * Do not load image or edits. + * + * @param imageDirs list of image directories. + * @param editsDirs list of edits directories. + * @throws IOException if the node should shutdown. + */ + void recoverCreateRead(Collection imageDirs, + Collection editsDirs) throws IOException { + setStorageDirectories(imageDirs, editsDirs); + this.checkpointTime = 0L; + for(Iterator it = dirIterator(); it.hasNext();) { + StorageDirectory sd = it.next(); + StorageState curState; + try { + curState = sd.analyzeStorage(HdfsConstants.StartupOption.REGULAR); + // sd is locked but not opened + switch(curState) { + case NON_EXISTENT: + // fail if any of the configured storage dirs are inaccessible + throw new InconsistentFSStateException(sd.getRoot(), + "checkpoint directory does not exist or is not accessible."); + case NOT_FORMATTED: + // for backup node all directories may be unformatted initially + LOG.info("Storage directory " + sd.getRoot() + " is not formatted."); + LOG.info("Formatting ..."); + sd.clearDirectory(); // create empty current + break; + case NORMAL: + break; + default: // recovery is possible + sd.doRecover(curState); + } + if(curState != StorageState.NOT_FORMATTED) { + sd.read(); // read and verify consistency with other directories + } + } catch(IOException ioe) { + sd.unlock(); + throw ioe; + } + } + } + + /** + * Reset storage directories. + *

+ * Unlock the storage. + * Rename current to lastcheckpoint.tmp + * and recreate empty current. + * @throws IOException + */ + synchronized void reset() throws IOException { + // reset NameSpace tree + FSDirectory fsDir = getFSNamesystem().dir; + fsDir.reset(); + + // unlock, close and rename storage directories + unlockAll(); + // recover from unsuccessful checkpoint if necessary + recoverCreateRead(getImageDirectories(), getEditsDirectories()); + // rename and recreate + for(StorageDirectory sd : storageDirs) { + File curDir = sd.getCurrentDir(); + File tmpCkptDir = sd.getLastCheckpointTmp(); + assert !tmpCkptDir.exists() : + tmpCkptDir.getName() + " directory must not exist."; + if(!sd.getVersionFile().exists()) + continue; + // rename current to lastcheckpoint.tmp + rename(curDir, tmpCkptDir); + if(!curDir.mkdir()) + throw new IOException("Cannot create directory " + curDir); + } + } + + /** + * Load checkpoint from local files only if the memory state is empty.
+ * Set new checkpoint time received from the name-node.
+ * Move lastcheckpoint.tmp to previous.checkpoint. + * @throws IOException + */ + void loadCheckpoint(CheckpointSignature sig) throws IOException { + // load current image and journal if it is not in memory already + if(!editLog.isOpen()) + editLog.open(); + + FSDirectory fsDir = getFSNamesystem().dir; + if(fsDir.isEmpty()) { + Iterator itImage = dirIterator(NameNodeDirType.IMAGE); + Iterator itEdits = dirIterator(NameNodeDirType.EDITS); + if(!itImage.hasNext() || ! itEdits.hasNext()) + throw new IOException("Could not locate checkpoint directories"); + StorageDirectory sdName = itImage.next(); + StorageDirectory sdEdits = itEdits.next(); + synchronized(getFSDirectoryRootLock()) { // load image under rootDir lock + loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE)); + } + loadFSEdits(sdEdits); + } + + // set storage fields + setStorageInfo(sig); + checkpointTime = sig.checkpointTime; + } + + /** + * Save meta-data into fsimage files. + * and create empty edits. + */ + void saveCheckpoint() throws IOException { + // save image into fsimage.ckpt and purge edits file + for (Iterator it = + dirIterator(); it.hasNext();) { + StorageDirectory sd = it.next(); + NameNodeDirType dirType = (NameNodeDirType)sd.getStorageDirType(); + if (dirType.isOfType(NameNodeDirType.IMAGE)) + saveFSImage(getImageFile(sd, NameNodeFile.IMAGE_NEW)); + if (dirType.isOfType(NameNodeDirType.EDITS)) + editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS)); + } + + ckptState = CheckpointStates.UPLOAD_DONE; + renameCheckpoint(); + } + + private FSNamesystem getFSNamesystem() { + // HADOOP-5119 should get rid of this. + return FSNamesystem.getFSNamesystem(); + } + + private Object getFSDirectoryRootLock() { + return getFSNamesystem().dir.rootDir; + } + + static File getJSpoolDir(StorageDirectory sd) { + return new File(sd.getRoot(), STORAGE_JSPOOL_DIR); + } + + static File getJSpoolFile(StorageDirectory sd) { + return new File(getJSpoolDir(sd), STORAGE_JSPOOL_FILE); + } + + /** + * Journal writer journals new meta-data state. + *

    + *
+ * <ol>
+ * <li>If Journal Spool state is OFF then journal records (edits)
+ * are applied directly to meta-data state in memory and are written
+ * to the edits file(s).</li>
+ * <li>If Journal Spool state is INPROGRESS then records are only
+ * written to edits.new file, which is called Spooling.</li>
+ * <li>Journal Spool state WAIT blocks journaling until the
+ * Journal Spool reader finalizes merging of the spooled data and
+ * switches to applying journal to memory.</li>
+ * </ol>
+ * @param length length of data. + * @param data serialized journal records. + * @throws IOException + * @see #convergeJournalSpool() + */ + synchronized void journal(int length, byte[] data) throws IOException { + assert backupInputStream.length() == 0 : "backup input stream is not empty"; + try { + switch(jsState) { + case WAIT: + case OFF: + // wait until spooling is off + waitSpoolEnd(); + // update NameSpace in memory + backupInputStream.setBytes(data); + FSEditLog.loadEditRecords(getLayoutVersion(), + backupInputStream.getDataInputStream(), true); + getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient! + break; + case INPROGRESS: + break; + } + // write to files + editLog.logEdit(length, data); + editLog.logSync(); + } finally { + backupInputStream.clear(); + } + } + + private synchronized void waitSpoolEnd() { + while(jsState == JSpoolState.WAIT) { + try { + wait(); + } catch (InterruptedException e) {} + } + // now spooling should be off, verifying just in case + assert jsState == JSpoolState.OFF : "Unexpected JSpool state: " + jsState; + } + + /** + * Start journal spool. + * Switch to writing into edits.new instead of edits. + * + * edits.new for spooling is in separate directory "spool" rather than in + * "current" because the two directories should be independent. + * While spooling a checkpoint can happen and current will first + * move to lastcheckpoint.tmp and then to previous.checkpoint + * spool/edits.new will remain in place during that. + */ + synchronized void startJournalSpool(NamenodeRegistration nnReg) + throws IOException { + switch(jsState) { + case OFF: + break; + case INPROGRESS: + return; + case WAIT: + waitSpoolEnd(); + } + + // create journal spool directories + for(Iterator it = + dirIterator(NameNodeDirType.EDITS); it.hasNext();) { + StorageDirectory sd = it.next(); + File jsDir = getJSpoolDir(sd); + if (!jsDir.exists() && !jsDir.mkdirs()) { + throw new IOException("Mkdirs failed to create " + + jsDir.getCanonicalPath()); + } + // create edit file if missing + File eFile = getEditFile(sd); + if(!eFile.exists()) { + editLog.createEditLogFile(eFile); + } + } + + if(!editLog.isOpen()) + editLog.open(); + + // create streams pointing to the journal spool files + // subsequent journal records will go directly to the spool + editLog.divertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE); + setCheckpointState(CheckpointStates.ROLLED_EDITS); + + // set up spooling + if(backupInputStream == null) + backupInputStream = new EditLogBackupInputStream(nnReg.getAddress()); + jsState = JSpoolState.INPROGRESS; + } + + synchronized void setCheckpointTime(int length, byte[] data) + throws IOException { + assert backupInputStream.length() == 0 : "backup input stream is not empty"; + try { + // unpack new checkpoint time + backupInputStream.setBytes(data); + DataInputStream in = backupInputStream.getDataInputStream(); + byte op = in.readByte(); + assert op == NamenodeProtocol.JA_CHECKPOINT_TIME; + LongWritable lw = new LongWritable(); + lw.readFields(in); + setCheckpointTime(lw.get()); + } finally { + backupInputStream.clear(); + } + } + + /** + * Merge Journal Spool to memory.

+ * Journal Spool reader reads journal records from edits.new. + * When it reaches the end of the file it sets {@link JSpoolState} to WAIT. + * This blocks journaling (see {@link #journal(int,byte[])}. + * The reader + *

    + *
+ * <ul>
+ * <li>reads remaining journal records if any,</li>
+ * <li>renames edits.new to edits,</li>
+ * <li>sets {@link JSpoolState} to OFF,</li>
+ * <li>and notifies the journaling thread.</li>
+ * </ul>
+ * Journaling resumes with applying new journal records to the memory state, + * and writing them into edits file(s). + */ + void convergeJournalSpool() throws IOException { + Iterator itEdits = dirIterator(NameNodeDirType.EDITS); + if(! itEdits.hasNext()) + throw new IOException("Could not locate checkpoint directories"); + StorageDirectory sdEdits = itEdits.next(); + int numEdits = 0; + File jSpoolFile = getJSpoolFile(sdEdits); + long startTime = FSNamesystem.now(); + if(jSpoolFile.exists()) { + // load edits.new + EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile); + DataInputStream in = edits.getDataInputStream(); + numEdits += FSEditLog.loadFSEdits(in, false); + + // first time reached the end of spool + jsState = JSpoolState.WAIT; + numEdits += FSEditLog.loadEditRecords(getLayoutVersion(), in, true); + getFSNamesystem().dir.updateCountForINodeWithQuota(); + edits.close(); + } + + FSImage.LOG.info("Edits file " + jSpoolFile.getCanonicalPath() + + " of size " + jSpoolFile.length() + " edits # " + numEdits + + " loaded in " + (FSNamesystem.now()-startTime)/1000 + " seconds."); + + // rename spool edits.new to edits making it in sync with the active node + // subsequent journal records will go directly to edits + editLog.revertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE); + + // write version file + resetVersion(false); + + // wake up journal writer + synchronized(this) { + jsState = JSpoolState.OFF; + notifyAll(); + } + + // Rename lastcheckpoint.tmp to previous.checkpoint + for(StorageDirectory sd : storageDirs) { + File tmpCkptDir = sd.getLastCheckpointTmp(); + File prevCkptDir = sd.getPreviousCheckpoint(); + // delete previous directory + if (prevCkptDir.exists()) + deleteDir(prevCkptDir); + // rename tmp to previous + if (tmpCkptDir.exists()) + rename(tmpCkptDir, prevCkptDir); + } + } +} Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java?rev=753481&r1=753480&r2=753481&view=diff ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (original) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java Sat Mar 14 01:20:36 2009 @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.hadoop.hdfs.server.common.StorageInfo; +import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.io.WritableComparable; /** @@ -33,12 +34,12 @@ long editsTime = -1L; long checkpointTime = -1L; - CheckpointSignature() {} + public CheckpointSignature() {} CheckpointSignature(FSImage fsImage) { super(fsImage); editsTime = fsImage.getEditLog().getFsEditTime(); - checkpointTime = fsImage.checkpointTime; + checkpointTime = fsImage.getCheckpointTime(); } CheckpointSignature(String str) { @@ -59,14 +60,17 @@ + String.valueOf(checkpointTime); } - void validateStorageInfo(StorageInfo si) throws IOException { + void validateStorageInfo(FSImage si) throws IOException { if(layoutVersion != si.layoutVersion - || namespaceID != si.namespaceID || cTime != si.cTime) { + || namespaceID != si.namespaceID || cTime != si.cTime + || checkpointTime != si.checkpointTime) { // checkpointTime can change when the image is saved - do not compare - throw new IOException("Inconsistent checkpoint fileds. 
" + throw new IOException("Inconsistent checkpoint fields.\n" + "LV = " + layoutVersion + " namespaceID = " + namespaceID - + " cTime = " + cTime + ". Expecting respectively: " - + si.layoutVersion + "; " + si.namespaceID + "; " + si.cTime); + + " cTime = " + cTime + "; checkpointTime = " + checkpointTime + + ".\nExpecting respectively: " + + si.layoutVersion + "; " + si.namespaceID + "; " + si.cTime + + "; " + si.checkpointTime); } } @@ -100,17 +104,13 @@ // Writable ///////////////////////////////////////////////// public void write(DataOutput out) throws IOException { - out.writeInt(getLayoutVersion()); - out.writeInt(getNamespaceID()); - out.writeLong(getCTime()); + super.write(out); out.writeLong(editsTime); out.writeLong(checkpointTime); } public void readFields(DataInput in) throws IOException { - layoutVersion = in.readInt(); - namespaceID = in.readInt(); - cTime = in.readLong(); + super.readFields(in); editsTime = in.readLong(); checkpointTime = in.readLong(); } Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=753481&view=auto ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (added) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Sat Mar 14 01:20:36 2009 @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.IOException; +import java.io.File; +import java.net.InetAddress; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.hdfs.protocol.FSConstants; +import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole; +import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates; +import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.HttpServer; + +/** + * The Checkpointer is responsible for supporting periodic checkpoints + * of the HDFS metadata. + * + * The Checkpointer is a daemon that periodically wakes up + * up (determined by the schedule specified in the configuration), + * triggers a periodic checkpoint and then goes back to sleep. + * + * The start of a checkpoint is triggered by one of the two factors: + * (1) time or (2) the size of the edits file. 
+ */ +class Checkpointer implements Runnable { + public static final Log LOG = + LogFactory.getLog(Checkpointer.class.getName()); + + private BackupNode backupNode; + volatile boolean shouldRun; + private long checkpointPeriod; // in seconds + private long checkpointSize; // size (in MB) of current Edit Log + + private BackupStorage getFSImage() { + return (BackupStorage)backupNode.getFSImage(); + } + + private NamenodeProtocol getNamenode(){ + return backupNode.namenode; + } + + /** + * Create a connection to the primary namenode. + */ + Checkpointer(Configuration conf, BackupNode bnNode) throws IOException { + this.backupNode = bnNode; + try { + initialize(conf); + } catch(IOException e) { + shutdown(); + throw e; + } + } + + /** + * Initialize checkpoint. + */ + private void initialize(Configuration conf) throws IOException { + // Create connection to the namenode. + shouldRun = true; + + // Initialize other scheduling parameters from the configuration + checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600); + checkpointSize = conf.getLong("fs.checkpoint.size", 4194304); + + HttpServer httpServer = backupNode.httpServer; + httpServer.setAttribute("name.system.image", getFSImage()); + httpServer.setAttribute("name.conf", conf); + httpServer.addServlet("getimage", "/getimage", GetImageServlet.class); + + LOG.info("Checkpoint Period : " + checkpointPeriod + " secs " + + "(" + checkpointPeriod/60 + " min)"); + LOG.info("Log Size Trigger : " + checkpointSize + " bytes " + + "(" + checkpointSize/1024 + " KB)"); + } + + /** + * Shut down the checkpointer. + */ + void shutdown() { + shouldRun = false; + backupNode.stop(); + } + + // + // The main work loop + // + public void run() { + // Check the size of the edit log once every 5 minutes. + long periodMSec = 5 * 60; // 5 minutes + if(checkpointPeriod < periodMSec) { + periodMSec = checkpointPeriod; + } + periodMSec *= 1000; + + long lastCheckpointTime = 0; + if(!backupNode.shouldCheckpointAtStartup()) + lastCheckpointTime = FSNamesystem.now(); + while(shouldRun) { + try { + long now = FSNamesystem.now(); + boolean shouldCheckpoint = false; + if(now >= lastCheckpointTime + periodMSec) { + shouldCheckpoint = true; + } else { + long size = getJournalSize(); + if(size >= checkpointSize) + shouldCheckpoint = true; + } + if(shouldCheckpoint) { + doCheckpoint(); + lastCheckpointTime = now; + } + } catch(IOException e) { + LOG.error("Exception in doCheckpoint: ", e); + } catch(Throwable e) { + LOG.error("Throwable Exception in doCheckpoint: ", e); + Runtime.getRuntime().exit(-1); + } + try { + Thread.sleep(periodMSec); + } catch(InterruptedException ie) { + // do nothing + } + } + } + + private long getJournalSize() throws IOException { + // If BACKUP node has been loaded + // get edits size from the local file. ACTIVE has the same. + if(backupNode.isRole(NamenodeRole.BACKUP) + && getFSImage().getEditLog().isOpen()) + return backupNode.journalSize(); + // Go to the ACTIVE node for its size + return getNamenode().journalSize(backupNode.getRegistration()); + } + + /** + * Download fsimage and edits + * files from the remote name-node. 
+ */ + private void downloadCheckpoint(CheckpointSignature sig) throws IOException { + // Retrieve image file + String fileid = "getimage=1"; + File[] files = getFSImage().getImageFiles(); + assert files.length > 0 : "No checkpoint targets."; + String nnHttpAddr = backupNode.nnHttpAddress; + TransferFsImage.getFileClient(nnHttpAddr, fileid, files); + LOG.info("Downloaded file " + files[0].getName() + " size " + + files[0].length() + " bytes."); + + // Retrieve edits file + fileid = "getedit=1"; + files = getFSImage().getEditsFiles(); + assert files.length > 0 : "No checkpoint targets."; + TransferFsImage.getFileClient(nnHttpAddr, fileid, files); + LOG.info("Downloaded file " + files[0].getName() + " size " + + files[0].length() + " bytes."); + } + + /** + * Copy the new image into remote name-node. + */ + private void uploadCheckpoint(CheckpointSignature sig) throws IOException { + InetSocketAddress httpSocAddr = backupNode.getHttpAddress(); + int httpPort = httpSocAddr.getPort(); + String fileid = "putimage=1&port=" + httpPort + + "&machine=" + + InetAddress.getLocalHost().getHostAddress() + + "&token=" + sig.toString(); + LOG.info("Posted URL " + backupNode.nnHttpAddress + fileid); + TransferFsImage.getFileClient(backupNode.nnHttpAddress, fileid, (File[])null); + } + + /** + * Create a new checkpoint + */ + void doCheckpoint() throws IOException { + long startTime = FSNamesystem.now(); + NamenodeCommand cmd = + getNamenode().startCheckpoint(backupNode.getRegistration()); + CheckpointCommand cpCmd = null; + switch(cmd.getAction()) { + case NamenodeProtocol.ACT_SHUTDOWN: + shutdown(); + throw new IOException("Name-node " + backupNode.nnRpcAddress + + " requested shutdown."); + case NamenodeProtocol.ACT_CHECKPOINT: + cpCmd = (CheckpointCommand)cmd; + break; + default: + throw new IOException("Unsupported NamenodeCommand: "+cmd.getAction()); + } + + CheckpointSignature sig = cpCmd.getSignature(); + assert FSConstants.LAYOUT_VERSION == sig.getLayoutVersion() : + "Signature should have current layout version. Expected: " + + FSConstants.LAYOUT_VERSION + " actual "+ sig.getLayoutVersion(); + assert !backupNode.isRole(NamenodeRole.CHECKPOINT) || + cpCmd.isImageObsolete() : "checkpoint node should always download image."; + backupNode.setCheckpointState(CheckpointStates.UPLOAD_START); + if(cpCmd.isImageObsolete()) { + // First reset storage on disk and memory state + backupNode.resetNamespace(); + downloadCheckpoint(sig); + } + + BackupStorage bnImage = getFSImage(); + bnImage.loadCheckpoint(sig); + sig.validateStorageInfo(bnImage); + bnImage.saveCheckpoint(); + + if(cpCmd.needToReturnImage()) + uploadCheckpoint(sig); + + getNamenode().endCheckpoint(backupNode.getRegistration(), sig); + + bnImage.convergeJournalSpool(); + backupNode.setRegistration(); // keep registration up to date + if(backupNode.isRole(NamenodeRole.CHECKPOINT)) + getFSImage().getEditLog().close(); + LOG.info("Checkpoint completed in " + + (FSNamesystem.now() - startTime)/1000 + " seconds." 
+ + " New Image Size: " + bnImage.getFsImageName().length()); + } +} Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=753481&r1=753480&r2=753481&view=diff ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Sat Mar 14 01:20:36 2009 @@ -367,9 +367,9 @@ void reportDiff(BlocksMap blocksMap, BlockListAsLongs newReport, - Collection toAdd, - Collection toRemove, - Collection toInvalidate) { + Collection toAdd, // add to DatanodeDescriptor + Collection toRemove, // remove from DatanodeDescriptor + Collection toInvalidate) { // should be removed from DN // place a deilimiter in the list which separates blocks // that have been reported from those that have not BlockInfo delimiter = new BlockInfo(new Block(), 1); Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java?rev=753481&r1=753480&r2=753481&view=diff ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java (original) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java Sat Mar 14 01:20:36 2009 @@ -64,7 +64,7 @@ try { Thread.sleep(recheckInterval); } catch (InterruptedException ie) { - LOG.info("Interrupted " + this.getClass().getSimpleName(), ie); + LOG.warn(this.getClass().getSimpleName() + " interrupted: " + ie); } } } Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java?rev=753481&view=auto ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java (added) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java Sat Mar 14 01:20:36 2009 @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.DataInputStream; +import java.io.ByteArrayInputStream; +import java.io.IOException; + +/** + * An implementation of the abstract class {@link EditLogInputStream}, + * which is used to updates HDFS meta-data state on a backup node. + * + * @see org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol#journal + * (org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration, + * int, int, byte[]) + */ +class EditLogBackupInputStream extends EditLogInputStream { + String address; // sender address + private ByteBufferInputStream inner; + private DataInputStream in; + + /** + * A ByteArrayInputStream, which lets modify the underlying byte array. + */ + private static class ByteBufferInputStream extends ByteArrayInputStream { + ByteBufferInputStream() { + super(new byte[0]); + } + + byte[] getData() { + return super.buf; + } + + void setData(byte[] newBytes) { + super.buf = newBytes; + super.count = newBytes == null ? 0 : newBytes.length; + super.mark = 0; + reset(); + } + + /** + * Number of bytes read from the stream so far. + */ + int length() { + return count; + } + } + + EditLogBackupInputStream(String name) throws IOException { + address = name; + inner = new ByteBufferInputStream(); + in = new DataInputStream(inner); + } + + @Override // JournalStream + public String getName() { + return address; + } + + @Override // JournalStream + public JournalType getType() { + return JournalType.BACKUP; + } + + @Override + public int available() throws IOException { + return in.available(); + } + + @Override + public int read() throws IOException { + return in.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return in.read(b, off, len); + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + long length() throws IOException { + // file size + size of both buffers + return inner.length(); + } + + DataInputStream getDataInputStream() { + return in; + } + + void setBytes(byte[] newBytes) throws IOException { + inner.setData(newBytes); + in.reset(); + } + + void clear() throws IOException { + setBytes(null); + } +} Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=753481&view=auto ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (added) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Sat Mar 14 01:20:36 2009 @@ -0,0 +1,190 @@ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; + +/** + * An implementation of the abstract class {@link EditLogOutputStream}, + * which streams edits to a backup node. 
+ * + * @see org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol#journal + * (org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration, + * int, int, byte[]) + */ +class EditLogBackupOutputStream extends EditLogOutputStream { + static int DEFAULT_BUFFER_SIZE = 256; + + private NamenodeProtocol backupNode; // RPC proxy to backup node + private NamenodeRegistration bnRegistration; // backup node registration + private NamenodeRegistration nnRegistration; // active node registration + private ArrayList<JournalRecord> bufCurrent; // current buffer for writing + private ArrayList<JournalRecord> bufReady; // buffer ready for flushing + private DataOutputBuffer out; // serialized output sent to backup node + + static class JournalRecord { + byte op; + Writable[] args; + + JournalRecord(byte op, Writable ... writables) { + this.op = op; + this.args = writables; + } + + void write(DataOutputStream out) throws IOException { + out.write(op); + if(args == null) + return; + for(Writable w : args) + w.write(out); + } + } + + EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node + NamenodeRegistration nnReg) // active name-node + throws IOException { + super(); + this.bnRegistration = bnReg; + this.nnRegistration = nnReg; + InetSocketAddress bnAddress = + NetUtils.createSocketAddr(bnRegistration.getAddress()); + Storage.LOG.info("EditLogBackupOutputStream connects to: " + bnAddress); + try { + this.backupNode = + (NamenodeProtocol) RPC.getProxy(NamenodeProtocol.class, + NamenodeProtocol.versionID, bnAddress, new Configuration()); + } catch(IOException e) { + Storage.LOG.error("Error connecting to: " + bnAddress, e); + throw e; + } + this.bufCurrent = new ArrayList<JournalRecord>(); + this.bufReady = new ArrayList<JournalRecord>(); + this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE); + } + + @Override // JournalStream + public String getName() { + return bnRegistration.getAddress(); + } + + @Override // JournalStream + public JournalType getType() { + return JournalType.BACKUP; + } + + @Override // EditLogOutputStream + public void write(int b) throws IOException { + throw new IOException("Not implemented"); + } + + @Override // EditLogOutputStream + void write(byte op, Writable ... writables) throws IOException { + bufCurrent.add(new JournalRecord(op, writables)); + } + + /** + * There is no persistent storage. Just clear the buffers. + */ + @Override // EditLogOutputStream + void create() throws IOException { + bufCurrent.clear(); + assert bufReady.size() == 0 : "previous data is not flushed yet"; + } + + @Override // EditLogOutputStream + public void close() throws IOException { + // close should have been called after all pending transactions + // have been flushed & synced.
+ int size = bufCurrent.size(); + if (size != 0) { + throw new IOException("BackupEditStream has " + size + + " records still to be flushed and cannot be closed."); + } + RPC.stopProxy(backupNode); // stop the RPC threads + bufCurrent = bufReady = null; + } + + @Override // EditLogOutputStream + void setReadyToFlush() throws IOException { + assert bufReady.size() == 0 : "previous data is not flushed yet"; + ArrayList<JournalRecord> tmp = bufReady; + bufReady = bufCurrent; + bufCurrent = tmp; + } + + @Override // EditLogOutputStream + protected void flushAndSync() throws IOException { + assert out.size() == 0 : "Output buffer is not empty"; + int bufReadySize = bufReady.size(); + for(int idx = 0; idx < bufReadySize; idx++) { + JournalRecord jRec = null; + for(; idx < bufReadySize; idx++) { + jRec = bufReady.get(idx); + if(jRec.op >= FSEditLog.OP_JSPOOL_START) + break; // special operation should be sent in a separate call to BN + jRec.write(out); + } + if(out.size() > 0) + send(NamenodeProtocol.JA_JOURNAL); + if(idx == bufReadySize) + break; + // operation like start journal spool or increment checkpoint time + // is a separate call to BN + jRec.write(out); + send(jRec.op); + } + bufReady.clear(); // erase all data in the buffer + out.reset(); // reset buffer to the start position + } + + /** + * There is no persistent storage. Therefore length is 0.

+ * Length is used to check when the edits log is large enough to start a checkpoint. + * This criterion should not be used for backup streams. + */ + @Override // EditLogOutputStream + long length() throws IOException { + return 0; + } + + private void send(int ja) throws IOException { + try { + int length = out.getLength(); + out.write(FSEditLog.OP_INVALID); + backupNode.journal(nnRegistration, ja, length, out.getData()); + } finally { + out.reset(); + } + } + + /** + * Get backup node registration. + */ + NamenodeRegistration getRegistration() { + return bnRegistration; + } + + /** + * Verify that the backup node is alive. + */ + boolean isAlive() { + try { + send(NamenodeProtocol.JA_IS_ALIVE); + } catch(IOException ei) { + Storage.LOG.info(bnRegistration.getRole() + " " + + bnRegistration.getAddress() + " is not alive. ", ei); + return false; + } + return true; + } +} Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java?rev=753481&r1=753480&r2=753481&view=diff ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java (original) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java Sat Mar 14 01:20:36 2009 @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import java.io.BufferedInputStream; +import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; @@ -27,14 +29,8 @@ * It should stream bytes from the storage exactly as they were written * into the {@link EditLogOutputStream}. */ -abstract class EditLogInputStream extends InputStream { - /** - * Get this stream name. - * - * @return name of the stream - */ - abstract String getName(); - +abstract class EditLogInputStream extends InputStream +implements JournalStream { /** {@inheritDoc} */ public abstract int available() throws IOException; @@ -51,4 +47,11 @@ * Return the size of the current edits log. */ abstract long length() throws IOException; + + /** + * Return DataInputStream based on this edit stream. + */ + DataInputStream getDataInputStream() { + return new DataInputStream(new BufferedInputStream(this)); + } } Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=753481&r1=753480&r2=753481&view=diff ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Sat Mar 14 01:20:36 2009 @@ -26,7 +26,8 @@ * A generic abstract class to support journaling of edits logs into * a persistent storage. */ -abstract class EditLogOutputStream extends OutputStream { +abstract class EditLogOutputStream extends OutputStream +implements JournalStream { // these are statistics counters private long numSync; // number of sync(s) to disk private long totalTimeSync; // total time to sync @@ -35,13 +36,6 @@ numSync = totalTimeSync = 0; } - /** - * Get this stream name.
- * - * @return name of the stream - */ - abstract String getName(); - /** {@inheritDoc} */ abstract public void write(int b) throws IOException; @@ -57,7 +51,7 @@ abstract void write(byte op, Writable ... writables) throws IOException; /** - * Create and initialize new edits log storage. + * Create and initialize underlying persistent edits log storage. * * @throws IOException */ @@ -97,6 +91,10 @@ */ abstract long length() throws IOException; + boolean isOperationSupported(byte op) { + return true; + } + /** * Return total time spent in {@link #flushAndSync()} */ @@ -110,4 +108,9 @@ long getNumSync() { return numSync; } + + @Override // Object + public String toString() { + return getName(); + } } Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=753481&r1=753480&r2=753481&view=diff ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Sat Mar 14 01:20:36 2009 @@ -46,7 +46,7 @@ class FSDirectory implements Closeable { final FSNamesystem namesystem; - final INodeDirectoryWithQuota rootDir; + INodeDirectoryWithQuota rootDir; FSImage fsImage; private boolean ready = false; // Metrics record @@ -93,8 +93,6 @@ } FSEditLog editLog = fsImage.getEditLog(); assert editLog != null : "editLog must be initialized"; - if (!editLog.isOpen()) - editLog.open(); fsImage.setCheckpointDirectories(null, null); } catch(IOException e) { fsImage.close(); @@ -594,7 +592,11 @@ } return dirNotEmpty; } - + + boolean isEmpty() { + return isDirEmpty("/"); + } + /** * Delete a path from the name space * Update the count at each ancestor directory with quota @@ -1251,7 +1253,16 @@ } return status; } - + + /** + * Reset the entire namespace tree. + */ + void reset() { + rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME, + namesystem.createFsOwnerPermissions(new FsPermission((short)0755)), + Integer.MAX_VALUE, -1); + } + /** * Create FileStatus by file INode */