hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1513258 [3/6] - in /hadoop/common/branches/YARN-321/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/tomcat/ROOT/ hadoop-hdfs-nfs/ hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/ hadoop-hdfs-nfs/src/m...
Date Mon, 12 Aug 2013 21:26:09 GMT
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Mon Aug 12 21:25:49 2013
@@ -55,6 +55,8 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -66,8 +68,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotAccessControlException;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotException;
 import org.apache.hadoop.hdfs.util.ByteArray;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
@@ -381,12 +381,13 @@ public class FSDirectory implements Clos
   /**
    * Persist the block list for the inode.
    */
-  void persistBlocks(String path, INodeFileUnderConstruction file) {
+  void persistBlocks(String path, INodeFileUnderConstruction file,
+      boolean logRetryCache) {
     waitForReady();
 
     writeLock();
     try {
-      fsImage.getEditLog().logUpdateBlocks(path, file);
+      fsImage.getEditLog().logUpdateBlocks(path, file, logRetryCache);
       if(NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* FSDirectory.persistBlocks: "
             +path+" with "+ file.getBlocks().length 
@@ -418,23 +419,27 @@ public class FSDirectory implements Clos
 
   /**
    * Remove a block from the file.
+   * @return Whether the block exists in the corresponding file
    */
-  void removeBlock(String path, INodeFileUnderConstruction fileNode,
+  boolean removeBlock(String path, INodeFileUnderConstruction fileNode,
                       Block block) throws IOException {
     waitForReady();
 
     writeLock();
     try {
-      unprotectedRemoveBlock(path, fileNode, block);
+      return unprotectedRemoveBlock(path, fileNode, block);
     } finally {
       writeUnlock();
     }
   }
   
-  void unprotectedRemoveBlock(String path, INodeFileUnderConstruction fileNode, 
-      Block block) throws IOException {
+  boolean unprotectedRemoveBlock(String path,
+      INodeFileUnderConstruction fileNode, Block block) throws IOException {
     // modify file-> block and blocksMap
-    fileNode.removeLastBlock(block);
+    boolean removed = fileNode.removeLastBlock(block);
+    if (!removed) {
+      return false;
+    }
     getBlockManager().removeBlockFromMap(block);
 
     if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -446,6 +451,7 @@ public class FSDirectory implements Clos
     // update space consumed
     final INodesInPath iip = rootDir.getINodesInPath4Write(path, true);
     updateCount(iip, 0, -fileNode.getBlockDiskspace(), true);
+    return true;
   }
 
   /**
@@ -454,7 +460,7 @@ public class FSDirectory implements Clos
    * @deprecated Use {@link #renameTo(String, String, Rename...)} instead.
    */
   @Deprecated
-  boolean renameTo(String src, String dst) 
+  boolean renameTo(String src, String dst, boolean logRetryCache) 
       throws QuotaExceededException, UnresolvedLinkException, 
       FileAlreadyExistsException, SnapshotAccessControlException, IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -470,14 +476,15 @@ public class FSDirectory implements Clos
     } finally {
       writeUnlock();
     }
-    fsImage.getEditLog().logRename(src, dst, now);
+    fsImage.getEditLog().logRename(src, dst, now, logRetryCache);
     return true;
   }
 
   /**
    * @see #unprotectedRenameTo(String, String, long, Options.Rename...)
    */
-  void renameTo(String src, String dst, Options.Rename... options)
+  void renameTo(String src, String dst, boolean logRetryCache, 
+      Options.Rename... options)
       throws FileAlreadyExistsException, FileNotFoundException,
       ParentNotDirectoryException, QuotaExceededException,
       UnresolvedLinkException, IOException {
@@ -495,7 +502,7 @@ public class FSDirectory implements Clos
     } finally {
       writeUnlock();
     }
-    fsImage.getEditLog().logRename(src, dst, now, options);
+    fsImage.getEditLog().logRename(src, dst, now, logRetryCache, options);
   }
 
   /**
@@ -1171,7 +1178,7 @@ public class FSDirectory implements Clos
   /**
    * Concat all the blocks from srcs to trg and delete the srcs files
    */
-  void concat(String target, String [] srcs) 
+  void concat(String target, String [] srcs, boolean supportRetryCache) 
       throws UnresolvedLinkException, QuotaExceededException,
       SnapshotAccessControlException, SnapshotException {
     writeLock();
@@ -1181,7 +1188,8 @@ public class FSDirectory implements Clos
       long timestamp = now();
       unprotectedConcat(target, srcs, timestamp);
       // do the commit
-      fsImage.getEditLog().logConcat(target, srcs, timestamp);
+      fsImage.getEditLog().logConcat(target, srcs, timestamp, 
+          supportRetryCache);
     } finally {
       writeUnlock();
     }
@@ -1256,10 +1264,12 @@ public class FSDirectory implements Clos
    * @param src Path of a directory to delete
    * @param collectedBlocks Blocks under the deleted directory
    * @param removedINodes INodes that should be removed from {@link #inodeMap}
+   * @param logRetryCache Whether to record RPC IDs in editlog to support retry
+   *                      cache rebuilding.
    * @return true on successful deletion; else false
    */
   boolean delete(String src, BlocksMapUpdateInfo collectedBlocks,
-      List<INode> removedINodes) throws IOException {
+      List<INode> removedINodes, boolean logRetryCache) throws IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
     }
@@ -1294,7 +1304,7 @@ public class FSDirectory implements Clos
     if (filesRemoved < 0) {
       return false;
     }
-    fsImage.getEditLog().logDelete(src, now);
+    fsImage.getEditLog().logDelete(src, now, logRetryCache);
     incrDeletedFileCount(filesRemoved);
     // Blocks/INodes will be handled later by the caller of this method
     getFSNamesystem().removePathAndBlocks(src, null, null);
@@ -2518,7 +2528,7 @@ public class FSDirectory implements Clos
   /**
    * Create FileStatus by file INode 
    */
-   private HdfsFileStatus createFileStatus(byte[] path, INode node,
+   HdfsFileStatus createFileStatus(byte[] path, INode node,
        Snapshot snapshot) {
      long size = 0;     // length is zero for directories
      short replication = 0;
@@ -2591,7 +2601,7 @@ public class FSDirectory implements Clos
    * Add the given symbolic link to the fs. Record it in the edits log.
    */
   INodeSymlink addSymlink(String path, String target,
-      PermissionStatus dirPerms, boolean createParent)
+      PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
       throws UnresolvedLinkException, FileAlreadyExistsException,
       QuotaExceededException, SnapshotAccessControlException {
     waitForReady();
@@ -2617,7 +2627,8 @@ public class FSDirectory implements Clos
       NameNode.stateChangeLog.info("DIR* addSymlink: failed to add " + path);
       return null;
     }
-    fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode);
+    fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode,
+        logRetryCache);
     
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* addSymlink: " + path + " is added");

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Mon Aug 12 21:25:49 2013
@@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.conf.Configuration;
 
@@ -662,11 +663,20 @@ public class FSEditLog implements LogsPu
     LOG.info(buf);
   }
 
+  /** Record the RPC IDs if necessary */
+  private void logRpcIds(FSEditLogOp op, boolean toLogRpcIds) {
+    if (toLogRpcIds) {
+      op.setRpcClientId(Server.getClientId());
+      op.setRpcCallId(Server.getCallId());
+    }
+  }
+  
   /** 
    * Add open lease record to edit log. 
    * Records the block locations of the last block.
    */
-  public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
+  public void logOpenFile(String path, INodeFileUnderConstruction newNode,
+      boolean toLogRpcIds) {
     AddOp op = AddOp.getInstance(cache.get())
       .setInodeId(newNode.getId())
       .setPath(path)
@@ -678,8 +688,8 @@ public class FSEditLog implements LogsPu
       .setPermissionStatus(newNode.getPermissionStatus())
       .setClientName(newNode.getClientName())
       .setClientMachine(newNode.getClientMachine());
-    
-      logEdit(op);
+    logRpcIds(op, toLogRpcIds);
+    logEdit(op);
   }
 
   /** 
@@ -698,10 +708,12 @@ public class FSEditLog implements LogsPu
     logEdit(op);
   }
   
-  public void logUpdateBlocks(String path, INodeFileUnderConstruction file) {
+  public void logUpdateBlocks(String path, INodeFileUnderConstruction file,
+      boolean toLogRpcIds) {
     UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get())
       .setPath(path)
       .setBlocks(file.getBlocks());
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
   
@@ -721,23 +733,26 @@ public class FSEditLog implements LogsPu
    * Add rename record to edit log
    * TODO: use String parameters until just before writing to disk
    */
-  void logRename(String src, String dst, long timestamp) {
+  void logRename(String src, String dst, long timestamp, boolean toLogRpcIds) {
     RenameOldOp op = RenameOldOp.getInstance(cache.get())
       .setSource(src)
       .setDestination(dst)
       .setTimestamp(timestamp);
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
   
   /** 
    * Add rename record to edit log
    */
-  void logRename(String src, String dst, long timestamp, Options.Rename... options) {
+  void logRename(String src, String dst, long timestamp, boolean toLogRpcIds,
+      Options.Rename... options) {
     RenameOp op = RenameOp.getInstance(cache.get())
       .setSource(src)
       .setDestination(dst)
       .setTimestamp(timestamp)
       .setOptions(options);
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
   
@@ -784,21 +799,23 @@ public class FSEditLog implements LogsPu
   /**
    * concat(trg,src..) log
    */
-  void logConcat(String trg, String [] srcs, long timestamp) {
+  void logConcat(String trg, String[] srcs, long timestamp, boolean toLogRpcIds) {
     ConcatDeleteOp op = ConcatDeleteOp.getInstance(cache.get())
       .setTarget(trg)
       .setSources(srcs)
       .setTimestamp(timestamp);
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
   
   /** 
    * Add delete file record to edit log
    */
-  void logDelete(String src, long timestamp) {
+  void logDelete(String src, long timestamp, boolean toLogRpcIds) {
     DeleteOp op = DeleteOp.getInstance(cache.get())
       .setPath(src)
       .setTimestamp(timestamp);
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
 
@@ -843,8 +860,8 @@ public class FSEditLog implements LogsPu
   /** 
    * Add a create symlink record.
    */
-  void logSymlink(String path, String value, long mtime, 
-                  long atime, INodeSymlink node) {
+  void logSymlink(String path, String value, long mtime, long atime,
+      INodeSymlink node, boolean toLogRpcIds) {
     SymlinkOp op = SymlinkOp.getInstance(cache.get())
       .setId(node.getId())
       .setPath(path)
@@ -852,6 +869,7 @@ public class FSEditLog implements LogsPu
       .setModificationTime(mtime)
       .setAccessTime(atime)
       .setPermissionStatus(node.getPermissionStatus());
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
   
@@ -896,22 +914,26 @@ public class FSEditLog implements LogsPu
     logEdit(op);
   }
   
-  void logCreateSnapshot(String snapRoot, String snapName) {
+  void logCreateSnapshot(String snapRoot, String snapName, boolean toLogRpcIds) {
     CreateSnapshotOp op = CreateSnapshotOp.getInstance(cache.get())
         .setSnapshotRoot(snapRoot).setSnapshotName(snapName);
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
   
-  void logDeleteSnapshot(String snapRoot, String snapName) {
+  void logDeleteSnapshot(String snapRoot, String snapName, boolean toLogRpcIds) {
     DeleteSnapshotOp op = DeleteSnapshotOp.getInstance(cache.get())
         .setSnapshotRoot(snapRoot).setSnapshotName(snapName);
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
   
-  void logRenameSnapshot(String path, String snapOldName, String snapNewName) {
+  void logRenameSnapshot(String path, String snapOldName, String snapNewName,
+      boolean toLogRpcIds) {
     RenameSnapshotOp op = RenameSnapshotOp.getInstance(cache.get())
         .setSnapshotRoot(path).setSnapshotOldName(snapOldName)
         .setSnapshotNewName(snapNewName);
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
   

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Mon Aug 12 21:25:49 2013
@@ -34,13 +34,16 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream.LogHeaderCorruptException;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
@@ -57,6 +60,8 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetNSQuotaOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
@@ -66,9 +71,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -76,7 +78,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.util.Holder;
-import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Joiner;
 
@@ -278,7 +279,8 @@ public class FSEditLogLoader {
     if (LOG.isTraceEnabled()) {
       LOG.trace("replaying edit log: " + op);
     }
-
+    final boolean toAddRetryCache = fsNamesys.hasRetryCache() && op.hasRpcIds();
+    
     switch (op.opCode) {
     case OP_ADD: {
       AddCloseOp addCloseOp = (AddCloseOp)op;
@@ -301,8 +303,8 @@ public class FSEditLogLoader {
       if (oldFile == null) { // this is OP_ADD on a new file (case 1)
         // versions > 0 support per file replication
         // get name and replication
-        final short replication  = fsNamesys.getBlockManager(
-            ).adjustReplication(addCloseOp.replication);
+        final short replication = fsNamesys.getBlockManager()
+            .adjustReplication(addCloseOp.replication);
         assert addCloseOp.blocks.length == 0;
 
         // add to the file tree
@@ -314,6 +316,13 @@ public class FSEditLogLoader {
             addCloseOp.clientName, addCloseOp.clientMachine);
         fsNamesys.leaseManager.addLease(addCloseOp.clientName, addCloseOp.path);
 
+        // add the op into retry cache if necessary
+        if (toAddRetryCache) {
+          HdfsFileStatus stat = fsNamesys.dir.createFileStatus(
+              HdfsFileStatus.EMPTY_NAME, newFile, null);
+          fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
+              addCloseOp.rpcCallId, stat);
+        }
       } else { // This is OP_ADD on an existing file
         if (!oldFile.isUnderConstruction()) {
           // This is case 3: a call to append() on an already-closed file.
@@ -321,11 +330,17 @@ public class FSEditLogLoader {
             FSNamesystem.LOG.debug("Reopening an already-closed file " +
                 "for append");
           }
-          fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
-              addCloseOp.clientName, addCloseOp.clientMachine, null,
-              false, iip.getLatestSnapshot());
+          LocatedBlock lb = fsNamesys.prepareFileForWrite(addCloseOp.path,
+              oldFile, addCloseOp.clientName, addCloseOp.clientMachine, null,
+              false, iip.getLatestSnapshot(), false);
           newFile = INodeFile.valueOf(fsDir.getINode(addCloseOp.path),
               addCloseOp.path, true);
+          
+          // add the op into retry cache if necessary
+          if (toAddRetryCache) {
+            fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
+                addCloseOp.rpcCallId, lb);
+          }
         }
       }
       // Fall-through for case 2.
@@ -385,6 +400,10 @@ public class FSEditLogLoader {
           updateOp.path);
       // Update in-memory data structures
       updateBlocks(fsDir, updateOp, oldFile);
+      
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
+      }
       break;
     }
       
@@ -400,17 +419,30 @@ public class FSEditLogLoader {
       ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op;
       fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs,
           concatDeleteOp.timestamp);
+      
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(concatDeleteOp.rpcClientId,
+            concatDeleteOp.rpcCallId);
+      }
       break;
     }
     case OP_RENAME_OLD: {
       RenameOldOp renameOp = (RenameOldOp)op;
       fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
                                 renameOp.timestamp);
+      
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
+      }
       break;
     }
     case OP_DELETE: {
       DeleteOp deleteOp = (DeleteOp)op;
       fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp);
+      
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(deleteOp.rpcClientId, deleteOp.rpcCallId);
+      }
       break;
     }
     case OP_MKDIR: {
@@ -475,12 +507,20 @@ public class FSEditLogLoader {
       fsDir.unprotectedAddSymlink(inodeId, symlinkOp.path,
                                   symlinkOp.value, symlinkOp.mtime, 
                                   symlinkOp.atime, symlinkOp.permissionStatus);
+      
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(symlinkOp.rpcClientId, symlinkOp.rpcCallId);
+      }
       break;
     }
     case OP_RENAME: {
       RenameOp renameOp = (RenameOp)op;
       fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
                                 renameOp.timestamp, renameOp.options);
+      
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
+      }
       break;
     }
     case OP_GET_DELEGATION_TOKEN: {
@@ -533,8 +573,12 @@ public class FSEditLogLoader {
     }
     case OP_CREATE_SNAPSHOT: {
       CreateSnapshotOp createSnapshotOp = (CreateSnapshotOp) op;
-      fsNamesys.getSnapshotManager().createSnapshot(
+      String path = fsNamesys.getSnapshotManager().createSnapshot(
           createSnapshotOp.snapshotRoot, createSnapshotOp.snapshotName);
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntryWithPayload(createSnapshotOp.rpcClientId,
+            createSnapshotOp.rpcCallId, path);
+      }
       break;
     }
     case OP_DELETE_SNAPSHOT: {
@@ -548,6 +592,11 @@ public class FSEditLogLoader {
       collectedBlocks.clear();
       fsNamesys.dir.removeFromInodeMap(removedINodes);
       removedINodes.clear();
+      
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(deleteSnapshotOp.rpcClientId,
+            deleteSnapshotOp.rpcCallId);
+      }
       break;
     }
     case OP_RENAME_SNAPSHOT: {
@@ -555,6 +604,11 @@ public class FSEditLogLoader {
       fsNamesys.getSnapshotManager().renameSnapshot(
           renameSnapshotOp.snapshotRoot, renameSnapshotOp.snapshotOldName,
           renameSnapshotOp.snapshotNewName);
+      
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(renameSnapshotOp.rpcClientId,
+            renameSnapshotOp.rpcCallId);
+      }
       break;
     }
     case OP_ALLOW_SNAPSHOT: {
@@ -661,8 +715,12 @@ public class FSEditLogLoader {
         throw new IOException("Trying to remove more than one block from file "
             + path);
       }
-      fsDir.unprotectedRemoveBlock(path,
-          (INodeFileUnderConstruction)file, oldBlocks[oldBlocks.length - 1]);
+      Block oldBlock = oldBlocks[oldBlocks.length - 1];
+      boolean removed = fsDir.unprotectedRemoveBlock(path,
+          (INodeFileUnderConstruction) file, oldBlock);
+      if (!removed && !(op instanceof UpdateBlocksOp)) {
+        throw new IOException("Trying to delete non-existant block " + oldBlock);
+      }
     } else if (newBlocks.length > oldBlocks.length) {
       // We're adding blocks
       for (int i = oldBlocks.length; i < newBlocks.length; i++) {

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Mon Aug 12 21:25:49 2013
@@ -17,55 +17,88 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.util.zip.CheckedInputStream;
-import java.util.zip.Checksum;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLEAR_NS_QUOTA;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLOSE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CONCAT_DELETE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CREATE_SNAPSHOT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE_SNAPSHOT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DISALLOW_SNAPSHOT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_END_LOG_SEGMENT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENEW_DELEGATION_TOKEN;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V1;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V2;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_NS_QUOTA;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_OWNER;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_PERMISSIONS;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_QUOTA;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_REPLICATION;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_START_LOG_SEGMENT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SYMLINK;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_BLOCKS;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_MASTER_KEY;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.EnumMap;
 import java.util.List;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
 
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
-import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.util.PureJavaCrc32;
-
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.util.XMLUtils;
+import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
+import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
+import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.hdfs.util.XMLUtils;
-import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
-import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DeprecatedUTF8;
+import org.apache.hadoop.ipc.ClientId;
+import org.apache.hadoop.ipc.RpcConstants;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.PureJavaCrc32;
 import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
 import org.xml.sax.helpers.AttributesImpl;
 
 import com.google.common.base.Preconditions;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.EOFException;
-
 /**
  * Helper classes for reading the ops from an InputStream.
  * All ops derive from FSEditLogOp and are only
@@ -76,6 +109,8 @@ import java.io.EOFException;
 public abstract class FSEditLogOp {
   public final FSEditLogOpCodes opCode;
   long txid;
+  byte[] rpcClientId = RpcConstants.DUMMY_CLIENT_ID;
+  int rpcCallId = RpcConstants.INVALID_CALL_ID;
 
   @SuppressWarnings("deprecation")
   final public static class OpInstanceCache {
@@ -150,6 +185,31 @@ public abstract class FSEditLogOp {
   public void setTransactionId(long txid) {
     this.txid = txid;
   }
+  
+  public boolean hasRpcIds() {
+    return rpcClientId != RpcConstants.DUMMY_CLIENT_ID
+        && rpcCallId != RpcConstants.INVALID_CALL_ID;
+  }
+  
+  /** Call this only after {@link #hasRpcIds()} has returned true. */
+  public byte[] getClientId() {
+    Preconditions.checkState(rpcClientId != RpcConstants.DUMMY_CLIENT_ID);
+    return rpcClientId;
+  }
+  
+  public void setRpcClientId(byte[] clientId) {
+    this.rpcClientId = clientId;
+  }
+  
+  /** Call this only after {@link #hasRpcIds()} has returned true. */
+  public int getCallId() {
+    Preconditions.checkState(rpcCallId != RpcConstants.INVALID_CALL_ID);
+    return rpcCallId;
+  }
+  
+  public void setRpcCallId(int callId) {
+    this.rpcCallId = callId;
+  }
 
   abstract void readFields(DataInputStream in, int logVersion)
       throws IOException;
@@ -163,6 +223,46 @@ public abstract class FSEditLogOp {
     boolean shouldCompleteLastBlock();
   }
   
+  private static void writeRpcIds(final byte[] clientId, final int callId,
+      DataOutputStream out) throws IOException {
+    FSImageSerialization.writeBytes(clientId, out);
+    FSImageSerialization.writeInt(callId, out);
+  }
+  
+  void readRpcIds(DataInputStream in, int logVersion)
+      throws IOException {
+    if (LayoutVersion.supports(Feature.EDITLOG_SUPPORT_RETRYCACHE,
+        logVersion)) {
+      this.rpcClientId = FSImageSerialization.readBytes(in);
+      this.rpcCallId = FSImageSerialization.readInt(in);
+    }
+  }
+  
+  void readRpcIdsFromXml(Stanza st) {
+    this.rpcClientId = st.hasChildren("RPC_CLIENTID") ? 
+        ClientId.toBytes(st.getValue("RPC_CLIENTID"))
+        : RpcConstants.DUMMY_CLIENT_ID;
+    this.rpcCallId = st.hasChildren("RPC_CALLID") ? 
+        Integer.valueOf(st.getValue("RPC_CALLID"))
+        : RpcConstants.INVALID_CALL_ID;
+  }
+  
+  private static void appendRpcIdsToString(final StringBuilder builder,
+      final byte[] clientId, final int callId) {
+    builder.append(", RpcClientId=");
+    builder.append(ClientId.toString(clientId));
+    builder.append(", RpcCallId=");
+    builder.append(callId);
+  }
+  
+  private static void appendRpcIdsToXml(ContentHandler contentHandler,
+      final byte[] clientId, final int callId) throws SAXException {
+    XMLUtils.addSaxString(contentHandler, "RPC_CLIENTID",
+        ClientId.toString(clientId));
+    XMLUtils.addSaxString(contentHandler, "RPC_CALLID", 
+        Integer.valueOf(callId).toString());
+  }
+  
   @SuppressWarnings("unchecked")
   static abstract class AddCloseOp extends FSEditLogOp implements BlockListUpdatingOp {
     int length;
@@ -176,7 +276,7 @@ public abstract class FSEditLogOp {
     PermissionStatus permissions;
     String clientName;
     String clientMachine;
-
+    
     private AddCloseOp(FSEditLogOpCodes opCode) {
       super(opCode);
       assert(opCode == OP_ADD || opCode == OP_CLOSE);
@@ -247,8 +347,7 @@ public abstract class FSEditLogOp {
     }
 
     @Override
-    public 
-    void writeFields(DataOutputStream out) throws IOException {
+    public void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeLong(inodeId, out);
       FSImageSerialization.writeString(path, out);
       FSImageSerialization.writeShort(replication, out);
@@ -261,6 +360,8 @@ public abstract class FSEditLogOp {
       if (this.opCode == OP_ADD) {
         FSImageSerialization.writeString(clientName,out);
         FSImageSerialization.writeString(clientMachine,out);
+        // write clientId and callId
+        writeRpcIds(rpcClientId, rpcCallId, out);
       }
     }
 
@@ -317,6 +418,8 @@ public abstract class FSEditLogOp {
       if (this.opCode == OP_ADD) {
         this.clientName = FSImageSerialization.readString(in);
         this.clientMachine = FSImageSerialization.readString(in);
+        // read clientId and callId
+        readRpcIds(in, logVersion);
       } else {
         this.clientName = "";
         this.clientMachine = "";
@@ -368,6 +471,9 @@ public abstract class FSEditLogOp {
       builder.append(clientName);
       builder.append(", clientMachine=");
       builder.append(clientMachine);
+      if (this.opCode == OP_ADD) {
+        appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+      }
       builder.append(", opCode=");
       builder.append(opCode);
       builder.append(", txid=");
@@ -397,9 +503,13 @@ public abstract class FSEditLogOp {
         FSEditLogOp.blockToXml(contentHandler, b);
       }
       FSEditLogOp.permissionStatusToXml(contentHandler, permissions);
+      if (this.opCode == OP_ADD) {
+        appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+      }
     }
 
-    @Override void fromXml(Stanza st) throws InvalidXmlException {
+    @Override 
+    void fromXml(Stanza st) throws InvalidXmlException {
       this.length = Integer.valueOf(st.getValue("LENGTH"));
       this.inodeId = Long.valueOf(st.getValue("INODEID"));
       this.path = st.getValue("PATH");
@@ -420,9 +530,14 @@ public abstract class FSEditLogOp {
       }
       this.permissions =
           permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
+      readRpcIdsFromXml(st);
     }
   }
 
+  /**
+   * {@literal @AtMostOnce} for {@link ClientProtocol#startFile} and
+   * {@link ClientProtocol#appendFile}
+   */
   static class AddOp extends AddCloseOp {
     private AddOp() {
       super(OP_ADD);
@@ -446,6 +561,11 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /**
+   * Although {@link ClientProtocol#appendFile} may also log a close op, we do
+   * not need to record the rpc ids here since a successful appendFile op will
+   * finally log an AddOp.
+   */
   static class CloseOp extends AddCloseOp {
     private CloseOp() {
       super(OP_CLOSE);
@@ -469,6 +589,10 @@ public abstract class FSEditLogOp {
     }
   }
   
+  /**
+   * {@literal @AtMostOnce} for {@link ClientProtocol#updatePipeline}, but 
+   * {@literal @Idempotent} for some other ops.
+   */
   static class UpdateBlocksOp extends FSEditLogOp implements BlockListUpdatingOp {
     String path;
     Block[] blocks;
@@ -481,7 +605,6 @@ public abstract class FSEditLogOp {
       return (UpdateBlocksOp)cache.get(OP_UPDATE_BLOCKS);
     }
     
-    
     UpdateBlocksOp setPath(String path) {
       this.path = path;
       return this;
@@ -507,6 +630,8 @@ public abstract class FSEditLogOp {
     void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeString(path, out);
       FSImageSerialization.writeCompactBlockArray(blocks, out);
+      // clientId and callId
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
     
     @Override
@@ -514,6 +639,7 @@ public abstract class FSEditLogOp {
       path = FSImageSerialization.readString(in);
       this.blocks = FSImageSerialization.readCompactBlockArray(
           in, logVersion);
+      readRpcIds(in, logVersion);
     }
 
     @Override
@@ -527,8 +653,9 @@ public abstract class FSEditLogOp {
       sb.append("UpdateBlocksOp [path=")
         .append(path)
         .append(", blocks=")
-        .append(Arrays.toString(blocks))
-        .append("]");
+        .append(Arrays.toString(blocks));
+      appendRpcIdsToString(sb, rpcClientId, rpcCallId);
+      sb.append("]");
       return sb.toString();
     }
     
@@ -538,6 +665,7 @@ public abstract class FSEditLogOp {
       for (Block b : blocks) {
         FSEditLogOp.blockToXml(contentHandler, b);
       }
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
     
     @Override void fromXml(Stanza st) throws InvalidXmlException {
@@ -547,9 +675,11 @@ public abstract class FSEditLogOp {
       for (int i = 0; i < blocks.size(); i++) {
         this.blocks[i] = FSEditLogOp.blockFromXml(blocks.get(i));
       }
+      readRpcIdsFromXml(st);
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#setReplication} */
   static class SetReplicationOp extends FSEditLogOp {
     String path;
     short replication;
@@ -618,6 +748,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#concat} */
   static class ConcatDeleteOp extends FSEditLogOp {
     int length;
     String trg;
@@ -654,8 +785,7 @@ public abstract class FSEditLogOp {
     }
 
     @Override
-    public 
-    void writeFields(DataOutputStream out) throws IOException {
+    public void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeString(trg, out);
             
       DeprecatedUTF8 info[] = new DeprecatedUTF8[srcs.length];
@@ -666,6 +796,9 @@ public abstract class FSEditLogOp {
       new ArrayWritable(DeprecatedUTF8.class, info).write(out);
 
       FSImageSerialization.writeLong(timestamp, out);
+      
+      // rpc ids
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
@@ -704,6 +837,8 @@ public abstract class FSEditLogOp {
       } else {
         this.timestamp = readLong(in);
       }
+      // read RPC ids if necessary
+      readRpcIds(in, logVersion);
     }
 
     @Override
@@ -717,6 +852,7 @@ public abstract class FSEditLogOp {
       builder.append(Arrays.toString(srcs));
       builder.append(", timestamp=");
       builder.append(timestamp);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append(", opCode=");
       builder.append(opCode);
       builder.append(", txid=");
@@ -738,6 +874,7 @@ public abstract class FSEditLogOp {
             "SOURCE" + (i + 1), srcs[i]);
       }
       contentHandler.endElement("", "", "SOURCES");
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
     
     @Override void fromXml(Stanza st) throws InvalidXmlException {
@@ -755,9 +892,11 @@ public abstract class FSEditLogOp {
       for (i = 0; i < srcs.length; i++) {
         srcs[i] = sources.get(0).getValue("SOURCE" + (i + 1));
       }
+      readRpcIdsFromXml(st);
     }
   }
 
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#rename} */
   static class RenameOldOp extends FSEditLogOp {
     int length;
     String src;
@@ -793,6 +932,7 @@ public abstract class FSEditLogOp {
       FSImageSerialization.writeString(src, out);
       FSImageSerialization.writeString(dst, out);
       FSImageSerialization.writeLong(timestamp, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
@@ -812,6 +952,9 @@ public abstract class FSEditLogOp {
       } else {
         this.timestamp = readLong(in);
       }
+      
+      // read RPC ids if necessary
+      readRpcIds(in, logVersion);
     }
 
     @Override
@@ -825,6 +968,7 @@ public abstract class FSEditLogOp {
       builder.append(dst);
       builder.append(", timestamp=");
       builder.append(timestamp);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append(", opCode=");
       builder.append(opCode);
       builder.append(", txid=");
@@ -841,16 +985,21 @@ public abstract class FSEditLogOp {
       XMLUtils.addSaxString(contentHandler, "DST", dst);
       XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
           Long.valueOf(timestamp).toString());
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
     
-    @Override void fromXml(Stanza st) throws InvalidXmlException {
+    @Override 
+    void fromXml(Stanza st) throws InvalidXmlException {
       this.length = Integer.valueOf(st.getValue("LENGTH"));
       this.src = st.getValue("SRC");
       this.dst = st.getValue("DST");
       this.timestamp = Long.valueOf(st.getValue("TIMESTAMP"));
+      
+      readRpcIdsFromXml(st);
     }
   }
 
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#delete} */
   static class DeleteOp extends FSEditLogOp {
     int length;
     String path;
@@ -879,6 +1028,7 @@ public abstract class FSEditLogOp {
     void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeString(path, out);
       FSImageSerialization.writeLong(timestamp, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
@@ -896,6 +1046,8 @@ public abstract class FSEditLogOp {
       } else {
         this.timestamp = readLong(in);
       }
+      // read RPC ids if necessary
+      readRpcIds(in, logVersion);
     }
 
     @Override
@@ -907,6 +1059,7 @@ public abstract class FSEditLogOp {
       builder.append(path);
       builder.append(", timestamp=");
       builder.append(timestamp);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append(", opCode=");
       builder.append(opCode);
       builder.append(", txid=");
@@ -922,15 +1075,19 @@ public abstract class FSEditLogOp {
       XMLUtils.addSaxString(contentHandler, "PATH", path);
       XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
           Long.valueOf(timestamp).toString());
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
     
     @Override void fromXml(Stanza st) throws InvalidXmlException {
       this.length = Integer.valueOf(st.getValue("LENGTH"));
       this.path = st.getValue("PATH");
       this.timestamp = Long.valueOf(st.getValue("TIMESTAMP"));
+      
+      readRpcIdsFromXml(st);
     }
   }
-    
+
+  /** {@literal @Idempotent} for {@link ClientProtocol#mkdirs} */
   static class MkdirOp extends FSEditLogOp {
     int length;
     long inodeId;
@@ -1056,6 +1213,13 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /**
+   * The corresponding operations are either {@literal @Idempotent} (
+   * {@link ClientProtocol#updateBlockForPipeline},
+   * {@link ClientProtocol#recoverLease}, {@link ClientProtocol#addBlock}) or
+   * already bound to another editlog op that records rpc ids (
+   * {@link ClientProtocol#startFile}). Thus no need to record rpc ids here.
+   */
   static class SetGenstampV1Op extends FSEditLogOp {
     long genStampV1;
 
@@ -1108,6 +1272,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** Similar to {@link SetGenstampV1Op}. */
   static class SetGenstampV2Op extends FSEditLogOp {
     long genStampV2;
 
@@ -1160,6 +1325,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#addBlock} */
   static class AllocateBlockIdOp extends FSEditLogOp {
     long blockId;
 
@@ -1212,6 +1378,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#setPermission} */
   static class SetPermissionsOp extends FSEditLogOp {
     String src;
     FsPermission permissions;
@@ -1277,6 +1444,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#setOwner} */
   static class SetOwnerOp extends FSEditLogOp {
     String src;
     String username;
@@ -1357,7 +1525,7 @@ public abstract class FSEditLogOp {
           st.getValue("GROUPNAME") : null;
     }
   }
-
+  
   static class SetNSQuotaOp extends FSEditLogOp {
     String src;
     long nsQuota;
@@ -1457,6 +1625,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#setQuota} */
   static class SetQuotaOp extends FSEditLogOp {
     String src;
     long nsQuota;
@@ -1534,6 +1703,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#setTimes} */
   static class TimesOp extends FSEditLogOp {
     int length;
     String path;
@@ -1629,6 +1799,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#createSymlink} */
   static class SymlinkOp extends FSEditLogOp {
     int length;
     long inodeId;
@@ -1677,14 +1848,14 @@ public abstract class FSEditLogOp {
     }
 
     @Override
-    public 
-    void writeFields(DataOutputStream out) throws IOException {
+    public void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeLong(inodeId, out);      
       FSImageSerialization.writeString(path, out);
       FSImageSerialization.writeString(value, out);
       FSImageSerialization.writeLong(mtime, out);
       FSImageSerialization.writeLong(atime, out);
       permissionStatus.write(out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
@@ -1714,6 +1885,9 @@ public abstract class FSEditLogOp {
         this.atime = readLong(in);
       }
       this.permissionStatus = PermissionStatus.read(in);
+      
+      // read RPC ids if necessary
+      readRpcIds(in, logVersion);
     }
 
     @Override
@@ -1733,6 +1907,7 @@ public abstract class FSEditLogOp {
       builder.append(atime);
       builder.append(", permissionStatus=");
       builder.append(permissionStatus);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append(", opCode=");
       builder.append(opCode);
       builder.append(", txid=");
@@ -1754,9 +1929,11 @@ public abstract class FSEditLogOp {
       XMLUtils.addSaxString(contentHandler, "ATIME",
           Long.valueOf(atime).toString());
       FSEditLogOp.permissionStatusToXml(contentHandler, permissionStatus);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
-    @Override void fromXml(Stanza st) throws InvalidXmlException {
+    @Override 
+    void fromXml(Stanza st) throws InvalidXmlException {
       this.length = Integer.valueOf(st.getValue("LENGTH"));
       this.inodeId = Long.valueOf(st.getValue("INODEID"));
       this.path = st.getValue("PATH");
@@ -1765,9 +1942,12 @@ public abstract class FSEditLogOp {
       this.atime = Long.valueOf(st.getValue("ATIME"));
       this.permissionStatus =
           permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
+      
+      readRpcIdsFromXml(st);
     }
   }
 
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#rename2} */
   static class RenameOp extends FSEditLogOp {
     int length;
     String src;
@@ -1810,6 +1990,7 @@ public abstract class FSEditLogOp {
       FSImageSerialization.writeString(dst, out);
       FSImageSerialization.writeLong(timestamp, out);
       toBytesWritable(options).write(out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
@@ -1830,6 +2011,9 @@ public abstract class FSEditLogOp {
         this.timestamp = readLong(in);
       }
       this.options = readRenameOptions(in);
+      
+      // read RPC ids if necessary
+      readRpcIds(in, logVersion);
     }
 
     private static Rename[] readRenameOptions(DataInputStream in) throws IOException {
@@ -1866,6 +2050,7 @@ public abstract class FSEditLogOp {
       builder.append(timestamp);
       builder.append(", options=");
       builder.append(Arrays.toString(options));
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append(", opCode=");
       builder.append(opCode);
       builder.append(", txid=");
@@ -1889,6 +2074,7 @@ public abstract class FSEditLogOp {
         prefix = "|";
       }
       XMLUtils.addSaxString(contentHandler, "OPTIONS", bld.toString());
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
     
     @Override void fromXml(Stanza st) throws InvalidXmlException {
@@ -1910,9 +2096,15 @@ public abstract class FSEditLogOp {
           }
         }
       }
+      readRpcIdsFromXml(st);
     }
   }
-
+ 
+  /**
+   * {@literal @Idempotent} for {@link ClientProtocol#recoverLease}.
+   * Meanwhile, startFile and appendFile each have their own corresponding
+   * editlog op.
+   */
   static class ReassignLeaseOp extends FSEditLogOp {
     String leaseHolder;
     String path;
@@ -1988,6 +2180,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#getDelegationToken} */
   static class GetDelegationTokenOp extends FSEditLogOp {
     DelegationTokenIdentifier token;
     long expiryTime;
@@ -2059,6 +2252,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#renewDelegationToken} */
   static class RenewDelegationTokenOp extends FSEditLogOp {
     DelegationTokenIdentifier token;
     long expiryTime;
@@ -2130,6 +2324,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#cancelDelegationToken} */
   static class CancelDelegationTokenOp extends FSEditLogOp {
     DelegationTokenIdentifier token;
 
@@ -2323,7 +2518,8 @@ public abstract class FSEditLogOp {
   }
 
   /**
-   * Operation corresponding to creating a snapshot
+   * Operation corresponding to creating a snapshot.
+   * {@literal @AtMostOnce} for {@link ClientProtocol#createSnapshot}.
    */
   static class CreateSnapshotOp extends FSEditLogOp {
     String snapshotRoot;
@@ -2351,24 +2547,31 @@ public abstract class FSEditLogOp {
     void readFields(DataInputStream in, int logVersion) throws IOException {
       snapshotRoot = FSImageSerialization.readString(in);
       snapshotName = FSImageSerialization.readString(in);
+      
+      // read RPC ids if necessary
+      readRpcIds(in, logVersion);
     }
 
     @Override
     public void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeString(snapshotRoot, out);
       FSImageSerialization.writeString(snapshotName, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
     protected void toXml(ContentHandler contentHandler) throws SAXException {
       XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
       XMLUtils.addSaxString(contentHandler, "SNAPSHOTNAME", snapshotName);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
     @Override
     void fromXml(Stanza st) throws InvalidXmlException {
       snapshotRoot = st.getValue("SNAPSHOTROOT");
       snapshotName = st.getValue("SNAPSHOTNAME");
+      
+      readRpcIdsFromXml(st);
     }
     
     @Override
@@ -2378,13 +2581,15 @@ public abstract class FSEditLogOp {
       builder.append(snapshotRoot);
       builder.append(", snapshotName=");
       builder.append(snapshotName);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append("]");
       return builder.toString();
     }
   }
   
   /**
-   * Operation corresponding to delete a snapshot
+   * Operation corresponding to deleting a snapshot.
+   * {@literal @AtMostOnce} for {@link ClientProtocol#deleteSnapshot}.
    */
   static class DeleteSnapshotOp extends FSEditLogOp {
     String snapshotRoot;
@@ -2412,24 +2617,31 @@ public abstract class FSEditLogOp {
     void readFields(DataInputStream in, int logVersion) throws IOException {
       snapshotRoot = FSImageSerialization.readString(in);
       snapshotName = FSImageSerialization.readString(in);
+      
+      // read RPC ids if necessary
+      readRpcIds(in, logVersion);
     }
 
     @Override
     public void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeString(snapshotRoot, out);
       FSImageSerialization.writeString(snapshotName, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
     protected void toXml(ContentHandler contentHandler) throws SAXException {
       XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
       XMLUtils.addSaxString(contentHandler, "SNAPSHOTNAME", snapshotName);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
     @Override
     void fromXml(Stanza st) throws InvalidXmlException {
       snapshotRoot = st.getValue("SNAPSHOTROOT");
       snapshotName = st.getValue("SNAPSHOTNAME");
+      
+      readRpcIdsFromXml(st);
     }
     
     @Override
@@ -2439,13 +2651,15 @@ public abstract class FSEditLogOp {
       builder.append(snapshotRoot);
       builder.append(", snapshotName=");
       builder.append(snapshotName);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append("]");
       return builder.toString();
     }
   }
   
   /**
-   * Operation corresponding to rename a snapshot
+   * Operation corresponding to renaming a snapshot.
+   * {@literal @AtMostOnce} for {@link ClientProtocol#renameSnapshot}.
    */
   static class RenameSnapshotOp extends FSEditLogOp {
     String snapshotRoot;
@@ -2480,6 +2694,9 @@ public abstract class FSEditLogOp {
       snapshotRoot = FSImageSerialization.readString(in);
       snapshotOldName = FSImageSerialization.readString(in);
       snapshotNewName = FSImageSerialization.readString(in);
+      
+      // read RPC ids if necessary
+      readRpcIds(in, logVersion);
     }
 
     @Override
@@ -2487,6 +2704,8 @@ public abstract class FSEditLogOp {
       FSImageSerialization.writeString(snapshotRoot, out);
       FSImageSerialization.writeString(snapshotOldName, out);
       FSImageSerialization.writeString(snapshotNewName, out);
+      
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
@@ -2494,6 +2713,7 @@ public abstract class FSEditLogOp {
       XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
       XMLUtils.addSaxString(contentHandler, "SNAPSHOTOLDNAME", snapshotOldName);
       XMLUtils.addSaxString(contentHandler, "SNAPSHOTNEWNAME", snapshotNewName);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
     @Override
@@ -2501,6 +2721,8 @@ public abstract class FSEditLogOp {
       snapshotRoot = st.getValue("SNAPSHOTROOT");
       snapshotOldName = st.getValue("SNAPSHOTOLDNAME");
       snapshotNewName = st.getValue("SNAPSHOTNEWNAME");
+      
+      readRpcIdsFromXml(st);
     }
     
     @Override
@@ -2512,6 +2734,7 @@ public abstract class FSEditLogOp {
       builder.append(snapshotOldName);
       builder.append(", snapshotNewName=");
       builder.append(snapshotNewName);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append("]");
       return builder.toString();
     }
@@ -2520,7 +2743,7 @@ public abstract class FSEditLogOp {
   /**
    * Operation corresponding to allow creating snapshot on a directory
    */
-  static class AllowSnapshotOp extends FSEditLogOp {
+  static class AllowSnapshotOp extends FSEditLogOp { // @Idempotent
     String snapshotRoot;
 
     public AllowSnapshotOp() {
@@ -2574,7 +2797,7 @@ public abstract class FSEditLogOp {
   /**
    * Operation corresponding to disallow creating snapshot on a directory
    */
-  static class DisallowSnapshotOp extends FSEditLogOp {
+  static class DisallowSnapshotOp extends FSEditLogOp { // @Idempotent
     String snapshotRoot;
 
     public DisallowSnapshotOp() {

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Mon Aug 12 21:25:49 2013
@@ -66,8 +66,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.util.IdGenerator;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Mon Aug 12 21:25:49 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.ShortWritable;
 import org.apache.hadoop.io.Text;
@@ -80,6 +81,7 @@ public class FSImageSerialization {
   static private final class TLData {
     final DeprecatedUTF8 U_STR = new DeprecatedUTF8();
     final ShortWritable U_SHORT = new ShortWritable();
+    final IntWritable U_INT = new IntWritable();
     final LongWritable U_LONG = new LongWritable();
     final FsPermission FILE_PERM = new FsPermission((short) 0);
   }
@@ -350,9 +352,9 @@ public class FSImageSerialization {
   
   /** read the long value */
   static long readLong(DataInput in) throws IOException {
-    LongWritable ustr = TL_DATA.get().U_LONG;
-    ustr.readFields(in);
-    return ustr.get();
+    LongWritable uLong = TL_DATA.get().U_LONG;
+    uLong.readFields(in);
+    return uLong.get();
   }
 
   /** write the long value */
@@ -361,6 +363,20 @@ public class FSImageSerialization {
     uLong.set(value);
     uLong.write(out);
   }
+  
+  /** read the int value */
+  static int readInt(DataInput in) throws IOException {
+    IntWritable uInt = TL_DATA.get().U_INT;
+    uInt.readFields(in);
+    return uInt.get();
+  }
+  
+  /** write the int value */
+  static void writeInt(int value, DataOutputStream out) throws IOException {
+    IntWritable uInt = TL_DATA.get().U_INT;
+    uInt.set(value);
+    uInt.write(out);
+  }
 
   /** read short value */
   static short readShort(DataInput in) throws IOException {
@@ -414,8 +430,13 @@ public class FSImageSerialization {
   private static void writeLocalName(INodeAttributes inode, DataOutput out)
       throws IOException {
     final byte[] name = inode.getLocalNameBytes();
-    out.writeShort(name.length);
-    out.write(name);
+    writeBytes(name, out);
+  }
+  
+  public static void writeBytes(byte[] data, DataOutput out)
+      throws IOException {
+    out.writeShort(data.length);
+    out.write(data);
   }
 
   /**



Mime
View raw message