Author: shv
Date: Sat Feb 2 02:09:09 2013
New Revision: 1441682
URL: http://svn.apache.org/viewvc?rev=1441682&view=rev
Log:
HDFS-4452. getAdditionalBlock() can create multiple blocks if the client times out and retries.
Contributed by Konstantin Shvachko.
Added:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java (with props)
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1441682&r1=1441681&r2=1441682&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat Feb 2 02:09:09 2013
@@ -445,6 +445,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-4428. FsDatasetImpl should disclose what the error is when a rename
fails. (Colin Patrick McCabe via atm)
+ HDFS-4452. getAdditionalBlock() can create multiple blocks if the client
+ times out and retries. (shv)
+
BREAKDOWN OF HDFS-3077 SUBTASKS
HDFS-3077. Quorum-based protocol for reading and writing edit logs.
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1441682&r1=1441681&r2=1441682&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Feb 2 02:09:09 2013
@@ -2151,12 +2151,9 @@ public class FSNamesystem implements Nam
throws LeaseExpiredException, NotReplicatedYetException,
QuotaExceededException, SafeModeException, UnresolvedLinkException,
IOException {
- checkBlock(previous);
- Block previousBlock = ExtendedBlock.getLocalBlock(previous);
- long fileLength, blockSize;
+ long blockSize;
int replication;
DatanodeDescriptor clientNode = null;
- Block newBlock = null;
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
@@ -2164,119 +2161,60 @@ public class FSNamesystem implements Nam
+src+" for "+clientName);
}
- writeLock();
+ // Part I. Analyze the file state under the read lock; no block is created here.
+ readLock();
try {
- checkOperation(OperationCategory.WRITE);
-
- if (isInSafeMode()) {
- throw new SafeModeException("Cannot add block to " + src, safeMode);
- }
-
- // have we exceeded the configured limit of fs objects.
- checkFsObjectLimit();
-
- INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
- BlockInfo lastBlockInFile = pendingFile.getLastBlock();
- if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
- // The block that the client claims is the current last block
- // doesn't match up with what we think is the last block. There are
- // three possibilities:
- // 1) This is the first block allocation of an append() pipeline
- // which started appending exactly at a block boundary.
- // In this case, the client isn't passed the previous block,
- // so it makes the allocateBlock() call with previous=null.
- // We can distinguish this since the last block of the file
- // will be exactly a full block.
- // 2) This is a retry from a client that missed the response of a
- // prior getAdditionalBlock() call, perhaps because of a network
- // timeout, or because of an HA failover. In that case, we know
- // by the fact that the client is re-issuing the RPC that it
- // never began to write to the old block. Hence it is safe to
- // abandon it and allocate a new one.
- // 3) This is an entirely bogus request/bug -- we should error out
- // rather than potentially appending a new block with an empty
- // one in the middle, etc
-
- BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
- if (previous == null &&
- lastBlockInFile != null &&
- lastBlockInFile.getNumBytes() == pendingFile.getPreferredBlockSize() &&
- lastBlockInFile.isComplete()) {
- // Case 1
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug(
- "BLOCK* NameSystem.allocateBlock: handling block allocation" +
- " writing to a file with a complete previous block: src=" +
- src + " lastBlock=" + lastBlockInFile);
- }
- } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
- // Case 2
- if (lastBlockInFile.getNumBytes() != 0) {
- throw new IOException(
- "Request looked like a retry to allocate block " +
- lastBlockInFile + " but it already contains " +
- lastBlockInFile.getNumBytes() + " bytes");
- }
-
- // The retry case ("b" above) -- abandon the old block.
- NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
- "caught retry for allocation of a new block in " +
- src + ". Abandoning old block " + lastBlockInFile);
- dir.removeBlock(src, pendingFile, lastBlockInFile);
- dir.persistBlocks(src, pendingFile);
- } else {
-
- throw new IOException("Cannot allocate block in " + src + ": " +
- "passed 'previous' block " + previous + " does not match actual " +
- "last block in file " + lastBlockInFile);
- }
+ LocatedBlock[] onRetryBlock = new LocatedBlock[1];
+ final INode[] inodes = analyzeFileState(
+ src, clientName, previous, onRetryBlock);
+ final INodeFileUnderConstruction pendingFile =
+ (INodeFileUnderConstruction) inodes[inodes.length - 1];
+
+ if(onRetryBlock[0] != null) {
+ // A retry of an earlier call: return the block that call allocated.
+ return onRetryBlock[0];
}
- // commit the last block and complete it if it has minimum replicas
- commitOrCompleteLastBlock(pendingFile, previousBlock);
-
- //
- // If we fail this, bad things happen!
- //
- if (!checkFileProgress(pendingFile, false)) {
- throw new NotReplicatedYetException("Not replicated yet:" + src);
- }
- fileLength = pendingFile.computeContentSummary().getLength();
blockSize = pendingFile.getPreferredBlockSize();
clientNode = pendingFile.getClientNode();
replication = pendingFile.getBlockReplication();
} finally {
- writeUnlock();
+ readUnlock();
}
// choose targets for the new block to be allocated.
- final DatanodeDescriptor targets[] = blockManager.chooseTarget(
+ final DatanodeDescriptor targets[] = getBlockManager().chooseTarget(
src, replication, clientNode, excludedNodes, blockSize);
- // Allocate a new block and record it in the INode.
+ // Part II. Under the write lock, re-validate the file state, then
+ // allocate a new block, add it to the INode and the BlocksMap.
+ Block newBlock = null;
+ long offset;
writeLock();
try {
- checkOperation(OperationCategory.WRITE);
- if (isInSafeMode()) {
- throw new SafeModeException("Cannot add block to " + src, safeMode);
- }
- INode[] pathINodes = dir.getExistingPathINodes(src);
- int inodesLen = pathINodes.length;
- checkLease(src, clientName, pathINodes[inodesLen-1]);
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)
- pathINodes[inodesLen - 1];
-
- if (!checkFileProgress(pendingFile, false)) {
- throw new NotReplicatedYetException("Not replicated yet:" + src);
+ // Run the full analysis again, since things could have changed
+ // while chooseTarget() was executing.
+ LocatedBlock[] onRetryBlock = new LocatedBlock[1];
+ INode[] inodes =
+ analyzeFileState(src, clientName, previous, onRetryBlock);
+ final INodeFileUnderConstruction pendingFile =
+ (INodeFileUnderConstruction) inodes[inodes.length - 1];
+
+ if(onRetryBlock[0] != null) {
+ // A retry of an earlier call: return the block that call allocated.
+ return onRetryBlock[0];
}
- // allocate new block record block locations in INode.
- newBlock = allocateBlock(src, pathINodes, targets);
-
- for (DatanodeDescriptor dn : targets) {
- dn.incBlocksScheduled();
- }
+ // commit the last block and complete it if it has minimum replicas
+ commitOrCompleteLastBlock(pendingFile,
+ ExtendedBlock.getLocalBlock(previous));
+
+ // allocate new block, record block locations in INode.
+ newBlock = createNewBlock();
+ saveAllocatedBlock(src, inodes, newBlock, targets);
+
dir.persistBlocks(src, pendingFile);
+ offset = pendingFile.computeFileSize(true);
} finally {
writeUnlock();
}
@@ -2284,10 +2222,112 @@ public class FSNamesystem implements Nam
getEditLog().logSync();
}
- // Create next block
- LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
- blockManager.setBlockToken(b, BlockTokenSecretManager.AccessMode.WRITE);
- return b;
+ // Return located block
+ return makeLocatedBlock(newBlock, targets, offset);
+ }
+
+ INode[] analyzeFileState(String src,
+ String clientName,
+ ExtendedBlock previous,
+ LocatedBlock[] onRetryBlock)
+ throws IOException {
+ assert hasReadOrWriteLock();
+
+ checkBlock(previous);
+ onRetryBlock[0] = null;
+ checkOperation(OperationCategory.WRITE);
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot add block to " + src, safeMode);
+ }
+
+ // have we exceeded the configured limit of fs objects.
+ checkFsObjectLimit();
+
+ Block previousBlock = ExtendedBlock.getLocalBlock(previous);
+ final INode[] inodes = dir.rootDir.getExistingPathINodes(src, true);
+ final INodeFileUnderConstruction pendingFile
+ = checkLease(src, clientName, inodes[inodes.length - 1]);
+ BlockInfo lastBlockInFile = pendingFile.getLastBlock();
+ if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
+ // The block that the client claims is the current last block
+ // doesn't match up with what we think is the last block. There are
+ // four possibilities:
+ // 1) This is the first block allocation of an append() pipeline
+ // which started appending exactly at a block boundary.
+ // In this case, the client does not pass the previous block,
+ // so it makes the allocateBlock() call with previous=null.
+ // We can distinguish this since the last block of the file
+ // will be exactly a full block.
+ // 2) This is a retry from a client that missed the response of a
+ // prior getAdditionalBlock() call, perhaps because of a network
+ // timeout, or because of an HA failover. In that case, we know
+ // by the fact that the client is re-issuing the RPC that it
+ // never began to write to the old block. Hence it is safe
+ // to return the existing block.
+ // 3) This is an entirely bogus request/bug -- we should error out
+ // rather than potentially appending a new block with an empty
+ // one in the middle, etc
+ // 4) This is a retry from a client that timed out while
+ // the prior getAdditionalBlock() is still being processed,
+ // currently working on chooseTarget().
+ // There are no means to distinguish between the first and
+ // the second attempts in Part I, because the first one hasn't
+ // changed the namesystem state yet.
+ // We run this analysis again in Part II where case 4 is impossible.
+
+ BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
+ if (previous == null &&
+ lastBlockInFile != null &&
+ lastBlockInFile.getNumBytes() == pendingFile.getPreferredBlockSize() &&
+ lastBlockInFile.isComplete()) {
+ // Case 1
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug(
+ "BLOCK* NameSystem.allocateBlock: handling block allocation" +
+ " writing to a file with a complete previous block: src=" +
+ src + " lastBlock=" + lastBlockInFile);
+ }
+ } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
+ if (lastBlockInFile.getNumBytes() != 0) {
+ throw new IOException(
+ "Request looked like a retry to allocate block " +
+ lastBlockInFile + " but it already contains " +
+ lastBlockInFile.getNumBytes() + " bytes");
+ }
+
+ // Case 2
+ // Return the last block; the cast below assumes it is under construction.
+ NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
+ "caught retry for allocation of a new block in " +
+ src + ". Returning previously allocated block " + lastBlockInFile);
+ long offset = pendingFile.computeFileSize(true);
+ onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
+ ((BlockInfoUnderConstruction)lastBlockInFile).getExpectedLocations(),
+ offset);
+ return inodes;
+ } else {
+ // Case 3
+ throw new IOException("Cannot allocate block in " + src + ": " +
+ "passed 'previous' block " + previous + " does not match actual " +
+ "last block in file " + lastBlockInFile);
+ }
+ }
+
+ // Check if the penultimate block is minimally replicated
+ if (!checkFileProgress(pendingFile, false)) {
+ throw new NotReplicatedYetException("Not replicated yet: " + src);
+ }
+ return inodes;
+ }
+
+ LocatedBlock makeLocatedBlock(Block blk,
+ DatanodeInfo[] locs,
+ long offset) throws IOException {
+ LocatedBlock lBlk = new LocatedBlock(
+ getExtendedBlock(blk), locs, offset);
+ getBlockManager().setBlockToken(
+ lBlk, BlockTokenSecretManager.AccessMode.WRITE);
+ return lBlk;
}
/** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */
@@ -2479,25 +2519,34 @@ public class FSNamesystem implements Nam
}
/**
- * Allocate a block at the given pending filename
+ * Record a newly created block in the pending file at the given path.
*
* @param src path to the file
- * @param inodes INode representing each of the components of src.
- * <code>inodes[inodes.length-1]</code> is the INode for the file.
- *
+ * @param inodes INodes representing each of the components of src.
+ * The last INode is the INode for the file.
* @throws QuotaExceededException If addition of block exceeds space quota
*/
- private Block allocateBlock(String src, INode[] inodes,
- DatanodeDescriptor targets[]) throws QuotaExceededException,
- SafeModeException {
+ BlockInfo saveAllocatedBlock(String src, INode[] inodes,
+ Block newBlock, DatanodeDescriptor targets[]) throws IOException {
+ assert hasWriteLock();
+ BlockInfo b = dir.addBlock(src, inodes, newBlock, targets);
+ NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
+ + getBlockPoolId() + " " + b);
+ for (DatanodeDescriptor dn : targets) {
+ dn.incBlocksScheduled();
+ }
+ return b;
+ }
+
+ /**
+ * Create a new, empty block with a unique id and a fresh generation stamp.
+ */
+ Block createNewBlock() throws IOException {
assert hasWriteLock();
Block b = new Block(getFSImage().getUniqueBlockId(), 0, 0);
// Increment the generation stamp for every new block.
nextGenerationStamp();
b.setGenerationStamp(getGenerationStamp());
- b = dir.addBlock(src, inodes, b, targets);
- NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
- + blockPoolId + " " + b);
return b;
}
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java?rev=1441682&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java Sat Feb 2 02:09:09 2013
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+import java.lang.reflect.Field;
+import java.util.EnumSet;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.net.Node;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Simulates a client retry of FSNamesystem.getAdditionalBlock() that
+ * arrives while the original call is still inside chooseTarget().
+ */
+public class TestAddBlockRetry {
+ public static final Log LOG = LogFactory.getLog(TestAddBlockRetry.class);
+
+ private static final short REPLICATION = 3;
+
+ private Configuration conf;
+ private MiniDFSCluster cluster;
+
+ private int count = 0;
+ private LocatedBlock lb1;
+ private LocatedBlock lb2;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new Configuration();
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(REPLICATION)
+ .build();
+ cluster.waitActive();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Retry addBlock() while a previous addBlock() call is in chooseTarget().
+ * See HDFS-4452.
+ */
+ @Test
+ public void testRetryAddBlockWhileInChooseTarget() throws Exception {
+ final String src = "/testRetryAddBlockWhileInChooseTarget";
+
+ FSNamesystem ns = cluster.getNamesystem();
+ BlockManager spyBM = spy(ns.getBlockManager());
+ final NamenodeProtocols nn = cluster.getNameNodeRpc();
+
+ // substitute mocked BlockManager into FSNamesystem
+ Class<? extends FSNamesystem> nsClass = ns.getClass();
+ Field bmField = nsClass.getDeclaredField("blockManager");
+ bmField.setAccessible(true);
+ bmField.set(ns, spyBM);
+
+ doAnswer(new Answer<DatanodeDescriptor[]>() {
+ @Override
+ public DatanodeDescriptor[] answer(InvocationOnMock invocation)
+ throws Throwable {
+ LOG.info("chooseTarget for " + src);
+ DatanodeDescriptor[] ret =
+ (DatanodeDescriptor[]) invocation.callRealMethod();
+ count++;
+ if(count == 1) { // run second addBlock()
+ LOG.info("Starting second addBlock for " + src);
+ nn.addBlock(src, "clientName", null, null);
+ LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
+ assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
+ lb2 = lbs.get(0);
+ assertEquals("Wrong replication",
+ REPLICATION, lb2.getLocations().length);
+ }
+ return ret;
+ }
+ }).when(spyBM).chooseTarget(Mockito.anyString(), Mockito.anyInt(),
+ Mockito.<DatanodeDescriptor>any(), Mockito.<HashMap<Node, Node>>any(),
+ Mockito.anyLong());
+
+ // create file
+ nn.create(src, FsPermission.getFileDefault(),
+ "clientName",
+ new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
+ true, REPLICATION, 1024);
+
+ // start first addBlock()
+ LOG.info("Starting first addBlock for " + src);
+ nn.addBlock(src, "clientName", null, null);
+
+ // check locations
+ LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
+ assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
+ lb1 = lbs.get(0);
+ assertEquals("Wrong replication", REPLICATION, lb1.getLocations().length);
+ assertEquals("Blocks are not equal", lb1.getBlock(), lb2.getBlock());
+ }
+}
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
|