hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1068721 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
Date Wed, 09 Feb 2011 01:16:56 GMT
Author: todd
Date: Wed Feb  9 01:16:55 2011
New Revision: 1068721

URL: http://svn.apache.org/viewvc?rev=1068721&view=rev
Log:
HDFS-1597. Batched edit log syncs can reset synctxid and throw assertions. Contributed by
Todd Lipcon.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1068721&r1=1068720&r2=1068721&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Wed Feb  9 01:16:55 2011
@@ -534,6 +534,9 @@ Release 0.22.0 - Unreleased
     HDFS-1529. Incorrect handling of interrupts in waitForAckedSeqno can cause
     deadlock (todd)
 
+    HDFS-1597. Batched edit log syncs can reset synctxid and throw assertions
+    (todd)
+
 Release 0.21.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1068721&r1=1068720&r2=1068721&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Feb  9 01:16:55 2011
@@ -446,7 +446,6 @@ public class FSEditLog implements NNStor
     try {
       synchronized (this) {
         try {
-        assert editStreams.size() > 0 : "no editlog streams";
         printStatistics(false);
   
         // if somebody is already syncing, then wait
@@ -473,6 +472,7 @@ public class FSEditLog implements NNStor
         sync = true;
   
         // swap buffers
+        assert editStreams.size() > 0 : "no editlog streams";
         for(EditLogOutputStream eStream : editStreams) {
           try {
             eStream.setReadyToFlush();
@@ -518,8 +518,8 @@ public class FSEditLog implements NNStor
     } finally {
       // Prevent RuntimeException from blocking other log edit sync 
       synchronized (this) {
-        synctxid = syncStart;
         if (sync) {
+          synctxid = syncStart;
           isSyncRunning = false;
         }
         this.notifyAll();
@@ -955,6 +955,15 @@ public class FSEditLog implements NNStor
     return 0;
   }
 
+  /**
+   * Return the txid of the last synced transaction.
+   * For test use only
+   */
+  synchronized long getSyncTxId() {
+    return synctxid;
+  }
+
+
   // sets the initial capacity of the flush buffer.
   public void setBufferCapacity(int size) {
     sizeOutputFlushBuffer = size;

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1068721&r1=1068720&r2=1068721&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Wed Feb  9 01:16:55 2011
@@ -21,6 +21,9 @@ import junit.framework.TestCase;
 import java.io.*;
 import java.net.URI;
 import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
@@ -31,12 +34,16 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
+ 
+import org.mockito.Mockito;
 
 /**
  * This class tests the creation and validation of a checkpoint.
  */
 public class TestEditLog extends TestCase {
-  static final int NUM_DATA_NODES = 1;
+  static final int NUM_DATA_NODES = 0;
 
   // This test creates NUM_THREADS threads and each thread does
   // 2 * NUM_TRANSACTIONS Transactions concurrently.
@@ -141,7 +148,7 @@ public class TestEditLog extends TestCas
       FSEditLogLoader loader = new FSEditLogLoader(namesystem);
       for (Iterator<StorageDirectory> it = 
               fsimage.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-        File editFile = fsimage.getStorage().getStorageFile(it.next(), NameNodeFile.EDITS);
+        File editFile = NNStorage.getStorageFile(it.next(), NameNodeFile.EDITS);
         System.out.println("Verifying file: " + editFile);
         int numEdits = loader.loadFSEdits(
                                   new EditLogFileInputStream(editFile));
@@ -159,4 +166,141 @@ public class TestEditLog extends TestCas
       if(cluster != null) cluster.shutdown();
     }
   }
+
+  private void doLogEdit(ExecutorService exec, final FSEditLog log,
+    final String filename) throws Exception
+  {
+    exec.submit(new Callable<Void>() {
+      public Void call() {
+        log.logSetReplication(filename, (short)1);
+        return null;
+      }
+    }).get();
+  }
+  
+  private void doCallLogSync(ExecutorService exec, final FSEditLog log)
+    throws Exception
+  {
+    exec.submit(new Callable<Void>() {
+      public Void call() {
+        log.logSync();
+        return null;
+      }
+    }).get();
+  }
+
+  private void doCallLogSyncAll(ExecutorService exec, final FSEditLog log)
+    throws Exception
+  {
+    exec.submit(new Callable<Void>() {
+      public Void call() throws Exception {
+        log.logSyncAll();
+        return null;
+      }
+    }).get();
+  }
+
+  public void testSyncBatching() throws Exception {
+    // start a cluster 
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    FileSystem fileSys = null;
+    ExecutorService threadA = Executors.newSingleThreadExecutor();
+    ExecutorService threadB = Executors.newSingleThreadExecutor();
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
+      cluster.waitActive();
+      fileSys = cluster.getFileSystem();
+      final FSNamesystem namesystem = cluster.getNamesystem();
+
+      FSImage fsimage = namesystem.getFSImage();
+      final FSEditLog editLog = fsimage.getEditLog();
+
+      assertEquals("should start with no txids synced",
+        0, editLog.getSyncTxId());
+      
+      // Log an edit from thread A
+      doLogEdit(threadA, editLog, "thread-a 1");
+      assertEquals("logging edit without syncing should do not affect txid",
+        0, editLog.getSyncTxId());
+
+      // Log an edit from thread B
+      doLogEdit(threadB, editLog, "thread-b 1");
+      assertEquals("logging edit without syncing should do not affect txid",
+        0, editLog.getSyncTxId());
+
+      // Now ask to sync edit from B, which should sync both edits.
+      doCallLogSync(threadB, editLog);
+      assertEquals("logSync from second thread should bump txid up to 2",
+        2, editLog.getSyncTxId());
+
+      // Now ask to sync edit from A, which was already batched in - thus
+      // it should increment the batch count metric
+      NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
+      metrics.transactionsBatchedInSync = Mockito.mock(MetricsTimeVaryingInt.class);
+
+      doCallLogSync(threadA, editLog);
+      assertEquals("logSync from first thread shouldn't change txid",
+        2, editLog.getSyncTxId());
+
+      //Should have incremented the batch count exactly once
+      Mockito.verify(metrics.transactionsBatchedInSync,
+                    Mockito.times(1)).inc();
+    } finally {
+      threadA.shutdown();
+      threadB.shutdown();
+      if(fileSys != null) fileSys.close();
+      if(cluster != null) cluster.shutdown();
+    }
+  }
+  
+  /**
+   * Test what happens with the following sequence:
+   *
+   *  Thread A writes edit
+   *  Thread B calls logSyncAll
+   *           calls close() on stream
+   *  Thread A calls logSync
+   *
+   * This sequence is legal and can occur if enterSafeMode() is closely
+   * followed by saveNamespace.
+   */
+  public void testBatchedSyncWithClosedLogs() throws Exception {
+    // start a cluster 
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    FileSystem fileSys = null;
+    ExecutorService threadA = Executors.newSingleThreadExecutor();
+    ExecutorService threadB = Executors.newSingleThreadExecutor();
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
+      cluster.waitActive();
+      fileSys = cluster.getFileSystem();
+      final FSNamesystem namesystem = cluster.getNamesystem();
+
+      FSImage fsimage = namesystem.getFSImage();
+      final FSEditLog editLog = fsimage.getEditLog();
+
+      // Log an edit from thread A
+      doLogEdit(threadA, editLog, "thread-a 1");
+      assertEquals("logging edit without syncing should do not affect txid",
+        0, editLog.getSyncTxId());
+
+      // logSyncAll in Thread B
+      doCallLogSyncAll(threadB, editLog);
+      assertEquals("logSyncAll should sync thread A's transaction",
+        1, editLog.getSyncTxId());
+
+      // Close edit log
+      editLog.close();
+
+      // Ask thread A to finish sync (which should be a no-op)
+      doCallLogSync(threadA, editLog);
+    } finally {
+      threadA.shutdown();
+      threadB.shutdown();
+      if(fileSys != null) fileSys.close();
+      if(cluster != null) cluster.shutdown();
+    }
+  }
 }



Mime
View raw message