hadoop-hdfs-commits mailing list archives

From t...@apache.org
Subject svn commit: r1417588 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/test/java/org/apache/hadoop/hdfs/server/namenode/
Date Wed, 05 Dec 2012 19:13:30 GMT
Author: todd
Date: Wed Dec  5 19:13:29 2012
New Revision: 1417588

URL: http://svn.apache.org/viewvc?rev=1417588&view=rev
Log:
HDFS-3049. During the normal NN startup process, fall back on a different edit log if we see
one that is corrupt. Contributed by Colin Patrick McCabe.
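
The heart of the change is a new RedundantEditLogInputStream that merges several copies of the same transaction range behind one stream and fails over transparently when the copy being read turns out to be corrupt. As a rough, standalone illustration of that failover contract (illustrative names, not the committed HDFS types):

    import java.io.IOException;
    import java.util.List;

    class FailoverReadSketch {
      // Stand-in for one replica of an edit log segment; readOp() returns
      // null at end-of-file and throws IOException if the copy is corrupt.
      interface Replica {
        String readOp() throws IOException;
      }

      // Try each replica of the same transaction range in turn, remembering
      // the last failure; surface an error only once every copy has failed.
      static String readWithFailover(List<Replica> replicas) throws IOException {
        IOException last = null;
        for (Replica r : replicas) {
          try {
            return r.readOp();
          } catch (IOException e) {
            last = e; // this copy is bad; fall back to the next one
          }
        }
        if (last != null) {
          throw last; // every replica failed
        }
        return null; // no replicas at all
      }
    }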

Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1417588&r1=1417587&r2=1417588&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Dec  5 19:13:29 2012
@@ -128,6 +128,9 @@ Release 2.0.3-alpha - Unreleased
 
     HDFS-4199. Provide test for HdfsVolumeId. (Ivan A. Veselovsky via atm)
 
+    HDFS-3049. During the normal NN startup process, fall back on a different
+    edit log if we see one that is corrupt (Colin Patrick McCabe via todd)
+
   OPTIMIZATIONS
 
   BUG FIXES 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1417588&r1=1417587&r2=1417588&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Dec  5 19:13:29 2012
@@ -1145,18 +1145,6 @@ public class FSEditLog implements LogsPu
         throw e;
       }
     }
-    // This code will go away as soon as RedundantEditLogInputStream is
-    // introduced. (HDFS-3049)
-    try {
-      if (!streams.isEmpty()) {
-        streams.get(0).skipUntil(fromTxId);
-      }
-    } catch (IOException e) {
-      // We don't want to throw an exception from here, because that would make
-      // recovery impossible even if the user requested it.  An exception will
-      // be thrown later, when we don't read the starting txid we expect.
-      LOG.error("error skipping until transaction " + fromTxId, e);
-    }
     return streams;
   }
   

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1417588&r1=1417587&r2=1417588&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Wed Dec  5 19:13:29 2012
@@ -665,7 +665,9 @@ public class FSEditLogLoader {
         FSImage.LOG.warn("Caught exception after reading " + numValid +
             " ops from " + in + " while determining its valid length." +
             "Position was " + lastPos, t);
-        break;
+        in.resync();
+        FSImage.LOG.warn("After resync, position is " + in.getPosition());
+        continue;
       }
       if (lastTxId == HdfsConstants.INVALID_TXID
           || op.getTransactionId() > lastTxId) {
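
Before this hunk, the scan used to stop ("break") at the first unreadable op, so one corrupt record hid every valid transaction behind it; now the loader resyncs past the bad region and keeps scanning. A minimal sketch of that loop shape, using assumed stand-in types rather than the real EditLogInputStream API:

    import java.io.IOException;

    class ScanPastCorruptionSketch {
      // Stand-in stream: readTxId() returns the next transaction ID, null at
      // end-of-file, and throws on a corrupt op; resync() advances the
      // position past the unreadable region.
      interface Stream {
        Long readTxId() throws IOException;
        void resync();
      }

      // Find the highest readable transaction ID, skipping corrupt regions
      // the way the patched loop does (resync + continue instead of break).
      static long lastValidTxId(Stream in) {
        long last = -1;
        while (true) {
          Long txId;
          try {
            txId = in.readTxId();
          } catch (IOException e) {
            in.resync();  // hop over the corrupt bytes...
            continue;     // ...and keep looking for later valid ops
          }
          if (txId == null) {
            return last;  // clean end of file
          }
          if (txId > last) {
            last = txId;
          }
        }
      }
    }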

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java?rev=1417588&r1=1417587&r2=1417588&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java Wed Dec  5 19:13:29 2012
@@ -24,7 +24,9 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.PriorityQueue;
 import java.util.SortedSet;
+import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,7 +45,6 @@ import com.google.common.collect.Immutab
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.Sets;
-import com.google.common.collect.TreeMultiset;
 
 /**
  * Manages a collection of Journals. None of the methods are synchronized, it is
@@ -232,8 +233,9 @@ public class JournalSet implements Journ
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
       long fromTxId, boolean inProgressOk) throws IOException {
-    final TreeMultiset<EditLogInputStream> allStreams =
-        TreeMultiset.create(EDIT_LOG_INPUT_STREAM_COMPARATOR);
+    final PriorityQueue<EditLogInputStream> allStreams = 
+        new PriorityQueue<EditLogInputStream>(64,
+            EDIT_LOG_INPUT_STREAM_COMPARATOR);
     for (JournalAndStream jas : journals) {
       if (jas.isDisabled()) {
         LOG.info("Skipping jas " + jas + " since it's disabled");
@@ -249,7 +251,8 @@ public class JournalSet implements Journ
     // transaction ID.
     LinkedList<EditLogInputStream> acc =
         new LinkedList<EditLogInputStream>();
-    for (EditLogInputStream elis : allStreams) {
+    EditLogInputStream elis;
+    while ((elis = allStreams.poll()) != null) {
       if (acc.isEmpty()) {
         acc.add(elis);
       } else {
@@ -257,7 +260,7 @@ public class JournalSet implements Journ
         if (accFirstTxId == elis.getFirstTxId()) {
           acc.add(elis);
         } else if (accFirstTxId < elis.getFirstTxId()) {
-          streams.add(acc.get(0));
+          streams.add(new RedundantEditLogInputStream(acc, fromTxId));
           acc.clear();
           acc.add(elis);
         } else if (accFirstTxId > elis.getFirstTxId()) {
@@ -268,7 +271,7 @@ public class JournalSet implements Journ
       }
     }
     if (!acc.isEmpty()) {
-      streams.add(acc.get(0));
+      streams.add(new RedundantEditLogInputStream(acc, fromTxId));
       acc.clear();
     }
   }
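
Two things change in selectInputStreams: Guava's TreeMultiset gives way to java.util.PriorityQueue (poll() removes and returns elements in comparator order, which is all the loop needs), and streams that share a first transaction ID are now wrapped together in a RedundantEditLogInputStream instead of keeping only the first and discarding its twins. An illustrative sketch of the drain-and-group pattern, with plain longs standing in for streams keyed by first txid:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.PriorityQueue;

    class DrainAndGroupSketch {
      // Drain a priority queue in sorted order, grouping equal keys, the way
      // selectInputStreams groups streams with the same first txid before
      // wrapping each group in a RedundantEditLogInputStream.
      static List<List<Long>> groupEqualKeys(PriorityQueue<Long> queue) {
        List<List<Long>> groups = new ArrayList<List<Long>>();
        List<Long> acc = new ArrayList<Long>();
        Long key;
        while ((key = queue.poll()) != null) {  // poll() yields sorted order
          if (!acc.isEmpty() && !acc.get(0).equals(key)) {
            groups.add(acc);                    // a new key closes the group
            acc = new ArrayList<Long>();
          }
          acc.add(key);
        }
        if (!acc.isEmpty()) {
          groups.add(acc);                      // keep the final group too
        }
        return groups;
      }
    }

Unlike iterating a multiset, polling consumes the queue, which is why the test helpers further down switch to the same poll() idiom.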

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java?rev=1417588&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java Wed Dec  5 19:13:29 2012
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+
+/**
+ * A merged input stream that handles failover between different edit logs.
+ *
+ * We will currently try each edit log stream exactly once.  In other words, we
+ * don't handle the "ping pong" scenario where different edit logs contain a
+ * different subset of the available edits.
+ */
+class RedundantEditLogInputStream extends EditLogInputStream {
+  public static final Log LOG = LogFactory.getLog(EditLogInputStream.class.getName());
+  private int curIdx;
+  private long prevTxId;
+  private final EditLogInputStream[] streams;
+
+  /**
+   * States that the RedundantEditLogInputStream can be in.
+   *
+   * <pre>
+   *                   start (if no streams)
+   *                           |
+   *                           V
+   * PrematureEOFException  +----------------+
+   *        +-------------->| EOF            |<--------------+
+   *        |               +----------------+               |
+   *        |                                                |
+   *        |          start (if there are streams)          |
+   *        |                  |                             |
+   *        |                  V                             | EOF
+   *        |   resync      +----------------+ skipUntil  +---------+
+   *        |   +---------->| SKIP_UNTIL     |----------->|  OK     |
+   *        |   |           +----------------+            +---------+
+   *        |   |                | IOE   ^ fail over to      | IOE
+   *        |   |                V       | next stream       |
+   * +----------------------+   +----------------+           |
+   * | STREAM_FAILED_RESYNC |   | STREAM_FAILED  |<----------+
+   * +----------------------+   +----------------+
+   *                  ^   Recovery mode    |
+   *                  +--------------------+
+   * </pre>
+   */
+  static private enum State {
+    /** We need to skip until prevTxId + 1 */
+    SKIP_UNTIL,
+    /** We're ready to read opcodes out of the current stream */
+    OK,
+    /** The current stream has failed. */
+    STREAM_FAILED,
+    /** The current stream has failed, and resync() was called.  */
+    STREAM_FAILED_RESYNC,
+    /** There are no more opcodes to read from this
+     * RedundantEditLogInputStream */
+    EOF;
+  }
+
+  private State state;
+  private IOException prevException;
+
+  RedundantEditLogInputStream(Collection<EditLogInputStream> streams,
+      long startTxId) {
+    this.curIdx = 0;
+    this.prevTxId = (startTxId == HdfsConstants.INVALID_TXID) ?
+      HdfsConstants.INVALID_TXID : (startTxId - 1);
+    this.state = (streams.isEmpty()) ? State.EOF : State.SKIP_UNTIL;
+    this.prevException = null;
+    // EditLogInputStreams in a RedundantEditLogInputStream must be finalized,
+    // and can't be pre-transactional.
+    EditLogInputStream first = null;
+    for (EditLogInputStream s : streams) {
+      Preconditions.checkArgument(s.getFirstTxId() !=
+          HdfsConstants.INVALID_TXID, "invalid first txid in stream: %s", s);
+      Preconditions.checkArgument(s.getLastTxId() !=
+          HdfsConstants.INVALID_TXID, "invalid last txid in stream: %s", s);
+      if (first == null) {
+        first = s;
+      } else {
+        Preconditions.checkArgument(s.getFirstTxId() == first.getFirstTxId(),
+          "All streams in the RedundantEditLogInputStream must have the same " +
+          "start transaction ID!  " + first + " had start txId " +
+          first.getFirstTxId() + ", but " + s + " had start txId " +
+          s.getFirstTxId());
+      }
+    }
+
+    this.streams = streams.toArray(new EditLogInputStream[0]);
+
+    // We sort the streams here so that the streams that end later come first.
+    Arrays.sort(this.streams, new Comparator<EditLogInputStream>() {
+      @Override
+      public int compare(EditLogInputStream a, EditLogInputStream b) {
+        return Longs.compare(b.getLastTxId(), a.getLastTxId());
+      }
+    });
+  }
+
+  @Override
+  public String getName() {
+    StringBuilder bld = new StringBuilder();
+    String prefix = "";
+    for (EditLogInputStream elis : streams) {
+      bld.append(prefix);
+      bld.append(elis.getName());
+      prefix = ", ";
+    }
+    return bld.toString();
+  }
+
+  @Override
+  public long getFirstTxId() {
+    return streams[curIdx].getFirstTxId();
+  }
+
+  @Override
+  public long getLastTxId() {
+    return streams[curIdx].getLastTxId();
+  }
+
+  @Override
+  public void close() throws IOException {
+    IOUtils.cleanup(LOG,  streams);
+  }
+
+  @Override
+  protected FSEditLogOp nextValidOp() {
+    try {
+      if (state == State.STREAM_FAILED) {
+        state = State.STREAM_FAILED_RESYNC;
+      }
+      return nextOp();
+    } catch (IOException e) {
+      return null;
+    }
+  }
+
+  @Override
+  protected FSEditLogOp nextOp() throws IOException {
+    while (true) {
+      switch (state) {
+      case SKIP_UNTIL:
+       try {
+          if (prevTxId != HdfsConstants.INVALID_TXID) {
+            LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
+                "' to transaction ID " + (prevTxId + 1));
+            streams[curIdx].skipUntil(prevTxId + 1);
+          }
+        } catch (IOException e) {
+          prevException = e;
+          state = State.STREAM_FAILED;
+        }
+        state = State.OK;
+        break;
+      case OK:
+        try {
+          FSEditLogOp op = streams[curIdx].readOp();
+          if (op == null) {
+            state = State.EOF;
+            if (streams[curIdx].getLastTxId() == prevTxId) {
+              return null;
+            } else {
+              throw new PrematureEOFException("got premature end-of-file " +
+                  "at txid " + prevTxId + "; expected file to go up to " +
+                  streams[curIdx].getLastTxId());
+            }
+          }
+          prevTxId = op.getTransactionId();
+          return op;
+        } catch (IOException e) {
+          prevException = e;
+          state = State.STREAM_FAILED;
+        }
+        break;
+      case STREAM_FAILED:
+        if (curIdx + 1 == streams.length) {
+          throw prevException;
+        }
+        long oldLast = streams[curIdx].getLastTxId();
+        long newLast = streams[curIdx + 1].getLastTxId();
+        if (newLast < oldLast) {
+          throw new IOException("We encountered an error reading " +
+              streams[curIdx].getName() + ".  During automatic edit log " +
+              "failover, we noticed that all of the remaining edit log " +
+              "streams are shorter than the current one!  The best " +
+              "remaining edit log ends at transaction " +
+              newLast + ", but we thought we could read up to transaction " +
+              oldLast + ".  If you continue, metadata will be lost forever!");
+        }
+        LOG.error("Got error reading edit log input stream " +
+          streams[curIdx].getName() + "; failing over to edit log " +
+          streams[curIdx + 1].getName(), prevException);
+        curIdx++;
+        state = State.SKIP_UNTIL;
+        break;
+      case STREAM_FAILED_RESYNC:
+        if (curIdx + 1 == streams.length) {
+          if (prevException instanceof PrematureEOFException) {
+            // bypass early EOF check
+            state = State.EOF;
+          } else {
+            streams[curIdx].resync();
+            state = State.SKIP_UNTIL;
+          }
+        } else {
+          LOG.error("failing over to edit log " +
+              streams[curIdx + 1].getName());
+          curIdx++;
+          state = State.SKIP_UNTIL;
+        }
+        break;
+      case EOF:
+        return null;
+      }
+    }
+  }
+
+  @Override
+  public int getVersion() throws IOException {
+    return streams[curIdx].getVersion();
+  }
+
+  @Override
+  public long getPosition() {
+    return streams[curIdx].getPosition();
+  }
+
+  @Override
+  public long length() throws IOException {
+    return streams[curIdx].length();
+  }
+
+  @Override
+  public boolean isInProgress() {
+    return streams[curIdx].isInProgress();
+  }
+
+  static private final class PrematureEOFException extends IOException {
+    private static final long serialVersionUID = 1L;
+    PrematureEOFException(String msg) {
+      super(msg);
+    }
+  }
+}
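
Note that the constructor sorts the replicas so that the stream ending at the highest transaction ID is tried first, and that from the loader's side the merged stream reads like any single EditLogInputStream: failover happens entirely inside nextOp(). A hedged consumer-side sketch (stand-in interface, not the HDFS class):

    import java.io.IOException;

    class MergedStreamConsumerSketch {
      // Stand-in for the merged stream's contract: null at end-of-file, and
      // an IOException only after every underlying replica has failed.
      interface MergedStream {
        String nextOp() throws IOException;
      }

      // The caller just reads until null; which replica actually supplied
      // each op is invisible at this level.
      static long countOps(MergedStream merged) throws IOException {
        long n = 0;
        while (merged.nextOp() != null) {
          n++;
        }
        return n;
      }
    }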

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1417588&r1=1417587&r2=1417588&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Wed Dec  5 19:13:29 2012
@@ -38,6 +38,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -531,21 +532,29 @@ public class TestEditLog {
     FSImage fsimage = namesystem.getFSImage();
     final FSEditLog editLog = fsimage.getEditLog();
     fileSys.mkdirs(new Path("/tmp"));
-    StorageDirectory sd = fsimage.getStorage().dirIterator(NameNodeDirType.EDITS).next();
+
+    Iterator<StorageDirectory> iter = fsimage.getStorage().
+      dirIterator(NameNodeDirType.EDITS);
+    LinkedList<StorageDirectory> sds = new LinkedList<StorageDirectory>();
+    while (iter.hasNext()) {
+      sds.add(iter.next());
+    }
     editLog.close();
     cluster.shutdown();
 
-    File editFile = NNStorage.getFinalizedEditsFile(sd, 1, 3);
-    assertTrue(editFile.exists());
-
-    long fileLen = editFile.length();
-    System.out.println("File name: " + editFile + " len: " + fileLen);
-    RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
-    rwf.seek(fileLen-4); // seek to checksum bytes
-    int b = rwf.readInt();
-    rwf.seek(fileLen-4);
-    rwf.writeInt(b+1);
-    rwf.close();
+    for (StorageDirectory sd : sds) {
+      File editFile = NNStorage.getFinalizedEditsFile(sd, 1, 3);
+      assertTrue(editFile.exists());
+  
+      long fileLen = editFile.length();
+      LOG.debug("Corrupting Log File: " + editFile + " len: " + fileLen);
+      RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
+      rwf.seek(fileLen-4); // seek to checksum bytes
+      int b = rwf.readInt();
+      rwf.seek(fileLen-4);
+      rwf.writeInt(b+1);
+      rwf.close();
+    }
     
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).format(false).build();
@@ -1264,6 +1273,113 @@ public class TestEditLog {
     }
   }
 
+  private static long readAllEdits(Collection<EditLogInputStream> streams,
+      long startTxId) throws IOException {
+    FSEditLogOp op;
+    long nextTxId = startTxId;
+    long numTx = 0;
+    for (EditLogInputStream s : streams) {
+      while (true) {
+        op = s.readOp();
+        if (op == null)
+          break;
+        if (op.getTransactionId() != nextTxId) {
+          throw new IOException("out of order transaction ID!  expected " +
+              nextTxId + " but got " + op.getTransactionId() + " when " +
+              "reading " + s.getName());
+        }
+        numTx++;
+        nextTxId = op.getTransactionId() + 1;
+      }
+    }
+    return numTx;
+  }
+
+  /**
+   * Test edit log failover.  If a single edit log is missing, other 
+   * edit logs should be used instead.
+   */
+  @Test
+  public void testEditLogFailOverFromMissing() throws IOException {
+    File f1 = new File(TEST_DIR + "/failover0");
+    File f2 = new File(TEST_DIR + "/failover1");
+    List<URI> editUris = ImmutableList.of(f1.toURI(), f2.toURI());
+
+    NNStorage storage = setupEdits(editUris, 3);
+    
+    final long startErrorTxId = 1*TXNS_PER_ROLL + 1;
+    final long endErrorTxId = 2*TXNS_PER_ROLL;
+
+    File[] files = new File(f1, "current").listFiles(new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+          if (name.startsWith(NNStorage.getFinalizedEditsFileName(startErrorTxId, 
+                                  endErrorTxId))) {
+            return true;
+          }
+          return false;
+        }
+      });
+    assertEquals(1, files.length);
+    assertTrue(files[0].delete());
+
+    FSEditLog editlog = getFSEditLog(storage);
+    editlog.initJournalsForWrite();
+    long startTxId = 1;
+    try {
+      readAllEdits(editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL),
+          startTxId);
+    } catch (IOException e) {
+      LOG.error("edit log failover didn't work", e);
+      fail("Edit log failover didn't work");
+    }
+  }
+
+  /** 
+   * Test edit log failover from a corrupt edit log
+   */
+  @Test
+  public void testEditLogFailOverFromCorrupt() throws IOException {
+    File f1 = new File(TEST_DIR + "/failover0");
+    File f2 = new File(TEST_DIR + "/failover1");
+    List<URI> editUris = ImmutableList.of(f1.toURI(), f2.toURI());
+
+    NNStorage storage = setupEdits(editUris, 3);
+    
+    final long startErrorTxId = 1*TXNS_PER_ROLL + 1;
+    final long endErrorTxId = 2*TXNS_PER_ROLL;
+
+    File[] files = new File(f1, "current").listFiles(new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+          if (name.startsWith(NNStorage.getFinalizedEditsFileName(startErrorTxId, 
+                                  endErrorTxId))) {
+            return true;
+          }
+          return false;
+        }
+      });
+    assertEquals(1, files.length);
+
+    long fileLen = files[0].length();
+    LOG.debug("Corrupting Log File: " + files[0] + " len: " + fileLen);
+    RandomAccessFile rwf = new RandomAccessFile(files[0], "rw");
+    rwf.seek(fileLen-4); // seek to checksum bytes
+    int b = rwf.readInt();
+    rwf.seek(fileLen-4);
+    rwf.writeInt(b+1);
+    rwf.close();
+    
+    FSEditLog editlog = getFSEditLog(storage);
+    editlog.initJournalsForWrite();
+    long startTxId = 1;
+    try {
+      readAllEdits(editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL),
+          startTxId);
+    } catch (IOException e) {
+      LOG.error("edit log failover didn't work", e);
+      fail("Edit log failover didn't work");
+    }
+  }
+
   /**
    * Test creating a directory with lots and lots of edit log segments
    */

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java?rev=1417588&r1=1417587&r2=1417588&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java Wed Dec  5 19:13:29 2012
@@ -31,6 +31,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
+import java.util.Map;
 import java.util.SortedMap;
 
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -50,6 +51,7 @@ import org.apache.log4j.Level;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
+import com.google.common.io.Files;
 
 public class TestFSEditLogLoader {
   
@@ -320,6 +322,56 @@ public class TestFSEditLogLoader {
   }
 
   @Test
+  public void testValidateEditLogWithCorruptBody() throws IOException {
+    File testDir = new File(TEST_DIR, "testValidateEditLogWithCorruptBody");
+    SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap();
+    final int NUM_TXNS = 20;
+    File logFile = prepareUnfinalizedTestEditLog(testDir, NUM_TXNS,
+        offsetToTxId);
+    // Back up the uncorrupted log
+    File logFileBak = new File(testDir, logFile.getName() + ".bak");
+    Files.copy(logFile, logFileBak);
+    EditLogValidation validation =
+        EditLogFileInputStream.validateEditLog(logFile);
+    assertTrue(!validation.hasCorruptHeader());
+    // We expect that there will be an OP_START_LOG_SEGMENT, followed by
+    // NUM_TXNS opcodes, followed by an OP_END_LOG_SEGMENT.
+    assertEquals(NUM_TXNS + 1, validation.getEndTxId());
+    // Corrupt each edit and verify that validation continues to work
+    for (Map.Entry<Long, Long> entry : offsetToTxId.entrySet()) {
+      long txOffset = entry.getKey();
+      long txId = entry.getValue();
+
+      // Restore backup, corrupt the txn opcode
+      Files.copy(logFileBak, logFile);
+      corruptByteInFile(logFile, txOffset);
+      validation = EditLogFileInputStream.validateEditLog(logFile);
+      long expectedEndTxId = (txId == (NUM_TXNS + 1)) ?
+          NUM_TXNS : (NUM_TXNS + 1);
+      assertEquals("Failed when corrupting txn opcode at " + txOffset,
+          expectedEndTxId, validation.getEndTxId());
+      assertTrue(!validation.hasCorruptHeader());
+    }
+
+    // Truncate right before each edit and verify that validation continues
+    // to work
+    for (Map.Entry<Long, Long> entry : offsetToTxId.entrySet()) {
+      long txOffset = entry.getKey();
+      long txId = entry.getValue();
+
+      // Restore backup, truncate right before the txn opcode
+      Files.copy(logFileBak, logFile);
+      truncateFile(logFile, txOffset);
+      validation = EditLogFileInputStream.validateEditLog(logFile);
+      long expectedEndTxId = (txId == 0) ?
+          HdfsConstants.INVALID_TXID : (txId - 1);
+      assertEquals("Failed when corrupting txid " + txId + " txn opcode " +
+        "at " + txOffset, expectedEndTxId, validation.getEndTxId());
+      assertTrue(!validation.hasCorruptHeader());
+    }
+  }
+
+  @Test
   public void testValidateEmptyEditLog() throws IOException {
     File testDir = new File(TEST_DIR, "testValidateEmptyEditLog");
     SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap();

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java?rev=1417588&r1=1417587&r2=1417588&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java Wed Dec  5 19:13:29 2012
@@ -32,6 +32,7 @@ import java.net.URI;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.PriorityQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -70,12 +71,13 @@ public class TestFileJournalManager {
   static long getNumberOfTransactions(FileJournalManager jm, long fromTxId,
       boolean inProgressOk, boolean abortOnGap) throws IOException {
     long numTransactions = 0, txId = fromTxId;
-    final TreeMultiset<EditLogInputStream> allStreams =
-        TreeMultiset.create(JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
+    final PriorityQueue<EditLogInputStream> allStreams = 
+        new PriorityQueue<EditLogInputStream>(64,
+            JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
     jm.selectInputStreams(allStreams, fromTxId, inProgressOk);
-
+    EditLogInputStream elis = null;
     try {
-      for (EditLogInputStream elis : allStreams) {
+      while ((elis = allStreams.poll()) != null) {
         elis.skipUntil(txId);
         while (true) {
           FSEditLogOp op = elis.readOp();
@@ -93,6 +95,7 @@ public class TestFileJournalManager {
       }
     } finally {
       IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0]));
+      IOUtils.cleanup(LOG, elis);
     }
     return numTransactions;
   }
@@ -387,27 +390,28 @@ public class TestFileJournalManager {
   
   private static EditLogInputStream getJournalInputStream(JournalManager jm,
       long txId, boolean inProgressOk) throws IOException {
-    final TreeMultiset<EditLogInputStream> allStreams =
-        TreeMultiset.create(JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
+    final PriorityQueue<EditLogInputStream> allStreams = 
+        new PriorityQueue<EditLogInputStream>(64,
+            JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
     jm.selectInputStreams(allStreams, txId, inProgressOk);
+    EditLogInputStream elis = null, ret;
     try {
-      for (Iterator<EditLogInputStream> iter = allStreams.iterator();
-          iter.hasNext();) {
-        EditLogInputStream elis = iter.next();
+      while ((elis = allStreams.poll()) != null) {
         if (elis.getFirstTxId() > txId) {
           break;
         }
         if (elis.getLastTxId() < txId) {
-          iter.remove();
           elis.close();
           continue;
         }
         elis.skipUntil(txId);
-        iter.remove();
-        return elis;
+        ret = elis;
+        elis = null;
+        return ret;
       }
     } finally {
       IOUtils.cleanup(LOG,  allStreams.toArray(new EditLogInputStream[0]));
+      IOUtils.cleanup(LOG,  elis);
     }
     return null;
   }
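
The null-out-before-return in getJournalInputStream is deliberate: the finally block cleans up every stream the method still owns, and setting elis to null first keeps IOUtils.cleanup from closing the one stream being handed back to the caller. A generic sketch of that hand-off idiom (stand-in types, not the HDFS helpers):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.Queue;

    class OwnershipHandOffSketch {
      // Take the head of the queue, ensuring the finally block closes the
      // resource on every path except the one that transfers ownership.
      static Closeable takeFirst(Queue<? extends Closeable> queue)
          throws IOException {
        Closeable cur = null;
        try {
          cur = queue.poll();
          if (cur == null) {
            return null;      // nothing to hand off
          }
          Closeable ret = cur;
          cur = null;         // ownership moves to the caller,
          return ret;         // so the cleanup below must skip it
        } finally {
          if (cur != null) {
            cur.close();      // close only what we still own
          }
        }
      }
    }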


