Author: shv
Date: Wed Apr 21 01:15:25 2010
New Revision: 936131
URL: http://svn.apache.org/viewvc?rev=936131&view=rev
Log:
HDFS-909. Wait until edits syncing finishes before purging edits. Contributed by Todd Lipcon.
Added:
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java   (with props)
Modified:
hadoop/hdfs/trunk/.eclipse.templates/.classpath
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/ivy/libraries.properties
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
Modified: hadoop/hdfs/trunk/.eclipse.templates/.classpath
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/.eclipse.templates/.classpath?rev=936131&r1=936130&r2=936131&view=diff
==============================================================================
--- hadoop/hdfs/trunk/.eclipse.templates/.classpath (original)
+++ hadoop/hdfs/trunk/.eclipse.templates/.classpath Wed Apr 21 01:15:25 2010
@@ -34,7 +34,7 @@
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/slf4j-api-1.5.8.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/test/slf4j-log4j12-1.4.3.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/xmlenc-0.52.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/test/mockito-all-1.8.0.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/test/mockito-all-1.8.2.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/aspectjrt-1.6.5.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cactus.core.framework.uberjar.javaEE.14-1.8.0.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cactus.integration.ant-1.8.0.jar"/>
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=936131&r1=936130&r2=936131&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Wed Apr 21 01:15:25 2010
@@ -831,6 +831,9 @@ Release 0.20.3 - Unreleased
HDFS-1041. DFSClient.getFileChecksum(..) should retry if connection to
the first datanode fails. (szetszwo)
+ HDFS-909. Wait until edits syncing is finishes before purging edits.
+ (Todd Lipcon via shv)
+
Release 0.20.2 - Unreleased
IMPROVEMENTS
Modified: hadoop/hdfs/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/ivy/libraries.properties?rev=936131&r1=936130&r2=936131&view=diff
==============================================================================
--- hadoop/hdfs/trunk/ivy/libraries.properties (original)
+++ hadoop/hdfs/trunk/ivy/libraries.properties Wed Apr 21 01:15:25 2010
@@ -77,4 +77,4 @@ xerces.version=1.4.4
aspectj.version=1.6.5
-mockito-all.version=1.8.0
+mockito-all.version=1.8.2
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=936131&r1=936130&r2=936131&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Apr 21 01:15:25 2010
@@ -113,7 +113,8 @@ public class FSEditLog {
private long lastPrintTime;
// is a sync currently running?
- private boolean isSyncRunning;
+ private volatile boolean isSyncRunning;
+
// these are statistics counters.
private long numTransactions; // number of transactions
@@ -159,6 +160,14 @@ public class FSEditLog {
return editStreams == null ? 0 : editStreams.size();
}
+ /**
+ * Return the currently active edit streams.
+ * This should be used only by unit tests.
+ */
+ ArrayList<EditLogOutputStream> getEditStreams() {
+ return editStreams;
+ }
+
boolean isOpen() {
return getNumEditStreams() > 0;
}
@@ -201,6 +210,8 @@ public class FSEditLog {
}
synchronized void createEditLogFile(File name) throws IOException {
+ waitForSyncToFinish();
+
EditLogOutputStream eStream = new EditLogFileOutputStream(name,
sizeOutputFlushBuffer);
eStream.create();
@@ -211,12 +222,7 @@ public class FSEditLog {
* Shutdown the file store.
*/
synchronized void close() {
- while (isSyncRunning) {
- try {
- wait(1000);
- } catch (InterruptedException ie) {
- }
- }
+ waitForSyncToFinish();
if (editStreams == null || editStreams.isEmpty()) {
return;
}
@@ -883,9 +889,52 @@ public class FSEditLog {
metrics.transactions.inc((end-start));
}
- //
- // Sync all modifications done by this thread.
- //
+ /**
+ * Blocks until all ongoing edits have been synced to disk.
+ * This differs from logSync in that it waits for edits that have been
+ * written by other threads, not just edits from the calling thread.
+ *
+ * NOTE: this should be done while holding the FSNamesystem lock, or
+ * else more operations can start writing while this is in progress.
+ */
+ void logSyncAll() throws IOException {
+ // Record the most recent transaction ID as our own id
+ synchronized (this) {
+ TransactionId id = myTransactionId.get();
+ id.txid = txid;
+ }
+ // Then make sure we're synced up to this point
+ logSync();
+ }
+
+ /**
+ * Sync all modifications done by this thread.
+ *
+ * The internal concurrency design of this class is as follows:
+ * - Log items are written synchronized into an in-memory buffer,
+ * and each assigned a transaction ID.
+ * - When a thread (client) would like to sync all of its edits, logSync()
+ * uses a ThreadLocal transaction ID to determine what edit number must
+ * be synced to.
+ * - The isSyncRunning volatile boolean tracks whether a sync is currently
+ * under progress.
+ *
+ * The data is double-buffered within each edit log implementation so that
+ * in-memory writing can occur in parallel with the on-disk writing.
+ *
+ * Each sync occurs in three steps:
+ * 1. synchronized, it swaps the double buffer and sets the isSyncRunning
+ * flag.
+ * 2. unsynchronized, it flushes the data to storage
+ * 3. synchronized, it resets the flag and notifies anyone waiting on the
+ * sync.
+ *
+ * The lack of synchronization on step 2 allows other threads to continue
+ * to write into the memory buffer while the sync is in progress.
+ * Because this step is unsynchronized, actions that need to avoid
+ * concurrency with sync() should be synchronized and also call
+ * waitForSyncToFinish() before assuming they are running alone.
+ */
public void logSync() throws IOException {
ArrayList<EditLogOutputStream> errorStreams = null;
long syncStart = 0;
@@ -1219,6 +1268,7 @@ public class FSEditLog {
* Closes the current edit log and opens edits.new.
*/
synchronized void rollEditLog() throws IOException {
+ waitForSyncToFinish();
Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);
if(!it.hasNext())
return;
@@ -1251,6 +1301,8 @@ public class FSEditLog {
* @throws IOException
*/
synchronized void divertFileStreams(String dest) throws IOException {
+ waitForSyncToFinish();
+
assert getNumEditStreams() >= getNumEditsDirs() :
"Inconsistent number of streams";
ArrayList<EditLogOutputStream> errorStreams = null;
@@ -1287,10 +1339,25 @@ public class FSEditLog {
* Reopens the edits file.
*/
synchronized void purgeEditLog() throws IOException {
+ waitForSyncToFinish();
revertFileStreams(
Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName());
}
+
+ /**
+ * The actual sync activity happens while not synchronized on this object.
+ * Thus, synchronized activities that require that they are not concurrent
+ * with file operations should wait for any running sync to finish.
+ */
+ synchronized void waitForSyncToFinish() {
+ while (isSyncRunning) {
+ try {
+ wait(1000);
+ } catch (InterruptedException ie) {}
+ }
+ }
+
/**
* Revert file streams from file edits.new back to file edits.<p>
* Close file streams, which are currently writing into getRoot()/source.
@@ -1300,6 +1367,8 @@ public class FSEditLog {
* @throws IOException
*/
synchronized void revertFileStreams(String source) throws IOException {
+ waitForSyncToFinish();
+
assert getNumEditStreams() >= getNumEditsDirs() :
"Inconsistent number of streams";
ArrayList<EditLogOutputStream> errorStreams = null;
@@ -1311,7 +1380,8 @@ public class FSEditLog {
EditLogOutputStream eStream = itE.next();
StorageDirectory sd = itD.next();
if(!eStream.getName().startsWith(sd.getRoot().getPath()))
- throw new IOException("Inconsistent order of edit streams: " + eStream);
+ throw new IOException("Inconsistent order of edit streams: " + eStream +
+ " does not start with " + sd.getRoot().getPath());
try {
// close old stream
closeStream(eStream);
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=936131&r1=936130&r2=936131&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Apr 21 01:15:25 2010
@@ -3855,6 +3855,10 @@ public class FSNamesystem implements FSC
* @throws IOException
*/
synchronized void enterSafeMode() throws IOException {
+ // Ensure that any concurrent operations have been fully synced
+ // before entering safe mode. This ensures that the FSImage
+ // is entirely stable on disk as soon as we're in safe mode.
+ getEditLog().logSyncAll();
if (!isInSafeMode()) {
safeMode = new SafeModeInfo();
return;
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java?rev=936131&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java Wed Apr 21 01:15:25 2010
@@ -0,0 +1,487 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.*;
+
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * This class tests various synchronization bugs in FSEditLog rolling
+ * and namespace saving.
+ */
+public class TestEditLogRace {
+ private static final Log LOG = LogFactory.getLog(TestEditLogRace.class);
+
+ private static final String NAME_DIR =
+ MiniDFSCluster.getBaseDirectory() + "name1";
+
+ // This test creates NUM_THREADS threads and each thread continuously writes
+ // transactions
+ static final int NUM_THREADS = 16;
+
+ /**
+ * The number of times to roll the edit log during the test. Since this
+ * tests for a race condition, higher numbers are more likely to find
+ * a bug if it exists, but the test will take longer.
+ */
+ static final int NUM_ROLLS = 30;
+
+ /**
+ * The number of times to save the fsimage and create an empty edit log.
+ */
+ static final int NUM_SAVE_IMAGE = 30;
+
+ private List<Transactions> workers = new ArrayList<Transactions>();
+
+ private static final int NUM_DATA_NODES = 1;
+
+ /**
+ * Several of the test cases work by introducing a sleep
+ * into an operation that is usually fast, and then verifying
+ * that another operation blocks for at least this amount of time.
+ * This value needs to be significantly longer than the average
+ * time for an fsync() or enterSafeMode().
+ */
+ private static final int BLOCK_TIME = 10;
+
+ //
+ // an object that does a bunch of transactions
+ //
+ static class Transactions implements Runnable {
+ FSNamesystem namesystem;
+ short replication = 3;
+ long blockSize = 64;
+ volatile boolean stopped = false;
+ volatile Thread thr;
+ AtomicReference<Throwable> caught;
+
+ Transactions(FSNamesystem ns, AtomicReference<Throwable> caught) {
+ namesystem = ns;
+ this.caught = caught;
+ }
+
+ // add a bunch of transactions.
+ public void run() {
+ thr = Thread.currentThread();
+ PermissionStatus p = namesystem.createFsOwnerPermissions(
+ new FsPermission((short)0777));
+ int i = 0;
+ while (!stopped) {
+ try {
+ String dirname = "/thr-" + thr.getId() + "-dir-" + i;
+ namesystem.mkdirs(dirname, p, true);
+ namesystem.delete(dirname, true);
+ } catch (SafeModeException sme) {
+ // This is OK - the tests will bring NN in and out of safemode
+ } catch (Throwable e) {
+ LOG.warn("Got error in transaction thread", e);
+ caught.compareAndSet(null, e);
+ break;
+ }
+ i++;
+ }
+ }
+
+ public void stop() {
+ stopped = true;
+ }
+
+ public Thread getThread() {
+ return thr;
+ }
+ }
+
+ private void startTransactionWorkers(FSNamesystem namesystem,
+ AtomicReference<Throwable> caughtErr) {
+ // Create threads and make them run transactions concurrently.
+ for (int i = 0; i < NUM_THREADS; i++) {
+ Transactions trans = new Transactions(namesystem, caughtErr);
+ new Thread(trans, "TransactionThread-" + i).start();
+ workers.add(trans);
+ }
+ }
+
+ private void stopTransactionWorkers() {
+ // wait for all transactions to get over
+ for (Transactions worker : workers) {
+ worker.stop();
+ }
+
+ for (Transactions worker : workers) {
+ Thread thr = worker.getThread();
+ try {
+ if (thr != null) thr.join();
+ } catch (InterruptedException ie) {}
+ }
+ }
+
+ /**
+ * Tests rolling edit logs while transactions are ongoing.
+ */
+ @Test
+ public void testEditLogRolling() throws Exception {
+ // start a cluster
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = null;
+ FileSystem fileSys = null;
+
+
+ AtomicReference<Throwable> caughtErr = new AtomicReference<Throwable>();
+ try {
+ cluster = new MiniDFSCluster(conf, NUM_DATA_NODES, true, null);
+ cluster.waitActive();
+ fileSys = cluster.getFileSystem();
+ final FSNamesystem namesystem = cluster.getNamesystem();
+
+ FSImage fsimage = namesystem.getFSImage();
+ FSEditLog editLog = fsimage.getEditLog();
+
+ // set small size of flush buffer
+ editLog.setBufferCapacity(2048);
+ editLog.close();
+ editLog.open();
+
+ startTransactionWorkers(namesystem, caughtErr);
+
+ for (int i = 0; i < NUM_ROLLS && caughtErr.get() == null; i++) {
+ try {
+ Thread.sleep(20);
+ } catch (InterruptedException e) {}
+
+ LOG.info("Starting roll " + i + ".");
+ editLog.rollEditLog();
+ LOG.info("Roll complete " + i + ".");
+
+ verifyEditLogs(namesystem, fsimage);
+
+ LOG.info("Starting purge " + i + ".");
+ editLog.purgeEditLog();
+ LOG.info("Complete purge " + i + ".");
+ }
+ } finally {
+ stopTransactionWorkers();
+ if (caughtErr.get() != null) {
+ throw new RuntimeException(caughtErr.get());
+ }
+
+ if(fileSys != null) fileSys.close();
+ if(cluster != null) cluster.shutdown();
+ }
+ }
+
+ private void verifyEditLogs(FSNamesystem namesystem, FSImage fsimage)
+ throws IOException {
+ // Verify that we can read in all the transactions that we have written.
+ // If there were any corruptions, it is likely that the reading in
+ // of these transactions will throw an exception.
+ for (Iterator<StorageDirectory> it =
+ fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+ File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
+ System.out.println("Verifying file: " + editFile);
+ int numEdits = namesystem.getEditLog().loadFSEdits(
+ new EditLogFileInputStream(editFile));
+ System.out.println("Number of edits: " + numEdits);
+ }
+ }
+
+ /**
+ * Tests saving fs image while transactions are ongoing.
+ */
+ @Test
+ public void testSaveNamespace() throws Exception {
+ // start a cluster
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = null;
+ FileSystem fileSys = null;
+
+ AtomicReference<Throwable> caughtErr = new AtomicReference<Throwable>();
+ try {
+ cluster = new MiniDFSCluster(conf, NUM_DATA_NODES, true, null);
+ cluster.waitActive();
+ fileSys = cluster.getFileSystem();
+ final FSNamesystem namesystem = cluster.getNamesystem();
+
+ FSImage fsimage = namesystem.getFSImage();
+ FSEditLog editLog = fsimage.getEditLog();
+
+ // set small size of flush buffer
+ editLog.setBufferCapacity(2048);
+ editLog.close();
+ editLog.open();
+
+ startTransactionWorkers(namesystem, caughtErr);
+
+ for (int i = 0; i < NUM_SAVE_IMAGE && caughtErr.get() == null; i++) {
+ try {
+ Thread.sleep(20);
+ } catch (InterruptedException e) {}
+
+
+ LOG.info("Save " + i + ": entering safe mode");
+ namesystem.enterSafeMode();
+
+ // Verify edit logs before the save
+ verifyEditLogs(namesystem, fsimage);
+
+ LOG.info("Save " + i + ": saving namespace");
+ namesystem.saveNamespace();
+ LOG.info("Save " + i + ": leaving safemode");
+
+ // Verify that edit logs post save are also not corrupt
+ verifyEditLogs(namesystem, fsimage);
+
+ namesystem.leaveSafeMode(false);
+ LOG.info("Save " + i + ": complete");
+
+ }
+ } finally {
+ stopTransactionWorkers();
+ if (caughtErr.get() != null) {
+ throw new RuntimeException(caughtErr.get());
+ }
+ if(fileSys != null) fileSys.close();
+ if(cluster != null) cluster.shutdown();
+ }
+ }
+
+ private Configuration getConf() {
+ Configuration conf = new HdfsConfiguration();
+ FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
+ conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+ conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, NAME_DIR);
+ conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, NAME_DIR);
+ conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
+ return conf;
+ }
+
+
+ /**
+ * The logSync() method in FSEditLog is unsynchronized whiel syncing
+ * so that other threads can concurrently enqueue edits while the prior
+ * sync is ongoing. This test checks that the log is saved correctly
+ * if the saveImage occurs while the syncing thread is in the unsynchronized middle section.
+ *
+ * This replicates the following manual test proposed by Konstantin:
+ * I start the name-node in debugger.
+ * I do -mkdir and stop the debugger in logSync() just before it does flush.
+ * Then I enter safe mode with another client
+ * I start saveNamepsace and stop the debugger in
+ * FSImage.saveFSImage() -> FSEditLog.createEditLogFile()
+ * -> EditLogFileOutputStream.create() ->
+ * after truncating the file but before writing LAYOUT_VERSION into it.
+ * Then I let logSync() run.
+ * Then I terminate the name-node.
+ * After that the name-node wont start, since the edits file is broken.
+ */
+ @Test
+ public void testSaveImageWhileSyncInProgress() throws Exception {
+ Configuration conf = getConf();
+ NameNode.initMetrics(conf, NamenodeRole.ACTIVE);
+ NameNode.format(conf);
+ final FSNamesystem namesystem = new FSNamesystem(conf);
+
+ try {
+ FSImage fsimage = namesystem.getFSImage();
+ FSEditLog editLog = fsimage.getEditLog();
+
+ ArrayList<EditLogOutputStream> streams = editLog.getEditStreams();
+ EditLogOutputStream spyElos = spy(streams.get(0));
+ streams.set(0, spyElos);
+
+ final AtomicReference<Throwable> deferredException =
+ new AtomicReference<Throwable>();
+ final CountDownLatch waitToEnterFlush = new CountDownLatch(1);
+
+ final Thread doAnEditThread = new Thread() {
+ public void run() {
+ try {
+ LOG.info("Starting mkdirs");
+ namesystem.mkdirs("/test",
+ new PermissionStatus("test","test", new FsPermission((short)00755)),
+ true);
+ LOG.info("mkdirs complete");
+ } catch (Throwable ioe) {
+ deferredException.set(ioe);
+ waitToEnterFlush.countDown();
+ }
+ }
+ };
+
+ Answer<Void> blockingFlush = new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ LOG.info("Flush called");
+ if (Thread.currentThread() == doAnEditThread) {
+ LOG.info("edit thread: Telling main thread we made it to flush section...");
+ // Signal to main thread that the edit thread is in the racy section
+ waitToEnterFlush.countDown();
+ LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
+ Thread.sleep(BLOCK_TIME*1000);
+ LOG.info("Going through to flush. This will allow the main thread to continue.");
+ }
+ invocation.callRealMethod();
+ LOG.info("Flush complete");
+ return null;
+ }
+ };
+ doAnswer(blockingFlush).when(spyElos).flush();
+
+ doAnEditThread.start();
+ // Wait for the edit thread to get to the logsync unsynchronized section
+ LOG.info("Main thread: waiting to enter flush...");
+ waitToEnterFlush.await();
+ assertNull(deferredException.get());
+ LOG.info("Main thread: detected that logSync is in unsynchronized section.");
+ LOG.info("Trying to enter safe mode.");
+ LOG.info("This should block for " + BLOCK_TIME + "sec, since flush will sleep that long");
+
+ long st = System.currentTimeMillis();
+ namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+ long et = System.currentTimeMillis();
+ LOG.info("Entered safe mode");
+ // Make sure we really waited for the flush to complete!
+ assertTrue(et - st > (BLOCK_TIME - 1)*1000);
+
+ // Once we're in safe mode, save namespace.
+ namesystem.saveNamespace();
+
+ LOG.info("Joining on edit thread...");
+ doAnEditThread.join();
+ assertNull(deferredException.get());
+
+ verifyEditLogs(namesystem, fsimage);
+ } finally {
+ LOG.info("Closing namesystem");
+ if(namesystem != null) namesystem.close();
+ }
+ }
+
+ /**
+ * Most of the FSNamesystem methods have a synchronized section where they
+ * update the name system itself and write to the edit log, and then
+ * unsynchronized, they call logSync. This test verifies that, if an
+ * operation has written to the edit log but not yet synced it,
+ * we wait for that sync before entering safe mode.
+ */
+ @Test
+ public void testSaveRightBeforeSync() throws Exception {
+ Configuration conf = getConf();
+ NameNode.initMetrics(conf, NamenodeRole.ACTIVE);
+ NameNode.format(conf);
+ final FSNamesystem namesystem = new FSNamesystem(conf);
+
+ try {
+ FSImage fsimage = namesystem.getFSImage();
+ FSEditLog editLog = spy(fsimage.getEditLog());
+ fsimage.editLog = editLog;
+
+ final AtomicReference<Throwable> deferredException =
+ new AtomicReference<Throwable>();
+ final CountDownLatch waitToEnterSync = new CountDownLatch(1);
+
+ final Thread doAnEditThread = new Thread() {
+ public void run() {
+ try {
+ LOG.info("Starting mkdirs");
+ namesystem.mkdirs("/test",
+ new PermissionStatus("test","test", new FsPermission((short)00755)),
+ true);
+ LOG.info("mkdirs complete");
+ } catch (Throwable ioe) {
+ deferredException.set(ioe);
+ waitToEnterSync.countDown();
+ }
+ }
+ };
+
+ Answer<Void> blockingSync = new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ LOG.info("logSync called");
+ if (Thread.currentThread() == doAnEditThread) {
+ LOG.info("edit thread: Telling main thread we made it just before logSync...");
+ waitToEnterSync.countDown();
+ LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
+ Thread.sleep(BLOCK_TIME*1000);
+ LOG.info("Going through to logSync. This will allow the main thread to continue.");
+ }
+ invocation.callRealMethod();
+ LOG.info("logSync complete");
+ return null;
+ }
+ };
+ doAnswer(blockingSync).when(editLog).logSync();
+
+ doAnEditThread.start();
+ LOG.info("Main thread: waiting to just before logSync...");
+ waitToEnterSync.await();
+ assertNull(deferredException.get());
+ LOG.info("Main thread: detected that logSync about to be called.");
+ LOG.info("Trying to enter safe mode.");
+ LOG.info("This should block for " + BLOCK_TIME + "sec, since we have pending edits");
+
+ long st = System.currentTimeMillis();
+ namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+ long et = System.currentTimeMillis();
+ LOG.info("Entered safe mode");
+ // Make sure we really waited for the flush to complete!
+ assertTrue(et - st > (BLOCK_TIME - 1)*1000);
+
+ // Once we're in safe mode, save namespace.
+ namesystem.saveNamespace();
+
+ LOG.info("Joining on edit thread...");
+ doAnEditThread.join();
+ assertNull(deferredException.get());
+
+ verifyEditLogs(namesystem, fsimage);
+ } finally {
+ LOG.info("Closing namesystem");
+ if(namesystem != null) namesystem.close();
+ }
+ }
+}
Propchange: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
|