From: szetszwo@apache.org
To: common-commits@hadoop.apache.org
Date: Mon, 08 Sep 2014 03:06:24 -0000
Subject: [14/19] git commit: HDFS-6898. DN must reserve space for a full block
 when an RBW block is created. (Contributed by Arpit Agarwal)

HDFS-6898. DN must reserve space for a full block when an RBW block is
created. (Contributed by Arpit Agarwal)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d1fa5829
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d1fa5829
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d1fa5829

Branch: refs/heads/HDFS-6584
Commit: d1fa58292e87bc29b4ef1278368c2be938a0afc4
Parents: cbea1b1
Author: arp
Authored: Sat Sep 6 20:02:40 2014 -0700
Committer: arp
Committed: Sat Sep 6 21:04:29 2014 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hadoop/hdfs/protocol/HdfsConstants.java     |   2 +-
 .../server/datanode/ReplicaBeingWritten.java    |  12 +-
 .../hdfs/server/datanode/ReplicaInPipeline.java |  33 ++-
 .../hdfs/server/datanode/ReplicaInfo.java       |   7 +
 .../server/datanode/fsdataset/FsVolumeSpi.java  |  11 +
 .../datanode/fsdataset/impl/BlockPoolSlice.java |   6 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  15 +-
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |  58 +++-
 .../server/datanode/TestDirectoryScanner.java   |   8 +
 .../fsdataset/impl/TestRbwSpaceReservation.java | 288 +++++++++++++++++++
 .../fsdataset/impl/TestWriteToReplica.java      |   2 +-
 12 files changed, 423 insertions(+), 22 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1fa5829/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 4412b30..3d43171 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -612,6 +612,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6862. Add missing timeout annotations to tests. (Xiaoyu Yao via
     Arpit Agarwal)
 
+    HDFS-6898. DN must reserve space for a full block when an RBW block is
+    created. (Arpit Agarwal)
+
   BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
 
     HDFS-6387. HDFS CLI admin tool for creating & deleting an
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1fa5829/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index 77fe543..240dcd0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -48,7 +48,7 @@ public class HdfsConstants {
       "org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol";
 
-  public static final int MIN_BLOCKS_FOR_WRITE = 5;
+  public static final int MIN_BLOCKS_FOR_WRITE = 1;
 
   // Long that indicates "leave current quota unchanged"
   public static final long QUOTA_DONT_SET = Long.MAX_VALUE;
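MIN_BLOCKS_FOR_WRITE can drop from 5 to 1 here because the rest of this patch
reserves space for every open replica explicitly; the larger value previously
padded the free-space estimate to cover in-flight writes. A minimal sketch of
the kind of free-space check this constant feeds into (remainingBytes and
blockSize are illustrative placeholders, not actual NameNode code):

    // Sketch only: with per-RBW reservation in place, a write target needs
    // visible room for just one full block, because the volume's
    // getAvailable() already subtracts reservedForRbw for open replicas.
    long requiredBytes = blockSize * HdfsConstants.MIN_BLOCKS_FOR_WRITE; // now 1 block
    boolean isGoodTarget = remainingBytes >= requiredBytes;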
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1fa5829/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
index 728dd38..4a89493 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
@@ -34,10 +34,12 @@ public class ReplicaBeingWritten extends ReplicaInPipeline {
    * @param genStamp replica generation stamp
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
+   * @param bytesToReserve disk space to reserve for this replica, based on
+   *                       the estimated maximum block length.
    */
   public ReplicaBeingWritten(long blockId, long genStamp,
-        FsVolumeSpi vol, File dir) {
-    super( blockId, genStamp, vol, dir);
+        FsVolumeSpi vol, File dir, long bytesToReserve) {
+    super(blockId, genStamp, vol, dir, bytesToReserve);
   }
 
   /**
@@ -60,10 +62,12 @@ public class ReplicaBeingWritten extends ReplicaInPipeline {
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
    * @param writer a thread that is writing to this replica
+   * @param bytesToReserve disk space to reserve for this replica, based on
+   *                       the estimated maximum block length.
    */
   public ReplicaBeingWritten(long blockId, long len, long genStamp,
-      FsVolumeSpi vol, File dir, Thread writer ) {
-    super( blockId, len, genStamp, vol, dir, writer);
+      FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) {
+    super(blockId, len, genStamp, vol, dir, writer, bytesToReserve);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1fa5829/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index f808e01..08395aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -44,6 +44,13 @@ public class ReplicaInPipeline extends ReplicaInfo
   private long bytesOnDisk;
   private byte[] lastChecksum;
   private Thread writer;
+
+  /**
+   * Bytes reserved for this replica on the containing volume.
+   * Based on the difference between the estimated maximum block length
+   * and the bytes already written to this block.
+   */
+  private long bytesReserved;
 
   /**
    * Constructor for a zero length replica
@@ -51,10 +58,12 @@ public class ReplicaInPipeline extends ReplicaInfo
    * @param genStamp replica generation stamp
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
+   * @param bytesToReserve disk space to reserve for this replica, based on
+   *                       the estimated maximum block length.
    */
   public ReplicaInPipeline(long blockId, long genStamp,
-        FsVolumeSpi vol, File dir) {
-    this( blockId, 0L, genStamp, vol, dir, Thread.currentThread());
+        FsVolumeSpi vol, File dir, long bytesToReserve) {
+    this(blockId, 0L, genStamp, vol, dir, Thread.currentThread(), bytesToReserve);
   }
 
   /**
@@ -67,7 +76,7 @@ public class ReplicaInPipeline extends ReplicaInfo
   ReplicaInPipeline(Block block,
       FsVolumeSpi vol, File dir, Thread writer) {
     this( block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
-        vol, dir, writer);
+        vol, dir, writer, 0L);
   }
 
   /**
@@ -78,13 +87,16 @@ public class ReplicaInPipeline extends ReplicaInfo
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
    * @param writer a thread that is writing to this replica
+   * @param bytesToReserve disk space to reserve for this replica, based on
+   *                       the estimated maximum block length.
   */
   ReplicaInPipeline(long blockId, long len, long genStamp,
-      FsVolumeSpi vol, File dir, Thread writer ) {
+      FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) {
     super( blockId, len, genStamp, vol, dir);
     this.bytesAcked = len;
     this.bytesOnDisk = len;
     this.writer = writer;
+    this.bytesReserved = bytesToReserve;
   }
 
   /**
@@ -96,6 +108,7 @@ public class ReplicaInPipeline extends ReplicaInfo
     this.bytesAcked = from.getBytesAcked();
     this.bytesOnDisk = from.getBytesOnDisk();
     this.writer = from.writer;
+    this.bytesReserved = from.bytesReserved;
   }
 
   @Override
@@ -115,13 +128,25 @@ public class ReplicaInPipeline extends ReplicaInfo
   @Override // ReplicaInPipelineInterface
   public void setBytesAcked(long bytesAcked) {
+    long newBytesAcked = bytesAcked - this.bytesAcked;
     this.bytesAcked = bytesAcked;
+
+    // Once bytes are ACK'ed we can release equivalent space from the
+    // volume's reservedForRbw count. We could have released it as soon
+    // as the write-to-disk completed but that would be inefficient.
+    getVolume().releaseReservedSpace(newBytesAcked);
+    bytesReserved -= newBytesAcked;
   }
 
   @Override // ReplicaInPipelineInterface
   public long getBytesOnDisk() {
     return bytesOnDisk;
   }
+
+  @Override
+  public long getBytesReserved() {
+    return bytesReserved;
+  }
 
   @Override // ReplicaInPipelineInterface
   public synchronized void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
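To make the new setBytesAcked() accounting concrete, here is a worked example
with assumed numbers (a 128 MB maximum block length and one 64 KB packet
ACK'ed); it restates the arithmetic in the code above rather than adding any
new patch logic:

    // bytesReserved starts at the estimated maximum block length.
    long bytesReserved = 128L * 1024 * 1024;   // 134217728
    long bytesAcked = 0;
    long ack = 64L * 1024;                     // bytes ACK'ed so far
    long newBytesAcked = ack - bytesAcked;     // delta since the previous ACK
    bytesAcked = ack;
    bytesReserved -= newBytesAcked;            // volume releases the same amount
    // bytesReserved is now 128 MB - 64 KB; it reaches 0 as the block fills.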
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1fa5829/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index 0dcdf05..49ac605 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -222,6 +222,13 @@ abstract public class ReplicaInfo extends Block implements Replica {
   public void setUnlinked() {
     // no need to be unlinked
   }
+
+  /**
+   * Number of bytes reserved for this replica on disk.
+   */
+  public long getBytesReserved() {
+    return 0;
+  }
 
   /**
    * Copy specified file into a temporary file. Then rename the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1fa5829/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index b14ef56..cba23c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -45,4 +45,15 @@ public interface FsVolumeSpi {
   public File getFinalizedDir(String bpid) throws IOException;
 
   public StorageType getStorageType();
+
+  /**
+   * Reserve disk space for an RBW block so a writer does not run out of
+   * space before the block is full.
+   */
+  public void reserveSpaceForRbw(long bytesToReserve);
+
+  /**
+   * Release disk space previously reserved for an RBW block.
+   */
+  public void releaseReservedSpace(long bytesToRelease);
 }
\ No newline at end of file
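The contract these two new FsVolumeSpi methods are meant to satisfy, sketched
against a hypothetical volume instance (the variable names are assumptions,
not code from this patch):

    // Reserve the full estimated block length when the RBW replica is
    // created, release incrementally as bytes are ACK'ed, and release
    // whatever remains when the replica is finalized.
    volume.reserveSpaceForRbw(estimatedBlockLength);          // at RBW creation
    volume.releaseReservedSpace(newBytesAcked);               // per ACK
    volume.releaseReservedSpace(replica.getBytesReserved());  // at finalization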
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1fa5829/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 5774407..96e4650 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -240,7 +240,7 @@ class BlockPoolSlice {
     return DatanodeUtil.createTmpFile(b, f);
   }
 
-  File addBlock(Block b, File f) throws IOException {
+  File addFinalizedBlock(Block b, File f) throws IOException {
     File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
     if (!blockDir.exists()) {
       if (!blockDir.mkdirs()) {
@@ -334,9 +334,11 @@ class BlockPoolSlice {
       // The restart meta file exists
       if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
         // It didn't expire. Load the replica as a RBW.
+        // We don't know the expected block length, so just use 0
+        // and don't reserve any more space for writes.
         newReplica = new ReplicaBeingWritten(blockId,
             validateIntegrityAndSetLength(file, genStamp),
-            genStamp, volume, file.getParentFile(), null);
+            genStamp, volume, file.getParentFile(), null, 0);
         loadRwr = false;
       }
       sc.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1fa5829/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 5306be7..4511f21 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -593,7 +593,7 @@ class FsDatasetImpl implements FsDatasetSpi {
           + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta
+      LOG.debug("addFinalizedBlock: Moved " + srcmeta + " to " + dstmeta
           + " and " + srcfile + " to " + dstfile);
     }
     return dstfile;
@@ -712,7 +712,7 @@ class FsDatasetImpl implements FsDatasetSpi {
     File oldmeta = replicaInfo.getMetaFile();
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
         replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
-        v, newBlkFile.getParentFile(), Thread.currentThread());
+        v, newBlkFile.getParentFile(), Thread.currentThread(), estimateBlockLen);
     File newmeta = newReplicaInfo.getMetaFile();
 
     // rename meta file to rbw directory
@@ -748,7 +748,7 @@ class FsDatasetImpl implements FsDatasetSpi {
 
     // Replace finalized replica by a RBW replica in replicas map
     volumeMap.add(bpid, newReplicaInfo);
-    
+    v.reserveSpaceForRbw(estimateBlockLen - replicaInfo.getNumBytes());
     return newReplicaInfo;
   }
 
@@ -876,7 +876,7 @@ class FsDatasetImpl implements FsDatasetSpi {
     // create a rbw file to hold block in the designated volume
     File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
-        b.getGenerationStamp(), v, f.getParentFile());
+        b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
     return newReplicaInfo;
   }
@@ -992,7 +992,7 @@ class FsDatasetImpl implements FsDatasetSpi {
     // create RBW
     final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
         blockId, numBytes, expectedGs,
-        v, dest.getParentFile(), Thread.currentThread());
+        v, dest.getParentFile(), Thread.currentThread(), 0);
     rbw.setBytesAcked(visible);
     // overwrite the RBW in the volume map
     volumeMap.add(b.getBlockPoolId(), rbw);
@@ -1013,7 +1013,7 @@ class FsDatasetImpl implements FsDatasetSpi {
     // create a temporary file to hold block in the designated volume
     File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
     ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
-        b.getGenerationStamp(), v, f.getParentFile());
+        b.getGenerationStamp(), v, f.getParentFile(), 0);
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
 
     return newReplicaInfo;
@@ -1079,7 +1079,8 @@ class FsDatasetImpl implements FsDatasetSpi {
           " for block " + replicaInfo);
     }
 
-    File dest = v.addBlock(bpid, replicaInfo, f);
+    File dest = v.addFinalizedBlock(
+        bpid, replicaInfo, f, replicaInfo.getBytesReserved());
     newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
   }
   volumeMap.add(bpid, newReplicaInfo);
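Note that the append path above reserves only the delta,
estimateBlockLen - replicaInfo.getNumBytes(), since bytes already on disk
need no reservation. A worked example with assumed sizes:

    long estimateBlockLen = 128L * 1024 * 1024;   // estimated maximum block length
    long numBytes = 1L * 1024 * 1024;             // bytes already on disk
    long toReserve = estimateBlockLen - numBytes; // 127 MB reserved for the append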
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1fa5829/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 0b9fda8..3952c39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -62,6 +63,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
   private final DF usage;
   private final long reserved;
 
+  // Disk space reserved for open blocks.
+  private AtomicLong reservedForRbw;
+
   // Capacity configured. This is useful when we want to
   // limit the visible capacity for tests. If negative, then we just
   // query from the filesystem.
@@ -82,6 +86,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
     this.reserved = conf.getLong(
         DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
         DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
+    this.reservedForRbw = new AtomicLong(0L);
     this.currentDir = currentDir;
     File parent = currentDir.getParentFile();
     this.usage = new DF(parent, conf);
@@ -166,13 +171,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
   @Override
   public long getAvailable() throws IOException {
-    long remaining = getCapacity()-getDfsUsed();
+    long remaining = getCapacity() - getDfsUsed() - reservedForRbw.get();
     long available = usage.getAvailable();
     if (remaining > available) {
       remaining = available;
     }
     return (remaining > 0) ? remaining : 0;
   }
+
+  @VisibleForTesting
+  public long getReservedForRbw() {
+    return reservedForRbw.get();
+  }
 
   long getReserved(){
     return reserved;
@@ -217,16 +227,58 @@ public class FsVolumeImpl implements FsVolumeSpi {
     return getBlockPoolSlice(bpid).createTmpFile(b);
   }
 
+  @Override
+  public void reserveSpaceForRbw(long bytesToReserve) {
+    if (bytesToReserve != 0) {
+      if (FsDatasetImpl.LOG.isDebugEnabled()) {
+        FsDatasetImpl.LOG.debug("Reserving " + bytesToReserve + " on volume " + getBasePath());
+      }
+      reservedForRbw.addAndGet(bytesToReserve);
+    }
+  }
+
+  @Override
+  public void releaseReservedSpace(long bytesToRelease) {
+    if (bytesToRelease != 0) {
+      if (FsDatasetImpl.LOG.isDebugEnabled()) {
+        FsDatasetImpl.LOG.debug("Releasing " + bytesToRelease + " on volume " + getBasePath());
+      }
+
+      long oldReservation, newReservation;
+      do {
+        oldReservation = reservedForRbw.get();
+        newReservation = oldReservation - bytesToRelease;
+        if (newReservation < 0) {
+          // Failsafe; this should never occur in practice, but if it does we
+          // don't want to start advertising more space than we have available.
+          newReservation = 0;
+        }
+      } while (!reservedForRbw.compareAndSet(oldReservation, newReservation));
+    }
+  }
+
   /**
    * RBW files. They get moved to the finalized block directory when
    * the block is finalized.
    */
   File createRbwFile(String bpid, Block b) throws IOException {
+    reserveSpaceForRbw(b.getNumBytes());
    return getBlockPoolSlice(bpid).createRbwFile(b);
   }
 
-  File addBlock(String bpid, Block b, File f) throws IOException {
-    return getBlockPoolSlice(bpid).addBlock(b, f);
+  /**
+   * @param bytesReservedForRbw Space that was reserved during
+   *        block creation. Now that the block is being finalized we
+   *        can free up this space.
+   * @return the block file in its finalized location
+   * @throws IOException
+   */
+  File addFinalizedBlock(String bpid, Block b,
+                         File f, long bytesReservedForRbw)
+      throws IOException {
+    releaseReservedSpace(bytesReservedForRbw);
+    return getBlockPoolSlice(bpid).addFinalizedBlock(b, f);
   }
 
   Executor getCacheExecutor() {
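Two details in FsVolumeImpl above are worth spelling out: getAvailable()
subtracts reservedForRbw so free-space reports never include bytes promised
to open replicas, and releaseReservedSpace() uses a compare-and-set loop
instead of a plain addAndGet() so the counter can be clamped at zero
atomically even under concurrent releases. The same clamped-release idiom
on a bare AtomicLong, as a self-contained sketch (the class and numbers are
illustrative, not DataNode code):

    import java.util.concurrent.atomic.AtomicLong;

    class ClampedCounter {
      private final AtomicLong value = new AtomicLong();

      void reserve(long delta) {
        value.addAndGet(delta);  // a plain add suffices for reserving
      }

      // Subtract delta but never let the counter go negative; retry the
      // CAS if another thread updated the value concurrently.
      void release(long delta) {
        long oldV, newV;
        do {
          oldV = value.get();
          newV = Math.max(0, oldV - delta);
        } while (!value.compareAndSet(oldV, newV));
      }
    }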
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1fa5829/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 05924ac..bc50eaa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -424,6 +424,14 @@ public class TestDirectoryScanner {
     public String getStorageID() {
       return "";
     }
+
+    @Override
+    public void reserveSpaceForRbw(long bytesToReserve) {
+    }
+
+    @Override
+    public void releaseReservedSpace(long bytesToRelease) {
+    }
   }
 
   private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1fa5829/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
new file mode 100644
index 0000000..74ac167
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.fs.DU;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Ensure that the DN reserves disk space equivalent to a full block for a
+ * replica being written (RBW).
+ */
+public class TestRbwSpaceReservation {
+  static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class);
+
+  private static final short REPL_FACTOR = 1;
+  private static final int DU_REFRESH_INTERVAL_MSEC = 500;
+  private static final int STORAGES_PER_DATANODE = 1;
+  private static final int BLOCK_SIZE = 1024 * 1024;
+  private static final int SMALL_BLOCK_SIZE = 1024;
+
+  protected MiniDFSCluster cluster;
+  private Configuration conf;
+  private DistributedFileSystem fs = null;
+  private DFSClient client = null;
+  FsVolumeImpl singletonVolume = null;
+
+  private static Random rand = new Random();
+
+  private void initConfig(int blockSize) {
+    conf = new HdfsConfiguration();
+
+    // Refresh disk usage information frequently.
+    conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC);
+    conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
+
+    // Disable the scanner
+    conf.setInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+  }
+
+  static {
+    ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  private void startCluster(int blockSize, long perVolumeCapacity) throws IOException {
+    initConfig(blockSize);
+
+    cluster = new MiniDFSCluster
+        .Builder(conf)
+        .storagesPerDatanode(STORAGES_PER_DATANODE)
+        .numDataNodes(REPL_FACTOR)
+        .build();
+    fs = cluster.getFileSystem();
+    client = fs.getClient();
+    cluster.waitActive();
+
+    if (perVolumeCapacity >= 0) {
+      List<? extends FsVolumeSpi> volumes =
+          cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+      assertThat(volumes.size(), is(1));
+      singletonVolume = ((FsVolumeImpl) volumes.get(0));
+      singletonVolume.setCapacityForTesting(perVolumeCapacity);
+    }
+  }
+
+  @After
+  public void shutdownCluster() throws IOException {
+    if (client != null) {
+      client.close();
+      client = null;
+    }
+
+    if (fs != null) {
+      fs.close();
+      fs = null;
+    }
+
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private void createFileAndTestSpaceReservation(
+      final String fileNamePrefix, final int fileBlockSize)
+      throws IOException, InterruptedException {
+    // Enough for 1 block + meta files + some delta.
+    final long configuredCapacity = fileBlockSize * 2 - 1;
+    startCluster(BLOCK_SIZE, configuredCapacity);
+    FSDataOutputStream out = null;
+    Path path = new Path("/" + fileNamePrefix + ".dat");
+
+    try {
+      out = fs.create(path, false, 4096, (short) 1, fileBlockSize);
+
+      byte[] buffer = new byte[rand.nextInt(fileBlockSize / 4)];
+      out.write(buffer);
+      out.hsync();
+      int bytesWritten = buffer.length;
+
+      // Check that space was reserved for a full block minus the bytesWritten.
+      assertThat(singletonVolume.getReservedForRbw(),
+          is((long) fileBlockSize - bytesWritten));
+      out.close();
+      out = null;
+
+      // Check that the reserved space has been released since we closed the
+      // file.
+      assertThat(singletonVolume.getReservedForRbw(), is(0L));
+
+      // Reopen the file for appends and write the buffer once more.
+      out = fs.append(path);
+      out.write(buffer);
+      out.hsync();
+      bytesWritten += buffer.length;
+
+      // Check that space was again reserved for a full block minus the
+      // bytesWritten so far.
+      assertThat(singletonVolume.getReservedForRbw(),
+          is((long) fileBlockSize - bytesWritten));
+
+      // Write once again and again verify the available space. This ensures
+      // that the reserved space is progressively adjusted to account for bytes
+      // written to disk.
+      out.write(buffer);
+      out.hsync();
+      bytesWritten += buffer.length;
+      assertThat(singletonVolume.getReservedForRbw(),
+          is((long) fileBlockSize - bytesWritten));
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  @Test (timeout=300000)
+  public void testWithDefaultBlockSize()
+      throws IOException, InterruptedException {
+    createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE);
+  }
+
+  @Test (timeout=300000)
+  public void testWithNonDefaultBlockSize()
+      throws IOException, InterruptedException {
+    // Same test as previous one, but with a non-default block size.
+    createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE * 2);
+  }
+
+  /**
+   * Stress test to ensure we are not leaking reserved space.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=600000)
+  public void stressTest() throws IOException, InterruptedException {
+    final int numWriters = 5;
+    startCluster(SMALL_BLOCK_SIZE, SMALL_BLOCK_SIZE * numWriters * 10);
+    Writer[] writers = new Writer[numWriters];
+
+    // Start a few writers and let them run for a while.
+    for (int i = 0; i < numWriters; ++i) {
+      writers[i] = new Writer(client, SMALL_BLOCK_SIZE);
+      writers[i].start();
+    }
+
+    Thread.sleep(60000);
+
+    // Stop the writers.
+    for (Writer w : writers) {
+      w.stopWriter();
+    }
+    int filesCreated = 0;
+    int numFailures = 0;
+    for (Writer w : writers) {
+      w.join();
+      filesCreated += w.getFilesCreated();
+      numFailures += w.getNumFailures();
+    }
+
+    LOG.info("Stress test created " + filesCreated +
+             " files and hit " + numFailures + " failures");
+
+    // Check no space was leaked.
+    assertThat(singletonVolume.getReservedForRbw(), is(0L));
+  }
+
+  private static class Writer extends Daemon {
+    private volatile boolean keepRunning;
+    private final DFSClient localClient;
+    private int filesCreated = 0;
+    private int numFailures = 0;
+    byte[] data;
+
+    Writer(DFSClient client, int blockSize) throws IOException {
+      localClient = client;
+      keepRunning = true;
+      filesCreated = 0;
+      numFailures = 0;
+
+      // At least some of the files should span a block boundary.
+      data = new byte[blockSize * 2];
+    }
+
+    @Override
+    public void run() {
+      /**
+       * Create a file, write up to two blocks of data and close the file.
+       * Do this in a loop until we are told to stop.
+       */
+      while (keepRunning) {
+        OutputStream os = null;
+        try {
+          String filename = "/file-" + rand.nextLong();
+          os = localClient.create(filename, false);
+          os.write(data, 0, rand.nextInt(data.length));
+          IOUtils.closeQuietly(os);
+          os = null;
+          localClient.delete(filename, false);
+          Thread.sleep(50);     // Sleep for a bit to avoid killing the system.
+          ++filesCreated;
+        } catch (IOException ioe) {
+          // Just ignore the exception and keep going.
+          ++numFailures;
+        } catch (InterruptedException ie) {
+          return;
+        } finally {
+          if (os != null) {
+            IOUtils.closeQuietly(os);
+          }
+        }
+      }
+    }
+
+    public void stopWriter() {
+      keepRunning = false;
+    }
+
+    public int getFilesCreated() {
+      return filesCreated;
+    }
+
+    public int getNumFailures() {
+      return numFailures;
+    }
+  }
+}
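The invariant these tests check is that reserved plus written bytes always
equal one full block while the stream is open, and that the reservation
drains to zero on close. For instance, with an assumed fileBlockSize of 1 MB
and 256 KB written and hsync'ed, the assertions amount to:

    // 768 KB must still be reserved for the open replica (assumed sizes).
    assertThat(singletonVolume.getReservedForRbw(),
        is(1L * 1024 * 1024 - 256L * 1024));
    // ...and after out.close(), nothing remains reserved:
    assertThat(singletonVolume.getReservedForRbw(), is(0L));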
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1fa5829/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index b8246c3..e6a03d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -158,7 +158,7 @@ public class TestWriteToReplica {
     replicasMap.add(bpid, new ReplicaInPipeline(
         blocks[TEMPORARY].getBlockId(),
         blocks[TEMPORARY].getGenerationStamp(), vol,
-        vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock()).getParentFile()));
+        vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock()).getParentFile(), 0));
 
     replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol,
         vol.createRbwFile(bpid, blocks[RBW].getLocalBlock()).getParentFile(), null);