hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1507176 - in /hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/namenod...
Date Fri, 26 Jul 2013 01:34:19 GMT
Author: suresh
Date: Fri Jul 26 01:34:18 2013
New Revision: 1507176

URL: http://svn.apache.org/r1507176
Log:
HDFS-4979. Merge 1507173 from branch-2

Added:
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
      - copied unchanged from r1507173, hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1507176&r1=1507175&r2=1507176&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jul 26 01:34:18 2013
@@ -75,6 +75,8 @@ Release 2.1.0-beta - 2013-07-02
 
     HDFS-4974. Add Idempotent and AtMostOnce annotations to namenode
     protocol methods. (suresh)
+
+    HDFS-4979. Implement retry cache on Namenode. (suresh)
     
   IMPROVEMENTS
 

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1507176&r1=1507175&r2=1507176&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jul 26 01:34:18 2013
@@ -482,4 +482,11 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT = 120000;
   public static final int     DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT = 120000;
   public static final int     DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT = 20000;
+  
+  public static final String DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY = "dfs.namenode.enable.retrycache";
+  public static final boolean DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT = true;
+  public static final String DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY = "dfs.namenode.retrycache.expirytime.millis";
+  public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
+  public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
+  public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
 }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1507176&r1=1507175&r2=1507176&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Jul 26 01:34:18 2013
@@ -86,7 +86,7 @@ import com.google.common.collect.Sets;
 public class BlockManager {
 
   static final Log LOG = LogFactory.getLog(BlockManager.class);
-  static final Log blockLog = NameNode.blockStateChangeLog;
+  public static final Log blockLog = NameNode.blockStateChangeLog;
 
   /** Default load factor of map */
   public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
@@ -2659,64 +2659,58 @@ assert storedBlock.findDatanode(dn) < 0 
    * The given node is reporting incremental information about some blocks.
    * This includes blocks that are starting to be received, completed being
    * received, or deleted.
+   * 
+   * This method must be called with FSNamesystem lock held.
    */
-  public void processIncrementalBlockReport(final DatanodeID nodeID, 
-     final String poolId, 
-     final ReceivedDeletedBlockInfo blockInfos[]
-  ) throws IOException {
-    namesystem.writeLock();
+  public void processIncrementalBlockReport(final DatanodeID nodeID,
+      final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
+      throws IOException {
+    assert namesystem.hasWriteLock();
     int received = 0;
     int deleted = 0;
     int receiving = 0;
-    try {
-      final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
-      if (node == null || !node.isAlive) {
-        blockLog
-            .warn("BLOCK* processIncrementalBlockReport"
-                + " is received from dead or unregistered node "
-                + nodeID);
-        throw new IOException(
-            "Got incremental block report from unregistered or dead node");
-      }
+    final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+    if (node == null || !node.isAlive) {
+      blockLog
+          .warn("BLOCK* processIncrementalBlockReport"
+              + " is received from dead or unregistered node "
+              + nodeID);
+      throw new IOException(
+          "Got incremental block report from unregistered or dead node");
+    }
 
-      for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
-        switch (rdbi.getStatus()) {
-        case DELETED_BLOCK:
-          removeStoredBlock(rdbi.getBlock(), node);
-          deleted++;
-          break;
-        case RECEIVED_BLOCK:
-          addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
-          received++;
-          break;
-        case RECEIVING_BLOCK:
-          receiving++;
-          processAndHandleReportedBlock(node, rdbi.getBlock(),
-              ReplicaState.RBW, null);
-          break;
-        default:
-          String msg = 
-            "Unknown block status code reported by " + nodeID +
-            ": " + rdbi;
-          blockLog.warn(msg);
-          assert false : msg; // if assertions are enabled, throw.
-          break;
-        }
-        if (blockLog.isDebugEnabled()) {
-          blockLog.debug("BLOCK* block "
-              + (rdbi.getStatus()) + ": " + rdbi.getBlock()
-              + " is received from " + nodeID);
-        }
+    for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
+      switch (rdbi.getStatus()) {
+      case DELETED_BLOCK:
+        removeStoredBlock(rdbi.getBlock(), node);
+        deleted++;
+        break;
+      case RECEIVED_BLOCK:
+        addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
+        received++;
+        break;
+      case RECEIVING_BLOCK:
+        receiving++;
+        processAndHandleReportedBlock(node, rdbi.getBlock(),
+            ReplicaState.RBW, null);
+        break;
+      default:
+        String msg = 
+          "Unknown block status code reported by " + nodeID +
+          ": " + rdbi;
+        blockLog.warn(msg);
+        assert false : msg; // if assertions are enabled, throw.
+        break;
+      }
+      if (blockLog.isDebugEnabled()) {
+        blockLog.debug("BLOCK* block "
+            + (rdbi.getStatus()) + ": " + rdbi.getBlock()
+            + " is received from " + nodeID);
       }
-    } finally {
-      namesystem.writeUnlock();
-      blockLog
-          .debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
-              + nodeID
-              +  " receiving: " + receiving + ", "
-              + " received: " + received + ", "
-              + " deleted: " + deleted);
     }
+    blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
+        + nodeID + " receiving: " + receiving + ", " + " received: " + received
+        + ", " + " deleted: " + deleted);
   }
 
   /**

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1507176&r1=1507175&r2=1507176&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Jul 26 01:34:18 2013
@@ -66,8 +66,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.util.IdGenerator;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1507176&r1=1507175&r2=1507176&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jul 26 01:34:18 2013
@@ -47,6 +47,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@@ -55,6 +57,10 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
@@ -194,8 +200,12 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RetryCache;
+import org.apache.hadoop.ipc.RetryCache.CacheEntry;
+import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.metrics2.annotation.Metric;
@@ -247,7 +257,8 @@ public class FSNamesystem implements Nam
       }
   };
 
-  private boolean isAuditEnabled() {
+  @VisibleForTesting
+  public boolean isAuditEnabled() {
     return !isDefaultAuditLogger || auditLog.isInfoEnabled();
   }
 
@@ -420,6 +431,8 @@ public class FSNamesystem implements Nam
     
   private INodeId inodeId;
   
+  private final RetryCache retryCache;
+  
   /**
    * Set the last allocated inode id when fsimage or editlog is loaded. 
    */
@@ -654,6 +667,7 @@ public class FSNamesystem implements Nam
       this.auditLoggers = initAuditLoggers(conf);
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
         auditLoggers.get(0) instanceof DefaultAuditLogger;
+      this.retryCache = initRetryCache(conf);
     } catch(IOException e) {
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       close();
@@ -664,6 +678,28 @@ public class FSNamesystem implements Nam
       throw re;
     }
   }
+  
+  @VisibleForTesting
+  static RetryCache initRetryCache(Configuration conf) {
+    boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
+        DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
+    LOG.info("Retry cache on namenode is " + (enable ? "enabled" : "disabled"));
+    if (enable) {
+      float heapPercent = conf.getFloat(
+          DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY,
+          DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT);
+      long entryExpiryMillis = conf.getLong(
+          DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY,
+          DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT);
+      LOG.info("Retry cache will use " + heapPercent
+          + " of total heap and retry cache entry expiry time is "
+          + entryExpiryMillis + " millis");
+      long entryExpiryNanos = entryExpiryMillis * 1000 * 1000;
+      return new RetryCache("Namenode Retry Cache", heapPercent,
+          entryExpiryNanos);
+    }
+    return null;
+  }
 
   private List<AuditLogger> initAuditLoggers(Configuration conf) {
     // Initialize the custom access loggers if configured.
@@ -724,7 +760,6 @@ public class FSNamesystem implements Nam
       if (!haEnabled) {
         fsImage.openEditLogForWrite();
       }
-      
       success = true;
     } finally {
       if (!success) {
@@ -801,6 +836,7 @@ public class FSNamesystem implements Nam
     } finally {
       writeUnlock();
     }
+    RetryCache.clear(retryCache);
   }
   
   /**
@@ -1469,15 +1505,26 @@ public class FSNamesystem implements Nam
    */
   void concat(String target, String [] srcs) 
       throws IOException, UnresolvedLinkException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+    
+    // Either there is no previous request in progres or it has failed
     if(FSNamesystem.LOG.isDebugEnabled()) {
       FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
           " to " + target);
     }
+    
+    boolean success = false;
     try {
       concatInt(target, srcs);
+      success = true;
     } catch (AccessControlException e) {
       logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, success);
     }
   }
 
@@ -1686,17 +1733,25 @@ public class FSNamesystem implements Nam
   void createSymlink(String target, String link,
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     if (!DFSUtil.isValidName(link)) {
       throw new InvalidPathException("Invalid link name: " + link);
     }
     if (FSDirectory.isReservedName(target)) {
       throw new InvalidPathException("Invalid target name: " + target);
     }
+    boolean success = false;
     try {
       createSymlinkInt(target, link, dirPerms, createParent);
+      success = true;
     } catch (AccessControlException e) {
       logAuditEvent(false, "createSymlink", link, target, null);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, success);
     }
   }
 
@@ -1835,13 +1890,17 @@ public class FSNamesystem implements Nam
       }
     }
   }
-
+  
   /**
    * Create a new file entry in the namespace.
    * 
-   * For description of parameters and exceptions thrown see 
-   * {@link ClientProtocol#create()}, except it returns valid  file status
-   * upon success
+   * For description of parameters and exceptions thrown see
+   * {@link ClientProtocol#create()}, except it returns valid file status upon
+   * success
+   * 
+   * For retryCache handling details see -
+   * {@link #getFileStatus(boolean, CacheEntryWithPayload)}
+   * 
    */
   HdfsFileStatus startFile(String src, PermissionStatus permissions,
       String holder, String clientMachine, EnumSet<CreateFlag> flag,
@@ -1849,13 +1908,23 @@ public class FSNamesystem implements Nam
       throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, UnresolvedLinkException,
       FileNotFoundException, ParentNotDirectoryException, IOException {
+    HdfsFileStatus status = null;
+    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+        null);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return (HdfsFileStatus) cacheEntry.getPayload();
+    }
+    
     try {
-      return startFileInt(src, permissions, holder, clientMachine, flag,
+      status = startFileInt(src, permissions, holder, clientMachine, flag,
           createParent, replication, blockSize);
     } catch (AccessControlException e) {
       logAuditEvent(false, "create", src);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, status != null, status);
     }
+    return status;
   }
 
   private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
@@ -1878,7 +1947,7 @@ public class FSNamesystem implements Nam
     blockManager.verifyReplication(src, replication, clientMachine);
 
     boolean skipSync = false;
-    final HdfsFileStatus stat;
+    HdfsFileStatus stat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
     if (blockSize < minBlockSize) {
@@ -1958,7 +2027,12 @@ public class FSNamesystem implements Nam
         }
       } else {
         if (overwrite) {
-          delete(src, true); // File exists - delete if overwrite
+          try {
+            deleteInt(src, true); // File exists - delete if overwrite
+          } catch (AccessControlException e) {
+            logAuditEvent(false, "delete", src);
+            throw e;
+          }
         } else {
           // If lease soft limit time is expired, recover the lease
           recoverLeaseInternal(myFile, src, holder, clientMachine, false);
@@ -2207,16 +2281,28 @@ public class FSNamesystem implements Nam
       throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
+    LocatedBlock lb = null;
+    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+        null);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return (LocatedBlock) cacheEntry.getPayload();
+    }
+      
+    boolean success = false;
     try {
-      return appendFileInt(src, holder, clientMachine);
+      lb = appendFileInt(src, holder, clientMachine);
+      success = true;
+      return lb;
     } catch (AccessControlException e) {
       logAuditEvent(false, "append", src);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, success, lb);
     }
   }
 
-  private LocatedBlock appendFileInt(String src, String holder, String clientMachine)
-      throws AccessControlException, SafeModeException,
+  private LocatedBlock appendFileInt(String src, String holder,
+      String clientMachine) throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2779,12 +2865,20 @@ public class FSNamesystem implements Nam
   @Deprecated
   boolean renameTo(String src, String dst) 
       throws IOException, UnresolvedLinkException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return true; // Return previous response
+    }
+    boolean ret = false;
     try {
-      return renameToInt(src, dst);
+      ret = renameToInt(src, dst);
     } catch (AccessControlException e) {
       logAuditEvent(false, "rename", src, dst, null);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, ret);
     }
+    return ret;
   }
 
   private boolean renameToInt(String src, String dst) 
@@ -2851,6 +2945,10 @@ public class FSNamesystem implements Nam
   /** Rename src to dst */
   void renameTo(String src, String dst, Options.Rename... options)
       throws IOException, UnresolvedLinkException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
           + src + " to " + dst);
@@ -2863,6 +2961,7 @@ public class FSNamesystem implements Nam
     byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
     byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
     HdfsFileStatus resultingStat = null;
+    boolean success = false;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2873,8 +2972,10 @@ public class FSNamesystem implements Nam
       dst = FSDirectory.resolvePath(dst, dstComponents, dir);
       renameToInternal(pc, src, dst, options);
       resultingStat = getAuditFileInfo(dst, false);
+      success = true;
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
     getEditLog().logSync();
     if (resultingStat != null) {
@@ -2906,12 +3007,20 @@ public class FSNamesystem implements Nam
   boolean delete(String src, boolean recursive)
       throws AccessControlException, SafeModeException,
       UnresolvedLinkException, IOException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return true; // Return previous response
+    }
+    boolean ret = false;
     try {
-      return deleteInt(src, recursive);
+      ret = deleteInt(src, recursive);
     } catch (AccessControlException e) {
       logAuditEvent(false, "delete", src);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, ret);
     }
+    return ret;
   }
       
   private boolean deleteInt(String src, boolean recursive)
@@ -2935,6 +3044,7 @@ public class FSNamesystem implements Nam
       throw new AccessControlException(ioe);
     }
   }
+  
   /**
    * Remove a file/directory from the namespace.
    * <p>
@@ -2955,6 +3065,7 @@ public class FSNamesystem implements Nam
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    boolean ret = false;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2973,6 +3084,7 @@ public class FSNamesystem implements Nam
       if (!dir.delete(src, collectedBlocks, removedINodes)) {
         return false;
       }
+      ret = true;
     } finally {
       writeUnlock();
     }
@@ -2990,7 +3102,7 @@ public class FSNamesystem implements Nam
       NameNode.stateChangeLog.debug("DIR* Namesystem.delete: "
         + src +" is removed");
     }
-    return true;
+    return ret;
   }
 
   /**
@@ -3156,12 +3268,14 @@ public class FSNamesystem implements Nam
    */
   boolean mkdirs(String src, PermissionStatus permissions,
       boolean createParent) throws IOException, UnresolvedLinkException {
+    boolean ret = false;
     try {
-      return mkdirsInt(src, permissions, createParent);
+      ret = mkdirsInt(src, permissions, createParent);
     } catch (AccessControlException e) {
       logAuditEvent(false, "mkdirs", src);
       throw e;
     }
+    return ret;
   }
 
   private boolean mkdirsInt(String src, PermissionStatus permissions,
@@ -3221,7 +3335,7 @@ public class FSNamesystem implements Nam
     }
 
     // validate that we have enough inodes. This is, at best, a 
-    // heuristic because the mkdirs() operation migth need to 
+    // heuristic because the mkdirs() operation might need to 
     // create multiple inodes.
     checkFsObjectLimit();
 
@@ -4021,8 +4135,13 @@ public class FSNamesystem implements Nam
    * @throws IOException if 
    */
   void saveNamespace() throws AccessControlException, IOException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     checkSuperuserPrivilege();
     checkOperation(OperationCategory.UNCHECKED);
+    boolean success = false;
     readLock();
     try {
       checkOperation(OperationCategory.UNCHECKED);
@@ -4031,10 +4150,12 @@ public class FSNamesystem implements Nam
                               "in order to create namespace image.");
       }
       getFSImage().saveNamespace(this);
-      LOG.info("New namespace image has been created");
+      success = true;
     } finally {
       readUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
+    LOG.info("New namespace image has been created");
   }
   
   /**
@@ -4592,6 +4713,7 @@ public class FSNamesystem implements Nam
         try {
           Thread.sleep(recheckInterval);
         } catch (InterruptedException ie) {
+          // Ignored
         }
       }
       if (!fsRunning) {
@@ -4850,30 +4972,51 @@ public class FSNamesystem implements Nam
     }
   }
 
-  NamenodeCommand startCheckpoint(
-                                NamenodeRegistration bnReg, // backup node
-                                NamenodeRegistration nnReg) // active name-node
-  throws IOException {
+  NamenodeCommand startCheckpoint(NamenodeRegistration backupNode,
+      NamenodeRegistration activeNamenode) throws IOException {
     checkOperation(OperationCategory.CHECKPOINT);
+    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+        null);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return (NamenodeCommand) cacheEntry.getPayload();
+    }
     writeLock();
+    NamenodeCommand cmd = null;
     try {
       checkOperation(OperationCategory.CHECKPOINT);
 
       if (isInSafeMode()) {
         throw new SafeModeException("Checkpoint not started", safeMode);
       }
-      LOG.info("Start checkpoint for " + bnReg.getAddress());
-      NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
+      LOG.info("Start checkpoint for " + backupNode.getAddress());
+      cmd = getFSImage().startCheckpoint(backupNode, activeNamenode);
       getEditLog().logSync();
       return cmd;
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, cmd != null, cmd);
     }
   }
 
+  public void processIncrementalBlockReport(final DatanodeID nodeID,
+      final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
+      throws IOException {
+    writeLock();
+    try {
+      blockManager.processIncrementalBlockReport(nodeID, poolId, blockInfos);
+    } finally {
+      writeUnlock();
+    }
+  }
+  
   void endCheckpoint(NamenodeRegistration registration,
                             CheckpointSignature sig) throws IOException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     checkOperation(OperationCategory.CHECKPOINT);
+    boolean success = false;
     readLock();
     try {
       checkOperation(OperationCategory.CHECKPOINT);
@@ -4883,8 +5026,10 @@ public class FSNamesystem implements Nam
       }
       LOG.info("End checkpoint for " + registration.getAddress());
       getFSImage().endCheckpoint(sig);
+      success = true;
     } finally {
       readUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
   }
 
@@ -5380,6 +5525,10 @@ public class FSNamesystem implements Nam
   void updatePipeline(String clientName, ExtendedBlock oldBlock, 
       ExtendedBlock newBlock, DatanodeID[] newNodes)
       throws IOException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     checkOperation(OperationCategory.WRITE);
     LOG.info("updatePipeline(block=" + oldBlock
              + ", newGenerationStamp=" + newBlock.getGenerationStamp()
@@ -5388,17 +5537,19 @@ public class FSNamesystem implements Nam
              + ", clientName=" + clientName
              + ")");
     writeLock();
+    boolean success = false;
     try {
       checkOperation(OperationCategory.WRITE);
-
       if (isInSafeMode()) {
         throw new SafeModeException("Pipeline not updated", safeMode);
       }
       assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
         + oldBlock + " has different block identifier";
       updatePipelineInternal(clientName, oldBlock, newBlock, newNodes);
+      success = true;
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
     getEditLog().logSync();
     LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
@@ -6178,9 +6329,14 @@ public class FSNamesystem implements Nam
    */
   String createSnapshot(String snapshotRoot, String snapshotName)
       throws SafeModeException, IOException {
+    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+        null);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return (String) cacheEntry.getPayload();
+    }
     final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
-    final String snapshotPath;
+    String snapshotPath = null;
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
@@ -6204,6 +6360,7 @@ public class FSNamesystem implements Nam
       getEditLog().logCreateSnapshot(snapshotRoot, snapshotName);
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, snapshotPath != null, snapshotPath);
     }
     getEditLog().logSync();
     
@@ -6223,8 +6380,13 @@ public class FSNamesystem implements Nam
    */
   void renameSnapshot(String path, String snapshotOldName,
       String snapshotNewName) throws SafeModeException, IOException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
+    boolean success = false;
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
@@ -6238,8 +6400,10 @@ public class FSNamesystem implements Nam
       
       snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
       getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName);
+      success = true;
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
     getEditLog().logSync();
     
@@ -6332,6 +6496,11 @@ public class FSNamesystem implements Nam
   void deleteSnapshot(String snapshotRoot, String snapshotName)
       throws SafeModeException, IOException {
     final FSPermissionChecker pc = getPermissionChecker();
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+    boolean success = false;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -6355,8 +6524,10 @@ public class FSNamesystem implements Nam
       this.removeBlocks(collectedBlocks);
       collectedBlocks.clear();
       getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName);
+      success = true;
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
     getEditLog().logSync();
     

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1507176&r1=1507175&r2=1507176&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
(original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
Fri Jul 26 01:34:18 2013
@@ -951,7 +951,7 @@ class NameNodeRpcServer implements Namen
           +"from "+nodeReg+" "+receivedAndDeletedBlocks.length
           +" blocks.");
     }
-    namesystem.getBlockManager().processIncrementalBlockReport(
+    namesystem.processIncrementalBlockReport(
         nodeReg, poolId, receivedAndDeletedBlocks[0].getBlocks());
   }
   

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1507176&r1=1507175&r2=1507176&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
(original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
Fri Jul 26 01:34:18 2013
@@ -1307,4 +1307,43 @@
   </description>
 </property>
 
+<property>
+	<name>dfs.namenode.enable.retrycache</name>
+	<value>true</value>
+	<description>
+	  This enables the retry cache on the namenode. The namenode tracks the
+	  corresponding response for each non-idempotent request. If a client retries
+	  the request, the response from the retry cache is sent. Such operations
+	  are tagged with annotation @AtMostOnce in namenode protocols. It is
+	  recommended that this flag be set to true. Setting it to false will result
+	  in clients getting failure responses to retried requests. This flag must
+	  be enabled in an HA setup for transparent fail-overs.
+	  
+	  The entries in the cache have expiration time configurable
+	  using dfs.namenode.retrycache.expirytime.millis.
+	</description>
+</property>
+
+<property>
+	<name>dfs.namenode.retrycache.expirytime.millis</name>
+	<value>600000</value>
+	<description>
+	  The time, in milliseconds, for which retry cache entries are retained.
+	</description>
+</property>
+
+<property>
+	<name>dfs.namenode.retrycache.heap.percent</name>
+	<value>0.03f</value>
+	<description>
+	  This parameter configures the heap size allocated for the retry cache
+	  (excluding the cached responses). This corresponds to approximately
+	  4096 entries for every 64MB of namenode process java heap size.
+	  Assuming retry cache entry expiration time (configured using
+	  dfs.namenode.retrycache.expirytime.millis) of 10 minutes, this
+	  enables retry cache to support 7 operations per second sustained
+	  for 10 minutes. As the heap size is increased, the operation rate
+	  linearly increases.
+	</description>
+</property>
 </configuration>



Mime
View raw message