Return-Path: Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: (qmail 73245 invoked from network); 2 Feb 2011 02:05:49 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 2 Feb 2011 02:05:49 -0000 Received: (qmail 92222 invoked by uid 500); 2 Feb 2011 02:05:49 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 92191 invoked by uid 500); 2 Feb 2011 02:05:49 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 92183 invoked by uid 99); 2 Feb 2011 02:05:49 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Feb 2011 02:05:49 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Feb 2011 02:05:43 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 75FF023889E9; Wed, 2 Feb 2011 02:05:20 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1066305 [2/3] - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/ Date: Wed, 02 Feb 2011 02:05:19 -0000 To: hdfs-commits@hadoop.apache.org From: jitendra@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110202020520.75FF023889E9@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1066305&r1=1066304&r2=1066305&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Wed Feb 2 02:05:18 2011 @@ -17,13 +17,9 @@ */ package org.apache.hadoop.hdfs.server.namenode; -import java.io.DataInputStream; -import java.io.DataOutputStream; +import java.io.Closeable; import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.RandomAccessFile; import java.net.URI; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -35,24 +31,26 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Properties; -import java.util.Random; import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException; import org.apache.hadoop.hdfs.server.common.Storage; -import org.apache.hadoop.hdfs.server.common.StorageInfo; -import org.apache.hadoop.hdfs.server.common.UpgradeManager; +import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; +import org.apache.hadoop.hdfs.server.common.Storage.StorageState; import org.apache.hadoop.hdfs.server.common.Util; import static org.apache.hadoop.hdfs.server.common.Util.now; import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole; -import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption; -import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType; +import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; +import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; +import org.apache.hadoop.hdfs.server.namenode.NNStorage.NNStorageListener; import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; @@ -66,70 +64,23 @@ import org.apache.hadoop.hdfs.DFSConfigK */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class FSImage extends Storage { +public class FSImage implements NNStorageListener, Closeable { + protected static final Log LOG = LogFactory.getLog(FSImage.class.getName()); private static final SimpleDateFormat DATE_FORM = - new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - - static final String MESSAGE_DIGEST_PROPERTY = "imageMD5Digest"; - // - // The filenames used for storing the images - // - enum NameNodeFile { - IMAGE ("fsimage"), - TIME ("fstime"), - EDITS ("edits"), - IMAGE_NEW ("fsimage.ckpt"), - EDITS_NEW ("edits.new"); - - private String fileName = null; - private NameNodeFile(String name) {this.fileName = name;} - String getName() {return fileName;} - } + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // checkpoint states enum CheckpointStates{START, ROLLED_EDITS, UPLOAD_START, UPLOAD_DONE; } - /** - * Implementation of StorageDirType specific to namenode storage - * A Storage directory could be of type IMAGE which stores only fsimage, - * or of type EDITS which stores edits or of type IMAGE_AND_EDITS which - * stores both fsimage and edits. - */ - static enum NameNodeDirType implements StorageDirType { - UNDEFINED, - IMAGE, - EDITS, - IMAGE_AND_EDITS; - - public StorageDirType getStorageDirType() { - return this; - } - - public boolean isOfType(StorageDirType type) { - if ((this == IMAGE_AND_EDITS) && (type == IMAGE || type == EDITS)) - return true; - return this == type; - } - } protected FSNamesystem namesystem = null; - protected long checkpointTime = -1L; // The age of the image protected FSEditLog editLog = null; private boolean isUpgradeFinalized = false; - protected MD5Hash imageDigest = null; protected MD5Hash newImageDigest = null; - /** - * flag that controls if we try to restore failed storages - */ - private boolean restoreFailedStorage = false; + protected NNStorage storage = null; /** - * list of failed (and thus removed) storages - */ - protected List removedStorageDirs = new ArrayList(); - - /** * URIs for importing an image from a checkpoint. In the default case, * URIs will represent directories. */ @@ -160,16 +111,22 @@ public class FSImage extends Storage { if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY, DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) { NameNode.LOG.info("set FSImage.restoreFailedStorage"); - setRestoreFailedStorage(true); + storage.setRestoreFailedStorage(true); } setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null), FSImage.getCheckpointEditsDirs(conf, null)); } private FSImage(FSNamesystem ns) { - super(NodeType.NAME_NODE); this.conf = new Configuration(); - this.editLog = new FSEditLog(this); + + storage = new NNStorage(conf); + if (ns != null) { + storage.setUpgradeManager(ns.upgradeManager); + } + storage.registerListener(this); + + this.editLog = new FSEditLog(storage); setFSNamesystem(ns); } @@ -179,11 +136,7 @@ public class FSImage extends Storage { FSImage(Collection fsDirs, Collection fsEditsDirs) throws IOException { this(); - setStorageDirectories(fsDirs, fsEditsDirs); - } - - public FSImage(StorageInfo storageInfo) { - super(NodeType.NAME_NODE, storageInfo); + storage.setStorageDirectories(fsDirs, fsEditsDirs); } /** @@ -196,7 +149,7 @@ public class FSImage extends Storage { ArrayList editsDirs = new ArrayList(1); dirs.add(imageDir); editsDirs.add(imageDir); - setStorageDirectories(dirs, editsDirs); + storage.setStorageDirectories(dirs, editsDirs); } protected FSNamesystem getFSNamesystem() { @@ -205,159 +158,17 @@ public class FSImage extends Storage { void setFSNamesystem(FSNamesystem ns) { namesystem = ns; - } - - public void setRestoreFailedStorage(boolean val) { - LOG.info("set restore failed storage to " + val); - restoreFailedStorage=val; - } - - public boolean getRestoreFailedStorage() { - return restoreFailedStorage; - } - - void setStorageDirectories(Collection fsNameDirs, - Collection fsEditsDirs) throws IOException { - this.storageDirs = new ArrayList(); - this.removedStorageDirs = new ArrayList(); - - // Add all name dirs with appropriate NameNodeDirType - for (URI dirName : fsNameDirs) { - checkSchemeConsistency(dirName); - boolean isAlsoEdits = false; - for (URI editsDirName : fsEditsDirs) { - if (editsDirName.compareTo(dirName) == 0) { - isAlsoEdits = true; - fsEditsDirs.remove(editsDirName); - break; - } - } - NameNodeDirType dirType = (isAlsoEdits) ? - NameNodeDirType.IMAGE_AND_EDITS : - NameNodeDirType.IMAGE; - // Add to the list of storage directories, only if the - // URI is of type file:// - if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase()) - == 0){ - this.addStorageDir(new StorageDirectory(new File(dirName.getPath()), - dirType)); - } - } - - // Add edits dirs if they are different from name dirs - for (URI dirName : fsEditsDirs) { - checkSchemeConsistency(dirName); - // Add to the list of storage directories, only if the - // URI is of type file:// - if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase()) - == 0) - this.addStorageDir(new StorageDirectory(new File(dirName.getPath()), - NameNodeDirType.EDITS)); + if (ns != null) { + storage.setUpgradeManager(ns.upgradeManager); } } - /* - * Checks the consistency of a URI, in particular if the scheme - * is specified and is supported by a concrete implementation - */ - static void checkSchemeConsistency(URI u) throws IOException { - String scheme = u.getScheme(); - // the URI should have a proper scheme - if(scheme == null) - throw new IOException("Undefined scheme for " + u); - else { - try { - // the scheme should be enumerated as JournalType - JournalType.valueOf(scheme.toUpperCase()); - } catch (IllegalArgumentException iae){ - throw new IOException("Unknown scheme " + scheme + - ". It should correspond to a JournalType enumeration value"); - } - } - }; - void setCheckpointDirectories(Collection dirs, Collection editsDirs) { checkpointDirs = dirs; checkpointEditsDirs = editsDirs; } - static File getImageFile(StorageDirectory sd, NameNodeFile type) { - return new File(sd.getCurrentDir(), type.getName()); - } - - List getRemovedStorageDirs() { - return this.removedStorageDirs; - } - - File getEditFile(StorageDirectory sd) { - return getImageFile(sd, NameNodeFile.EDITS); - } - - File getEditNewFile(StorageDirectory sd) { - return getImageFile(sd, NameNodeFile.EDITS_NEW); - } - - Collection getFiles(NameNodeFile type, NameNodeDirType dirType) { - ArrayList list = new ArrayList(); - Iterator it = (dirType == null) ? dirIterator() : - dirIterator(dirType); - for ( ;it.hasNext(); ) { - list.add(getImageFile(it.next(), type)); - } - return list; - } - - Collection getDirectories(NameNodeDirType dirType) - throws IOException { - ArrayList list = new ArrayList(); - Iterator it = (dirType == null) ? dirIterator() : - dirIterator(dirType); - for ( ;it.hasNext(); ) { - StorageDirectory sd = it.next(); - try { - list.add(Util.fileAsURI(sd.getRoot())); - } catch (IOException e) { - throw new IOException("Exception while processing " + - "StorageDirectory " + sd.getRoot(), e); - } - } - return list; - } - - /** - * Retrieve current directories of type IMAGE - * @return Collection of URI representing image directories - * @throws IOException in case of URI processing error - */ - Collection getImageDirectories() throws IOException { - return getDirectories(NameNodeDirType.IMAGE); - } - - /** - * Retrieve current directories of type EDITS - * @return Collection of URI representing edits directories - * @throws IOException in case of URI processing error - */ - Collection getEditsDirectories() throws IOException { - return getDirectories(NameNodeDirType.EDITS); - } - - /** - * Return number of storage directories of the given type. - * @param dirType directory type - * @return number of storage directories of type dirType - */ - int getNumStorageDirs(NameNodeDirType dirType) { - if(dirType == null) - return getNumStorageDirs(); - Iterator it = dirIterator(dirType); - int numDirs = 0; - for(; it.hasNext(); it.next()) - numDirs++; - return numDirs; - } - /** * Analyze storage directories. * Recover from previous transitions if required. @@ -380,26 +191,25 @@ public class FSImage extends Storage { if((dataDirs.size() == 0 || editsDirs.size() == 0) && startOpt != StartupOption.IMPORT) throw new IOException( - "All specified directories are not accessible or do not exist."); + "All specified directories are not accessible or do not exist."); if(startOpt == StartupOption.IMPORT && (checkpointDirs == null || checkpointDirs.isEmpty())) throw new IOException("Cannot import image from a checkpoint. " - + "\"dfs.namenode.checkpoint.dir\" is not set." ); + + "\"dfs.namenode.checkpoint.dir\" is not set." ); if(startOpt == StartupOption.IMPORT && (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty())) throw new IOException("Cannot import image from a checkpoint. " - + "\"dfs.namenode.checkpoint.dir\" is not set." ); + + "\"dfs.namenode.checkpoint.dir\" is not set." ); - setStorageDirectories(dataDirs, editsDirs); + storage.setStorageDirectories(dataDirs, editsDirs); // 1. For each data directory calculate its state and // check whether all is consistent before transitioning. Map dataDirStates = new HashMap(); boolean isFormatted = false; - for (Iterator it = - dirIterator(); it.hasNext();) { + for (Iterator it = storage.dirIterator(); it.hasNext();) { StorageDirectory sd = it.next(); StorageState curState; try { @@ -425,7 +235,7 @@ public class FSImage extends Storage { if (startOpt == StartupOption.IMPORT && isFormatted) // import of a checkpoint is allowed only into empty image directories throw new IOException("Cannot import image from a checkpoint. " - + " NameNode already contains an image in " + sd.getRoot()); + + " NameNode already contains an image in "+ sd.getRoot()); } catch (IOException ioe) { sd.unlock(); throw ioe; @@ -436,23 +246,23 @@ public class FSImage extends Storage { if (!isFormatted && startOpt != StartupOption.ROLLBACK && startOpt != StartupOption.IMPORT) throw new IOException("NameNode is not formatted."); - if (layoutVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION) { - checkVersionUpgradable(layoutVersion); + if (storage.getLayoutVersion() < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) { + NNStorage.checkVersionUpgradable(storage.getLayoutVersion()); } if (startOpt != StartupOption.UPGRADE - && layoutVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION - && layoutVersion != FSConstants.LAYOUT_VERSION) - throw new IOException( - "\nFile system image contains an old layout version " + layoutVersion - + ".\nAn upgrade to version " + FSConstants.LAYOUT_VERSION - + " is required.\nPlease restart NameNode with -upgrade option."); + && storage.getLayoutVersion() < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION + && storage.getLayoutVersion() != FSConstants.LAYOUT_VERSION) + throw new IOException( + "\nFile system image contains an old layout version " + + storage.getLayoutVersion() + ".\nAn upgrade to version " + + FSConstants.LAYOUT_VERSION + " is required.\n" + + "Please restart NameNode with -upgrade option."); // check whether distributed upgrade is reguired and/or should be continued - verifyDistributedUpgradeProgress(startOpt); + storage.verifyDistributedUpgradeProgress(startOpt); // 2. Format unformatted dirs. - this.checkpointTime = 0L; - for (Iterator it = - dirIterator(); it.hasNext();) { + storage.setCheckpointTime(0L); + for (Iterator it = storage.dirIterator(); it.hasNext();) { StorageDirectory sd = it.next(); StorageState curState = dataDirStates.get(sd); switch(curState) { @@ -494,41 +304,39 @@ public class FSImage extends Storage { } private void doUpgrade() throws IOException { - if(getDistributedUpgradeState()) { + if(storage.getDistributedUpgradeState()) { // only distributed upgrade need to continue // don't do version upgrade this.loadFSImage(); - initializeDistributedUpgrade(); + storage.initializeDistributedUpgrade(); return; } // Upgrade is allowed only if there are // no previous fs states in any of the directories - for (Iterator it = - dirIterator(); it.hasNext();) { + for (Iterator it = storage.dirIterator(); it.hasNext();) { StorageDirectory sd = it.next(); if (sd.getPreviousDir().exists()) throw new InconsistentFSStateException(sd.getRoot(), - "previous fs state should not exist during upgrade. " - + "Finalize or rollback first."); + "previous fs state should not exist during upgrade. " + + "Finalize or rollback first."); } // load the latest image this.loadFSImage(); // Do upgrade for each directory - long oldCTime = this.getCTime(); - this.cTime = now(); // generate new cTime for the state - int oldLV = this.getLayoutVersion(); - this.layoutVersion = FSConstants.LAYOUT_VERSION; - this.checkpointTime = now(); - for (Iterator it = - dirIterator(); it.hasNext();) { + long oldCTime = storage.getCTime(); + storage.cTime = now(); // generate new cTime for the state + int oldLV = storage.getLayoutVersion(); + storage.layoutVersion = FSConstants.LAYOUT_VERSION; + storage.setCheckpointTime(now()); + for (Iterator it = storage.dirIterator(); it.hasNext();) { StorageDirectory sd = it.next(); LOG.info("Upgrading image directory " + sd.getRoot() + ".\n old LV = " + oldLV + "; old CTime = " + oldCTime - + ".\n new LV = " + this.getLayoutVersion() - + "; new CTime = " + this.getCTime()); + + ".\n new LV = " + storage.getLayoutVersion() + + "; new CTime = " + storage.getCTime()); File curDir = sd.getCurrentDir(); File prevDir = sd.getPreviousDir(); File tmpDir = sd.getPreviousTmp(); @@ -537,15 +345,15 @@ public class FSImage extends Storage { assert !tmpDir.exists() : "prvious.tmp directory must not exist."; assert !editLog.isOpen() : "Edits log must not be open."; // rename current to tmp - rename(curDir, tmpDir); + NNStorage.rename(curDir, tmpDir); // save new image saveCurrent(sd); // rename tmp to previous - rename(tmpDir, prevDir); + NNStorage.rename(tmpDir, prevDir); isUpgradeFinalized = false; LOG.info("Upgrade of " + sd.getRoot() + " is complete."); } - initializeDistributedUpgrade(); + storage.initializeDistributedUpgrade(); editLog.open(); } @@ -555,9 +363,8 @@ public class FSImage extends Storage { // Directories that don't have previous state do not rollback boolean canRollback = false; FSImage prevState = new FSImage(getFSNamesystem()); - prevState.layoutVersion = FSConstants.LAYOUT_VERSION; - for (Iterator it = - dirIterator(); it.hasNext();) { + prevState.getStorage().layoutVersion = FSConstants.LAYOUT_VERSION; + for (Iterator it = storage.dirIterator(); it.hasNext();) { StorageDirectory sd = it.next(); File prevDir = sd.getPreviousDir(); if (!prevDir.exists()) { // use current directory then @@ -566,42 +373,44 @@ public class FSImage extends Storage { sd.read(); // read and verify consistency with other directories continue; } - StorageDirectory sdPrev = prevState.new StorageDirectory(sd.getRoot()); - sdPrev.read(sdPrev.getPreviousVersionFile()); // read and verify consistency of the prev dir + StorageDirectory sdPrev + = prevState.getStorage().new StorageDirectory(sd.getRoot()); + + // read and verify consistency of the prev dir + sdPrev.read(sdPrev.getPreviousVersionFile()); canRollback = true; } if (!canRollback) - throw new IOException("Cannot rollback. " - + "None of the storage directories contain previous fs state."); + throw new IOException("Cannot rollback. None of the storage " + + "directories contain previous fs state."); // Now that we know all directories are going to be consistent // Do rollback for each directory containing previous state - for (Iterator it = - dirIterator(); it.hasNext();) { + for (Iterator it = storage.dirIterator(); it.hasNext();) { StorageDirectory sd = it.next(); File prevDir = sd.getPreviousDir(); if (!prevDir.exists()) continue; LOG.info("Rolling back storage directory " + sd.getRoot() - + ".\n new LV = " + prevState.getLayoutVersion() - + "; new CTime = " + prevState.getCTime()); + + ".\n new LV = " + prevState.getStorage().getLayoutVersion() + + "; new CTime = " + prevState.getStorage().getCTime()); File tmpDir = sd.getRemovedTmp(); assert !tmpDir.exists() : "removed.tmp directory must not exist."; // rename current to tmp File curDir = sd.getCurrentDir(); assert curDir.exists() : "Current directory must exist."; - rename(curDir, tmpDir); + NNStorage.rename(curDir, tmpDir); // rename previous to current - rename(prevDir, curDir); + NNStorage.rename(prevDir, curDir); // delete tmp dir - deleteDir(tmpDir); + NNStorage.deleteDir(tmpDir); LOG.info("Rollback of " + sd.getRoot()+ " is complete."); } isUpgradeFinalized = true; // check whether name-node can start in regular mode - verifyDistributedUpgradeProgress(StartupOption.REGULAR); + storage.verifyDistributedUpgradeProgress(StartupOption.REGULAR); } private void doFinalize(StorageDirectory sd) throws IOException { @@ -613,14 +422,14 @@ public class FSImage extends Storage { } LOG.info("Finalizing upgrade for storage directory " + sd.getRoot() + "." - + (getLayoutVersion()==0 ? "" : - "\n cur LV = " + this.getLayoutVersion() - + "; cur CTime = " + this.getCTime())); + + (storage.getLayoutVersion()==0 ? "" : + "\n cur LV = " + storage.getLayoutVersion() + + "; cur CTime = " + storage.getCTime())); assert sd.getCurrentDir().exists() : "Current directory must exist."; final File tmpDir = sd.getFinalizedTmp(); // rename previous to tmp and remove - rename(prevDir, tmpDir); - deleteDir(tmpDir); + NNStorage.rename(prevDir, tmpDir); + NNStorage.deleteDir(tmpDir); isUpgradeFinalized = true; LOG.info("Finalize upgrade for " + sd.getRoot()+ " is complete."); } @@ -644,17 +453,17 @@ public class FSImage extends Storage { ckptImage.close(); } // return back the real image - realImage.setStorageInfo(ckptImage); - checkpointTime = ckptImage.checkpointTime; + realImage.getStorage().setStorageInfo(ckptImage.getStorage()); + storage.setCheckpointTime(ckptImage.getStorage().getCheckpointTime()); fsNamesys.dir.fsImage = realImage; // and save it but keep the same checkpointTime saveNamespace(false); } void finalizeUpgrade() throws IOException { - for (Iterator it = - dirIterator(); it.hasNext();) { - doFinalize(it.next()); + for (Iterator it = storage.dirIterator(); it.hasNext();) { + StorageDirectory sd = it.next(); + doFinalize(sd); } } @@ -662,213 +471,10 @@ public class FSImage extends Storage { return isUpgradeFinalized; } - protected void getFields(Properties props, - StorageDirectory sd - ) throws IOException { - super.getFields(props, sd); - if (layoutVersion == 0) - throw new IOException("NameNode directory " - + sd.getRoot() + " is not formatted."); - String sDUS, sDUV; - sDUS = props.getProperty("distributedUpgradeState"); - sDUV = props.getProperty("distributedUpgradeVersion"); - setDistributedUpgradeState( - sDUS == null? false : Boolean.parseBoolean(sDUS), - sDUV == null? getLayoutVersion() : Integer.parseInt(sDUV)); - - String sMd5 = props.getProperty(MESSAGE_DIGEST_PROPERTY); - if (layoutVersion <= -26) { - if (sMd5 == null) { - throw new InconsistentFSStateException(sd.getRoot(), - "file " + STORAGE_FILE_VERSION + " does not have MD5 image digest."); - } - this.imageDigest = new MD5Hash(sMd5); - } else if (sMd5 != null) { - throw new InconsistentFSStateException(sd.getRoot(), - "file " + STORAGE_FILE_VERSION + - " has image MD5 digest when version is " + layoutVersion); - } - - this.checkpointTime = readCheckpointTime(sd); - } - - /** - * Determine the checkpoint time of the specified StorageDirectory - * - * @param sd StorageDirectory to check - * @return If file exists and can be read, last checkpoint time. If not, 0L. - * @throws IOException On errors processing file pointed to by sd - */ - long readCheckpointTime(StorageDirectory sd) throws IOException { - File timeFile = getImageFile(sd, NameNodeFile.TIME); - long timeStamp = 0L; - if (timeFile.exists() && timeFile.canRead()) { - DataInputStream in = new DataInputStream(new FileInputStream(timeFile)); - try { - timeStamp = in.readLong(); - } finally { - in.close(); - } - } - return timeStamp; - } - - /** - * Write last checkpoint time and version file into the storage directory. - * - * The version file should always be written last. - * Missing or corrupted version file indicates that - * the checkpoint is not valid. - * - * @param sd storage directory - * @throws IOException - */ - protected void setFields(Properties props, - StorageDirectory sd - ) throws IOException { - super.setFields(props, sd); - boolean uState = getDistributedUpgradeState(); - int uVersion = getDistributedUpgradeVersion(); - if(uState && uVersion != getLayoutVersion()) { - props.setProperty("distributedUpgradeState", Boolean.toString(uState)); - props.setProperty("distributedUpgradeVersion", Integer.toString(uVersion)); - } - if (imageDigest == null) { - imageDigest = MD5Hash.digest( - new FileInputStream(getImageFile(sd, NameNodeFile.IMAGE))); - } - props.setProperty(MESSAGE_DIGEST_PROPERTY, imageDigest.toString()); - - writeCheckpointTime(sd); - } - - /** - * Write last checkpoint time into a separate file. - * - * @param sd - * @throws IOException - */ - void writeCheckpointTime(StorageDirectory sd) throws IOException { - if (checkpointTime < 0L) - return; // do not write negative time - File timeFile = getImageFile(sd, NameNodeFile.TIME); - if (timeFile.exists() && ! timeFile.delete()) { - LOG.error("Cannot delete chekpoint time file: " - + timeFile.getCanonicalPath()); - } - FileOutputStream fos = new FileOutputStream(timeFile); - DataOutputStream out = new DataOutputStream(fos); - try { - out.writeLong(checkpointTime); - out.flush(); - fos.getChannel().force(true); - } finally { - out.close(); - } - } - - /** - * Record new checkpoint time in order to - * distinguish healthy directories from the removed ones. - * If there is an error writing new checkpoint time, the corresponding - * storage directory is removed from the list. - */ - void incrementCheckpointTime() { - setCheckpointTime(checkpointTime + 1); - } - - /** - * The age of the namespace state.

- * Reflects the latest time the image was saved. - * Modified with every save or a checkpoint. - * Persisted in VERSION file. - */ - long getCheckpointTime() { - return checkpointTime; - } - - void setCheckpointTime(long newCpT) { - checkpointTime = newCpT; - // Write new checkpoint time in all storage directories - for(Iterator it = - dirIterator(); it.hasNext();) { - StorageDirectory sd = it.next(); - try { - writeCheckpointTime(sd); - } catch(IOException e) { - // Close any edits stream associated with this dir and remove directory - LOG.warn("incrementCheckpointTime failed on " + sd.getRoot().getPath() + ";type="+sd.getStorageDirType()); - } - } - } - - /** - * @param sds - array of SDs to process - * @param propagate - flag, if set - then call corresponding EditLog stream's - * processIOError function. - */ - void processIOError(List sds, boolean propagate) { - ArrayList al = null; - synchronized (sds) { - for (StorageDirectory sd : sds) { - // if has a stream assosiated with it - remove it too.. - if (propagate && sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) { - EditLogOutputStream eStream = editLog.getEditsStream(sd); - if (al == null) - al = new ArrayList(1); - al.add(eStream); - } - - for (Iterator it = dirIterator(); it.hasNext();) { - StorageDirectory sd1 = it.next(); - if (sd.equals(sd1)) { - // add storage to the removed list - LOG.warn("FSImage:processIOError: removing storage: " - + sd.getRoot().getPath()); - try { - sd1.unlock(); // unlock before removing (in case it will be - // restored) - } catch (Exception e) { - LOG.info("Unable to unlock bad storage directory : " + sd.getRoot().getPath()); - } - removedStorageDirs.add(sd1); - it.remove(); - break; - } - } - } - } - // if there are some edit log streams to remove - if(propagate && al != null) - editLog.processIOError(al, false); - - //if called from edits log, the it will call increment from there - if(propagate) incrementCheckpointTime(); - } - public FSEditLog getEditLog() { return editLog; } - public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException { - File oldImageDir = new File(sd.getRoot(), "image"); - if (!oldImageDir.exists()) { - return false; - } - // check the layout version inside the image file - File oldF = new File(oldImageDir, "fsimage"); - RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws"); - try { - oldFile.seek(0); - int odlVersion = oldFile.readInt(); - if (odlVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION) - return false; - } finally { - oldFile.close(); - } - return true; - } - // // Atomic move sequence, to recover from interrupted checkpoint // @@ -876,15 +482,15 @@ public class FSImage extends Storage { StorageDirectory editsSD) throws IOException { boolean needToSave = false; - File curFile = getImageFile(nameSD, NameNodeFile.IMAGE); - File ckptFile = getImageFile(nameSD, NameNodeFile.IMAGE_NEW); + File curFile = NNStorage.getStorageFile(nameSD, NameNodeFile.IMAGE); + File ckptFile = NNStorage.getStorageFile(nameSD, NameNodeFile.IMAGE_NEW); // // If we were in the midst of a checkpoint // if (ckptFile.exists()) { needToSave = true; - if (getImageFile(editsSD, NameNodeFile.EDITS_NEW).exists()) { + if (NNStorage.getStorageFile(editsSD, NameNodeFile.EDITS_NEW).exists()) { // // checkpointing migth have uploaded a new // merged image, but we discard it here because we are @@ -948,7 +554,7 @@ public class FSImage extends Storage { // Process each of the storage directories to find the pair of // newest image file and edit file - for (Iterator it = dirIterator(); it.hasNext();) { + for (Iterator it = storage.dirIterator(); it.hasNext();) { StorageDirectory sd = it.next(); // Was the file just formatted? @@ -962,16 +568,16 @@ public class FSImage extends Storage { // Determine if sd is image, edits or both if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) { - imageExists = getImageFile(sd, NameNodeFile.IMAGE).exists(); + imageExists = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE).exists(); imageDirs.add(sd.getRoot().getCanonicalPath()); } if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) { - editsExists = getImageFile(sd, NameNodeFile.EDITS).exists(); + editsExists = NNStorage.getStorageFile(sd, NameNodeFile.EDITS).exists(); editsDirs.add(sd.getRoot().getCanonicalPath()); } - checkpointTime = readCheckpointTime(sd); + long checkpointTime = storage.readCheckpointTime(sd); checkpointTimes.add(checkpointTime); @@ -1031,7 +637,8 @@ public class FSImage extends Storage { // Load in bits // latestNameSD.read(); - needToSave |= loadFSImage(getImageFile(latestNameSD, NameNodeFile.IMAGE)); + needToSave |= loadFSImage(NNStorage.getStorageFile(latestNameSD, + NameNodeFile.IMAGE)); // Load latest edits if (latestNameCheckpointTime > latestEditsCheckpointTime) @@ -1057,16 +664,16 @@ public class FSImage extends Storage { // Check that the image digest we loaded matches up with what // we expected MD5Hash readImageMd5 = loader.getLoadedImageMd5(); - if (imageDigest == null) { - imageDigest = readImageMd5; // set this fsimage's checksum - } else if (!imageDigest.equals(readImageMd5)) { + if (storage.getImageDigest() == null) { + storage.setImageDigest(readImageMd5); // set this fsimage's checksum + } else if (!storage.getImageDigest().equals(readImageMd5)) { throw new IOException("Image file " + curFile + " is corrupt with MD5 checksum of " + readImageMd5 + - " but expecting " + imageDigest); + " but expecting " + storage.getImageDigest()); } - this.namespaceID = loader.getLoadedNamespaceID(); - this.layoutVersion = loader.getLoadedImageVersion(); + storage.namespaceID = loader.getLoadedNamespaceID(); + storage.layoutVersion = loader.getLoadedImageVersion(); boolean needToSave = loader.getLoadedImageVersion() != FSConstants.LAYOUT_VERSION; @@ -1084,12 +691,13 @@ public class FSImage extends Storage { FSEditLogLoader loader = new FSEditLogLoader(namesystem); int numEdits = 0; - EditLogFileInputStream edits = - new EditLogFileInputStream(getImageFile(sd, NameNodeFile.EDITS)); + EditLogFileInputStream edits = + new EditLogFileInputStream(NNStorage.getStorageFile(sd, + NameNodeFile.EDITS)); numEdits = loader.loadFSEdits(edits); edits.close(); - File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW); + File editsNew = NNStorage.getStorageFile(sd, NameNodeFile.EDITS_NEW); if (editsNew.exists() && editsNew.length() > 0) { edits = new EditLogFileInputStream(editsNew); @@ -1110,12 +718,9 @@ public class FSImage extends Storage { FSImageFormat.Saver saver = new FSImageFormat.Saver(); FSImageCompression compression = FSImageCompression.createCompression(conf); saver.save(newFile, getFSNamesystem(), compression); - setImageDigest(saver.getSavedDigest()); + storage.setImageDigest(saver.getSavedDigest()); } - public void setImageDigest(MD5Hash digest) { - this.imageDigest = digest; - } /** * FSImageSaver is being run in a separate thread when saving * FSImage. There is one thread per each copy of the image. @@ -1182,19 +787,19 @@ public class FSImage extends Storage { // try to restore all failed edit logs here assert editLog != null : "editLog must be initialized"; - attemptRestoreRemovedStorage(true); + storage.attemptRestoreRemovedStorage(true); editLog.close(); if(renewCheckpointTime) - this.checkpointTime = now(); + storage.setCheckpointTime(now()); List errorSDs = Collections.synchronizedList(new ArrayList()); // mv current -> lastcheckpoint.tmp - for (Iterator it = dirIterator(); it.hasNext();) { + for (Iterator it = storage.dirIterator(); it.hasNext();) { StorageDirectory sd = it.next(); try { - moveCurrent(sd); + storage.moveCurrent(sd); } catch(IOException ie) { LOG.error("Unable to move current for " + sd.getRoot(), ie); errorSDs.add(sd); @@ -1203,8 +808,8 @@ public class FSImage extends Storage { List saveThreads = new ArrayList(); // save images into current - for (Iterator it = dirIterator(NameNodeDirType.IMAGE); - it.hasNext();) { + for (Iterator it + = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) { StorageDirectory sd = it.next(); FSImageSaver saver = new FSImageSaver(sd, errorSDs); Thread saveThread = new Thread(saver, saver.toString()); @@ -1215,7 +820,7 @@ public class FSImage extends Storage { saveThreads.clear(); // -NOTE- - // If NN has image-only and edits-only storage directories and fails here + // If NN has image-only and edits-only storage directories and fails here // the image will have the latest namespace state. // During startup the image-only directories will recover by discarding // lastcheckpoint.tmp, while @@ -1224,14 +829,15 @@ public class FSImage extends Storage { // The edits directories should be discarded during startup because their // checkpointTime is older than that of image directories. // recreate edits in current - for (Iterator it = dirIterator(NameNodeDirType.EDITS); - it.hasNext();) { - final StorageDirectory sd = it.next(); + for (Iterator it + = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) { + StorageDirectory sd = it.next(); // if this directory already stores the image and edits, then it was // already processed in the earlier loop. if (sd.getStorageDirType() == NameNodeDirType.IMAGE_AND_EDITS) { continue; } + FSImageSaver saver = new FSImageSaver(sd, errorSDs); Thread saveThread = new Thread(saver, saver.toString()); saveThreads.add(saveThread); @@ -1240,16 +846,16 @@ public class FSImage extends Storage { waitForThreads(saveThreads); // mv lastcheckpoint.tmp -> previous.checkpoint - for (Iterator it = dirIterator(); it.hasNext();) { + for (Iterator it = storage.dirIterator(); it.hasNext();) { StorageDirectory sd = it.next(); try { - moveLastCheckpoint(sd); + storage.moveLastCheckpoint(sd); } catch(IOException ie) { LOG.error("Unable to move last checkpoint for " + sd.getRoot(), ie); errorSDs.add(sd); } } - processIOError(errorSDs, false); + storage.reportErrorsOnDirectories(errorSDs); if(!editLog.isOpen()) editLog.open(); ckptState = CheckpointStates.UPLOAD_DONE; } @@ -1264,103 +870,14 @@ public class FSImage extends Storage { if (!curDir.exists() && !curDir.mkdir()) throw new IOException("Cannot create directory " + curDir); if (dirType.isOfType(NameNodeDirType.IMAGE)) - saveFSImage(getImageFile(sd, NameNodeFile.IMAGE)); + saveFSImage(NNStorage.getStorageFile(sd, NameNodeFile.IMAGE)); if (dirType.isOfType(NameNodeDirType.EDITS)) - editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS)); + editLog.createEditLogFile(NNStorage.getStorageFile(sd, + NameNodeFile.EDITS)); // write version and time files sd.write(); } - /** - * Move {@code current} to {@code lastcheckpoint.tmp} and - * recreate empty {@code current}. - * {@code current} is moved only if it is well formatted, - * that is contains VERSION file. - * - * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getLastCheckpointTmp() - * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getPreviousCheckpoint() - */ - protected void moveCurrent(StorageDirectory sd) - throws IOException { - File curDir = sd.getCurrentDir(); - File tmpCkptDir = sd.getLastCheckpointTmp(); - // mv current -> lastcheckpoint.tmp - // only if current is formatted - has VERSION file - if(sd.getVersionFile().exists()) { - assert curDir.exists() : curDir + " directory must exist."; - assert !tmpCkptDir.exists() : tmpCkptDir + " directory must not exist."; - rename(curDir, tmpCkptDir); - } - // recreate current - if(!curDir.exists() && !curDir.mkdir()) - throw new IOException("Cannot create directory " + curDir); - } - - /** - * Move {@code lastcheckpoint.tmp} to {@code previous.checkpoint} - * - * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getPreviousCheckpoint() - * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getLastCheckpointTmp() - */ - protected void moveLastCheckpoint(StorageDirectory sd) - throws IOException { - File tmpCkptDir = sd.getLastCheckpointTmp(); - File prevCkptDir = sd.getPreviousCheckpoint(); - // remove previous.checkpoint - if (prevCkptDir.exists()) - deleteDir(prevCkptDir); - // mv lastcheckpoint.tmp -> previous.checkpoint - if(tmpCkptDir.exists()) - rename(tmpCkptDir, prevCkptDir); - } - - /** - * Generate new namespaceID. - * - * namespaceID is a persistent attribute of the namespace. - * It is generated when the namenode is formatted and remains the same - * during the life cycle of the namenode. - * When a datanodes register they receive it as the registrationID, - * which is checked every time the datanode is communicating with the - * namenode. Datanodes that do not 'know' the namespaceID are rejected. - * - * @return new namespaceID - */ - private int newNamespaceID() { - Random r = new Random(); - r.setSeed(now()); - int newID = 0; - while(newID == 0) - newID = r.nextInt(0x7FFFFFFF); // use 31 bits only - return newID; - } - - /** Create new dfs name directory. Caution: this destroys all files - * in this filesystem. */ - void format(StorageDirectory sd) throws IOException { - sd.clearDirectory(); // create currrent dir - sd.lock(); - try { - saveCurrent(sd); - } finally { - sd.unlock(); - } - LOG.info("Storage directory " + sd.getRoot() - + " has been successfully formatted."); - } - - public void format() throws IOException { - this.layoutVersion = FSConstants.LAYOUT_VERSION; - this.namespaceID = newNamespaceID(); - this.cTime = 0L; - this.checkpointTime = now(); - for (Iterator it = - dirIterator(); it.hasNext();) { - StorageDirectory sd = it.next(); - format(sd); - } - } - /** * Moves fsimage.ckpt to fsImage and edits.new to edits @@ -1376,14 +893,14 @@ public class FSImage extends Storage { throws IOException { if (ckptState != CheckpointStates.UPLOAD_DONE && !(ckptState == CheckpointStates.ROLLED_EDITS - && getNumStorageDirs(NameNodeDirType.IMAGE) == 0)) { + && storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0)) { throw new IOException("Cannot roll fsImage before rolling edits log."); } - for (Iterator it = - dirIterator(NameNodeDirType.IMAGE); it.hasNext();) { + for (Iterator it + = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) { StorageDirectory sd = it.next(); - File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW); + File ckpt = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW); if (!ckpt.exists()) { throw new IOException("Checkpoint file " + ckpt + " does not exist"); @@ -1391,7 +908,8 @@ public class FSImage extends Storage { } editLog.purgeEditLog(); // renamed edits.new to edits if(LOG.isDebugEnabled()) { - LOG.debug("rollFSImage after purgeEditLog: storageList=" + listStorageDirectories()); + LOG.debug("rollFSImage after purgeEditLog: storageList=" + + storage.listStorageDirectories()); } // // Renames new image @@ -1403,17 +921,18 @@ public class FSImage extends Storage { /** * Renames new image */ - void renameCheckpoint() { + void renameCheckpoint() throws IOException { ArrayList al = null; - for (Iterator it = - dirIterator(NameNodeDirType.IMAGE); it.hasNext();) { + for (Iterator it + = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) { StorageDirectory sd = it.next(); - File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW); - File curFile = getImageFile(sd, NameNodeFile.IMAGE); + File ckpt = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW); + File curFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE); // renameTo fails on Windows if the destination file // already exists. if(LOG.isDebugEnabled()) { - LOG.debug("renaming " + ckpt.getAbsolutePath() + " to " + curFile.getAbsolutePath()); + LOG.debug("renaming " + ckpt.getAbsolutePath() + + " to " + curFile.getAbsolutePath()); } if (!ckpt.renameTo(curFile)) { if (!curFile.delete() || !ckpt.renameTo(curFile)) { @@ -1425,32 +944,32 @@ public class FSImage extends Storage { } } } - if(al != null) processIOError(al, true); + if(al != null) storage.reportErrorsOnDirectories(al); } /** * Updates version and fstime files in all directories (fsimage and edits). */ - void resetVersion(boolean renewCheckpointTime, MD5Hash newImageDigest) throws IOException { - this.layoutVersion = FSConstants.LAYOUT_VERSION; + void resetVersion(boolean renewCheckpointTime, MD5Hash newImageDigest) + throws IOException { + storage.layoutVersion = FSConstants.LAYOUT_VERSION; if(renewCheckpointTime) - this.checkpointTime = now(); - this.imageDigest = newImageDigest; + storage.setCheckpointTime(now()); + storage.setImageDigest(newImageDigest); ArrayList al = null; - for (Iterator it = - dirIterator(); it.hasNext();) { + for (Iterator it = storage.dirIterator(); it.hasNext();) { StorageDirectory sd = it.next(); // delete old edits if sd is the image only the directory if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) { - File editsFile = getImageFile(sd, NameNodeFile.EDITS); + File editsFile = NNStorage.getStorageFile(sd, NameNodeFile.EDITS); if(editsFile.exists() && !editsFile.delete()) throw new IOException("Cannot delete edits file " + editsFile.getCanonicalPath()); } // delete old fsimage if sd is the edits only the directory if (!sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) { - File imageFile = getImageFile(sd, NameNodeFile.IMAGE); + File imageFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE); if(imageFile.exists() && !imageFile.delete()) throw new IOException("Cannot delete image file " + imageFile.getCanonicalPath()); @@ -1464,7 +983,7 @@ public class FSImage extends Storage { al.add(sd); } } - if(al != null) processIOError(al, true); + if(al != null) storage.reportErrorsOnDirectories(al); ckptState = FSImage.CheckpointStates.START; } @@ -1472,7 +991,7 @@ public class FSImage extends Storage { getEditLog().rollEditLog(); ckptState = CheckpointStates.ROLLED_EDITS; // If checkpoint fails this should be the most recent image, therefore - incrementCheckpointTime(); + storage.incrementCheckpointTime(); return new CheckpointSignature(this); } @@ -1520,38 +1039,38 @@ public class FSImage extends Storage { throws IOException { String msg = null; // Verify that checkpoint is allowed - if(bnReg.getNamespaceID() != this.getNamespaceID()) + if(bnReg.getNamespaceID() != storage.getNamespaceID()) msg = "Name node " + bnReg.getAddress() + " has incompatible namespace id: " + bnReg.getNamespaceID() - + " expected: " + getNamespaceID(); + + " expected: " + storage.getNamespaceID(); else if(bnReg.isRole(NamenodeRole.ACTIVE)) msg = "Name node " + bnReg.getAddress() + " role " + bnReg.getRole() + ": checkpoint is not allowed."; - else if(bnReg.getLayoutVersion() < this.getLayoutVersion() - || (bnReg.getLayoutVersion() == this.getLayoutVersion() - && bnReg.getCTime() > this.getCTime()) - || (bnReg.getLayoutVersion() == this.getLayoutVersion() - && bnReg.getCTime() == this.getCTime() - && bnReg.getCheckpointTime() > this.checkpointTime)) + else if(bnReg.getLayoutVersion() < storage.getLayoutVersion() + || (bnReg.getLayoutVersion() == storage.getLayoutVersion() + && bnReg.getCTime() > storage.getCTime()) + || (bnReg.getLayoutVersion() == storage.getLayoutVersion() + && bnReg.getCTime() == storage.getCTime() + && bnReg.getCheckpointTime() > storage.getCheckpointTime())) // remote node has newer image age msg = "Name node " + bnReg.getAddress() + " has newer image layout version: LV = " +bnReg.getLayoutVersion() + " cTime = " + bnReg.getCTime() + " checkpointTime = " + bnReg.getCheckpointTime() - + ". Current version: LV = " + getLayoutVersion() - + " cTime = " + getCTime() - + " checkpointTime = " + checkpointTime; + + ". Current version: LV = " + storage.getLayoutVersion() + + " cTime = " + storage.getCTime() + + " checkpointTime = " + storage.getCheckpointTime(); if(msg != null) { LOG.error(msg); return new NamenodeCommand(NamenodeProtocol.ACT_SHUTDOWN); } boolean isImgObsolete = true; - if(bnReg.getLayoutVersion() == this.getLayoutVersion() - && bnReg.getCTime() == this.getCTime() - && bnReg.getCheckpointTime() == this.checkpointTime) + if(bnReg.getLayoutVersion() == storage.getLayoutVersion() + && bnReg.getCTime() == storage.getCTime() + && bnReg.getCheckpointTime() == storage.getCheckpointTime()) isImgObsolete = false; boolean needToReturnImg = true; - if(getNumStorageDirs(NameNodeDirType.IMAGE) == 0) + if(storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) // do not return image if there are no image directories needToReturnImg = false; CheckpointSignature sig = rollEditLog(); @@ -1599,140 +1118,14 @@ public class FSImage extends Storage { ckptState = CheckpointStates.UPLOAD_DONE; } - synchronized void close() throws IOException { + synchronized public void close() throws IOException { getEditLog().close(); - unlockAll(); - } - - /** - * Return the name of the image file. - */ - File getFsImageName() { - StorageDirectory sd = null; - for (Iterator it = - dirIterator(NameNodeDirType.IMAGE); it.hasNext();) { - sd = it.next(); - if(sd.getRoot().canRead()) - return getImageFile(sd, NameNodeFile.IMAGE); - } - return null; - } - - /** - * See if any of removed storages is "writable" again, and can be returned - * into service. If saveNamespace is set, then this methdod is being - * called form saveNamespace. - */ - synchronized void attemptRestoreRemovedStorage(boolean saveNamespace) { - // if directory is "alive" - copy the images there... - if(!restoreFailedStorage || removedStorageDirs.size() == 0) - return; //nothing to restore - - LOG.info("FSImage.attemptRestoreRemovedStorage: check removed(failed) " + - "storarge. removedStorages size = " + removedStorageDirs.size()); - for(Iterator it = this.removedStorageDirs.iterator(); it.hasNext();) { - StorageDirectory sd = it.next(); - File root = sd.getRoot(); - LOG.info("currently disabled dir " + root.getAbsolutePath() + - "; type="+sd.getStorageDirType() + ";canwrite="+root.canWrite()); - try { - - if(root.exists() && root.canWrite()) { - /** If this call is being made from savenamespace command, then no - * need to format, the savenamespace command will format and write - * the new image to this directory anyways. - */ - if (saveNamespace) { - sd.clearDirectory(); - } else { - format(sd); - } - LOG.info("restoring dir " + sd.getRoot().getAbsolutePath()); - if(sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) { - File eFile = getEditFile(sd); - editLog.addNewEditLogStream(eFile); - } - this.addStorageDir(sd); // restore - it.remove(); - } - } catch(IOException e) { - LOG.warn("failed to restore " + sd.getRoot().getAbsolutePath(), e); - } - } - } - - public File getFsEditName() throws IOException { - return getEditLog().getFsEditName(); - } - - File getFsTimeName() { - StorageDirectory sd = null; - // NameNodeFile.TIME shoul be same on all directories - for (Iterator it = - dirIterator(); it.hasNext();) - sd = it.next(); - return getImageFile(sd, NameNodeFile.TIME); - } - - /** - * Return the name of the image file that is uploaded by periodic - * checkpointing. - */ - File[] getFsImageNameCheckpoint() { - ArrayList list = new ArrayList(); - for (Iterator it = - dirIterator(NameNodeDirType.IMAGE); it.hasNext();) { - list.add(getImageFile(it.next(), NameNodeFile.IMAGE_NEW)); - } - return list.toArray(new File[list.size()]); - } - - private boolean getDistributedUpgradeState() { - FSNamesystem ns = getFSNamesystem(); - return ns == null ? false : ns.getDistributedUpgradeState(); - } - - private int getDistributedUpgradeVersion() { - FSNamesystem ns = getFSNamesystem(); - return ns == null ? 0 : ns.getDistributedUpgradeVersion(); - } - - private void setDistributedUpgradeState(boolean uState, int uVersion) { - getFSNamesystem().upgradeManager.setUpgradeState(uState, uVersion); - } - - private void verifyDistributedUpgradeProgress(StartupOption startOpt - ) throws IOException { - if(startOpt == StartupOption.ROLLBACK || startOpt == StartupOption.IMPORT) - return; - UpgradeManager um = getFSNamesystem().upgradeManager; - assert um != null : "FSNameSystem.upgradeManager is null."; - if(startOpt != StartupOption.UPGRADE) { - if(um.getUpgradeState()) - throw new IOException( - "\n Previous distributed upgrade was not completed. " - + "\n Please restart NameNode with -upgrade option."); - if(um.getDistributedUpgrades() != null) - throw new IOException("\n Distributed upgrade for NameNode version " - + um.getUpgradeVersion() + " to current LV " + FSConstants.LAYOUT_VERSION - + " is required.\n Please restart NameNode with -upgrade option."); - } - } - - private void initializeDistributedUpgrade() throws IOException { - UpgradeManagerNamenode um = getFSNamesystem().upgradeManager; - if(! um.initializeUpgrade()) - return; - // write new upgrade state into disk - writeAll(); - NameNode.LOG.info("\n Distributed upgrade for NameNode version " - + um.getUpgradeVersion() + " to current LV " - + FSConstants.LAYOUT_VERSION + " is initialized."); + storage.close(); } /** * Retrieve checkpoint dirs from configuration. - * + * * @param conf the Configuration * @param defaultValue a default value for the attribute, if null * @return a Collection of URIs representing the values in @@ -1755,5 +1148,33 @@ public class FSImage extends Storage { dirNames.add(defaultName); } return Util.stringCollectionAsURIs(dirNames); - } + } + + public NNStorage getStorage() { + return storage; + } + + @Override // NNStorageListener + public void errorOccurred(StorageDirectory sd) throws IOException { + // do nothing, + } + + @Override // NNStorageListener + public void formatOccurred(StorageDirectory sd) throws IOException { + if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) { + sd.lock(); + try { + saveCurrent(sd); + } finally { + sd.unlock(); + } + LOG.info("Storage directory " + sd.getRoot() + + " has been successfully formatted."); + } + }; + + @Override // NNStorageListener + public void directoryAvailable(StorageDirectory sd) throws IOException { + // do nothing + } } Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1066305&r1=1066304&r2=1066305&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Wed Feb 2 02:05:18 2011 @@ -472,7 +472,7 @@ class FSImageFormat { DataOutputStream out = new DataOutputStream(fos); try { out.writeInt(FSConstants.LAYOUT_VERSION); - out.writeInt(sourceNamesystem.getFSImage().getNamespaceID()); // TODO bad dependency + out.writeInt(sourceNamesystem.getFSImage().getStorage().getNamespaceID()); // TODO bad dependency out.writeLong(fsDir.rootDir.numItemsInTree()); out.writeLong(sourceNamesystem.getGenerationStamp()); Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1066305&r1=1066304&r2=1066305&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Feb 2 02:05:18 2011 @@ -439,11 +439,11 @@ public class FSNamesystem implements FSC * Should do everything that would be done for the NameNode, * except for loading the image. * - * @param bnImage {@link BackupStorage} + * @param bnImage {@link BackupImage} * @param conf configuration * @throws IOException */ - FSNamesystem(Configuration conf, BackupStorage bnImage) throws IOException { + FSNamesystem(Configuration conf, BackupImage bnImage) throws IOException { try { initialize(conf, bnImage); } catch(IOException e) { @@ -523,8 +523,8 @@ public class FSNamesystem implements FSC } NamespaceInfo getNamespaceInfo() { - return new NamespaceInfo(dir.fsImage.getNamespaceID(), - dir.fsImage.getCTime(), + return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(), + dir.fsImage.getStorage().getCTime(), getDistributedUpgradeVersion()); } @@ -2655,7 +2655,7 @@ public class FSNamesystem implements FSC * @return registration ID */ public String getRegistrationID() { - return Storage.getRegistrationID(dir.fsImage); + return Storage.getRegistrationID(dir.fsImage.getStorage()); } /** @@ -3422,10 +3422,10 @@ public class FSNamesystem implements FSC // if it is disabled - enable it and vice versa. if(arg.equals("check")) - return getFSImage().getRestoreFailedStorage(); + return getFSImage().getStorage().getRestoreFailedStorage(); boolean val = arg.equals("true"); // false if not - getFSImage().setRestoreFailedStorage(val); + getFSImage().getStorage().setRestoreFailedStorage(val); return val; } finally { @@ -4803,18 +4803,20 @@ public class FSNamesystem implements FSC * @throws IOException */ void registerBackupNode(NamenodeRegistration registration) - throws IOException { + throws IOException { writeLock(); try { - if(getFSImage().getNamespaceID() != registration.getNamespaceID()) - throw new IOException("Incompatible namespaceIDs: " - + " Namenode namespaceID = " + getFSImage().getNamespaceID() - + "; " + registration.getRole() + - " node namespaceID = " + registration.getNamespaceID()); - boolean regAllowed = getEditLog().checkBackupRegistration(registration); - if(!regAllowed) - throw new IOException("Registration is not allowed. " + - "Another node is registered as a backup."); + if(getFSImage().getStorage().getNamespaceID() + != registration.getNamespaceID()) + throw new IOException("Incompatible namespaceIDs: " + + " Namenode namespaceID = " + + getFSImage().getStorage().getNamespaceID() + "; " + + registration.getRole() + + " node namespaceID = " + registration.getNamespaceID()); + boolean regAllowed = getEditLog().checkBackupRegistration(registration); + if(!regAllowed) + throw new IOException("Registration is not allowed. " + + "Another node is registered as a backup."); } finally { writeUnlock(); } @@ -4828,15 +4830,17 @@ public class FSNamesystem implements FSC * @throws IOException */ void releaseBackupNode(NamenodeRegistration registration) - throws IOException { + throws IOException { writeLock(); try { - if(getFSImage().getNamespaceID() != registration.getNamespaceID()) - throw new IOException("Incompatible namespaceIDs: " - + " Namenode namespaceID = " + getFSImage().getNamespaceID() - + "; " + registration.getRole() + - " node namespaceID = " + registration.getNamespaceID()); - getEditLog().releaseBackupStream(registration); + if(getFSImage().getStorage().getNamespaceID() + != registration.getNamespaceID()) + throw new IOException("Incompatible namespaceIDs: " + + " Namenode namespaceID = " + + getFSImage().getStorage().getNamespaceID() + "; " + + registration.getRole() + + " node namespaceID = " + registration.getNamespaceID()); + getEditLog().releaseBackupStream(registration); } finally { writeUnlock(); } Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java?rev=1066305&r1=1066304&r2=1066305&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java Wed Feb 2 02:05:18 2011 @@ -76,17 +76,19 @@ public class GetImageServlet extends Htt public Void run() throws Exception { if (ff.getImage()) { response.setHeader(TransferFsImage.CONTENT_LENGTH, - String.valueOf(nnImage.getFsImageName().length())); + String.valueOf(nnImage.getStorage() + .getFsImageName().length())); // send fsImage TransferFsImage.getFileServer(response.getOutputStream(), - nnImage.getFsImageName(), + nnImage.getStorage().getFsImageName(), getThrottler(conf)); } else if (ff.getEdit()) { response.setHeader(TransferFsImage.CONTENT_LENGTH, - String.valueOf(nnImage.getFsEditName().length())); + String.valueOf(nnImage.getStorage() + .getFsEditName().length())); // send edits TransferFsImage.getFileServer(response.getOutputStream(), - nnImage.getFsEditName(), + nnImage.getStorage().getFsEditName(), getThrottler(conf)); } else if (ff.putImage()) { // issue a HTTP get request to download the new fsimage @@ -98,7 +100,7 @@ public class GetImageServlet extends Htt public MD5Hash run() throws Exception { return TransferFsImage.getFileClient( ff.getInfoServer(), "getimage=1", - nnImage.getFsImageNameCheckpoint(), true); + nnImage.getStorage().getFsImageNameCheckpoint(), true); } }); if (!nnImage.newImageDigest.equals(downloadImageDigest)) {