From: umamahesh@apache.org
To: common-commits@hadoop.apache.org
Subject: hadoop git commit: HDFS-7310. Mover can give first priority to local DN if it has target storage type available in local DN. (Vinayakumar B via umamahesh)
Date: Wed, 26 Nov 2014 18:00:53 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/trunk aa7dac335 -> 058af60c5


HDFS-7310. Mover can give first priority to local DN if it has target storage type available in local DN. (Vinayakumar B via umamahesh)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/058af60c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/058af60c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/058af60c

Branch: refs/heads/trunk
Commit: 058af60c56207907f2bedf76df4284e86d923e0c
Parents: aa7dac3
Author: Uma Maheswara Rao G
Authored: Wed Nov 26 23:27:25 2014 +0530
Committer: Uma Maheswara Rao G
Committed: Wed Nov 26 23:27:25 2014 +0530

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hadoop/hdfs/server/balancer/Dispatcher.java |  17 +--
 .../server/blockmanagement/BlockManager.java    |  18 ++-
 .../hdfs/server/blockmanagement/BlocksMap.java  |   6 +-
 .../blockmanagement/DatanodeStorageInfo.java    |  14 +-
 .../hdfs/server/datanode/DataXceiver.java       | 141 ++++++++++---------
 .../server/datanode/fsdataset/FsDatasetSpi.java |   7 +
 .../datanode/fsdataset/impl/BlockPoolSlice.java |   4 +
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 134 ++++++++++++++----
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |   4 +
 .../impl/RamDiskAsyncLazyPersistService.java    |   2 +-
 .../apache/hadoop/hdfs/server/mover/Mover.java  |  29 +++-
 .../server/blockmanagement/TestBlockInfo.java   |   4 +-
 .../blockmanagement/TestDatanodeDescriptor.java |   8 +-
 .../server/datanode/SimulatedFSDataset.java     |   7 +
 .../server/datanode/TestBlockReplacement.java   | 101 ++++++++++---
 .../hadoop/hdfs/server/mover/TestMover.java     |  48 ++++++-
 17 files changed, 397 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
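The heart of this change: when the Mover needs to migrate a replica to a different storage type, it now checks whether the block's own datanode has a volume of the wanted type before it looks anywhere else, so a storage-policy migration can become a purely local disk-to-disk copy instead of a network transfer. A minimal, self-contained sketch of that selection order (the types below are simplified stand-ins, not the real Mover/Dispatcher classes):

    import java.util.List;

    class MoveSchedulerSketch {
      enum Matcher { SAME_NODE_GROUP, SAME_RACK, ANY_OTHER }

      interface StorageGroup {
        String datanodeUuid();
        boolean schedule(long blockId);  // true if this target accepted the move
      }

      boolean scheduleMoveReplica(long blockId, String sourceUuid,
          List<StorageGroup> candidates, boolean nodeGroupAware) {
        // New first step: a storage on the source's own datanode wins outright.
        for (StorageGroup g : candidates) {
          if (g.datanodeUuid().equals(sourceUuid) && g.schedule(blockId)) {
            return true;
          }
        }
        // Otherwise fall back to the pre-existing order: same node group
        // (when the topology supports it), then same rack, then anywhere.
        Matcher[] order = nodeGroupAware
            ? new Matcher[] { Matcher.SAME_NODE_GROUP, Matcher.SAME_RACK,
                Matcher.ANY_OTHER }
            : new Matcher[] { Matcher.SAME_RACK, Matcher.ANY_OTHER };
        for (Matcher m : order) {
          for (StorageGroup g : candidates) {
            if (matches(g, m) && g.schedule(blockId)) {
              return true;
            }
          }
        }
        return false;
      }

      private boolean matches(StorageGroup g, Matcher m) {
        return true;  // topology test elided in this sketch
      }
    }

The diffs below wire this preference through the Mover, the Dispatcher, and the datanode's dataset implementation.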
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 89e67eb..12219a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -401,6 +401,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-6803 Document DFSClient#DFSInputStream expectations reading and preading
     in concurrent context. (stack via stevel)
 
+    HDFS-7310. Mover can give first priority to local DN if it has target storage type
+    available in local DN. (Vinayakumar B via umamahesh)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 6ede40a..63e151c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -243,6 +243,10 @@ public class Dispatcher {
      */
     private boolean chooseProxySource() {
       final DatanodeInfo targetDN = target.getDatanodeInfo();
+      // if source and target are same nodes then no need of proxy
+      if (source.getDatanodeInfo().equals(targetDN) && addTo(source)) {
+        return true;
+      }
       // if node group is supported, first try add nodes in the same node group
       if (cluster.isNodeGroupAware()) {
         for (StorageGroup loc : block.getLocations()) {
@@ -375,19 +379,6 @@ public class Dispatcher {
     public DBlock(Block block) {
       super(block);
     }
-
-    @Override
-    public synchronized boolean isLocatedOn(StorageGroup loc) {
-      // currently we only check if replicas are located on the same DataNodes
-      // since we do not have the capability to store two replicas in the same
-      // DataNode even though they are on two different storage types
-      for (StorageGroup existing : locations) {
-        if (existing.getDatanodeInfo().equals(loc.getDatanodeInfo())) {
-          return true;
-        }
-      }
-      return false;
-    }
   }
 
   /** The class represents a desired move. */
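The Dispatcher hunk above adds a fast path to proxy selection: a block replacement normally streams the data through a proxy datanode that already holds a replica, but when source and target are the same node, that node can serve as its own proxy. A stand-alone sketch of the idea (Node is a placeholder, not the real DatanodeInfo):

    class ProxyChoiceSketch {
      static final class Node {
        final String uuid;
        Node(String uuid) { this.uuid = uuid; }
      }

      /** Returns the chosen proxy, or null if none was found. */
      Node chooseProxy(Node source, Node target) {
        // New fast path: an intra-node move never leaves the machine, so the
        // source node can act as its own proxy.
        if (source.uuid.equals(target.uuid)) {
          return source;
        }
        // ... otherwise search the same node group, then rack, then any node ...
        return null;
      }
    }

Removing the isLocatedOn override is what legalizes those moves in the first place: as its own deleted comment says, the old check treated any storage on the same datanode as already holding the block, which ruled out same-node targets.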
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 254643c..2676696 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.fs.FileEncryptionInfo;
-
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
@@ -63,6 +62,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.Acces
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -2000,8 +2000,9 @@ public class BlockManager {
     // place a delimiter in the list which separates blocks 
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), (short) 1);
-    boolean added = storageInfo.addBlock(delimiter);
-    assert added : "Delimiting block cannot be present in the node";
+    AddBlockResult result = storageInfo.addBlock(delimiter);
+    assert result == AddBlockResult.ADDED
+        : "Delimiting block cannot be present in the node";
     int headIndex = 0; //currently the delimiter is in the head of the list
     int curIndex;
 
@@ -2394,14 +2395,19 @@
     assert bc != null : "Block must belong to a file";
 
     // add block to the datanode
-    boolean added = storageInfo.addBlock(storedBlock);
+    AddBlockResult result = storageInfo.addBlock(storedBlock);
 
     int curReplicaDelta;
-    if (added) {
+    if (result == AddBlockResult.ADDED) {
       curReplicaDelta = 1;
       if (logEveryBlock) {
         logAddStoredBlock(storedBlock, node);
       }
+    } else if (result == AddBlockResult.REPLACED) {
+      curReplicaDelta = 0;
+      blockLog.warn("BLOCK* addStoredBlock: " + "block " + storedBlock
+          + " moved to storageType " + storageInfo.getStorageType()
+          + " on node " + node);
     } else {
       // if the same block is added again and the replica was corrupt
       // previously because of a wrong gen stamp, remove it from the
@@ -2423,7 +2429,7 @@
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
         numLiveReplicas >= minReplication) {
       storedBlock = completeBlock(bc, storedBlock, false);
-    } else if (storedBlock.isComplete() && added) {
+    } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
       // check whether safe replication is reached for the block
       // only complete blocks are counted towards that
       // Is no-op if not in safe mode.
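Note the bookkeeping in the addStoredBlock hunk: a REPLACED result means the replica only changed storage type on the same node, so the live-replica count must not grow. A compact model of that decision:

    class ReplicaAccountingSketch {
      enum AddBlockResult { ADDED, REPLACED, ALREADY_EXIST }

      int replicaDelta(AddBlockResult result) {
        switch (result) {
          case ADDED:
            return 1;  // a genuinely new replica on this node
          case REPLACED:
            return 0;  // same node, new storage: count unchanged, only logged
          default:
            return 0;  // duplicate report; corrupt-replica cleanup happens elsewhere
        }
      }
    }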
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index a675635..6664034 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import java.util.Iterator;
 
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.LightWeightGSet;
@@ -223,8 +224,9 @@ class BlocksMap {
       final boolean removed = storage.removeBlock(currentBlock);
       Preconditions.checkState(removed, "currentBlock not found.");
 
-      final boolean added = storage.addBlock(newBlock);
-      Preconditions.checkState(added, "newBlock already exists.");
+      final AddBlockResult result = storage.addBlock(newBlock);
+      Preconditions.checkState(result == AddBlockResult.ADDED,
+          "newBlock already exists.");
     }
     // replace block in the map itself
     blocks.put(newBlock);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 8c44b30..a3198e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -215,10 +215,10 @@ public class DatanodeStorageInfo {
     return blockPoolUsed;
   }
 
-  public boolean addBlock(BlockInfo b) {
+  public AddBlockResult addBlock(BlockInfo b) {
     // First check whether the block belongs to a different storage
     // on the same DN.
-    boolean replaced = false;
+    AddBlockResult result = AddBlockResult.ADDED;
     DatanodeStorageInfo otherStorage =
         b.findStorageInfo(getDatanodeDescriptor());
 
@@ -226,10 +226,10 @@
       if (otherStorage != this) {
         // The block belongs to a different storage. Remove it first.
         otherStorage.removeBlock(b);
-        replaced = true;
+        result = AddBlockResult.REPLACED;
       } else {
         // The block is already associated with this storage.
-        return false;
+        return AddBlockResult.ALREADY_EXIST;
       }
     }
 
@@ -237,7 +237,7 @@
     b.addStorage(this);
     blockList = b.listInsert(blockList, this);
     numBlocks++;
-    return !replaced;
+    return result;
   }
 
   boolean removeBlock(BlockInfo b) {
@@ -358,4 +358,8 @@
     }
     return null;
   }
+
+  static enum AddBlockResult {
+    ADDED, REPLACED, ALREADY_EXIST;
+  }
 }
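The old boolean return of addBlock could not distinguish "the block moved between storages on one node" from "the block was genuinely added", which is exactly the distinction the callers above now need. A self-contained model of the tri-state result (a map stands in for the real block list):

    import java.util.HashMap;
    import java.util.Map;

    class StorageAddSketch {
      enum AddBlockResult { ADDED, REPLACED, ALREADY_EXIST }

      // blockId -> storageId currently holding it on this datanode
      private final Map<Long, String> blockToStorage = new HashMap<Long, String>();

      AddBlockResult addBlock(long blockId, String storageId) {
        String current = blockToStorage.get(blockId);
        if (current != null) {
          if (current.equals(storageId)) {
            return AddBlockResult.ALREADY_EXIST;   // nothing to do
          }
          blockToStorage.put(blockId, storageId);  // moved across storages
          return AddBlockResult.REPLACED;
        }
        blockToStorage.put(blockId, storageId);
        return AddBlockResult.ADDED;
      }
    }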
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 61b9c67..bbc23e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -1007,7 +1007,6 @@ class DataXceiver extends Receiver implements Runnable {
     updateCurrentThreadName("Replacing block " + block + " from " + delHint);
 
     /* read header */
-    block.setNumBytes(dataXceiverServer.estimateBlockSize);
     if (datanode.isBlockTokenEnabled) {
       try {
         datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
@@ -1039,73 +1038,83 @@
     DataOutputStream replyOut = new DataOutputStream(getOutputStream());
     boolean IoeDuringCopyBlockOperation = false;
     try {
-      // get the output stream to the proxy
-      final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Connecting to datanode " + dnAddr);
-      }
-      InetSocketAddress proxyAddr = NetUtils.createSocketAddr(dnAddr);
-      proxySock = datanode.newSocket();
-      NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
-      proxySock.setSoTimeout(dnConf.socketTimeout);
-
-      OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
-          dnConf.socketWriteTimeout);
-      InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
-      DataEncryptionKeyFactory keyFactory =
-          datanode.getDataEncryptionKeyFactoryForBlock(block);
-      IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock,
-          unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource);
-      unbufProxyOut = saslStreams.out;
-      unbufProxyIn = saslStreams.in;
-
-      proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut,
-          HdfsConstants.SMALL_BUFFER_SIZE));
-      proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn,
-          HdfsConstants.IO_FILE_BUFFER_SIZE));
-
-      /* send request to the proxy */
-      IoeDuringCopyBlockOperation = true;
-      new Sender(proxyOut).copyBlock(block, blockToken);
-      IoeDuringCopyBlockOperation = false;
-
-      // receive the response from the proxy
-
-      BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
-          PBHelper.vintPrefixed(proxyReply));
-
-      if (copyResponse.getStatus() != SUCCESS) {
-        if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {
+      // Move the block to different storage in the same datanode
+      if (proxySource.equals(datanode.getDatanodeId())) {
+        ReplicaInfo oldReplica = datanode.data.moveBlockAcrossStorage(block,
+            storageType);
+        if (oldReplica != null) {
+          LOG.info("Moved " + block + " from StorageType "
+              + oldReplica.getVolume().getStorageType() + " to " + storageType);
+        }
+      } else {
+        block.setNumBytes(dataXceiverServer.estimateBlockSize);
+        // get the output stream to the proxy
+        final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Connecting to datanode " + dnAddr);
+        }
+        InetSocketAddress proxyAddr = NetUtils.createSocketAddr(dnAddr);
+        proxySock = datanode.newSocket();
+        NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
+        proxySock.setSoTimeout(dnConf.socketTimeout);
+
+        OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
+            dnConf.socketWriteTimeout);
+        InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
+        DataEncryptionKeyFactory keyFactory =
+            datanode.getDataEncryptionKeyFactoryForBlock(block);
+        IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock,
+            unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource);
+        unbufProxyOut = saslStreams.out;
+        unbufProxyIn = saslStreams.in;
+
+        proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut,
+            HdfsConstants.SMALL_BUFFER_SIZE));
+        proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn,
+            HdfsConstants.IO_FILE_BUFFER_SIZE));
+
+        /* send request to the proxy */
+        IoeDuringCopyBlockOperation = true;
+        new Sender(proxyOut).copyBlock(block, blockToken);
+        IoeDuringCopyBlockOperation = false;
+
+        // receive the response from the proxy
+
+        BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
+            PBHelper.vintPrefixed(proxyReply));
+
+        if (copyResponse.getStatus() != SUCCESS) {
+          if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {
+            throw new IOException("Copy block " + block + " from "
+                + proxySock.getRemoteSocketAddress()
+                + " failed due to access token error");
+          }
           throw new IOException("Copy block " + block + " from "
-              + proxySock.getRemoteSocketAddress()
-              + " failed due to access token error");
+              + proxySock.getRemoteSocketAddress() + " failed");
         }
-        throw new IOException("Copy block " + block + " from "
-            + proxySock.getRemoteSocketAddress() + " failed");
+
+        // get checksum info about the block we're copying
+        ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
+        DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
+            checksumInfo.getChecksum());
+        // open a block receiver and check if the block does not exist
+        blockReceiver = new BlockReceiver(block, storageType,
+            proxyReply, proxySock.getRemoteSocketAddress().toString(),
+            proxySock.getLocalSocketAddress().toString(),
+            null, 0, 0, 0, "", null, datanode, remoteChecksum,
+            CachingStrategy.newDropBehind(), false);
+
+        // receive a block
+        blockReceiver.receiveBlock(null, null, replyOut, null,
+            dataXceiverServer.balanceThrottler, null, true);
+
+        // notify name node
+        datanode.notifyNamenodeReceivedBlock(
+            block, delHint, blockReceiver.getStorageUuid());
+
+        LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()
+            + ", delHint=" + delHint);
+      }
-
-      // get checksum info about the block we're copying
-      ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
-      DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
-          checksumInfo.getChecksum());
-      // open a block receiver and check if the block does not exist
-      blockReceiver = new BlockReceiver(block, storageType,
-          proxyReply, proxySock.getRemoteSocketAddress().toString(),
-          proxySock.getLocalSocketAddress().toString(),
-          null, 0, 0, 0, "", null, datanode, remoteChecksum,
-          CachingStrategy.newDropBehind(), false);
-
-      // receive a block
-      blockReceiver.receiveBlock(null, null, replyOut, null,
-          dataXceiverServer.balanceThrottler, null, true);
-
-      // notify name node
-      datanode.notifyNamenodeReceivedBlock(
-          block, delHint, blockReceiver.getStorageUuid());
-
-      LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()
-          + ", delHint=" + delHint);
-
     } catch (IOException ioe) {
       opStatus = ERROR;
       errMsg = "opReplaceBlock " + block + " received exception " + ioe; 
@@ -1117,7 +1126,7 @@ class DataXceiver extends Receiver implements Runnable {
       throw ioe;
     } finally {
       // receive the last byte that indicates the proxy released its thread resource
-      if (opStatus == SUCCESS) {
+      if (opStatus == SUCCESS && proxyReply != null) {
         try {
           proxyReply.readChar();
         } catch (IOException ignored) {
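The DataXceiver change is the datanode half of the feature: a replaceBlock request whose proxy is the receiving datanode itself is served by moving the replica between local volumes, skipping the proxy socket, the SASL handshake, and the BlockReceiver entirely. A sketch of the control flow with placeholder names (not the real DataNode API):

    import java.io.IOException;

    class ReplaceBlockSketch {
      interface Dataset {
        /** Returns the old storage type, or null if nothing was moved. */
        String moveBlockAcrossStorage(long blockId, String targetType)
            throws IOException;
      }

      void replaceBlock(long blockId, String targetType, String proxyUuid,
          String selfUuid, Dataset data) throws IOException {
        if (proxyUuid.equals(selfUuid)) {
          // Intra-node move: no network copy, no checksum recomputation.
          String oldType = data.moveBlockAcrossStorage(blockId, targetType);
          if (oldType != null) {
            System.out.println("Moved " + blockId + " from " + oldType
                + " to " + targetType);
          }
          return;
        }
        // ... otherwise connect to the proxy and stream the block as before ...
      }
    }

Note also the finally-block fix: proxyReply stays null on the local path, so the trailing readChar() handshake is now guarded with a null check.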
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index a02ee0a..462ad31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
@@ -508,4 +509,10 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * Callback from RamDiskAsyncLazyPersistService upon async lazy persist task fail
    */
   public void onFailLazyPersist(String bpId, long blockId);
+
+  /**
+   * Move block from one storage to another storage
+   */
+  public ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
+      StorageType targetStorageType) throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 5c8709c..77cdb91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -157,6 +157,10 @@ class BlockPoolSlice {
     return rbwDir;
   }
 
+  File getTmpDir() {
+    return tmpDir;
+  }
+
   /** Run DU on local drives.  It must be synchronized from caller. */
   void decDfsUsed(long value) {
     dfsUsage.decDfsUsed(value);
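The FsDatasetImpl hunks that follow thread a calculateChecksum flag through copyBlockFiles: a move between storage types on the same node can copy the existing meta (checksum) file as-is, while the RAM_DISK lazy-persist path keeps recomputing it. A simplified version of that split (file layout and the checksum routine are stand-ins):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.StandardCopyOption;

    class BlockFileCopierSketch {
      static File[] copyBlockFiles(File srcMeta, File srcBlock, File destDir,
          boolean calculateChecksum) throws IOException {
        File dstBlock = new File(destDir, srcBlock.getName());
        File dstMeta = new File(destDir, srcMeta.getName());
        if (calculateChecksum) {
          computeChecksum(srcBlock, dstMeta);       // re-derive the meta file
        } else {
          Files.copy(srcMeta.toPath(), dstMeta.toPath(),
              StandardCopyOption.REPLACE_EXISTING); // checksums remain valid
        }
        Files.copy(srcBlock.toPath(), dstBlock.toPath(),
            StandardCopyOption.REPLACE_EXISTING);
        return new File[] { dstMeta, dstBlock };
      }

      private static void computeChecksum(File block, File meta) {
        // checksum computation elided in this sketch
      }
    }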
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 4a89778..2c6f409 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -663,13 +663,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @return the new meta and block files.
    * @throws IOException
    */
-  static File[] copyBlockFiles(long blockId, long genStamp,
-      File srcMeta, File srcFile, File destRoot)
+  static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta,
+      File srcFile, File destRoot, boolean calculateChecksum)
       throws IOException {
     final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
     final File dstFile = new File(destDir, srcFile.getName());
     final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
-    computeChecksum(srcMeta, dstMeta, srcFile);
+    if (calculateChecksum) {
+      computeChecksum(srcMeta, dstMeta, srcFile);
+    } else {
+      try {
+        Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
+      } catch (IOException e) {
+        throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
+      }
+    }
 
     try {
       Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true);
@@ -677,14 +685,73 @@
       throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Copied " + srcMeta + " to " + dstMeta +
-          " and calculated checksum");
-      LOG.debug("Copied " + srcFile + " to " + dstFile);
+      if (calculateChecksum) {
+        LOG.debug("Copied " + srcMeta + " to " + dstMeta
+            + " and calculated checksum");
+      } else {
+        LOG.debug("Copied " + srcFile + " to " + dstFile);
+      }
     }
     return new File[] {dstMeta, dstFile};
   }
 
   /**
+   * Move block files from one storage to another storage.
+   * @return Returns the Old replicaInfo
+   * @throws IOException
+   */
+  @Override
+  public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
+      StorageType targetStorageType) throws IOException {
+    ReplicaInfo replicaInfo = getReplicaInfo(block);
+    if (replicaInfo.getState() != ReplicaState.FINALIZED) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNFINALIZED_REPLICA + block);
+    }
+    if (replicaInfo.getNumBytes() != block.getNumBytes()) {
+      throw new IOException("Corrupted replica " + replicaInfo
+          + " with a length of " + replicaInfo.getNumBytes()
+          + " expected length is " + block.getNumBytes());
+    }
+    if (replicaInfo.getVolume().getStorageType() == targetStorageType) {
+      throw new ReplicaAlreadyExistsException("Replica " + replicaInfo
+          + " already exists on storage " + targetStorageType);
+    }
+
+    if (replicaInfo.isOnTransientStorage()) {
+      // Block movement from RAM_DISK will be done by LazyPersist mechanism
+      throw new IOException("Replica " + replicaInfo
+          + " cannot be moved from storageType : "
+          + replicaInfo.getVolume().getStorageType());
+    }
+
+    FsVolumeImpl targetVolume = volumes.getNextVolume(targetStorageType,
+        block.getNumBytes());
+    File oldBlockFile = replicaInfo.getBlockFile();
+    File oldMetaFile = replicaInfo.getMetaFile();
+
+    // Copy files to temp dir first
+    File[] blockFiles = copyBlockFiles(block.getBlockId(),
+        block.getGenerationStamp(), oldMetaFile, oldBlockFile,
+        targetVolume.getTmpDir(block.getBlockPoolId()),
+        replicaInfo.isOnTransientStorage());
+
+    ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
+        replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
+        targetVolume, blockFiles[0].getParentFile(), 0);
+    newReplicaInfo.setNumBytes(blockFiles[1].length());
+    // Finalize the copied files
+    newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
+
+    removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
+        oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
+
+    // Replace the old block if any to reschedule the scanning.
+    datanode.getBlockScanner().addBlock(block);
+    return replicaInfo;
+  }
+
+  /**
    * Compute and store the checksum for a block file that does not already have
    * its checksum computed.
    *
@@ -2442,6 +2509,35 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
+  private void removeOldReplica(ReplicaInfo replicaInfo,
+      ReplicaInfo newReplicaInfo, File blockFile, File metaFile,
+      long blockFileUsed, long metaFileUsed, final String bpid) {
+    // Before deleting the files from old storage we must notify the
+    // NN that the files are on the new storage. Else a blockReport from
+    // the transient storage might cause the NN to think the blocks are lost.
+    // Replicas must be evicted from client short-circuit caches, because the
+    // storage will no longer be same, and thus will require validating
+    // checksum.  This also stops a client from holding file descriptors,
+    // which would prevent the OS from reclaiming the memory.
+    ExtendedBlock extendedBlock =
+        new ExtendedBlock(bpid, newReplicaInfo);
+    datanode.getShortCircuitRegistry().processBlockInvalidation(
+        ExtendedBlockId.fromExtendedBlock(extendedBlock));
+    datanode.notifyNamenodeReceivedBlock(
+        extendedBlock, null, newReplicaInfo.getStorageUuid());
+
+    // Remove the old replicas
+    if (blockFile.delete() || !blockFile.exists()) {
+      ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
+      if (metaFile.delete() || !metaFile.exists()) {
+        ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
+      }
+    }
+
+    // If deletion failed then the directory scanner will cleanup the blocks
+    // eventually.
+  }
+
   class LazyWriter implements Runnable {
     private volatile boolean shouldRun = true;
     final int checkpointerInterval;
@@ -2601,30 +2697,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         }
       }
 
-      // Before deleting the files from transient storage we must notify the
-      // NN that the files are on the new storage. Else a blockReport from
-      // the transient storage might cause the NN to think the blocks are lost.
-      // Replicas must be evicted from client short-circuit caches, because the
-      // storage will no longer be transient, and thus will require validating
-      // checksum.  This also stops a client from holding file descriptors,
-      // which would prevent the OS from reclaiming the memory.
-      ExtendedBlock extendedBlock =
-          new ExtendedBlock(bpid, newReplicaInfo);
-      datanode.getShortCircuitRegistry().processBlockInvalidation(
-          ExtendedBlockId.fromExtendedBlock(extendedBlock));
-      datanode.notifyNamenodeReceivedBlock(
-          extendedBlock, null, newReplicaInfo.getStorageUuid());
-
-      // Remove the old replicas from transient storage.
-      if (blockFile.delete() || !blockFile.exists()) {
-        ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
-        if (metaFile.delete() || !metaFile.exists()) {
-          ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
-        }
-      }
-
-      // If deletion failed then the directory scanner will cleanup the blocks
-      // eventually.
+      removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
+          blockFileUsed, metaFileUsed, bpid);
     }
   }
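The comment block inside removeOldReplica encodes an ordering that is easy to get wrong, so here it is spelled out as a sketch with placeholder hooks: the NameNode must learn the new location, and short-circuit readers must be evicted, before the old files are deleted.

    class OldReplicaRemovalSketch {
      interface Hooks {
        void invalidateShortCircuitReads(long blockId);
        void notifyNamenodeReceivedBlock(long blockId, String newStorageUuid);
        boolean delete(String path);
      }

      void removeOldReplica(long blockId, String newStorageUuid,
          String oldBlockPath, String oldMetaPath, Hooks hooks) {
        // 1. Publish the new location first; a block report racing with the
        //    delete must never make the NameNode think the block is lost.
        hooks.invalidateShortCircuitReads(blockId);
        hooks.notifyNamenodeReceivedBlock(blockId, newStorageUuid);

        // 2. Only then drop the old files. If deletion fails, the directory
        //    scanner cleans the leftovers up eventually.
        if (hooks.delete(oldBlockPath)) {
          hooks.delete(oldMetaPath);
        }
      }
    }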

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 1d7540c..48427ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -129,6 +129,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
     return getBlockPoolSlice(bpid).getLazypersistDir();
   }
 
+  File getTmpDir(String bpid) throws IOException {
+    return getBlockPoolSlice(bpid).getTmpDir();
+  }
+
   void decDfsUsed(String bpid, long value) {
     synchronized(dataset) {
       BlockPoolSlice bp = bpSlices.get(bpid);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
index 5fdcc2f..c9aba8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -232,7 +232,7 @@ class RamDiskAsyncLazyPersistService {
       try {
         // No FsDatasetImpl lock for the file copy
         File targetFiles[] = FsDatasetImpl.copyBlockFiles(
-            blockId, genStamp, metaFile, blockFile, lazyPersistDir);
+            blockId, genStamp, metaFile, blockFile, lazyPersistDir, true);
 
         // Lock FsDataSetImpl during onCompleteLazyPersist callback
         datanode.getFSDataset().onCompleteLazyPersist(bpId, blockId,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 59814af..108eb38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -86,8 +86,8 @@ public class Mover {
       return get(sources, ml);
     }
 
-    private StorageGroup getTarget(MLocation ml) {
-      return get(targets, ml);
+    private StorageGroup getTarget(String uuid, StorageType storageType) {
+      return targets.get(uuid, storageType);
     }
 
     private static <G extends StorageGroup> G get(StorageGroupMap<G> map, MLocation ml) {
@@ -387,6 +387,11 @@
     boolean scheduleMoveReplica(DBlock db, Source source,
         List<StorageType> targetTypes) {
+      // Match storage on the same node
+      if (chooseTargetInSameNode(db, source, targetTypes)) {
+        return true;
+      }
+
       if (dispatcher.getCluster().isNodeGroupAware()) {
         if (chooseTarget(db, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
           return true;
@@ -401,6 +406,26 @@
       return chooseTarget(db, source, targetTypes, Matcher.ANY_OTHER);
     }
 
+    /**
+     * Choose the target storage within same Datanode if possible.
+     */
+    boolean chooseTargetInSameNode(DBlock db, Source source,
+        List<StorageType> targetTypes) {
+      for (StorageType t : targetTypes) {
+        StorageGroup target = storages.getTarget(source.getDatanodeInfo()
+            .getDatanodeUuid(), t);
+        if (target == null) {
+          continue;
+        }
+        final PendingMove pm = source.addPendingMove(db, target);
+        if (pm != null) {
+          dispatcher.executePendingMove(pm);
+          return true;
+        }
+      }
+      return false;
+    }
+
     boolean chooseTarget(DBlock db, Source source,
         List<StorageType> targetTypes, Matcher matcher) {
       final NetworkTopology cluster = dispatcher.getCluster(); 
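chooseTargetInSameNode above is where the Mover expresses the local-first preference: it looks up a target storage group keyed by the source's datanode UUID plus each wanted storage type, and schedules the move on the first hit. A sketch with a plain map standing in for the real StorageGroupMap:

    import java.util.List;
    import java.util.Map;

    class SameNodeChooserSketch {
      interface Target {
        boolean schedule(long blockId);
      }

      boolean chooseTargetInSameNode(long blockId, String sourceUuid,
          List<String> targetTypes, Map<String, Target> targetsByUuidAndType) {
        for (String type : targetTypes) {
          Target t = targetsByUuidAndType.get(sourceUuid + "/" + type);
          if (t != null && t.schedule(blockId)) {
            return true;  // a local storage of the wanted type took the block
          }
        }
        return false;     // caller falls back to node group, rack, then any node
      }
    }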

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
index f8c583a..41c8f8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.junit.Assert;
@@ -75,7 +76,8 @@ public class TestBlockInfo {
     }
 
     // Try to move one of the blocks to a different storage.
-    boolean added = storage2.addBlock(blockInfos[NUM_BLOCKS/2]);
+    boolean added =
+        storage2.addBlock(blockInfos[NUM_BLOCKS / 2]) == AddBlockResult.ADDED;
     Assert.assertThat(added, is(false));
     Assert.assertThat(blockInfos[NUM_BLOCKS/2].getStorageInfo(0), is(storage2));
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
index e00a4c3..fe639e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.junit.Test;
 
@@ -61,18 +62,17 @@ public class TestDatanodeDescriptor {
     BlockInfo blk1 = new BlockInfo(new Block(2L), (short) 2);
     DatanodeStorageInfo[] storages = dd.getStorageInfos();
     assertTrue(storages.length > 0);
-    final String storageID = storages[0].getStorageID();
     // add first block
-    assertTrue(storages[0].addBlock(blk));
+    assertTrue(storages[0].addBlock(blk) == AddBlockResult.ADDED);
     assertEquals(1, dd.numBlocks());
     // remove a non-existent block
     assertFalse(dd.removeBlock(blk1));
     assertEquals(1, dd.numBlocks());
     // add an existent block
-    assertFalse(storages[0].addBlock(blk));
+    assertFalse(storages[0].addBlock(blk) == AddBlockResult.ADDED);
     assertEquals(1, dd.numBlocks());
     // add second block
-    assertTrue(storages[0].addBlock(blk1));
+    assertTrue(storages[0].addBlock(blk1) == AddBlockResult.ADDED);
     assertEquals(2, dd.numBlocks());
     // remove first block
     assertTrue(dd.removeBlock(blk));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 3e5034a..e03b756 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1260,5 +1260,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public void onFailLazyPersist(String bpId, long blockId) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
+      StorageType targetStorageType) throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
index e0d7964..fbdfebf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
@@ -17,15 +17,14 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -40,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.StorageType;
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -200,7 +201,51 @@ public class TestBlockReplacement {
       cluster.shutdown();
     }
   }
-  
+
+  @Test
+  public void testBlockMoveAcrossStorageInSameNode() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    // create only one datanode in the cluster to verify movement within
+    // datanode.
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).storageTypes(
+            new StorageType[] { StorageType.DISK, StorageType.ARCHIVE })
+            .build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final Path file = new Path("/testBlockMoveAcrossStorageInSameNode/file");
+      DFSTestUtil.createFile(dfs, file, 1024, (short) 1, 1024);
+      LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file.toString(), 0);
+      // get the current 
+      LocatedBlock locatedBlock = locatedBlocks.get(0);
+      ExtendedBlock block = locatedBlock.getBlock();
+      DatanodeInfo[] locations = locatedBlock.getLocations();
+      assertEquals(1, locations.length);
+      StorageType[] storageTypes = locatedBlock.getStorageTypes();
+      // current block should be written to DISK
+      assertTrue(storageTypes[0] == StorageType.DISK);
+
+      DatanodeInfo source = locations[0];
+      // move block to ARCHIVE by using same DataNodeInfo for source, proxy and
+      // destination so that movement happens within datanode
+      assertTrue(replaceBlock(block, source, source, source,
+          StorageType.ARCHIVE));
+
+      // wait till namenode notified
+      Thread.sleep(3000);
+      locatedBlocks = dfs.getClient().getLocatedBlocks(file.toString(), 0);
+      // get the current 
+      locatedBlock = locatedBlocks.get(0);
+      assertEquals("Storage should be only one", 1,
+          locatedBlock.getLocations().length);
+      assertTrue("Block should be moved to ARCHIVE", locatedBlock
+          .getStorageTypes()[0] == StorageType.ARCHIVE);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /* check if file's blocks have expected number of replicas,
    * and exist at all of includeNodes
    */
@@ -259,24 +304,42 @@
    */
   private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
       DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
+    return replaceBlock(block, source, sourceProxy, destination,
+        StorageType.DEFAULT);
+  }
+
+  /*
+   * Replace block
+   */
+  private boolean replaceBlock(
+      ExtendedBlock block,
+      DatanodeInfo source,
+      DatanodeInfo sourceProxy,
+      DatanodeInfo destination,
+      StorageType targetStorageType) throws IOException, SocketException {
     Socket sock = new Socket();
-    sock.connect(NetUtils.createSocketAddr(
-        destination.getXferAddr()), HdfsServerConstants.READ_TIMEOUT);
-    sock.setKeepAlive(true);
-    // sendRequest
-    DataOutputStream out = new DataOutputStream(sock.getOutputStream());
-    new Sender(out).replaceBlock(block, StorageType.DEFAULT,
-        BlockTokenSecretManager.DUMMY_TOKEN,
-        source.getDatanodeUuid(), sourceProxy);
-    out.flush();
-    // receiveResponse
-    DataInputStream reply = new DataInputStream(sock.getInputStream());
+    try {
+      sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
+          HdfsServerConstants.READ_TIMEOUT);
+      sock.setKeepAlive(true);
+      // sendRequest
+      DataOutputStream out = new DataOutputStream(sock.getOutputStream());
+      new Sender(out).replaceBlock(block, targetStorageType,
+          BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
+          sourceProxy);
+      out.flush();
+      // receiveResponse
+      DataInputStream reply = new DataInputStream(sock.getInputStream());
 
-    BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply);
-    while (proto.getStatus() == Status.IN_PROGRESS) {
-      proto = BlockOpResponseProto.parseDelimitedFrom(reply);
+      BlockOpResponseProto proto =
+          BlockOpResponseProto.parseDelimitedFrom(reply);
+      while (proto.getStatus() == Status.IN_PROGRESS) {
+        proto = BlockOpResponseProto.parseDelimitedFrom(reply);
+      }
+      return proto.getStatus() == Status.SUCCESS;
+    } finally {
+      sock.close();
     }
-    return proto.getStatus() == Status.SUCCESS;
   }
 
   /**
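Both new tests wait for the NameNode with a fixed Thread.sleep(3000). A hedged alternative, assuming the 2.x-era GenericTestUtils.waitFor(Supplier<Boolean>, int, int) helper that TestMover already imports, would poll until the reported storage type flips instead of sleeping a fixed interval:

    private void waitForStorageType(final DistributedFileSystem dfs,
        final String file, final StorageType expected) throws Exception {
      GenericTestUtils.waitFor(new com.google.common.base.Supplier<Boolean>() {
        @Override
        public Boolean get() {
          try {
            LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
            return lb.getStorageTypes()[0] == expected;
          } catch (IOException e) {
            return false;  // keep polling until the timeout
          }
        }
      }, 100, 10000);
    }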

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 5866c7f..c9fc5ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.ToolRunner;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -79,6 +79,52 @@ public class TestMover {
     }
   }
 
+  @Test
+  public void testScheduleBlockWithinSameNode() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3)
+        .storageTypes(
+            new StorageType[] { StorageType.DISK, StorageType.ARCHIVE })
+        .build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final String file = "/testScheduleWithinSameNode/file";
+      Path dir = new Path("/testScheduleWithinSameNode");
+      dfs.mkdirs(dir);
+      // write to DISK
+      dfs.setStoragePolicy(dir, "HOT");
+      {
+        final FSDataOutputStream out = dfs.create(new Path(file));
+        out.writeChars("testScheduleWithinSameNode");
+        out.close();
+      }
+
+      //verify before movement
+      LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      StorageType[] storageTypes = lb.getStorageTypes();
+      for (StorageType storageType : storageTypes) {
+        Assert.assertTrue(StorageType.DISK == storageType);
+      }
+      // move to ARCHIVE
+      dfs.setStoragePolicy(dir, "COLD");
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] { "-p", dir.toString() });
+      Assert.assertEquals("Movement to ARCHIVE should be successfull", 0, rc);
+
+      // Wait till namenode notified
+      Thread.sleep(3000);
+      lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      storageTypes = lb.getStorageTypes();
+      for (StorageType storageType : storageTypes) {
+        Assert.assertTrue(StorageType.ARCHIVE == storageType);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   private void checkMovePaths(List<Path> actual, Path... expected) {
     Assert.assertEquals(expected.length, actual.size());
     for (Path p : expected) {