Subject: svn commit: r951306 - in /hadoop/common/branches/branch-0.20-append: ./ src/core/org/apache/hadoop/io/ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apache/hadoo...
Date: Fri, 04 Jun 2010 07:20:11 -0000
To: common-commits@hadoop.apache.org
From: dhruba@apache.org

Author: dhruba
Date: Fri Jun 4 07:20:10 2010
New Revision: 951306

URL: http://svn.apache.org/viewvc?rev=951306&view=rev
Log:
HDFS-200. Support append and sync for hadoop 0.20 branch.

Modified:
    hadoop/common/branches/branch-0.20-append/CHANGES.txt
    hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/SequenceFile.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/CHANGES.txt?rev=951306&r1=951305&r2=951306&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-append/CHANGES.txt Fri Jun 4 07:20:10 2010
@@ -4,6 +4,8 @@
 Release 0.20-append - Unreleased

   NEW FEATURES

+    HDFS-200. Support append and sync for hadoop 0.20 branch.
+
   IMPROVEMENTS

   BUG FIXES
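For context, a minimal client-side sketch of the end-to-end behavior this commit enables: reopening an existing file for append and making the new bytes visible to readers with sync(). The path and payload are illustrative, and dfs.support.append is assumed to be enabled on the cluster.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AppendSyncExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path p = new Path("/tmp/append-demo.log");

    // Reopen the file for append; requires dfs.support.append=true.
    FSDataOutputStream out = fs.append(p);
    out.write("one more record\n".getBytes("UTF-8"));
    out.sync();   // flush the new bytes through the datanode pipeline
    out.close();
  }
}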
Modified: hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/SequenceFile.java?rev=951306&r1=951305&r2=951306&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/SequenceFile.java Fri Jun 4 07:20:10 2010
@@ -938,6 +938,13 @@ public class SequenceFile {
       }
     }

+    /** flush all currently written data to the file system */
+    public void syncFs() throws IOException {
+      if (out != null) {
+        out.sync(); // flush contents to file system
+      }
+    }
+
     /** Returns the configuration of this file. */
     Configuration getConf() { return conf; }
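The new syncFs() is the piece SequenceFile users call. A hypothetical writer loop showing where it fits (the path, key/value types, and sync interval are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SyncFsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, new Path("/tmp/events.seq"), Text.class, IntWritable.class);
    for (int i = 0; i < 100; i++) {
      writer.append(new Text("event-" + i), new IntWritable(i));
      if (i % 10 == 9) {
        writer.syncFs();  // the method added above: push buffered data to HDFS
      }
    }
    writer.close();
  }
}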
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=951306&r1=951305&r2=951306&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Jun 4 07:20:10 2010
@@ -1500,7 +1500,10 @@ public class DFSClient implements FSCons
         throw new IOException("Cannot open filename " + src);
       }

-      if (locatedBlocks != null) {
+      // I think this check is not correct. A file could have been appended to
+      // between two calls to openInfo().
+      if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() &&
+          !newInfo.isUnderConstruction()) {
        Iterator oldIter = locatedBlocks.getLocatedBlocks().iterator();
        Iterator newIter = newInfo.getLocatedBlocks().iterator();
        while (oldIter.hasNext() && newIter.hasNext()) {
@@ -1509,6 +1512,38 @@ public class DFSClient implements FSCons
          }
        }
      }
+
+      // if the file is under construction, then fetch size of last block
+      // from datanode.
+      if (newInfo.isUnderConstruction() && newInfo.locatedBlockCount() > 0) {
+        LocatedBlock last = newInfo.get(newInfo.locatedBlockCount()-1);
+        boolean lastBlockInFile = (last.getStartOffset() +
+                                   last.getBlockSize() ==
+                                   newInfo.getFileLength());
+        if (lastBlockInFile && last.getLocations().length > 0) {
+          ClientDatanodeProtocol primary = null;
+          DatanodeInfo primaryNode = last.getLocations()[0];
+          try {
+            primary = createClientDatanodeProtocolProxy(primaryNode, conf);
+            Block newBlock = primary.getBlockInfo(last.getBlock());
+            long newBlockSize = newBlock.getNumBytes();
+            long delta = newBlockSize - last.getBlockSize();
+            // if the size of the block on the datanode is different
+            // from what the NN knows about, the datanode wins!
+            last.getBlock().setNumBytes(newBlockSize);
+            long newlength = newInfo.getFileLength() + delta;
+            newInfo.setFileLength(newlength);
+            LOG.debug("DFSClient setting last block " + last +
+                      " to length " + newBlockSize +
+                      " filesize is now " + newInfo.getFileLength());
+          } catch (IOException e) {
+            LOG.debug("DFSClient file " + src +
+                      " is being concurrently appended to" +
+                      " but datanode " + primaryNode.getHostName() +
+                      " probably does not have block " + last.getBlock());
+          }
+        }
+      }
      this.locatedBlocks = newInfo;
      this.currentNode = null;
    }

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=951306&r1=951305&r2=951306&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Fri Jun 4 07:20:10 2010
@@ -29,7 +29,7 @@ public interface ClientDatanodeProtocol
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);

   /**
-   * 3: add keepLength parameter.
+   * 4: added getBlockInfo
    */
   public static final long versionID = 3L;

@@ -44,4 +44,12 @@ public interface ClientDatanodeProtocol
    */
   LocatedBlock recoverBlock(Block block, boolean keepLength,
                             DatanodeInfo[] targets) throws IOException;
+
+  /** Returns a block object that contains the specified block object
+   * from the specified Datanode.
+   * @param block the specified block
+   * @return the Block object from the specified Datanode
+   * @throws IOException if the block does not exist
+   */
+  Block getBlockInfo(Block block) throws IOException;
 }

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java?rev=951306&r1=951305&r2=951306&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java Fri Jun 4 07:20:10 2010
@@ -85,6 +85,13 @@ public class LocatedBlocks implements Wr
   public boolean isUnderConstruction() {
     return underConstruction;
   }
+
+  /**
+   * Sets the file length of the file.
+   */
+  public void setFileLength(long length) {
+    this.fileLength = length;
+  }

   /**
    * Find block containing specified offset.
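The length reconciliation in openInfo() reduces to simple arithmetic: the datanode's view of the last block's size overrides the namenode's, and the file length moves by the same delta. A standalone sketch (class and method names are hypothetical):

final class LastBlockReconciler {
  /** Returns the corrected file length after trusting the datanode. */
  static long reconcile(long nnFileLength, long nnLastBlockSize,
                        long dnLastBlockSize) {
    long delta = dnLastBlockSize - nnLastBlockSize;
    return nnFileLength + delta;
  }

  public static void main(String[] args) {
    // The NN reports a 1024-byte file whose last block is 512 bytes,
    // but the datanode has already received 700 bytes of that block.
    System.out.println(reconcile(1024L, 512L, 700L)); // prints 1212
  }
}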
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=951306&r1=951305&r2=951306&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Jun 4 07:20:10 2010
@@ -96,7 +96,7 @@ class BlockReceiver implements java.io.C
       // Open local disk out
       //
       streams = datanode.data.writeToBlock(block, isRecovery);
-      this.finalized = datanode.data.isValidBlock(block);
+      this.finalized = false;
       if (streams != null) {
         this.out = streams.dataOut;
         this.checksumOut = new DataOutputStream(new BufferedOutputStream(

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=951306&r1=951305&r2=951306&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Jun 4 07:20:10 2010
@@ -1590,6 +1590,12 @@ public class DataNode extends Configured
     return recoverBlock(block, keepLength, targets, false);
   }

+  /** {@inheritDoc} */
+  public Block getBlockInfo(Block block) throws IOException {
+    Block stored = data.getStoredBlock(block.getBlockId());
+    return stored;
+  }
+
   private static void logRecoverBlock(String who,
       Block block, DatanodeID[] targets) {
     StringBuilder msg = new StringBuilder(targets[0].getName());
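A hedged sketch of how a caller might use the new getBlockInfo RPC once it holds a datanode proxy; constructing the proxy (for example via createClientDatanodeProtocolProxy, as in the DFSClient change above) is elided, and the helper name is hypothetical:

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

final class BlockLengthProbe {
  /** Asks the datanode how many bytes of the block it really has. */
  static long onDiskLength(ClientDatanodeProtocol datanode, LocatedBlock last)
      throws IOException {
    // getBlockInfo is the RPC added by this commit; it throws IOException
    // if the datanode does not have the block.
    Block onDisk = datanode.getBlockInfo(last.getBlock());
    return onDisk.getNumBytes();
  }
}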
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=951306&r1=951305&r2=951306&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Fri Jun 4 07:20:10 2010
@@ -1020,12 +1020,12 @@ public class FSDataset implements FSCons
         v = volumes.getNextVolume(blockSize);
         // create temporary file to hold block in the designated volume
         f = createTmpFile(v, b);
-        volumeMap.put(b, new DatanodeBlockInfo(v));
+        volumeMap.put(b, new DatanodeBlockInfo(v, f));
       } else if (f != null) {
         DataNode.LOG.info("Reopen already-open Block for append " + b);
         // create or reuse temporary file to hold block in the designated volume
         v = volumeMap.get(b).getVolume();
-        volumeMap.put(b, new DatanodeBlockInfo(v));
+        volumeMap.put(b, new DatanodeBlockInfo(v, f));
       } else {
         // reopening block for appending to it.
         DataNode.LOG.info("Reopen Block for append " + b);
@@ -1056,7 +1056,7 @@ public class FSDataset implements FSCons
                                   " to tmp dir " + f);
           }
         }
-        volumeMap.put(b, new DatanodeBlockInfo(v));
+        volumeMap.put(b, new DatanodeBlockInfo(v, f));
       }
       if (f == null) {
         DataNode.LOG.warn("Block " + b + " reopen failed " +

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=951306&r1=951305&r2=951306&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Fri Jun 4 07:20:10 2010
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -377,11 +378,28 @@ public class DatanodeDescriptor extends
     // Note we are taking special precaution to limit tmp blocks allocated
     // as part of this block report - which is why the block list is stored as longs
     Block iblk = new Block(); // a fixed new'ed block to be reused with index i
+    Block oblk = new Block(); // for fixing genstamps
     for (int i = 0; i < newReport.getNumberOfBlocks(); ++i) {
       iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i),
                newReport.getBlockGenStamp(i));
       BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
       if(storedBlock == null) {
+        // if the block with a WILDCARD generation stamp matches
+        // then accept this block.
+        // This block has a different generation stamp on the datanode
+        // because of a lease-recovery-attempt.
+        oblk.set(newReport.getBlockId(i), newReport.getBlockLen(i),
+                 GenerationStamp.WILDCARD_STAMP);
+        storedBlock = blocksMap.getStoredBlock(oblk);
+        if (storedBlock != null && storedBlock.getINode() != null &&
+            (storedBlock.getGenerationStamp() <= iblk.getGenerationStamp() ||
+             storedBlock.getINode().isUnderConstruction())) {
+          // accept block. It will be cleaned up on cluster restart.
+        } else {
+          storedBlock = null;
+        }
+      }
+      if(storedBlock == null) {
         // If block is not in blocksMap it does not belong to any file
         toInvalidate.add(new Block(iblk));
         continue;
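The wildcard-stamp acceptance test above reads as a single predicate: a replica whose generation stamp differs from the namenode's is accepted only if a WILDCARD lookup still maps it to a file, and either the stored stamp is not newer than the reported one or the file is still under construction. A standalone model (all names hypothetical; the real code operates on BlocksMap.BlockInfo inside the namenode):

final class GenStampAcceptance {
  static boolean accept(boolean foundUnderWildcard, boolean belongsToFile,
                        long storedGenStamp, long reportedGenStamp,
                        boolean fileUnderConstruction) {
    return foundUnderWildcard && belongsToFile
        && (storedGenStamp <= reportedGenStamp || fileUnderConstruction);
  }

  public static void main(String[] args) {
    // Replica re-stamped by a lease-recovery attempt (stored 7 <= reported 8): accept.
    System.out.println(accept(true, true, 7L, 8L, false)); // true
    // Stale replica with a newer stored stamp on a closed file: reject.
    System.out.println(accept(true, true, 9L, 8L, false)); // false
  }
}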
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=951306&r1=951305&r2=951306&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jun 4 07:20:10 2010
@@ -1042,10 +1042,13 @@ public class FSNamesystem implements FSC
       // holder is trying to recreate this file. This should never occur.
       //
       if (lease != null) {
-        throw new AlreadyBeingCreatedException(
+        Lease leaseFile = leaseManager.getLeaseByPath(src);
+        if (leaseFile != null && leaseFile.equals(lease)) {
+          throw new AlreadyBeingCreatedException(
                  "failed to create file " + src + " for " + holder +
                  " on client " + clientMachine +
                  " because current leaseholder is trying to recreate file.");
+        }
       }
       //
       // Find the original holder.
@@ -1819,11 +1822,30 @@ public class FSNamesystem implements FSC
   }

   /**
+   * This is invoked when a lease expires. On lease expiry,
+   * all the files that were written from that dfsclient should be
+   * recovered.
+   */
+  void internalReleaseLease(Lease lease, String src) throws IOException {
+    if (lease.hasPath()) {
+      // make a copy of the paths because internalReleaseLeaseOne removes
+      // pathnames from the lease record.
+      String[] leasePaths = new String[lease.getPaths().size()];
+      lease.getPaths().toArray(leasePaths);
+      for (String p: leasePaths) {
+        internalReleaseLeaseOne(lease, p);
+      }
+    } else {
+      internalReleaseLeaseOne(lease, src);
+    }
+  }
+
+  /**
    * Move a file that is being written to be immutable.
    * @param src The filename
    * @param lease The lease for the client creating the file
    */
-  void internalReleaseLease(Lease lease, String src) throws IOException {
+  void internalReleaseLeaseOne(Lease lease, String src) throws IOException {
     LOG.info("Recovering lease=" + lease + ", src=" + src);

     INodeFile iFile = dir.getFileINode(src);
@@ -1931,20 +1953,11 @@ public class FSNamesystem implements FSC
       descriptors = new DatanodeDescriptor[newtargets.length];
       for(int i = 0; i < newtargets.length; i++) {
         descriptors[i] = getDatanode(newtargets[i]);
-      }
-    }
-    if (closeFile) {
-      // the file is getting closed. Insert block locations into blocksMap.
-      // Otherwise fsck will report these blocks as MISSING, especially if the
-      // blocksReceived from Datanodes take a long time to arrive.
-      for (int i = 0; i < descriptors.length; i++) {
         descriptors[i].addBlock(newblockinfo);
       }
-      pendingFile.setLastBlock(newblockinfo, null);
-    } else {
-      // add locations into the INodeUnderConstruction
-      pendingFile.setLastBlock(newblockinfo, descriptors);
     }
+    // add locations into the INodeUnderConstruction
+    pendingFile.setLastBlock(newblockinfo, descriptors);
   }

   // If this commit does not want to close the file, persist
@@ -2329,9 +2342,11 @@ public class FSNamesystem implements FSC
         LOG.warn("ReplicationMonitor thread received InterruptedException." + ie);
         break;
       } catch (IOException ie) {
-        LOG.warn("ReplicationMonitor thread received exception. " + ie);
+        LOG.warn("ReplicationMonitor thread received exception. " + ie + " " +
+                 StringUtils.stringifyException(ie));
       } catch (Throwable t) {
-        LOG.warn("ReplicationMonitor thread received Runtime exception. " + t);
+        LOG.warn("ReplicationMonitor thread received Runtime exception. " + t + " " +
+                 StringUtils.stringifyException(t));
         Runtime.getRuntime().exit(-1);
       }
     }
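The copy into leasePaths above is the usual snapshot-before-mutate pattern: internalReleaseLeaseOne() removes pathnames from the lease while the loop is still running, so iterating the live collection would throw ConcurrentModificationException or skip entries. A standalone illustration (names hypothetical):

import java.util.Collection;
import java.util.TreeSet;

final class SnapshotIteration {
  public static void main(String[] args) {
    Collection<String> paths = new TreeSet<String>();
    paths.add("/user/a/file1");
    paths.add("/user/a/file2");

    // Snapshot first, then mutate freely while iterating the copy.
    String[] snapshot = paths.toArray(new String[paths.size()]);
    for (String p : snapshot) {
      paths.remove(p); // stands in for internalReleaseLeaseOne(lease, p)
      System.out.println("recovered " + p);
    }
  }
}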
@@ -2933,6 +2948,24 @@ public class FSNamesystem implements FSC
                                    DatanodeDescriptor node,
                                    DatanodeDescriptor delNodeHint) {
     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+    if (storedBlock == null) {
+      // if the block with a WILDCARD generation stamp matches and the
+      // corresponding file is under construction, then accept this block.
+      // This block has a different generation stamp on the datanode
+      // because of a lease-recovery-attempt.
+      Block nblk = new Block(block.getBlockId());
+      storedBlock = blocksMap.getStoredBlock(nblk);
+      if (storedBlock != null && storedBlock.getINode() != null &&
+          (storedBlock.getGenerationStamp() <= block.getGenerationStamp() ||
+           storedBlock.getINode().isUnderConstruction())) {
+        NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: " +
+          "addStoredBlock request received for " + block + " on " +
+          node.getName() + " size " + block.getNumBytes() +
+          " and it belongs to a file under construction. ");
+      } else {
+        storedBlock = null;
+      }
+    }
     if(storedBlock == null || storedBlock.getINode() == null) {
       // If this block does not belong to any file, then we are done.
       NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
@@ -2953,6 +2986,8 @@ public class FSNamesystem implements FSC
     if (block != storedBlock) {
       if (block.getNumBytes() >= 0) {
         long cursize = storedBlock.getNumBytes();
+        INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
+        boolean underConstruction = (file == null ? false : file.isUnderConstruction());
         if (cursize == 0) {
           storedBlock.setNumBytes(block.getNumBytes());
         } else if (cursize != block.getNumBytes()) {
@@ -2964,9 +2999,11 @@ public class FSNamesystem implements FSC
           if (cursize > block.getNumBytes()) {
             // new replica is smaller in size than existing block.
             // Mark the new replica as corrupt.
-            LOG.warn("Mark new replica " + block + " from " + node.getName() +
-                "as corrupt because its length is shorter than existing ones");
-            markBlockAsCorrupt(block, node);
+            if (!underConstruction) {
+              LOG.warn("Mark new replica " + block + " from " + node.getName() +
+                  " as corrupt because its length is shorter than existing ones");
+              markBlockAsCorrupt(block, node);
+            }
           } else {
             // new replica is larger in size than existing block.
             // Mark pre-existing replicas as corrupt.
@@ -2980,7 +3017,7 @@ public class FSNamesystem implements FSC
                 nodes[count++] = dd;
               }
             }
-            for (int j = 0; j < count; j++) {
+            for (int j = 0; j < count && !underConstruction; j++) {
               LOG.warn("Mark existing replica " + block + " from " + node.getName() +
                   " as corrupt because its length is shorter than the new one");
               markBlockAsCorrupt(block, nodes[j]);
@@ -3003,7 +3040,6 @@ public class FSNamesystem implements FSC
     }

     //Updated space consumed if required.
-    INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
     long diff = (file == null) ? 0 :
                 (file.getPreferredBlockSize() - storedBlock.getNumBytes());
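The under-construction guard in addStoredBlock compresses to a small rule: a shorter replica is marked corrupt only when the file is no longer under construction, because an in-progress append legitimately leaves replicas at different lengths. A standalone model folding together the cursize and underConstruction checks (names hypothetical):

final class CorruptMarkingRule {
  /** True when a newly reported, shorter replica should be marked corrupt. */
  static boolean markNewReplicaCorrupt(long existingSize, long reportedSize,
                                       boolean underConstruction) {
    return reportedSize >= 0 && existingSize > reportedSize && !underConstruction;
  }

  public static void main(String[] args) {
    System.out.println(markNewReplicaCorrupt(512L, 256L, false)); // true
    System.out.println(markNewReplicaCorrupt(512L, 256L, true));  // false
  }
}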