Author: jitendra
Date: Wed Feb 2 02:05:18 2011
New Revision: 1066305
URL: http://svn.apache.org/viewvc?rev=1066305&view=rev
Log:
HDFS-1557. Separate Storage from FSImage. Contributed by Ivan Kelly.
Added:
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
Removed:
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UpgradeManagerNamenode.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Wed Feb 2 02:05:18 2011
@@ -49,6 +49,8 @@ Trunk (unreleased changes)
HDFS-1335. HDFS side change of HADOOP-6904: RPC compatibility. (hairong)
+ HDFS-1557. Separate Storage from FSImage. (Ivan Kelly via jitendra)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Wed Feb 2 02:05:18 2011
@@ -65,7 +65,7 @@ public abstract class Storage extends St
// Constants
// last layout version that did not support upgrades
- protected static final int LAST_PRE_UPGRADE_LAYOUT_VERSION = -3;
+ public static final int LAST_PRE_UPGRADE_LAYOUT_VERSION = -3;
// this corresponds to Hadoop-0.14.
public static final int LAST_UPGRADABLE_LAYOUT_VERSION = -7;
@@ -710,7 +710,7 @@ public abstract class Storage extends St
*
* @param oldVersion
*/
- protected static void checkVersionUpgradable(int oldVersion)
+ public static void checkVersionUpgradable(int oldVersion)
throws IOException {
if (oldVersion > LAST_UPGRADABLE_LAYOUT_VERSION) {
String msg = "*********** Upgrade is not supported from this older" +
@@ -785,7 +785,13 @@ public abstract class Storage extends St
+ from.getCanonicalPath() + " to " + to.getCanonicalPath());
}
- protected static void deleteDir(File dir) throws IOException {
+ /**
+ * Recursively delete all the content of the directory first and then
+ * the directory itself from the local filesystem.
+ * @param dir The directory to delete
+ * @throws IOException
+ */
+ public static void deleteDir(File dir) throws IOException {
if (!FileUtil.fullyDelete(dir))
throw new IOException("Failed to delete " + dir.getCanonicalPath());
}
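
Since deleteDir is now public, callers outside Storage can remove stale directories directly. A minimal sketch of such a caller, assuming a hypothetical helper class and directory name; Storage.deleteDir itself wraps FileUtil.fullyDelete and throws on failure, as the hunk above shows:

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.common.Storage;

class CheckpointCleanup {
  // Hypothetical helper: remove a leftover checkpoint directory if present.
  static void removeStale(File storageRoot) throws IOException {
    File stale = new File(storageRoot, "lastcheckpoint.tmp");
    if (stale.exists()) {
      // Recursively deletes the contents, then the directory itself;
      // throws IOException if FileUtil.fullyDelete reports failure.
      Storage.deleteDir(stale);
    }
  }
}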
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1066305&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Wed Feb 2 02:05:18 2011
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
+import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Extension of FSImage for the backup node.
+ * This class handles the setup of the journaling
+ * spool on the backup namenode.
+ */
+@InterfaceAudience.Private
+public class BackupImage extends FSImage {
+ // Names of the journal spool directory and the spool file
+ private static final String STORAGE_JSPOOL_DIR = "jspool";
+ private static final String STORAGE_JSPOOL_FILE =
+ NNStorage.NameNodeFile.EDITS_NEW.getName();
+
+ /** Backup input stream for loading edits into memory */
+ private EditLogBackupInputStream backupInputStream;
+
+ /** Current journal spooling state */
+ volatile JSpoolState jsState;
+
+ static enum JSpoolState {
+ OFF,
+ INPROGRESS,
+ WAIT;
+ }
+
+ /**
+ * Construct a BackupImage with the journal spool initially off.
+ */
+ BackupImage() {
+ super();
+ storage.setDisablePreUpgradableLayoutCheck(true);
+ jsState = JSpoolState.OFF;
+ }
+
+ /**
+ * Analyze backup storage directories for consistency.<br>
+ * Recover from incomplete checkpoints if required.<br>
+ * Read VERSION and fstime files if they exist.<br>
+ * Do not load image or edits.
+ *
+ * @param imageDirs list of image directories as URI.
+ * @param editsDirs list of edits directories URI.
+ * @throws IOException if the node should shutdown.
+ */
+ void recoverCreateRead(Collection<URI> imageDirs,
+ Collection<URI> editsDirs) throws IOException {
+ storage.setStorageDirectories(imageDirs, editsDirs);
+ storage.setCheckpointTime(0L);
+ for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+ StorageDirectory sd = it.next();
+ StorageState curState;
+ try {
+ curState = sd.analyzeStorage(HdfsConstants.StartupOption.REGULAR);
+ // sd is locked but not opened
+ switch(curState) {
+ case NON_EXISTENT:
+ // fail if any of the configured storage dirs are inaccessible
+ throw new InconsistentFSStateException(sd.getRoot(),
+ "checkpoint directory does not exist or is not accessible.");
+ case NOT_FORMATTED:
+ // for backup node all directories may be unformatted initially
+ LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
+ LOG.info("Formatting ...");
+ sd.clearDirectory(); // create empty current
+ break;
+ case NORMAL:
+ break;
+ default: // recovery is possible
+ sd.doRecover(curState);
+ }
+ if(curState != StorageState.NOT_FORMATTED) {
+ sd.read(); // read and verify consistency with other directories
+ }
+ } catch(IOException ioe) {
+ sd.unlock();
+ throw ioe;
+ }
+ }
+ }
+
+ /**
+ * Reset storage directories.
+ * <p>
+ * Unlock the storage.
+ * Rename <code>current</code> to <code>lastcheckpoint.tmp</code>
+ * and recreate empty <code>current</code>.
+ * @throws IOException
+ */
+ synchronized void reset() throws IOException {
+ // reset NameSpace tree
+ FSDirectory fsDir = getFSNamesystem().dir;
+ fsDir.reset();
+
+ // unlock, close and rename storage directories
+ storage.unlockAll();
+ // recover from unsuccessful checkpoint if necessary
+ recoverCreateRead(storage.getImageDirectories(),
+ storage.getEditsDirectories());
+ // rename and recreate
+ for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+ StorageDirectory sd = it.next();
+ // rename current to lastcheckpoint.tmp
+ storage.moveCurrent(sd);
+ }
+ }
+
+ /**
+ * Load checkpoint from local files only if the memory state is empty.<br>
+ * Set new checkpoint time received from the name-node.<br>
+ * Move <code>lastcheckpoint.tmp</code> to <code>previous.checkpoint</code>.
+ * @throws IOException
+ */
+ void loadCheckpoint(CheckpointSignature sig) throws IOException {
+ // load current image and journal if it is not in memory already
+ if(!editLog.isOpen())
+ editLog.open();
+
+ FSDirectory fsDir = getFSNamesystem().dir;
+ if(fsDir.isEmpty()) {
+ Iterator<StorageDirectory> itImage
+ = storage.dirIterator(NameNodeDirType.IMAGE);
+ Iterator<StorageDirectory> itEdits
+ = storage.dirIterator(NameNodeDirType.EDITS);
+ if(!itImage.hasNext() || ! itEdits.hasNext())
+ throw new IOException("Could not locate checkpoint directories");
+ StorageDirectory sdName = itImage.next();
+ StorageDirectory sdEdits = itEdits.next();
+ getFSDirectoryRootLock().writeLock();
+ try { // load image under rootDir lock
+ loadFSImage(NNStorage.getStorageFile(sdName, NameNodeFile.IMAGE));
+ } finally {
+ getFSDirectoryRootLock().writeUnlock();
+ }
+ loadFSEdits(sdEdits);
+ }
+
+ // set storage fields
+ storage.setStorageInfo(sig);
+ storage.setImageDigest(sig.imageDigest);
+ storage.setCheckpointTime(sig.checkpointTime);
+ }
+
+ /**
+ * Save meta-data into fsimage files
+ * and create empty edits.
+ */
+ void saveCheckpoint() throws IOException {
+ saveNamespace(false);
+ }
+
+ private FSDirectory getFSDirectoryRootLock() {
+ return getFSNamesystem().dir;
+ }
+
+ static File getJSpoolDir(StorageDirectory sd) {
+ return new File(sd.getRoot(), STORAGE_JSPOOL_DIR);
+ }
+
+ static File getJSpoolFile(StorageDirectory sd) {
+ return new File(getJSpoolDir(sd), STORAGE_JSPOOL_FILE);
+ }
+
+ /**
+ * Journal writer journals new meta-data state.
+ * <ol>
+ * <li> If Journal Spool state is OFF then journal records (edits)
+ * are applied directly to meta-data state in memory and are written
+ * to the edits file(s).</li>
+ * <li> If Journal Spool state is INPROGRESS then records are only
+ * written to edits.new file, which is called Spooling.</li>
+ * <li> Journal Spool state WAIT blocks journaling until the
+ * Journal Spool reader finalizes merging of the spooled data and
+ * switches to applying journal to memory.</li>
+ * </ol>
+ * @param length length of data.
+ * @param data serialized journal records.
+ * @throws IOException
+ * @see #convergeJournalSpool()
+ */
+ synchronized void journal(int length, byte[] data) throws IOException {
+ assert backupInputStream.length() == 0 : "backup input stream is not empty";
+ try {
+ switch(jsState) {
+ case WAIT:
+ case OFF:
+ // wait until spooling is off
+ waitSpoolEnd();
+ // update NameSpace in memory
+ backupInputStream.setBytes(data);
+ FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
+ logLoader.loadEditRecords(storage.getLayoutVersion(),
+ backupInputStream.getDataInputStream(), true);
+ getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
+ break;
+ case INPROGRESS:
+ break;
+ }
+ // write to files
+ editLog.logEdit(length, data);
+ editLog.logSync();
+ } finally {
+ backupInputStream.clear();
+ }
+ }
+
+ private synchronized void waitSpoolEnd() {
+ while(jsState == JSpoolState.WAIT) {
+ try {
+ wait();
+ } catch (InterruptedException e) {}
+ }
+ // now spooling should be off, verifying just in case
+ assert jsState == JSpoolState.OFF : "Unexpected JSpool state: " + jsState;
+ }
+
+ /**
+ * Start journal spool.
+ * Switch to writing into edits.new instead of edits.
+ *
+ * edits.new for spooling lives in the separate directory "jspool" rather
+ * than in "current" because the two directories must stay independent:
+ * while spooling, a checkpoint can move current first to
+ * lastcheckpoint.tmp and then to previous.checkpoint, whereas
+ * jspool/edits.new remains in place throughout.
+ */
+ synchronized void startJournalSpool(NamenodeRegistration nnReg)
+ throws IOException {
+ switch(jsState) {
+ case OFF:
+ break;
+ case INPROGRESS:
+ return;
+ case WAIT:
+ waitSpoolEnd();
+ }
+
+ // create journal spool directories
+ for (Iterator<StorageDirectory> it
+ = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+ StorageDirectory sd = it.next();
+ File jsDir = getJSpoolDir(sd);
+ if (!jsDir.exists() && !jsDir.mkdirs()) {
+ throw new IOException("Mkdirs failed to create "
+ + jsDir.getCanonicalPath());
+ }
+ // create edit file if missing
+ File eFile = storage.getEditFile(sd);
+ if(!eFile.exists()) {
+ editLog.createEditLogFile(eFile);
+ }
+ }
+
+ if(!editLog.isOpen())
+ editLog.open();
+
+ // create streams pointing to the journal spool files
+ // subsequent journal records will go directly to the spool
+ editLog.divertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
+ setCheckpointState(CheckpointStates.ROLLED_EDITS);
+
+ // set up spooling
+ if(backupInputStream == null)
+ backupInputStream = new EditLogBackupInputStream(nnReg.getAddress());
+ jsState = JSpoolState.INPROGRESS;
+ }
+
+ synchronized void setCheckpointTime(int length, byte[] data)
+ throws IOException {
+ assert backupInputStream.length() == 0 : "backup input stream is not empty";
+ try {
+ // unpack new checkpoint time
+ backupInputStream.setBytes(data);
+ DataInputStream in = backupInputStream.getDataInputStream();
+ byte op = in.readByte();
+ assert op == NamenodeProtocol.JA_CHECKPOINT_TIME;
+ LongWritable lw = new LongWritable();
+ lw.readFields(in);
+ storage.setCheckpointTimeInStorage(lw.get());
+ } finally {
+ backupInputStream.clear();
+ }
+ }
+
+ /**
+ * Merge Journal Spool to memory.<p>
+ * Journal Spool reader reads journal records from edits.new.
+ * When it reaches the end of the file it sets {@link JSpoolState} to WAIT.
+ * This blocks journaling (see {@link #journal(int,byte[])}).
+ * The reader
+ * <ul>
+ * <li> reads remaining journal records if any,</li>
+ * <li> renames edits.new to edits,</li>
+ * <li> sets {@link JSpoolState} to OFF,</li>
+ * <li> and notifies the journaling thread.</li>
+ * </ul>
+ * Journaling resumes with applying new journal records to the memory state,
+ * and writing them into edits file(s).
+ */
+ void convergeJournalSpool() throws IOException {
+ Iterator<StorageDirectory> itEdits
+ = storage.dirIterator(NameNodeDirType.EDITS);
+ if(! itEdits.hasNext())
+ throw new IOException("Could not locate checkpoint directories");
+ StorageDirectory sdEdits = itEdits.next();
+ int numEdits = 0;
+ File jSpoolFile = getJSpoolFile(sdEdits);
+ long startTime = now();
+ if(jSpoolFile.exists()) {
+ // load edits.new
+ EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
+ DataInputStream in = edits.getDataInputStream();
+ FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
+ numEdits += logLoader.loadFSEdits(in, false);
+
+ // first time reached the end of spool
+ jsState = JSpoolState.WAIT;
+ numEdits += logLoader.loadEditRecords(storage.getLayoutVersion(),
+ in, true);
+ getFSNamesystem().dir.updateCountForINodeWithQuota();
+ edits.close();
+ }
+
+ FSImage.LOG.info("Edits file " + jSpoolFile.getCanonicalPath()
+ + " of size " + jSpoolFile.length() + " edits # " + numEdits
+ + " loaded in " + (now()-startTime)/1000 + " seconds.");
+
+ // rename spool edits.new to edits making it in sync with the active node
+ // subsequent journal records will go directly to edits
+ editLog.revertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
+
+ // write version file
+ resetVersion(false, storage.getImageDigest());
+
+ // wake up journal writer
+ synchronized(this) {
+ jsState = JSpoolState.OFF;
+ notifyAll();
+ }
+
+ // Rename lastcheckpoint.tmp to previous.checkpoint
+ for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+ StorageDirectory sd = it.next();
+ storage.moveLastCheckpoint(sd);
+ }
+ }
+}
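
The javadoc on journal(), startJournalSpool() and convergeJournalSpool() above describes a three-state spool protocol. A minimal, self-contained sketch of just those state transitions, with the real edit-loading and file work elided; all names besides JSpoolState's values are illustrative:

// Sketch only: the real transitions live in BackupImage above.
class SpoolStateSketch {
  enum JSpoolState { OFF, INPROGRESS, WAIT }
  private volatile JSpoolState jsState = JSpoolState.OFF;

  synchronized void journal(byte[] records) {
    if (jsState == JSpoolState.WAIT)
      waitSpoolEnd();                  // reader is merging; block until OFF
    if (jsState == JSpoolState.OFF) {
      // apply records to the in-memory namespace here
    }                                  // INPROGRESS: spool to edits.new only
    // in every state the records are also written to the edit file(s)
  }

  synchronized void startJournalSpool() { jsState = JSpoolState.INPROGRESS; }

  synchronized void convergeJournalSpool() {
    jsState = JSpoolState.WAIT;        // stop writers while merging the spool
    // ... load remaining records from edits.new, rename it to edits ...
    jsState = JSpoolState.OFF;
    notifyAll();                       // wake the blocked journal writer
  }

  private void waitSpoolEnd() {        // caller holds the monitor
    while (jsState == JSpoolState.WAIT) {
      try { wait(); } catch (InterruptedException ignored) { }
    }
  }
}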
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Wed Feb 2 02:05:18 2011
@@ -114,7 +114,7 @@ public class BackupNode extends NameNode
@Override // NameNode
protected void loadNamesystem(Configuration conf) throws IOException {
- BackupStorage bnImage = new BackupStorage();
+ BackupImage bnImage = new BackupImage();
this.namesystem = new FSNamesystem(conf, bnImage);
bnImage.recoverCreateRead(FSNamesystem.getNamespaceDirs(conf),
FSNamesystem.getNamespaceEditsDirs(conf));
@@ -206,7 +206,7 @@ public class BackupNode extends NameNode
if(!nnRpcAddress.equals(nnReg.getAddress()))
throw new IOException("Journal request from unexpected name-node: "
+ nnReg.getAddress() + " expecting " + nnRpcAddress);
- BackupStorage bnImage = (BackupStorage)getFSImage();
+ BackupImage bnImage = (BackupImage)getFSImage();
switch(jAction) {
case (int)JA_IS_ALIVE:
return;
@@ -228,8 +228,8 @@ public class BackupNode extends NameNode
boolean shouldCheckpointAtStartup() {
FSImage fsImage = getFSImage();
if(isRole(NamenodeRole.CHECKPOINT)) {
- assert fsImage.getNumStorageDirs() > 0;
- return ! fsImage.getStorageDir(0).getVersionFile().exists();
+ assert fsImage.getStorage().getNumStorageDirs() > 0;
+ return ! fsImage.getStorage().getStorageDir(0).getVersionFile().exists();
}
if(namesystem == null || namesystem.dir == null || getFSImage() == null)
return true;
@@ -291,14 +291,14 @@ public class BackupNode extends NameNode
* @throws IOException
*/
private void registerWith(NamespaceInfo nsInfo) throws IOException {
- BackupStorage bnImage = (BackupStorage)getFSImage();
+ BackupImage bnImage = (BackupImage)getFSImage();
// verify namespaceID
- if(bnImage.getNamespaceID() == 0) // new backup storage
- bnImage.setStorageInfo(nsInfo);
- else if(bnImage.getNamespaceID() != nsInfo.getNamespaceID())
+ if(bnImage.getStorage().getNamespaceID() == 0) // new backup storage
+ bnImage.getStorage().setStorageInfo(nsInfo);
+ else if(bnImage.getStorage().getNamespaceID() != nsInfo.getNamespaceID())
throw new IOException("Incompatible namespaceIDs"
+ ": active node namespaceID = " + nsInfo.getNamespaceID()
- + "; backup node namespaceID = " + bnImage.getNamespaceID());
+ + "; backup node namespaceID = " + bnImage.getStorage().getNamespaceID());
setRegistration();
NamenodeRegistration nnReg = null;
@@ -333,7 +333,7 @@ public class BackupNode extends NameNode
* @throws IOException
*/
void resetNamespace() throws IOException {
- ((BackupStorage)getFSImage()).reset();
+ ((BackupImage)getFSImage()).reset();
}
/**
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java Wed Feb 2 02:05:18 2011
@@ -41,10 +41,10 @@ public class CheckpointSignature extends
public CheckpointSignature() {}
CheckpointSignature(FSImage fsImage) {
- super(fsImage);
+ super(fsImage.getStorage());
editsTime = fsImage.getEditLog().getFsEditTime();
- checkpointTime = fsImage.getCheckpointTime();
- imageDigest = fsImage.imageDigest;
+ checkpointTime = fsImage.getStorage().getCheckpointTime();
+ imageDigest = fsImage.getStorage().getImageDigest();
}
CheckpointSignature(String str) {
@@ -76,18 +76,21 @@ public class CheckpointSignature extends
}
void validateStorageInfo(FSImage si) throws IOException {
- if(layoutVersion != si.layoutVersion
- || namespaceID != si.namespaceID || cTime != si.cTime
- || checkpointTime != si.checkpointTime ||
- !imageDigest.equals(si.imageDigest)) {
+ if(layoutVersion != si.getStorage().layoutVersion
+ || namespaceID != si.getStorage().namespaceID
+ || cTime != si.getStorage().cTime
+ || checkpointTime != si.getStorage().getCheckpointTime() ||
+ !imageDigest.equals(si.getStorage().getImageDigest())) {
// checkpointTime can change when the image is saved - do not compare
throw new IOException("Inconsistent checkpoint fields.\n"
+ "LV = " + layoutVersion + " namespaceID = " + namespaceID
+ " cTime = " + cTime + "; checkpointTime = " + checkpointTime
+ " ; imageDigest = " + imageDigest
+ ".\nExpecting respectively: "
- + si.layoutVersion + "; " + si.namespaceID + "; " + si.cTime
- + "; " + si.checkpointTime + "; " + si.imageDigest);
+ + si.getStorage().layoutVersion + "; "
+ + si.getStorage().namespaceID + "; " + si.getStorage().cTime
+ + "; " + si.getStorage().getCheckpointTime() + "; "
+ + si.getStorage().getImageDigest());
}
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Wed Feb 2 02:05:18 2011
@@ -29,8 +29,8 @@ import org.apache.hadoop.hdfs.protocol.F
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -63,8 +63,8 @@ class Checkpointer extends Daemon {
private String infoBindAddress;
- private BackupStorage getFSImage() {
- return (BackupStorage)backupNode.getFSImage();
+ private BackupImage getFSImage() {
+ return (BackupImage)backupNode.getFSImage();
}
private NamenodeProtocol getNamenode(){
@@ -178,8 +178,8 @@ class Checkpointer extends Daemon {
private void downloadCheckpoint(CheckpointSignature sig) throws IOException {
// Retrieve image file
String fileid = "getimage=1";
- Collection<File> list = getFSImage().getFiles(NameNodeFile.IMAGE,
- NameNodeDirType.IMAGE);
+ Collection<File> list = getFSImage()
+ .getStorage().getFiles(NameNodeFile.IMAGE, NameNodeDirType.IMAGE);
File[] files = list.toArray(new File[list.size()]);
assert files.length > 0 : "No checkpoint targets.";
String nnHttpAddr = backupNode.nnHttpAddress;
@@ -189,7 +189,8 @@ class Checkpointer extends Daemon {
// Retrieve edits file
fileid = "getedit=1";
- list = getFSImage().getFiles(NameNodeFile.EDITS, NameNodeDirType.EDITS);
+ list = getFSImage()
+ .getStorage().getFiles(NameNodeFile.EDITS, NameNodeDirType.EDITS);
files = list.toArray(new File[list.size()]);
assert files.length > 0 : "No checkpoint targets.";
TransferFsImage.getFileClient(nnHttpAddr, fileid, files, false);
@@ -207,7 +208,7 @@ class Checkpointer extends Daemon {
String fileid = "putimage=1&port=" + httpPort +
"&machine=" + infoBindAddress +
"&token=" + sig.toString() +
- "&newChecksum=" + getFSImage().imageDigest.toString();
+ "&newChecksum=" + getFSImage().getStorage().getImageDigest().toString();
LOG.info("Posted URL " + backupNode.nnHttpAddress + fileid);
TransferFsImage.getFileClient(backupNode.nnHttpAddress,
fileid, (File[])null, false);
@@ -246,7 +247,7 @@ class Checkpointer extends Daemon {
downloadCheckpoint(sig);
}
- BackupStorage bnImage = getFSImage();
+ BackupImage bnImage = getFSImage();
bnImage.loadCheckpoint(sig);
sig.validateStorageInfo(bnImage);
bnImage.saveCheckpoint();
@@ -262,6 +263,6 @@ class Checkpointer extends Daemon {
getFSImage().getEditLog().close();
LOG.info("Checkpoint completed in "
+ (now() - startTime)/1000 + " seconds."
- + " New Image Size: " + bnImage.getFsImageName().length());
+ + " New Image Size: " + bnImage.getStorage().getFsImageName().length());
}
}
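
The download/upload paths above drive the NameNode's image servlet with plain query strings. A sketch of the three requests, using only calls visible in this hunk; nnHttpAddr, files, httpPort, infoBindAddress, sig and digest stand in for the surrounding fields:

// Pull the image and edits from the active NameNode:
TransferFsImage.getFileClient(nnHttpAddr, "getimage=1", files, false);
TransferFsImage.getFileClient(nnHttpAddr, "getedit=1", files, false);

// Then ask the NameNode to fetch the freshly merged image back:
String fileid = "putimage=1&port=" + httpPort
    + "&machine=" + infoBindAddress
    + "&token=" + sig.toString()
    + "&newChecksum=" + digest.toString();
TransferFsImage.getFileClient(nnHttpAddr, fileid, (File[]) null, false);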
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Feb 2 02:05:18 2011
@@ -141,8 +141,8 @@ class FSDirectory implements Closeable {
throws IOException {
// format before starting up if requested
if (startOpt == StartupOption.FORMAT) {
- fsImage.setStorageDirectories(dataDirs, editsDirs);
- fsImage.format();
+ fsImage.getStorage().setStorageDirectories(dataDirs, editsDirs);
+ fsImage.getStorage().format();
startOpt = StartupOption.REGULAR;
}
try {
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1066305&r1=1066304&r2=1066305&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Feb 2 02:05:18 2011
@@ -36,11 +36,11 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NNStorageListener;
import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BytesWritable;
@@ -56,7 +56,7 @@ import static org.apache.hadoop.hdfs.ser
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class FSEditLog {
+public class FSEditLog implements NNStorageListener {
static final String NO_JOURNAL_STREAMS_WARNING = "!!! WARNING !!!" +
" File system changes are not persistent. No journal streams.";
@@ -66,7 +66,6 @@ public class FSEditLog {
private volatile int sizeOutputFlushBuffer = 512*1024;
private ArrayList<EditLogOutputStream> editStreams = null;
- private FSImage fsimage = null;
// a monotonically increasing counter that represents transactionIds.
private long txid = 0;
@@ -89,6 +88,8 @@ public class FSEditLog {
private long totalTimeTransactions; // total time for all transactions
private NameNodeMetrics metrics;
+ private NNStorage storage;
+
private static class TransactionId {
public long txid;
@@ -104,23 +105,24 @@ public class FSEditLog {
}
};
- FSEditLog(FSImage image) {
- fsimage = image;
+ FSEditLog(NNStorage storage) {
isSyncRunning = false;
+ this.storage = storage;
+ this.storage.registerListener(this);
metrics = NameNode.getNameNodeMetrics();
lastPrintTime = now();
}
private File getEditFile(StorageDirectory sd) {
- return fsimage.getEditFile(sd);
+ return storage.getEditFile(sd);
}
private File getEditNewFile(StorageDirectory sd) {
- return fsimage.getEditNewFile(sd);
+ return storage.getEditNewFile(sd);
}
private int getNumEditsDirs() {
- return fsimage.getNumStorageDirs(NameNodeDirType.EDITS);
+ return storage.getNumStorageDirs(NameNodeDirType.EDITS);
}
synchronized int getNumEditStreams() {
@@ -151,8 +153,8 @@ public class FSEditLog {
editStreams = new ArrayList<EditLogOutputStream>();
ArrayList<StorageDirectory> al = null;
- for (Iterator<StorageDirectory> it =
- fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+ for (Iterator<StorageDirectory> it
+ = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
StorageDirectory sd = it.next();
File eFile = getEditFile(sd);
try {
@@ -166,7 +168,7 @@ public class FSEditLog {
}
}
- if(al != null) fsimage.processIOError(al, false);
+ if(al != null) storage.reportErrorsOnDirectories(al);
}
@@ -210,7 +212,7 @@ public class FSEditLog {
errorStreams.add(eStream);
}
}
- processIOError(errorStreams, true);
+ disableAndReportErrorOnStreams(errorStreams);
editStreams.clear();
}
@@ -228,64 +230,26 @@ public class FSEditLog {
/**
* The specified streams have IO errors. Close and remove them.
- * If propagate is true - close related StorageDirectories.
- * (is called with propagate value true from everywhere
- * except fsimage.processIOError)
- */
- synchronized void processIOError(
- List<EditLogOutputStream> errorStreams,
- boolean propagate) {
-
+ */
+ synchronized
+ void disableAndReportErrorOnStreams(List<EditLogOutputStream> errorStreams) {
if (errorStreams == null || errorStreams.size() == 0) {
return; // nothing to do
}
-
- String lsd = fsimage.listStorageDirectories();
- LOG.info("current list of storage dirs:" + lsd);
-
- ArrayList<StorageDirectory> al = null;
- for (EditLogOutputStream eStream : errorStreams) {
- LOG.error("Unable to log edits to " + eStream.getName()
- + "; removing it");
-
- StorageDirectory storageDir;
- if(propagate && eStream.getType() == JournalType.FILE && //find SD
- (storageDir = getStorage(eStream)) != null) {
- LOG.info("about to remove corresponding storage:"
- + storageDir.getRoot().getAbsolutePath());
- // remove corresponding storage dir
- if(al == null) al = new ArrayList<StorageDirectory>(1);
- al.add(storageDir);
+ ArrayList<StorageDirectory> errorDirs = new ArrayList<StorageDirectory>();
+ for (EditLogOutputStream e : errorStreams) {
+ if (e.getType() == JournalType.FILE) {
+ errorDirs.add(getStorageDirectoryForStream(e));
+ } else {
+ disableStream(e);
}
- Iterator<EditLogOutputStream> ies = editStreams.iterator();
- while (ies.hasNext()) {
- EditLogOutputStream es = ies.next();
- if (es == eStream) {
- try { eStream.close(); } catch (IOException e) {
- // nothing to do.
- LOG.warn("Failed to close eStream " + eStream.getName()
- + " before removing it (might be ok)");
- }
- ies.remove();
- break;
- }
- }
- }
-
- if (editStreams == null || editStreams.size() <= 0) {
- String msg = "Fatal Error: All storage directories are inaccessible.";
- LOG.fatal(msg, new IOException(msg));
- Runtime.getRuntime().exit(-1);
}
- // removed failed SDs
- if(propagate && al != null) fsimage.processIOError(al, false);
-
- //for the rest of the streams
- if(propagate) incrementCheckpointTime();
-
- lsd = fsimage.listStorageDirectories();
- LOG.info("at the end current list of storage dirs:" + lsd);
+ try {
+ storage.reportErrorsOnDirectories(errorDirs);
+ } catch (IOException ioe) {
+ LOG.error("Problem erroring streams " + ioe);
+ }
}
@@ -298,7 +262,7 @@ public class FSEditLog {
String parentStorageDir = ((EditLogFileOutputStream)es).getFile()
.getParentFile().getParentFile().getAbsolutePath();
- Iterator<StorageDirectory> it = fsimage.dirIterator();
+ Iterator<StorageDirectory> it = storage.dirIterator();
while (it.hasNext()) {
StorageDirectory sd = it.next();
LOG.info("comparing: " + parentStorageDir + " and " + sd.getRoot().getAbsolutePath());
@@ -355,7 +319,7 @@ public class FSEditLog {
errorStreams.add(eStream);
}
}
- processIOError(errorStreams, true);
+ disableAndReportErrorOnStreams(errorStreams);
recordTransaction(start);
// check if it is time to schedule an automatic sync
@@ -547,7 +511,7 @@ public class FSEditLog {
}
}
long elapsed = now() - start;
- processIOError(errorStreams, true);
+ disableAndReportErrorOnStreams(errorStreams);
if (metrics != null) // Metrics non-null only when used inside name node
metrics.syncs.inc(elapsed);
@@ -814,7 +778,7 @@ public class FSEditLog {
al.add(es);
}
}
- if(al!=null) processIOError(al, true);
+ if(al!=null) disableAndReportErrorOnStreams(al);
return size;
}
@@ -823,7 +787,7 @@ public class FSEditLog {
*/
synchronized void rollEditLog() throws IOException {
waitForSyncToFinish();
- Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);
+ Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.EDITS);
if(!it.hasNext())
return;
//
@@ -841,7 +805,7 @@ public class FSEditLog {
return; // nothing to do, edits.new exists!
// check if any of failed storage is now available and put it back
- fsimage.attemptRestoreRemovedStorage(false);
+ storage.attemptRestoreRemovedStorage(false);
divertFileStreams(
Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName());
@@ -863,7 +827,7 @@ public class FSEditLog {
EditStreamIterator itE =
(EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
Iterator<StorageDirectory> itD =
- fsimage.dirIterator(NameNodeDirType.EDITS);
+ storage.dirIterator(NameNodeDirType.EDITS);
while(itE.hasNext() && itD.hasNext()) {
EditLogOutputStream eStream = itE.next();
StorageDirectory sd = itD.next();
@@ -885,7 +849,7 @@ public class FSEditLog {
errorStreams.add(eStream);
}
}
- processIOError(errorStreams, true);
+ disableAndReportErrorOnStreams(errorStreams);
}
/**
@@ -929,7 +893,7 @@ public class FSEditLog {
EditStreamIterator itE =
(EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
Iterator<StorageDirectory> itD =
- fsimage.dirIterator(NameNodeDirType.EDITS);
+ storage.dirIterator(NameNodeDirType.EDITS);
while(itE.hasNext() && itD.hasNext()) {
EditLogOutputStream eStream = itE.next();
StorageDirectory sd = itD.next();
@@ -964,7 +928,7 @@ public class FSEditLog {
errorStreams.add(eStream);
}
}
- processIOError(errorStreams, true);
+ disableAndReportErrorOnStreams(errorStreams);
}
/**
@@ -973,7 +937,7 @@ public class FSEditLog {
synchronized File getFsEditName() {
StorageDirectory sd = null;
for (Iterator<StorageDirectory> it =
- fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+ storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
sd = it.next();
if(sd.getRoot().canRead())
return getEditFile(sd);
@@ -985,7 +949,7 @@ public class FSEditLog {
* Returns the timestamp of the edit log
*/
synchronized long getFsEditTime() {
- Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);
+ Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.EDITS);
if(it.hasNext())
return getEditFile(it.next()).lastModified();
return 0;
@@ -1052,7 +1016,7 @@ public class FSEditLog {
errorStreams.add(eStream);
}
}
- processIOError(errorStreams, true);
+ disableAndReportErrorOnStreams(errorStreams);
recordTransaction(start);
}
@@ -1126,8 +1090,8 @@ public class FSEditLog {
}
void incrementCheckpointTime() {
- fsimage.incrementCheckpointTime();
- Writable[] args = {new LongWritable(fsimage.getCheckpointTime())};
+ storage.incrementCheckpointTime();
+ Writable[] args = {new LongWritable(storage.getCheckpointTime())};
logEdit(OP_CHECKPOINT_TIME, args);
}
@@ -1148,7 +1112,7 @@ public class FSEditLog {
}
assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
"Not a backup node corresponds to a backup stream";
- processIOError(errorStreams, true);
+ disableAndReportErrorOnStreams(errorStreams);
}
synchronized boolean checkBackupRegistration(
@@ -1175,7 +1139,7 @@ public class FSEditLog {
}
assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
"Not a backup node corresponds to a backup stream";
- processIOError(errorStreams, true);
+ disableAndReportErrorOnStreams(errorStreams);
return regAllowed;
}
@@ -1186,4 +1150,80 @@ public class FSEditLog {
}
return new BytesWritable(bytes);
}
+
+ /**
+ * Get the StorageDirectory for a stream
+ * @param es Stream whose StorageDirectory we wish to know
+ * @return the matching StorageDirectory
+ */
+ StorageDirectory getStorageDirectoryForStream(EditLogOutputStream es) {
+ String parentStorageDir = ((EditLogFileOutputStream)es).getFile().getParentFile().getParentFile().getAbsolutePath();
+
+ for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+ StorageDirectory sd = it.next();
+ FSNamesystem.LOG.info("comparing: " + parentStorageDir
+ + " and " + sd.getRoot().getAbsolutePath());
+ if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
+ return sd;
+ }
+ return null;
+ }
+
+ private synchronized void disableStream(EditLogOutputStream stream) {
+ try { stream.close(); } catch (IOException e) {
+ // nothing to do.
+ LOG.warn("Failed to close eStream " + stream.getName()
+ + " before removing it (might be ok)");
+ }
+ editStreams.remove(stream);
+
+ if (editStreams.size() <= 0) {
+ String msg = "Fatal Error: All storage directories are inaccessible.";
+ LOG.fatal(msg, new IOException(msg));
+ Runtime.getRuntime().exit(-1);
+ }
+ }
+
+ /**
+ * Error handling on a storage directory: disable every edit stream
+ * that writes into the failed directory.
+ */
+ // NNStorageListener Interface
+ @Override // NNStorageListener
+ public synchronized void errorOccurred(StorageDirectory sd)
+ throws IOException {
+ ArrayList<EditLogOutputStream> errorStreams
+ = new ArrayList<EditLogOutputStream>();
+
+ for (EditLogOutputStream eStream : editStreams) {
+ LOG.error("Unable to log edits to " + eStream.getName()
+ + "; removing it");
+
+ StorageDirectory streamStorageDir = getStorageDirectoryForStream(eStream);
+ if (sd == streamStorageDir) {
+ errorStreams.add(eStream);
+ }
+ }
+
+ for (EditLogOutputStream eStream : errorStreams) {
+ disableStream(eStream);
+ }
+ }
+
+ @Override // NNStorageListener
+ public synchronized void formatOccurred(StorageDirectory sd)
+ throws IOException {
+ if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+ createEditLogFile(NNStorage.getStorageFile(sd, NameNodeFile.EDITS));
+ }
+ }
+
+ @Override // NNStorageListener
+ public synchronized void directoryAvailable(StorageDirectory sd)
+ throws IOException {
+ if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+ File eFile = getEditFile(sd);
+ addNewEditLogStream(eFile);
+ }
+ }
}
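
FSEditLog now reacts to storage-directory events through the NNStorageListener callbacks above, having registered itself via storage.registerListener(this) in its constructor. A minimal sketch of another listener, assuming only the three callback signatures shown in this hunk; the class name and messages are illustrative:

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NNStorageListener;

// Hypothetical listener that only logs directory state changes.
class LoggingStorageListener implements NNStorageListener {
  private static final Log LOG =
      LogFactory.getLog(LoggingStorageListener.class);

  @Override
  public void errorOccurred(StorageDirectory sd) throws IOException {
    LOG.warn("Storage directory failed: " + sd.getRoot());
  }

  @Override
  public void formatOccurred(StorageDirectory sd) throws IOException {
    LOG.info("Storage directory formatted: " + sd.getRoot());
  }

  @Override
  public void directoryAvailable(StorageDirectory sd) throws IOException {
    LOG.info("Storage directory available again: " + sd.getRoot());
  }
}
// Registered the same way FSEditLog registers itself:
//   storage.registerListener(new LoggingStorageListener());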