hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1507170 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/resourc...
Date Fri, 26 Jul 2013 01:09:27 GMT
Author: suresh
Date: Fri Jul 26 01:09:27 2013
New Revision: 1507170

URL: http://svn.apache.org/r1507170
Log:
HDFS-4979. Implement retry cache on Namenode. Contributed by Suresh Srinivas.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1507170&r1=1507169&r2=1507170&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jul 26 01:09:27 2013
@@ -336,6 +336,8 @@ Release 2.1.0-beta - 2013-07-02
 
     HDFS-4974. Add Idempotent and AtMostOnce annotations to namenode
     protocol methods. (suresh)
+
+    HDFS-4979. Implement retry cache on Namenode. (suresh)
     
   IMPROVEMENTS
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1507170&r1=1507169&r2=1507170&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jul 26 01:09:27 2013
@@ -488,4 +488,11 @@ public class DFSConfigKeys extends Commo
   
   public static final String DFS_MAX_NUM_BLOCKS_TO_LOG_KEY = "dfs.namenode.max-num-blocks-to-log";
   public static final long   DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT = 1000l;
+  
+  public static final String DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY = "dfs.namenode.enable.retrycache";
+  public static final boolean DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT = true;
+  public static final String DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY = "dfs.namenode.retrycache.expirytime.millis";
+  public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
+  public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
+  public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1507170&r1=1507169&r2=1507170&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Jul 26 01:09:27 2013
@@ -87,7 +87,7 @@ import com.google.common.collect.Sets;
 public class BlockManager {
 
   static final Log LOG = LogFactory.getLog(BlockManager.class);
-  static final Log blockLog = NameNode.blockStateChangeLog;
+  public static final Log blockLog = NameNode.blockStateChangeLog;
 
   /** Default load factor of map */
   public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
@@ -2686,64 +2686,58 @@ assert storedBlock.findDatanode(dn) < 0 
    * The given node is reporting incremental information about some blocks.
    * This includes blocks that are starting to be received, completed being
    * received, or deleted.
+   * 
+   * This method must be called with FSNamesystem lock held.
    */
-  public void processIncrementalBlockReport(final DatanodeID nodeID, 
-     final String poolId, 
-     final ReceivedDeletedBlockInfo blockInfos[]
-  ) throws IOException {
-    namesystem.writeLock();
+  public void processIncrementalBlockReport(final DatanodeID nodeID,
+      final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
+      throws IOException {
+    assert namesystem.hasWriteLock();
     int received = 0;
     int deleted = 0;
     int receiving = 0;
-    try {
-      final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
-      if (node == null || !node.isAlive) {
-        blockLog
-            .warn("BLOCK* processIncrementalBlockReport"
-                + " is received from dead or unregistered node "
-                + nodeID);
-        throw new IOException(
-            "Got incremental block report from unregistered or dead node");
-      }
+    final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+    if (node == null || !node.isAlive) {
+      blockLog
+          .warn("BLOCK* processIncrementalBlockReport"
+              + " is received from dead or unregistered node "
+              + nodeID);
+      throw new IOException(
+          "Got incremental block report from unregistered or dead node");
+    }
 
-      for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
-        switch (rdbi.getStatus()) {
-        case DELETED_BLOCK:
-          removeStoredBlock(rdbi.getBlock(), node);
-          deleted++;
-          break;
-        case RECEIVED_BLOCK:
-          addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
-          received++;
-          break;
-        case RECEIVING_BLOCK:
-          receiving++;
-          processAndHandleReportedBlock(node, rdbi.getBlock(),
-              ReplicaState.RBW, null);
-          break;
-        default:
-          String msg = 
-            "Unknown block status code reported by " + nodeID +
-            ": " + rdbi;
-          blockLog.warn(msg);
-          assert false : msg; // if assertions are enabled, throw.
-          break;
-        }
-        if (blockLog.isDebugEnabled()) {
-          blockLog.debug("BLOCK* block "
-              + (rdbi.getStatus()) + ": " + rdbi.getBlock()
-              + " is received from " + nodeID);
-        }
+    for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
+      switch (rdbi.getStatus()) {
+      case DELETED_BLOCK:
+        removeStoredBlock(rdbi.getBlock(), node);
+        deleted++;
+        break;
+      case RECEIVED_BLOCK:
+        addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
+        received++;
+        break;
+      case RECEIVING_BLOCK:
+        receiving++;
+        processAndHandleReportedBlock(node, rdbi.getBlock(),
+            ReplicaState.RBW, null);
+        break;
+      default:
+        String msg = 
+          "Unknown block status code reported by " + nodeID +
+          ": " + rdbi;
+        blockLog.warn(msg);
+        assert false : msg; // if assertions are enabled, throw.
+        break;
+      }
+      if (blockLog.isDebugEnabled()) {
+        blockLog.debug("BLOCK* block "
+            + (rdbi.getStatus()) + ": " + rdbi.getBlock()
+            + " is received from " + nodeID);
       }
-    } finally {
-      namesystem.writeUnlock();
-      blockLog
-          .debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
-              + nodeID
-              +  " receiving: " + receiving + ", "
-              + " received: " + received + ", "
-              + " deleted: " + deleted);
     }
+    blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
+        + nodeID + " receiving: " + receiving + ", " + " received: " + received
+        + ", " + " deleted: " + deleted);
   }
 
   /**

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1507170&r1=1507169&r2=1507170&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Jul 26 01:09:27 2013
@@ -63,8 +63,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.util.IdGenerator;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1507170&r1=1507169&r2=1507170&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jul 26 01:09:27 2013
@@ -47,6 +47,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@@ -55,6 +57,10 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
@@ -195,8 +201,12 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RetryCache;
+import org.apache.hadoop.ipc.RetryCache.CacheEntry;
+import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.metrics2.annotation.Metric;
@@ -264,7 +274,8 @@ public class FSNamesystem implements Nam
       }
   };
 
-  private boolean isAuditEnabled() {
+  @VisibleForTesting
+  public boolean isAuditEnabled() {
     return !isDefaultAuditLogger || auditLog.isInfoEnabled();
   }
 
@@ -437,6 +448,8 @@ public class FSNamesystem implements Nam
     
   private INodeId inodeId;
   
+  private final RetryCache retryCache;
+  
   /**
    * Set the last allocated inode id when fsimage or editlog is loaded. 
    */
@@ -671,6 +684,7 @@ public class FSNamesystem implements Nam
       this.auditLoggers = initAuditLoggers(conf);
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
         auditLoggers.get(0) instanceof DefaultAuditLogger;
+      this.retryCache = initRetryCache(conf);
     } catch(IOException e) {
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       close();
@@ -681,6 +695,28 @@ public class FSNamesystem implements Nam
       throw re;
     }
   }
+  
+  @VisibleForTesting
+  static RetryCache initRetryCache(Configuration conf) {
+    boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
+        DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
+    LOG.info("Retry cache on namenode is " + (enable ? "enabled" : "disabled"));
+    if (enable) {
+      float heapPercent = conf.getFloat(
+          DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY,
+          DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT);
+      long entryExpiryMillis = conf.getLong(
+          DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY,
+          DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT);
+      LOG.info("Retry cache will use " + heapPercent
+          + " of total heap and retry cache entry expiry time is "
+          + entryExpiryMillis + " millis");
+      long entryExpiryNanos = entryExpiryMillis * 1000 * 1000;
+      return new RetryCache("Namenode Retry Cache", heapPercent,
+          entryExpiryNanos);
+    }
+    return null;
+  }
 
   private List<AuditLogger> initAuditLoggers(Configuration conf) {
     // Initialize the custom access loggers if configured.
@@ -741,7 +777,6 @@ public class FSNamesystem implements Nam
       if (!haEnabled) {
         fsImage.openEditLogForWrite();
       }
-      
       success = true;
     } finally {
       if (!success) {
@@ -818,6 +853,7 @@ public class FSNamesystem implements Nam
     } finally {
       writeUnlock();
     }
+    RetryCache.clear(retryCache);
   }
   
   /**
@@ -1487,15 +1523,26 @@ public class FSNamesystem implements Nam
    */
   void concat(String target, String [] srcs) 
       throws IOException, UnresolvedLinkException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+    
+    // Either there is no previous request in progres or it has failed
     if(FSNamesystem.LOG.isDebugEnabled()) {
       FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
           " to " + target);
     }
+    
+    boolean success = false;
     try {
       concatInt(target, srcs);
+      success = true;
     } catch (AccessControlException e) {
       logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, success);
     }
   }
 
@@ -1704,17 +1751,25 @@ public class FSNamesystem implements Nam
   void createSymlink(String target, String link,
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     if (!DFSUtil.isValidName(link)) {
       throw new InvalidPathException("Invalid link name: " + link);
     }
     if (FSDirectory.isReservedName(target)) {
       throw new InvalidPathException("Invalid target name: " + target);
     }
+    boolean success = false;
     try {
       createSymlinkInt(target, link, dirPerms, createParent);
+      success = true;
     } catch (AccessControlException e) {
       logAuditEvent(false, "createSymlink", link, target, null);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, success);
     }
   }
 
@@ -1853,13 +1908,17 @@ public class FSNamesystem implements Nam
       }
     }
   }
-
+  
   /**
    * Create a new file entry in the namespace.
    * 
-   * For description of parameters and exceptions thrown see 
-   * {@link ClientProtocol#create()}, except it returns valid  file status
-   * upon success
+   * For description of parameters and exceptions thrown see
+   * {@link ClientProtocol#create()}, except it returns valid file status upon
+   * success
+   * 
+   * For retryCache handling details see -
+   * {@link #getFileStatus(boolean, CacheEntryWithPayload)}
+   * 
    */
   HdfsFileStatus startFile(String src, PermissionStatus permissions,
       String holder, String clientMachine, EnumSet<CreateFlag> flag,
@@ -1867,13 +1926,23 @@ public class FSNamesystem implements Nam
       throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, UnresolvedLinkException,
       FileNotFoundException, ParentNotDirectoryException, IOException {
+    HdfsFileStatus status = null;
+    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+        null);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return (HdfsFileStatus) cacheEntry.getPayload();
+    }
+    
     try {
-      return startFileInt(src, permissions, holder, clientMachine, flag,
+      status = startFileInt(src, permissions, holder, clientMachine, flag,
           createParent, replication, blockSize);
     } catch (AccessControlException e) {
       logAuditEvent(false, "create", src);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, status != null, status);
     }
+    return status;
   }
 
   private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
@@ -1896,7 +1965,7 @@ public class FSNamesystem implements Nam
     blockManager.verifyReplication(src, replication, clientMachine);
 
     boolean skipSync = false;
-    final HdfsFileStatus stat;
+    HdfsFileStatus stat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
     if (blockSize < minBlockSize) {
@@ -1977,7 +2046,12 @@ public class FSNamesystem implements Nam
         }
       } else {
         if (overwrite) {
-          delete(src, true); // File exists - delete if overwrite
+          try {
+            deleteInt(src, true); // File exists - delete if overwrite
+          } catch (AccessControlException e) {
+            logAuditEvent(false, "delete", src);
+            throw e;
+          }
         } else {
           // If lease soft limit time is expired, recover the lease
           recoverLeaseInternal(myFile, src, holder, clientMachine, false);
@@ -2226,16 +2300,28 @@ public class FSNamesystem implements Nam
       throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
+    LocatedBlock lb = null;
+    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+        null);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return (LocatedBlock) cacheEntry.getPayload();
+    }
+      
+    boolean success = false;
     try {
-      return appendFileInt(src, holder, clientMachine);
+      lb = appendFileInt(src, holder, clientMachine);
+      success = true;
+      return lb;
     } catch (AccessControlException e) {
       logAuditEvent(false, "append", src);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, success, lb);
     }
   }
 
-  private LocatedBlock appendFileInt(String src, String holder, String clientMachine)
-      throws AccessControlException, SafeModeException,
+  private LocatedBlock appendFileInt(String src, String holder,
+      String clientMachine) throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2798,12 +2884,20 @@ public class FSNamesystem implements Nam
   @Deprecated
   boolean renameTo(String src, String dst) 
       throws IOException, UnresolvedLinkException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return true; // Return previous response
+    }
+    boolean ret = false;
     try {
-      return renameToInt(src, dst);
+      ret = renameToInt(src, dst);
     } catch (AccessControlException e) {
       logAuditEvent(false, "rename", src, dst, null);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, ret);
     }
+    return ret;
   }
 
   private boolean renameToInt(String src, String dst) 
@@ -2870,6 +2964,10 @@ public class FSNamesystem implements Nam
   /** Rename src to dst */
   void renameTo(String src, String dst, Options.Rename... options)
       throws IOException, UnresolvedLinkException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
           + src + " to " + dst);
@@ -2882,6 +2980,7 @@ public class FSNamesystem implements Nam
     byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
     byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
     HdfsFileStatus resultingStat = null;
+    boolean success = false;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2892,8 +2991,10 @@ public class FSNamesystem implements Nam
       dst = FSDirectory.resolvePath(dst, dstComponents, dir);
       renameToInternal(pc, src, dst, options);
       resultingStat = getAuditFileInfo(dst, false);
+      success = true;
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
     getEditLog().logSync();
     if (resultingStat != null) {
@@ -2925,12 +3026,20 @@ public class FSNamesystem implements Nam
   boolean delete(String src, boolean recursive)
       throws AccessControlException, SafeModeException,
       UnresolvedLinkException, IOException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return true; // Return previous response
+    }
+    boolean ret = false;
     try {
-      return deleteInt(src, recursive);
+      ret = deleteInt(src, recursive);
     } catch (AccessControlException e) {
       logAuditEvent(false, "delete", src);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, ret);
     }
+    return ret;
   }
       
   private boolean deleteInt(String src, boolean recursive)
@@ -2954,6 +3063,7 @@ public class FSNamesystem implements Nam
       throw new AccessControlException(ioe);
     }
   }
+  
   /**
    * Remove a file/directory from the namespace.
    * <p>
@@ -2974,6 +3084,7 @@ public class FSNamesystem implements Nam
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    boolean ret = false;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2992,6 +3103,7 @@ public class FSNamesystem implements Nam
       if (!dir.delete(src, collectedBlocks, removedINodes)) {
         return false;
       }
+      ret = true;
     } finally {
       writeUnlock();
     }
@@ -3009,7 +3121,7 @@ public class FSNamesystem implements Nam
       NameNode.stateChangeLog.debug("DIR* Namesystem.delete: "
         + src +" is removed");
     }
-    return true;
+    return ret;
   }
 
   /**
@@ -3175,12 +3287,14 @@ public class FSNamesystem implements Nam
    */
   boolean mkdirs(String src, PermissionStatus permissions,
       boolean createParent) throws IOException, UnresolvedLinkException {
+    boolean ret = false;
     try {
-      return mkdirsInt(src, permissions, createParent);
+      ret = mkdirsInt(src, permissions, createParent);
     } catch (AccessControlException e) {
       logAuditEvent(false, "mkdirs", src);
       throw e;
     }
+    return ret;
   }
 
   private boolean mkdirsInt(String src, PermissionStatus permissions,
@@ -3240,7 +3354,7 @@ public class FSNamesystem implements Nam
     }
 
     // validate that we have enough inodes. This is, at best, a 
-    // heuristic because the mkdirs() operation migth need to 
+    // heuristic because the mkdirs() operation might need to 
     // create multiple inodes.
     checkFsObjectLimit();
 
@@ -4040,8 +4154,13 @@ public class FSNamesystem implements Nam
    * @throws IOException if 
    */
   void saveNamespace() throws AccessControlException, IOException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     checkSuperuserPrivilege();
     checkOperation(OperationCategory.UNCHECKED);
+    boolean success = false;
     readLock();
     try {
       checkOperation(OperationCategory.UNCHECKED);
@@ -4050,10 +4169,12 @@ public class FSNamesystem implements Nam
                               "in order to create namespace image.");
       }
       getFSImage().saveNamespace(this);
-      LOG.info("New namespace image has been created");
+      success = true;
     } finally {
       readUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
+    LOG.info("New namespace image has been created");
   }
   
   /**
@@ -4611,6 +4732,7 @@ public class FSNamesystem implements Nam
         try {
           Thread.sleep(recheckInterval);
         } catch (InterruptedException ie) {
+          // Ignored
         }
       }
       if (!fsRunning) {
@@ -4869,30 +4991,51 @@ public class FSNamesystem implements Nam
     }
   }
 
-  NamenodeCommand startCheckpoint(
-                                NamenodeRegistration bnReg, // backup node
-                                NamenodeRegistration nnReg) // active name-node
-  throws IOException {
+  NamenodeCommand startCheckpoint(NamenodeRegistration backupNode,
+      NamenodeRegistration activeNamenode) throws IOException {
     checkOperation(OperationCategory.CHECKPOINT);
+    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+        null);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return (NamenodeCommand) cacheEntry.getPayload();
+    }
     writeLock();
+    NamenodeCommand cmd = null;
     try {
       checkOperation(OperationCategory.CHECKPOINT);
 
       if (isInSafeMode()) {
         throw new SafeModeException("Checkpoint not started", safeMode);
       }
-      LOG.info("Start checkpoint for " + bnReg.getAddress());
-      NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
+      LOG.info("Start checkpoint for " + backupNode.getAddress());
+      cmd = getFSImage().startCheckpoint(backupNode, activeNamenode);
       getEditLog().logSync();
       return cmd;
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, cmd != null, cmd);
     }
   }
 
+  public void processIncrementalBlockReport(final DatanodeID nodeID,
+      final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
+      throws IOException {
+    writeLock();
+    try {
+      blockManager.processIncrementalBlockReport(nodeID, poolId, blockInfos);
+    } finally {
+      writeUnlock();
+    }
+  }
+  
   void endCheckpoint(NamenodeRegistration registration,
                             CheckpointSignature sig) throws IOException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     checkOperation(OperationCategory.CHECKPOINT);
+    boolean success = false;
     readLock();
     try {
       checkOperation(OperationCategory.CHECKPOINT);
@@ -4902,8 +5045,10 @@ public class FSNamesystem implements Nam
       }
       LOG.info("End checkpoint for " + registration.getAddress());
       getFSImage().endCheckpoint(sig);
+      success = true;
     } finally {
       readUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
   }
 
@@ -5421,6 +5566,10 @@ public class FSNamesystem implements Nam
   void updatePipeline(String clientName, ExtendedBlock oldBlock, 
       ExtendedBlock newBlock, DatanodeID[] newNodes)
       throws IOException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     checkOperation(OperationCategory.WRITE);
     LOG.info("updatePipeline(block=" + oldBlock
              + ", newGenerationStamp=" + newBlock.getGenerationStamp()
@@ -5429,17 +5578,19 @@ public class FSNamesystem implements Nam
              + ", clientName=" + clientName
              + ")");
     writeLock();
+    boolean success = false;
     try {
       checkOperation(OperationCategory.WRITE);
-
       if (isInSafeMode()) {
         throw new SafeModeException("Pipeline not updated", safeMode);
       }
       assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
         + oldBlock + " has different block identifier";
       updatePipelineInternal(clientName, oldBlock, newBlock, newNodes);
+      success = true;
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
     getEditLog().logSync();
     LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
@@ -6304,9 +6455,14 @@ public class FSNamesystem implements Nam
    */
   String createSnapshot(String snapshotRoot, String snapshotName)
       throws SafeModeException, IOException {
+    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+        null);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return (String) cacheEntry.getPayload();
+    }
     final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
-    final String snapshotPath;
+    String snapshotPath = null;
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
@@ -6330,6 +6486,7 @@ public class FSNamesystem implements Nam
       getEditLog().logCreateSnapshot(snapshotRoot, snapshotName);
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, snapshotPath != null, snapshotPath);
     }
     getEditLog().logSync();
     
@@ -6349,8 +6506,13 @@ public class FSNamesystem implements Nam
    */
   void renameSnapshot(String path, String snapshotOldName,
       String snapshotNewName) throws SafeModeException, IOException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
+    boolean success = false;
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
@@ -6364,8 +6526,10 @@ public class FSNamesystem implements Nam
       
       snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
       getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName);
+      success = true;
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
     getEditLog().logSync();
     
@@ -6458,6 +6622,11 @@ public class FSNamesystem implements Nam
   void deleteSnapshot(String snapshotRoot, String snapshotName)
       throws SafeModeException, IOException {
     final FSPermissionChecker pc = getPermissionChecker();
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+    boolean success = false;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -6481,8 +6650,10 @@ public class FSNamesystem implements Nam
       this.removeBlocks(collectedBlocks);
       collectedBlocks.clear();
       getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName);
+      success = true;
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
     getEditLog().logSync();
     

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1507170&r1=1507169&r2=1507170&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Fri Jul 26 01:09:27 2013
@@ -960,7 +960,7 @@ class NameNodeRpcServer implements Namen
           +"from "+nodeReg+" "+receivedAndDeletedBlocks.length
           +" blocks.");
     }
-    namesystem.getBlockManager().processIncrementalBlockReport(
+    namesystem.processIncrementalBlockReport(
         nodeReg, poolId, receivedAndDeletedBlocks[0].getBlocks());
   }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1507170&r1=1507169&r2=1507170&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Jul 26 01:09:27 2013
@@ -1352,4 +1352,43 @@
   </description>
 </property>
 
+<property>
+	<name>dfs.namenode.enable.retrycache</name>
+	<value>true</value>
+	<description>
+	  This enables the retry cache on the namenode. Namenode tracks for
+	  non-idempotent requests the corresponding response. If a client retries the
+	  request, the response from the retry cache is sent. Such operations
+	  are tagged with annotation @AtMostOnce in namenode protocols. It is
+	  recommended that this flag be set to true. Setting it to false, will result
+	  in clients getting failure responses to retried request. This flag must 
+	  be enabled in HA setup for transparent fail-overs.
+	  
+	  The entries in the cache have expiration time configurable
+	  using dfs.namenode.retrycache.expirytime.millis.
+	</description>
+</property>
+
+<property>
+	<name>dfs.namenode.retrycache.expirytime.millis</name>
+	<value>600000</value>
+	<description>
+	  The time for which retry cache entries are retained.
+	</description>
+</property>
+
+<property>
+	<name>dfs.namenode.retrycache.heap.percent</name>
+	<value>0.03f</value>
+	<description>
+	  This parameter configures the heap size allocated for retry cache
+	  (excluding the response cached). This corresponds to approximately
+	  4096 entries for every 64MB of namenode process java heap size.
+	  Assuming retry cache entry expiration time (configured using
+	  dfs.namenode.retrycache.expirytime.millis) of 10 minutes, this
+	  enables retry cache to support 7 operations per second sustained
+	  for 10 minutes. As the heap size is increased, the operation rate
+	  linearly increases.
+	</description>
+</property>
 </configuration>

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java?rev=1507170&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java Fri Jul 26 01:09:27 2013
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.ipc.RPC.RpcKind;
+import org.apache.hadoop.ipc.RetryCache;
+import org.apache.hadoop.ipc.RpcConstants;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for ensuring the namenode retry cache works correctly for
+ * non-idempotent requests.
+ * 
+ * Retry cache works based on tracking previously received request based on the
+ * ClientId and CallId received in RPC requests and storing the response. The
+ * response is replayed on retry when the same request is received again.
+ * 
+ * The test works by manipulating the Rpc {@link Server} current RPC call. For
+ * testing retried requests, an Rpc callId is generated only once using
+ * {@link #newCall()} and reused for many method calls. For testing non-retried
+ * request, a new callId is generated using {@link #newCall()}.
+ */
+public class TestNamenodeRetryCache {
+  private static final byte[] CLIENT_ID = StringUtils.getUuidBytes();
+  private static MiniDFSCluster cluster;
+  private static FSNamesystem namesystem;
+  private static PermissionStatus perm = new PermissionStatus(
+      "TestNamenodeRetryCache", null, FsPermission.getDefault());
+  private static FileSystem filesystem;
+  private static int callId = 100;
+  
+  /** Start a cluster */
+  @BeforeClass
+  public static void setup() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "512");
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, true);
+    cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+    namesystem = cluster.getNamesystem();
+    filesystem = cluster.getFileSystem();
+  }
+  
+  /** Cleanup after the test 
+   * @throws IOException 
+   * @throws UnresolvedLinkException 
+   * @throws SafeModeException 
+   * @throws AccessControlException */
+  @After
+  public void cleanup() throws IOException {
+    namesystem.delete("/", true);
+  }
+  
+  public static void incrementCallId() {
+    callId++;
+  }
+  
+  /** Set the current Server RPC call */
+  public static void newCall() {
+    Server.Call call = new Server.Call(++callId, 1, null, null,
+        RpcKind.RPC_PROTOCOL_BUFFER, CLIENT_ID);
+    Server.getCurCall().set(call);
+  }
+  
+  public static void resetCall() {
+    Server.Call call = new Server.Call(RpcConstants.INVALID_CALL_ID, 1, null,
+        null, RpcKind.RPC_PROTOCOL_BUFFER, RpcConstants.DUMMY_CLIENT_ID);
+    Server.getCurCall().set(call);
+  }
+  
+  private void concatSetup(String file1, String file2) throws Exception {
+    DFSTestUtil.createFile(filesystem, new Path(file1), 512, (short)1, 0L);
+    DFSTestUtil.createFile(filesystem, new Path(file2), 512, (short)1, 0L);
+  }
+  
+  /**
+   * Tests for concat call
+   */
+  @Test
+  public void testConcat() throws Exception {
+    resetCall();
+    String file1 = "/testNamenodeRetryCache/testConcat/file1";
+    String file2 = "/testNamenodeRetryCache/testConcat/file2";
+    
+    // Two retried concat calls succeed
+    concatSetup(file1, file2);
+    newCall();
+    namesystem.concat(file1, new String[]{file2});
+    namesystem.concat(file1, new String[]{file2});
+    namesystem.concat(file1, new String[]{file2});
+    
+    // A non-retried concat request fails
+    newCall();
+    try {
+      // Second non-retry call should fail with an exception
+      namesystem.concat(file1, new String[]{file2});
+      Assert.fail("testConcat - expected exception is not thrown");
+    } catch (IOException e) {
+      // Expected
+    }
+  }
+  
+  /**
+   * Tests for delete call
+   */
+  @Test
+  public void testDelete() throws Exception {
+    String dir = "/testNamenodeRetryCache/testDelete";
+    // Two retried calls to create a non existent file
+    newCall();
+    namesystem.mkdirs(dir, perm, true);
+    newCall();
+    Assert.assertTrue(namesystem.delete(dir, false));
+    Assert.assertTrue(namesystem.delete(dir, false));
+    Assert.assertTrue(namesystem.delete(dir, false));
+    
+    // non-retried call fails and gets false as return
+    newCall();
+    Assert.assertFalse(namesystem.delete(dir, false));
+  }
+  
+  /**
+   * Test for createSymlink
+   */
+  @Test
+  public void testCreateSymlink() throws Exception {
+    String target = "/testNamenodeRetryCache/testCreateSymlink/target";
+    
+    // Two retried symlink calls succeed
+    newCall();
+    namesystem.createSymlink(target, "/a/b", perm, true);
+    namesystem.createSymlink(target, "/a/b", perm, true);
+    namesystem.createSymlink(target, "/a/b", perm, true);
+    
+    // non-retried call fails
+    newCall();
+    try {
+      // Second non-retry call should fail with an exception
+      namesystem.createSymlink(target, "/a/b", perm, true);
+      Assert.fail("testCreateSymlink - expected exception is not thrown");
+    } catch (IOException e) {
+      // Expected
+    }
+  }
+  
+  /**
+   * Test for create file
+   */
+  @Test
+  public void testCreate() throws Exception {
+    String src = "/testNamenodeRetryCache/testCreate/file";
+    // Two retried calls succeed
+    newCall();
+    HdfsFileStatus status = namesystem.startFile(src, perm, "holder",
+        "clientmachine", EnumSet.of(CreateFlag.CREATE), true, (short) 1, 512);
+    Assert.assertEquals(status, namesystem.startFile(src, perm, 
+        "holder", "clientmachine", EnumSet.of(CreateFlag.CREATE), 
+        true, (short) 1, 512));
+    Assert.assertEquals(status, namesystem.startFile(src, perm, 
+        "holder", "clientmachine", EnumSet.of(CreateFlag.CREATE), 
+        true, (short) 1, 512));
+    
+    // A non-retried call fails
+    newCall();
+    try {
+      namesystem.startFile(src, perm, "holder", "clientmachine",
+          EnumSet.of(CreateFlag.CREATE), true, (short) 1, 512);
+      Assert.fail("testCreate - expected exception is not thrown");
+    } catch (IOException e) {
+      // expected
+    }
+  }
+  
+  /**
+   * Test for rename1
+   */
+  @Test
+  public void testAppend() throws Exception {
+    String src = "/testNamenodeRetryCache/testAppend/src";
+    resetCall();
+    // Create a file with partial block
+    DFSTestUtil.createFile(filesystem, new Path(src), 128, (short)1, 0L);
+    
+    // Retried append requests succeed
+    newCall();
+    LocatedBlock b = namesystem.appendFile(src, "holder", "clientMachine");
+    Assert.assertEquals(b, namesystem.appendFile(src, "holder", "clientMachine"));
+    Assert.assertEquals(b, namesystem.appendFile(src, "holder", "clientMachine"));
+    
+    // non-retried call fails
+    newCall();
+    try {
+      namesystem.appendFile(src, "holder", "clientMachine");
+      Assert.fail("testAppend - expected exception is not thrown");
+    } catch (Exception e) {
+      // Expected
+    }
+  }
+  
+  /**
+   * Test for rename1
+   */
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testRename1() throws Exception {
+    String src = "/testNamenodeRetryCache/testRename1/src";
+    String target = "/testNamenodeRetryCache/testRename1/target";
+    resetCall();
+    namesystem.mkdirs(src, perm, true);
+    
+    // Retried renames succeed
+    newCall();
+    Assert.assertTrue(namesystem.renameTo(src, target));
+    Assert.assertTrue(namesystem.renameTo(src, target));
+    Assert.assertTrue(namesystem.renameTo(src, target));
+    
+    // A non-retried request fails
+    newCall();
+    Assert.assertFalse(namesystem.renameTo(src, target));
+  }
+  
+  /**
+   * Test for rename2
+   */
+  @Test
+  public void testRename2() throws Exception {
+    String src = "/testNamenodeRetryCache/testRename2/src";
+    String target = "/testNamenodeRetryCache/testRename2/target";
+    resetCall();
+    namesystem.mkdirs(src, perm, true);
+    
+    // Retried renames succeed
+    newCall();
+    namesystem.renameTo(src, target, Rename.NONE);
+    namesystem.renameTo(src, target, Rename.NONE);
+    namesystem.renameTo(src, target, Rename.NONE);
+    
+    // A non-retried request fails
+    newCall();
+    try {
+      namesystem.renameTo(src, target, Rename.NONE);
+      Assert.fail("testRename 2 expected exception is not thrown");
+    } catch (IOException e) {
+      // expected
+    }
+  }
+  
+  /**
+   * Test for crateSnapshot
+   */
+  @Test
+  public void testSnapshotMethods() throws Exception {
+    String dir = "/testNamenodeRetryCache/testCreateSnapshot/src";
+    resetCall();
+    namesystem.mkdirs(dir, perm, true);
+    namesystem.allowSnapshot(dir);
+    
+    // Test retry of create snapshot
+    newCall();
+    String name = namesystem.createSnapshot(dir, "snap1");
+    Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1"));
+    Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1"));
+    Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1"));
+    
+    // Non retried calls should fail
+    newCall();
+    try {
+      namesystem.createSnapshot(dir, "snap1");
+      Assert.fail("testSnapshotMethods expected exception is not thrown");
+    } catch (IOException e) {
+      // exptected
+    }
+    
+    // Test retry of rename snapshot
+    newCall();
+    namesystem.renameSnapshot(dir, "snap1", "snap2");
+    namesystem.renameSnapshot(dir, "snap1", "snap2");
+    namesystem.renameSnapshot(dir, "snap1", "snap2");
+    
+    // Non retried calls should fail
+    newCall();
+    try {
+      namesystem.renameSnapshot(dir, "snap1", "snap2");
+      Assert.fail("testSnapshotMethods expected exception is not thrown");
+    } catch (IOException e) {
+      // expected
+    }
+    
+    // Test retry of delete snapshot
+    newCall();
+    namesystem.deleteSnapshot(dir, "snap2");
+    namesystem.deleteSnapshot(dir, "snap2");
+    namesystem.deleteSnapshot(dir, "snap2");
+    
+    // Non retried calls should fail
+    newCall();
+    try {
+      namesystem.deleteSnapshot(dir, "snap2");
+      Assert.fail("testSnapshotMethods expected exception is not thrown");
+    } catch (IOException e) {
+      // expected
+    }
+  }
+  
+  @Test
+  public void testRetryCacheConfig() {
+    // By default retry configuration should be enabled
+    Configuration conf = new HdfsConfiguration();
+    Assert.assertNotNull(FSNamesystem.initRetryCache(conf)); 
+    
+    // If retry cache is disabled, it should not be created
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, false);
+    Assert.assertNull(FSNamesystem.initRetryCache(conf));
+  }
+}



Mime
View raw message