hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1102146 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Date Thu, 12 May 2011 00:18:43 GMT
Author: todd
Date: Thu May 12 00:18:42 2011
New Revision: 1102146

URL: http://svn.apache.org/viewvc?rev=1102146&view=rev
Log:
HDFS-1378. Edit log replay should track and report file offsets in case of errors. Contributed
by Aaron T. Myers and Todd Lipcon.

Added:
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1102146&r1=1102145&r2=1102146&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Thu May 12 00:18:42 2011
@@ -401,6 +401,9 @@ Trunk (unreleased changes)
     HDFS-1906. Remove logging exception stack trace in client logs when one of
     the datanode targets to read from is not reachable. (suresh)
 
+    HDFS-1378. Edit log replay should track and report file offsets in case of
+    errors. (Aaron T. Myers and Todd Lipcon via todd)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1102146&r1=1102145&r2=1102146&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Thu May 12 00:18:42 2011
@@ -22,7 +22,10 @@ import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.EOFException;
+import java.io.FilterInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 
@@ -139,381 +142,409 @@ public class FSEditLogLoader {
         numOpRenewDelegationToken = 0, numOpCancelDelegationToken = 0, 
         numOpUpdateMasterKey = 0, numOpOther = 0;
 
+    // Keep track of the file offsets of the last several opcodes.
+    // This is handy when manually recovering corrupted edits files.
+    PositionTrackingInputStream tracker = new PositionTrackingInputStream(in);
+    in = new DataInputStream(tracker);
+    long recentOpcodeOffsets[] = new long[4];
+    Arrays.fill(recentOpcodeOffsets, -1);
+
     try {
-      while (true) {
-        long timestamp = 0;
-        long mtime = 0;
-        long atime = 0;
-        long blockSize = 0;
-        FSEditLogOpCodes opCode;
-        try {
-          if (checksum != null) {
-            checksum.reset();
-          }
-          in.mark(1);
-          byte opCodeByte = in.readByte();
-          opCode = FSEditLogOpCodes.fromByte(opCodeByte);
-          if (opCode == FSEditLogOpCodes.OP_INVALID) {
-            in.reset(); // reset back to end of file if somebody reads it again
+      try {
+        while (true) {
+          long timestamp = 0;
+          long mtime = 0;
+          long atime = 0;
+          long blockSize = 0;
+          FSEditLogOpCodes opCode;
+          try {
+            if (checksum != null) {
+              checksum.reset();
+            }
+            in.mark(1);
+            byte opCodeByte = in.readByte();
+            opCode = FSEditLogOpCodes.fromByte(opCodeByte);
+            if (opCode == FSEditLogOpCodes.OP_INVALID) {
+              in.reset(); // reset back to end of file if somebody reads it again
+              break; // no more transactions
+            }
+          } catch (EOFException e) {
             break; // no more transactions
           }
-        } catch (EOFException e) {
-          break; // no more transactions
-        }
-        numEdits++;
-        switch (opCode) {
-        case OP_ADD:
-        case OP_CLOSE: {
-          // versions > 0 support per file replication
-          // get name and replication
-          int length = in.readInt();
-          if (-7 == logVersion && length != 3||
-              -17 < logVersion && logVersion < -7 && length != 4 ||
-              logVersion <= -17 && length != 5) {
-              throw new IOException("Incorrect data format."  +
-                                    " logVersion is " + logVersion +
-                                    " but writables.length is " +
-                                    length + ". ");
-          }
-          path = FSImageSerialization.readString(in);
-          short replication = fsNamesys.adjustReplication(readShort(in));
-          mtime = readLong(in);
-          if (logVersion <= -17) {
-            atime = readLong(in);
-          }
-          if (logVersion < -7) {
-            blockSize = readLong(in);
-          }
-          // get blocks
-          boolean isFileUnderConstruction = (opCode == FSEditLogOpCodes.OP_ADD);
-          BlockInfo blocks[] = 
-            readBlocks(in, logVersion, isFileUnderConstruction, replication);
-
-          // Older versions of HDFS does not store the block size in inode.
-          // If the file has more than one block, use the size of the
-          // first block as the blocksize. Otherwise use the default
-          // block size.
-          if (-8 <= logVersion && blockSize == 0) {
-            if (blocks.length > 1) {
-              blockSize = blocks[0].getNumBytes();
+          recentOpcodeOffsets[numEdits % recentOpcodeOffsets.length] =
+              tracker.getPos();
+          numEdits++;
+          switch (opCode) {
+          case OP_ADD:
+          case OP_CLOSE: {
+            // versions > 0 support per file replication
+            // get name and replication
+            int length = in.readInt();
+            if (-7 == logVersion && length != 3||
+                -17 < logVersion && logVersion < -7 && length != 4 ||
+                logVersion <= -17 && length != 5) {
+                throw new IOException("Incorrect data format."  +
+                                      " logVersion is " + logVersion +
+                                      " but writables.length is " +
+                                      length + ". ");
+            }
+            path = FSImageSerialization.readString(in);
+            short replication = fsNamesys.adjustReplication(readShort(in));
+            mtime = readLong(in);
+            if (logVersion <= -17) {
+              atime = readLong(in);
+            }
+            if (logVersion < -7) {
+              blockSize = readLong(in);
+            }
+            // get blocks
+            boolean isFileUnderConstruction = (opCode == FSEditLogOpCodes.OP_ADD);
+            BlockInfo blocks[] = 
+              readBlocks(in, logVersion, isFileUnderConstruction, replication);
+  
+            // Older versions of HDFS does not store the block size in inode.
+            // If the file has more than one block, use the size of the
+            // first block as the blocksize. Otherwise use the default
+            // block size.
+            if (-8 <= logVersion && blockSize == 0) {
+              if (blocks.length > 1) {
+                blockSize = blocks[0].getNumBytes();
+              } else {
+                long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
+                blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
+              }
+            }
+             
+            PermissionStatus permissions = fsNamesys.getUpgradePermission();
+            if (logVersion <= -11) {
+              permissions = PermissionStatus.read(in);
+            }
+  
+            // clientname, clientMachine and block locations of last block.
+            if (opCode == FSEditLogOpCodes.OP_ADD && logVersion <= -12) {
+              clientName = FSImageSerialization.readString(in);
+              clientMachine = FSImageSerialization.readString(in);
+              if (-13 <= logVersion) {
+                readDatanodeDescriptorArray(in);
+              }
             } else {
-              long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
-              blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
+              clientName = "";
+              clientMachine = "";
+            }
+  
+            // The open lease transaction re-creates a file if necessary.
+            // Delete the file if it already exists.
+            if (FSNamesystem.LOG.isDebugEnabled()) {
+              FSNamesystem.LOG.debug(opCode + ": " + path + 
+                                     " numblocks : " + blocks.length +
+                                     " clientHolder " +  clientName +
+                                     " clientMachine " + clientMachine);
+            }
+  
+            fsDir.unprotectedDelete(path, mtime);
+  
+            // add to the file tree
+            INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
+                                                      path, permissions,
+                                                      blocks, replication, 
+                                                      mtime, atime, blockSize);
+            if (isFileUnderConstruction) {
+              numOpAdd++;
+              //
+              // Replace current node with a INodeUnderConstruction.
+              // Recreate in-memory lease record.
+              //
+              INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
+                                        node.getLocalNameBytes(),
+                                        node.getReplication(), 
+                                        node.getModificationTime(),
+                                        node.getPreferredBlockSize(),
+                                        node.getBlocks(),
+                                        node.getPermissionStatus(),
+                                        clientName, 
+                                        clientMachine, 
+                                        null);
+              fsDir.replaceNode(path, node, cons);
+              fsNamesys.leaseManager.addLease(cons.getClientName(), path);
+            }
+            break;
+          } 
+          case OP_SET_REPLICATION: {
+            numOpSetRepl++;
+            path = FSImageSerialization.readString(in);
+            short replication = fsNamesys.adjustReplication(readShort(in));
+            fsDir.unprotectedSetReplication(path, replication, null);
+            break;
+          } 
+          case OP_CONCAT_DELETE: {
+            if (logVersion > -22) {
+              throw new IOException("Unexpected opCode " + opCode
+                  + " for version " + logVersion);
+            }
+            numOpConcatDelete++;
+            int length = in.readInt();
+            if (length < 3) { // trg, srcs.., timestam
+              throw new IOException("Incorrect data format. " 
+                                    + "Mkdir operation.");
+            }
+            String trg = FSImageSerialization.readString(in);
+            int srcSize = length - 1 - 1; //trg and timestamp
+            String [] srcs = new String [srcSize];
+            for(int i=0; i<srcSize;i++) {
+              srcs[i]= FSImageSerialization.readString(in);
             }
+            timestamp = readLong(in);
+            fsDir.unprotectedConcat(trg, srcs);
+            break;
+          }
+          case OP_RENAME_OLD: {
+            numOpRenameOld++;
+            int length = in.readInt();
+            if (length != 3) {
+              throw new IOException("Incorrect data format. " 
+                                    + "Mkdir operation.");
+            }
+            String s = FSImageSerialization.readString(in);
+            String d = FSImageSerialization.readString(in);
+            timestamp = readLong(in);
+            HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
+            fsDir.unprotectedRenameTo(s, d, timestamp);
+            fsNamesys.changeLease(s, d, dinfo);
+            break;
+          }
+          case OP_DELETE: {
+            numOpDelete++;
+            int length = in.readInt();
+            if (length != 2) {
+              throw new IOException("Incorrect data format. " 
+                                    + "delete operation.");
+            }
+            path = FSImageSerialization.readString(in);
+            timestamp = readLong(in);
+            fsDir.unprotectedDelete(path, timestamp);
+            break;
+          }
+          case OP_MKDIR: {
+            numOpMkDir++;
+            PermissionStatus permissions = fsNamesys.getUpgradePermission();
+            int length = in.readInt();
+            if (-17 < logVersion && length != 2 ||
+                logVersion <= -17 && length != 3) {
+              throw new IOException("Incorrect data format. " 
+                                    + "Mkdir operation.");
+            }
+            path = FSImageSerialization.readString(in);
+            timestamp = readLong(in);
+  
+            // The disk format stores atimes for directories as well.
+            // However, currently this is not being updated/used because of
+            // performance reasons.
+            if (logVersion <= -17) {
+              atime = readLong(in);
+            }
+  
+            if (logVersion <= -11) {
+              permissions = PermissionStatus.read(in);
+            }
+            fsDir.unprotectedMkdir(path, permissions, timestamp);
+            break;
           }
-           
-          PermissionStatus permissions = fsNamesys.getUpgradePermission();
-          if (logVersion <= -11) {
-            permissions = PermissionStatus.read(in);
-          }
-
-          // clientname, clientMachine and block locations of last block.
-          if (opCode == FSEditLogOpCodes.OP_ADD && logVersion <= -12) {
-            clientName = FSImageSerialization.readString(in);
-            clientMachine = FSImageSerialization.readString(in);
-            if (-13 <= logVersion) {
-              readDatanodeDescriptorArray(in);
-            }
-          } else {
-            clientName = "";
-            clientMachine = "";
-          }
-
-          // The open lease transaction re-creates a file if necessary.
-          // Delete the file if it already exists.
-          if (FSNamesystem.LOG.isDebugEnabled()) {
-            FSNamesystem.LOG.debug(opCode + ": " + path + 
-                                   " numblocks : " + blocks.length +
-                                   " clientHolder " +  clientName +
-                                   " clientMachine " + clientMachine);
-          }
-
-          fsDir.unprotectedDelete(path, mtime);
-
-          // add to the file tree
-          INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
-                                                    path, permissions,
-                                                    blocks, replication, 
-                                                    mtime, atime, blockSize);
-          if (isFileUnderConstruction) {
-            numOpAdd++;
-            //
-            // Replace current node with a INodeUnderConstruction.
-            // Recreate in-memory lease record.
-            //
-            INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
-                                      node.getLocalNameBytes(),
-                                      node.getReplication(), 
-                                      node.getModificationTime(),
-                                      node.getPreferredBlockSize(),
-                                      node.getBlocks(),
-                                      node.getPermissionStatus(),
-                                      clientName, 
-                                      clientMachine, 
-                                      null);
-            fsDir.replaceNode(path, node, cons);
-            fsNamesys.leaseManager.addLease(cons.getClientName(), path);
-          }
-          break;
-        } 
-        case OP_SET_REPLICATION: {
-          numOpSetRepl++;
-          path = FSImageSerialization.readString(in);
-          short replication = fsNamesys.adjustReplication(readShort(in));
-          fsDir.unprotectedSetReplication(path, replication, null);
-          break;
-        } 
-        case OP_CONCAT_DELETE: {
-          if (logVersion > -22) {
-            throw new IOException("Unexpected opCode " + opCode
-                + " for version " + logVersion);
-          }
-          numOpConcatDelete++;
-          int length = in.readInt();
-          if (length < 3) { // trg, srcs.., timestam
-            throw new IOException("Incorrect data format. " 
-                                  + "Mkdir operation.");
-          }
-          String trg = FSImageSerialization.readString(in);
-          int srcSize = length - 1 - 1; //trg and timestamp
-          String [] srcs = new String [srcSize];
-          for(int i=0; i<srcSize;i++) {
-            srcs[i]= FSImageSerialization.readString(in);
-          }
-          timestamp = readLong(in);
-          fsDir.unprotectedConcat(trg, srcs);
-          break;
-        }
-        case OP_RENAME_OLD: {
-          numOpRenameOld++;
-          int length = in.readInt();
-          if (length != 3) {
-            throw new IOException("Incorrect data format. " 
-                                  + "Mkdir operation.");
-          }
-          String s = FSImageSerialization.readString(in);
-          String d = FSImageSerialization.readString(in);
-          timestamp = readLong(in);
-          HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
-          fsDir.unprotectedRenameTo(s, d, timestamp);
-          fsNamesys.changeLease(s, d, dinfo);
-          break;
-        }
-        case OP_DELETE: {
-          numOpDelete++;
-          int length = in.readInt();
-          if (length != 2) {
-            throw new IOException("Incorrect data format. " 
-                                  + "delete operation.");
-          }
-          path = FSImageSerialization.readString(in);
-          timestamp = readLong(in);
-          fsDir.unprotectedDelete(path, timestamp);
-          break;
-        }
-        case OP_MKDIR: {
-          numOpMkDir++;
-          PermissionStatus permissions = fsNamesys.getUpgradePermission();
-          int length = in.readInt();
-          if (-17 < logVersion && length != 2 ||
-              logVersion <= -17 && length != 3) {
-            throw new IOException("Incorrect data format. " 
-                                  + "Mkdir operation.");
-          }
-          path = FSImageSerialization.readString(in);
-          timestamp = readLong(in);
-
-          // The disk format stores atimes for directories as well.
-          // However, currently this is not being updated/used because of
-          // performance reasons.
-          if (logVersion <= -17) {
+          case OP_SET_GENSTAMP: {
+            numOpSetGenStamp++;
+            long lw = in.readLong();
+            fsNamesys.setGenerationStamp(lw);
+            break;
+          } 
+          case OP_DATANODE_ADD: {
+            numOpOther++;
+            //Datanodes are not persistent any more.
+            FSImageSerialization.DatanodeImage.skipOne(in);
+            break;
+          }
+          case OP_DATANODE_REMOVE: {
+            numOpOther++;
+            DatanodeID nodeID = new DatanodeID();
+            nodeID.readFields(in);
+            //Datanodes are not persistent any more.
+            break;
+          }
+          case OP_SET_PERMISSIONS: {
+            numOpSetPerm++;
+            if (logVersion > -11)
+              throw new IOException("Unexpected opCode " + opCode
+                                    + " for version " + logVersion);
+            fsDir.unprotectedSetPermission(
+                FSImageSerialization.readString(in), FsPermission.read(in));
+            break;
+          }
+          case OP_SET_OWNER: {
+            numOpSetOwner++;
+            if (logVersion > -11)
+              throw new IOException("Unexpected opCode " + opCode
+                                    + " for version " + logVersion);
+            fsDir.unprotectedSetOwner(FSImageSerialization.readString(in),
+                FSImageSerialization.readString_EmptyAsNull(in),
+                FSImageSerialization.readString_EmptyAsNull(in));
+            break;
+          }
+          case OP_SET_NS_QUOTA: {
+            if (logVersion > -16) {
+              throw new IOException("Unexpected opCode " + opCode
+                  + " for version " + logVersion);
+            }
+            fsDir.unprotectedSetQuota(FSImageSerialization.readString(in), 
+                                      readLongWritable(in), 
+                                      FSConstants.QUOTA_DONT_SET);
+            break;
+          }
+          case OP_CLEAR_NS_QUOTA: {
+            if (logVersion > -16) {
+              throw new IOException("Unexpected opCode " + opCode
+                  + " for version " + logVersion);
+            }
+            fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
+                                      FSConstants.QUOTA_RESET,
+                                      FSConstants.QUOTA_DONT_SET);
+            break;
+          }
+  
+          case OP_SET_QUOTA:
+            fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
+                                      readLongWritable(in),
+                                      readLongWritable(in));
+                                        
+            break;
+  
+          case OP_TIMES: {
+            numOpTimes++;
+            int length = in.readInt();
+            if (length != 3) {
+              throw new IOException("Incorrect data format. " 
+                                    + "times operation.");
+            }
+            path = FSImageSerialization.readString(in);
+            mtime = readLong(in);
             atime = readLong(in);
+            fsDir.unprotectedSetTimes(path, mtime, atime, true);
+            break;
           }
-
-          if (logVersion <= -11) {
-            permissions = PermissionStatus.read(in);
+          case OP_SYMLINK: {
+            numOpSymlink++;
+            int length = in.readInt();
+            if (length != 4) {
+              throw new IOException("Incorrect data format. " 
+                                    + "symlink operation.");
+            }
+            path = FSImageSerialization.readString(in);
+            String value = FSImageSerialization.readString(in);
+            mtime = readLong(in);
+            atime = readLong(in);
+            PermissionStatus perm = PermissionStatus.read(in);
+            fsDir.unprotectedSymlink(path, value, mtime, atime, perm);
+            break;
+          }
+          case OP_RENAME: {
+            if (logVersion > -21) {
+              throw new IOException("Unexpected opCode " + opCode
+                  + " for version " + logVersion);
+            }
+            numOpRename++;
+            int length = in.readInt();
+            if (length != 3) {
+              throw new IOException("Incorrect data format. " 
+                                    + "Mkdir operation.");
+            }
+            String s = FSImageSerialization.readString(in);
+            String d = FSImageSerialization.readString(in);
+            timestamp = readLong(in);
+            Rename[] options = readRenameOptions(in);
+            HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
+            fsDir.unprotectedRenameTo(s, d, timestamp, options);
+            fsNamesys.changeLease(s, d, dinfo);
+            break;
+          }
+          case OP_GET_DELEGATION_TOKEN: {
+            if (logVersion > -24) {
+              throw new IOException("Unexpected opCode " + opCode
+                  + " for version " + logVersion);
+            }
+            numOpGetDelegationToken++;
+            DelegationTokenIdentifier delegationTokenId = 
+                new DelegationTokenIdentifier();
+            delegationTokenId.readFields(in);
+            long expiryTime = readLong(in);
+            fsNamesys.getDelegationTokenSecretManager()
+                .addPersistedDelegationToken(delegationTokenId, expiryTime);
+            break;
+          }
+          case OP_RENEW_DELEGATION_TOKEN: {
+            if (logVersion > -24) {
+              throw new IOException("Unexpected opCode " + opCode
+                  + " for version " + logVersion);
+            }
+            numOpRenewDelegationToken++;
+            DelegationTokenIdentifier delegationTokenId = 
+                new DelegationTokenIdentifier();
+            delegationTokenId.readFields(in);
+            long expiryTime = readLong(in);
+            fsNamesys.getDelegationTokenSecretManager()
+                .updatePersistedTokenRenewal(delegationTokenId, expiryTime);
+            break;
+          }
+          case OP_CANCEL_DELEGATION_TOKEN: {
+            if (logVersion > -24) {
+              throw new IOException("Unexpected opCode " + opCode
+                  + " for version " + logVersion);
+            }
+            numOpCancelDelegationToken++;
+            DelegationTokenIdentifier delegationTokenId = 
+                new DelegationTokenIdentifier();
+            delegationTokenId.readFields(in);
+            fsNamesys.getDelegationTokenSecretManager()
+                .updatePersistedTokenCancellation(delegationTokenId);
+            break;
+          }
+          case OP_UPDATE_MASTER_KEY: {
+            if (logVersion > -24) {
+              throw new IOException("Unexpected opCode " + opCode
+                  + " for version " + logVersion);
+            }
+            numOpUpdateMasterKey++;
+            DelegationKey delegationKey = new DelegationKey();
+            delegationKey.readFields(in);
+            fsNamesys.getDelegationTokenSecretManager().updatePersistedMasterKey(
+                delegationKey);
+            break;
+          }
+          default: {
+            throw new IOException("Never seen opCode " + opCode);
+          }
+          }
+          validateChecksum(in, checksum, numEdits);
+        }
+      } catch (IOException ex) {
+        check203UpgradeFailure(logVersion, ex);
+      } finally {
+        if(closeOnExit)
+          in.close();
+      }
+    } catch (Throwable t) {
+      // Catch Throwable because in the case of a truly corrupt edits log, any
+      // sort of error might be thrown (NumberFormat, NullPointer, EOF, etc.)
+      StringBuilder sb = new StringBuilder();
+      sb.append("Error replaying edit log at offset " + tracker.getPos());
+      if (recentOpcodeOffsets[0] != -1) {
+        Arrays.sort(recentOpcodeOffsets);
+        sb.append("\nRecent opcode offsets:");
+        for (long offset : recentOpcodeOffsets) {
+          if (offset != -1) {
+            sb.append(' ').append(offset);
           }
-          fsDir.unprotectedMkdir(path, permissions, timestamp);
-          break;
-        }
-        case OP_SET_GENSTAMP: {
-          numOpSetGenStamp++;
-          long lw = in.readLong();
-          fsNamesys.setGenerationStamp(lw);
-          break;
-        } 
-        case OP_DATANODE_ADD: {
-          numOpOther++;
-          //Datanodes are not persistent any more.
-          FSImageSerialization.DatanodeImage.skipOne(in);
-          break;
-        }
-        case OP_DATANODE_REMOVE: {
-          numOpOther++;
-          DatanodeID nodeID = new DatanodeID();
-          nodeID.readFields(in);
-          //Datanodes are not persistent any more.
-          break;
-        }
-        case OP_SET_PERMISSIONS: {
-          numOpSetPerm++;
-          if (logVersion > -11)
-            throw new IOException("Unexpected opCode " + opCode
-                                  + " for version " + logVersion);
-          fsDir.unprotectedSetPermission(
-              FSImageSerialization.readString(in), FsPermission.read(in));
-          break;
-        }
-        case OP_SET_OWNER: {
-          numOpSetOwner++;
-          if (logVersion > -11)
-            throw new IOException("Unexpected opCode " + opCode
-                                  + " for version " + logVersion);
-          fsDir.unprotectedSetOwner(FSImageSerialization.readString(in),
-              FSImageSerialization.readString_EmptyAsNull(in),
-              FSImageSerialization.readString_EmptyAsNull(in));
-          break;
-        }
-        case OP_SET_NS_QUOTA: {
-          if (logVersion > -16) {
-            throw new IOException("Unexpected opCode " + opCode
-                + " for version " + logVersion);
-          }
-          fsDir.unprotectedSetQuota(FSImageSerialization.readString(in), 
-                                    readLongWritable(in), 
-                                    FSConstants.QUOTA_DONT_SET);
-          break;
-        }
-        case OP_CLEAR_NS_QUOTA: {
-          if (logVersion > -16) {
-            throw new IOException("Unexpected opCode " + opCode
-                + " for version " + logVersion);
-          }
-          fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
-                                    FSConstants.QUOTA_RESET,
-                                    FSConstants.QUOTA_DONT_SET);
-          break;
         }
-
-        case OP_SET_QUOTA:
-          fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
-                                    readLongWritable(in),
-                                    readLongWritable(in));
-                                      
-          break;
-
-        case OP_TIMES: {
-          numOpTimes++;
-          int length = in.readInt();
-          if (length != 3) {
-            throw new IOException("Incorrect data format. " 
-                                  + "times operation.");
-          }
-          path = FSImageSerialization.readString(in);
-          mtime = readLong(in);
-          atime = readLong(in);
-          fsDir.unprotectedSetTimes(path, mtime, atime, true);
-          break;
-        }
-        case OP_SYMLINK: {
-          numOpSymlink++;
-          int length = in.readInt();
-          if (length != 4) {
-            throw new IOException("Incorrect data format. " 
-                                  + "symlink operation.");
-          }
-          path = FSImageSerialization.readString(in);
-          String value = FSImageSerialization.readString(in);
-          mtime = readLong(in);
-          atime = readLong(in);
-          PermissionStatus perm = PermissionStatus.read(in);
-          fsDir.unprotectedSymlink(path, value, mtime, atime, perm);
-          break;
-        }
-        case OP_RENAME: {
-          if (logVersion > -21) {
-            throw new IOException("Unexpected opCode " + opCode
-                + " for version " + logVersion);
-          }
-          numOpRename++;
-          int length = in.readInt();
-          if (length != 3) {
-            throw new IOException("Incorrect data format. " 
-                                  + "Mkdir operation.");
-          }
-          String s = FSImageSerialization.readString(in);
-          String d = FSImageSerialization.readString(in);
-          timestamp = readLong(in);
-          Rename[] options = readRenameOptions(in);
-          HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
-          fsDir.unprotectedRenameTo(s, d, timestamp, options);
-          fsNamesys.changeLease(s, d, dinfo);
-          break;
-        }
-        case OP_GET_DELEGATION_TOKEN: {
-          if (logVersion > -24) {
-            throw new IOException("Unexpected opCode " + opCode
-                + " for version " + logVersion);
-          }
-          numOpGetDelegationToken++;
-          DelegationTokenIdentifier delegationTokenId = 
-              new DelegationTokenIdentifier();
-          delegationTokenId.readFields(in);
-          long expiryTime = readLong(in);
-          fsNamesys.getDelegationTokenSecretManager()
-              .addPersistedDelegationToken(delegationTokenId, expiryTime);
-          break;
-        }
-        case OP_RENEW_DELEGATION_TOKEN: {
-          if (logVersion > -24) {
-            throw new IOException("Unexpected opCode " + opCode
-                + " for version " + logVersion);
-          }
-          numOpRenewDelegationToken++;
-          DelegationTokenIdentifier delegationTokenId = 
-              new DelegationTokenIdentifier();
-          delegationTokenId.readFields(in);
-          long expiryTime = readLong(in);
-          fsNamesys.getDelegationTokenSecretManager()
-              .updatePersistedTokenRenewal(delegationTokenId, expiryTime);
-          break;
-        }
-        case OP_CANCEL_DELEGATION_TOKEN: {
-          if (logVersion > -24) {
-            throw new IOException("Unexpected opCode " + opCode
-                + " for version " + logVersion);
-          }
-          numOpCancelDelegationToken++;
-          DelegationTokenIdentifier delegationTokenId = 
-              new DelegationTokenIdentifier();
-          delegationTokenId.readFields(in);
-          fsNamesys.getDelegationTokenSecretManager()
-              .updatePersistedTokenCancellation(delegationTokenId);
-          break;
-        }
-        case OP_UPDATE_MASTER_KEY: {
-          if (logVersion > -24) {
-            throw new IOException("Unexpected opCode " + opCode
-                + " for version " + logVersion);
-          }
-          numOpUpdateMasterKey++;
-          DelegationKey delegationKey = new DelegationKey();
-          delegationKey.readFields(in);
-          fsNamesys.getDelegationTokenSecretManager().updatePersistedMasterKey(
-              delegationKey);
-          break;
-        }
-        default: {
-          throw new IOException("Never seen opCode " + opCode);
-        }
-        }
-        validateChecksum(in, checksum, numEdits);
       }
-    } catch (IOException ex) {
-      check203UpgradeFailure(logVersion, ex);
-    } finally {
-      if(closeOnExit)
-        in.close();
+      String errorMessage = sb.toString();
+      FSImage.LOG.error(errorMessage);
+      throw new IOException(errorMessage, t);
     }
     if (FSImage.LOG.isDebugEnabled()) {
       FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose 
@@ -675,4 +706,52 @@ public class FSEditLogLoader {
       throw ex;
     }
   }
+  
+  /**
+   * Stream wrapper that keeps track of the current file position.
+   */
+  private static class PositionTrackingInputStream extends FilterInputStream {
+    private long curPos = 0;
+    private long markPos = -1;
+
+    public PositionTrackingInputStream(InputStream is) {
+      super(is);
+    }
+
+    public int read() throws IOException {
+      int ret = super.read();
+      if (ret != -1) curPos++;
+      return ret;
+    }
+
+    public int read(byte[] data) throws IOException {
+      int ret = super.read(data);
+      if (ret > 0) curPos += ret;
+      return ret;
+    }
+
+    public int read(byte[] data, int offset, int length) throws IOException {
+      int ret = super.read(data, offset, length);
+      if (ret > 0) curPos += ret;
+      return ret;
+    }
+
+    public void mark(int limit) {
+      super.mark(limit);
+      markPos = curPos;
+    }
+
+    public void reset() throws IOException {
+      if (markPos == -1) {
+        throw new IOException("Not marked!");
+      }
+      super.reset();
+      curPos = markPos;
+      markPos = -1;
+    }
+
+    public long getPos() {
+      return curPos;
+    }
+  }
 }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1102146&r1=1102145&r2=1102146&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
(original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
Thu May 12 00:18:42 2011
@@ -335,8 +335,10 @@ public class TestEditLog extends TestCas
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).format(false).build();
       fail("should not be able to start");
-    } catch (ChecksumException e) {
+    } catch (IOException e) {
       // expected
+      assertEquals("Cause of exception should be ChecksumException",
+          e.getCause().getClass(), ChecksumException.class);
     }
   }
 }

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java?rev=1102146&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
(added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
Thu May 12 00:18:42 2011
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.Test;
+
+public class TestFSEditLogLoader {
+  
+  private static final int NUM_DATA_NODES = 0;
+  
+  @Test
+  public void testDisplayRecentEditLogOpCodes() throws IOException {
+    // start a cluster 
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    FileSystem fileSys = null;
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES)
+        .build();
+    cluster.waitActive();
+    fileSys = cluster.getFileSystem();
+    final FSNamesystem namesystem = cluster.getNamesystem();
+
+    FSImage fsimage = namesystem.getFSImage();
+    final FSEditLog editLog = fsimage.getEditLog();
+    for (int i = 0; i < 20; i++) {
+      fileSys.mkdirs(new Path("/tmp/tmp" + i));
+    }
+    File editFile = editLog.getFsEditName();
+    editLog.close();
+    cluster.shutdown();
+    
+    // Corrupt the edits file.
+    long fileLen = editFile.length();
+    RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
+    rwf.seek(fileLen - 40);
+    for (int i = 0; i < 20; i++) {
+      rwf.write(FSEditLogOpCodes.OP_DELETE.getOpCode());
+    }
+    rwf.close();
+    
+    String expectedErrorMessage = "^Error replaying edit log at offset \\d+\n";
+    expectedErrorMessage += "Recent opcode offsets: (\\d+\\s*){4}$";
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES)
+          .format(false).build();
+      fail("should not be able to start");
+    } catch (IOException e) {
+      assertTrue("error message contains opcodes message",
+          e.getMessage().matches(expectedErrorMessage));
+    }
+  }
+}



Mime
View raw message