hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1311518 [2/3] - in /hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/serve...
Date: Mon, 09 Apr 2012 23:25:22 GMT
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Mon Apr  9 23:25:17 2012
@@ -71,9 +71,11 @@ public class FSEditLogLoader {
   static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName());
   static long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
   private final FSNamesystem fsNamesys;
-
-  public FSEditLogLoader(FSNamesystem fsNamesys) {
+  private long lastAppliedTxId;
+  
+  public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
     this.fsNamesys = fsNamesys;
+    this.lastAppliedTxId = lastAppliedTxId;
   }
   
   /**
@@ -81,32 +83,29 @@ public class FSEditLogLoader {
    * This is where we apply edits that we've been writing to disk all
    * along.
    */
-  long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
-      throws IOException {
-    long numEdits = 0;
+  long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
+      MetaRecoveryContext recovery) throws IOException {
     int logVersion = edits.getVersion();
 
     fsNamesys.writeLock();
     try {
       long startTime = now();
-      numEdits = loadEditRecords(logVersion, edits, false, 
-                                 expectedStartingTxId);
+      long numEdits = loadEditRecords(logVersion, edits, false, 
+                                 expectedStartingTxId, recovery);
       FSImage.LOG.info("Edits file " + edits.getName() 
           + " of size " + edits.length() + " edits # " + numEdits 
           + " loaded in " + (now()-startTime)/1000 + " seconds.");
+      return numEdits;
     } finally {
       edits.close();
       fsNamesys.writeUnlock();
     }
-    
-    return numEdits;
   }
 
   long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
-                      long expectedStartingTxId)
-      throws IOException, EditLogInputException {
+                      long expectedStartingTxId, MetaRecoveryContext recovery)
+      throws IOException {
     FSDirectory fsDir = fsNamesys.dir;
-    long numEdits = 0;
 
     EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
       new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
@@ -120,72 +119,99 @@ public class FSEditLogLoader {
 
     long recentOpcodeOffsets[] = new long[4];
     Arrays.fill(recentOpcodeOffsets, -1);
-
-    long txId = expectedStartingTxId - 1;
+    
+    long expectedTxId = expectedStartingTxId;
+    long numEdits = 0;
     long lastTxId = in.getLastTxId();
     long numTxns = (lastTxId - expectedStartingTxId) + 1;
-
     long lastLogTime = now();
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("edit log length: " + in.length() + ", start txid: "
           + expectedStartingTxId + ", last txid: " + lastTxId);
     }
-
     try {
-      try {
-        while (true) {
+      while (true) {
+        try {
           FSEditLogOp op;
           try {
-            if ((op = in.readOp()) == null) {
+            op = in.readOp();
+            if (op == null) {
               break;
             }
-          } catch (IOException ioe) {
-            long badTxId = txId + 1; // because txId hasn't been incremented yet
-            String errorMessage = formatEditLogReplayError(in, recentOpcodeOffsets, badTxId);
+          } catch (Throwable e) {
+            // Handle a problem with our input
+            check203UpgradeFailure(logVersion, e);
+            String errorMessage =
+              formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId);
             FSImage.LOG.error(errorMessage);
-            throw new EditLogInputException(errorMessage,
-                ioe, numEdits);
+            if (recovery == null) {
+               // We will only try to skip over problematic opcodes when in
+               // recovery mode.
+              throw new EditLogInputException(errorMessage, e, numEdits);
+            }
+            MetaRecoveryContext.editLogLoaderPrompt(
+                "We failed to read txId " + expectedTxId,
+                recovery, "skipping the bad section in the log");
+            in.resync();
+            continue;
           }
           recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] =
             in.getPosition();
           if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
-            long expectedTxId = txId + 1;
-            txId = op.txid;
-            if (txId != expectedTxId) {
-              throw new IOException("Expected transaction ID " +
-                  expectedTxId + " but got " + txId);
+            if (op.getTransactionId() > expectedTxId) { 
+              MetaRecoveryContext.editLogLoaderPrompt("There appears " +
+                  "to be a gap in the edit log.  We expected txid " +
+                  expectedTxId + ", but got txid " +
+                  op.getTransactionId() + ".", recovery, "ignoring missing " +
+                  " transaction IDs");
+            } else if (op.getTransactionId() < expectedTxId) { 
+              MetaRecoveryContext.editLogLoaderPrompt("There appears " +
+                  "to be an out-of-order edit in the edit log.  We " +
+                  "expected txid " + expectedTxId + ", but got txid " +
+                  op.getTransactionId() + ".", recovery,
+                  "skipping the out-of-order edit");
+              continue;
             }
           }
-
-          incrOpCount(op.opCode, opCounts);
           try {
             applyEditLogOp(op, fsDir, logVersion);
-          } catch (Throwable t) {
-            // Catch Throwable because in the case of a truly corrupt edits log, any
-            // sort of error might be thrown (NumberFormat, NullPointer, EOF, etc.)
-            String errorMessage = formatEditLogReplayError(in, recentOpcodeOffsets, txId);
-            FSImage.LOG.error(errorMessage);
-            throw new IOException(errorMessage, t);
+          } catch (Throwable e) {
+            LOG.error("Encountered exception on operation " + op, e);
+            MetaRecoveryContext.editLogLoaderPrompt("Failed to " +
+             "apply edit log operation " + op + ": error " +
+             e.getMessage(), recovery, "applying edits");
+          }
+          // Now that the operation has been successfully decoded and
+          // applied, update our bookkeeping.
+          incrOpCount(op.opCode, opCounts);
+          if (op.hasTransactionId()) {
+            lastAppliedTxId = op.getTransactionId();
+            expectedTxId = lastAppliedTxId + 1;
+          } else {
+            expectedTxId = lastAppliedTxId = expectedStartingTxId;
           }
-
           // log progress
-          if (now() - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
-            int percent = Math.round((float) txId / numTxns * 100);
-            LOG.info("replaying edit log: " + txId + "/" + numTxns
-                + " transactions completed. (" + percent + "%)");
-            lastLogTime = now();
+          if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
+            long now = now();
+            if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
+              int percent = Math.round((float)lastAppliedTxId / numTxns * 100);
+              LOG.info("replaying edit log: " + lastAppliedTxId + "/" + numTxns
+                  + " transactions completed. (" + percent + "%)");
+              lastLogTime = now;
+            }
           }
-
           numEdits++;
+        } catch (MetaRecoveryContext.RequestStopException e) {
+          MetaRecoveryContext.LOG.warn("Stopped reading edit log at " +
+              in.getPosition() + "/"  + in.length());
+          break;
         }
-      } catch (IOException ex) {
-        check203UpgradeFailure(logVersion, ex);
-      } finally {
-        if(closeOnExit)
-          in.close();
       }
     } finally {
+      if(closeOnExit) {
+        in.close();
+      }
       fsDir.writeUnlock();
       fsNamesys.writeUnlock();
 
@@ -472,7 +498,7 @@ public class FSEditLogLoader {
       long recentOpcodeOffsets[], long txid) {
     StringBuilder sb = new StringBuilder();
     sb.append("Error replaying edit log at offset " + in.getPosition());
-    sb.append(" on transaction ID ").append(txid);
+    sb.append(".  Expected transaction ID was ").append(txid);
     if (recentOpcodeOffsets[0] != -1) {
       Arrays.sort(recentOpcodeOffsets);
       sb.append("\nRecent opcode offsets:");
@@ -519,7 +545,7 @@ public class FSEditLogLoader {
       if (oldBlock.getBlockId() != newBlock.getBlockId() ||
           (oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() && 
               !(isGenStampUpdate && isLastBlock))) {
-        throw new IOException("Mismatched block IDs or generation stamps, " + 
+        throw new IOException("Mismatched block IDs or generation stamps, " +
             "attempting to replace block " + oldBlock + " with " + newBlock +
             " as block # " + i + "/" + newBlocks.length + " of " +
             path);
@@ -605,7 +631,7 @@ public class FSEditLogLoader {
    * Throw appropriate exception during upgrade from 203, when editlog loading
    * could fail due to opcode conflicts.
    */
-  private void check203UpgradeFailure(int logVersion, IOException ex)
+  private void check203UpgradeFailure(int logVersion, Throwable e)
       throws IOException {
     // 0.20.203 version has conflicting opcodes with the later releases.
     // The editlog must be emptied by restarting the namenode, before proceeding
@@ -616,9 +642,7 @@ public class FSEditLogLoader {
           + logVersion + " from release 0.20.203. Please go back to the old "
           + " release and restart the namenode. This empties the editlog "
           + " and saves the namespace. Resume the upgrade after this step.";
-      throw new IOException(msg, ex);
-    } else {
-      throw ex;
+      throw new IOException(msg, e);
     }
   }
   
@@ -643,14 +667,14 @@ public class FSEditLogLoader {
           break;
         }
         if (firstTxId == HdfsConstants.INVALID_TXID) {
-          firstTxId = op.txid;
+          firstTxId = op.getTransactionId();
         }
         if (lastTxId == HdfsConstants.INVALID_TXID
-            || op.txid == lastTxId + 1) {
-          lastTxId = op.txid;
+            || op.getTransactionId() == lastTxId + 1) {
+          lastTxId = op.getTransactionId();
         } else {
-          FSImage.LOG.error("Out of order txid found. Found " + op.txid 
-                            + ", expected " + (lastTxId + 1));
+          FSImage.LOG.error("Out of order txid found. Found " +
+            op.getTransactionId() + ", expected " + (lastTxId + 1));
           break;
         }
         numValid++;
@@ -743,4 +767,7 @@ public class FSEditLogLoader {
     }
   }
 
+  public long getLastAppliedTxId() {
+    return lastAppliedTxId;
+  }
 }
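
A minimal caller sketch (not part of this commit) may help make the reworked contract concrete: the loader is now constructed with the last applied txid, passing a null MetaRecoveryContext keeps the old fail-fast behavior, and the loader itself owns the txid bookkeeping. fsNamesys, editIn and LOG below are placeholders:

    FSEditLogLoader loader = new FSEditLogLoader(fsNamesys, lastAppliedTxId);
    try {
      // null recovery context: a damaged opcode raises EditLogInputException
      // instead of prompting and resyncing.
      loader.loadFSEdits(editIn, lastAppliedTxId + 1, null);
    } catch (EditLogInputException e) {
      // Some edits may already have been applied before the damage was hit.
      LOG.warn("Stopped after " + e.getNumEditsLoaded() + " edits", e);
    } finally {
      // Pick up the loader's bookkeeping rather than counting edits here.
      lastAppliedTxId = loader.getLastAppliedTxId();
    }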

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Mon Apr  9 23:25:17 2012
@@ -33,6 +33,8 @@ import org.apache.hadoop.fs.Options.Rena
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.util.PureJavaCrc32;
@@ -54,6 +56,8 @@ import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
 import org.xml.sax.helpers.AttributesImpl;
 
+import com.google.common.base.Preconditions;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.DataInputStream;
@@ -74,42 +78,44 @@ public abstract class FSEditLogOp {
 
 
   @SuppressWarnings("deprecation")
-  private static ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>> opInstances =
-    new ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>>() {
-      @Override
-      protected EnumMap<FSEditLogOpCodes, FSEditLogOp> initialValue() {
-        EnumMap<FSEditLogOpCodes, FSEditLogOp> instances 
-          = new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
-        instances.put(OP_ADD, new AddOp());
-        instances.put(OP_CLOSE, new CloseOp());
-        instances.put(OP_SET_REPLICATION, new SetReplicationOp());
-        instances.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
-        instances.put(OP_RENAME_OLD, new RenameOldOp());
-        instances.put(OP_DELETE, new DeleteOp());
-        instances.put(OP_MKDIR, new MkdirOp());
-        instances.put(OP_SET_GENSTAMP, new SetGenstampOp());
-        instances.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
-        instances.put(OP_SET_OWNER, new SetOwnerOp());
-        instances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
-        instances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
-        instances.put(OP_SET_QUOTA, new SetQuotaOp());
-        instances.put(OP_TIMES, new TimesOp());
-        instances.put(OP_SYMLINK, new SymlinkOp());
-        instances.put(OP_RENAME, new RenameOp());
-        instances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
-        instances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
-        instances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
-        instances.put(OP_CANCEL_DELEGATION_TOKEN, 
-                      new CancelDelegationTokenOp());
-        instances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
-        instances.put(OP_START_LOG_SEGMENT,
-                      new LogSegmentOp(OP_START_LOG_SEGMENT));
-        instances.put(OP_END_LOG_SEGMENT,
-                      new LogSegmentOp(OP_END_LOG_SEGMENT));
-        instances.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp());
-        return instances;
-      }
-  };
+  final public static class OpInstanceCache {
+    private EnumMap<FSEditLogOpCodes, FSEditLogOp> inst = 
+        new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
+    
+    public OpInstanceCache() {
+      inst.put(OP_ADD, new AddOp());
+      inst.put(OP_CLOSE, new CloseOp());
+      inst.put(OP_SET_REPLICATION, new SetReplicationOp());
+      inst.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
+      inst.put(OP_RENAME_OLD, new RenameOldOp());
+      inst.put(OP_DELETE, new DeleteOp());
+      inst.put(OP_MKDIR, new MkdirOp());
+      inst.put(OP_SET_GENSTAMP, new SetGenstampOp());
+      inst.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
+      inst.put(OP_SET_OWNER, new SetOwnerOp());
+      inst.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
+      inst.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
+      inst.put(OP_SET_QUOTA, new SetQuotaOp());
+      inst.put(OP_TIMES, new TimesOp());
+      inst.put(OP_SYMLINK, new SymlinkOp());
+      inst.put(OP_RENAME, new RenameOp());
+      inst.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
+      inst.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
+      inst.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
+      inst.put(OP_CANCEL_DELEGATION_TOKEN, 
+                    new CancelDelegationTokenOp());
+      inst.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
+      inst.put(OP_START_LOG_SEGMENT,
+                    new LogSegmentOp(OP_START_LOG_SEGMENT));
+      inst.put(OP_END_LOG_SEGMENT,
+                    new LogSegmentOp(OP_END_LOG_SEGMENT));
+      inst.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp());
+    }
+    
+    public FSEditLogOp get(FSEditLogOpCodes opcode) {
+      return inst.get(opcode);
+    }
+  }
 
   /**
    * Constructor for an EditLog Op. EditLog ops cannot be constructed
@@ -117,13 +123,22 @@ public abstract class FSEditLogOp {
    */
   private FSEditLogOp(FSEditLogOpCodes opCode) {
     this.opCode = opCode;
-    this.txid = 0;
+    this.txid = HdfsConstants.INVALID_TXID;
   }
 
   public long getTransactionId() {
+    Preconditions.checkState(txid != HdfsConstants.INVALID_TXID);
     return txid;
   }
 
+  public String getTransactionIdStr() {
+    return (txid == HdfsConstants.INVALID_TXID) ? "(none)" : "" + txid;
+  }
+  
+  public boolean hasTransactionId() {
+    return (txid != HdfsConstants.INVALID_TXID);
+  }
+
   public void setTransactionId(long txid) {
     this.txid = txid;
   }
@@ -373,8 +388,8 @@ public abstract class FSEditLogOp {
       super(OP_ADD);
     }
 
-    static AddOp getInstance() {
-      return (AddOp)opInstances.get().get(OP_ADD);
+    static AddOp getInstance(OpInstanceCache cache) {
+      return (AddOp)cache.get(OP_ADD);
     }
 
     public boolean shouldCompleteLastBlock() {
@@ -395,8 +410,8 @@ public abstract class FSEditLogOp {
       super(OP_CLOSE);
     }
 
-    static CloseOp getInstance() {
-      return (CloseOp)opInstances.get().get(OP_CLOSE);
+    static CloseOp getInstance(OpInstanceCache cache) {
+      return (CloseOp)cache.get(OP_CLOSE);
     }
 
     public boolean shouldCompleteLastBlock() {
@@ -420,9 +435,8 @@ public abstract class FSEditLogOp {
       super(OP_UPDATE_BLOCKS);
     }
     
-    static UpdateBlocksOp getInstance() {
-      return (UpdateBlocksOp)opInstances.get()
-        .get(OP_UPDATE_BLOCKS);
+    static UpdateBlocksOp getInstance(OpInstanceCache cache) {
+      return (UpdateBlocksOp)cache.get(OP_UPDATE_BLOCKS);
     }
     
     
@@ -500,9 +514,8 @@ public abstract class FSEditLogOp {
       super(OP_SET_REPLICATION);
     }
 
-    static SetReplicationOp getInstance() {
-      return (SetReplicationOp)opInstances.get()
-        .get(OP_SET_REPLICATION);
+    static SetReplicationOp getInstance(OpInstanceCache cache) {
+      return (SetReplicationOp)cache.get(OP_SET_REPLICATION);
     }
 
     SetReplicationOp setPath(String path) {
@@ -571,9 +584,8 @@ public abstract class FSEditLogOp {
       super(OP_CONCAT_DELETE);
     }
 
-    static ConcatDeleteOp getInstance() {
-      return (ConcatDeleteOp)opInstances.get()
-        .get(OP_CONCAT_DELETE);
+    static ConcatDeleteOp getInstance(OpInstanceCache cache) {
+      return (ConcatDeleteOp)cache.get(OP_CONCAT_DELETE);
     }
 
     ConcatDeleteOp setTarget(String trg) {
@@ -697,9 +709,8 @@ public abstract class FSEditLogOp {
       super(OP_RENAME_OLD);
     }
 
-    static RenameOldOp getInstance() {
-      return (RenameOldOp)opInstances.get()
-        .get(OP_RENAME_OLD);
+    static RenameOldOp getInstance(OpInstanceCache cache) {
+      return (RenameOldOp)cache.get(OP_RENAME_OLD);
     }
 
     RenameOldOp setSource(String src) {
@@ -790,9 +801,8 @@ public abstract class FSEditLogOp {
       super(OP_DELETE);
     }
 
-    static DeleteOp getInstance() {
-      return (DeleteOp)opInstances.get()
-        .get(OP_DELETE);
+    static DeleteOp getInstance(OpInstanceCache cache) {
+      return (DeleteOp)cache.get(OP_DELETE);
     }
 
     DeleteOp setPath(String path) {
@@ -872,9 +882,8 @@ public abstract class FSEditLogOp {
       super(OP_MKDIR);
     }
     
-    static MkdirOp getInstance() {
-      return (MkdirOp)opInstances.get()
-        .get(OP_MKDIR);
+    static MkdirOp getInstance(OpInstanceCache cache) {
+      return (MkdirOp)cache.get(OP_MKDIR);
     }
 
     MkdirOp setPath(String path) {
@@ -977,9 +986,8 @@ public abstract class FSEditLogOp {
       super(OP_SET_GENSTAMP);
     }
 
-    static SetGenstampOp getInstance() {
-      return (SetGenstampOp)opInstances.get()
-        .get(OP_SET_GENSTAMP);
+    static SetGenstampOp getInstance(OpInstanceCache cache) {
+      return (SetGenstampOp)cache.get(OP_SET_GENSTAMP);
     }
 
     SetGenstampOp setGenerationStamp(long genStamp) {
@@ -1031,9 +1039,8 @@ public abstract class FSEditLogOp {
       super(OP_SET_PERMISSIONS);
     }
 
-    static SetPermissionsOp getInstance() {
-      return (SetPermissionsOp)opInstances.get()
-        .get(OP_SET_PERMISSIONS);
+    static SetPermissionsOp getInstance(OpInstanceCache cache) {
+      return (SetPermissionsOp)cache.get(OP_SET_PERMISSIONS);
     }
 
     SetPermissionsOp setSource(String src) {
@@ -1098,9 +1105,8 @@ public abstract class FSEditLogOp {
       super(OP_SET_OWNER);
     }
 
-    static SetOwnerOp getInstance() {
-      return (SetOwnerOp)opInstances.get()
-        .get(OP_SET_OWNER);
+    static SetOwnerOp getInstance(OpInstanceCache cache) {
+      return (SetOwnerOp)cache.get(OP_SET_OWNER);
     }
 
     SetOwnerOp setSource(String src) {
@@ -1179,9 +1185,8 @@ public abstract class FSEditLogOp {
       super(OP_SET_NS_QUOTA);
     }
 
-    static SetNSQuotaOp getInstance() {
-      return (SetNSQuotaOp)opInstances.get()
-        .get(OP_SET_NS_QUOTA);
+    static SetNSQuotaOp getInstance(OpInstanceCache cache) {
+      return (SetNSQuotaOp)cache.get(OP_SET_NS_QUOTA);
     }
 
     @Override
@@ -1232,9 +1237,8 @@ public abstract class FSEditLogOp {
       super(OP_CLEAR_NS_QUOTA);
     }
 
-    static ClearNSQuotaOp getInstance() {
-      return (ClearNSQuotaOp)opInstances.get()
-        .get(OP_CLEAR_NS_QUOTA);
+    static ClearNSQuotaOp getInstance(OpInstanceCache cache) {
+      return (ClearNSQuotaOp)cache.get(OP_CLEAR_NS_QUOTA);
     }
 
     @Override
@@ -1281,9 +1285,8 @@ public abstract class FSEditLogOp {
       super(OP_SET_QUOTA);
     }
 
-    static SetQuotaOp getInstance() {
-      return (SetQuotaOp)opInstances.get()
-        .get(OP_SET_QUOTA);
+    static SetQuotaOp getInstance(OpInstanceCache cache) {
+      return (SetQuotaOp)cache.get(OP_SET_QUOTA);
     }
 
     SetQuotaOp setSource(String src) {
@@ -1360,9 +1363,8 @@ public abstract class FSEditLogOp {
       super(OP_TIMES);
     }
 
-    static TimesOp getInstance() {
-      return (TimesOp)opInstances.get()
-        .get(OP_TIMES);
+    static TimesOp getInstance(OpInstanceCache cache) {
+      return (TimesOp)cache.get(OP_TIMES);
     }
 
     TimesOp setPath(String path) {
@@ -1458,9 +1460,8 @@ public abstract class FSEditLogOp {
       super(OP_SYMLINK);
     }
 
-    static SymlinkOp getInstance() {
-      return (SymlinkOp)opInstances.get()
-        .get(OP_SYMLINK);
+    static SymlinkOp getInstance(OpInstanceCache cache) {
+      return (SymlinkOp)cache.get(OP_SYMLINK);
     }
 
     SymlinkOp setPath(String path) {
@@ -1579,9 +1580,8 @@ public abstract class FSEditLogOp {
       super(OP_RENAME);
     }
 
-    static RenameOp getInstance() {
-      return (RenameOp)opInstances.get()
-        .get(OP_RENAME);
+    static RenameOp getInstance(OpInstanceCache cache) {
+      return (RenameOp)cache.get(OP_RENAME);
     }
 
     RenameOp setSource(String src) {
@@ -1723,9 +1723,8 @@ public abstract class FSEditLogOp {
       super(OP_REASSIGN_LEASE);
     }
 
-    static ReassignLeaseOp getInstance() {
-      return (ReassignLeaseOp)opInstances.get()
-        .get(OP_REASSIGN_LEASE);
+    static ReassignLeaseOp getInstance(OpInstanceCache cache) {
+      return (ReassignLeaseOp)cache.get(OP_REASSIGN_LEASE);
     }
 
     ReassignLeaseOp setLeaseHolder(String leaseHolder) {
@@ -1798,9 +1797,8 @@ public abstract class FSEditLogOp {
       super(OP_GET_DELEGATION_TOKEN);
     }
 
-    static GetDelegationTokenOp getInstance() {
-      return (GetDelegationTokenOp)opInstances.get()
-        .get(OP_GET_DELEGATION_TOKEN);
+    static GetDelegationTokenOp getInstance(OpInstanceCache cache) {
+      return (GetDelegationTokenOp)cache.get(OP_GET_DELEGATION_TOKEN);
     }
 
     GetDelegationTokenOp setDelegationTokenIdentifier(
@@ -1870,9 +1868,8 @@ public abstract class FSEditLogOp {
       super(OP_RENEW_DELEGATION_TOKEN);
     }
 
-    static RenewDelegationTokenOp getInstance() {
-      return (RenewDelegationTokenOp)opInstances.get()
-          .get(OP_RENEW_DELEGATION_TOKEN);
+    static RenewDelegationTokenOp getInstance(OpInstanceCache cache) {
+      return (RenewDelegationTokenOp)cache.get(OP_RENEW_DELEGATION_TOKEN);
     }
 
     RenewDelegationTokenOp setDelegationTokenIdentifier(
@@ -1941,9 +1938,8 @@ public abstract class FSEditLogOp {
       super(OP_CANCEL_DELEGATION_TOKEN);
     }
 
-    static CancelDelegationTokenOp getInstance() {
-      return (CancelDelegationTokenOp)opInstances.get()
-          .get(OP_CANCEL_DELEGATION_TOKEN);
+    static CancelDelegationTokenOp getInstance(OpInstanceCache cache) {
+      return (CancelDelegationTokenOp)cache.get(OP_CANCEL_DELEGATION_TOKEN);
     }
 
     CancelDelegationTokenOp setDelegationTokenIdentifier(
@@ -1996,9 +1992,8 @@ public abstract class FSEditLogOp {
       super(OP_UPDATE_MASTER_KEY);
     }
 
-    static UpdateMasterKeyOp getInstance() {
-      return (UpdateMasterKeyOp)opInstances.get()
-          .get(OP_UPDATE_MASTER_KEY);
+    static UpdateMasterKeyOp getInstance(OpInstanceCache cache) {
+      return (UpdateMasterKeyOp)cache.get(OP_UPDATE_MASTER_KEY);
     }
 
     UpdateMasterKeyOp setDelegationKey(DelegationKey key) {
@@ -2050,8 +2045,9 @@ public abstract class FSEditLogOp {
              code == OP_END_LOG_SEGMENT : "Bad op: " + code;
     }
 
-    static LogSegmentOp getInstance(FSEditLogOpCodes code) {
-      return (LogSegmentOp)opInstances.get().get(code);
+    static LogSegmentOp getInstance(OpInstanceCache cache,
+        FSEditLogOpCodes code) {
+      return (LogSegmentOp)cache.get(code);
     }
 
     public void readFields(DataInputStream in, int logVersion)
@@ -2091,8 +2087,8 @@ public abstract class FSEditLogOp {
       super(OP_INVALID);
     }
 
-    static InvalidOp getInstance() {
-      return (InvalidOp)opInstances.get().get(OP_INVALID);
+    static InvalidOp getInstance(OpInstanceCache cache) {
+      return (InvalidOp)cache.get(OP_INVALID);
     }
 
     @Override
@@ -2207,6 +2203,7 @@ public abstract class FSEditLogOp {
     private final DataInputStream in;
     private final int logVersion;
     private final Checksum checksum;
+    private final OpInstanceCache cache;
 
     /**
      * Construct the reader
@@ -2228,6 +2225,7 @@ public abstract class FSEditLogOp {
       } else {
         this.in = in;
       }
+      this.cache = new OpInstanceCache();
     }
 
     /**
@@ -2236,16 +2234,42 @@ public abstract class FSEditLogOp {
      * Note that the objects returned from this method may be re-used by future
      * calls to the same method.
      * 
+     * @param skipBrokenEdits    If true, attempt to skip over damaged parts of
+     * the input stream, rather than throwing an IOException
      * @return the operation read from the stream, or null at the end of the file
      * @throws IOException on error.
      */
-    public FSEditLogOp readOp() throws IOException {
+    public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
+      FSEditLogOp op = null;
+      while (true) {
+        try {
+          in.mark(in.available());
+          try {
+            op = decodeOp();
+          } finally {
+            // If we encountered an exception or an end-of-file condition,
+            // do not advance the input stream.
+            if (op == null) {
+              in.reset();
+            }
+          }
+          return op;
+        } catch (IOException e) {
+          if (!skipBrokenEdits) {
+            throw e;
+          }
+          if (in.skip(1) < 1) {
+            return null;
+          }
+        }
+      }
+    }
+
+    private FSEditLogOp decodeOp() throws IOException {
       if (checksum != null) {
         checksum.reset();
       }
 
-      in.mark(1);
-
       byte opCodeByte;
       try {
         opCodeByte = in.readByte();
@@ -2255,12 +2279,10 @@ public abstract class FSEditLogOp {
       }
 
       FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
-      if (opCode == OP_INVALID) {
-        in.reset(); // reset back to end of file if somebody reads it again
+      if (opCode == OP_INVALID)
         return null;
-      }
 
-      FSEditLogOp op = opInstances.get().get(opCode);
+      FSEditLogOp op = cache.get(opCode);
       if (op == null) {
         throw new IOException("Read invalid opcode " + opCode);
       }
@@ -2268,6 +2290,8 @@ public abstract class FSEditLogOp {
       if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
         // Read the txid
         op.setTransactionId(in.readLong());
+      } else {
+        op.setTransactionId(HdfsConstants.INVALID_TXID);
       }
 
       op.readFields(in, logVersion);
@@ -2426,8 +2450,4 @@ public abstract class FSEditLogOp {
     short mode = Short.valueOf(st.getValue("MODE"));
     return new PermissionStatus(username, groupname, new FsPermission(mode));
   }
-
-  public static FSEditLogOp getOpInstance(FSEditLogOpCodes opCode) {
-    return opInstances.get().get(opCode);
-  }
-}
+		}
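
A hedged usage sketch (not in this commit) of the two API changes above: each Reader now owns an OpInstanceCache instead of the shared ThreadLocal map, and readOp(true) tries to skip past damaged bytes rather than failing. reader and process() are placeholders:

    // Cached op instances are reused, so callers must not hold on to them
    // across reads.
    FSEditLogOp.OpInstanceCache cache = new FSEditLogOp.OpInstanceCache();
    FSEditLogOp reusable = cache.get(FSEditLogOpCodes.OP_ADD);

    FSEditLogOp op;
    while ((op = reader.readOp(true /* skipBrokenEdits */)) != null) {
      if (op.hasTransactionId()) {
        // getTransactionId() now asserts a valid txid, so check first on
        // pre-STORED_TXIDS logs, where the txid stays INVALID_TXID.
        process(op.getTransactionId(), op);
      }
    }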

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Mon Apr  9 23:25:17 2012
@@ -158,8 +158,8 @@ public class FSImage implements Closeabl
    * @throws IOException
    * @return true if the image needs to be saved or false otherwise
    */
-  boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target)
-      throws IOException {
+  boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
+      MetaRecoveryContext recovery) throws IOException {
     assert startOpt != StartupOption.FORMAT : 
       "NameNode formatting should be performed before reading the image";
     
@@ -244,7 +244,7 @@ public class FSImage implements Closeabl
       // just load the image
     }
     
-    return loadFSImage(target);
+    return loadFSImage(target, recovery);
   }
   
   /**
@@ -304,7 +304,7 @@ public class FSImage implements Closeabl
     if(storage.getDistributedUpgradeState()) {
       // only distributed upgrade need to continue
       // don't do version upgrade
-      this.loadFSImage(target);
+      this.loadFSImage(target, null);
       storage.initializeDistributedUpgrade();
       return;
     }
@@ -319,7 +319,7 @@ public class FSImage implements Closeabl
     }
 
     // load the latest image
-    this.loadFSImage(target);
+    this.loadFSImage(target, null);
 
     // Do upgrade for each directory
     long oldCTime = storage.getCTime();
@@ -505,7 +505,7 @@ public class FSImage implements Closeabl
     target.dir.fsImage = ckptImage;
     // load from the checkpoint dirs
     try {
-      ckptImage.recoverTransitionRead(StartupOption.REGULAR, target);
+      ckptImage.recoverTransitionRead(StartupOption.REGULAR, target, null);
     } finally {
       ckptImage.close();
     }
@@ -550,7 +550,7 @@ public class FSImage implements Closeabl
     target.dir.reset();
 
     LOG.debug("Reloading namespace from " + file);
-    loadFSImage(file, target);
+    loadFSImage(file, target, null);
   }
 
   /**
@@ -568,7 +568,8 @@ public class FSImage implements Closeabl
    * @return whether the image should be saved
    * @throws IOException
    */
-  boolean loadFSImage(FSNamesystem target) throws IOException {
+  boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery)
+      throws IOException {
     FSImageStorageInspector inspector = storage.readAndInspectDirs();
     
     isUpgradeFinalized = inspector.isUpgradeFinalized();
@@ -583,7 +584,6 @@ public class FSImage implements Closeabl
       // We only want to recover streams if we're going into Active mode.
       editLog.recoverUnclosedStreams();
     }
-
     if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, 
                                getLayoutVersion())) {
       // If we're open for write, we're either non-HA or we're the active NN, so
@@ -610,7 +610,7 @@ public class FSImage implements Closeabl
                                  getLayoutVersion())) {
         // For txid-based layout, we should have a .md5 file
         // next to the image file
-        loadFSImage(imageFile.getFile(), target);
+        loadFSImage(imageFile.getFile(), target, recovery);
       } else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM,
                                         getLayoutVersion())) {
         // In 0.22, we have the checksum stored in the VERSION file.
@@ -622,22 +622,19 @@ public class FSImage implements Closeabl
               NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY +
               " not set for storage directory " + sdForProperties.getRoot());
         }
-        loadFSImage(imageFile.getFile(), new MD5Hash(md5), target);
+        loadFSImage(imageFile.getFile(), new MD5Hash(md5), target, recovery);
       } else {
         // We don't have any record of the md5sum
-        loadFSImage(imageFile.getFile(), null, target);
+        loadFSImage(imageFile.getFile(), null, target, recovery);
       }
     } catch (IOException ioe) {
       FSEditLog.closeAllStreams(editStreams);
       throw new IOException("Failed to load image from " + imageFile, ioe);
     }
-    
-    long numLoaded = loadEdits(editStreams, target);
+    long txnsAdvanced = loadEdits(editStreams, target, recovery);
     needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
-                                                    numLoaded);
-    
-    // update the txid for the edit log
-    editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);
+                                                    txnsAdvanced);
+    editLog.setNextTxId(lastAppliedTxId + 1);
     return needToSave;
   }
 
@@ -664,33 +661,29 @@ public class FSImage implements Closeabl
   
   /**
    * Load the specified list of edit files into the image.
-   * @return the number of transactions loaded
    */
   public long loadEdits(Iterable<EditLogInputStream> editStreams,
-      FSNamesystem target) throws IOException, EditLogInputException {
+      FSNamesystem target, MetaRecoveryContext recovery) throws IOException {
     LOG.debug("About to load edits:\n  " + Joiner.on("\n  ").join(editStreams));
-
-    long startingTxId = getLastAppliedTxId() + 1;
-    long numLoaded = 0;
-
+    
+    long prevLastAppliedTxId = lastAppliedTxId;  
     try {    
-      FSEditLogLoader loader = new FSEditLogLoader(target);
+      FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId);
       
       // Load latest edits
       for (EditLogInputStream editIn : editStreams) {
-        LOG.info("Reading " + editIn + " expecting start txid #" + startingTxId);
-        long thisNumLoaded = 0;
+        LOG.info("Reading " + editIn + " expecting start txid #" +
+              (lastAppliedTxId + 1));
         try {
-          thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
-        } catch (EditLogInputException elie) {
-          thisNumLoaded = elie.getNumEditsLoaded();
-          throw elie;
+          loader.loadFSEdits(editIn, lastAppliedTxId + 1, recovery);
         } finally {
           // Update lastAppliedTxId even in case of error, since some ops may
           // have been successfully applied before the error.
-          lastAppliedTxId = startingTxId + thisNumLoaded - 1;
-          startingTxId += thisNumLoaded;
-          numLoaded += thisNumLoaded;
+          lastAppliedTxId = loader.getLastAppliedTxId();
+        }
+        // If we are in recovery mode, we may have skipped over some txids.
+        if (editIn.getLastTxId() != HdfsConstants.INVALID_TXID) {
+          lastAppliedTxId = editIn.getLastTxId();
         }
       }
     } finally {
@@ -698,8 +691,7 @@ public class FSImage implements Closeabl
       // update the counts
       target.dir.updateCountForINodeWithQuota();   
     }
-    
-    return numLoaded;
+    return lastAppliedTxId - prevLastAppliedTxId;
   }
 
 
@@ -707,14 +699,14 @@ public class FSImage implements Closeabl
    * Load the image namespace from the given image file, verifying
    * it against the MD5 sum stored in its associated .md5 file.
    */
-  private void loadFSImage(File imageFile, FSNamesystem target)
-      throws IOException {
+  private void loadFSImage(File imageFile, FSNamesystem target,
+      MetaRecoveryContext recovery) throws IOException {
     MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile);
     if (expectedMD5 == null) {
       throw new IOException("No MD5 file found corresponding to image file "
           + imageFile);
     }
-    loadFSImage(imageFile, expectedMD5, target);
+    loadFSImage(imageFile, expectedMD5, target, recovery);
   }
   
   /**
@@ -722,7 +714,7 @@ public class FSImage implements Closeabl
    * filenames and blocks.
    */
   private void loadFSImage(File curFile, MD5Hash expectedMd5,
-      FSNamesystem target) throws IOException {
+      FSNamesystem target, MetaRecoveryContext recovery) throws IOException {
     FSImageFormat.Loader loader = new FSImageFormat.Loader(
         conf, target);
     loader.load(curFile);
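
A worked example with made-up numbers may clarify the new return value: if the checkpoint ends at txid 100 (prevLastAppliedTxId = 100) and recovery skips a damaged region so that the stream's getLastTxId() of 150 becomes the new lastAppliedTxId, loadEdits() now returns 150 - 100 = 50 "transactions advanced" even if fewer ops were actually applied, and editLog.setNextTxId(lastAppliedTxId + 1) resumes writing at 151. The previous code counted loaded edits instead, which can drift from the real txid once entries are skipped.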

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java Mon Apr  9 23:25:17 2012
@@ -56,7 +56,14 @@ class FSImageTransactionalStorageInspect
       return;
     }
     
-    maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
+    // Check for a seen_txid file, which marks a minimum transaction ID that
+    // must be included in our load plan.
+    try {
+      maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
+    } catch (IOException ioe) {
+      LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
+      return;
+    }
 
     File currentDir = sd.getCurrentDir();
     File filesInStorage[];
@@ -91,15 +98,6 @@ class FSImageTransactionalStorageInspect
       }
     }
     
-
-    // Check for a seen_txid file, which marks a minimum transaction ID that
-    // must be included in our load plan.
-    try {
-      maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
-    } catch (IOException ioe) {
-      LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
-    }
-    
     // set finalized flag
     isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
   }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Apr  9 23:25:17 2012
@@ -380,9 +380,12 @@ public class FSNamesystem implements Nam
 
     FSImage fsImage = new FSImage(conf, namespaceDirs, namespaceEditsDirs);
     FSNamesystem namesystem = new FSNamesystem(conf, fsImage);
+    StartupOption startOpt = NameNode.getStartupOption(conf);
+    if (startOpt == StartupOption.RECOVER) {
+      namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    }
 
     long loadStart = now();
-    StartupOption startOpt = NameNode.getStartupOption(conf);
     String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
     namesystem.loadFSImage(startOpt, fsImage,
       HAUtil.isHAEnabled(conf, nameserviceId));
@@ -491,7 +494,8 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       // We shouldn't be calling saveNamespace if we've come up in standby state.
-      if (fsImage.recoverTransitionRead(startOpt, this) && !haEnabled) {
+      MetaRecoveryContext recovery = startOpt.createRecoveryContext();
+      if (fsImage.recoverTransitionRead(startOpt, this, recovery) && !haEnabled) {
         fsImage.saveNamespace(this);
       }
       // This will start a new log segment and write to the seen_txid file, so
@@ -2120,10 +2124,12 @@ public class FSNamesystem implements Nam
 
   /** 
    * Check all blocks of a file. If any blocks are lower than their intended
-   * replication factor, then insert them into neededReplication
+   * replication factor, then insert them into neededReplication; if any
+   * blocks exceed the intended replication factor, insert them into
+   * invalidateBlocks.
    */
   private void checkReplicationFactor(INodeFile file) {
-    int numExpectedReplicas = file.getReplication();
+    short numExpectedReplicas = file.getReplication();
     Block[] pendingBlocks = file.getBlocks();
     int nrBlocks = pendingBlocks.length;
     for (int i = 0; i < nrBlocks; i++) {
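
Condensing the two FSNamesystem hunks above into one hedged sketch of the new startup order (namesystem, conf and haEnabled stand in for the surrounding fields):

    StartupOption startOpt = NameNode.getStartupOption(conf);
    if (startOpt == StartupOption.RECOVER) {
      // Enter safe mode up front, before the image and edits are loaded.
      namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
    }
    ...
    // createRecoveryContext() presumably yields null for every other startup
    // option, so a normal start keeps the fail-fast loading path.
    MetaRecoveryContext recovery = startOpt.createRecoveryContext();
    if (fsImage.recoverTransitionRead(startOpt, namesystem, recovery) && !haEnabled) {
      fsImage.saveNamespace(namesystem);
    }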

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Mon Apr  9 23:25:17 2012
@@ -232,7 +232,10 @@ class FileJournalManager implements Jour
           LOG.info(String.format("Log begins at txid %d, but requested start "
               + "txid is %d. Skipping %d edits.", elf.getFirstTxId(), fromTxId,
               transactionsToSkip));
-          elfis.skipTransactions(transactionsToSkip);
+        }
+        if (elfis.skipUntil(fromTxId) == false) {
+          throw new IOException("failed to advance input stream to txid " +
+              fromTxId);
         }
         return elfis;
       }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java Mon Apr  9 23:25:17 2012
@@ -49,7 +49,6 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.UpgradeManager;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
 
@@ -299,8 +298,7 @@ public class NNStorage extends Storage i
                           NameNodeDirType.IMAGE;
       // Add to the list of storage directories, only if the
       // URI is of type file://
-      if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase())
-          == 0){
+      if(dirName.getScheme().compareTo("file") == 0) {
         this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
             dirType,
             !sharedEditsDirs.contains(dirName))); // Don't lock the dir if it's shared.
@@ -312,8 +310,7 @@ public class NNStorage extends Storage i
       checkSchemeConsistency(dirName);
       // Add to the list of storage directories, only if the
       // URI is of type file://
-      if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase())
-          == 0)
+      if(dirName.getScheme().compareTo("file") == 0)
         this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
                     NameNodeDirType.EDITS, !sharedEditsDirs.contains(dirName)));
     }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Mon Apr  9 23:25:17 2012
@@ -40,6 +40,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Trash;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -160,7 +162,8 @@ public class NameNode {
     DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY,
     DFS_NAMENODE_BACKUP_ADDRESS_KEY,
     DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY,
-    DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY
+    DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY,
+    DFS_HA_FENCE_METHODS_KEY
   };
   
   public long getProtocolVersion(String protocol, 
@@ -511,6 +514,8 @@ public class NameNode {
    * <li>{@link StartupOption#CHECKPOINT CHECKPOINT} - start checkpoint node</li>
    * <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster  
    * upgrade and create a snapshot of the current file system state</li> 
+   * <li>{@link StartupOption#RECOVERY RECOVERY} - recover name node
+   * metadata</li>
    * <li>{@link StartupOption#ROLLBACK ROLLBACK} - roll the  
    *            cluster back to the previous state</li>
    * <li>{@link StartupOption#FINALIZE FINALIZE} - finalize 
@@ -829,7 +834,10 @@ public class NameNode {
       StartupOption.FINALIZE.getName() + "] | [" +
       StartupOption.IMPORT.getName() + "] | [" +
       StartupOption.BOOTSTRAPSTANDBY.getName() + "] | [" +
-      StartupOption.INITIALIZESHAREDEDITS.getName() + "]");
+      StartupOption.INITIALIZESHAREDEDITS.getName() + "] | [" +
+      StartupOption.RECOVER.getName() + " [ " +
+        StartupOption.FORCE.getName() + " ] ]");
   }
 
   private static StartupOption parseArguments(String args[]) {
@@ -873,6 +881,21 @@ public class NameNode {
       } else if (StartupOption.INITIALIZESHAREDEDITS.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.INITIALIZESHAREDEDITS;
         return startOpt;
+      } else if (StartupOption.RECOVER.getName().equalsIgnoreCase(cmd)) {
+        if (startOpt != StartupOption.REGULAR) {
+          throw new RuntimeException("Can't combine -recover with " +
+              "other startup options.");
+        }
+        startOpt = StartupOption.RECOVER;
+        while (++i < argsLen) {
+          if (args[i].equalsIgnoreCase(
+                StartupOption.FORCE.getName())) {
+            startOpt.setForce(MetaRecoveryContext.FORCE_FIRST_CHOICE);
+          } else {
+            throw new RuntimeException("Error parsing recovery options: " + 
+              "can't understand option \"" + args[i] + "\"");
+          }
+        }
       } else {
         return null;
       }
@@ -889,6 +912,39 @@ public class NameNode {
                                           StartupOption.REGULAR.toString()));
   }
 
+  private static void doRecovery(StartupOption startOpt, Configuration conf)
+      throws IOException {
+    if (startOpt.getForce() < MetaRecoveryContext.FORCE_ALL) {
+      if (!confirmPrompt("You have selected Metadata Recovery mode.  " +
+          "This mode is intended to recover lost metadata on a corrupt " +
+          "filesystem.  Metadata recovery mode often permanently deletes " +
+          "data from your HDFS filesystem.  Please back up your edit log " +
+          "and fsimage before trying this!\n\n" +
+          "Are you ready to proceed? (Y/N)\n")) {
+        System.err.println("Recovery aborted at user request.\n");
+        return;
+      }
+    }
+    MetaRecoveryContext.LOG.info("starting recovery...");
+    UserGroupInformation.setConfiguration(conf);
+    NameNode.initMetrics(conf, startOpt.toNodeRole());
+    FSNamesystem fsn = null;
+    try {
+      fsn = FSNamesystem.loadFromDisk(conf);
+      fsn.saveNamespace();
+      MetaRecoveryContext.LOG.info("RECOVERY COMPLETE");
+    } catch (IOException e) {
+      MetaRecoveryContext.LOG.info("RECOVERY FAILED: caught exception", e);
+      throw e;
+    } catch (RuntimeException e) {
+      MetaRecoveryContext.LOG.info("RECOVERY FAILED: caught exception", e);
+      throw e;
+    } finally {
+      if (fsn != null)
+        fsn.close();
+    }
+  }
+
   /**
    * Print out a prompt to the user, and return true if the user
    * responds with "Y" or "yes".
@@ -970,6 +1026,10 @@ public class NameNode {
         DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
         return new BackupNode(conf, role);
       }
+      case RECOVER: {
+        NameNode.doRecovery(startOpt, conf);
+        return null;
+      }
       default:
         DefaultMetricsSystem.initialize("NameNode");
         return new NameNode(conf);
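
For reference, the new mode is reached through the startup option parsed above, roughly "hadoop namenode -recover [ -force ]" per the usage string; -force sets MetaRecoveryContext.FORCE_FIRST_CHOICE, which presumably pre-selects the first offered action at recovery prompts. The body of doRecovery() condenses to a load-then-checkpoint sketch (conf is an ordinary HdfsConfiguration):

    FSNamesystem fsn = FSNamesystem.loadFromDisk(conf); // replays whatever edits are readable
    try {
      // Persist a fresh checkpoint of whatever was recovered.
      fsn.saveNamespace();
    } finally {
      fsn.close();
    }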

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java Mon Apr  9 23:25:17 2012
@@ -219,7 +219,7 @@ public class EditLogTailer {
       // disk are ignored.
       long editsLoaded = 0;
       try {
-        editsLoaded = image.loadEdits(streams, namesystem);
+        editsLoaded = image.loadEdits(streams, namesystem, null);
       } catch (EditLogInputException elie) {
         editsLoaded = elie.getNumEditsLoaded();
         throw elie;

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java Mon Apr  9 23:25:17 2012
@@ -21,7 +21,6 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.KerberosInfo;
 
 /**
@@ -53,12 +52,15 @@ public interface JournalProtocol {
    * via {@code EditLogBackupOutputStream} in order to synchronize meta-data
    * changes with the backup namespace image.
    * 
-   * @param registration active node registration
+   * @param journalInfo journal information
+   * @param epoch marks the start of a new journal writer
    * @param firstTxnId the first transaction of this batch
    * @param numTxns number of transactions
    * @param records byte array containing serialized journal records
+   * @throws FencedException if the resource has been fenced
    */
-  public void journal(NamenodeRegistration registration,
+  public void journal(JournalInfo journalInfo,
+                      long epoch,
                       long firstTxnId,
                       int numTxns,
                       byte[] records) throws IOException;
@@ -66,9 +68,24 @@ public interface JournalProtocol {
   /**
    * Notify the BackupNode that the NameNode has rolled its edit logs
    * and is now writing a new log segment.
-   * @param registration the registration of the active NameNode
+   * @param journalInfo journal information
+   * @param epoch marks the start of a new journal writer
    * @param txid the first txid in the new log
+   * @throws FencedException if the resource has been fenced
    */
-  public void startLogSegment(NamenodeRegistration registration,
+  public void startLogSegment(JournalInfo journalInfo, long epoch,
       long txid) throws IOException;
+  
+  /**
+   * Request to fence any other journal writers.
+   * Older writers at a previous epoch will be fenced and can no longer
+   * perform journal operations.
+   * 
+   * @param journalInfo journal information
+   * @param epoch marks the start of a new journal writer
+   * @param fencerInfo info about fencer for debugging purposes
+   * @throws FencedException if the resource has been fenced
+   */
+  public FenceResponse fence(JournalInfo journalInfo, long epoch,
+      String fencerInfo) throws IOException;
 }
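
A hypothetical implementer sketch (not part of this commit) of how the new epoch parameter supports fencing; the FenceResponse and FencedException types are not shown in this diff, so a plain IOException stands in for the rejection path:

    // Highest epoch this journal has agreed to accept writes from.
    private long lastPromisedEpoch = 0;

    private synchronized void verifyEpoch(long epoch) throws IOException {
      if (epoch < lastPromisedEpoch) {
        // A newer writer has taken over; a real implementation would throw
        // FencedException here.
        throw new IOException("Writer epoch " + epoch
            + " is older than the promised epoch " + lastPromisedEpoch);
      }
      lastPromisedEpoch = epoch;
    }
    // journal(), startLogSegment() and fence() would each call
    // verifyEpoch(epoch) before acting on the request.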

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java Mon Apr  9 23:25:17 2012
@@ -21,10 +21,12 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -70,7 +72,8 @@ public class GetConf extends Configured 
     EXCLUDE_FILE("-excludeFile",
         "gets the exclude file path that defines the datanodes " +
         "that need to decommissioned."),
-    NNRPCADDRESSES("-nnRpcAddresses", "gets the namenode rpc addresses");
+    NNRPCADDRESSES("-nnRpcAddresses", "gets the namenode rpc addresses"),
+    CONFKEY("-confKey [key]", "gets a specific key from the configuration");
 
     private static Map<String, CommandHandler> map;
     static  {
@@ -87,6 +90,8 @@ public class GetConf extends Configured 
           new CommandHandler("DFSConfigKeys.DFS_HOSTS_EXCLUDE"));
       map.put(NNRPCADDRESSES.getName().toLowerCase(),
           new NNRpcAddressesCommandHandler());
+      map.put(CONFKEY.getName().toLowerCase(),
+          new PrintConfKeyCommandHandler());
     }
     
     private final String cmd;
@@ -98,6 +103,10 @@ public class GetConf extends Configured 
     }
 
     public String getName() {
+      return cmd.split(" ")[0];
+    }
+    
+    public String getUsage() {
       return cmd;
     }
     
@@ -105,8 +114,8 @@ public class GetConf extends Configured 
       return description;
     }
     
-    public static CommandHandler getHandler(String name) {
-      return map.get(name.toLowerCase());
+    public static CommandHandler getHandler(String cmd) {
+      return map.get(cmd.toLowerCase());
     }
   }
   
@@ -118,7 +127,7 @@ public class GetConf extends Configured 
     StringBuilder usage = new StringBuilder(DESCRIPTION);
     usage.append("\nhadoop getconf \n");
     for (Command cmd : Command.values()) {
-      usage.append("\t[" + cmd.getName() + "]\t\t\t" + cmd.getDescription()
+      usage.append("\t[" + cmd.getUsage() + "]\t\t\t" + cmd.getDescription()
           + "\n");
     }
     USAGE = usage.toString();
@@ -128,7 +137,7 @@ public class GetConf extends Configured 
    * Handler to return value for key corresponding to the {@link Command}
    */
   static class CommandHandler {
-    final String key; // Configuration key to lookup
+    String key; // Configuration key to lookup
     
     CommandHandler() {
       this(null);
@@ -138,18 +147,30 @@ public class GetConf extends Configured 
       this.key = key;
     }
 
-    final int doWork(GetConf tool) {
+    final int doWork(GetConf tool, String[] args) {
       try {
-        return doWorkInternal(tool);
+        checkArgs(args);
+
+        return doWorkInternal(tool, args);
       } catch (Exception e) {
         tool.printError(e.getMessage());
       }
       return -1;
     }
+
+    protected void checkArgs(String args[]) {
+      if (args.length > 0) {
+        throw new HadoopIllegalArgumentException(
+            "Did not expect argument: " + args[0]);
+      }
+    }
+
     
-    /** Method to be overridden by sub classes for specific behavior */
-    int doWorkInternal(GetConf tool) throws Exception {
-      String value = tool.getConf().get(key);
+    /** Method to be overridden by subclasses for specific behavior.
+     * @param args arguments for the command, if any */
+    int doWorkInternal(GetConf tool, String[] args) throws Exception {
+
+      String value = tool.getConf().getTrimmed(key);
       if (value != null) {
         tool.printOut(value);
         return 0;
@@ -164,7 +185,7 @@ public class GetConf extends Configured 
    */
   static class NameNodesCommandHandler extends CommandHandler {
     @Override
-    int doWorkInternal(GetConf tool) throws IOException {
+    int doWorkInternal(GetConf tool, String []args) throws IOException {
       tool.printMap(DFSUtil.getNNServiceRpcAddresses(tool.getConf()));
       return 0;
     }
@@ -175,7 +196,7 @@ public class GetConf extends Configured 
    */
   static class BackupNodesCommandHandler extends CommandHandler {
     @Override
-    public int doWorkInternal(GetConf tool) throws IOException {
+    public int doWorkInternal(GetConf tool, String []args) throws IOException {
       tool.printMap(DFSUtil.getBackupNodeAddresses(tool.getConf()));
       return 0;
     }
@@ -186,7 +207,7 @@ public class GetConf extends Configured 
    */
   static class SecondaryNameNodesCommandHandler extends CommandHandler {
     @Override
-    public int doWorkInternal(GetConf tool) throws IOException {
+    public int doWorkInternal(GetConf tool, String []args) throws IOException {
       tool.printMap(DFSUtil.getSecondaryNameNodeAddresses(tool.getConf()));
       return 0;
     }
@@ -199,7 +220,7 @@ public class GetConf extends Configured 
    */
   static class NNRpcAddressesCommandHandler extends CommandHandler {
     @Override
-    public int doWorkInternal(GetConf tool) throws IOException {
+    public int doWorkInternal(GetConf tool, String []args) throws IOException {
       Configuration config = tool.getConf();
       List<ConfiguredNNAddress> cnnlist = DFSUtil.flattenAddressMap(
           DFSUtil.getNNServiceRpcAddresses(config));
@@ -215,6 +236,23 @@ public class GetConf extends Configured 
     }
   }
   
+  static class PrintConfKeyCommandHandler extends CommandHandler {
+    @Override
+    protected void checkArgs(String[] args) {
+      if (args.length != 1) {
+        throw new HadoopIllegalArgumentException(
+            "usage: " + Command.CONFKEY.getUsage());
+      }
+    }
+
+    @Override
+    int doWorkInternal(GetConf tool, String[] args) throws Exception {
+      this.key = args[0];
+      System.err.println("key: " + key);
+      return super.doWorkInternal(tool, args);
+    }
+  }
+  
   private final PrintStream out; // Stream for printing command output
   private final PrintStream err; // Stream for printing error
 
@@ -260,10 +298,11 @@ public class GetConf extends Configured 
    * @return return status of the command
    */
   private int doWork(String[] args) {
-    if (args.length == 1) {
+    if (args.length >= 1) {
       CommandHandler handler = Command.getHandler(args[0]);
       if (handler != null) {
-        return handler.doWork(this);
+        return handler.doWork(this,
+            Arrays.copyOfRange(args, 1, args.length));
       }
     }
     printUsage();

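In practice the new dispatch path means everything after the command name is handed to the handler: hadoop getconf -confKey dfs.blocksize, for example, would reach PrintConfKeyCommandHandler with args[0] set to "dfs.blocksize" (the key name here is just an example). A hypothetical extra handler written against the new (tool, args) contract would look roughly like this; the -echo command below is invented for illustration and would still need an entry in the Command enum and handler map:

    // Illustrative fragment only; not part of the patch.
    static class EchoCommandHandler extends CommandHandler {
      @Override
      protected void checkArgs(String[] args) {
        if (args.length != 1) {
          throw new HadoopIllegalArgumentException("usage: -echo [value]");
        }
      }

      @Override
      int doWorkInternal(GetConf tool, String[] args) throws Exception {
        tool.printOut(args[0]);  // args holds the trailing arguments forwarded by doWork()
        return 0;
      }
    }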
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java Mon Apr  9 23:25:17 2012
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.ha.BadFencingConfigurationException;
 import org.apache.hadoop.ha.HAServiceTarget;
 import org.apache.hadoop.ha.NodeFencer;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -75,7 +76,8 @@ public class NNHAServiceTarget extends H
     this.addr = NetUtils.createSocketAddr(serviceAddr,
         NameNode.DEFAULT_PORT);
     try {
-      this.fencer = NodeFencer.create(targetConf);
+      this.fencer = NodeFencer.create(targetConf,
+          DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY);
     } catch (BadFencingConfigurationException e) {
       this.fenceConfigError = e;
     }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsXmlLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsXmlLoader.java?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsXmlLoader.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsXmlLoader.java Mon Apr  9 23:25:17 2012
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
 
 import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
 import org.xml.sax.Attributes;
@@ -54,6 +55,7 @@ class OfflineEditsXmlLoader 
   private FSEditLogOpCodes opCode;
   private StringBuffer cbuf;
   private long nextTxId;
+  private final OpInstanceCache opCache = new OpInstanceCache();
   
   static enum ParseState {
     EXPECT_EDITS_TAG,
@@ -207,7 +209,7 @@ class OfflineEditsXmlLoader 
           throw new InvalidXmlException("expected </DATA>");
         }
         state = ParseState.EXPECT_RECORD;
-        FSEditLogOp op = FSEditLogOp.getOpInstance(opCode);
+        FSEditLogOp op = opCache.get(opCode);
         opCode = null;
         try {
           op.decodeXml(stanza);

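The op-instance change is small but worth spelling out: each loader now owns an OpInstanceCache and asks it for reusable FSEditLogOp objects instead of going through the static FSEditLogOp.getOpInstance(). A minimal sketch of the new pattern, with the opcode chosen arbitrarily for illustration:

    // Minimal sketch of the per-loader op cache; the opcode is an arbitrary example.
    import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
    import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
    import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;

    class OpCacheSketch {
      private final OpInstanceCache opCache = new OpInstanceCache();

      FSEditLogOp example() {
        // Instances come from this loader's cache rather than a shared static factory.
        return opCache.get(FSEditLogOpCodes.OP_ADD);
      }
    }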
Propchange: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1310141-1311517

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.c?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.c (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.c Mon Apr  9 23:25:17 2012
@@ -123,6 +123,11 @@ static int errnoFromException(jthrowable
       goto done;
     }
 
+    if (!strcmp(excClass, "java.lang.UnsupportedOperationException")) {
+      errnum = ENOTSUP;
+      goto done;
+    }
+
     if (!strcmp(excClass, "org.apache.hadoop.security."
                 "AccessControlException")) {
         errnum = EACCES;
@@ -614,8 +619,29 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const c
     } else {
         file->file = (*env)->NewGlobalRef(env, jVal.l);
         file->type = (((flags & O_WRONLY) == 0) ? INPUT : OUTPUT);
+        file->flags = 0;
 
         destroyLocalReference(env, jVal.l);
+
+        if ((flags & O_WRONLY) == 0) {
+          // Try a test read to see if we can do direct reads
+          errno = 0;
+          char buf;
+          if (readDirect(fs, file, &buf, 0) == 0) {
+            // Success - 0-byte read should return 0
+            file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
+          } else {
+            if (errno != ENOTSUP) {
+              // Unexpected error. Clear it, don't set the direct flag.
+              fprintf(stderr,
+                      "WARN: Unexpected error %d when testing "
+                      "for direct read compatibility\n", errno);
+              errno = 0;
+              goto done;
+            }
+          }
+          errno = 0;
+        }
     }
 
     done:
@@ -706,10 +732,57 @@ int hdfsExists(hdfsFS fs, const char *pa
     return jVal.z ? 0 : -1;
 }
 
+// Checks that the input file is ready for reading.
+static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f,
+                       jobject* jInputStream)
+{
+    *jInputStream = (jobject)(f ? f->file : NULL);
 
+    //Sanity check
+    if (!f || f->type == UNINITIALIZED) {
+      errno = EBADF;
+      return -1;
+    }
+
+    //Error checking... make sure that this file is 'readable'
+    if (f->type != INPUT) {
+      fprintf(stderr, "Cannot read from a non-InputStream object!\n");
+      errno = EINVAL;
+      return -1;
+    }
+
+    return 0;
+}
+
+// Common error-handling code between read paths.
+static int handleReadResult(int success, jvalue jVal, jthrowable jExc,
+                            JNIEnv* env)
+{
+  int noReadBytes;
+  if (success != 0) {
+    if ((*env)->ExceptionCheck(env)) {
+      errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
+                                 "FSDataInputStream::read");
+    }
+    noReadBytes = -1;
+  } else {
+    noReadBytes = jVal.i;
+    if (noReadBytes < 0) {
+      // -1 from Java is EOF, which is 0 here
+      noReadBytes = 0;
+    }
+    errno = 0;
+  }
+
+  return noReadBytes;
+}
 
 tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
 {
+    if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) {
+      return readDirect(fs, f, buffer, length);
+    }
+
     // JAVA EQUIVALENT:
     //  byte [] bR = new byte[length];
     //  fis.read(bR);
@@ -722,49 +795,75 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, vo
     }
 
     //Parameters
-    jobject jInputStream = (jobject)(f ? f->file : NULL);
+    jobject jInputStream;
+    if (readPrepare(env, fs, f, &jInputStream) == -1) {
+      return -1;
+    }
 
     jbyteArray jbRarray;
     jint noReadBytes = 0;
     jvalue jVal;
     jthrowable jExc = NULL;
 
-    //Sanity check
-    if (!f || f->type == UNINITIALIZED) {
-        errno = EBADF;
-        return -1;
+    //Read the requisite bytes
+    jbRarray = (*env)->NewByteArray(env, length);
+
+    int success = invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM,
+                               "read", "([B)I", jbRarray);
+
+    noReadBytes = handleReadResult(success, jVal, jExc, env);
+
+    if (noReadBytes > 0) {
+      (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer);
     }
 
-    //Error checking... make sure that this file is 'readable'
-    if (f->type != INPUT) {
-        fprintf(stderr, "Cannot read from a non-InputStream object!\n");
-        errno = EINVAL;
-        return -1;
+    destroyLocalReference(env, jbRarray);
+
+    return noReadBytes;
+}
+
+// Reads using the read(ByteBuffer) API, which does fewer copies
+tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
+{
+    // JAVA EQUIVALENT:
+    //  ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer
+    //  fis.read(bbuffer);
+
+    //Get the JNIEnv* corresponding to current thread
+    JNIEnv* env = getJNIEnv();
+    if (env == NULL) {
+      errno = EINTERNAL;
+      return -1;
     }
 
-    //Read the requisite bytes
-    jbRarray = (*env)->NewByteArray(env, length);
-    if (invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM,
-                     "read", "([B)I", jbRarray) != 0) {
-        errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
-                                   "FSDataInputStream::read");
-        noReadBytes = -1;
+    jobject jInputStream;
+    if (readPrepare(env, fs, f, &jInputStream) == -1) {
+      return -1;
     }
-    else {
-        noReadBytes = jVal.i;
-        if (noReadBytes > 0) {
-            (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer);
-        }  else {
-            //This is a valid case: there aren't any bytes left to read!
-          if (noReadBytes == 0 || noReadBytes < -1) {
-            fprintf(stderr, "WARN: FSDataInputStream.read returned invalid return code - libhdfs returning EOF, i.e., 0: %d\n", noReadBytes);
-          }
-            noReadBytes = 0;
-        }
-        errno = 0;
+
+    jint noReadBytes = 0;
+    jvalue jVal;
+    jthrowable jExc = NULL;
+
+    //Read the requisite bytes
+    jobject bb = (*env)->NewDirectByteBuffer(env, buffer, length);
+    if (bb == NULL) {
+      fprintf(stderr, "Could not allocate ByteBuffer");
+      if ((*env)->ExceptionCheck(env)) {
+        errno = errnoFromException(NULL, env, "JNIEnv::NewDirectByteBuffer");
+      } else {
+        errno = ENOMEM; // Best guess if there's no exception waiting
+      }
+      return -1;
     }
 
-    destroyLocalReference(env, jbRarray);
+    int success = invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream,
+                               HADOOP_ISTRM, "read", "(Ljava/nio/ByteBuffer;)I",
+                               bb);
+
+    noReadBytes = handleReadResult(success, jVal, jExc, env);
+
+    destroyLocalReference(env, bb);
 
     return noReadBytes;
 }

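The JAVA EQUIVALENT comments above boil down to a single read(ByteBuffer) call on FSDataInputStream. Below is a rough Java-side sketch of what readDirect() drives through JNI, assuming an already-open stream; streams that cannot serve ByteBuffer reads throw UnsupportedOperationException, which errnoFromException now maps to ENOTSUP so the probe read in hdfsOpenFile falls back to the byte[] path.

    // Rough Java-side equivalent of readDirect(); a sketch, not code from this patch.
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.fs.FSDataInputStream;

    class DirectReadSketch {
      static int readDirect(FSDataInputStream in, int length) throws IOException {
        // The native code wraps the caller's C buffer with NewDirectByteBuffer;
        // here a direct buffer of the requested size stands in for it.
        ByteBuffer bbuffer = ByteBuffer.allocateDirect(length);
        // May throw UnsupportedOperationException if the underlying stream
        // does not implement ByteBuffer reads.
        return in.read(bbuffer);  // -1 means EOF; libhdfs reports that as 0 bytes
      }
    }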
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.h?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.h (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.h Mon Apr  9 23:25:17 2012
@@ -81,12 +81,16 @@ extern  "C" {
     };
 
     
+    // Bit fields for hdfsFile_internal flags
+    #define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
+
     /**
      * The 'file-handle' to a file in hdfs.
      */
     struct hdfsFile_internal {
         void* file;
         enum hdfsStreamType type;
+        uint32_t flags;
     };
     typedef struct hdfsFile_internal* hdfsFile;
       
@@ -203,7 +207,6 @@ extern  "C" {
      */
     tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length);
 
-
     /** 
      * hdfsPread - Positional read of data from an open file.
      * @param fs The configured filesystem handle.

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs_test.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs_test.c?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs_test.c (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs_test.c Mon Apr  9 23:25:17 2012
@@ -18,6 +18,8 @@
 
 #include "hdfs.h" 
 
+tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length);
+
 void permission_disp(short permissions, char *rtr) {
   rtr[9] = '\0';
   int i;
@@ -51,7 +53,6 @@ void permission_disp(short permissions, 
 } 
 
 int main(int argc, char **argv) {
-
     hdfsFS fs = hdfsConnectNewInstance("default", 0);
     if(!fs) {
         fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
@@ -64,20 +65,25 @@ int main(int argc, char **argv) {
         exit(-1);
     } 
 
-        const char* writePath = "/tmp/testfile.txt";
+    const char* writePath = "/tmp/testfile.txt";
+    const char* fileContents = "Hello, World!";
+
     {
         //Write tests
         
-        
         hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
         if(!writeFile) {
             fprintf(stderr, "Failed to open %s for writing!\n", writePath);
             exit(-1);
         }
         fprintf(stderr, "Opened %s for writing successfully...\n", writePath);
-
-        char* buffer = "Hello, World!";
-        tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)+1);
+        tSize num_written_bytes =
+          hdfsWrite(fs, writeFile, (void*)fileContents, strlen(fileContents)+1);
+        if (num_written_bytes != strlen(fileContents) + 1) {
+          fprintf(stderr, "Failed to write correct number of bytes - expected %d, got %d\n",
+                  (int)(strlen(fileContents) + 1), (int)num_written_bytes);
+          exit(-1);
+        }
         fprintf(stderr, "Wrote %d bytes\n", num_written_bytes);
 
         tOffset currentPos = -1;
@@ -138,18 +144,86 @@ int main(int argc, char **argv) {
         }
         fprintf(stderr, "Current position: %ld\n", currentPos);
 
+        if ((readFile->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) == 0) {
+          fprintf(stderr, "Direct read support incorrectly not detected "
+                  "for HDFS filesystem\n");
+          exit(-1);
+        }
+
+        fprintf(stderr, "Direct read support detected for HDFS\n");
+
+        // Clear the flag so that we really go through the slow read path
+        readFile->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
+
         static char buffer[32];
         tSize num_read_bytes = hdfsRead(fs, readFile, (void*)buffer, 
                 sizeof(buffer));
         fprintf(stderr, "Read following %d bytes:\n%s\n", 
                 num_read_bytes, buffer);
 
+        memset(buffer, 0, strlen(fileContents) + 1);
+
         num_read_bytes = hdfsPread(fs, readFile, 0, (void*)buffer, 
                 sizeof(buffer));
         fprintf(stderr, "Read following %d bytes:\n%s\n", 
                 num_read_bytes, buffer);
 
+        if (hdfsSeek(fs, readFile, 0L)) {
+            fprintf(stderr,
+                    "Failed to seek to file start for direct read test!\n");
+            exit(-1);
+        }
+
+        readFile->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
+
+        memset(buffer, 0, strlen(fileContents) + 1);
+        num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
+                sizeof(buffer));
+        if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+            fprintf(stderr, "Failed to read (direct). Expected %s but got %s (%d bytes)\n",
+                    fileContents, buffer, num_read_bytes);
+            exit(-1);
+        }
+        fprintf(stderr, "Read (direct) following %d bytes:\n%s\n",
+                num_read_bytes, buffer);
         hdfsCloseFile(fs, readFile);
+
+        // Test correct behaviour for unsupported filesystems
+        hdfsFile localFile = hdfsOpenFile(lfs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
+        if(!localFile) {
+            fprintf(stderr, "Failed to open %s for writing!\n", writePath);
+            exit(-1);
+        }
+
+        tSize num_written_bytes = hdfsWrite(lfs, localFile,
+                                            (void*)fileContents,
+                                            strlen(fileContents) + 1);
+
+        hdfsCloseFile(lfs, localFile);
+        localFile = hdfsOpenFile(lfs, writePath, O_RDONLY, 0, 0, 0);
+
+        if (localFile->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) {
+          fprintf(stderr, "Direct read support incorrectly detected for local "
+                  "filesystem\n");
+          exit(-1);
+        }
+
+        memset(buffer, 0, strlen(fileContents) + 1);
+        int result = readDirect(lfs, localFile, (void*)buffer, sizeof(buffer));
+        if (result != -1) {
+          fprintf(stderr, "Expected error from local direct read not seen!\n");
+          exit(-1);
+        }
+
+        if (errno != ENOTSUP) {
+          fprintf(stderr, "Error code not correctly set to ENOTSUP, was %d!\n",
+                  errno);
+          exit(-1);
+        }
+
+        fprintf(stderr, "Expected exception thrown for unsupported direct read\n");
+
+        hdfsCloseFile(lfs, localFile);
     }
 
     int totalResult = 0;
@@ -446,4 +520,3 @@ int main(int argc, char **argv) {
 /**
  * vim: ts=4: sw=4: et:
  */
-

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/native/tests/test-libhdfs.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/native/tests/test-libhdfs.sh?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/native/tests/test-libhdfs.sh (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/native/tests/test-libhdfs.sh Mon Apr  9 23:25:17 2012
@@ -17,126 +17,64 @@
 #
 
 #
-# Note: This script depends on 8 environment variables to function correctly:
-# a) CLASSPATH
-# b) HADOOP_PREFIX
-# c) HADOOP_CONF_DIR 
-# d) HADOOP_LOG_DIR 
-# e) LIBHDFS_BUILD_DIR
-# f) LIBHDFS_INSTALL_DIR
-# g) OS_NAME
-# h) CLOVER_JAR
-# i} HADOOP_VERSION
-# j) HADOOP_HDFS_HOME
-# All these are passed by build.xml.
+# Note: This script depends on 5 environment variables to function correctly:
+# a) HADOOP_HOME - must be set
+# b) HDFS_TEST_CONF_DIR - optional; the directory to read and write
+# core-site.xml to. Defaults to /tmp
+# c) LIBHDFS_BUILD_DIR - optional; the location of the hdfs_test
+# executable. Defaults to the parent directory.
+# d) OS_NAME - used to choose how to locate libjvm.so
+# e) CLOVER_JAR - optional; the location of the Clover code coverage tool's jar.
 #
 
-HDFS_TEST=hdfs_test
-HADOOP_LIB_DIR=$HADOOP_PREFIX/lib
-HADOOP_BIN_DIR=$HADOOP_PREFIX/bin
-
-COMMON_BUILD_DIR=$HADOOP_PREFIX/build/ivy/lib/hadoop-hdfs/common
-COMMON_JAR=$COMMON_BUILD_DIR/hadoop-common-$HADOOP_VERSION.jar
+if [ "x$HADOOP_HOME" == "x" ]; then
+  echo "HADOOP_HOME is unset!"
+  exit 1
+fi
 
-cat > $HADOOP_CONF_DIR/core-site.xml <<EOF
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<configuration>
-<property>
-  <name>hadoop.tmp.dir</name>
-  <value>file:///$LIBHDFS_TEST_DIR</value>
-</property>
-<property>
-  <name>fs.default.name</name>
-  <value>hdfs://localhost:23000/</value>
-</property>
-</configuration>
-EOF
-
-cat > $HADOOP_CONF_DIR/hdfs-site.xml <<EOF
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<configuration>
-<property>
-  <name>dfs.replication</name>
-  <value>1</value>
-</property>
-<property>
-  <name>dfs.support.append</name>
-  <value>true</value>
-</property>
-<property>
-  <name>dfs.namenode.logging.level</name>
-  <value>DEBUG</value>
-</property>
-</configuration>
-EOF
-
-cat > $HADOOP_CONF_DIR/slaves <<EOF
-localhost
-EOF
-
-# If we are running from the hdfs repo we need to make sure
-# HADOOP_BIN_DIR contains the common scripts.  
-# If the bin directory does not and we've got a common jar extract its
-# bin directory to HADOOP_PREFIX/bin. The bin scripts hdfs-config.sh and
-# hadoop-config.sh assume the bin directory is named "bin" and that it
-# is located in HADOOP_PREFIX.
-unpacked_common_bin_dir=0
-if [ ! -f $HADOOP_BIN_DIR/hadoop-config.sh ]; then
-  if [ -f $COMMON_JAR ]; then
-    jar xf $COMMON_JAR bin.tgz
-    tar xfz bin.tgz -C $HADOOP_BIN_DIR
-    unpacked_common_bin_dir=1
-  fi
+if [ "x$LIBHDFS_BUILD_DIR" == "x" ]; then
+  LIBHDFS_BUILD_DIR=`pwd`/../
 fi
 
-# Manipulate HADOOP_CONF_DIR too
-# which is necessary to circumvent bin/hadoop
-HADOOP_CONF_DIR=$HADOOP_CONF_DIR:$HADOOP_PREFIX/conf
+if [ "x$HDFS_TEST_CONF_DIR" == "x" ]; then
+  HDFS_TEST_CONF_DIR=/tmp
+fi
 
-# set pid file dir so they are not written to /tmp
-export HADOOP_PID_DIR=$HADOOP_LOG_DIR
+# LIBHDFS_INSTALL_DIR is the directory containing libhdfs.so
+LIBHDFS_INSTALL_DIR=$HADOOP_HOME/lib/native/
+HDFS_TEST=hdfs_test
 
-# CLASSPATH initially contains $HADOOP_CONF_DIR
-CLASSPATH="${HADOOP_CONF_DIR}"
-CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar
+HDFS_TEST_JAR=`find $HADOOP_HOME/share/hadoop/hdfs/ \
+-name "hadoop-hdfs-*-tests.jar" | head -n 1`
 
-# for developers, add Hadoop classes to CLASSPATH
-if [ -d "$HADOOP_PREFIX/build/classes" ]; then
-  CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/build/classes
-fi
-if [ -d "$HADOOP_PREFIX/build/web/webapps" ]; then
-  CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/build/web
-fi
-if [ -d "$HADOOP_PREFIX/build/test/classes" ]; then
-  CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/build/test/classes
+if [ "x$HDFS_TEST_JAR" == "x" ]; then
+  echo "HDFS test jar not found! Tried looking in all subdirectories \
+of $HADOOP_HOME/share/hadoop/hdfs/"
+  exit 1
 fi
 
+echo "Found HDFS test jar at $HDFS_TEST_JAR"
+
+# CLASSPATH initially contains $HDFS_TEST_CONF_DIR
+CLASSPATH="${HDFS_TEST_CONF_DIR}"
+CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar
+
 # add Clover jar file needed for code coverage runs
 CLASSPATH=${CLASSPATH}:${CLOVER_JAR};
 
 # so that filenames w/ spaces are handled correctly in loops below
-IFS=
+IFS=$'\n'
 
-# add libs to CLASSPATH
-for f in $HADOOP_PREFIX/lib/*.jar; do
-  CLASSPATH=${CLASSPATH}:$f;
-done
-
-for f in $HADOOP_PREFIX/*.jar; do 
-  CLASSPATH=${CLASSPATH}:$f
-done
-for f in $HADOOP_PREFIX/lib/jsp-2.1/*.jar; do
-  CLASSPATH=${CLASSPATH}:$f;
-done
-
-if [ -d "$COMMON_BUILD_DIR" ]; then
-  CLASSPATH=$CLASSPATH:$COMMON_JAR
-  for f in $COMMON_BUILD_DIR/*.jar; do
-    CLASSPATH=${CLASSPATH}:$f;
-  done
-fi
+JAR_DIRS="$HADOOP_HOME/share/hadoop/common/lib/
+$HADOOP_HOME/share/hadoop/common/
+$HADOOP_HOME/share/hadoop/hdfs
+$HADOOP_HOME/share/hadoop/hdfs/lib/"
+
+for d in $JAR_DIRS; do 
+  for j in $d/*.jar; do
+    CLASSPATH=${CLASSPATH}:$j
+  done;
+done;
 
 # restore ordinary behaviour
 unset IFS
@@ -178,21 +116,37 @@ echo  LIB_JVM_DIR = $LIB_JVM_DIR
 echo  "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++"
 # Put delays to ensure hdfs is up and running and also shuts down 
 # after the tests are complete
-cd $HADOOP_PREFIX
-echo Y | $HADOOP_BIN_DIR/hdfs namenode -format &&
-$HADOOP_BIN_DIR/hadoop-daemon.sh --script $HADOOP_BIN_DIR/hdfs start namenode && sleep 2
-$HADOOP_BIN_DIR/hadoop-daemon.sh --script $HADOOP_BIN_DIR/hdfs start datanode && sleep 2
-echo "Wait 30s for the datanode to start up..."
-sleep 30
-CLASSPATH=$CLASSPATH LD_PRELOAD="$LIB_JVM_DIR/libjvm.so:$LIBHDFS_INSTALL_DIR/libhdfs.so:" $LIBHDFS_BUILD_DIR/$HDFS_TEST
-BUILD_STATUS=$?
-sleep 3
-$HADOOP_BIN_DIR/hadoop-daemon.sh --script $HADOOP_BIN_DIR/hdfs stop datanode && sleep 2
-$HADOOP_BIN_DIR/hadoop-daemon.sh --script $HADOOP_BIN_DIR/hdfs stop namenode && sleep 2 
+rm -f $HDFS_TEST_CONF_DIR/core-site.xml
 
-if [ $unpacked_common_bin_dir -eq 1 ]; then
-  rm -rf bin.tgz
+$HADOOP_HOME/bin/hadoop jar $HDFS_TEST_JAR \
+    org.apache.hadoop.test.MiniDFSClusterManager \
+    -format -nnport 20300 -writeConfig $HDFS_TEST_CONF_DIR/core-site.xml \
+    > /tmp/libhdfs-test-cluster.out 2>&1 & 
+
+MINI_CLUSTER_PID=$!
+for i in {1..15}; do
+  echo "Waiting for DFS cluster, attempt $i of 15"
+  [ -f $HDFS_TEST_CONF_DIR/core-site.xml ] && break;
+  sleep 2
+done
+
+if [ ! -f $HDFS_TEST_CONF_DIR/core-site.xml ]; then
+  echo "Cluster did not come up in 30s"
+  kill -9 $MINI_CLUSTER_PID
+  exit 1
 fi
 
-echo exiting with $BUILD_STATUS
+echo "Cluster up, running tests"
+# Disable error checking to make sure we get to cluster cleanup
+set +e
+
+CLASSPATH=$CLASSPATH \
+LD_PRELOAD="$LIB_JVM_DIR/libjvm.so:$LIBHDFS_INSTALL_DIR/libhdfs.so:" \
+$LIBHDFS_BUILD_DIR/$HDFS_TEST
+
+BUILD_STATUS=$?
+
+echo "Tearing cluster down"
+kill -9 $MINI_CLUSTER_PID
+echo "Exiting with $BUILD_STATUS"
 exit $BUILD_STATUS

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterDatanodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterDatanodeProtocol.proto?rev=1311518&r1=1311517&r2=1311518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterDatanodeProtocol.proto (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterDatanodeProtocol.proto Mon Apr  9 23:25:17 2012
@@ -38,8 +38,11 @@ message InitReplicaRecoveryRequestProto 
  * Replica recovery information
  */
 message InitReplicaRecoveryResponseProto {
-  required ReplicaStateProto state = 1; // State of the replica
-  required BlockProto block = 2;   // block information
+  required bool replicaFound = 1;
+
+  // The following entries are not set if there was no replica found.
+  optional ReplicaStateProto state = 2; // State of the replica
+  optional BlockProto block = 3;   // block information
 }
 
 /**

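On the receiving side, the required-to-optional change means callers must consult replicaFound before touching state or block. A hedged sketch of that check against the generated protobuf class follows; the outer generated class name and any surrounding conversion code are assumptions, and only the three fields above come from the patch.

    // Hypothetical receiver-side handling of the new response shape.
    import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryResponseProto;

    class ReplicaRecoveryResponseSketch {
      static boolean replicaWasFound(InitReplicaRecoveryResponseProto resp) {
        if (!resp.getReplicaFound()) {
          return false;               // no replica on this datanode
        }
        // When a replica was found, state and block are expected to be present.
        if (!resp.hasState() || !resp.hasBlock()) {
          throw new IllegalStateException("replicaFound set but state/block missing");
        }
        return true;
      }
    }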

