Subject: svn commit: r1507173 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/...
Date: Fri, 26 Jul 2013 01:19:25 -0000
From: suresh@apache.org
To: hdfs-commits@hadoop.apache.org
Reply-To: hdfs-dev@hadoop.apache.org

Author: suresh
Date: Fri Jul 26 01:19:25 2013
New Revision: 1507173

URL: http://svn.apache.org/r1507173
Log:
HDFS-4979.
Merge 1507170 from trunk

Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
      - copied unchanged from r1507170, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1507173&r1=1507172&r2=1507173&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jul 26 01:19:25 2013
@@ -109,6 +109,8 @@ Release 2.1.0-beta - 2013-07-02
     HDFS-4974. Add Idempotent and AtMostOnce annotations to namenode
     protocol methods. (suresh)
+
+    HDFS-4979. Implement retry cache on Namenode. (suresh)
 
   IMPROVEMENTS

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1507173&r1=1507172&r2=1507173&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jul 26 01:19:25 2013
@@ -488,4 +488,11 @@ public class DFSConfigKeys extends Commo
   public static final String DFS_MAX_NUM_BLOCKS_TO_LOG_KEY = "dfs.namenode.max-num-blocks-to-log";
   public static final long DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT = 1000l;
+
+  public static final String DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY = "dfs.namenode.enable.retrycache";
+  public static final boolean DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT = true;
+  public static final String DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY = "dfs.namenode.retrycache.expirytime.millis";
+  public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
+  public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
+  public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1507173&r1=1507172&r2=1507173&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Jul 26 01:19:25 2013
@@ -87,7 +87,7 @@ import com.google.common.collect.Sets;
 public class BlockManager {
 
   static final Log LOG = LogFactory.getLog(BlockManager.class);
-  static final Log blockLog = NameNode.blockStateChangeLog;
+  public static final Log blockLog = NameNode.blockStateChangeLog;
 
   /** Default load factor of map */
   public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
@@ -2686,64 +2686,58 @@ assert storedBlock.findDatanode(dn) < 0
    * The given node is reporting incremental information about some blocks.
    * This includes blocks that are starting to be received, completed being
    * received, or deleted.
+   *
+   * This method must be called with FSNamesystem lock held.
   */
-  public void processIncrementalBlockReport(final DatanodeID nodeID,
-     final String poolId,
-     final ReceivedDeletedBlockInfo blockInfos[]
-  ) throws IOException {
-    namesystem.writeLock();
+  public void processIncrementalBlockReport(final DatanodeID nodeID,
+      final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
+      throws IOException {
+    assert namesystem.hasWriteLock();
     int received = 0;
     int deleted = 0;
     int receiving = 0;
-    try {
-      final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
-      if (node == null || !node.isAlive) {
-        blockLog
-            .warn("BLOCK* processIncrementalBlockReport"
-                + " is received from dead or unregistered node "
-                + nodeID);
-        throw new IOException(
-            "Got incremental block report from unregistered or dead node");
-      }
+    final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+    if (node == null || !node.isAlive) {
+      blockLog
+          .warn("BLOCK* processIncrementalBlockReport"
+              + " is received from dead or unregistered node "
+              + nodeID);
+      throw new IOException(
+          "Got incremental block report from unregistered or dead node");
+    }
 
-      for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
-        switch (rdbi.getStatus()) {
-        case DELETED_BLOCK:
-          removeStoredBlock(rdbi.getBlock(), node);
-          deleted++;
-          break;
-        case RECEIVED_BLOCK:
-          addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
-          received++;
-          break;
-        case RECEIVING_BLOCK:
-          receiving++;
-          processAndHandleReportedBlock(node, rdbi.getBlock(),
-              ReplicaState.RBW, null);
-          break;
-        default:
-          String msg =
-            "Unknown block status code reported by " + nodeID +
-            ": " + rdbi;
-          blockLog.warn(msg);
-          assert false : msg; // if assertions are enabled, throw.
-          break;
-        }
-        if (blockLog.isDebugEnabled()) {
-          blockLog.debug("BLOCK* block "
-              + (rdbi.getStatus()) + ": " + rdbi.getBlock()
-              + " is received from " + nodeID);
-        }
+    for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
+      switch (rdbi.getStatus()) {
+      case DELETED_BLOCK:
+        removeStoredBlock(rdbi.getBlock(), node);
+        deleted++;
+        break;
+      case RECEIVED_BLOCK:
+        addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
+        received++;
+        break;
+      case RECEIVING_BLOCK:
+        receiving++;
+        processAndHandleReportedBlock(node, rdbi.getBlock(),
+            ReplicaState.RBW, null);
+        break;
+      default:
+        String msg =
+          "Unknown block status code reported by " + nodeID +
+          ": " + rdbi;
+        blockLog.warn(msg);
+        assert false : msg; // if assertions are enabled, throw.
+        break;
+      }
+      if (blockLog.isDebugEnabled()) {
+        blockLog.debug("BLOCK* block "
+            + (rdbi.getStatus()) + ": " + rdbi.getBlock()
+            + " is received from " + nodeID);
       }
-    } finally {
-      namesystem.writeUnlock();
-      blockLog
-          .debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
-              + nodeID
-              + " receiving: " + receiving + ", "
-              + " received: " + received + ", "
-              + " deleted: " + deleted);
     }
+    blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
+        + nodeID + " receiving: " + receiving + ", " + " received: " + received
+        + ", " + " deleted: " + deleted);
   }
 
   /**

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1507173&r1=1507172&r2=1507173&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Jul 26 01:19:25 2013
@@ -66,8 +66,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.util.IdGenerator;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1507173&r1=1507172&r2=1507173&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jul 26 01:19:25 2013
@@ -47,6 +47,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@@ -55,6 +57,10 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
@@ -195,8 +201,12 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RetryCache;
+import org.apache.hadoop.ipc.RetryCache.CacheEntry;
+import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.metrics2.annotation.Metric;
@@ -249,7 +259,8 @@ public class FSNamesystem implements Nam
     }
   };
 
-  private boolean isAuditEnabled() {
+  @VisibleForTesting
+  public boolean isAuditEnabled() {
     return !isDefaultAuditLogger || auditLog.isInfoEnabled();
   }
 
@@ -422,6 +433,8 @@ public class FSNamesystem implements Nam
 
   private INodeId inodeId;
 
+  private final RetryCache retryCache;
+
   /**
   * Set the last allocated inode id when fsimage or editlog is loaded.
   */
@@ -656,6 +669,7 @@ public class FSNamesystem implements Nam
       this.auditLoggers = initAuditLoggers(conf);
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
         auditLoggers.get(0) instanceof DefaultAuditLogger;
+      this.retryCache = initRetryCache(conf);
     } catch(IOException e) {
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       close();
@@ -666,6 +680,28 @@ public class FSNamesystem implements Nam
       throw re;
     }
   }
+
+  @VisibleForTesting
+  static RetryCache initRetryCache(Configuration conf) {
+    boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
+        DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
+    LOG.info("Retry cache on namenode is " + (enable ? "enabled" : "disabled"));
+    if (enable) {
+      float heapPercent = conf.getFloat(
+          DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY,
+          DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT);
+      long entryExpiryMillis = conf.getLong(
+          DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY,
+          DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT);
+      LOG.info("Retry cache will use " + heapPercent
+          + " of total heap and retry cache entry expiry time is "
+          + entryExpiryMillis + " millis");
+      long entryExpiryNanos = entryExpiryMillis * 1000 * 1000;
+      return new RetryCache("Namenode Retry Cache", heapPercent,
+          entryExpiryNanos);
+    }
+    return null;
+  }
 
   private List<AuditLogger> initAuditLoggers(Configuration conf) {
     // Initialize the custom access loggers if configured.
@@ -726,7 +762,6 @@ public class FSNamesystem implements Nam
       if (!haEnabled) {
         fsImage.openEditLogForWrite();
       }
-
       success = true;
     } finally {
       if (!success) {
@@ -803,6 +838,7 @@ public class FSNamesystem implements Nam
     } finally {
       writeUnlock();
     }
+    RetryCache.clear(retryCache);
   }
 
   /**
@@ -1471,15 +1507,26 @@ public class FSNamesystem implements Nam
    */
   void concat(String target, String [] srcs)
       throws IOException, UnresolvedLinkException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+
+    // Either there is no previous request in progress or it has failed
     if(FSNamesystem.LOG.isDebugEnabled()) {
       FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
           " to " + target);
     }
+
+    boolean success = false;
     try {
       concatInt(target, srcs);
+      success = true;
     } catch (AccessControlException e) {
       logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, success);
     }
   }
@@ -1688,17 +1735,25 @@ public class FSNamesystem implements Nam
   void createSymlink(String target, String link,
       PermissionStatus dirPerms, boolean createParent)
       throws IOException, UnresolvedLinkException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     if (!DFSUtil.isValidName(link)) {
       throw new InvalidPathException("Invalid link name: " + link);
     }
     if (FSDirectory.isReservedName(target)) {
       throw new InvalidPathException("Invalid target name: " + target);
     }
+    boolean success = false;
     try {
       createSymlinkInt(target, link, dirPerms, createParent);
+      success = true;
     } catch (AccessControlException e) {
       logAuditEvent(false, "createSymlink", link, target, null);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, success);
     }
   }
@@ -1837,13 +1892,17 @@ public class FSNamesystem implements Nam
       }
     }
   }
-  
+
   /**
    * Create a new file entry in the namespace.
    *
-   * For description of parameters and exceptions thrown see
-   * {@link ClientProtocol#create()}, except it returns valid file status
-   * upon success
+   * For description of parameters and exceptions thrown see
+   * {@link ClientProtocol#create()}, except it returns valid file status upon
+   * success
+   *
+   * For retryCache handling details see -
+   * {@link #getFileStatus(boolean, CacheEntryWithPayload)}
+   *
    */
   HdfsFileStatus startFile(String src, PermissionStatus permissions,
       String holder, String clientMachine, EnumSet<CreateFlag> flag,
      boolean createParent, short replication, long blockSize)
       throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, UnresolvedLinkException,
       FileNotFoundException, ParentNotDirectoryException, IOException {
+    HdfsFileStatus status = null;
+    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+        null);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return (HdfsFileStatus) cacheEntry.getPayload();
+    }
+
     try {
-      return startFileInt(src, permissions, holder, clientMachine, flag,
+      status = startFileInt(src, permissions, holder, clientMachine, flag,
           createParent, replication, blockSize);
     } catch (AccessControlException e) {
       logAuditEvent(false, "create", src);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, status != null, status);
     }
+    return status;
   }
 
   private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
@@ -1880,7 +1949,7 @@ public class FSNamesystem implements Nam
       blockManager.verifyReplication(src, replication, clientMachine);
 
     boolean skipSync = false;
-    final HdfsFileStatus stat;
+    HdfsFileStatus stat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
     if (blockSize < minBlockSize) {
@@ -1960,7 +2029,12 @@
         }
       } else {
         if (overwrite) {
-          delete(src, true); // File exists - delete if overwrite
+          try {
+            deleteInt(src, true); // File exists - delete if overwrite
+          } catch (AccessControlException e) {
+            logAuditEvent(false, "delete", src);
+            throw e;
+          }
         } else {
           // If lease soft limit time is expired, recover the lease
           recoverLeaseInternal(myFile, src, holder, clientMachine, false);
@@ -2209,16 +2283,28 @@
       throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
+    LocatedBlock lb = null;
+    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+        null);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return (LocatedBlock) cacheEntry.getPayload();
+    }
+
+    boolean success = false;
     try {
-      return appendFileInt(src, holder, clientMachine);
+      lb = appendFileInt(src, holder, clientMachine);
+      success = true;
+      return lb;
     } catch (AccessControlException e) {
       logAuditEvent(false, "append", src);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, success, lb);
     }
   }
 
-  private LocatedBlock appendFileInt(String src, String holder, String clientMachine)
-      throws AccessControlException, SafeModeException,
+  private LocatedBlock appendFileInt(String src, String holder,
+      String clientMachine) throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2781,12 +2867,20 @@ public class FSNamesystem implements Nam
   @Deprecated
   boolean renameTo(String src, String dst)
       throws IOException, UnresolvedLinkException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return true; // Return previous response
+    }
+    boolean ret = false;
     try {
-      return renameToInt(src, dst);
+      ret = renameToInt(src, dst);
     } catch (AccessControlException e) {
      logAuditEvent(false, "rename", src, dst, null);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, ret);
     }
+    return ret;
   }
 
   private boolean renameToInt(String src, String dst)
@@ -2853,6 +2947,10 @@ public class FSNamesystem implements Nam
   /** Rename src to dst */
   void renameTo(String src, String dst, Options.Rename... options)
       throws IOException, UnresolvedLinkException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
           + src + " to " + dst);
@@ -2865,6 +2963,7 @@ public class FSNamesystem implements Nam
     byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
     byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
     HdfsFileStatus resultingStat = null;
+    boolean success = false;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2875,8 +2974,10 @@ public class FSNamesystem implements Nam
       dst = FSDirectory.resolvePath(dst, dstComponents, dir);
       renameToInternal(pc, src, dst, options);
       resultingStat = getAuditFileInfo(dst, false);
+      success = true;
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
     getEditLog().logSync();
     if (resultingStat != null) {
@@ -2908,12 +3009,20 @@ public class FSNamesystem implements Nam
   boolean delete(String src, boolean recursive)
       throws AccessControlException, SafeModeException,
       UnresolvedLinkException, IOException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return true; // Return previous response
+    }
+    boolean ret = false;
     try {
-      return deleteInt(src, recursive);
+      ret = deleteInt(src, recursive);
     } catch (AccessControlException e) {
       logAuditEvent(false, "delete", src);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, ret);
     }
+    return ret;
   }
 
   private boolean deleteInt(String src, boolean recursive)
@@ -2937,6 +3046,7 @@ public class FSNamesystem implements Nam
       throw new AccessControlException(ioe);
     }
   }
+
   /**
    * Remove a file/directory from the namespace.
   *

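Every @AtMostOnce operation changed in this file follows the same retry-cache idiom, visible in the hunks above and below. Distilled into a minimal sketch (doWork() is a hypothetical placeholder for the operation body, e.g. concatInt or deleteInt; the RetryCache calls are named as in the diff):

    // Minimal sketch of the retry-cache idiom used by the @AtMostOnce
    // methods in FSNamesystem. doWork() is a placeholder, not real code.
    void atMostOnceOperation() throws IOException {
      // A retried call whose first attempt already succeeded replays the
      // cached outcome instead of re-executing; waitForCompletion also
      // waits out an attempt with the same call id still in flight.
      CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
      if (cacheEntry != null && cacheEntry.isSuccess()) {
        return; // Return previous response
      }
      boolean success = false;
      try {
        doWork(); // the real operation, e.g. concatInt(target, srcs)
        success = true;
      } finally {
        // Record the outcome; a failed entry allows a later retry to re-run.
        RetryCache.setState(cacheEntry, success);
      }
    }

Operations that return a result (startFile, appendFile, createSnapshot, startCheckpoint) use CacheEntryWithPayload instead, saving the result with RetryCache.setState(cacheEntry, success, payload) and replaying it from cacheEntry.getPayload().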
@@ -2957,6 +3067,7 @@ public class FSNamesystem implements Nam
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    boolean ret = false;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2975,6 +3086,7 @@ public class FSNamesystem implements Nam
       if (!dir.delete(src, collectedBlocks, removedINodes)) {
         return false;
       }
+      ret = true;
     } finally {
       writeUnlock();
     }
@@ -2992,7 +3104,7 @@ public class FSNamesystem implements Nam
       NameNode.stateChangeLog.debug("DIR* Namesystem.delete: "
         + src +" is removed");
     }
-    return true;
+    return ret;
   }
 
   /**
@@ -3158,12 +3270,14 @@ public class FSNamesystem implements Nam
    */
   boolean mkdirs(String src, PermissionStatus permissions,
       boolean createParent) throws IOException, UnresolvedLinkException {
+    boolean ret = false;
     try {
-      return mkdirsInt(src, permissions, createParent);
+      ret = mkdirsInt(src, permissions, createParent);
     } catch (AccessControlException e) {
       logAuditEvent(false, "mkdirs", src);
       throw e;
     }
+    return ret;
   }
 
   private boolean mkdirsInt(String src, PermissionStatus permissions,
@@ -3223,7 +3337,7 @@ public class FSNamesystem implements Nam
     }
 
     // validate that we have enough inodes. This is, at best, a
-    // heuristic because the mkdirs() operation migth need to
+    // heuristic because the mkdirs() operation might need to
     // create multiple inodes.
     checkFsObjectLimit();
 
@@ -4023,8 +4137,13 @@ public class FSNamesystem implements Nam
    * @throws IOException if
    */
   void saveNamespace() throws AccessControlException, IOException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     checkSuperuserPrivilege();
     checkOperation(OperationCategory.UNCHECKED);
+    boolean success = false;
     readLock();
     try {
       checkOperation(OperationCategory.UNCHECKED);
@@ -4033,10 +4152,12 @@ public class FSNamesystem implements Nam
             "in order to create namespace image.");
       }
       getFSImage().saveNamespace(this);
-      LOG.info("New namespace image has been created");
+      success = true;
     } finally {
       readUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
+    LOG.info("New namespace image has been created");
   }
 
   /**
@@ -4594,6 +4715,7 @@ public class FSNamesystem implements Nam
         try {
           Thread.sleep(recheckInterval);
         } catch (InterruptedException ie) {
+          // Ignored
         }
       }
       if (!fsRunning) {
@@ -4852,30 +4974,51 @@ public class FSNamesystem implements Nam
     }
   }
 
-  NamenodeCommand startCheckpoint(
-      NamenodeRegistration bnReg, // backup node
-      NamenodeRegistration nnReg) // active name-node
-      throws IOException {
+  NamenodeCommand startCheckpoint(NamenodeRegistration backupNode,
+      NamenodeRegistration activeNamenode) throws IOException {
     checkOperation(OperationCategory.CHECKPOINT);
+    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+        null);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return (NamenodeCommand) cacheEntry.getPayload();
+    }
     writeLock();
+    NamenodeCommand cmd = null;
     try {
       checkOperation(OperationCategory.CHECKPOINT);
 
       if (isInSafeMode()) {
         throw new SafeModeException("Checkpoint not started", safeMode);
       }
-      LOG.info("Start checkpoint for " + bnReg.getAddress());
-      NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
+      LOG.info("Start checkpoint for " + backupNode.getAddress());
+      cmd = getFSImage().startCheckpoint(backupNode, activeNamenode);
       getEditLog().logSync();
       return cmd;
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, cmd != null, cmd);
     }
   }
 
+  public void processIncrementalBlockReport(final DatanodeID nodeID,
+      final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
+      throws IOException {
+    writeLock();
+    try {
+      blockManager.processIncrementalBlockReport(nodeID, poolId, blockInfos);
+    } finally {
+      writeUnlock();
+    }
+  }
+
   void endCheckpoint(NamenodeRegistration registration,
       CheckpointSignature sig) throws IOException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     checkOperation(OperationCategory.CHECKPOINT);
+    boolean success = false;
     readLock();
     try {
       checkOperation(OperationCategory.CHECKPOINT);
@@ -4885,8 +5028,10 @@ public class FSNamesystem implements Nam
       }
       LOG.info("End checkpoint for " + registration.getAddress());
       getFSImage().endCheckpoint(sig);
+      success = true;
     } finally {
       readUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
   }
 
@@ -5404,6 +5549,10 @@ public class FSNamesystem implements Nam
   void updatePipeline(String clientName, ExtendedBlock oldBlock,
       ExtendedBlock newBlock, DatanodeID[] newNodes)
       throws IOException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     checkOperation(OperationCategory.WRITE);
     LOG.info("updatePipeline(block=" + oldBlock
              + ", newGenerationStamp=" + newBlock.getGenerationStamp()
@@ -5412,17 +5561,19 @@
              + ", clientName=" + clientName
              + ")");
     writeLock();
+    boolean success = false;
     try {
       checkOperation(OperationCategory.WRITE);
-
       if (isInSafeMode()) {
         throw new SafeModeException("Pipeline not updated", safeMode);
       }
       assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
         + oldBlock + " has different block identifier";
       updatePipelineInternal(clientName, oldBlock, newBlock, newNodes);
+      success = true;
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
     getEditLog().logSync();
     LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
@@ -6287,9 +6438,14 @@ public class FSNamesystem implements Nam
    */
   String createSnapshot(String snapshotRoot, String snapshotName)
       throws SafeModeException, IOException {
+    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+        null);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return (String) cacheEntry.getPayload();
+    }
     final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
-    final String snapshotPath;
+    String snapshotPath = null;
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
@@ -6313,6 +6469,7 @@ public class FSNamesystem implements Nam
       getEditLog().logCreateSnapshot(snapshotRoot, snapshotName);
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, snapshotPath != null, snapshotPath);
     }
     getEditLog().logSync();
 
@@ -6332,8 +6489,13 @@ public class FSNamesystem implements Nam
    */
   void renameSnapshot(String path, String snapshotOldName,
       String snapshotNewName) throws SafeModeException, IOException {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
+    boolean success = false;
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
@@ -6347,8 +6509,10 @@
       snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
       getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName);
+      success = true;
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
     getEditLog().logSync();
 
@@ -6441,6 +6605,11 @@ public class FSNamesystem implements Nam
   void deleteSnapshot(String snapshotRoot, String snapshotName)
       throws SafeModeException, IOException {
     final FSPermissionChecker pc = getPermissionChecker();
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+    boolean success = false;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -6464,8 +6633,10 @@ public class FSNamesystem implements Nam
       this.removeBlocks(collectedBlocks);
       collectedBlocks.clear();
       getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName);
+      success = true;
     } finally {
       writeUnlock();
+      RetryCache.setState(cacheEntry, success);
     }
     getEditLog().logSync();

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1507173&r1=1507172&r2=1507173&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Fri Jul 26 01:19:25 2013
@@ -951,7 +951,7 @@ class NameNodeRpcServer implements Namen
          +"from "+nodeReg+" "+receivedAndDeletedBlocks.length
          +" blocks.");
     }
-    namesystem.getBlockManager().processIncrementalBlockReport(
+    namesystem.processIncrementalBlockReport(
         nodeReg, poolId, receivedAndDeletedBlocks[0].getBlocks());
   }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1507173&r1=1507172&r2=1507173&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Jul 26 01:19:25 2013
@@ -1352,4 +1352,43 @@
 </property>
 
+<property>
+  <name>dfs.namenode.enable.retrycache</name>
+  <value>true</value>
+  <description>
+    This enables the retry cache on the namenode. The namenode tracks the
+    corresponding response for each non-idempotent request. If a client
+    retries the request, the response from the retry cache is sent back.
+    Such operations are tagged with the annotation @AtMostOnce in namenode
+    protocols. It is recommended that this flag be set to true. Setting it
+    to false will result in clients getting failure responses to retried
+    requests. This flag must be enabled in an HA setup for transparent
+    failovers.
+
+    The entries in the cache have an expiration time configurable using
+    dfs.namenode.retrycache.expirytime.millis.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.retrycache.expirytime.millis</name>
+  <value>600000</value>
+  <description>
+    The time for which retry cache entries are retained.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.retrycache.heap.percent</name>
+  <value>0.03f</value>
+  <description>
+    This parameter configures the heap size allocated for the retry cache
+    (excluding the cached response). This corresponds to approximately 4096
+    entries for every 64MB of namenode process java heap size. Assuming a
+    retry cache entry expiration time (configured using
+    dfs.namenode.retrycache.expirytime.millis) of 10 minutes, this enables
+    the retry cache to support 7 operations per second sustained for 10
+    minutes. As the heap size is increased, the supported operation rate
+    increases linearly.
+  </description>
+</property>
 
 </configuration>
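The sizing figures quoted in the dfs.namenode.retrycache.heap.percent description are mutually consistent; working them through (the per-entry byte cost is implied by the description, not stated in the commit):

    64 MB heap x 0.03           ~  1.9 MB reserved for retry cache entries
    1.9 MB / 4096 entries       ~  480 bytes per entry (implied)
    4096 entries / 600 seconds  ~  6.8, i.e. the quoted 7 operations per
                                   second sustained over the 10-minute
                                   expiry window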
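As a usage sketch, a cluster operator overriding these settings in hdfs-site.xml would use the same property names; the values below are simply the shipped defaults from this commit:

    <!-- Hypothetical hdfs-site.xml snippet; values shown are the defaults. -->
    <configuration>
      <property>
        <name>dfs.namenode.enable.retrycache</name>
        <value>true</value>
      </property>
      <property>
        <name>dfs.namenode.retrycache.expirytime.millis</name>
        <value>600000</value> <!-- 10 minutes -->
      </property>
      <property>
        <name>dfs.namenode.retrycache.heap.percent</name>
        <value>0.03f</value>
      </property>
    </configuration>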