Subject: svn commit: r1146845 [1/2] - in /hadoop/common/branches/HDFS-1073/hdfs: ./ src/java/org/apache/hadoop/hdfs/server/namenode/ src/java/org/apache/hadoop/hdfs/server/protocol/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Date: Thu, 14 Jul 2011 18:53:13 -0000
To: hdfs-commits@hadoop.apache.org
Reply-To: hdfs-dev@hadoop.apache.org
From: todd@apache.org
Message-Id: <20110714185314.8B5EB2388894@eris.apache.org>

Author: todd
Date: Thu Jul 14 18:53:11 2011
New Revision: 1146845

URL: http://svn.apache.org/viewvc?rev=1146845&view=rev
Log:
HDFS-1979. Fix backupnode for new edits/image layout. Contributed by Todd Lipcon.
Added:
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/BackupNodeProtocol.java
Modified:
    hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java

Modified: hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt Thu Jul 14 18:53:11 2011
@@ -67,3 +67,4 @@ HDFS-2102. Zero-pad edits filename to ma
 HDFS-2010. Fix NameNode to exit if all edit streams become inaccessible. (atm via todd)
 HDFS-2123. Checkpoint interval should be based on txn count, not size. (todd)
+HDFS-1979. Fix backupnode for new edits/image layout.
(todd) Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1146845&r1=1146844&r2=1146845&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original) +++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Thu Jul 14 18:53:11 2011 @@ -19,31 +19,21 @@ package org.apache.hadoop.hdfs.server.na import java.io.BufferedInputStream; import java.io.DataInputStream; -import java.io.File; import java.io.IOException; import java.util.Iterator; -import java.util.List; -import java.util.zip.CheckedInputStream; import java.util.zip.Checksum; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.LayoutVersion; -import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.common.HdfsConstants; +import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageState; -import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException; -import static org.apache.hadoop.hdfs.server.common.Util.now; +import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogLoadPlan; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogHeader; -import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; -import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; -import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; -import org.apache.hadoop.hdfs.util.MD5FileUtils; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.MD5Hash; -import org.apache.hadoop.conf.Configuration; +import com.google.common.base.Preconditions; /** * Extension of FSImage for the backup node. @@ -52,30 +42,39 @@ import org.apache.hadoop.conf.Configurat */ @InterfaceAudience.Private public class BackupImage extends FSImage { - // Names of the journal spool directory and the spool file - private static final String STORAGE_JSPOOL_DIR = "jspool"; - private static final String STORAGE_JSPOOL_FILE = - NNStorage.NameNodeFile.EDITS_NEW.getName(); - /** Backup input stream for loading edits into memory */ - private EditLogBackupInputStream backupInputStream; - - /** Is journal spooling in progress */ - volatile JSpoolState jsState; - private long lastAppliedTxId = 0; - - static enum JSpoolState { - OFF, - INPROGRESS, - WAIT; + private EditLogBackupInputStream backupInputStream = + new EditLogBackupInputStream("Data from remote NameNode"); + + /** + * Current state of the BackupNode. The BackupNode's state + * transitions are as follows: + * + * Initial: DROP_UNTIL_NEXT_ROLL + * - Transitions to JOURNAL_ONLY the next time the log rolls + * - Transitions to IN_SYNC in convergeJournalSpool + * - Transitions back to JOURNAL_ONLY if the log rolls while + * stopApplyingOnNextRoll is true. 
+ */ + BNState bnState; + static enum BNState { + // Edits from the NN should be dropped. On the next log roll, + // transition to JOURNAL_ONLY state + DROP_UNTIL_NEXT_ROLL, + // Edits from the NN should be written to the local edits log + // but not applied to the namespace. + JOURNAL_ONLY, + // Edits should be written to the local edits log and applied + // to the local namespace. + IN_SYNC; } - /** - * Place-holder for a txid that still needs to be addressed - * in HDFS-1073 branch before merging into trunk. + * Flag to indicate that the next time the NN rolls, the BN + * should transition from to JOURNAL_ONLY state. + * {@see #freezeNamespaceAtNextRoll()} */ - private static final long TODO_TXID = 0xDEADBEEF; + private boolean stopApplyingEditsOnNextRoll = false; /** * Construct a backup image. @@ -85,7 +84,8 @@ public class BackupImage extends FSImage BackupImage(Configuration conf) throws IOException { super(conf); storage.setDisablePreUpgradableLayoutCheck(true); - jsState = JSpoolState.OFF; + bnState = BNState.DROP_UNTIL_NEXT_ROLL; + editLog.initJournals(); } /** @@ -130,279 +130,259 @@ public class BackupImage extends FSImage } /** - * Reset storage directories. - *

- * Unlock the storage. - * Rename current to lastcheckpoint.tmp - * and recreate empty current. - * @throws IOException + * Save meta-data into fsimage files. + * and create empty edits. */ - synchronized void reset() throws IOException { - /* TODO: BackupNode - // reset NameSpace tree - FSDirectory fsDir = getFSNamesystem().dir; - fsDir.reset(); - - // unlock, close and rename storage directories - storage.unlockAll(); - - // recover from unsuccessful checkpoint if necessary - recoverCreateRead(); - // rename and recreate - for (Iterator it = storage.dirIterator(); it.hasNext();) { - StorageDirectory sd = it.next(); - // rename current to lastcheckpoint.tmp - storage.moveCurrent(sd); - } - */ + void saveCheckpoint() throws IOException { + saveNamespace(); } /** - * Load checkpoint from local files only if the memory state is empty.
- * Set new checkpoint time received from the name-node.
- * Move lastcheckpoint.tmp to previous.checkpoint. + * Receive a batch of edits from the NameNode. + * + * Depending on bnState, different actions are taken. See + * {@link BackupImage.BNState} + * + * @param firstTxId first txid in batch + * @param numTxns number of transactions + * @param data serialized journal records. * @throws IOException + * @see #convergeJournalSpool() */ - void loadCheckpoint(CheckpointSignature sig) throws IOException { - // load current image and journal if it is not in memory already - if(!editLog.isOpen()) - editLog.startLogSegment(TODO_TXID); - - // set storage fields - storage.setStorageInfo(sig); - - FSDirectory fsDir = getFSNamesystem().dir; - if(fsDir.isEmpty()) { - Iterator itImage - = storage.dirIterator(NameNodeDirType.IMAGE); - Iterator itEdits - = storage.dirIterator(NameNodeDirType.EDITS); - if(!itImage.hasNext() || ! itEdits.hasNext()) - throw new IOException("Could not locate checkpoint directories"); - StorageDirectory sdName = itImage.next(); - StorageDirectory sdEdits = itEdits.next(); - - getFSDirectoryRootLock().writeLock(); - try { // load image under rootDir lock - File imageFile = null; // TODO - MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile); - loadFSImage(imageFile, expectedMD5); - } finally { - getFSDirectoryRootLock().writeUnlock(); - } - List editsFiles = - FSImageOldStorageInspector.getEditsInStorageDir(sdEdits); - loadEdits(editsFiles); - lastAppliedTxId = getEditLog().getLastWrittenTxId(); + synchronized void journal(long firstTxId, int numTxns, byte[] data) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Got journal, " + + "state = " + bnState + + "; firstTxId = " + firstTxId + + "; numTxns = " + numTxns); } + + switch(bnState) { + case DROP_UNTIL_NEXT_ROLL: + return; + + case IN_SYNC: + // update NameSpace in memory + applyEdits(firstTxId, numTxns, data); + break; + + case JOURNAL_ONLY: + break; + + default: + throw new AssertionError("Unhandled state: " + bnState); + } + + // write to BN's local edit log. + logEditsLocally(firstTxId, numTxns, data); } /** - * Save meta-data into fsimage files. - * and create empty edits. + * Write the batch of edits to the local copy of the edit logs. */ - void saveCheckpoint() throws IOException { - saveNamespace(); - } - - private FSDirectory getFSDirectoryRootLock() { - return getFSNamesystem().dir; - } - - static File getJSpoolDir(StorageDirectory sd) { - return new File(sd.getRoot(), STORAGE_JSPOOL_DIR); - } - - static File getJSpoolFile(StorageDirectory sd) { - return new File(getJSpoolDir(sd), STORAGE_JSPOOL_FILE); + private void logEditsLocally(long firstTxId, int numTxns, byte[] data) { + long expectedTxId = editLog.getLastWrittenTxId() + 1; + Preconditions.checkState(firstTxId == expectedTxId, + "received txid batch starting at %s but expected txn %s", + firstTxId, expectedTxId); + editLog.setNextTxId(firstTxId + numTxns - 1); + editLog.logEdit(data.length, data); + editLog.logSync(); } /** - * Journal writer journals new meta-data state. - *
- * 1. If Journal Spool state is OFF then journal records (edits)
- *    are applied directly to meta-data state in memory and are written
- *    to the edits file(s).
- * 2. If Journal Spool state is INPROGRESS then records are only
- *    written to edits.new file, which is called Spooling.
- * 3. Journal Spool state WAIT blocks journaling until the
- *    Journal Spool reader finalizes merging of the spooled data and
- *    switches to applying journal to memory.
- * @param length length of data. - * @param data serialized journal records. - * @throws IOException - * @see #convergeJournalSpool() + * Apply the batch of edits to the local namespace. */ - synchronized void journal(int length, byte[] data) throws IOException { + private synchronized void applyEdits(long firstTxId, int numTxns, byte[] data) + throws IOException { + Preconditions.checkArgument(firstTxId == lastAppliedTxId + 1, + "Received txn batch starting at %s but expected %s", + firstTxId, lastAppliedTxId + 1); assert backupInputStream.length() == 0 : "backup input stream is not empty"; try { - switch(jsState) { - case WAIT: - case OFF: - // wait until spooling is off - waitSpoolEnd(); - // update NameSpace in memory - backupInputStream.setBytes(data); - FSEditLogLoader logLoader = new FSEditLogLoader(namesystem); - int logVersion = storage.getLayoutVersion(); - BufferedInputStream bin = new BufferedInputStream(backupInputStream); - DataInputStream in = new DataInputStream(bin); - Checksum checksum = null; - if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) { - checksum = FSEditLog.getChecksum(); - in = new DataInputStream(new CheckedInputStream(bin, checksum)); - } - logLoader.loadEditRecords(logVersion, in, checksum, true, - lastAppliedTxId + 1); - getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient! - break; - case INPROGRESS: - break; + if (LOG.isTraceEnabled()) { + LOG.debug("data:" + StringUtils.byteToHexString(data)); + } + backupInputStream.setBytes(data); + FSEditLogLoader logLoader = new FSEditLogLoader(namesystem); + int logVersion = storage.getLayoutVersion(); + BufferedInputStream bin = new BufferedInputStream(backupInputStream); + DataInputStream in = new DataInputStream(bin); + Checksum checksum = FSEditLog.getChecksum(); + int numLoaded = logLoader.loadEditRecords(logVersion, in, checksum, true, + lastAppliedTxId + 1); + if (numLoaded != numTxns) { + throw new IOException("Batch of txns starting at txnid " + + firstTxId + " was supposed to contain " + numTxns + + " transactions but only was able to apply " + numLoaded); } - // write to files - editLog.logEdit(length, data); - editLog.logSync(); + lastAppliedTxId += numTxns; + + getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient! } finally { backupInputStream.clear(); } } - private synchronized void waitSpoolEnd() { - while(jsState == JSpoolState.WAIT) { + /** + * Transition the BackupNode from JOURNAL_ONLY state to IN_SYNC state. + * This is done by repeated invocations of tryConvergeJournalSpool until + * we are caught up to the latest in-progress edits file. + */ + void convergeJournalSpool() throws IOException { + Preconditions.checkState(bnState == BNState.JOURNAL_ONLY, + "bad state: %s", bnState); + + while (!tryConvergeJournalSpool()) { + ; + } + assert bnState == BNState.IN_SYNC; + } + + private boolean tryConvergeJournalSpool() throws IOException { + Preconditions.checkState(bnState == BNState.JOURNAL_ONLY, + "bad state: %s", bnState); + + // This section is unsynchronized so we can continue to apply + // ahead of where we're reading, concurrently. 
Since the state + // is JOURNAL_ONLY at this point, we know that lastAppliedTxId + // doesn't change, and curSegmentTxId only increases + + while (lastAppliedTxId < editLog.getCurSegmentTxId() - 1) { + long target = editLog.getCurSegmentTxId(); + LOG.info("Loading edits into backupnode to try to catch up from txid " + + lastAppliedTxId + " to " + target); + FSImageTransactionalStorageInspector inspector = + new FSImageTransactionalStorageInspector(); + + storage.inspectStorageDirs(inspector); + LogLoadPlan logLoadPlan = inspector.createLogLoadPlan(lastAppliedTxId, + target - 1); + + logLoadPlan.doRecovery(); + loadEdits(logLoadPlan.getEditsFiles()); + } + + // now, need to load the in-progress file + synchronized (this) { + if (lastAppliedTxId != editLog.getCurSegmentTxId() - 1) { + LOG.debug("Logs rolled while catching up to current segment"); + return false; // drop lock and try again to load local logs + } + + EditLogInputStream stream = getEditLog().getInProgressFileInputStream(); try { - wait(); - } catch (InterruptedException e) {} + long remainingTxns = getEditLog().getLastWrittenTxId() - lastAppliedTxId; + + LOG.info("Going to finish converging with remaining " + remainingTxns + + " txns from in-progress stream " + stream); + + FSEditLogLoader loader = new FSEditLogLoader(namesystem); + int numLoaded = loader.loadFSEdits(stream, lastAppliedTxId + 1); + lastAppliedTxId += numLoaded; + assert numLoaded == remainingTxns : + "expected to load " + remainingTxns + " but loaded " + + numLoaded + " from " + stream; + } finally { + IOUtils.closeStream(stream); + } + + LOG.info("Successfully synced BackupNode with NameNode at txnid " + + lastAppliedTxId); + setState(BNState.IN_SYNC); } - // now spooling should be off, verifying just in case - assert jsState == JSpoolState.OFF : "Unexpected JSpool state: " + jsState; + return true; } /** - * Start journal spool. - * Switch to writing into edits.new instead of edits. - * - * edits.new for spooling is in separate directory "spool" rather than in - * "current" because the two directories should be independent. - * While spooling a checkpoint can happen and current will first - * move to lastcheckpoint.tmp and then to previous.checkpoint - * spool/edits.new will remain in place during that. - */ - synchronized void startJournalSpool(NamenodeRegistration nnReg) - throws IOException { - switch(jsState) { - case OFF: - break; - case INPROGRESS: - return; - case WAIT: - waitSpoolEnd(); + * Transition edit log to a new state, logging as necessary. + */ + private synchronized void setState(BNState newState) { + if (LOG.isDebugEnabled()) { + LOG.debug("State transition " + bnState + " -> " + newState, + new Exception("trace")); } + bnState = newState; + } - // create journal spool directories - for (Iterator it - = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) { - StorageDirectory sd = it.next(); - File jsDir = getJSpoolDir(sd); - if (!jsDir.exists() && !jsDir.mkdirs()) { - throw new IOException("Mkdirs failed to create " - + jsDir.getCanonicalPath()); + /** + * Receive a notification that the NameNode has begun a new edit log. + * This causes the BN to also start the new edit log in its local + * directories. 
+ */ + synchronized void namenodeStartedLogSegment(long txid) { + LOG.info("NameNode started a new log segment at txid " + txid); + if (editLog.isOpen()) { + if (editLog.getLastWrittenTxId() == txid - 1) { + // We are in sync with the NN, so end and finalize the current segment + editLog.endCurrentLogSegment(false); + } else { + // We appear to have missed some transactions -- the NN probably + // lost contact with us temporarily. So, mark the current segment + // as aborted. + LOG.warn("NN started new log segment at txid " + txid + + ", but BN had only written up to txid " + + editLog.getLastWrittenTxId() + + "in the log segment starting at " + + editLog.getCurSegmentTxId() + ". Aborting this " + + "log segment."); + editLog.abortCurrentLogSegment(); } - // create edit file if missing - /*File eFile = storage.getEditFile(sd); TODO - if(!eFile.exists()) { - editLog.createEditLogFile(eFile); - }*/ - } - - if(!editLog.isOpen()) - editLog.startLogSegment(TODO_TXID); - - // create streams pointing to the journal spool files - // subsequent journal records will go directly to the spool -// TODO editLog.divertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE); - - // set up spooling - if(backupInputStream == null) - backupInputStream = new EditLogBackupInputStream(nnReg.getAddress()); - jsState = JSpoolState.INPROGRESS; + } + editLog.setNextTxId(txid); + editLog.startLogSegment(txid, false); + if (bnState == BNState.DROP_UNTIL_NEXT_ROLL) { + setState(BNState.JOURNAL_ONLY); + } + + if (stopApplyingEditsOnNextRoll) { + if (bnState == BNState.IN_SYNC) { + LOG.info("Stopped applying edits to prepare for checkpoint."); + setState(BNState.JOURNAL_ONLY); + } + stopApplyingEditsOnNextRoll = false; + notifyAll(); + } } /** - * Merge Journal Spool to memory.

- * Journal Spool reader reads journal records from edits.new. - * When it reaches the end of the file it sets {@link JSpoolState} to WAIT. - * This blocks journaling (see {@link #journal(int,byte[])}. - * The reader - *
- *   • reads remaining journal records if any,
- *   • renames edits.new to edits,
- *   • sets {@link JSpoolState} to OFF,
- *   • and notifies the journaling thread.
- * Journaling resumes with applying new journal records to the memory state, - * and writing them into edits file(s). + * Request that the next time the BN receives a log roll, it should + * stop applying the edits log to the local namespace. This is + * typically followed on by a call to {@link #waitUntilNamespaceFrozen()} */ - void convergeJournalSpool() throws IOException { - Iterator itEdits - = storage.dirIterator(NameNodeDirType.EDITS); - if(! itEdits.hasNext()) - throw new IOException("Could not locate checkpoint directories"); - StorageDirectory sdEdits = itEdits.next(); - int numEdits = 0; - File jSpoolFile = getJSpoolFile(sdEdits); - long startTime = now(); - if(jSpoolFile.exists()) { - // load edits.new - EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile); - BufferedInputStream bin = new BufferedInputStream(edits); - DataInputStream in = new DataInputStream(bin); - FSEditLogLoader logLoader = new FSEditLogLoader(namesystem); + synchronized void freezeNamespaceAtNextRoll() { + stopApplyingEditsOnNextRoll = true; + } - LogHeader header = FSEditLogOp.LogHeader.read(in); - int loaded = logLoader.loadEditRecords( - header.logVersion, in, header.checksum, false, - lastAppliedTxId + 1); - - lastAppliedTxId += loaded; - numEdits += loaded; - - // first time reached the end of spool - jsState = JSpoolState.WAIT; - loaded = logLoader.loadEditRecords( - header.logVersion, in, header.checksum, - true, lastAppliedTxId + 1); - numEdits += loaded; - lastAppliedTxId += loaded; - - getFSNamesystem().dir.updateCountForINodeWithQuota(); - edits.close(); - } - - FSImage.LOG.info("Edits file " + jSpoolFile.getCanonicalPath() - + " of size " + jSpoolFile.length() + " edits # " + numEdits - + " loaded in " + (now()-startTime)/1000 + " seconds."); - - // rename spool edits.new to edits making it in sync with the active node - // subsequent journal records will go directly to edits - // TODO editLog.revertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE); - - // write version file - // TODO resetVersion(storage.getImageDigest()); - - // wake up journal writer - synchronized(this) { - jsState = JSpoolState.OFF; - notifyAll(); - } + /** + * After {@link #freezeNamespaceAtNextRoll()} has been called, wait until + * the BN receives notification of the next log roll. + */ + synchronized void waitUntilNamespaceFrozen() throws IOException { + if (bnState != BNState.IN_SYNC) return; - /* - * TODO: bn - // Rename lastcheckpoint.tmp to previous.checkpoint - for (Iterator it = storage.dirIterator(); it.hasNext();) { - StorageDirectory sd = it.next(); - storage.moveLastCheckpoint(sd); + LOG.info("Waiting until the NameNode rolls its edit logs in order " + + "to freeze the BackupNode namespace."); + while (bnState == BNState.IN_SYNC) { + Preconditions.checkState(stopApplyingEditsOnNextRoll, + "If still in sync, we should still have the flag set to " + + "freeze at next roll"); + try { + wait(); + } catch (InterruptedException ie) { + LOG.warn("Interrupted waiting for namespace to freeze", ie); + throw new IOException(ie); + } } - */ + LOG.info("BackupNode namespace frozen."); + } + + /** + * Override close() so that we don't finalize edit logs. 
+ */ + @Override + public synchronized void close() throws IOException { + editLog.abortCurrentLogSegment(); + storage.close(); } } Added: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1146845&view=auto ============================================================================== --- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (added) +++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Thu Jul 14 18:53:11 2011 @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.server.namenode.NNStorageArchivalManager.StorageArchiver; +import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; + +public class BackupJournalManager implements JournalManager { + + private final NamenodeRegistration nnReg; + private final NamenodeRegistration bnReg; + + BackupJournalManager(NamenodeRegistration bnReg, + NamenodeRegistration nnReg) { + this.bnReg = bnReg; + this.nnReg = nnReg; + } + + @Override + public EditLogOutputStream startLogSegment(long txId) throws IOException { + EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg, nnReg); + stm.startLogSegment(txId); + return stm; + } + + @Override + public void finalizeLogSegment(long firstTxId, long lastTxId) + throws IOException { + } + + @Override + public void setOutputBufferCapacity(int size) { + } + + @Override + public void archiveLogsOlderThan(long minTxIdToKeep, StorageArchiver archiver) + throws IOException { + } + + public boolean matchesRegistration(NamenodeRegistration bnReg) { + return bnReg.getAddress().equals(this.bnReg.getAddress()); + } + + @Override + public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId) { + return null; + } +} Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1146845&r1=1146844&r2=1146845&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original) +++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Thu Jul 14 18:53:11 2011 @@ -29,6 +29,7 @@ import 
org.apache.hadoop.hdfs.protocol.D import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.protocol.BackupNodeProtocol; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; @@ -51,7 +52,7 @@ import org.apache.hadoop.net.NetUtils; * */ @InterfaceAudience.Private -public class BackupNode extends NameNode { +public class BackupNode extends NameNode implements BackupNodeProtocol { private static final String BN_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY; private static final String BN_ADDRESS_DEFAULT = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT; private static final String BN_HTTP_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY; @@ -176,6 +177,17 @@ public class BackupNode extends NameNode super.stop(); } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + if (protocol.equals(BackupNodeProtocol.class.getName())) { + return BackupNodeProtocol.versionID; + } else { + return super.getProtocolVersion(protocol, clientVersion); + } + } + ///////////////////////////////////////////////////// // NamenodeProtocol implementation for backup node. ///////////////////////////////////////////////////// @@ -202,30 +214,36 @@ public class BackupNode extends NameNode public void endCheckpoint(NamenodeRegistration registration, CheckpointSignature sig) throws IOException { throw new UnsupportedActionException("endCheckpoint"); - } + } - @Override // NamenodeProtocol + ///////////////////////////////////////////////////// + // BackupNodeProtocol implementation for backup node. + ///////////////////////////////////////////////////// + + @Override public void journal(NamenodeRegistration nnReg, - int jAction, - int length, - byte[] args) throws IOException { + long firstTxId, int numTxns, + byte[] records) throws IOException { verifyRequest(nnReg); if(!nnRpcAddress.equals(nnReg.getAddress())) throw new IOException("Journal request from unexpected name-node: " + nnReg.getAddress() + " expecting " + nnRpcAddress); - BackupImage bnImage = (BackupImage)getFSImage(); - switch(jAction) { - case (int)JA_IS_ALIVE: - return; - case (int)JA_JOURNAL: - bnImage.journal(length, args); - return; - case (int)JA_JSPOOL_START: - bnImage.startJournalSpool(nnReg); - return; - default: - throw new IOException("Unexpected journal action: " + jAction); - } + getBNImage().journal(firstTxId, numTxns, records); + } + + @Override + public void startLogSegment(NamenodeRegistration registration, long txid) + throws IOException { + verifyRequest(registration); + + getBNImage().namenodeStartedLogSegment(txid); + } + + ////////////////////////////////////////////////////// + + + BackupImage getBNImage() { + return (BackupImage)getFSImage(); } boolean shouldCheckpointAtStartup() { @@ -234,9 +252,9 @@ public class BackupNode extends NameNode assert fsImage.getStorage().getNumStorageDirs() > 0; return ! 
fsImage.getStorage().getStorageDir(0).getVersionFile().exists(); } - if(namesystem == null || namesystem.dir == null || getFSImage() == null) - return true; - return false; // TODO fsImage.getEditLog().getNumJournals() == 0; + + // BN always checkpoints on startup in order to get in sync with namespace + return true; } private NamespaceInfo handshake(Configuration conf) throws IOException { @@ -287,14 +305,15 @@ public class BackupNode extends NameNode */ private void registerWith(NamespaceInfo nsInfo) throws IOException { BackupImage bnImage = (BackupImage)getFSImage(); + NNStorage storage = bnImage.getStorage(); // verify namespaceID - if(bnImage.getStorage().getNamespaceID() == 0) // new backup storage - bnImage.getStorage().setStorageInfo(nsInfo); - else if(bnImage.getStorage().getNamespaceID() != nsInfo.getNamespaceID()) - throw new IOException("Incompatible namespaceIDs" - + ": active node namespaceID = " + nsInfo.getNamespaceID() - + "; backup node namespaceID = " + bnImage.getStorage().getNamespaceID()); - + if (storage.getNamespaceID() == 0) { // new backup storage + storage.setStorageInfo(nsInfo); + storage.setBlockPoolID(nsInfo.getBlockPoolID()); + storage.setClusterID(nsInfo.getClusterID()); + } else { + nsInfo.validateStorage(storage); + } setRegistration(); NamenodeRegistration nnReg = null; while(!isStopRequested()) { @@ -323,14 +342,6 @@ public class BackupNode extends NameNode nnRpcAddress = nnReg.getAddress(); } - /** - * Reset node namespace state in memory and in storage directories. - * @throws IOException - */ - void resetNamespace() throws IOException { - ((BackupImage)getFSImage()).reset(); - } - // TODO: move to a common with DataNode util class private static NamespaceInfo handshake(NamenodeProtocol namenode) throws IOException, SocketTimeoutException { Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java?rev=1146845&r1=1146844&r2=1146845&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (original) +++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java Thu Jul 14 18:53:11 2011 @@ -110,7 +110,6 @@ public class CheckpointSignature extends || cTime != si.getStorage().cTime || !clusterID.equals(si.getClusterID()) || !blockpoolID.equals(si.getBlockPoolID())) { - // checkpointTime can change when the image is saved - do not compare throw new IOException("Inconsistent checkpoint fields.\n" + "LV = " + layoutVersion + " namespaceID = " + namespaceID + " cTime = " + cTime Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1146845&r1=1146844&r2=1146845&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original) +++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Thu Jul 14 18:53:11 2011 @@ -17,30 +17,27 @@ */ package 
org.apache.hadoop.hdfs.server.namenode; -import java.io.IOException; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.server.common.Util.now; + import java.io.File; +import java.io.IOException; import java.net.InetSocketAddress; -import java.util.Collection; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - -import org.apache.hadoop.hdfs.protocol.FSConstants; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole; -import static org.apache.hadoop.hdfs.server.common.Util.now; -import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates; -import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; -import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.http.HttpServer; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.util.Daemon; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT; /** * The Checkpointer is responsible for supporting periodic checkpoints @@ -80,6 +77,7 @@ class Checkpointer extends Daemon { try { initialize(conf); } catch(IOException e) { + LOG.warn("Checkpointer got exception", e); shutdown(); throw e; } @@ -134,8 +132,9 @@ class Checkpointer extends Daemon { periodMSec *= 1000; long lastCheckpointTime = 0; - if(!backupNode.shouldCheckpointAtStartup()) + if (!backupNode.shouldCheckpointAtStartup()) { lastCheckpointTime = now(); + } while(shouldRun) { try { long now = now(); @@ -175,44 +174,15 @@ class Checkpointer extends Daemon { } /** - * Download fsimage and edits - * files from the remote name-node. - */ - private void downloadCheckpoint(CheckpointSignature sig) throws IOException { - String nnHttpAddr = backupNode.nnHttpAddress; - - // Retrieve image file - MD5Hash hash = TransferFsImage.downloadImageToStorage( - nnHttpAddr, sig.mostRecentCheckpointTxId, - getFSImage().getStorage(), true); - getFSImage().saveDigestAndRenameCheckpointImage(sig.mostRecentCheckpointTxId, hash); - - // Retrieve edits file - // TODO! - } - - /** - * Copy the new image into remote name-node. 
- */ - private void uploadCheckpoint(CheckpointSignature sig) throws IOException { - // TODO: checkpoint node disabled in 1073 branch -/* // Use the exact http addr as specified in config to deal with ip aliasing - InetSocketAddress httpSocAddr = backupNode.getHttpAddress(); - int httpPort = httpSocAddr.getPort(); - String fileid = "putimage=1&port=" + httpPort + - "&machine=" + infoBindAddress + - "&token=" + sig.toString(); - LOG.info("Posted URL " + backupNode.nnHttpAddress + fileid); - TransferFsImage.getFileClient(backupNode.nnHttpAddress, - fileid, null, false); - */ - } - - /** * Create a new checkpoint */ void doCheckpoint() throws IOException { + BackupImage bnImage = getFSImage(); + NNStorage bnStorage = bnImage.getStorage(); + long startTime = now(); + bnImage.freezeNamespaceAtNextRoll(); + NamenodeCommand cmd = getNamenode().startCheckpoint(backupNode.getRegistration()); CheckpointCommand cpCmd = null; @@ -228,36 +198,76 @@ class Checkpointer extends Daemon { throw new IOException("Unsupported NamenodeCommand: "+cmd.getAction()); } + bnImage.waitUntilNamespaceFrozen(); + CheckpointSignature sig = cpCmd.getSignature(); - assert FSConstants.LAYOUT_VERSION == sig.getLayoutVersion() : - "Signature should have current layout version. Expected: " - + FSConstants.LAYOUT_VERSION + " actual "+ sig.getLayoutVersion(); - assert !backupNode.isRole(NamenodeRole.CHECKPOINT) || - cpCmd.isImageObsolete() : "checkpoint node should always download image."; - if(cpCmd.isImageObsolete()) { - // First reset storage on disk and memory state - backupNode.resetNamespace(); - downloadCheckpoint(sig); - } - BackupImage bnImage = getFSImage(); - bnImage.getStorage().setBlockPoolID(backupNode.getBlockPoolId()); - bnImage.getStorage().setClusterID(backupNode.getClusterId()); - bnImage.loadCheckpoint(sig); + // Make sure we're talking to the same NN! sig.validateStorageInfo(bnImage); - bnImage.saveCheckpoint(); - if(cpCmd.needToReturnImage()) - uploadCheckpoint(sig); + long lastApplied = bnImage.getLastAppliedTxId(); + LOG.debug("Doing checkpoint. Last applied: " + lastApplied); + RemoteEditLogManifest manifest = + getNamenode().getEditLogManifest(bnImage.getLastAppliedTxId()); + + if (!manifest.getLogs().isEmpty()) { + RemoteEditLog firstRemoteLog = manifest.getLogs().get(0); + // we don't have enough logs to roll forward using only logs. Need + // to download and load the image. + if (firstRemoteLog.getStartTxId() > lastApplied + 1) { + LOG.info("Unable to roll forward using only logs. 
Downloading " + + "image with txid " + sig.mostRecentCheckpointTxId); + MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage( + backupNode.nnHttpAddress, sig.mostRecentCheckpointTxId, + bnStorage, true); + bnImage.saveDigestAndRenameCheckpointImage( + sig.mostRecentCheckpointTxId, downloadedHash); + + LOG.info("Loading image with txid " + sig.mostRecentCheckpointTxId); + File file = bnStorage.findImageFile(sig.mostRecentCheckpointTxId); + bnImage.reloadFromImageFile(file); + } + + lastApplied = bnImage.getLastAppliedTxId(); + if (firstRemoteLog.getStartTxId() > lastApplied + 1) { + throw new IOException("No logs to roll forward from " + lastApplied); + } + + // get edits files + for (RemoteEditLog log : manifest.getLogs()) { + TransferFsImage.downloadEditsToStorage( + backupNode.nnHttpAddress, log, bnStorage); + } + + SecondaryNameNode.rollForwardByApplyingLogs(manifest, bnImage); + } + + long txid = bnImage.getLastAppliedTxId(); + bnImage.saveFSImageInAllDirs(txid); + bnStorage.writeAll(); + + if(cpCmd.needToReturnImage()) { + TransferFsImage.uploadImageFromStorage( + backupNode.nnHttpAddress, getImageListenAddress(), + bnStorage, txid); + } getNamenode().endCheckpoint(backupNode.getRegistration(), sig); - bnImage.convergeJournalSpool(); + if (backupNode.getRole() == NamenodeRole.BACKUP) { + bnImage.convergeJournalSpool(); + } backupNode.setRegistration(); // keep registration up to date - if(backupNode.isRole(NamenodeRole.CHECKPOINT)) - getFSImage().getEditLog().close(); + + long imageSize = bnImage.getStorage().getFsImageName(txid).length(); LOG.info("Checkpoint completed in " + (now() - startTime)/1000 + " seconds." - + " New Image Size: " + bnImage.getStorage().getFsImageName(0 /* TODO */).length()); + + " New Image Size: " + imageSize); + } + + private InetSocketAddress getImageListenAddress() { + InetSocketAddress httpSocAddr = backupNode.getHttpAddress(); + int httpPort = httpSocAddr.getPort(); + return new InetSocketAddress(infoBindAddress, httpPort); } } Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1146845&r1=1146844&r2=1146845&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original) +++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Thu Jul 14 18:53:11 2011 @@ -17,14 +17,14 @@ */ package org.apache.hadoop.hdfs.server.namenode; -import java.io.DataOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.common.Storage; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.BackupNodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Writable; @@ -42,7 +42,7 @@ import org.apache.hadoop.net.NetUtils; class EditLogBackupOutputStream extends EditLogOutputStream { static int DEFAULT_BUFFER_SIZE = 256; - private NamenodeProtocol backupNode; // RPC proxy to backup node + private BackupNodeProtocol 
backupNode; // RPC proxy to backup node private NamenodeRegistration bnRegistration; // backup node registration private NamenodeRegistration nnRegistration; // active node registration private ArrayList bufCurrent; // current buffer for writing @@ -60,13 +60,8 @@ class EditLogBackupOutputStream extends this.args = writables; } - void write(DataOutputStream out) throws IOException { - out.write(op); - out.writeLong(txid); - if(args == null) - return; - for(Writable w : args) - w.write(out); + void write(DataOutputBuffer out) throws IOException { + writeChecksummedOp(out, op, txid, args); } } @@ -81,8 +76,8 @@ class EditLogBackupOutputStream extends Storage.LOG.info("EditLogBackupOutputStream connects to: " + bnAddress); try { this.backupNode = - (NamenodeProtocol) RPC.getProxy(NamenodeProtocol.class, - NamenodeProtocol.versionID, bnAddress, new HdfsConfiguration()); + (BackupNodeProtocol) RPC.getProxy(BackupNodeProtocol.class, + BackupNodeProtocol.versionID, bnAddress, new HdfsConfiguration()); } catch(IOException e) { Storage.LOG.error("Error connecting to: " + bnAddress, e); throw e; @@ -91,7 +86,7 @@ class EditLogBackupOutputStream extends this.bufReady = new ArrayList(); this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE); } - + @Override // JournalStream public String getName() { return bnRegistration.getAddress(); @@ -156,23 +151,14 @@ class EditLogBackupOutputStream extends @Override // EditLogOutputStream protected void flushAndSync() throws IOException { assert out.size() == 0 : "Output buffer is not empty"; - int bufReadySize = bufReady.size(); - for(int idx = 0; idx < bufReadySize; idx++) { - JournalRecord jRec = null; - for(; idx < bufReadySize; idx++) { - jRec = bufReady.get(idx); - if(jRec.op >= FSEditLogOpCodes.OP_JSPOOL_START.getOpCode()) - break; // special operation should be sent in a separate call to BN - jRec.write(out); - } - if(out.size() > 0) - send(NamenodeProtocol.JA_JOURNAL); - if(idx == bufReadySize) - break; - // operation like start journal spool or increment checkpoint time - // is a separate call to BN + for (JournalRecord jRec : bufReady) { jRec.write(out); - send(jRec.op); + } + if (out.size() > 0) { + byte[] data = Arrays.copyOf(out.getData(), out.getLength()); + backupNode.journal(nnRegistration, + bufReady.get(0).txid, bufReady.size(), + data); } bufReady.clear(); // erase all data in the buffer out.reset(); // reset buffer to the start position @@ -188,16 +174,6 @@ class EditLogBackupOutputStream extends return 0; } - private void send(int ja) throws IOException { - try { - int length = out.getLength(); - out.write(FSEditLogOpCodes.OP_INVALID.getOpCode()); - backupNode.journal(nnRegistration, ja, length, out.getData()); - } finally { - out.reset(); - } - } - /** * Get backup node registration. */ @@ -205,17 +181,7 @@ class EditLogBackupOutputStream extends return bnRegistration; } - /** - * Verify that the backup node is alive. - */ - boolean isAlive() { - try { - send(NamenodeProtocol.JA_IS_ALIVE); - } catch(IOException ei) { - Storage.LOG.info(bnRegistration.getRole() + " " - + bnRegistration.getAddress() + " is not alive. 
", ei); - return false; - } - return true; + void startLogSegment(long txId) throws IOException { + backupNode.startLogSegment(nnRegistration, txId); } } Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1146845&r1=1146844&r2=1146845&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original) +++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Thu Jul 14 18:53:11 2011 @@ -72,5 +72,10 @@ class EditLogFileInputStream extends Edi // file size + size of both buffers return file.length(); } + + @Override + public String toString() { + return getName(); + } } Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1146845&r1=1146844&r2=1146845&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original) +++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Thu Jul 14 18:53:11 2011 @@ -111,21 +111,9 @@ class EditLogFileOutputStream extends Ed if (fp == null) { throw new IOException("Trying to use aborted output stream"); } - int start = bufCurrent.getLength(); - write(op); - bufCurrent.writeLong(txid); - for (Writable w : writables) { - w.write(bufCurrent); - } - // write transaction checksum - int end = bufCurrent.getLength(); - Checksum checksum = FSEditLog.getChecksum(); - checksum.reset(); - checksum.update(bufCurrent.getData(), start, end-start); - int sum = (int)checksum.getValue(); - bufCurrent.writeInt(sum); + writeChecksummedOp(bufCurrent, op, txid, writables); } - + @Override void write(final byte[] data, int off, int len) throws IOException { bufCurrent.write(data, off, len); @@ -151,7 +139,7 @@ class EditLogFileOutputStream extends Ed setReadyToFlush(); flush(); - + // close should have been called after all pending transactions // have been flushed & synced. 
// if already closed, just skip Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1146845&r1=1146844&r2=1146845&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original) +++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Thu Jul 14 18:53:11 2011 @@ -19,8 +19,11 @@ package org.apache.hadoop.hdfs.server.na import java.io.IOException; import java.io.OutputStream; +import java.util.zip.Checksum; import static org.apache.hadoop.hdfs.server.common.Util.now; + +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Writable; /** @@ -135,5 +138,25 @@ implements JournalStream { return getName(); } - + /** + * Write the given operation to the specified buffer, including + * the transaction ID and checksum. + */ + protected static void writeChecksummedOp( + DataOutputBuffer buf, byte op, long txid, Writable... writables) + throws IOException { + int start = buf.getLength(); + buf.write(op); + buf.writeLong(txid); + for (Writable w : writables) { + w.write(buf); + } + // write transaction checksum + int end = buf.getLength(); + Checksum checksum = FSEditLog.getChecksum(); + checksum.reset(); + checksum.update(buf.getData(), start, end-start); + int sum = (int)checksum.getValue(); + buf.writeInt(sum); + } } Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1146845&r1=1146844&r2=1146845&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original) +++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Jul 14 18:53:11 2011 @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.IOException; +import java.util.Iterator; import java.util.List; import java.util.zip.Checksum; @@ -31,6 +32,7 @@ import org.apache.hadoop.hdfs.Deprecated import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import static org.apache.hadoop.hdfs.server.common.Util.now; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; @@ -151,7 +153,7 @@ public class FSEditLog { /** * Initialize the list of edit journals */ - private void initJournals() { + synchronized void initJournals() { assert journals.isEmpty(); Preconditions.checkState(state == State.UNINITIALIZED, "Bad state: %s", state); @@ -167,10 +169,6 @@ public class FSEditLog { state = State.BETWEEN_LOG_SEGMENTS; } - private int getNumEditsDirs() { - return storage.getNumStorageDirs(NameNodeDirType.EDITS); - } - /** * Initialize the output stream for logging, opening the first * log segment. 
@@ -179,7 +177,7 @@ public class FSEditLog { Preconditions.checkState(state == State.UNINITIALIZED); initJournals(); - startLogSegment(getLastWrittenTxId() + 1); + startLogSegment(getLastWrittenTxId() + 1, true); assert state == State.IN_SEGMENT : "Bad state: " + state; } @@ -199,7 +197,7 @@ public class FSEditLog { if (state == State.IN_SEGMENT) { assert !journals.isEmpty(); waitForSyncToFinish(); - endCurrentLogSegment(); + endCurrentLogSegment(true); } state = State.CLOSED; @@ -335,11 +333,12 @@ public class FSEditLog { * Set the transaction ID to use for the next transaction written. */ synchronized void setNextTxId(long nextTxId) { - assert synctxid <= txid && - nextTxId >= txid : "May not decrease txid." + - " synctxid=" + synctxid + - " txid=" + txid + - " nextTxid=" + nextTxId; + Preconditions.checkArgument(synctxid <= txid && + nextTxId >= txid, + "May not decrease txid." + + " synctxid=%s txid=%s nextTxId=%s", + synctxid, txid, nextTxId); + txid = nextTxId - 1; } @@ -470,7 +469,8 @@ public class FSEditLog { if (badJournals.size() >= journals.size()) { LOG.fatal("Could not sync any journal to persistent storage. " + - "Unsynced transactions: " + (txid - synctxid)); + "Unsynced transactions: " + (txid - synctxid), + new Exception()); runtime.exit(1); } } finally { @@ -726,36 +726,6 @@ public class FSEditLog { } /** - * Return the size of the current EditLog - */ - // TODO who uses this, does it make sense with transactions? - synchronized long getEditLogSize() throws IOException { - assert getNumEditsDirs() <= journals.size() : - "Number of edits directories should not exceed the number of streams."; - long size = -1; - - List badJournals = Lists.newArrayList(); - - for (JournalAndStream j : journals) { - if (!j.isActive()) continue; - EditLogOutputStream es = j.getCurrentStream(); - try { - long curSize = es.length(); - assert (size == 0 || size == curSize || curSize ==0) : - "Wrong streams size"; - size = Math.max(size, curSize); - } catch (IOException e) { - LOG.error("getEditLogSize: editstream.length failed. removing journal " + j, e); - badJournals.add(j); - } - } - disableAndReportErrorOnJournals(badJournals); - - assert size != -1; - return size; - } - - /** * Used only by unit tests. */ @VisibleForTesting @@ -793,10 +763,10 @@ public class FSEditLog { */ synchronized long rollEditLog() throws IOException { LOG.info("Rolling edit logs."); - endCurrentLogSegment(); + endCurrentLogSegment(true); long nextTxId = getLastWrittenTxId() + 1; - startLogSegment(nextTxId); + startLogSegment(nextTxId, true); assert curSegmentTxId == nextTxId; return nextTxId; @@ -806,14 +776,20 @@ public class FSEditLog { * Start writing to the log segment with the given txid. * Transitions from BETWEEN_LOG_SEGMENTS state to IN_LOG_SEGMENT state. 
*/ - synchronized void startLogSegment(final long txId) { - LOG.info("Starting log segment at " + txId); + synchronized void startLogSegment(final long segmentTxId, + boolean writeHeaderTxn) { + LOG.info("Starting log segment at " + segmentTxId); + Preconditions.checkArgument(segmentTxId > 0, + "Bad txid: %s", segmentTxId); Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS, "Bad state: %s", state); - Preconditions.checkState(txId > curSegmentTxId, - "Cannot start writing to log segment " + txId + + Preconditions.checkState(segmentTxId > curSegmentTxId, + "Cannot start writing to log segment " + segmentTxId + " when previous log segment started at " + curSegmentTxId); - curSegmentTxId = txId; + Preconditions.checkArgument(segmentTxId == txid + 1, + "Cannot start log segment at txid %s when next expected " + + "txid is %s", segmentTxId, txid + 1); + curSegmentTxId = segmentTxId; numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0; @@ -823,26 +799,32 @@ public class FSEditLog { mapJournalsAndReportErrors(new JournalClosure() { @Override public void apply(JournalAndStream jas) throws IOException { - jas.startLogSegment(txId); + jas.startLogSegment(segmentTxId); } - }, "starting log segment " + txId); + }, "starting log segment " + segmentTxId); state = State.IN_SEGMENT; - logEdit(FSEditLogOpCodes.OP_START_LOG_SEGMENT); - logSync(); + if (writeHeaderTxn) { + logEdit(FSEditLogOpCodes.OP_START_LOG_SEGMENT); + logSync(); + } } /** * Finalize the current log segment. * Transitions from IN_SEGMENT state to BETWEEN_LOG_SEGMENTS state. */ - synchronized void endCurrentLogSegment() { + synchronized void endCurrentLogSegment(boolean writeEndTxn) { LOG.info("Ending log segment " + curSegmentTxId); Preconditions.checkState(state == State.IN_SEGMENT, "Bad state: %s", state); - logEdit(FSEditLogOpCodes.OP_END_LOG_SEGMENT); - waitForSyncToFinish(); + + if (writeEndTxn) { + logEdit(FSEditLogOpCodes.OP_END_LOG_SEGMENT); + logSync(); + } + printStatistics(true); final long lastTxId = getLastWrittenTxId(); @@ -858,6 +840,20 @@ public class FSEditLog { state = State.BETWEEN_LOG_SEGMENTS; } + + /** + * Abort all current logs. Called from the backup node. + */ + synchronized void abortCurrentLogSegment() { + mapJournalsAndReportErrors(new JournalClosure() { + + @Override + public void apply(JournalAndStream jas) throws IOException { + jas.abort(); + } + }, "aborting all streams"); + state = State.BETWEEN_LOG_SEGMENTS; + } /** * Archive any log files that are older than the given txid. @@ -910,36 +906,62 @@ public class FSEditLog { /** * Create (or find if already exists) an edit output stream, which * streams journal records (edits) to the specified backup node.
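startLogSegment(), endCurrentLogSegment() and the new abortCurrentLogSegment() above all fan the operation out to every configured journal through mapJournalsAndReportErrors(), handing it a small JournalClosure and quarantining journals that throw. A rough sketch of that idiom with simplified stand-ins for JournalAndStream (all names below are illustrative):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class JournalFanoutSketch {
  interface Journal {
    void startLogSegment(long txid) throws IOException;
  }

  interface JournalClosure {
    void apply(Journal j) throws IOException;
  }

  private final List<Journal> journals = new ArrayList<Journal>();

  /** Apply op to every journal; journals that throw are disabled, not fatal. */
  void mapJournalsAndReportErrors(JournalClosure op, String status) {
    List<Journal> bad = new ArrayList<Journal>();
    for (Journal j : journals) {
      try {
        op.apply(j);
      } catch (IOException ioe) {
        System.err.println("Error " + status + " on journal " + j + ": " + ioe);
        bad.add(j);
      }
    }
    journals.removeAll(bad);  // stand-in for disableAndReportErrorOnJournals()
  }

  void startLogSegment(final long segmentTxId) {
    mapJournalsAndReportErrors(new JournalClosure() {
      @Override
      public void apply(Journal j) throws IOException {
        j.startLogSegment(segmentTxId);
      }
    }, "starting log segment " + segmentTxId);
  }

  public static void main(String[] args) {
    JournalFanoutSketch s = new JournalFanoutSketch();
    s.journals.add(new Journal() {
      public void startLogSegment(long txid) { System.out.println("segment @" + txid); }
    });
    s.startLogSegment(1);
  }
}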
- * Send a record, prescribing to start journal spool.
- * This should be sent via regular stream of journal records so that - * the backup node new exactly after which record it should start spooling. + * + * The new BackupNode will start receiving edits the next time this + * NameNode's logs roll. * * @param bnReg the backup node registration information. * @param nnReg this (active) name-node registration. * @throws IOException */ - synchronized void logJSpoolStart(NamenodeRegistration bnReg, // backup node - NamenodeRegistration nnReg) // active name-node + synchronized void registerBackupNode( + NamenodeRegistration bnReg, // backup node + NamenodeRegistration nnReg) // active name-node throws IOException { - /* if(bnReg.isRole(NamenodeRole.CHECKPOINT)) return; // checkpoint node does not stream edits - if(editStreams == null) - editStreams = new ArrayList(); - EditLogOutputStream boStream = null; - for(EditLogOutputStream eStream : editStreams) { - if(eStream.getName().equals(bnReg.getAddress())) { - boStream = eStream; // already there - break; + + JournalAndStream jas = findBackupJournalAndStream(bnReg); + if (jas != null) { + // already registered + LOG.info("Backup node " + bnReg + " re-registers"); + return; + } + + LOG.info("Registering new backup node: " + bnReg); + BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg); + journals.add(new JournalAndStream(bjm)); + } + + synchronized void releaseBackupStream(NamenodeRegistration registration) { + for (Iterator iter = journals.iterator(); + iter.hasNext();) { + JournalAndStream jas = iter.next(); + if (jas.manager instanceof BackupJournalManager && + ((BackupJournalManager)jas.manager).matchesRegistration( + registration)) { + jas.abort(); + LOG.info("Removing backup journal " + jas); + iter.remove(); + } + } + } + + /** + * Find the JournalAndStream associated with this BackupNode. 
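releaseBackupStream() above walks the journal list with an explicit Iterator so that matching backup journals can be aborted and removed in a single pass; calling remove() on the Iterator avoids the ConcurrentModificationException a for-each loop would risk. A small stand-alone sketch of that idiom (the BackupJournal type and the address-based match are invented for the example):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class RemoveWhileIteratingSketch {
  static class BackupJournal {
    final String address;
    BackupJournal(String address) { this.address = address; }
    void abort() { System.out.println("aborting " + address); }
  }

  public static void main(String[] args) {
    List<BackupJournal> journals = new ArrayList<BackupJournal>();
    journals.add(new BackupJournal("bn1:50100"));
    journals.add(new BackupJournal("bn2:50100"));

    String leaving = "bn1:50100";
    for (Iterator<BackupJournal> iter = journals.iterator(); iter.hasNext();) {
      BackupJournal j = iter.next();
      if (j.address.equals(leaving)) {
        j.abort();        // stop streaming to it first
        iter.remove();    // then drop it from the live journal list
      }
    }
    System.out.println("remaining journals: " + journals.size());
  }
}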
+ * @return null if it cannot be found + */ + private synchronized JournalAndStream findBackupJournalAndStream( + NamenodeRegistration bnReg) { + for (JournalAndStream jas : journals) { + if (jas.manager instanceof BackupJournalManager) { + BackupJournalManager bjm = (BackupJournalManager)jas.manager; + if (bjm.matchesRegistration(bnReg)) { + return jas; + } } } - if(boStream == null) { - boStream = new EditLogBackupOutputStream(bnReg, nnReg); - editStreams.add(boStream); - } - logEdit(OP_JSPOOL_START, (Writable[])null); - TODO: backupnode is disabled - */ + return null; } /** @@ -961,62 +983,6 @@ public class FSEditLog { endTransaction(start); } - synchronized void releaseBackupStream(NamenodeRegistration registration) { - /* - Iterator it = - getOutputStreamIterator(JournalType.BACKUP); - ArrayList errorStreams = null; - NamenodeRegistration backupNode = null; - while(it.hasNext()) { - EditLogBackupOutputStream eStream = (EditLogBackupOutputStream)it.next(); - backupNode = eStream.getRegistration(); - if(backupNode.getAddress().equals(registration.getAddress()) && - backupNode.isRole(registration.getRole())) { - errorStreams = new ArrayList(1); - errorStreams.add(eStream); - break; - } - } - assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) : - "Not a backup node corresponds to a backup stream"; - disableAndReportErrorOnJournals(errorStreams); - TODO BN currently disabled - */ - } - - synchronized boolean checkBackupRegistration( - NamenodeRegistration registration) { - /* - Iterator it = - getOutputStreamIterator(JournalType.BACKUP); - boolean regAllowed = !it.hasNext(); - NamenodeRegistration backupNode = null; - ArrayList errorStreams = null; - while(it.hasNext()) { - EditLogBackupOutputStream eStream = (EditLogBackupOutputStream)it.next(); - backupNode = eStream.getRegistration(); - if(backupNode.getAddress().equals(registration.getAddress()) && - backupNode.isRole(registration.getRole())) { - regAllowed = true; // same node re-registers - break; - } - if(!eStream.isAlive()) { - if(errorStreams == null) - errorStreams = new ArrayList(1); - errorStreams.add(eStream); - regAllowed = true; // previous backup node failed - } - } - assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) : - "Not a backup node corresponds to a backup stream"; - disableAndReportErrorOnJournals(errorStreams); - return regAllowed; - - TODO BN currently disabled - */ - return false; - } - static BytesWritable toBytesWritable(Options.Rename... 
options) { byte[] bytes = new byte[options.length]; for (int i = 0; i < options.length; i++) { @@ -1061,12 +1027,7 @@ public class FSEditLog { for (JournalAndStream j : badJournals) { LOG.error("Disabling journal " + j); - try { - j.abort(); - } catch (IOException ioe) { - LOG.warn("Failed to abort faulty journal " + j - + " before removing it (might be OK)", ioe); - } + j.abort(); } } @@ -1093,13 +1054,17 @@ public class FSEditLog { } public void close(long lastTxId) throws IOException { + Preconditions.checkArgument(lastTxId >= segmentStartsAtTxId, + "invalid segment: lastTxId %s >= " + + "segment starting txid %s", lastTxId, segmentStartsAtTxId); + if (stream == null) return; stream.close(); manager.finalizeLogSegment(segmentStartsAtTxId, lastTxId); stream = null; } - public void abort() throws IOException { + public void abort() { if (stream == null) return; try { stream.abort(); @@ -1133,5 +1098,30 @@ public class FSEditLog { JournalManager getManager() { return manager; } + + public EditLogInputStream getInProgressInputStream() throws IOException { + return manager.getInProgressInputStream(segmentStartsAtTxId); + } + } + + /** + * @return an EditLogInputStream that reads from the same log that + * the edit log is currently writing. This is used from the BackupNode + * during edits synchronization. + * @throws IOException if no valid logs are available. + */ + synchronized EditLogInputStream getInProgressFileInputStream() + throws IOException { + for (JournalAndStream jas : journals) { + if (!jas.isActive()) continue; + try { + EditLogInputStream in = jas.getInProgressInputStream(); + if (in != null) return in; + } catch (IOException ioe) { + LOG.warn("Unable to get the in-progress input stream from " + jas, + ioe); + } + } + throw new IOException("No in-progress stream provided edits"); } } Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1146845&r1=1146844&r2=1146845&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original) +++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Thu Jul 14 18:53:11 2011 @@ -21,16 +21,13 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; import java.net.URI; -import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,7 +42,6 @@ import org.apache.hadoop.hdfs.server.com import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageState; -import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.Util; import static org.apache.hadoop.hdfs.server.common.Util.now; import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole; @@ -74,14 +70,17 @@ import com.google.common.collect.Lists; public class FSImage implements Closeable { protected static final Log LOG = 
LogFactory.getLog(FSImage.class.getName()); - // checkpoint states - enum CheckpointStates{START, ROLLED_EDITS, UPLOAD_START, UPLOAD_DONE; } - protected FSNamesystem namesystem = null; protected FSEditLog editLog = null; private boolean isUpgradeFinalized = false; protected NNStorage storage; + + /** + * The last transaction ID that was either loaded from an image + * or loaded by loading edits files. + */ + protected long lastAppliedTxId = 0; /** * URIs for importing an image from a checkpoint. In the default case, @@ -92,10 +91,6 @@ public class FSImage implements Closeabl final private Configuration conf; - /** - * Can fs-image be rolled? - */ - volatile protected CheckpointStates ckptState = FSImage.CheckpointStates.START; private final NNStorageArchivalManager archivalManager; /** @@ -559,6 +554,18 @@ public class FSImage implements Closeabl editLog.open(); storage.writeTransactionIdFileToStorage(editLog.getCurSegmentTxId()); }; + + /** + * Toss the current image and namesystem, reloading from the specified + * file. + */ + void reloadFromImageFile(File file) throws IOException { + // TODO: namesystem.close(); ?? + namesystem.dir.reset(); + + LOG.debug("Reloading namespace from " + file); + loadFSImage(file); + } /** * Choose latest image from one of the directories, @@ -626,8 +633,12 @@ public class FSImage implements Closeabl } catch (IOException ioe) { throw new IOException("Failed to load image from " + loadPlan.getImageFile(), ioe); } - - needToSave |= loadEdits(loadPlan.getEditsFiles()); + + long numLoaded = loadEdits(loadPlan.getEditsFiles()); + needToSave |= numLoaded > 0; + + // update the txid for the edit log + editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1); /* TODO(todd) Need to discuss whether we should force a re-save * of the image if one of the edits or images has an old format @@ -640,13 +651,14 @@ public class FSImage implements Closeabl /** * Load the specified list of edit files into the image. - * @return true if the image should be re-saved + * @return the number of transactions loaded */ - protected boolean loadEdits(List editLogs) throws IOException { + protected long loadEdits(List editLogs) throws IOException { LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editLogs)); - + + long startingTxId = getLastAppliedTxId() + 1; + FSEditLogLoader loader = new FSEditLogLoader(namesystem); - long startingTxId = storage.getMostRecentCheckpointTxId() + 1; int numLoaded = 0; // Load latest edits for (File edits : editLogs) { @@ -655,17 +667,13 @@ public class FSImage implements Closeabl int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId); startingTxId += thisNumLoaded; numLoaded += thisNumLoaded; + lastAppliedTxId += thisNumLoaded; editIn.close(); } // update the counts getFSNamesystem().dir.updateCountForINodeWithQuota(); - - // update the txid for the edit log - editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1); - - // If we loaded any edits, need to save. - return numLoaded > 0; + return numLoaded; } @@ -673,7 +681,7 @@ public class FSImage implements Closeabl * Load the image namespace from the given image file, verifying * it against the MD5 sum stored in its associated .md5 file. 
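loadFSImage() above refuses to load an image whose contents do not match the digest recorded in the companion .md5 file (MD5FileUtils/MD5Hash in the patch). A minimal JDK-only sketch of that verification step; the helper names below are illustrative, not the patch's API:

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Md5CheckSketch {
  /** Compute the hex MD5 of a file's contents. */
  static String md5Of(File f) throws IOException, NoSuchAlgorithmException {
    MessageDigest md = MessageDigest.getInstance("MD5");
    FileInputStream in = new FileInputStream(f);
    try {
      byte[] buf = new byte[8192];
      int n;
      while ((n = in.read(buf)) > 0) {
        md.update(buf, 0, n);
      }
    } finally {
      in.close();
    }
    StringBuilder hex = new StringBuilder();
    for (byte b : md.digest()) {
      hex.append(String.format("%02x", b));
    }
    return hex.toString();
  }

  /** Fail loudly if the image does not match the digest we expected. */
  static void verifyImage(File imageFile, String expectedMd5)
      throws IOException, NoSuchAlgorithmException {
    String actual = md5Of(imageFile);
    if (!actual.equalsIgnoreCase(expectedMd5)) {
      throw new IOException("Image " + imageFile + " is corrupt: expected md5 "
          + expectedMd5 + " but computed " + actual);
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(md5Of(new File(args[0])));
  }
}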
*/ - void loadFSImage(File imageFile) throws IOException { + private void loadFSImage(File imageFile) throws IOException { MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile); if (expectedMD5 == null) { throw new IOException("No MD5 file found corresponding to image file " @@ -687,7 +695,7 @@ public class FSImage implements Closeabl * filenames and blocks. Return whether we should * "re-save" and consolidate the edit-logs */ - void loadFSImage(File curFile, MD5Hash expectedMd5) throws IOException { + private void loadFSImage(File curFile, MD5Hash expectedMd5) throws IOException { FSImageFormat.Loader loader = new FSImageFormat.Loader( conf, getFSNamesystem()); loader.load(curFile); @@ -704,8 +712,9 @@ public class FSImage implements Closeabl } long txId = loader.getLoadedImageTxId(); + LOG.info("Loaded image for txid " + txId + " from " + curFile); + lastAppliedTxId = txId; storage.setMostRecentCheckpointTxId(txId); - editLog.setNextTxId(txId + 1); } @@ -718,10 +727,10 @@ public class FSImage implements Closeabl FSImageFormat.Saver saver = new FSImageFormat.Saver(); FSImageCompression compression = FSImageCompression.createCompression(conf); - saver.save(newFile, getFSNamesystem(), compression); + saver.save(newFile, txid, getFSNamesystem(), compression); MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest()); - storage.setMostRecentCheckpointTxId(editLog.getLastWrittenTxId()); + storage.setMostRecentCheckpointTxId(txid); } /** @@ -784,7 +793,7 @@ public class FSImage implements Closeabl boolean editLogWasOpen = editLog.isOpen(); if (editLogWasOpen) { - editLog.endCurrentLogSegment(); + editLog.endCurrentLogSegment(true); } long imageTxId = editLog.getLastWrittenTxId(); try { @@ -793,7 +802,7 @@ public class FSImage implements Closeabl } finally { if (editLogWasOpen) { - editLog.startLogSegment(imageTxId + 1); + editLog.startLogSegment(imageTxId + 1, true); // Take this opportunity to note the current transaction storage.writeTransactionIdFileToStorage(imageTxId + 1); } @@ -951,7 +960,6 @@ public class FSImage implements Closeabl // do not return image if there are no image directories needToReturnImg = false; CheckpointSignature sig = rollEditLog(); - getEditLog().logJSpoolStart(bnReg, nnReg); return new CheckpointCommand(sig, isImgObsolete, needToReturnImg); } @@ -1003,7 +1011,9 @@ public class FSImage implements Closeabl } synchronized public void close() throws IOException { - getEditLog().close(); + if (editLog != null) { // 2NN doesn't have any edit log + getEditLog().close(); + } storage.close(); } @@ -1054,4 +1064,8 @@ public class FSImage implements Closeabl public String getBlockPoolID() { return storage.getBlockPoolID(); } + + public synchronized long getLastAppliedTxId() { + return lastAppliedTxId; + } } Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1146845&r1=1146844&r2=1146845&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original) +++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Thu Jul 14 18:53:11 2011 @@ -539,6 +539,7 @@ class FSImageFormat { } void save(File newFile, + long txid, FSNamesystem sourceNamesystem, FSImageCompression 
compression) throws IOException { @@ -559,7 +560,6 @@ class FSImageFormat { .getStorage().getNamespaceID()); // TODO bad dependency out.writeLong(fsDir.rootDir.numItemsInTree()); out.writeLong(sourceNamesystem.getGenerationStamp()); - long txid = sourceNamesystem.getEditLog().getLastWrittenTxId(); out.writeLong(txid); // write compression info and set up compressed stream Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java?rev=1146845&r1=1146844&r2=1146845&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java (original) +++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java Thu Jul 14 18:53:11 2011 @@ -106,14 +106,15 @@ class FSImageTransactionalStorageInspect "not configured to contain images."); } } + } + - // Check for a seen_txid file, which marks a minimum transaction ID that - // must be included in our load plan. - try { - maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd)); - } catch (IOException ioe) { - LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe); - } + // Check for a seen_txid file, which marks a minimum transaction ID that + // must be included in our load plan. + try { + maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd)); + } catch (IOException ioe) { + LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe); } List editLogs = matchEditLogs(filesInStorage); @@ -215,14 +216,45 @@ class FSImageTransactionalStorageInspect } FoundFSImage recoveryImage = getLatestImage(); - long expectedTxId = recoveryImage.txId + 1; + LogLoadPlan logPlan = createLogLoadPlan(recoveryImage.txId, Long.MAX_VALUE); + + return new TransactionalLoadPlan(recoveryImage, + logPlan); + } + + /** + * Plan which logs to load in order to bring the namespace up-to-date. 
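The relocated seen_txid check above reads each storage directory's marker file (NNStorage.readTransactionIdFile in the patch), keeps the maximum, and treats an unreadable marker as a warning rather than a failure. A small stand-alone sketch of that idea, with a made-up readTxidFile() helper in place of the real NNStorage method:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class SeenTxidSketch {
  /** Read a single decimal txid from a small marker file in the directory. */
  static long readTxidFile(File dir) throws IOException {
    File marker = new File(dir, "seen_txid");
    BufferedReader r = new BufferedReader(new FileReader(marker));
    try {
      return Long.parseLong(r.readLine().trim());
    } finally {
      r.close();
    }
  }

  /** Highest txid any storage directory claims to have seen; -1 if none readable. */
  static long maxSeenTxId(List<File> storageDirs) {
    long maxSeenTxId = -1;
    for (File sd : storageDirs) {
      try {
        maxSeenTxId = Math.max(maxSeenTxId, readTxidFile(sd));
      } catch (IOException ioe) {
        // A bad marker in one directory should not abort the whole inspection.
        System.err.println("Unable to determine the max transaction ID seen by "
            + sd + ": " + ioe);
      }
    }
    return maxSeenTxId;
  }

  public static void main(String[] args) {
    System.out.println(maxSeenTxId(Arrays.asList(new File("/tmp/name1"),
                                                 new File("/tmp/name2"))));
  }
}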
+ * Transactions will be considered in the range (sinceTxId, maxTxId] + * + * @param sinceTxId the highest txid that is already loaded + * (eg from the image checkpoint) + * @param maxStartTxId ignore any log files that start after this txid + */ + LogLoadPlan createLogLoadPlan(long sinceTxId, long maxStartTxId) throws IOException { + long expectedTxId = sinceTxId + 1; List recoveryLogs = new ArrayList(); - SortedMap usefulGroups = logGroups.tailMap(expectedTxId); - LOG.debug("Excluded " + (logGroups.size() - usefulGroups.size()) + - " groups of logs because they start with a txid less than image " + - "txid " + recoveryImage.txId); + SortedMap tailGroups = logGroups.tailMap(expectedTxId); + if (logGroups.size() > tailGroups.size()) { + LOG.debug("Excluded " + (logGroups.size() - tailGroups.size()) + + " groups of logs because they start with a txid less than image " + + "txid " + sinceTxId); + } + + SortedMap usefulGroups; + if (maxStartTxId > sinceTxId) { + usefulGroups = tailGroups.headMap(maxStartTxId); + } else { + usefulGroups = new TreeMap(); + } + + if (usefulGroups.size() > tailGroups.size()) { + LOG.debug("Excluded " + (tailGroups.size() - usefulGroups.size()) + + " groups of logs because they start with a txid higher than max " + + "txid " + sinceTxId); + } + for (Map.Entry entry : usefulGroups.entrySet()) { long logStartTxId = entry.getKey(); @@ -251,7 +283,7 @@ class FSImageTransactionalStorageInspect long lastLogGroupStartTxId = usefulGroups.isEmpty() ? 0 : usefulGroups.lastKey(); - if (maxSeenTxId > recoveryImage.txId && + if (maxSeenTxId > sinceTxId && maxSeenTxId > lastLogGroupStartTxId) { String msg = "At least one storage directory indicated it has seen a " + "log segment starting at txid " + maxSeenTxId; @@ -263,9 +295,10 @@ class FSImageTransactionalStorageInspect } throw new IOException(msg); } - - return new TransactionalLoadPlan(recoveryImage, recoveryLogs, + + return new LogLoadPlan(recoveryLogs, Lists.newArrayList(usefulGroups.values())); + } @Override @@ -595,23 +628,18 @@ class FSImageTransactionalStorageInspect static class TransactionalLoadPlan extends LoadPlan { final FoundFSImage image; - final List editLogs; - final List logGroupsToRecover; + final LogLoadPlan logPlan; public TransactionalLoadPlan(FoundFSImage image, - List editLogs, - List logGroupsToRecover) { + LogLoadPlan logPlan) { super(); this.image = image; - this.editLogs = editLogs; - this.logGroupsToRecover = logGroupsToRecover; + this.logPlan = logPlan; } @Override boolean doRecovery() throws IOException { - for (LogGroup g : logGroupsToRecover) { - g.recover(); - } + logPlan.doRecovery(); return false; } @@ -622,11 +650,7 @@ class FSImageTransactionalStorageInspect @Override List getEditsFiles() { - List ret = new ArrayList(); - for (FoundEditLog log : editLogs) { - ret.add(log.getFile()); - } - return ret; + return logPlan.getEditsFiles(); } @Override @@ -634,4 +658,29 @@ class FSImageTransactionalStorageInspect return image.sd; } } + + static class LogLoadPlan { + final List editLogs; + final List logGroupsToRecover; + + LogLoadPlan(List editLogs, + List logGroupsToRecover) { + this.editLogs = editLogs; + this.logGroupsToRecover = logGroupsToRecover; + } + + public void doRecovery() throws IOException { + for (LogGroup g : logGroupsToRecover) { + g.recover(); + } + } + + public List getEditsFiles() { + List ret = new ArrayList(); + for (FoundEditLog log : editLogs) { + ret.add(log.getFile()); + } + return ret; + } + } }
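The createLogLoadPlan() method shown above narrows the candidate log groups to those whose starting txid lies after sinceTxId and below maxStartTxId by composing SortedMap.tailMap() and headMap() over a map keyed by each segment's starting txid. A condensed sketch of that selection step, with a plain Long-to-String TreeMap standing in for the logGroups map of LogGroup objects (the file names and values below are illustrative):

import java.util.SortedMap;
import java.util.TreeMap;

public class LogRangeSketch {
  public static void main(String[] args) {
    // Map of segment-start-txid -> a stand-in for the segment itself.
    TreeMap<Long, String> logGroups = new TreeMap<Long, String>();
    logGroups.put(1L,   "edits_1-100");
    logGroups.put(101L, "edits_101-200");
    logGroups.put(201L, "edits_inprogress_201");

    long sinceTxId = 100;        // already covered by the loaded image
    long maxStartTxId = 201;     // ignore segments starting at or after this

    // Segments that begin after the image checkpoint...
    SortedMap<Long, String> tail = logGroups.tailMap(sinceTxId + 1);
    // ...but before the requested upper bound (headMap is exclusive).
    SortedMap<Long, String> useful =
        maxStartTxId > sinceTxId ? tail.headMap(maxStartTxId)
                                 : new TreeMap<Long, String>();

    System.out.println("segments to load: " + useful.values());
    // prints: segments to load: [edits_101-200]
  }
}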