Return-Path: Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: (qmail 46162 invoked from network); 21 Apr 2010 03:03:16 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 21 Apr 2010 03:03:16 -0000 Received: (qmail 66329 invoked by uid 500); 21 Apr 2010 03:03:16 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 66257 invoked by uid 500); 21 Apr 2010 03:03:14 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 66249 invoked by uid 99); 21 Apr 2010 03:03:14 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Apr 2010 03:03:14 +0000 X-ASF-Spam-Status: No, hits=-1998.0 required=10.0 tests=ALL_TRUSTED,FB_GET_MEDS X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Apr 2010 03:03:09 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5A8EC238897D; Wed, 21 Apr 2010 03:02:25 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r936145 - in /hadoop/hdfs/branches/branch-0.21: ./ ivy/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/ Date: Wed, 21 Apr 2010 03:02:25 -0000 To: hdfs-commits@hadoop.apache.org From: shv@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100421030225.5A8EC238897D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: shv Date: Wed Apr 21 03:02:24 2010 New Revision: 936145 URL: http://svn.apache.org/viewvc?rev=936145&view=rev Log: HDFS-909. Merge -r 936130:936131 from trunk to branch-0.21. 
Added: hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java (with props) Modified: hadoop/hdfs/branches/branch-0.21/CHANGES.txt hadoop/hdfs/branches/branch-0.21/ivy/libraries.properties hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Modified: hadoop/hdfs/branches/branch-0.21/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/CHANGES.txt?rev=936145&r1=936144&r2=936145&view=diff ============================================================================== --- hadoop/hdfs/branches/branch-0.21/CHANGES.txt (original) +++ hadoop/hdfs/branches/branch-0.21/CHANGES.txt Wed Apr 21 03:02:24 2010 @@ -570,6 +570,9 @@ Release 0.20.3 - Unreleased HDFS-1041. DFSClient.getFileChecksum(..) should retry if connection to the first datanode fails. (szetszwo) + HDFS-909. Wait until edits syncing is finished before purging edits. 
+ (Todd Lipcon via shv) + Release 0.20.2 - Unreleased IMPROVEMENTS Modified: hadoop/hdfs/branches/branch-0.21/ivy/libraries.properties URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/ivy/libraries.properties?rev=936145&r1=936144&r2=936145&view=diff ============================================================================== --- hadoop/hdfs/branches/branch-0.21/ivy/libraries.properties (original) +++ hadoop/hdfs/branches/branch-0.21/ivy/libraries.properties Wed Apr 21 03:02:24 2010 @@ -77,4 +77,4 @@ xerces.version=1.4.4 aspectj.version=1.6.5 -mockito-all.version=1.8.0 +mockito-all.version=1.8.2 Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=936145&r1=936144&r2=936145&view=diff ============================================================================== --- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original) +++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Apr 21 03:02:24 2010 @@ -101,7 +101,8 @@ public class FSEditLog { private long lastPrintTime; // is a sync currently running? - private boolean isSyncRunning; + private volatile boolean isSyncRunning; + // these are statistics counters. private long numTransactions; // number of transactions @@ -147,6 +148,14 @@ public class FSEditLog { return editStreams == null ? 0 : editStreams.size(); } + /** + * Return the currently active edit streams. + * This should be used only by unit tests. 
+ */ + ArrayList getEditStreams() { + return editStreams; + } + boolean isOpen() { return getNumEditStreams() > 0; } @@ -189,6 +198,8 @@ public class FSEditLog { } synchronized void createEditLogFile(File name) throws IOException { + waitForSyncToFinish(); + EditLogOutputStream eStream = new EditLogFileOutputStream(name, sizeOutputFlushBuffer); eStream.create(); @@ -199,12 +210,7 @@ public class FSEditLog { * Shutdown the file store. */ synchronized void close() { - while (isSyncRunning) { - try { - wait(1000); - } catch (InterruptedException ie) { - } - } + waitForSyncToFinish(); if (editStreams == null || editStreams.isEmpty()) { return; } @@ -750,9 +756,52 @@ public class FSEditLog { metrics.transactions.inc((end-start)); } - // - // Sync all modifications done by this thread. - // + /** + * Blocks until all ongoing edits have been synced to disk. + * This differs from logSync in that it waits for edits that have been + * written by other threads, not just edits from the calling thread. + * + * NOTE: this should be done while holding the FSNamesystem lock, or + * else more operations can start writing while this is in progress. + */ + void logSyncAll() throws IOException { + // Record the most recent transaction ID as our own id + synchronized (this) { + TransactionId id = myTransactionId.get(); + id.txid = txid; + } + // Then make sure we're synced up to this point + logSync(); + } + + /** + * Sync all modifications done by this thread. + * + * The internal concurrency design of this class is as follows: + * - Log items are written synchronized into an in-memory buffer, + * and each assigned a transaction ID. + * - When a thread (client) would like to sync all of its edits, logSync() + * uses a ThreadLocal transaction ID to determine what edit number must + * be synced to. + * - The isSyncRunning volatile boolean tracks whether a sync is currently + * under progress. 
+ * + * The data is double-buffered within each edit log implementation so that + * in-memory writing can occur in parallel with the on-disk writing. + * + * Each sync occurs in three steps: + * 1. synchronized, it swaps the double buffer and sets the isSyncRunning + * flag. + * 2. unsynchronized, it flushes the data to storage + * 3. synchronized, it resets the flag and notifies anyone waiting on the + * sync. + * + * The lack of synchronization on step 2 allows other threads to continue + * to write into the memory buffer while the sync is in progress. + * Because this step is unsynchronized, actions that need to avoid + * concurrency with sync() should be synchronized and also call + * waitForSyncToFinish() before assuming they are running alone. + */ public void logSync() throws IOException { ArrayList errorStreams = null; long syncStart = 0; @@ -1020,6 +1069,7 @@ public class FSEditLog { * Closes the current edit log and opens edits.new. */ synchronized void rollEditLog() throws IOException { + waitForSyncToFinish(); Iterator it = fsimage.dirIterator(NameNodeDirType.EDITS); if(!it.hasNext()) return; @@ -1052,6 +1102,8 @@ public class FSEditLog { * @throws IOException */ synchronized void divertFileStreams(String dest) throws IOException { + waitForSyncToFinish(); + assert getNumEditStreams() >= getNumEditsDirs() : "Inconsistent number of streams"; ArrayList errorStreams = null; @@ -1088,10 +1140,25 @@ public class FSEditLog { * Reopens the edits file. */ synchronized void purgeEditLog() throws IOException { + waitForSyncToFinish(); revertFileStreams( Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName()); } + + /** + * The actual sync activity happens while not synchronized on this object. + * Thus, synchronized activities that require that they are not concurrent + * with file operations should wait for any running sync to finish. 
+ */ + synchronized void waitForSyncToFinish() { + while (isSyncRunning) { + try { + wait(1000); + } catch (InterruptedException ie) {} + } + } + /** * Revert file streams from file edits.new back to file edits.

* Close file streams, which are currently writing into getRoot()/source. @@ -1101,6 +1168,8 @@ public class FSEditLog { * @throws IOException */ synchronized void revertFileStreams(String source) throws IOException { + waitForSyncToFinish(); + assert getNumEditStreams() >= getNumEditsDirs() : "Inconsistent number of streams"; ArrayList errorStreams = null; @@ -1112,7 +1181,8 @@ public class FSEditLog { EditLogOutputStream eStream = itE.next(); StorageDirectory sd = itD.next(); if(!eStream.getName().startsWith(sd.getRoot().getPath())) - throw new IOException("Inconsistent order of edit streams: " + eStream); + throw new IOException("Inconsistent order of edit streams: " + eStream + + " does not start with " + sd.getRoot().getPath()); try { // close old stream closeStream(eStream); Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=936145&r1=936144&r2=936145&view=diff ============================================================================== --- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Apr 21 03:02:24 2010 @@ -3577,6 +3577,10 @@ public class FSNamesystem implements FSC * @throws IOException */ synchronized void enterSafeMode() throws IOException { + // Ensure that any concurrent operations have been fully synced + // before entering safe mode. This ensures that the FSImage + // is entirely stable on disk as soon as we're in safe mode. 
+ getEditLog().logSyncAll(); if (!isInSafeMode()) { safeMode = new SafeModeInfo(); return; Added: hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java?rev=936145&view=auto ============================================================================== --- hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java (added) +++ hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java Wed Apr 21 03:02:24 2010 @@ -0,0 +1,489 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.*; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.permission.*; + +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction; +import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream; +import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole; +import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; +import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType; +import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile; +import org.apache.hadoop.security.UserGroupInformation; + +import static org.junit.Assert.*; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import static org.mockito.Mockito.*; + +/** + * This class tests various synchronization bugs in FSEditLog rolling + * and namespace saving. + */ +public class TestEditLogRace { + private static final Log LOG = LogFactory.getLog(TestEditLogRace.class); + + private static final String NAME_DIR = + MiniDFSCluster.getBaseDirectory() + "name1"; + + // This test creates NUM_THREADS threads and each thread continuously writes + // transactions + static final int NUM_THREADS = 16; + + /** + * The number of times to roll the edit log during the test. Since this + * tests for a race condition, higher numbers are more likely to find + * a bug if it exists, but the test will take longer. 
+ */ + static final int NUM_ROLLS = 30; + + /** + * The number of times to save the fsimage and create an empty edit log. + */ + static final int NUM_SAVE_IMAGE = 30; + + private List workers = new ArrayList(); + + private static final int NUM_DATA_NODES = 1; + + /** + * Several of the test cases work by introducing a sleep + * into an operation that is usually fast, and then verifying + * that another operation blocks for at least this amount of time. + * This value needs to be significantly longer than the average + * time for an fsync() or enterSafeMode(). + */ + private static final int BLOCK_TIME = 10; + + // + // an object that does a bunch of transactions + // + static class Transactions implements Runnable { + FSNamesystem namesystem; + short replication = 3; + long blockSize = 64; + volatile boolean stopped = false; + volatile Thread thr; + AtomicReference caught; + + Transactions(FSNamesystem ns, AtomicReference caught) { + namesystem = ns; + this.caught = caught; + } + + // add a bunch of transactions. + public void run() { + thr = Thread.currentThread(); + PermissionStatus p = namesystem.createFsOwnerPermissions( + new FsPermission((short)0777)); + int i = 0; + while (!stopped) { + try { + UserGroupInformation.setCurrentUser( + UserGroupInformation.login(new HdfsConfiguration())); + String dirname = "/thr-" + thr.getId() + "-dir-" + i; + namesystem.mkdirs(dirname, p, true); + namesystem.delete(dirname, true); + } catch (SafeModeException sme) { + // This is OK - the tests will bring NN in and out of safemode + } catch (Throwable e) { + LOG.warn("Got error in transaction thread", e); + caught.compareAndSet(null, e); + break; + } + i++; + } + } + + public void stop() { + stopped = true; + } + + public Thread getThread() { + return thr; + } + } + + private void startTransactionWorkers(FSNamesystem namesystem, + AtomicReference caughtErr) { + // Create threads and make them run transactions concurrently. 
+ for (int i = 0; i < NUM_THREADS; i++) { + Transactions trans = new Transactions(namesystem, caughtErr); + new Thread(trans, "TransactionThread-" + i).start(); + workers.add(trans); + } + } + + private void stopTransactionWorkers() { + // wait for all transactions to get over + for (Transactions worker : workers) { + worker.stop(); + } + + for (Transactions worker : workers) { + Thread thr = worker.getThread(); + try { + if (thr != null) thr.join(); + } catch (InterruptedException ie) {} + } + } + + /** + * Tests rolling edit logs while transactions are ongoing. + */ + @Test + public void testEditLogRolling() throws Exception { + // start a cluster + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + FileSystem fileSys = null; + + + AtomicReference caughtErr = new AtomicReference(); + try { + cluster = new MiniDFSCluster(conf, NUM_DATA_NODES, true, null); + cluster.waitActive(); + fileSys = cluster.getFileSystem(); + final FSNamesystem namesystem = cluster.getNamesystem(); + + FSImage fsimage = namesystem.getFSImage(); + FSEditLog editLog = fsimage.getEditLog(); + + // set small size of flush buffer + editLog.setBufferCapacity(2048); + editLog.close(); + editLog.open(); + + startTransactionWorkers(namesystem, caughtErr); + + for (int i = 0; i < NUM_ROLLS && caughtErr.get() == null; i++) { + try { + Thread.sleep(20); + } catch (InterruptedException e) {} + + LOG.info("Starting roll " + i + "."); + editLog.rollEditLog(); + LOG.info("Roll complete " + i + "."); + + verifyEditLogs(namesystem, fsimage); + + LOG.info("Starting purge " + i + "."); + editLog.purgeEditLog(); + LOG.info("Complete purge " + i + "."); + } + } finally { + stopTransactionWorkers(); + if (caughtErr.get() != null) { + throw new RuntimeException(caughtErr.get()); + } + + if(fileSys != null) fileSys.close(); + if(cluster != null) cluster.shutdown(); + } + } + + private void verifyEditLogs(FSNamesystem namesystem, FSImage fsimage) + throws IOException { + // Verify that 
we can read in all the transactions that we have written. + // If there were any corruptions, it is likely that the reading in + // of these transactions will throw an exception. + for (Iterator it = + fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) { + File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS); + System.out.println("Verifying file: " + editFile); + int numEdits = namesystem.getEditLog().loadFSEdits( + new EditLogFileInputStream(editFile)); + System.out.println("Number of edits: " + numEdits); + } + } + + /** + * Tests saving fs image while transactions are ongoing. + */ + @Test + public void testSaveNamespace() throws Exception { + // start a cluster + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + FileSystem fileSys = null; + + AtomicReference caughtErr = new AtomicReference(); + try { + cluster = new MiniDFSCluster(conf, NUM_DATA_NODES, true, null); + cluster.waitActive(); + fileSys = cluster.getFileSystem(); + final FSNamesystem namesystem = cluster.getNamesystem(); + + FSImage fsimage = namesystem.getFSImage(); + FSEditLog editLog = fsimage.getEditLog(); + + // set small size of flush buffer + editLog.setBufferCapacity(2048); + editLog.close(); + editLog.open(); + + startTransactionWorkers(namesystem, caughtErr); + + for (int i = 0; i < NUM_SAVE_IMAGE && caughtErr.get() == null; i++) { + try { + Thread.sleep(20); + } catch (InterruptedException e) {} + + + LOG.info("Save " + i + ": entering safe mode"); + namesystem.enterSafeMode(); + + // Verify edit logs before the save + verifyEditLogs(namesystem, fsimage); + + LOG.info("Save " + i + ": saving namespace"); + namesystem.saveNamespace(); + LOG.info("Save " + i + ": leaving safemode"); + + // Verify that edit logs post save are also not corrupt + verifyEditLogs(namesystem, fsimage); + + namesystem.leaveSafeMode(false); + LOG.info("Save " + i + ": complete"); + + } + } finally { + stopTransactionWorkers(); + if (caughtErr.get() != null) { + 
throw new RuntimeException(caughtErr.get()); + } + if(fileSys != null) fileSys.close(); + if(cluster != null) cluster.shutdown(); + } + } + + private Configuration getConf() { + Configuration conf = new HdfsConfiguration(); + FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); + conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, NAME_DIR); + conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, NAME_DIR); + conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false); + return conf; + } + + + /** + * The logSync() method in FSEditLog is unsynchronized while syncing + * so that other threads can concurrently enqueue edits while the prior + * sync is ongoing. This test checks that the log is saved correctly + * if the saveImage occurs while the syncing thread is in the unsynchronized middle section. + * + * This replicates the following manual test proposed by Konstantin: + * I start the name-node in debugger. + * I do -mkdir and stop the debugger in logSync() just before it does flush. + * Then I enter safe mode with another client + * I start saveNamespace and stop the debugger in + * FSImage.saveFSImage() -> FSEditLog.createEditLogFile() + * -> EditLogFileOutputStream.create() -> + * after truncating the file but before writing LAYOUT_VERSION into it. + * Then I let logSync() run. + * Then I terminate the name-node. + * After that the name-node won't start, since the edits file is broken. 
+ */ + @Test + public void testSaveImageWhileSyncInProgress() throws Exception { + Configuration conf = getConf(); + NameNode.initMetrics(conf, NamenodeRole.ACTIVE); + NameNode.format(conf); + final FSNamesystem namesystem = new FSNamesystem(conf); + + try { + FSImage fsimage = namesystem.getFSImage(); + FSEditLog editLog = fsimage.getEditLog(); + + ArrayList streams = editLog.getEditStreams(); + EditLogOutputStream spyElos = spy(streams.get(0)); + streams.set(0, spyElos); + + final AtomicReference deferredException = + new AtomicReference(); + final CountDownLatch waitToEnterFlush = new CountDownLatch(1); + + final Thread doAnEditThread = new Thread() { + public void run() { + try { + LOG.info("Starting mkdirs"); + namesystem.mkdirs("/test", + new PermissionStatus("test","test", new FsPermission((short)00755)), + true); + LOG.info("mkdirs complete"); + } catch (Throwable ioe) { + deferredException.set(ioe); + waitToEnterFlush.countDown(); + } + } + }; + + Answer blockingFlush = new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + LOG.info("Flush called"); + if (Thread.currentThread() == doAnEditThread) { + LOG.info("edit thread: Telling main thread we made it to flush section..."); + // Signal to main thread that the edit thread is in the racy section + waitToEnterFlush.countDown(); + LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs"); + Thread.sleep(BLOCK_TIME*1000); + LOG.info("Going through to flush. 
This will allow the main thread to continue."); + } + invocation.callRealMethod(); + LOG.info("Flush complete"); + return null; + } + }; + doAnswer(blockingFlush).when(spyElos).flush(); + + doAnEditThread.start(); + // Wait for the edit thread to get to the logsync unsynchronized section + LOG.info("Main thread: waiting to enter flush..."); + waitToEnterFlush.await(); + assertNull(deferredException.get()); + LOG.info("Main thread: detected that logSync is in unsynchronized section."); + LOG.info("Trying to enter safe mode."); + LOG.info("This should block for " + BLOCK_TIME + "sec, since flush will sleep that long"); + + long st = System.currentTimeMillis(); + namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER); + long et = System.currentTimeMillis(); + LOG.info("Entered safe mode"); + // Make sure we really waited for the flush to complete! + assertTrue(et - st > (BLOCK_TIME - 1)*1000); + + // Once we're in safe mode, save namespace. + namesystem.saveNamespace(); + + LOG.info("Joining on edit thread..."); + doAnEditThread.join(); + assertNull(deferredException.get()); + + verifyEditLogs(namesystem, fsimage); + } finally { + LOG.info("Closing namesystem"); + if(namesystem != null) namesystem.close(); + } + } + + /** + * Most of the FSNamesystem methods have a synchronized section where they + * update the name system itself and write to the edit log, and then + * unsynchronized, they call logSync. This test verifies that, if an + * operation has written to the edit log but not yet synced it, + * we wait for that sync before entering safe mode. 
+ */ + @Test + public void testSaveRightBeforeSync() throws Exception { + Configuration conf = getConf(); + NameNode.initMetrics(conf, NamenodeRole.ACTIVE); + NameNode.format(conf); + final FSNamesystem namesystem = new FSNamesystem(conf); + + try { + FSImage fsimage = namesystem.getFSImage(); + FSEditLog editLog = spy(fsimage.getEditLog()); + fsimage.editLog = editLog; + + final AtomicReference deferredException = + new AtomicReference(); + final CountDownLatch waitToEnterSync = new CountDownLatch(1); + + final Thread doAnEditThread = new Thread() { + public void run() { + try { + LOG.info("Starting mkdirs"); + namesystem.mkdirs("/test", + new PermissionStatus("test","test", new FsPermission((short)00755)), + true); + LOG.info("mkdirs complete"); + } catch (Throwable ioe) { + deferredException.set(ioe); + waitToEnterSync.countDown(); + } + } + }; + + Answer blockingSync = new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + LOG.info("logSync called"); + if (Thread.currentThread() == doAnEditThread) { + LOG.info("edit thread: Telling main thread we made it just before logSync..."); + waitToEnterSync.countDown(); + LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs"); + Thread.sleep(BLOCK_TIME*1000); + LOG.info("Going through to logSync. 
This will allow the main thread to continue."); + } + invocation.callRealMethod(); + LOG.info("logSync complete"); + return null; + } + }; + doAnswer(blockingSync).when(editLog).logSync(); + + doAnEditThread.start(); + LOG.info("Main thread: waiting to just before logSync..."); + waitToEnterSync.await(); + assertNull(deferredException.get()); + LOG.info("Main thread: detected that logSync about to be called."); + LOG.info("Trying to enter safe mode."); + LOG.info("This should block for " + BLOCK_TIME + "sec, since we have pending edits"); + + long st = System.currentTimeMillis(); + namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER); + long et = System.currentTimeMillis(); + LOG.info("Entered safe mode"); + // Make sure we really waited for the flush to complete! + assertTrue(et - st > (BLOCK_TIME - 1)*1000); + + // Once we're in safe mode, save namespace. + namesystem.saveNamespace(); + + LOG.info("Joining on edit thread..."); + doAnEditThread.join(); + assertNull(deferredException.get()); + + verifyEditLogs(namesystem, fsimage); + } finally { + LOG.info("Closing namesystem"); + if(namesystem != null) namesystem.close(); + } + } +} Propchange: hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java ------------------------------------------------------------------------------ svn:mime-type = text/plain