From: shv@apache.org
To: hdfs-commits@hadoop.apache.org
Reply-To: hdfs-dev@hadoop.apache.org
Subject: svn commit: r819232 - in /hadoop/hdfs/branches/HDFS-265: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/a...
Date: Sat, 26 Sep 2009 23:42:23 -0000
Message-Id: <20090926234224.29ABF238889C@eris.apache.org>
X-Mailer: svnmailer-1.0.8

Author: shv
Date: Sat Sep 26 23:42:22 2009
New Revision: 819232

URL: http://svn.apache.org/viewvc?rev=819232&view=rev
Log:
HDFS-570. Get last block length from a data-node when opening a file
being written to. Contributed by Tsz Wo (Nicholas), SZE.
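In effect, a reader can now open a file that a writer has flushed but not yet
closed and see the bytes of the last, still-under-construction block. As
orientation, a minimal sketch from the application's point of view (the path
is hypothetical and a running cluster with this patch is assumed; fs.open(..)
and in.available() are the stock FileSystem API):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadOpenFile {
      public static void main(String[] args) throws Exception {
        final FileSystem fs = FileSystem.get(new Configuration());
        // Another client has created, written and sync()'ed this file,
        // but has not closed it yet.
        final FSDataInputStream in = fs.open(new Path("/tmp/in-progress"));
        // With HDFS-570, available() includes the visible length of the
        // last block as reported by one of its data-nodes.
        System.out.println("readable bytes = " + in.available());
        in.close();
      }
    }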
Added:
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java   (with props)
Modified:
    hadoop/hdfs/branches/HDFS-265/CHANGES.txt
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=819232&r1=819231&r2=819232&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Sat Sep 26 23:42:22 2009
@@ -43,6 +43,9 @@
     HDFS-627. Support replica update in data-node.
     (Tsz Wo (Nicholas), SZE and Hairong Kuang via shv)
 
+    HDFS-570. Get last block length from a data-node when opening a file
+    being written to. (Tsz Wo (Nicholas), SZE via shv)
+
   IMPROVEMENTS
 
     HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=819232&r1=819231&r2=819232&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java Sat Sep 26 23:42:22 2009
@@ -1611,6 +1611,7 @@
     private BlockReader blockReader = null;
     private boolean verifyChecksum;
     private LocatedBlocks locatedBlocks = null;
+    private long lastBlockBeingWrittenLength = 0;
     private DatanodeInfo currentNode = null;
     private Block currentBlock = null;
     private long pos = 0;
@@ -1643,6 +1644,9 @@
      */
     synchronized void openInfo() throws IOException {
       LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("newInfo = " + newInfo);
+      }
       if (newInfo == null) {
         throw new IOException("Cannot open filename " + src);
       }
@@ -1657,11 +1661,39 @@
         }
       }
       this.locatedBlocks = newInfo;
+      this.lastBlockBeingWrittenLength =
+        locatedBlocks.isLastBlockComplete()? 0:
+          readBlockLength(locatedBlocks.getLastLocatedBlock());
       this.currentNode = null;
     }
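The client's notion of file length is now composed of two parts: what the
name-node reports for completed blocks, plus the probed length of a last
block still being written. A plain-Java model of this composition (field
names are stand-ins, not the actual DFSInputStream members):

    class FileLengthModel {
      long namenodeReportedLength; // length covering the completed blocks
      boolean lastBlockComplete;   // cf. LocatedBlocks.isLastBlockComplete()
      long lastBlockVisibleLength; // probed from a data-node when incomplete

      long fileLength() {
        // An incomplete last block is not counted by the name-node yet,
        // so the probed visible length is added on top.
        return namenodeReportedLength
            + (lastBlockComplete ? 0 : lastBlockVisibleLength);
      }
    }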
+
+    /** Read the block length from one of the datanodes. */
+    private long readBlockLength(LocatedBlock locatedblock) throws IOException {
+      if (locatedblock == null || locatedblock.getLocations().length == 0) {
+        return 0;
+      }
+      for(DatanodeInfo datanode : locatedblock.getLocations()) {
+        try {
+          final ClientDatanodeProtocol cdp = createClientDatanodeProtocolProxy(
+              datanode, conf);
+          final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
+          if (n >= 0) {
+            return n;
+          }
+        }
+        catch(IOException ioe) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Failed to getReplicaVisibleLength from datanode "
+                + datanode + " for block " + locatedblock.getBlock(), ioe);
+          }
+        }
+      }
+      throw new IOException("Cannot obtain block length for " + locatedblock);
+    }
 
     public synchronized long getFileLength() {
-      return (locatedBlocks == null) ? 0 : locatedBlocks.getFileLength();
+      return locatedBlocks == null? 0:
+          locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
     }
 
     /**
@@ -1697,17 +1729,36 @@
     private synchronized LocatedBlock getBlockAt(long offset,
         boolean updatePosition) throws IOException {
       assert (locatedBlocks != null) : "locatedBlocks is null";
-      // search cached blocks first
-      int targetBlockIdx = locatedBlocks.findBlock(offset);
-      if (targetBlockIdx < 0) { // block is not cached
-        targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
-        // fetch more blocks
-        LocatedBlocks newBlocks;
-        newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
-        assert (newBlocks != null) : "Could not find target position " + offset;
-        locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+
+      final LocatedBlock blk;
+
+      //check offset
+      if (offset < 0 || offset >= getFileLength()) {
+        throw new IOException("offset < 0 || offset > getFileLength(), offset="
+            + offset
+            + ", updatePosition=" + updatePosition
+            + ", locatedBlocks=" + locatedBlocks);
+      }
+      else if (offset >= locatedBlocks.getFileLength()) {
+        // offset to the portion of the last block,
+        // which is not known to the name-node yet;
+        // getting the last block
+        blk = locatedBlocks.getLastLocatedBlock();
+      }
+      else {
+        // search cached blocks first
+        int targetBlockIdx = locatedBlocks.findBlock(offset);
+        if (targetBlockIdx < 0) { // block is not cached
+          targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+          // fetch more blocks
+          LocatedBlocks newBlocks;
+          newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
+          assert (newBlocks != null) : "Could not find target position " + offset;
+          locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+        }
+        blk = locatedBlocks.get(targetBlockIdx);
       }
-      LocatedBlock blk = locatedBlocks.get(targetBlockIdx);
+
       // update current position
       if (updatePosition) {
         this.pos = offset;
@@ -1744,6 +1795,27 @@
     private synchronized List<LocatedBlock> getBlockRange(long offset,
         long length) throws IOException {
+      final List<LocatedBlock> blocks;
+      if (locatedBlocks.isLastBlockComplete()) {
+        blocks = getFinalizedBlockRange(offset, length);
+      }
+      else {
+        if (length + offset > locatedBlocks.getFileLength()) {
+          length = locatedBlocks.getFileLength() - offset;
+        }
+        blocks = getFinalizedBlockRange(offset, length);
+        blocks.add(locatedBlocks.getLastLocatedBlock());
+      }
+      return blocks;
+    }
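readBlockLength(..) above follows a try-each-replica pattern: take the first
replica that answers with a non-negative length, and fail only if every
replica is unreachable. A self-contained model of the pattern (Probe is a
hypothetical stand-in for the ClientDatanodeProtocol proxy):

    import java.io.IOException;
    import java.util.List;

    class ReplicaProbeExample {
      interface Probe {
        long visibleLength() throws IOException;
      }

      static long firstVisibleLength(List<Probe> replicas) throws IOException {
        for (Probe p : replicas) {
          try {
            final long n = p.visibleLength();
            if (n >= 0) {
              return n; // first usable answer wins
            }
          } catch (IOException ioe) {
            // this replica is unreachable; try the next one
          }
        }
        throw new IOException("Cannot obtain block length from any replica");
      }
    }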
+
+    /**
+     * Get blocks in the specified range.
+     * Includes only the complete blocks.
+     * Fetch them from the namenode if not cached.
+     */
+    private synchronized List<LocatedBlock> getFinalizedBlockRange(
+        long offset, long length) throws IOException {
       assert (locatedBlocks != null) : "locatedBlocks is null";
       List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
       // search cached blocks first

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=819232&r1=819231&r2=819232&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Sat Sep 26 23:42:22 2009
@@ -29,9 +29,9 @@
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
 
   /**
-   * 4: never return null and always return a newly generated access token
+   * 5: add getReplicaVisibleLength(..)
    */
-  public static final long versionID = 4L;
+  public static final long versionID = 5L;
 
   /** Start generation-stamp recovery for specified block
    * @param block the specified block
@@ -45,4 +45,7 @@
   @Deprecated // not used anymore - should be removed
   LocatedBlock recoverBlock(Block block, boolean keepLength,
       DatanodeInfo[] targets) throws IOException;
+
+  /** Return the visible length of a replica. */
+  long getReplicaVisibleLength(Block b) throws IOException;
 }

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=819232&r1=819231&r2=819232&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Sat Sep 26 23:42:22 2009
@@ -44,11 +44,9 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 49: added two new methods to support pipeline recovery and append
-   *     updateBlockForPipeline(Block, String) and
-   *     updatePipeline(String, Block, Block, DatanodeID[])
+   * 50: change LocatedBlocks to include last block information.
    */
-  public static final long versionID = 49L;
+  public static final long versionID = 50L;
 
   ///////////////////////////////////////
   // File contents
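Both versionID bumps follow from Hadoop RPC's version handshake: a new method
on ClientDatanodeProtocol and the new LocatedBlocks wire fields are
incompatible changes, so older clients must be rejected rather than fed
mis-framed data. A simplified model of the check, not the actual
org.apache.hadoop.ipc implementation:

    import java.io.IOException;

    class VersionCheckExample {
      static void checkVersion(long clientVersion, long serverVersion)
          throws IOException {
        // RPC refuses mismatched protocol versions up front; bumping the
        // constant on every incompatible change is what makes this safe.
        if (clientVersion != serverVersion) {
          throw new IOException("Protocol version mismatch: client="
              + clientVersion + ", server=" + serverVersion);
        }
      }
    }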
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=819232&r1=819231&r2=819232&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Sat Sep 26 23:42:22 2009
@@ -145,4 +145,20 @@
       locs[i].readFields(in);
     }
   }
+
+  /** Read LocatedBlock from in. */
+  public static LocatedBlock read(DataInput in) throws IOException {
+    final LocatedBlock lb = new LocatedBlock();
+    lb.readFields(in);
+    return lb;
+  }
+
+  /** {@inheritDoc} */
+  public String toString() {
+    return getClass().getSimpleName() + "{" + b
+        + "; corrupt=" + corrupt
+        + "; offset=" + offset
+        + "; locs=" + java.util.Arrays.asList(locs)
+        + "}";
+  }
 }

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java?rev=819232&r1=819231&r2=819232&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java Sat Sep 26 23:42:22 2009
@@ -36,6 +36,8 @@
   private long fileLength;
   private List<LocatedBlock> blocks; // array of blocks with prioritized locations
   private boolean underConstruction;
+  private LocatedBlock lastLocatedBlock = null;
+  private boolean isLastBlockComplete = false;
 
   LocatedBlocks() {
     fileLength = 0;
@@ -43,11 +45,15 @@
     underConstruction = false;
   }
 
-  public LocatedBlocks(long flength, List<LocatedBlock> blks, boolean isUnderConstuction) {
-
+  /** public Constructor */
+  public LocatedBlocks(long flength, boolean isUnderConstuction,
+      List<LocatedBlock> blks,
+      LocatedBlock lastBlock, boolean isLastBlockCompleted) {
     fileLength = flength;
     blocks = blks;
     underConstruction = isUnderConstuction;
+    this.lastLocatedBlock = lastBlock;
+    this.isLastBlockComplete = isLastBlockCompleted;
   }
 
   /**
@@ -57,6 +63,16 @@
     return blocks;
   }
 
+  /** Get the last located block. */
+  public LocatedBlock getLastLocatedBlock() {
+    return lastLocatedBlock;
+  }
+
+  /** Is the last block completed? */
+  public boolean isLastBlockComplete() {
+    return isLastBlockComplete;
+  }
+
   /**
   * Get located block.
   */
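A client consuming the two new accessors usually needs only one decision:
whether a data-node probe is required before the file length can be trusted.
A sketch of that check (the method name is illustrative, not part of the
patch):

    import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

    class LastBlockCheckExample {
      /** Decide whether the last block's length must come from a data-node. */
      static boolean needsDatanodeProbe(LocatedBlocks lb) {
        // An empty file carries no last block, so there is nothing to probe.
        return !lb.isLastBlockComplete() && lb.getLastLocatedBlock() != null;
      }
    }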
@@ -161,6 +177,15 @@
   public void write(DataOutput out) throws IOException {
     out.writeLong(this.fileLength);
     out.writeBoolean(underConstruction);
+
+    //write the last located block
+    final boolean isNull = lastLocatedBlock == null;
+    out.writeBoolean(isNull);
+    if (!isNull) {
+      lastLocatedBlock.write(out);
+    }
+    out.writeBoolean(isLastBlockComplete);
+
     // write located blocks
     int nrBlocks = locatedBlockCount();
     out.writeInt(nrBlocks);
@@ -175,6 +200,14 @@
   public void readFields(DataInput in) throws IOException {
     this.fileLength = in.readLong();
     underConstruction = in.readBoolean();
+
+    //read the last located block
+    final boolean isNull = in.readBoolean();
+    if (!isNull) {
+      lastLocatedBlock = LocatedBlock.read(in);
+    }
+    isLastBlockComplete = in.readBoolean();
+
     // read located blocks
     int nrBlocks = in.readInt();
     this.blocks = new ArrayList<LocatedBlock>(nrBlocks);
@@ -184,4 +217,18 @@
       this.blocks.add(blk);
     }
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder(getClass().getSimpleName());
+    b.append("{")
+     .append("\n  fileLength=").append(fileLength)
+     .append("\n  underConstruction=").append(underConstruction)
+     .append("\n  blocks=").append(blocks)
+     .append("\n  lastLocatedBlock=").append(lastLocatedBlock)
+     .append("\n  isLastBlockComplete=").append(isLastBlockComplete)
+     .append("}");
+    return b.toString();
+  }
 }

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=819232&r1=819231&r2=819232&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Sep 26 23:42:22 2009
@@ -1725,4 +1725,19 @@
     LOG.info(who + " calls recoverBlock(block=" + block
         + ", targets=[" + msg + "])");
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getReplicaVisibleLength(final Block block) throws IOException {
+    final Replica replica = data.getReplica(block.getBlockId());
+    if (replica == null) {
+      throw new ReplicaNotFoundException(block);
+    }
+    if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+      throw new IOException(
+          "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+          + block + ", replica=" + replica);
+    }
+    return replica.getVisibleLength();
+  }
 }
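The new lastLocatedBlock field is serialized behind an is-null flag so that
readFields(..) knows whether a LocatedBlock follows on the wire; the write
and read sides must agree on field order. A self-contained model of the
pattern, with a String payload in place of a LocatedBlock:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    class NullableFieldExample {
      static void writeNullable(DataOutput out, String value) throws IOException {
        final boolean isNull = value == null;
        out.writeBoolean(isNull);   // flag first, then the value if present
        if (!isNull) {
          out.writeUTF(value);
        }
      }

      static String readNullable(DataInput in) throws IOException {
        // Must mirror writeNullable(..) exactly: flag, then optional value.
        return in.readBoolean() ? null : in.readUTF();
      }
    }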
replicas=").append(replicas) + .append("\n primaryNodeIndex=").append(primaryNodeIndex) + .append("\n lastRecoveryTime=").append(lastRecoveryTime) + .append("}"); + return b.toString(); + } } Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=819232&r1=819231&r2=819232&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original) +++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Sat Sep 26 23:42:22 2009 @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.EnumSet; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -42,7 +41,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas; import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator; -import org.apache.hadoop.security.AccessTokenHandler; /** * Keeps information related to the blocks stored in the Hadoop cluster. @@ -346,43 +344,12 @@ } if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file - return null; + return Collections.emptyList(); long endOff = offset + length; List results = new ArrayList(blocks.length); do { - // get block locations - int numNodes = blocksMap.numNodes(blocks[curBlk]); - int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas(); - int numCorruptReplicas = corruptReplicas - .numCorruptReplicas(blocks[curBlk]); - if (numCorruptNodes != numCorruptReplicas) { - FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for " - + blocks[curBlk] + "blockMap has " + numCorruptNodes - + " but corrupt replicas map has " + numCorruptReplicas); - } - boolean blockCorrupt = (numCorruptNodes == numNodes); - int numMachineSet = blockCorrupt ? 
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=819232&r1=819231&r2=819232&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Sat Sep 26 23:42:22 2009
@@ -22,7 +22,6 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -42,7 +41,6 @@
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
-import org.apache.hadoop.security.AccessTokenHandler;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
@@ -346,43 +344,12 @@
     }
 
     if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
-      return null;
+      return Collections.<LocatedBlock>emptyList();
 
     long endOff = offset + length;
     List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
     do {
-      // get block locations
-      int numNodes = blocksMap.numNodes(blocks[curBlk]);
-      int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
-      int numCorruptReplicas = corruptReplicas
-          .numCorruptReplicas(blocks[curBlk]);
-      if (numCorruptNodes != numCorruptReplicas) {
-        FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for "
-            + blocks[curBlk] + "blockMap has " + numCorruptNodes
-            + " but corrupt replicas map has " + numCorruptReplicas);
-      }
-      boolean blockCorrupt = (numCorruptNodes == numNodes);
-      int numMachineSet = blockCorrupt ? numNodes :
-          (numNodes - numCorruptNodes);
-      DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
-      if (numMachineSet > 0) {
-        numNodes = 0;
-        for (Iterator<DatanodeDescriptor> it =
-            blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
-          DatanodeDescriptor dn = it.next();
-          boolean replicaCorrupt =
-              corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
-          if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
-            machineSet[numNodes++] = dn;
-        }
-      }
-      LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
-          blockCorrupt);
-      if (namesystem.isAccessTokenEnabled) {
-        b.setAccessToken(namesystem.accessTokenHandler.generateToken(b.getBlock()
-            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.READ)));
-      }
-      results.add(b);
+      results.add(getBlockLocation(blocks[curBlk], curPos));
       curPos += blocks[curBlk].getNumBytes();
       curBlk++;
     } while (curPos < endOff
@@ -391,6 +358,35 @@
     return results;
   }
 
+  /** @return a LocatedBlock for the given block */
+  LocatedBlock getBlockLocation(final Block blk, final long pos
+      ) throws IOException {
+    // get block locations
+    final int numCorruptNodes = countNodes(blk).corruptReplicas();
+    final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
+    if (numCorruptNodes != numCorruptReplicas) {
+      FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for "
+          + blk + "blockMap has " + numCorruptNodes
+          + " but corrupt replicas map has " + numCorruptReplicas);
+    }
+
+    final int numNodes = blocksMap.numNodes(blk);
+    final boolean isCorrupt = numCorruptNodes == numNodes;
+    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
+    final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
+    if (numMachines > 0) {
+      int j = 0;
+      for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blk);
+          it.hasNext();) {
+        final DatanodeDescriptor d = it.next();
+        final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
+        if (isCorrupt || (!isCorrupt && !replicaCorrupt))
+          machines[j++] = d;
+      }
+    }
+    return namesystem.createLocatedBlock(blk, machines, pos, isCorrupt);
+  }
+
   /**
   * Check whether the replication parameter is within the range
   * determined by system configuration.
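The replica-selection rule factored into getBlockLocation(..) is worth
stating on its own: if every replica is corrupt, the block is returned with
all of its locations (and marked corrupt) so a read can still be attempted;
otherwise only the healthy replicas are returned. A plain-Java model, with
booleans standing in for DatanodeDescriptors:

    class ReplicaSelectionExample {
      /** How many locations getBlockLocation(..) returns for a block. */
      static int countReturnedLocations(boolean[] replicaCorrupt) {
        int corrupt = 0;
        for (boolean c : replicaCorrupt) {
          if (c) {
            corrupt++;
          }
        }
        // All locations if the whole block is corrupt,
        // only the healthy ones otherwise.
        final boolean blockCorrupt = corrupt == replicaCorrupt.length;
        return blockCorrupt ? replicaCorrupt.length
                            : replicaCorrupt.length - corrupt;
      }
    }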
-        new ArrayList<LocatedBlock>(0):
-        blockManager.getBlockLocations(blocks, offset, length, Integer.MAX_VALUE);
-    return inode.createLocatedBlocks(results);
+
+    if (blocks.length == 0) {
+      return new LocatedBlocks(0, inode.isUnderConstruction(),
+          Collections.<LocatedBlock>emptyList(), null, false);
+    } else {
+      final long n = inode.computeFileSize(false);
+      final List<LocatedBlock> locatedblocks = blockManager.getBlockLocations(
+          blocks, offset, length, Integer.MAX_VALUE);
+      final BlockInfo last = inode.getLastBlock();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("last = " + last);
+      }
+
+      if (!last.isComplete()) {
+        final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)last;
+        final DatanodeDescriptor[] locations = uc.getExpectedLocations();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("locations = " + java.util.Arrays.asList(locations));
+        }
+        return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+            createLocatedBlock(uc, locations, n, false), false);
+      }
+      else {
+        return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+            blockManager.getBlockLocation(last, n-last.getNumBytes()), true);
+      }
+    }
+  }
+
+  /** Create a LocatedBlock. */
+  LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
+      final long offset, final boolean corrupt) throws IOException {
+    final LocatedBlock lb = new LocatedBlock(b, locations, offset, corrupt);
+    if (isAccessTokenEnabled) {
+      lb.setAccessToken(accessTokenHandler.generateToken(b.getBlockId(),
+          EnumSet.of(AccessTokenHandler.AccessMode.READ)));
+    }
+    return lb;
   }
 
   /**

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=819232&r1=819231&r2=819232&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Sat Sep 26 23:42:22 2009
@@ -25,8 +25,6 @@
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 
 /**
  * We keep an in-memory representation of the file/block hierarchy.
@@ -423,10 +421,4 @@
     }
     return null;
   }
-
-  LocatedBlocks createLocatedBlocks(List<LocatedBlock> blocks) {
-    return new LocatedBlocks(computeContentSummary().getLength(), blocks,
-        isUnderConstruction());
-  }
 }
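With createLocatedBlocks(..) removed from INode, the READ access token is now
attached in exactly one place, FSNamesystem.createLocatedBlock(..). A sketch
of the design choice (TokenSource is a hypothetical stand-in for the
AccessTokenHandler):

    class ReadTokenExample {
      interface TokenSource {
        String readTokenFor(long blockId);
      }

      /** Mirror of the decision in FSNamesystem.createLocatedBlock(..). */
      static String maybeReadToken(TokenSource tokens, long blockId,
          boolean accessTokensEnabled) {
        // A token is attached only when the cluster has access tokens on;
        // centralizing this keeps BlockManager's location loop token-free.
        return accessTokensEnabled ? tokens.readTokenFor(blockId) : null;
      }
    }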
+   * May or may not include BlockInfoUnderConstruction.
+   */
+  long computeFileSize(boolean includesBlockInfoUnderConstruction) {
+    if (blocks == null || blocks.length == 0) {
+      return 0;
+    }
+    final int last = blocks.length - 1;
+    //check if the last block is BlockInfoUnderConstruction
+    long bytes = blocks[last] instanceof BlockInfoUnderConstruction
+        && !includesBlockInfoUnderConstruction?
+            0: blocks[last].getNumBytes();
+    for(int i = 0; i < last; i++) {
+      bytes += blocks[i].getNumBytes();
+    }
+    return bytes;
+  }
 
   @Override
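computeFileSize(false) is what lets the name-node exclude the
not-yet-trustworthy last block from the length it reports. A self-contained
model of the same arithmetic, with plain longs in place of BlockInfo objects:

    class FileSizeExample {
      /** Sum block sizes, optionally skipping an under-construction tail. */
      static long computeFileSize(long[] blockSizes,
          boolean lastUnderConstruction, boolean includeUnderConstruction) {
        if (blockSizes.length == 0) {
          return 0;
        }
        final int last = blockSizes.length - 1;
        // Count the last block only if it is finalized or the caller
        // explicitly asked to include an under-construction block.
        long bytes = (lastUnderConstruction && !includeUnderConstruction)
            ? 0 : blockSizes[last];
        for (int i = 0; i < last; i++) {
          bytes += blockSizes[i];
        }
        return bytes;
      }
    }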
Added: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java?rev=819232&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java Sat Sep 26 23:42:22 2009
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test reading from hdfs while a file is being written. */
+public class TestReadWhileWriting {
+  {
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  private static final String DIR = "/"
+      + TestReadWhileWriting.class.getSimpleName() + "/";
+  private static final int BLOCK_SIZE = 8192;
+
+  /** Test reading while writing. */
+  @Test
+  public void testReadWhileWriting() throws Exception {
+    final Configuration conf = new Configuration();
+    // create cluster
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+    try {
+      cluster.waitActive();
+      final FileSystem fs = cluster.getFileSystem();
+
+      // write to a file but do not close it
+      final Path p = new Path(DIR, "file1");
+      final FSDataOutputStream out = fs.create(p, true,
+          fs.getConf().getInt("io.file.buffer.size", 4096),
+          (short)3, BLOCK_SIZE);
+      final int size = BLOCK_SIZE/3;
+      final byte[] buffer = AppendTestUtil.randomBytes(0, size);
+      out.write(buffer, 0, size);
+      out.flush();
+      out.sync();
+
+      // able to read?
+      Assert.assertTrue(read(fs, p, size));
+
+      out.close();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /** able to read? */
+  private static boolean read(FileSystem fs, Path p, int expectedsize
+      ) throws Exception {
+    // try at most 3 minutes (360 attempts, 500ms apart)
+    for(int i = 0; i < 360; i++) {
+      final FSDataInputStream in = fs.open(p);
+      try {
+        final int available = in.available();
+        System.out.println(i + ") in.available()=" + available);
+        Assert.assertTrue(available >= 0);
+        Assert.assertTrue(available <= expectedsize);
+        if (available == expectedsize) {
+          return true;
+        }
+      } finally {
+        in.close();
+      }
+      Thread.sleep(500);
+    }
+    return false;
+  }
+}

Propchange: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain