Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7020A200D04 for ; Sun, 27 Aug 2017 09:15:39 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6E97116460E; Sun, 27 Aug 2017 07:15:39 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EE78B16460A for ; Sun, 27 Aug 2017 09:15:37 +0200 (CEST) Received: (qmail 72585 invoked by uid 500); 27 Aug 2017 07:15:32 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 69149 invoked by uid 99); 27 Aug 2017 07:15:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 27 Aug 2017 07:15:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BAAE2F5F2F; Sun, 27 Aug 2017 07:15:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rakeshr@apache.org To: common-commits@hadoop.apache.org Date: Sun, 27 Aug 2017 07:15:42 -0000 Message-Id: <8dac828057b94578b87e4d6f8b021457@git.apache.org> In-Reply-To: <112934a642e8481caf7e783742a9ecd3@git.apache.org> References: <112934a642e8481caf7e783742a9ecd3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/50] [abbrv] hadoop git commit: HDFS-10884: [SPS]: Add block movement tracker to track the completion of block movement future tasks at DN. Contributed by Rakesh R archived-at: Sun, 27 Aug 2017 07:15:39 -0000 HDFS-10884: [SPS]: Add block movement tracker to track the completion of block movement future tasks at DN. Contributed by Rakesh R Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d5058b09 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d5058b09 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d5058b09 Branch: refs/heads/HDFS-10285 Commit: d5058b09f2f5abb725559f9dd26f29a275978243 Parents: 1b7b964 Author: Uma Maheswara Rao G Authored: Tue Oct 25 00:40:45 2016 -0700 Committer: Rakesh Radhakrishnan Committed: Sun Aug 27 11:50:13 2017 +0530 ---------------------------------------------------------------------- .../datanode/BlockStorageMovementTracker.java | 146 ++++++++++++++ .../datanode/StoragePolicySatisfyWorker.java | 178 +++++++++++++---- .../protocol/BlockStorageMovementCommand.java | 12 +- .../TestStoragePolicySatisfyWorker.java | 190 +++++++++++++------ 4 files changed, 427 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5058b09/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java new file mode 100644 index 0000000..d31f075 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler; +import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is used to track the completion of block movement future tasks. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class BlockStorageMovementTracker implements Runnable { + private static final Logger LOG = LoggerFactory + .getLogger(BlockStorageMovementTracker.class); + private final CompletionService moverCompletionService; + private final BlocksMovementsCompletionHandler blksMovementscompletionHandler; + + // Keeps the information - trackID vs its list of blocks + private final Map>> moverTaskFutures; + private final Map> movementResults; + + /** + * BlockStorageMovementTracker constructor. + * + * @param moverCompletionService + * completion service. + * @param handler + * blocks movements completion handler + */ + public BlockStorageMovementTracker( + CompletionService moverCompletionService, + BlocksMovementsCompletionHandler handler) { + this.moverCompletionService = moverCompletionService; + this.moverTaskFutures = new HashMap<>(); + this.blksMovementscompletionHandler = handler; + this.movementResults = new HashMap<>(); + } + + @Override + public void run() { + while (true) { + if (moverTaskFutures.size() <= 0) { + try { + synchronized (moverTaskFutures) { + // Waiting for mover tasks. + moverTaskFutures.wait(2000); + } + } catch (InterruptedException ignore) { + // ignore + } + } + try { + Future future = moverCompletionService.take(); + if (future != null) { + BlockMovementResult result = future.get(); + LOG.debug("Completed block movement. {}", result); + long trackId = result.getTrackId(); + List> blocksMoving = moverTaskFutures + .get(trackId); + blocksMoving.remove(future); + + List resultPerTrackIdList = + addMovementResultToTrackIdList(result); + + // Completed all the scheduled blocks movement under this 'trackId'. + if (blocksMoving.isEmpty()) { + synchronized (moverTaskFutures) { + moverTaskFutures.remove(trackId); + } + // handle completed blocks movements per trackId. + blksMovementscompletionHandler.handle(resultPerTrackIdList); + movementResults.remove(trackId); + } + } + } catch (ExecutionException | InterruptedException e) { + // TODO: Do we need failure retries and implement the same if required. + LOG.error("Exception while moving block replica to target storage type", + e); + } + } + } + + private List addMovementResultToTrackIdList( + BlockMovementResult result) { + long trackId = result.getTrackId(); + List perTrackIdList = movementResults.get(trackId); + if (perTrackIdList == null) { + perTrackIdList = new ArrayList<>(); + movementResults.put(trackId, perTrackIdList); + } + perTrackIdList.add(result); + return perTrackIdList; + } + + /** + * Add future task to the tracking list to check the completion status of the + * block movement. + * + * @param trackID + * tracking Id + * @param futureTask + * future task used for moving the respective block + */ + void addBlock(long trackID, Future futureTask) { + synchronized (moverTaskFutures) { + List> futures = moverTaskFutures + .get(Long.valueOf(trackID)); + // null for the first task + if (futures == null) { + futures = new ArrayList<>(); + moverTaskFutures.put(trackID, futures); + } + futures.add(futureTask); + // Notify waiting tracker thread about the newly added tasks. + moverTaskFutures.notify(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5058b09/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java index 2c99963..604fb4a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java @@ -33,7 +33,6 @@ import java.util.EnumSet; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -65,6 +64,8 @@ import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * StoragePolicySatisfyWorker handles the storage policy satisfier commands. * These commands would be issued from NameNode as part of Datanode's heart beat @@ -82,8 +83,10 @@ public class StoragePolicySatisfyWorker { private final int moverThreads; private final ExecutorService moveExecutor; - private final CompletionService moverExecutorCompletionService; - private final List> moverTaskFutures; + private final CompletionService moverCompletionService; + private final BlocksMovementsCompletionHandler handler; + private final BlockStorageMovementTracker movementTracker; + private Daemon movementTrackerThread; public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) { this.datanode = datanode; @@ -92,9 +95,13 @@ public class StoragePolicySatisfyWorker { moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); moveExecutor = initializeBlockMoverThreadPool(moverThreads); - moverExecutorCompletionService = new ExecutorCompletionService<>( - moveExecutor); - moverTaskFutures = new ArrayList<>(); + moverCompletionService = new ExecutorCompletionService<>(moveExecutor); + handler = new BlocksMovementsCompletionHandler(); + movementTracker = new BlockStorageMovementTracker(moverCompletionService, + handler); + movementTrackerThread = new Daemon(movementTracker); + movementTrackerThread.setName("BlockStorageMovementTracker"); + movementTrackerThread.start(); // TODO: Needs to manage the number of concurrent moves per DataNode. } @@ -133,10 +140,6 @@ public class StoragePolicySatisfyWorker { * separate thread. Each task will move the block replica to the target node & * wait for the completion. * - * TODO: Presently this function is a blocking call, this has to be refined by - * moving the tracking logic to another tracker thread. HDFS-10884 jira - * addresses the same. - * * @param trackID * unique tracking identifier * @param blockPoolID @@ -146,68 +149,64 @@ public class StoragePolicySatisfyWorker { */ public void processBlockMovingTasks(long trackID, String blockPoolID, Collection blockMovingInfos) { - Future moveCallable = null; for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { assert blkMovingInfo .getSources().length == blkMovingInfo.getTargets().length; for (int i = 0; i < blkMovingInfo.getSources().length; i++) { BlockMovingTask blockMovingTask = new BlockMovingTask( - blkMovingInfo.getBlock(), blockPoolID, + trackID, blockPoolID, blkMovingInfo.getBlock(), blkMovingInfo.getSources()[i], blkMovingInfo.getTargets()[i], + blkMovingInfo.getSourceStorageTypes()[i], blkMovingInfo.getTargetStorageTypes()[i]); - moveCallable = moverExecutorCompletionService.submit(blockMovingTask); - moverTaskFutures.add(moveCallable); - } - } - - for (int i = 0; i < moverTaskFutures.size(); i++) { - try { - moveCallable = moverExecutorCompletionService.take(); - moveCallable.get(); - } catch (InterruptedException | ExecutionException e) { - // TODO: Failure retries and report back the error to NameNode. - LOG.error("Exception while moving block replica to target storage type", - e); + Future moveCallable = moverCompletionService + .submit(blockMovingTask); + movementTracker.addBlock(trackID, moveCallable); } } } /** * This class encapsulates the process of moving the block replica to the - * given target. + * given target and wait for the response. */ - private class BlockMovingTask implements Callable { + private class BlockMovingTask implements Callable { + private final long trackID; + private final String blockPoolID; private final Block block; private final DatanodeInfo source; private final DatanodeInfo target; + private final StorageType srcStorageType; private final StorageType targetStorageType; - private String blockPoolID; - BlockMovingTask(Block block, String blockPoolID, DatanodeInfo source, - DatanodeInfo target, StorageType targetStorageType) { - this.block = block; + BlockMovingTask(long trackID, String blockPoolID, Block block, + DatanodeInfo source, DatanodeInfo target, + StorageType srcStorageType, StorageType targetStorageType) { + this.trackID = trackID; this.blockPoolID = blockPoolID; + this.block = block; this.source = source; this.target = target; + this.srcStorageType = srcStorageType; this.targetStorageType = targetStorageType; } @Override - public Void call() { - moveBlock(); - return null; + public BlockMovementResult call() { + BlockMovementStatus status = moveBlock(); + return new BlockMovementResult(trackID, block.getBlockId(), target, + status); } - private void moveBlock() { - LOG.info("Start moving block {}", block); - - LOG.debug("Start moving block:{} from src:{} to destin:{} to satisfy " - + "storageType:{}", block, source, target, targetStorageType); + private BlockMovementStatus moveBlock() { + LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy " + + "storageType, sourceStoragetype:{} and destinStoragetype:{}", + block, source, target, srcStorageType, targetStorageType); Socket sock = null; DataOutputStream out = null; DataInputStream in = null; try { + ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block); DNConf dnConf = datanode.getDnConf(); String dnAddr = target.getXferAddr(dnConf.getConnectToDnViaHostname()); sock = datanode.newSocket(); @@ -218,7 +217,6 @@ public class StoragePolicySatisfyWorker { OutputStream unbufOut = sock.getOutputStream(); InputStream unbufIn = sock.getInputStream(); - ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block); Token accessToken = datanode.getBlockAccessToken( extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE)); @@ -239,12 +237,14 @@ public class StoragePolicySatisfyWorker { "Successfully moved block:{} from src:{} to destin:{} for" + " satisfying storageType:{}", block, source, target, targetStorageType); + return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS; } catch (IOException e) { // TODO: handle failure retries LOG.warn( "Failed to move block:{} from src:{} to destin:{} to satisfy " + "storageType:{}", block, source, target, targetStorageType, e); + return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE; } finally { IOUtils.closeStream(out); IOUtils.closeStream(in); @@ -272,4 +272,102 @@ public class StoragePolicySatisfyWorker { DataTransferProtoUtil.checkBlockOpStatus(response, logInfo); } } + + /** + * Block movement status code. + */ + enum BlockMovementStatus { + /** Success. */ + DN_BLK_STORAGE_MOVEMENT_SUCCESS(0), + /** + * Failure due to generation time stamp mismatches or network errors + * or no available space. + */ + DN_BLK_STORAGE_MOVEMENT_FAILURE(-1); + + // TODO: need to support different type of failures. Failure due to network + // errors, block pinned, no space available etc. + + private final int code; + + private BlockMovementStatus(int code) { + this.code = code; + } + + /** + * @return the status code. + */ + int getStatusCode() { + return code; + } + } + + /** + * This class represents result from a block movement task. This will have the + * information of the task which was successful or failed due to errors. + */ + static class BlockMovementResult { + private final long trackId; + private final long blockId; + private final DatanodeInfo target; + private final BlockMovementStatus status; + + public BlockMovementResult(long trackId, long blockId, + DatanodeInfo target, BlockMovementStatus status) { + this.trackId = trackId; + this.blockId = blockId; + this.target = target; + this.status = status; + } + + long getTrackId() { + return trackId; + } + + long getBlockId() { + return blockId; + } + + BlockMovementStatus getStatus() { + return status; + } + + @Override + public String toString() { + return new StringBuilder().append("Block movement result(\n ") + .append("track id: ").append(trackId).append(" block id: ") + .append(blockId).append(" target node: ").append(target) + .append(" movement status: ").append(status).append(")").toString(); + } + } + + /** + * Blocks movements completion handler, which is used to collect details of + * the completed list of block movements and notify the namenode about the + * success or failures. + */ + static class BlocksMovementsCompletionHandler { + private final List completedBlocks = new ArrayList<>(); + + /** + * Collect all the block movement results and notify namenode. + * + * @param results + * result of all the block movements per trackId + */ + void handle(List results) { + completedBlocks.addAll(results); + // TODO: notify namenode about the success/failures. + } + + @VisibleForTesting + List getCompletedBlocks() { + return completedBlocks; + } + } + + @VisibleForTesting + BlocksMovementsCompletionHandler getBlocksMovementsCompletionHandler() { + return handler; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5058b09/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java index 7c97f1a..5dcf4e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java @@ -35,10 +35,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; * service. After the block movement this DataNode sends response back to the * NameNode about the movement status. * - * The coordinator datanode will use 'trackId' identifier to coordinate the block - * movement of the given set of blocks. TrackId is a unique identifier that - * represents a group of blocks. Namenode will generate this unique value and - * send it to the coordinator datanode along with the + * The coordinator datanode will use 'trackId' identifier to coordinate the + * block movement of the given set of blocks. TrackId is a unique identifier + * that represents a group of blocks. Namenode will generate this unique value + * and send it to the coordinator datanode along with the * BlockStorageMovementCommand. Datanode will monitor the completion of the * block movements that grouped under this trackId and notifies Namenode about * the completion status. @@ -153,11 +153,11 @@ public class BlockStorageMovementCommand extends DatanodeCommand { return new StringBuilder().append("BlockMovingInfo(\n ") .append("Moving block: ").append(blk).append(" From: ") .append(Arrays.asList(sourceNodes)).append(" To: [") - .append(Arrays.asList(targetNodes)).append(")\n") + .append(Arrays.asList(targetNodes)).append("\n ") .append(" sourceStorageTypes: ") .append(Arrays.toString(sourceStorageTypes)) .append(" targetStorageTypes: ") - .append(Arrays.toString(targetStorageTypes)).toString(); + .append(Arrays.toString(targetStorageTypes)).append(")").toString(); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5058b09/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java index d803f1a..ea3eec3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @@ -33,10 +34,15 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult; +import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementStatus; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,8 +57,9 @@ public class TestStoragePolicySatisfyWorker { private static final Logger LOG = LoggerFactory .getLogger(TestStoragePolicySatisfyWorker.class); - private static final int DEFAULT_BLOCK_SIZE = 100; + private MiniDFSCluster cluster = null; + private final Configuration conf = new HdfsConfiguration(); private static void initConf(Configuration conf) { conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); @@ -63,64 +70,141 @@ public class TestStoragePolicySatisfyWorker { conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L); } + @Before + public void setUp() throws IOException { + initConf(conf); + } + + @After + public void teardown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } + /** * Tests to verify that the block replica is moving to ARCHIVE storage type to * fulfill the storage policy requirement. */ @Test(timeout = 120000) public void testMoveSingleBlockToAnotherDatanode() throws Exception { - final Configuration conf = new HdfsConfiguration(); - initConf(conf); - final MiniDFSCluster cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(4) - .storageTypes( - new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE}, - {StorageType.DISK, StorageType.ARCHIVE}, - {StorageType.ARCHIVE, StorageType.ARCHIVE}, - {StorageType.ARCHIVE, StorageType.ARCHIVE}}) - .build(); - try { - cluster.waitActive(); - final DistributedFileSystem dfs = cluster.getFileSystem(); - final String file = "/testMoveSingleBlockToAnotherDatanode"; - // write to DISK - final FSDataOutputStream out = dfs.create(new Path(file), (short) 2); - out.writeChars("testMoveSingleBlockToAnotherDatanode"); - out.close(); - - // verify before movement - LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0); - StorageType[] storageTypes = lb.getStorageTypes(); - for (StorageType storageType : storageTypes) { - Assert.assertTrue(StorageType.DISK == storageType); - } - // move to ARCHIVE - dfs.setStoragePolicy(new Path(file), "COLD"); - - lb = dfs.getClient().getLocatedBlocks(file, 0).get(0); - DataNode src = cluster.getDataNodes().get(3); - DatanodeInfo targetDnInfo = DFSTestUtil - .getLocalDatanodeInfo(src.getXferPort()); - - // TODO: Need to revisit this when NN is implemented to be able to send - // block moving commands. - StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf, - src); - List blockMovingInfos = new ArrayList<>(); - BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo( - lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo, - lb.getStorageTypes()[0], StorageType.ARCHIVE); - blockMovingInfos.add(blockMovingInfo); - INode inode = cluster.getNamesystem().getFSDirectory().getINode(file); - worker.processBlockMovingTasks(inode.getId(), - cluster.getNamesystem().getBlockPoolId(), blockMovingInfos); - cluster.triggerHeartbeats(); - - // Wait till NameNode notified about the block location details - waitForLocatedBlockWithArchiveStorageType(dfs, file, 1, 30000); - } finally { - cluster.shutdown(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4) + .storageTypes( + new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}}) + .build(); + cluster.waitActive(); + final DistributedFileSystem dfs = cluster.getFileSystem(); + final String file = "/testMoveSingleBlockToAnotherDatanode"; + // write to DISK + final FSDataOutputStream out = dfs.create(new Path(file), (short) 2); + out.writeChars("testMoveSingleBlockToAnotherDatanode"); + out.close(); + + // verify before movement + LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0); + StorageType[] storageTypes = lb.getStorageTypes(); + for (StorageType storageType : storageTypes) { + Assert.assertTrue(StorageType.DISK == storageType); + } + // move to ARCHIVE + dfs.setStoragePolicy(new Path(file), "COLD"); + + FSNamesystem namesystem = cluster.getNamesystem(); + INode inode = namesystem.getFSDirectory().getINode(file); + namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + + cluster.triggerHeartbeats(); + + // Wait till NameNode notified about the block location details + waitForLocatedBlockWithArchiveStorageType(dfs, file, 2, 30000); + } + + /** + * Test to verify that satisfy worker can't move blocks. If specified target + * datanode doesn't have enough space to accommodate the moving block. + */ + @Test(timeout = 120000) + public void testMoveWithNoSpaceAvailable() throws Exception { + final long capacity = 150; + final String rack0 = "/rack0"; + final String rack1 = "/rack1"; + long[] capacities = new long[] {capacity, capacity, capacity / 2}; + String[] hosts = {"host0", "host1", "host2"}; + String[] racks = {rack0, rack1, rack0}; + int numOfDatanodes = capacities.length; + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numOfDatanodes) + .hosts(hosts).racks(racks).simulatedCapacities(capacities) + .storageTypes( + new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}}) + .build(); + + cluster.waitActive(); + InetSocketAddress[] favoredNodes = new InetSocketAddress[3]; + for (int i = 0; i < favoredNodes.length; i++) { + // DFSClient will attempt reverse lookup. In case it resolves + // "127.0.0.1" to "localhost", we manually specify the hostname. + favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress(); + } + final DistributedFileSystem dfs = cluster.getFileSystem(); + final String file = "/testMoveWithNoSpaceAvailable"; + DFSTestUtil.createFile(dfs, new Path(file), false, 1024, 100, + DEFAULT_BLOCK_SIZE, (short) 2, 0, false, favoredNodes); + + // verify before movement + LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0); + StorageType[] storageTypes = lb.getStorageTypes(); + for (StorageType storageType : storageTypes) { + Assert.assertTrue(StorageType.DISK == storageType); } + + // move to ARCHIVE + dfs.setStoragePolicy(new Path(file), "COLD"); + + lb = dfs.getClient().getLocatedBlocks(file, 0).get(0); + DataNode src = cluster.getDataNodes().get(2); + DatanodeInfo targetDnInfo = DFSTestUtil + .getLocalDatanodeInfo(src.getXferPort()); + + StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf, + src); + List blockMovingInfos = new ArrayList<>(); + BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo( + lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo, + lb.getStorageTypes()[0], StorageType.ARCHIVE); + blockMovingInfos.add(blockMovingInfo); + INode inode = cluster.getNamesystem().getFSDirectory().getINode(file); + worker.processBlockMovingTasks(inode.getId(), + cluster.getNamesystem().getBlockPoolId(), blockMovingInfos); + + waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000); + } + + private void waitForBlockMovementCompletion( + final StoragePolicySatisfyWorker worker, final long inodeId, + int expectedFailedItemsCount, int timeout) throws Exception { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + List completedBlocks = worker + .getBlocksMovementsCompletionHandler().getCompletedBlocks(); + int failedCount = 0; + for (BlockMovementResult blockMovementResult : completedBlocks) { + if (BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE == + blockMovementResult.getStatus()) { + failedCount++; + } + } + LOG.info("Block movement completed count={}, expected={} and actual={}", + completedBlocks.size(), expectedFailedItemsCount, failedCount); + return expectedFailedItemsCount == failedCount; + } + }, 100, timeout); } private void waitForLocatedBlockWithArchiveStorageType( @@ -150,7 +234,7 @@ public class TestStoragePolicySatisfyWorker { }, 100, timeout); } - BlockMovingInfo prepareBlockMovingInfo(Block block, + private BlockMovingInfo prepareBlockMovingInfo(Block block, DatanodeInfo src, DatanodeInfo destin, StorageType storageType, StorageType targetStorageType) { return new BlockMovingInfo(block, new DatanodeInfo[] {src}, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org