Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A1AA0DF4C for ; Sat, 2 Feb 2013 02:07:57 +0000 (UTC) Received: (qmail 4682 invoked by uid 500); 2 Feb 2013 02:07:57 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 4500 invoked by uid 500); 2 Feb 2013 02:07:57 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 4492 invoked by uid 99); 2 Feb 2013 02:07:57 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 02 Feb 2013 02:07:57 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 02 Feb 2013 02:07:48 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 7C10123888E7; Sat, 2 Feb 2013 02:07:27 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1441681 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java Date: Sat, 02 Feb 2013 02:07:27 -0000 To: hdfs-commits@hadoop.apache.org From: shv@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130202020727.7C10123888E7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: shv Date: Sat Feb 2 02:07:26 2013 New Revision: 1441681 URL: 
http://svn.apache.org/viewvc?rev=1441681&view=rev Log: HDFS-4452. getAdditionalBlock() can create multiple blocks if the client times out and retries. Contributed by Konstantin Shvachko. Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java (with props) Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1441681&r1=1441680&r2=1441681&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat Feb 2 02:07:26 2013 @@ -744,6 +744,9 @@ Release 2.0.3-alpha - Unreleased HDFS-4428. FsDatasetImpl should disclose what the error is when a rename fails. (Colin Patrick McCabe via atm) + HDFS-4452. getAdditionalBlock() can create multiple blocks if the client + times out and retries. (shv) + BREAKDOWN OF HDFS-3077 SUBTASKS HDFS-3077. Quorum-based protocol for reading and writing edit logs. 
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1441681&r1=1441680&r2=1441681&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Feb 2 02:07:26 2013 @@ -2200,12 +2200,9 @@ public class FSNamesystem implements Nam throws LeaseExpiredException, NotReplicatedYetException, QuotaExceededException, SafeModeException, UnresolvedLinkException, IOException { - checkBlock(previous); - Block previousBlock = ExtendedBlock.getLocalBlock(previous); - long fileLength, blockSize; + long blockSize; int replication; DatanodeDescriptor clientNode = null; - Block newBlock = null; if(NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug( @@ -2213,119 +2210,61 @@ public class FSNamesystem implements Nam +src+" for "+clientName); } - writeLock(); + // Part I. Analyze the state of the file with respect to the input data. + readLock(); try { - checkOperation(OperationCategory.WRITE); - - if (isInSafeMode()) { - throw new SafeModeException("Cannot add block to " + src, safeMode); - } - - // have we exceeded the configured limit of fs objects. - checkFsObjectLimit(); - - INodeFileUnderConstruction pendingFile = checkLease(src, clientName); - BlockInfo lastBlockInFile = pendingFile.getLastBlock(); - if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) { - // The block that the client claims is the current last block - // doesn't match up with what we think is the last block. 
There are - // three possibilities: - // 1) This is the first block allocation of an append() pipeline - // which started appending exactly at a block boundary. - // In this case, the client isn't passed the previous block, - // so it makes the allocateBlock() call with previous=null. - // We can distinguish this since the last block of the file - // will be exactly a full block. - // 2) This is a retry from a client that missed the response of a - // prior getAdditionalBlock() call, perhaps because of a network - // timeout, or because of an HA failover. In that case, we know - // by the fact that the client is re-issuing the RPC that it - // never began to write to the old block. Hence it is safe to - // abandon it and allocate a new one. - // 3) This is an entirely bogus request/bug -- we should error out - // rather than potentially appending a new block with an empty - // one in the middle, etc - - BlockInfo penultimateBlock = pendingFile.getPenultimateBlock(); - if (previous == null && - lastBlockInFile != null && - lastBlockInFile.getNumBytes() == pendingFile.getPreferredBlockSize() && - lastBlockInFile.isComplete()) { - // Case 1 - if (NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug( - "BLOCK* NameSystem.allocateBlock: handling block allocation" + - " writing to a file with a complete previous block: src=" + - src + " lastBlock=" + lastBlockInFile); - } - } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) { - // Case 2 - if (lastBlockInFile.getNumBytes() != 0) { - throw new IOException( - "Request looked like a retry to allocate block " + - lastBlockInFile + " but it already contains " + - lastBlockInFile.getNumBytes() + " bytes"); - } - - // The retry case ("b" above) -- abandon the old block. - NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + - "caught retry for allocation of a new block in " + - src + ". 
Abandoning old block " + lastBlockInFile); - dir.removeBlock(src, pendingFile, lastBlockInFile); - dir.persistBlocks(src, pendingFile); - } else { - - throw new IOException("Cannot allocate block in " + src + ": " + - "passed 'previous' block " + previous + " does not match actual " + - "last block in file " + lastBlockInFile); - } + LocatedBlock[] onRetryBlock = new LocatedBlock[1]; + final INode[] inodes = analyzeFileState( + src, clientName, previous, onRetryBlock).getINodes(); + final INodeFileUnderConstruction pendingFile = + (INodeFileUnderConstruction) inodes[inodes.length - 1]; + + if(onRetryBlock[0] != null) { + // This is a retry. Just return the last block. + return onRetryBlock[0]; } - // commit the last block and complete it if it has minimum replicas - commitOrCompleteLastBlock(pendingFile, previousBlock); - - // - // If we fail this, bad things happen! - // - if (!checkFileProgress(pendingFile, false)) { - throw new NotReplicatedYetException("Not replicated yet:" + src); - } - fileLength = pendingFile.computeContentSummary().getLength(); blockSize = pendingFile.getPreferredBlockSize(); clientNode = pendingFile.getClientNode(); replication = pendingFile.getBlockReplication(); } finally { - writeUnlock(); + readUnlock(); } // choose targets for the new block to be allocated. - final DatanodeDescriptor targets[] = blockManager.chooseTarget( + final DatanodeDescriptor targets[] = getBlockManager().chooseTarget( src, replication, clientNode, excludedNodes, blockSize); - // Allocate a new block and record it in the INode. + // Part II. + // Allocate a new block, add it to the INode and the BlocksMap. + Block newBlock = null; + long offset; writeLock(); try { - checkOperation(OperationCategory.WRITE); - if (isInSafeMode()) { - throw new SafeModeException("Cannot add block to " + src, safeMode); + // Run the full analysis again, since things could have changed + // while chooseTarget() was executing. 
+ LocatedBlock[] onRetryBlock = new LocatedBlock[1]; + INodesInPath inodesInPath = + analyzeFileState(src, clientName, previous, onRetryBlock); + INode[] inodes = inodesInPath.getINodes(); + final INodeFileUnderConstruction pendingFile = + (INodeFileUnderConstruction) inodes[inodes.length - 1]; + + if(onRetryBlock[0] != null) { + // This is a retry. Just return the last block. + return onRetryBlock[0]; } - final INodesInPath inodesInPath = dir.rootDir.getExistingPathINodes(src, true); - final INode[] inodes = inodesInPath.getINodes(); - final INodeFileUnderConstruction pendingFile - = checkLease(src, clientName, inodes[inodes.length - 1]); - - if (!checkFileProgress(pendingFile, false)) { - throw new NotReplicatedYetException("Not replicated yet:" + src); - } + // commit the last block and complete it if it has minimum replicas + commitOrCompleteLastBlock(pendingFile, + ExtendedBlock.getLocalBlock(previous)); + + // allocate new block, record block locations in INode. + newBlock = createNewBlock(); + saveAllocatedBlock(src, inodesInPath, newBlock, targets); - // allocate new block record block locations in INode. 
- newBlock = allocateBlock(src, inodesInPath, targets); - - for (DatanodeDescriptor dn : targets) { - dn.incBlocksScheduled(); - } dir.persistBlocks(src, pendingFile); + offset = pendingFile.computeFileSize(true); } finally { writeUnlock(); } @@ -2333,10 +2272,114 @@ public class FSNamesystem implements Nam getEditLog().logSync(); } - // Create next block - LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength); - blockManager.setBlockToken(b, BlockTokenSecretManager.AccessMode.WRITE); - return b; + // Return located block + return makeLocatedBlock(newBlock, targets, offset); + } + + INodesInPath analyzeFileState(String src, + String clientName, + ExtendedBlock previous, + LocatedBlock[] onRetryBlock) + throws IOException { + assert hasReadOrWriteLock(); + + checkBlock(previous); + onRetryBlock[0] = null; + checkOperation(OperationCategory.WRITE); + if (isInSafeMode()) { + throw new SafeModeException("Cannot add block to " + src, safeMode); + } + + // have we exceeded the configured limit of fs objects. + checkFsObjectLimit(); + + Block previousBlock = ExtendedBlock.getLocalBlock(previous); + final INodesInPath inodesInPath = + dir.rootDir.getExistingPathINodes(src, true); + final INode[] inodes = inodesInPath.getINodes(); + final INodeFileUnderConstruction pendingFile + = checkLease(src, clientName, inodes[inodes.length - 1]); + BlockInfo lastBlockInFile = pendingFile.getLastBlock(); + if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) { + // The block that the client claims is the current last block + // doesn't match up with what we think is the last block. There are + // four possibilities: + // 1) This is the first block allocation of an append() pipeline + // which started appending exactly at a block boundary. + // In this case, the client isn't passed the previous block, + // so it makes the allocateBlock() call with previous=null. 
+ // We can distinguish this since the last block of the file + // will be exactly a full block. + // 2) This is a retry from a client that missed the response of a + // prior getAdditionalBlock() call, perhaps because of a network + // timeout, or because of an HA failover. In that case, we know + // by the fact that the client is re-issuing the RPC that it + // never began to write to the old block. Hence it is safe to + // to return the existing block. + // 3) This is an entirely bogus request/bug -- we should error out + // rather than potentially appending a new block with an empty + // one in the middle, etc + // 4) This is a retry from a client that timed out while + // the prior getAdditionalBlock() is still being processed, + // currently working on chooseTarget(). + // There are no means to distinguish between the first and + // the second attempts in Part I, because the first one hasn't + // changed the namesystem state yet. + // We run this analysis again in Part II where case 4 is impossible. + + BlockInfo penultimateBlock = pendingFile.getPenultimateBlock(); + if (previous == null && + lastBlockInFile != null && + lastBlockInFile.getNumBytes() == pendingFile.getPreferredBlockSize() && + lastBlockInFile.isComplete()) { + // Case 1 + if (NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug( + "BLOCK* NameSystem.allocateBlock: handling block allocation" + + " writing to a file with a complete previous block: src=" + + src + " lastBlock=" + lastBlockInFile); + } + } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) { + if (lastBlockInFile.getNumBytes() != 0) { + throw new IOException( + "Request looked like a retry to allocate block " + + lastBlockInFile + " but it already contains " + + lastBlockInFile.getNumBytes() + " bytes"); + } + + // Case 2 + // Return the last block. + NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + + "caught retry for allocation of a new block in " + + src + ". 
Returning previously allocated block " + lastBlockInFile); + long offset = pendingFile.computeFileSize(true); + onRetryBlock[0] = makeLocatedBlock(lastBlockInFile, + ((BlockInfoUnderConstruction)lastBlockInFile).getExpectedLocations(), + offset); + return inodesInPath; + } else { + // Case 3 + throw new IOException("Cannot allocate block in " + src + ": " + + "passed 'previous' block " + previous + " does not match actual " + + "last block in file " + lastBlockInFile); + } + } + + // Check if the penultimate block is minimally replicated + if (!checkFileProgress(pendingFile, false)) { + throw new NotReplicatedYetException("Not replicated yet: " + src); + } + return inodesInPath; + } + + LocatedBlock makeLocatedBlock(Block blk, + DatanodeInfo[] locs, + long offset) throws IOException { + LocatedBlock lBlk = new LocatedBlock( + getExtendedBlock(blk), locs, offset); + getBlockManager().setBlockToken( + lBlk, BlockTokenSecretManager.AccessMode.WRITE); + return lBlk; } /** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */ @@ -2528,22 +2571,33 @@ public class FSNamesystem implements Nam } /** - * Allocate a block at the given pending filename + * Save allocated block at the given pending filename * * @param src path to the file * @param inodesInPath representing each of the components of src. * The last INode is the INode for the file. * @throws QuotaExceededException If addition of block exceeds space quota */ - private Block allocateBlock(String src, INodesInPath inodesInPath, - DatanodeDescriptor targets[]) throws IOException { + BlockInfo saveAllocatedBlock(String src, INodesInPath inodesInPath, + Block newBlock, DatanodeDescriptor targets[]) throws IOException { + assert hasWriteLock(); + BlockInfo b = dir.addBlock(src, inodesInPath, newBlock, targets); + NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". 
" + + getBlockPoolId() + " " + b); + for (DatanodeDescriptor dn : targets) { + dn.incBlocksScheduled(); + } + return b; + } + + /** + * Create new block with a unique block id and a new generation stamp. + */ + Block createNewBlock() throws IOException { assert hasWriteLock(); Block b = new Block(getFSImage().getUniqueBlockId(), 0, 0); // Increment the generation stamp for every new block. b.setGenerationStamp(nextGenerationStamp()); - b = dir.addBlock(src, inodesInPath, b, targets); - NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". " - + blockPoolId + " " + b); return b; } Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java?rev=1441681&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java Sat Feb 2 02:07:26 2013 @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.namenode; + + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + +import java.lang.reflect.Field; +import java.util.EnumSet; +import java.util.HashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.net.Node; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Race between two threads simultaneously calling + * FSNamesystem.getAdditionalBlock(). 
+ */ +public class TestAddBlockRetry { + public static final Log LOG = LogFactory.getLog(TestAddBlockRetry.class); + + private static final short REPLICATION = 3; + + private Configuration conf; + private MiniDFSCluster cluster; + + private int count = 0; + private LocatedBlock lb1; + private LocatedBlock lb2; + + @Before + public void setUp() throws Exception { + conf = new Configuration(); + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(REPLICATION) + .build(); + cluster.waitActive(); + } + + @After + public void tearDown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Retry addBlock() while another thread is in chooseTarget(). + * See HDFS-4452. + */ + @Test + public void testRetryAddBlockWhileInChooseTarget() throws Exception { + final String src = "/testRetryAddBlockWhileInChooseTarget"; + + FSNamesystem ns = cluster.getNamesystem(); + BlockManager spyBM = spy(ns.getBlockManager()); + final NamenodeProtocols nn = cluster.getNameNodeRpc(); + + // substitute mocked BlockManager into FSNamesystem + Class<? extends FSNamesystem> nsClass = ns.getClass(); + Field bmField = nsClass.getDeclaredField("blockManager"); + bmField.setAccessible(true); + bmField.set(ns, spyBM); + + doAnswer(new Answer<DatanodeDescriptor[]>() { + @Override + public DatanodeDescriptor[] answer(InvocationOnMock invocation) + throws Throwable { + LOG.info("chooseTarget for " + src); + DatanodeDescriptor[] ret = + (DatanodeDescriptor[]) invocation.callRealMethod(); + count++; + if(count == 1) { // run second addBlock() + LOG.info("Starting second addBlock for " + src); + nn.addBlock(src, "clientName", null, null); + LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE); + assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size()); + lb2 = lbs.get(0); + assertEquals("Wrong replication", + REPLICATION, lb2.getLocations().length); + } + return ret; + } + }).when(spyBM).chooseTarget(Mockito.anyString(), Mockito.anyInt(), + Mockito.<DatanodeDescriptor>any(), Mockito.<HashMap<Node, Node>>any(), +
Mockito.anyLong()); + + // create file + nn.create(src, FsPermission.getFileDefault(), + "clientName", + new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), + true, (short)3, 1024); + + // start first addBlock() + LOG.info("Starting first addBlock for " + src); + nn.addBlock(src, "clientName", null, null); + + // check locations + LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE); + assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size()); + lb1 = lbs.get(0); + assertEquals("Wrong replication", REPLICATION, lb1.getLocations().length); + assertEquals("Blocks are not equal", lb1.getBlock(), lb2.getBlock()); + } +} Propchange: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java ------------------------------------------------------------------------------ svn:mime-type = text/plain