hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vvasu...@apache.org
Subject [35/50] [abbrv] hadoop git commit: HDFS-8964. When validating the edit log, do not read at or beyond the file offset that is being written (Zhe Zhang via Colin P. McCabe)
Date Mon, 07 Sep 2015 15:46:37 GMT
HDFS-8964. When validating the edit log, do not read at or beyond the file offset that is being
written (Zhe Zhang via Colin P. McCabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/53c38cc8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/53c38cc8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/53c38cc8

Branch: refs/heads/YARN-3926
Commit: 53c38cc89ab979ec47557dcfa7affbad20578c0a
Parents: 524ba87
Author: Colin Patrick Mccabe <cmccabe@cloudera.com>
Authored: Thu Sep 3 11:22:47 2015 -0700
Committer: Colin Patrick Mccabe <cmccabe@cloudera.com>
Committed: Thu Sep 3 11:22:47 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/namenode/FSEditLogTestUtil.java |   3 +-
 .../hadoop/hdfs/qjournal/server/Journal.java    |  22 ++--
 .../server/namenode/EditLogFileInputStream.java |  15 ++-
 .../hadoop/hdfs/server/namenode/FSEditLog.java  |  10 ++
 .../hdfs/server/namenode/FSEditLogLoader.java   |  12 ++-
 .../server/namenode/FileJournalManager.java     |  39 +++++--
 .../hdfs/server/namenode/SecondaryNameNode.java |   2 +-
 .../TestCheckPointForSecurityTokens.java        |   4 +-
 .../hdfs/server/namenode/TestEditLog.java       | 103 ++++++++++++++++++-
 .../server/namenode/TestFSEditLogLoader.java    |  13 ++-
 11 files changed, 199 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 275dce2..afc6cf4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1289,6 +1289,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-9009. Send metrics logs to NullAppender by default. (Arpit Agarwal)
 
+    HDFS-8964. When validating the edit log, do not read at or beyond the file
+    offset that is being written (Zhe Zhang via Colin P. McCabe)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java
index a46f9cf..e5b9d01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java
@@ -33,7 +33,8 @@ public class FSEditLogTestUtil {
 
   public static long countTransactionsInStream(EditLogInputStream in) 
       throws IOException {
-    FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in);
+    FSEditLogLoader.EditLogValidation validation =
+        FSEditLogLoader.validateEditLog(in, Long.MAX_VALUE);
     return (validation.getEndTxId() - in.getFirstTxId()) + 1;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index 2953055..813f267 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -151,7 +151,7 @@ public class Journal implements Closeable {
     
     EditLogFile latest = scanStorageForLatestEdits();
     if (latest != null) {
-      highestWrittenTxId = latest.getLastTxId();
+      updateHighestWrittenTxId(latest.getLastTxId());
     }
   }
 
@@ -266,7 +266,17 @@ public class Journal implements Closeable {
   synchronized long getHighestWrittenTxId() {
     return highestWrittenTxId;
   }
-  
+
+  /**
+   * Update the highest Tx ID that has been written to the journal. Also update
+   * the {@link FileJournalManager#lastReadableTxId} of the underlying fjm.
+   * @param val The new value
+   */
+  private void updateHighestWrittenTxId(long val) {
+    highestWrittenTxId = val;
+    fjm.setLastReadableTxId(val);
+  }
+
   @VisibleForTesting
   JournalMetrics getMetricsForTests() {
     return metrics;
@@ -399,7 +409,7 @@ public class Journal implements Closeable {
     metrics.bytesWritten.incr(records.length);
     metrics.txnsWritten.incr(numTxns);
     
-    highestWrittenTxId = lastTxnId;
+    updateHighestWrittenTxId(lastTxnId);
     nextTxId = lastTxnId + 1;
   }
 
@@ -782,8 +792,8 @@ public class Journal implements Closeable {
             ": no current segment in place");
         
         // Update the highest txid for lag metrics
-        highestWrittenTxId = Math.max(segment.getEndTxId(),
-            highestWrittenTxId);
+        updateHighestWrittenTxId(Math.max(segment.getEndTxId(),
+            highestWrittenTxId));
       } else {
         LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
             ": old segment " + TextFormat.shortDebugString(currentSegment) +
@@ -812,7 +822,7 @@ public class Journal implements Closeable {
         // If we're shortening the log, update our highest txid
         // used for lag metrics.
         if (txnRange(currentSegment).containsLong(highestWrittenTxId)) {
-          highestWrittenTxId = segment.getEndTxId();
+          updateHighestWrittenTxId(segment.getEndTxId());
         }
       }
       syncedFile = syncLog(reqInfo, segment, fromUrl);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
index 73a162e..3bf0ab4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
@@ -300,8 +300,17 @@ public class EditLogFileInputStream extends EditLogInputStream {
     return getName();
   }
 
-  static FSEditLogLoader.EditLogValidation validateEditLog(File file)
-      throws IOException {
+  /**
+   * @param file File being validated.
+   * @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation
+   *                          returns after reading this or a higher ID.
+   *                          The file portion beyond this ID is potentially
+   *                          being updated.
+   * @return Result of the validation
+   * @throws IOException
+   */
+  static FSEditLogLoader.EditLogValidation validateEditLog(File file,
+      long maxTxIdToValidate) throws IOException {
     EditLogFileInputStream in;
     try {
       in = new EditLogFileInputStream(file);
@@ -314,7 +323,7 @@ public class EditLogFileInputStream extends EditLogInputStream {
     }
     
     try {
-      return FSEditLogLoader.validateEditLog(in);
+      return FSEditLogLoader.validateEditLog(in, maxTxIdToValidate);
     } finally {
       IOUtils.closeStream(in);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index edf88c9..e255cff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -674,6 +674,16 @@ public class FSEditLog implements LogsPurgeable {
       synchronized (this) {
         if (sync) {
           synctxid = syncStart;
+          for (JournalManager jm : journalSet.getJournalManagers()) {
+            /**
+             * {@link FileJournalManager#lastReadableTxId} is only meaningful
+             * for file-based journals. Therefore the interface is not added to
+             * other types of {@link JournalManager}.
+             */
+            if (jm instanceof FileJournalManager) {
+              ((FileJournalManager)jm).setLastReadableTxId(syncStart);
+            }
+          }
           isSyncRunning = false;
         }
         this.notifyAll();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index fc0bb78..bb36ca2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -1112,8 +1112,14 @@ public class FSEditLogLoader {
    * If there are invalid or corrupt transactions in the middle of the stream,
    * validateEditLog will skip over them.
    * This reads through the stream but does not close it.
+   *
+   * @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation
+   *                          returns after reading this or a higher ID.
+   *                          The file portion beyond this ID is potentially
+   *                          being updated.
    */
-  static EditLogValidation validateEditLog(EditLogInputStream in) {
+  static EditLogValidation validateEditLog(EditLogInputStream in,
+      long maxTxIdToValidate) {
     long lastPos = 0;
     long lastTxId = HdfsServerConstants.INVALID_TXID;
     long numValid = 0;
@@ -1136,6 +1142,10 @@ public class FSEditLogLoader {
           || op.getTransactionId() > lastTxId) {
         lastTxId = op.getTransactionId();
       }
+      if (lastTxId >= maxTxIdToValidate) {
+        break;
+      }
+
       numValid++;
     }
     return new EditLogValidation(lastPos, lastTxId, false);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
index ebd7475..a1488eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
@@ -76,6 +76,15 @@ public class FileJournalManager implements JournalManager {
 
   private File currentInProgress = null;
 
+  /**
+   * A FileJournalManager should maintain the largest Tx ID that has been
+   * safely written to its edit log files.
+   * It should prevent readers from reading beyond this ID to avoid potential
+   * races with ongoing writers.
+   * Initial value indicates that all transactions can be read.
+   */
+  private long lastReadableTxId = Long.MAX_VALUE;
+
   @VisibleForTesting
   StoragePurger purger
     = new NNStorageRetentionManager.DeletionStoragePurger();
@@ -159,6 +168,15 @@ public class FileJournalManager implements JournalManager {
     this.outputBufferCapacity = size;
   }
 
+
+  public long getLastReadableTxId() {
+    return lastReadableTxId;
+  }
+
+  public void setLastReadableTxId(long id) {
+    this.lastReadableTxId = id;
+  }
+
   @Override
   public void purgeLogsOlderThan(long minTxIdToKeep)
       throws IOException {
@@ -193,7 +211,7 @@ public class FileJournalManager implements JournalManager {
       }
       if (elf.isInProgress()) {
         try {
-          elf.validateLog();
+          elf.validateLog(getLastReadableTxId());
         } catch (IOException e) {
           LOG.error("got IOException while trying to validate header of " +
               elf + ".  Skipping.", e);
@@ -325,11 +343,13 @@ public class FileJournalManager implements JournalManager {
           (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
           "from among " + elfs.size() + " candidate file(s)");
     }
-    addStreamsToCollectionFromFiles(elfs, streams, fromTxId, inProgressOk);
+    addStreamsToCollectionFromFiles(elfs, streams, fromTxId,
+        getLastReadableTxId(), inProgressOk);
   }
   
   static void addStreamsToCollectionFromFiles(Collection<EditLogFile> elfs,
-      Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk) {
+      Collection<EditLogInputStream> streams, long fromTxId, long maxTxIdToValidate,
+      boolean inProgressOk) {
     for (EditLogFile elf : elfs) {
       if (elf.isInProgress()) {
         if (!inProgressOk) {
@@ -340,7 +360,7 @@ public class FileJournalManager implements JournalManager {
           continue;
         }
         try {
-          elf.validateLog();
+          elf.validateLog(maxTxIdToValidate);
         } catch (IOException e) {
           LOG.error("got IOException while trying to validate header of " +
               elf + ".  Skipping.", e);
@@ -384,7 +404,7 @@ public class FileJournalManager implements JournalManager {
           continue;
         }
 
-        elf.validateLog();
+        elf.validateLog(getLastReadableTxId());
 
         if (elf.hasCorruptHeader()) {
           elf.moveAsideCorruptFile();
@@ -516,9 +536,14 @@ public class FileJournalManager implements JournalManager {
      * Find out where the edit log ends.
      * This will update the lastTxId of the EditLogFile or
      * mark it as corrupt if it is.
+     * @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation
+     *                          returns after reading this or a higher ID.
+     *                          The file portion beyond this ID is potentially
+     *                          being updated.
      */
-    public void validateLog() throws IOException {
-      EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
+    public void validateLog(long maxTxIdToValidate) throws IOException {
+      EditLogValidation val = EditLogFileInputStream.validateEditLog(file,
+          maxTxIdToValidate);
       this.lastTxId = val.getEndTxId();
       this.hasCorruptHeader = val.hasCorruptHeader();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
index 2267853..e3e0a7d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
@@ -907,7 +907,7 @@ public class SecondaryNameNode implements Runnable,
             throw new RuntimeException(ioe);
           }
           FileJournalManager.addStreamsToCollectionFromFiles(editFiles, streams,
-              fromTxId, inProgressOk);
+              fromTxId, Long.MAX_VALUE, inProgressOk);
         }
       }
       

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java
index 9401d07..d5e64ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java
@@ -88,7 +88,7 @@ public class TestCheckPointForSecurityTokens {
       for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
         EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd);
         assertTrue(log.isInProgress());
-        log.validateLog();
+        log.validateLog(Long.MAX_VALUE);
         long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1;
         assertEquals("In-progress log " + log + " should have 5 transactions",
                      5, numTransactions);;
@@ -105,7 +105,7 @@ public class TestCheckPointForSecurityTokens {
       for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
         EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd);
         assertTrue(log.isInProgress());
-        log.validateLog();
+        log.validateLog(Long.MAX_VALUE);
         long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1;
         assertEquals("In-progress log " + log + " should only have START txn",
             1, numTransactions);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
index e59dec4..0495860 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
@@ -66,6 +66,8 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -83,6 +85,9 @@ import org.apache.hadoop.test.PathUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.spi.LoggingEvent;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.xml.sax.ContentHandler;
@@ -1223,7 +1228,8 @@ public class TestEditLog {
                                                                           TXNS_PER_ROLL*11);
 
     for (EditLogInputStream edits : editStreams) {
-      FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(edits);
+      FSEditLogLoader.EditLogValidation val =
+          FSEditLogLoader.validateEditLog(edits, Long.MAX_VALUE);
       long read = (val.getEndTxId() - edits.getFirstTxId()) + 1;
       LOG.info("Loading edits " + edits + " read " + read);
       assertEquals(startTxId, edits.getFirstTxId());
@@ -1573,4 +1579,99 @@ public class TestEditLog {
       }
     }
   }
+
+  class TestAppender extends AppenderSkeleton {
+    private final List<LoggingEvent> log = new ArrayList<>();
+
+    @Override
+    public boolean requiresLayout() {
+      return false;
+    }
+
+    @Override
+    protected void append(final LoggingEvent loggingEvent) {
+      log.add(loggingEvent);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    public List<LoggingEvent> getLog() {
+      return new ArrayList<>(log);
+    }
+  }
+
+  /**
+   * Readers must not read past the last synced op of an in-progress edit log.
+   * @throws Exception
+   */
+  @Test
+  public void testReadActivelyUpdatedLog() throws Exception {
+    final TestAppender appender = new TestAppender();
+    LogManager.getRootLogger().addAppender(appender);
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+    // Set single handler thread, so all transactions hit same thread-local ops.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 1);
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      FSImage fsimage = cluster.getNamesystem().getFSImage();
+      StorageDirectory sd = fsimage.getStorage().getStorageDir(0);
+
+      final DistributedFileSystem fileSys = cluster.getFileSystem();
+      DFSInotifyEventInputStream events = fileSys.getInotifyEventStream();
+      fileSys.mkdirs(new Path("/test"));
+      fileSys.mkdirs(new Path("/test/dir1"));
+      fileSys.delete(new Path("/test/dir1"), true);
+      fsimage.getEditLog().logSync();
+      fileSys.mkdirs(new Path("/test/dir2"));
+
+
+      final File inProgressEdit = NNStorage.getInProgressEditsFile(sd, 1);
+      assertTrue(inProgressEdit.exists());
+      EditLogFileInputStream elis = new EditLogFileInputStream(inProgressEdit);
+      FSEditLogOp op;
+      long pos = 0;
+
+      while (true) {
+        op = elis.readOp();
+        if (op != null && op.opCode != FSEditLogOpCodes.OP_INVALID) {
+          pos = elis.getPosition();
+        } else {
+          break;
+        }
+      }
+      elis.close();
+      assertTrue(pos > 0);
+
+      RandomAccessFile rwf = new RandomAccessFile(inProgressEdit, "rw");
+      rwf.seek(pos);
+      assertEquals(rwf.readByte(), (byte) -1);
+
+      rwf.seek(pos + 1);
+      rwf.writeByte(2);
+
+      rwf.close();
+
+      events.poll();
+      String pattern = "Caught exception after reading (.*) ops";
+      Pattern r = Pattern.compile(pattern);
+      final List<LoggingEvent> log = appender.getLog();
+      for (LoggingEvent event : log) {
+        Matcher m = r.matcher(event.getRenderedMessage());
+        if (m.find()) {
+          fail("Should not try to read past latest syned edit log op");
+        }
+      }
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+      LogManager.getRootLogger().removeAppender(appender);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
index 55ba379..3c3423a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
@@ -318,7 +318,8 @@ public class TestFSEditLogLoader {
     } finally {
       rwf.close();
     }
-    EditLogValidation validation = EditLogFileInputStream.validateEditLog(logFile);
+    EditLogValidation validation =
+        EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE);
     assertTrue(validation.hasCorruptHeader());
   }
 
@@ -333,7 +334,7 @@ public class TestFSEditLogLoader {
     File logFileBak = new File(testDir, logFile.getName() + ".bak");
     Files.copy(logFile, logFileBak);
     EditLogValidation validation =
-        EditLogFileInputStream.validateEditLog(logFile);
+        EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE);
     assertTrue(!validation.hasCorruptHeader());
     // We expect that there will be an OP_START_LOG_SEGMENT, followed by
     // NUM_TXNS opcodes, followed by an OP_END_LOG_SEGMENT.
@@ -346,7 +347,8 @@ public class TestFSEditLogLoader {
       // Restore backup, corrupt the txn opcode
       Files.copy(logFileBak, logFile);
       corruptByteInFile(logFile, txOffset);
-      validation = EditLogFileInputStream.validateEditLog(logFile);
+      validation = EditLogFileInputStream.validateEditLog(logFile,
+          Long.MAX_VALUE);
       long expectedEndTxId = (txId == (NUM_TXNS + 1)) ?
           NUM_TXNS : (NUM_TXNS + 1);
       assertEquals("Failed when corrupting txn opcode at " + txOffset,
@@ -363,7 +365,8 @@ public class TestFSEditLogLoader {
       // Restore backup, corrupt the txn opcode
       Files.copy(logFileBak, logFile);
       truncateFile(logFile, txOffset);
-      validation = EditLogFileInputStream.validateEditLog(logFile);
+      validation = EditLogFileInputStream.validateEditLog(logFile,
+          Long.MAX_VALUE);
       long expectedEndTxId = (txId == 0) ?
           HdfsServerConstants.INVALID_TXID : (txId - 1);
       assertEquals("Failed when corrupting txid " + txId + " txn opcode " +
@@ -381,7 +384,7 @@ public class TestFSEditLogLoader {
     // layout flags section.
     truncateFile(logFile, 8);
     EditLogValidation validation =
-        EditLogFileInputStream.validateEditLog(logFile);
+        EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE);
     assertTrue(!validation.hasCorruptHeader());
     assertEquals(HdfsServerConstants.INVALID_TXID, validation.getEndTxId());
   }


Mime
View raw message