Subject: svn commit: r1616889 [1/2] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/balancer/ src/test/java/org/apache/hadoop/hdfs/server/balancer/
Date: Fri, 08 Aug 2014 21:33:57 -0000
To: hdfs-commits@hadoop.apache.org
From: jing9@apache.org

Author: jing9
Date: Fri Aug  8 21:33:57 2014
New Revision: 1616889

URL: http://svn.apache.org/r1616889
Log:
HDFS-6828. Separate block replica dispatching from Balancer. Contributed by Tsz Wo Nicholas Sze.
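[Editor's note: HDFS-6828 splits the Balancer into a policy half (deciding which storage groups are over- or under-utilized and how many bytes should move between them) and a mechanism half, the new Dispatcher (scheduling moves, choosing proxy sources, socket I/O, throttling). The following is a minimal, hypothetical Java sketch of that division of labor, not the Hadoop API; apart from the words Balancer and Dispatcher, every name in it is invented for illustration.

import java.util.ArrayList;
import java.util.List;

class BalancerSketch {
  /** A planned transfer of `bytes` between two storage groups (names illustrative). */
  record Move(String source, String target, long bytes) {}

  /** Policy (the Balancer's job): pair over-utilized sources with under-utilized targets. */
  static List<Move> plan(List<String> overUtilized, List<String> underUtilized, long bytesEach) {
    List<Move> moves = new ArrayList<>();
    for (int i = 0; i < Math.min(overUtilized.size(), underUtilized.size()); i++) {
      moves.add(new Move(overUtilized.get(i), underUtilized.get(i), bytesEach));
    }
    return moves;
  }

  /** Mechanism (the Dispatcher's job): execute the plan and report bytes moved. */
  static long dispatch(List<Move> moves) {
    long bytesMoved = 0;
    for (Move m : moves) {
      // The real Dispatcher schedules each move on an executor, picks a proxy
      // source, and performs a replaceBlock transfer; this sketch just tallies.
      bytesMoved += m.bytes();
    }
    return bytesMoved;
  }

  public static void main(String[] args) {
    List<Move> moves = plan(List.of("dnA:DISK"), List.of("dnB:DISK"), 1L << 30);
    System.out.println("bytes moved: " + dispatch(moves)); // 1073741824
  }
}
]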
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1616889&r1=1616888&r2=1616889&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Aug 8 21:33:57 2014 @@ -384,6 +384,9 @@ Release 2.6.0 - UNRELEASED HDFS-573. Porting libhdfs to Windows. (cnauroth) + HDFS-6828. Separate block replica dispatching from Balancer. (szetszwo via + jing9) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1616889&r1=1616888&r2=1616889&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Aug 8 21:33:57 2014 @@ -18,19 +18,9 @@ package org.apache.hadoop.hdfs.server.balancer; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT; -import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY; -import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; + import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.io.PrintStream; -import java.net.Socket; import java.net.URI; import java.text.DateFormat; import java.util.ArrayList; @@ -38,20 +28,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; -import java.util.EnumMap; import java.util.Formatter; -import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Set; -import 
java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -63,31 +44,15 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.BalancerDatanode;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -200,15 +165,7 @@ public class Balancer {
   private static final long GB = 1L << 30; //1GB
   private static final long MAX_SIZE_TO_MOVE = 10*GB;
-  private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB;
-  /** The maximum number of concurrent blocks moves for
-   * balancing purpose at a datanode
-   */
-  private static final int MAX_NO_PENDING_BLOCK_ITERATIONS = 5;
-  public static final long DELAY_AFTER_ERROR = 10 * 1000L; //10 seconds
-  public static final int BLOCK_MOVE_READ_TIMEOUT=20*60*1000; // 20 minutes
-
   private static final String USAGE = "Usage: java "
       + Balancer.class.getSimpleName()
       + "\n\t[-policy <policy>]\tthe balancing policy: "
@@ -220,16 +177,9 @@ public class Balancer {
       + "\n\t[-include [-f <hosts-file> | comma-separated list of hosts]]"
       + "\tIncludes only the specified datanodes.";
-  private final NameNodeConnector nnc;
-  private final KeyManager keyManager;
-
+  private final Dispatcher dispatcher;
   private final BalancingPolicy policy;
-  private final SaslDataTransferClient saslClient;
   private final double threshold;
-  // set of data nodes to be
excluded from balancing operations. - Set nodesToBeExcluded; - //Restrict balancing to the following nodes. - Set nodesToBeIncluded; // all data node lists private final Collection overUtilized = new LinkedList(); @@ -238,634 +188,6 @@ public class Balancer { = new LinkedList(); private final Collection underUtilized = new LinkedList(); - - private final Collection sources = new HashSet(); - private final Collection targets - = new HashSet(); - - private final Map globalBlockList - = new HashMap(); - private final MovedBlocks movedBlocks; - - /** Map (datanodeUuid,storageType -> StorageGroup) */ - private final StorageGroupMap storageGroupMap = new StorageGroupMap(); - - private NetworkTopology cluster; - - private final ExecutorService moverExecutor; - private final ExecutorService dispatcherExecutor; - private final int maxConcurrentMovesPerNode; - - - private static class StorageGroupMap { - private static String toKey(String datanodeUuid, StorageType storageType) { - return datanodeUuid + ":" + storageType; - } - - private final Map map - = new HashMap(); - - BalancerDatanode.StorageGroup get(String datanodeUuid, StorageType storageType) { - return map.get(toKey(datanodeUuid, storageType)); - } - - void put(BalancerDatanode.StorageGroup g) { - final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType); - final BalancerDatanode.StorageGroup existing = map.put(key, g); - Preconditions.checkState(existing == null); - } - - int size() { - return map.size(); - } - - void clear() { - map.clear(); - } - } - /* This class keeps track of a scheduled block move */ - private class PendingBlockMove { - private BalancerBlock block; - private Source source; - private BalancerDatanode proxySource; - private BalancerDatanode.StorageGroup target; - - /** constructor */ - private PendingBlockMove() { - } - - @Override - public String toString() { - final Block b = block.getBlock(); - return b + " with size=" + b.getNumBytes() + " from " - + source.getDisplayName() + " to " + target.getDisplayName() - + " through " + proxySource.datanode; - } - - /* choose a block & a proxy source for this pendingMove - * whose source & target have already been chosen. - * - * Return true if a block and its proxy are chosen; false otherwise - */ - private boolean chooseBlockAndProxy() { - // iterate all source's blocks until find a good one - for (Iterator blocks= - source.getBlockIterator(); blocks.hasNext();) { - if (markMovedIfGoodBlock(blocks.next())) { - blocks.remove(); - return true; - } - } - return false; - } - - /* Return true if the given block is good for the tentative move; - * If it is good, add it to the moved list to marked as "Moved". - * A block is good if - * 1. it is a good candidate; see isGoodBlockCandidate - * 2. 
can find a proxy source that's not busy for this move - */ - private boolean markMovedIfGoodBlock(BalancerBlock block) { - synchronized(block) { - synchronized(movedBlocks) { - if (isGoodBlockCandidate(source, target, block)) { - this.block = block; - if ( chooseProxySource() ) { - movedBlocks.put(block); - if (LOG.isDebugEnabled()) { - LOG.debug("Decided to move " + this); - } - return true; - } - } - } - } - return false; - } - - /* Now we find out source, target, and block, we need to find a proxy - * - * @return true if a proxy is found; otherwise false - */ - private boolean chooseProxySource() { - final DatanodeInfo targetDN = target.getDatanode(); - // if node group is supported, first try add nodes in the same node group - if (cluster.isNodeGroupAware()) { - for (BalancerDatanode.StorageGroup loc : block.getLocations()) { - if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) && addTo(loc)) { - return true; - } - } - } - // check if there is replica which is on the same rack with the target - for (BalancerDatanode.StorageGroup loc : block.getLocations()) { - if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) { - return true; - } - } - // find out a non-busy replica - for (BalancerDatanode.StorageGroup loc : block.getLocations()) { - if (addTo(loc)) { - return true; - } - } - return false; - } - - /** add to a proxy source for specific block movement */ - private boolean addTo(BalancerDatanode.StorageGroup g) { - final BalancerDatanode bdn = g.getBalancerDatanode(); - if (bdn.addPendingBlock(this)) { - proxySource = bdn; - return true; - } - return false; - } - - /* Dispatch the block move task to the proxy source & wait for the response - */ - private void dispatch() { - Socket sock = new Socket(); - DataOutputStream out = null; - DataInputStream in = null; - try { - sock.connect( - NetUtils.createSocketAddr(target.getDatanode().getXferAddr()), - HdfsServerConstants.READ_TIMEOUT); - /* Unfortunately we don't have a good way to know if the Datanode is - * taking a really long time to move a block, OR something has - * gone wrong and it's never going to finish. To deal with this - * scenario, we set a long timeout (20 minutes) to avoid hanging - * the balancer indefinitely. - */ - sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT); - - sock.setKeepAlive(true); - - OutputStream unbufOut = sock.getOutputStream(); - InputStream unbufIn = sock.getInputStream(); - ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), block.getBlock()); - Token accessToken = keyManager.getAccessToken(eb); - IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, - unbufIn, keyManager, accessToken, target.getDatanode()); - unbufOut = saslStreams.out; - unbufIn = saslStreams.in; - out = new DataOutputStream(new BufferedOutputStream(unbufOut, - HdfsConstants.IO_FILE_BUFFER_SIZE)); - in = new DataInputStream(new BufferedInputStream(unbufIn, - HdfsConstants.IO_FILE_BUFFER_SIZE)); - - sendRequest(out, eb, StorageType.DEFAULT, accessToken); - receiveResponse(in); - bytesMoved.addAndGet(block.getNumBytes()); - LOG.info("Successfully moved " + this); - } catch (IOException e) { - LOG.warn("Failed to move " + this + ": " + e.getMessage()); - /* proxy or target may have an issue, insert a small delay - * before using these nodes further. This avoids a potential storm - * of "threads quota exceeded" Warnings when the balancer - * gets out of sync with work going on in datanode. 
- */ - proxySource.activateDelay(DELAY_AFTER_ERROR); - target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR); - } finally { - IOUtils.closeStream(out); - IOUtils.closeStream(in); - IOUtils.closeSocket(sock); - - proxySource.removePendingBlock(this); - target.getBalancerDatanode().removePendingBlock(this); - - synchronized (this ) { - reset(); - } - synchronized (Balancer.this) { - Balancer.this.notifyAll(); - } - } - } - - /* Send a block replace request to the output stream*/ - private void sendRequest(DataOutputStream out, ExtendedBlock eb, - StorageType storageType, - Token accessToken) throws IOException { - new Sender(out).replaceBlock(eb, storageType, accessToken, - source.getDatanode().getDatanodeUuid(), proxySource.datanode); - } - - /* Receive a block copy response from the input stream */ - private void receiveResponse(DataInputStream in) throws IOException { - BlockOpResponseProto response = BlockOpResponseProto.parseFrom( - vintPrefixed(in)); - if (response.getStatus() != Status.SUCCESS) { - if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) - throw new IOException("block move failed due to access token error"); - throw new IOException("block move is failed: " + - response.getMessage()); - } - } - - /* reset the object */ - private void reset() { - block = null; - source = null; - proxySource = null; - target = null; - } - - /* start a thread to dispatch the block move */ - private void scheduleBlockMove() { - moverExecutor.execute(new Runnable() { - @Override - public void run() { - if (LOG.isDebugEnabled()) { - LOG.debug("Start moving " + PendingBlockMove.this); - } - dispatch(); - } - }); - } - } - - /* A class for keeping track of blocks in the Balancer */ - static class BalancerBlock extends MovedBlocks.Locations { - BalancerBlock(Block block) { - super(block); - } - } - - /* The class represents a desired move of bytes between two nodes - * and the target. - * An object of this class is stored in a source. - */ - static private class Task { - private final BalancerDatanode.StorageGroup target; - private long size; //bytes scheduled to move - - /* constructor */ - private Task(BalancerDatanode.StorageGroup target, long size) { - this.target = target; - this.size = size; - } - } - - - /* A class that keeps track of a datanode in Balancer */ - private static class BalancerDatanode { - - /** A group of storages in a datanode with the same storage type. */ - private class StorageGroup { - final StorageType storageType; - final double utilization; - final long maxSize2Move; - private long scheduledSize = 0L; - - private StorageGroup(StorageType storageType, double utilization, - long maxSize2Move) { - this.storageType = storageType; - this.utilization = utilization; - this.maxSize2Move = maxSize2Move; - } - - BalancerDatanode getBalancerDatanode() { - return BalancerDatanode.this; - } - - DatanodeInfo getDatanode() { - return BalancerDatanode.this.datanode; - } - - /** Decide if still need to move more bytes */ - protected synchronized boolean hasSpaceForScheduling() { - return availableSizeToMove() > 0L; - } - - /** @return the total number of bytes that need to be moved */ - synchronized long availableSizeToMove() { - return maxSize2Move - scheduledSize; - } - - /** increment scheduled size */ - synchronized void incScheduledSize(long size) { - scheduledSize += size; - } - - /** @return scheduled size */ - synchronized long getScheduledSize() { - return scheduledSize; - } - - /** Reset scheduled size to zero. 
*/ - synchronized void resetScheduledSize() { - scheduledSize = 0L; - } - - /** @return the name for display */ - String getDisplayName() { - return datanode + ":" + storageType; - } - - @Override - public String toString() { - return "" + utilization; - } - } - - final DatanodeInfo datanode; - final EnumMap storageMap - = new EnumMap(StorageType.class); - protected long delayUntil = 0L; - // blocks being moved but not confirmed yet - private final List pendingBlocks; - private final int maxConcurrentMoves; - - @Override - public String toString() { - return getClass().getSimpleName() + ":" + datanode + ":" + storageMap; - } - - /* Constructor - * Depending on avgutil & threshold, calculate maximum bytes to move - */ - private BalancerDatanode(DatanodeStorageReport report, - double threshold, int maxConcurrentMoves) { - this.datanode = report.getDatanodeInfo(); - this.maxConcurrentMoves = maxConcurrentMoves; - this.pendingBlocks = new ArrayList(maxConcurrentMoves); - } - - private void put(StorageType storageType, StorageGroup g) { - final StorageGroup existing = storageMap.put(storageType, g); - Preconditions.checkState(existing == null); - } - - StorageGroup addStorageGroup(StorageType storageType, double utilization, - long maxSize2Move) { - final StorageGroup g = new StorageGroup(storageType, utilization, - maxSize2Move); - put(storageType, g); - return g; - } - - Source addSource(StorageType storageType, double utilization, - long maxSize2Move, Balancer balancer) { - final Source s = balancer.new Source(storageType, utilization, - maxSize2Move, this); - put(storageType, s); - return s; - } - - synchronized private void activateDelay(long delta) { - delayUntil = Time.now() + delta; - } - - synchronized private boolean isDelayActive() { - if (delayUntil == 0 || Time.now() > delayUntil){ - delayUntil = 0; - return false; - } - return true; - } - - /* Check if the node can schedule more blocks to move */ - synchronized private boolean isPendingQNotFull() { - if ( pendingBlocks.size() < this.maxConcurrentMoves ) { - return true; - } - return false; - } - - /* Check if all the dispatched moves are done */ - synchronized private boolean isPendingQEmpty() { - return pendingBlocks.isEmpty(); - } - - /* Add a scheduled block move to the node */ - private synchronized boolean addPendingBlock( - PendingBlockMove pendingBlock) { - if (!isDelayActive() && isPendingQNotFull()) { - return pendingBlocks.add(pendingBlock); - } - return false; - } - - /* Remove a scheduled block move from the node */ - private synchronized boolean removePendingBlock( - PendingBlockMove pendingBlock) { - return pendingBlocks.remove(pendingBlock); - } - } - - /** A node that can be the sources of a block move */ - private class Source extends BalancerDatanode.StorageGroup { - - /* A thread that initiates a block move - * and waits for block move to complete */ - private class BlockMoveDispatcher implements Runnable { - @Override - public void run() { - dispatchBlocks(); - } - } - - private final List tasks = new ArrayList(2); - private long blocksToReceive = 0L; - /* source blocks point to balancerBlocks in the global list because - * we want to keep one copy of a block in balancer and be aware that - * the locations are changing over time. 
- */ - private final List srcBlockList - = new ArrayList(); - - /* constructor */ - private Source(StorageType storageType, double utilization, - long maxSize2Move, BalancerDatanode dn) { - dn.super(storageType, utilization, maxSize2Move); - } - - /** Add a task */ - private void addTask(Task task) { - Preconditions.checkState(task.target != this, - "Source and target are the same storage group " + getDisplayName()); - incScheduledSize(task.size); - tasks.add(task); - } - - /* Return an iterator to this source's blocks */ - private Iterator getBlockIterator() { - return srcBlockList.iterator(); - } - - /* fetch new blocks of this source from namenode and - * update this source's block list & the global block list - * Return the total size of the received blocks in the number of bytes. - */ - private long getBlockList() throws IOException { - final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive); - final BlockWithLocations[] newBlocks = nnc.getNamenode().getBlocks( - getDatanode(), size).getBlocks(); - - long bytesReceived = 0; - for (BlockWithLocations blk : newBlocks) { - bytesReceived += blk.getBlock().getNumBytes(); - BalancerBlock block; - synchronized(globalBlockList) { - block = globalBlockList.get(blk.getBlock()); - if (block==null) { - block = new BalancerBlock(blk.getBlock()); - globalBlockList.put(blk.getBlock(), block); - } else { - block.clearLocations(); - } - - synchronized (block) { - // update locations - final String[] datanodeUuids = blk.getDatanodeUuids(); - final StorageType[] storageTypes = blk.getStorageTypes(); - for (int i = 0; i < datanodeUuids.length; i++) { - final BalancerDatanode.StorageGroup g = storageGroupMap.get( - datanodeUuids[i], storageTypes[i]); - if (g != null) { // not unknown - block.addLocation(g); - } - } - } - if (!srcBlockList.contains(block) && isGoodBlockCandidate(block)) { - // filter bad candidates - srcBlockList.add(block); - } - } - } - return bytesReceived; - } - - /* Decide if the given block is a good candidate to move or not */ - private boolean isGoodBlockCandidate(BalancerBlock block) { - for (Task t : tasks) { - if (Balancer.this.isGoodBlockCandidate(this, t.target, block)) { - return true; - } - } - return false; - } - - /* Return a block that's good for the source thread to dispatch immediately - * The block's source, target, and proxy source are determined too. - * When choosing proxy and target, source & target throttling - * has been considered. They are chosen only when they have the capacity - * to support this block move. - * The block should be dispatched immediately after this method is returned. 
-   */
-  private PendingBlockMove chooseNextBlockToMove() {
-    for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
-      final Task task = i.next();
-      final BalancerDatanode target = task.target.getBalancerDatanode();
-      PendingBlockMove pendingBlock = new PendingBlockMove();
-      if (target.addPendingBlock(pendingBlock)) {
-        // target is not busy, so do a tentative block allocation
-        pendingBlock.source = this;
-        pendingBlock.target = task.target;
-        if ( pendingBlock.chooseBlockAndProxy() ) {
-          long blockSize = pendingBlock.block.getNumBytes();
-          incScheduledSize(-blockSize);
-          task.size -= blockSize;
-          if (task.size == 0) {
-            i.remove();
-          }
-          return pendingBlock;
-        } else {
-          // cancel the tentative move
-          target.removePendingBlock(pendingBlock);
-        }
-      }
-    }
-    return null;
-  }
-
-  /* iterate all source's blocks to remove moved ones */
-  private void filterMovedBlocks() {
-    for (Iterator<BalancerBlock> blocks=getBlockIterator();
-        blocks.hasNext();) {
-      if (movedBlocks.contains(blocks.next().getBlock())) {
-        blocks.remove();
-      }
-    }
-  }
-
-  private static final int SOURCE_BLOCK_LIST_MIN_SIZE=5;
-  /* Return if should fetch more blocks from namenode */
-  private boolean shouldFetchMoreBlocks() {
-    return srcBlockList.size() < SOURCE_BLOCK_LIST_MIN_SIZE &&
-        blocksToReceive > 0;
-  }
-
-  /* This method iteratively does the following:
-   * it first selects a block to move,
-   * then sends a request to the proxy source to start the block move
-   * when the source's block list falls below a threshold, it asks
-   * the namenode for more blocks.
-   * It terminates when it has dispatched enough block move tasks or
-   * it has received enough blocks from the namenode, or
-   * the elapsed time of the iteration has exceeded the max time limit.
-   */
-  private static final long MAX_ITERATION_TIME = 20*60*1000L; //20 mins
-  private void dispatchBlocks() {
-    long startTime = Time.now();
-    long scheduledSize = getScheduledSize();
-    this.blocksToReceive = 2*scheduledSize;
-    boolean isTimeUp = false;
-    int noPendingBlockIteration = 0;
-    while(!isTimeUp && getScheduledSize()>0 &&
-        (!srcBlockList.isEmpty() || blocksToReceive>0)) {
-      PendingBlockMove pendingBlock = chooseNextBlockToMove();
-      if (pendingBlock != null) {
-        // move the block
-        pendingBlock.scheduleBlockMove();
-        continue;
-      }
-
-      /* Since we can not schedule any block to move,
-       * filter any moved blocks from the source block list and
-       * check if we should fetch more blocks from the namenode
-       */
-      filterMovedBlocks(); // filter already moved blocks
-      if (shouldFetchMoreBlocks()) {
-        // fetch new blocks
-        try {
-          blocksToReceive -= getBlockList();
-          continue;
-        } catch (IOException e) {
-          LOG.warn("Exception while getting block list", e);
-          return;
-        }
-      } else {
-        // source node cannot find a pendingBlockToMove, iteration +1
-        noPendingBlockIteration++;
-        // in case no blocks can be moved for source node's task,
-        // jump out of while-loop after 5 iterations.
-        if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) {
-          resetScheduledSize();
-        }
-      }
-
-      // check if time is up or not
-      if (Time.now()-startTime > MAX_ITERATION_TIME) {
-        isTimeUp = true;
-        continue;
-      }
-
-      /* Now we can not schedule any block to move and there are
-       * no new blocks added to the source block list, so we wait.
-       */
-      try {
-        synchronized(Balancer.this) {
-          Balancer.this.wait(1000); // wait for targets/sources to be idle
-        }
-      } catch (InterruptedException ignored) {
-      }
-    }
-  }
-
   /* Check that this Balancer is compatible with the Block Placement Policy
    * used by the Namenode.
@@ -887,38 +209,12 @@ public class Balancer { * when connection fails. */ Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) { + this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded, + p.nodesToBeExcluded, conf); this.threshold = p.threshold; this.policy = p.policy; - this.nodesToBeExcluded = p.nodesToBeExcluded; - this.nodesToBeIncluded = p.nodesToBeIncluded; - this.nnc = theblockpool; - this.keyManager = nnc.getKeyManager(); - - final long movedWinWidth = conf.getLong( - DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, - DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); - movedBlocks = new MovedBlocks(movedWinWidth); - - cluster = NetworkTopology.getInstance(conf); - - this.moverExecutor = Executors.newFixedThreadPool( - conf.getInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, - DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT)); - this.dispatcherExecutor = Executors.newFixedThreadPool( - conf.getInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, - DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT)); - this.maxConcurrentMovesPerNode = - conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, - DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); - this.saslClient = new SaslDataTransferClient( - DataTransferSaslUtil.getSaslPropertiesResolver(conf), - TrustedChannelResolver.getInstance(conf), - conf.getBoolean( - IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, - IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT)); } - private static long getCapacity(DatanodeStorageReport report, StorageType t) { long capacity = 0L; for(StorageReport r : report.getStorageReports()) { @@ -939,26 +235,6 @@ public class Balancer { return remaining; } - private boolean shouldIgnore(DatanodeInfo dn) { - //ignore decommissioned nodes - final boolean decommissioned = dn.isDecommissioned(); - //ignore decommissioning nodes - final boolean decommissioning = dn.isDecommissionInProgress(); - // ignore nodes in exclude list - final boolean excluded = Util.shouldBeExcluded(nodesToBeExcluded, dn); - // ignore nodes not in the include list (if include list is not empty) - final boolean notIncluded = !Util.shouldBeIncluded(nodesToBeIncluded, dn); - - if (decommissioned || decommissioning || excluded || notIncluded) { - if (LOG.isTraceEnabled()) { - LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", " - + decommissioning + ", " + excluded + ", " + notIncluded); - } - return true; - } - return false; - } - /** * Given a datanode storage set, build a network topology and decide * over-utilized storages, above average utilized storages, @@ -966,16 +242,11 @@ public class Balancer { * The input datanode storage set is shuffled in order to randomize * to the storage matching later on. * - * @return the total number of bytes that are - * needed to move to make the cluster balanced. - * @param reports a set of datanode storage reports + * @return the number of bytes needed to move in order to balance the cluster. */ - private long init(DatanodeStorageReport[] reports) { + private long init(List reports) { // compute average utilization for (DatanodeStorageReport r : reports) { - if (shouldIgnore(r.getDatanodeInfo())) { - continue; - } policy.accumulateSpaces(r); } policy.initAvgUtilization(); @@ -983,15 +254,8 @@ public class Balancer { // create network topology and classify utilization collections: // over-utilized, above-average, below-average and under-utilized. 
long overLoadedBytes = 0L, underLoadedBytes = 0L; - for(DatanodeStorageReport r : DFSUtil.shuffle(reports)) { - final DatanodeInfo datanode = r.getDatanodeInfo(); - if (shouldIgnore(datanode)) { - continue; // ignore decommissioning or decommissioned nodes - } - cluster.add(datanode); - - final BalancerDatanode dn = new BalancerDatanode(r, underLoadedBytes, - maxConcurrentMovesPerNode); + for(DatanodeStorageReport r : reports) { + final BalancerDatanode dn = dispatcher.newDatanode(r); for(StorageType t : StorageType.asList()) { final Double utilization = policy.getUtilization(r, t); if (utilization == null) { // datanode does not have such storage type @@ -1006,7 +270,7 @@ public class Balancer { final BalancerDatanode.StorageGroup g; if (utilizationDiff > 0) { - final Source s = dn.addSource(t, utilization, maxSize2Move, this); + final Source s = dn.addSource(t, utilization, maxSize2Move, dispatcher); if (thresholdDiff <= 0) { // within threshold aboveAvgUtilized.add(s); } else { @@ -1023,14 +287,15 @@ public class Balancer { underUtilized.add(g); } } - storageGroupMap.put(g); + dispatcher.getStorageGroupMap().put(g); } } logUtilizationCollections(); - Preconditions.checkState(storageGroupMap.size() == overUtilized.size() - + underUtilized.size() + aboveAvgUtilized.size() + belowAvgUtilized.size(), + Preconditions.checkState(dispatcher.getStorageGroupMap().size() + == overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size() + + belowAvgUtilized.size(), "Mismatched number of storage groups"); // return number of bytes to be moved in order to make the cluster balanced @@ -1077,7 +342,7 @@ public class Balancer { */ private long chooseStorageGroups() { // First, match nodes on the same node group if cluster is node group aware - if (cluster.isNodeGroupAware()) { + if (dispatcher.getCluster().isNodeGroupAware()) { chooseStorageGroups(Matcher.SAME_NODE_GROUP); } @@ -1086,15 +351,7 @@ public class Balancer { // At last, match all remaining nodes chooseStorageGroups(Matcher.ANY_OTHER); - Preconditions.checkState(storageGroupMap.size() >= sources.size() + targets.size(), - "Mismatched number of datanodes (" + storageGroupMap.size() + " < " - + sources.size() + " sources, " + targets.size() + " targets)"); - - long bytesToMove = 0L; - for (Source src : sources) { - bytesToMove += src.getScheduledSize(); - } - return bytesToMove; + return dispatcher.bytesToMove(); } /** Decide all pairs according to the matcher. */ @@ -1166,9 +423,8 @@ public class Balancer { long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove()); final Task task = new Task(target, size); source.addTask(task); - target.incScheduledSize(task.size); - sources.add(source); - targets.add(target); + target.incScheduledSize(task.getSize()); + dispatcher.add(source, target); LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from " + source.getDisplayName() + " to " + target.getDisplayName()); } @@ -1182,7 +438,8 @@ public class Balancer { final C c = candidates.next(); if (!c.hasSpaceForScheduling()) { candidates.remove(); - } else if (matcher.match(cluster, g.getDatanode(), c.getDatanode())) { + } else if (matcher.match(dispatcher.getCluster(), + g.getDatanode(), c.getDatanode())) { return c; } } @@ -1190,172 +447,16 @@ public class Balancer { return null; } - private final AtomicLong bytesMoved = new AtomicLong(); - - /* Start a thread to dispatch block moves for each source. - * The thread selects blocks to move & sends request to proxy source to - * initiate block move. 
The process is flow controlled. Block selection is
-   * blocked if there are too many un-confirmed block moves.
-   * Return the total number of bytes successfully moved in this iteration.
-   */
-  private long dispatchBlockMoves() throws InterruptedException {
-    long bytesLastMoved = bytesMoved.get();
-    Future<?>[] futures = new Future<?>[sources.size()];
-    int i=0;
-    for (Source source : sources) {
-      futures[i++] = dispatcherExecutor.submit(source.new BlockMoveDispatcher());
-    }
-
-    // wait for all dispatcher threads to finish
-    for (Future<?> future : futures) {
-      try {
-        future.get();
-      } catch (ExecutionException e) {
-        LOG.warn("Dispatcher thread failed", e.getCause());
-      }
-    }
-
-    // wait for all block moving to be done
-    waitForMoveCompletion();
-
-    return bytesMoved.get()-bytesLastMoved;
-  }
-
-  // The sleeping period before checking if block move is completed again
-  static private long blockMoveWaitTime = 30000L;
-
-  /** set the sleeping period for block move completion check */
-  static void setBlockMoveWaitTime(long time) {
-    blockMoveWaitTime = time;
-  }
-
-  /* wait for all block move confirmations
-   * by checking each target's pendingMove queue
-   */
-  private void waitForMoveCompletion() {
-    boolean shouldWait;
-    do {
-      shouldWait = false;
-      for (BalancerDatanode.StorageGroup target : targets) {
-        if (!target.getBalancerDatanode().isPendingQEmpty()) {
-          shouldWait = true;
-          break;
-        }
-      }
-      if (shouldWait) {
-        try {
-          Thread.sleep(blockMoveWaitTime);
-        } catch (InterruptedException ignored) {
-        }
-      }
-    } while (shouldWait);
-  }
-
-  /* Decide if it is OK to move the given block from source to target
-   * A block is a good candidate if
-   * 1. the block is not in the process of being moved/has not been moved;
-   * 2. the block does not have a replica on the target;
-   * 3. doing the move does not reduce the number of racks that the block has
-   */
-  private boolean isGoodBlockCandidate(Source source,
-      BalancerDatanode.StorageGroup target, BalancerBlock block) {
-    if (source.storageType != target.storageType) {
-      return false;
-    }
-    // check if the block is moved or not
-    if (movedBlocks.contains(block.getBlock())) {
-      return false;
-    }
-    if (block.isLocatedOn(target)) {
-      return false;
-    }
-    if (cluster.isNodeGroupAware() &&
-        isOnSameNodeGroupWithReplicas(target, block, source)) {
-      return false;
-    }
-
-    boolean goodBlock = false;
-    if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
-      // good if source and target are on the same rack
-      goodBlock = true;
-    } else {
-      boolean notOnSameRack = true;
-      synchronized (block) {
-        for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
-          if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
-            notOnSameRack = false;
-            break;
-          }
-        }
-      }
-      if (notOnSameRack) {
-        // good if target is not on the same rack as any replica
-        goodBlock = true;
-      } else {
-        // good if source is on the same rack as one of the replicas
-        for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
-          if (loc != source &&
-              cluster.isOnSameRack(loc.getDatanode(), source.getDatanode())) {
-            goodBlock = true;
-            break;
-          }
-        }
-      }
-    }
-    return goodBlock;
-  }
-
-  /**
-   * Check if there are any replica (other than source) on the same node group
-   * with target. If true, then target is not a good candidate for placing
-   * specific block replica as we don't want 2 replicas under the same nodegroup
-   * after balance.
-   * @param target targetDataNode
-   * @param block dataBlock
-   * @param source sourceDataNode
-   * @return true if there are any replica (other than source) on the same node
-   * group with target
-   */
-  private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode.StorageGroup target,
-      BalancerBlock block, Source source) {
-    final DatanodeInfo targetDn = target.getDatanode();
-    for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
-      if (loc != source &&
-          cluster.isOnSameNodeGroup(loc.getDatanode(), targetDn)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   /* reset all fields in a balancer preparing for the next iteration */
   private void resetData(Configuration conf) {
-    this.cluster = NetworkTopology.getInstance(conf);
     this.overUtilized.clear();
     this.aboveAvgUtilized.clear();
     this.belowAvgUtilized.clear();
     this.underUtilized.clear();
-    this.storageGroupMap.clear();
-    this.sources.clear();
-    this.targets.clear();
     this.policy.reset();
-    cleanGlobalBlockList();
-    this.movedBlocks.cleanup();
+    dispatcher.reset(conf);
   }
-
-  /* Remove all blocks from the global block list except for the ones in the
-   * moved list.
-   */
-  private void cleanGlobalBlockList() {
-    for (Iterator<Block> globalBlockListIterator=globalBlockList.keySet().iterator();
-        globalBlockListIterator.hasNext();) {
-      Block block = globalBlockListIterator.next();
-      if(!movedBlocks.contains(block)) {
-        globalBlockListIterator.remove();
-      }
-    }
-  }
-
   // Exit status
   enum ReturnStatus {
     // These int values will map directly to the balancer process's exit code.
@@ -1379,11 +480,8 @@ public class Balancer {
   private ReturnStatus run(int iteration, Formatter formatter,
       Configuration conf) {
     try {
-      /* get all live datanodes of a cluster and their disk usage
-       * decide the number of bytes need to be moved
-       */
-      final long bytesLeftToMove = init(
-          nnc.getClient().getDatanodeStorageReport(DatanodeReportType.LIVE));
+      final List<DatanodeStorageReport> reports = dispatcher.init();
+      final long bytesLeftToMove = init(reports);
       if (bytesLeftToMove == 0) {
         System.out.println("The cluster is balanced. Exiting...");
         return ReturnStatus.SUCCESS;
@@ -1409,7 +507,7 @@ public class Balancer {
       formatter.format("%-24s %10d  %19s  %18s  %17s%n",
           DateFormat.getDateTimeInstance().format(new Date()),
           iteration,
-          StringUtils.byteDesc(bytesMoved.get()),
+          StringUtils.byteDesc(dispatcher.getBytesMoved()),
           StringUtils.byteDesc(bytesLeftToMove),
           StringUtils.byteDesc(bytesToMove)
           );
@@ -1420,7 +518,7 @@ public class Balancer {
       * available to move.
       * Exit if no byte has been moved for 5 consecutive iterations.
       */
-      if (!this.nnc.shouldContinue(dispatchBlockMoves())) {
+      if (!dispatcher.dispatchAndCheckContinue()) {
        return ReturnStatus.NO_MOVE_PROGRESS;
      }
@@ -1435,9 +533,7 @@ public class Balancer {
       System.out.println(e + ".  Exiting ...");
       return ReturnStatus.INTERRUPTED;
     } finally {
-      // shutdown thread pools
-      dispatcherExecutor.shutdownNow();
-      moverExecutor.shutdownNow();
+      dispatcher.shutdownNow();
     }
   }
@@ -1546,76 +642,6 @@ public class Balancer {
     }
   }
-
-  static class Util {
-
-    /**
-     * @param datanode
-     * @return returns true if data node is part of the excludedNodes.
-     */
-    static boolean shouldBeExcluded(Set<String> excludedNodes, DatanodeInfo datanode) {
-      return isIn(excludedNodes, datanode);
-    }
-
-    /**
-     * @param datanode
-     * @return returns true if includedNodes is empty or data node is part of the includedNodes.
- */ - static boolean shouldBeIncluded(Set includedNodes, DatanodeInfo datanode) { - return (includedNodes.isEmpty() || - isIn(includedNodes, datanode)); - } - /** - * Match is checked using host name , ip address with and without port number. - * @param datanodeSet - * @param datanode - * @return true if the datanode's transfer address matches the set of nodes. - */ - private static boolean isIn(Set datanodeSet, DatanodeInfo datanode) { - return isIn(datanodeSet, datanode.getPeerHostName(), datanode.getXferPort()) || - isIn(datanodeSet, datanode.getIpAddr(), datanode.getXferPort()) || - isIn(datanodeSet, datanode.getHostName(), datanode.getXferPort()); - } - - /** - * returns true if nodes contains host or host:port - * @param nodes - * @param host - * @param port - * @return - */ - private static boolean isIn(Set nodes, String host, int port) { - if (host == null) { - return false; - } - return (nodes.contains(host) || nodes.contains(host +":"+ port)); - } - - /** - * parse a comma separated string to obtain set of host names - * @param string - * @return - */ - static Set parseHostList(String string) { - String[] addrs = StringUtils.getTrimmedStrings(string); - return new HashSet(Arrays.asList(addrs)); - } - - /** - * read set of host names from a file - * @param fileName - * @return - */ - static Set getHostListFromFile(String fileName) { - Set nodes = new HashSet (); - try { - HostsFileReader.readFileToSet("nodes", fileName, nodes); - return StringUtils.getTrimmedStrings(nodes); - } catch (IOException e) { - throw new IllegalArgumentException("Unable to open file: " + fileName); - } - } - } - static class Cli extends Configured implements Tool { /** * Parse arguments and then run Balancer. @@ -1688,7 +714,7 @@ public class Balancer { checkArgument(++i < args.length, "File containing nodes to exclude is not specified: args = " + Arrays.toString(args)); - nodesTobeExcluded = Util.getHostListFromFile(args[i]); + nodesTobeExcluded = Util.getHostListFromFile(args[i], "exclude"); } else { nodesTobeExcluded = Util.parseHostList(args[i]); } @@ -1700,7 +726,7 @@ public class Balancer { checkArgument(++i < args.length, "File containing nodes to include is not specified: args = " + Arrays.toString(args)); - nodesTobeIncluded = Util.getHostListFromFile(args[i]); + nodesTobeIncluded = Util.getHostListFromFile(args[i], "include"); } else { nodesTobeIncluded = Util.parseHostList(args[i]); } Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java?rev=1616889&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java Fri Aug 8 21:33:57 2014 @@ -0,0 +1,1060 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.balancer; + +import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.HostsFileReader; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; + +import com.google.common.base.Preconditions; + +/** Dispatching block replica moves between datanodes. 
*/ +@InterfaceAudience.Private +public class Dispatcher { + static final Log LOG = LogFactory.getLog(Dispatcher.class); + + private static final long GB = 1L << 30; // 1GB + private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2 * GB; + + private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5; + private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds + private static final int BLOCK_MOVE_READ_TIMEOUT = 20 * 60 * 1000; // 20 + // minutes + + private final NameNodeConnector nnc; + private final KeyManager keyManager; + private final SaslDataTransferClient saslClient; + + /** Set of datanodes to be excluded. */ + private final Set excludedNodes; + /** Restrict to the following nodes. */ + private final Set includedNodes; + + private final Collection sources = new HashSet(); + private final Collection targets + = new HashSet(); + + private final GlobalBlockMap globalBlocks = new GlobalBlockMap(); + private final MovedBlocks movedBlocks; + + /** Map (datanodeUuid,storageType -> StorageGroup) */ + private final StorageGroupMap storageGroupMap = new StorageGroupMap(); + + private NetworkTopology cluster; + + private final ExecutorService moveExecutor; + private final ExecutorService dispatchExecutor; + /** The maximum number of concurrent blocks moves at a datanode */ + private final int maxConcurrentMovesPerNode; + + private final AtomicLong bytesMoved = new AtomicLong(); + + private static class GlobalBlockMap { + private final Map map = new HashMap(); + + /** + * Get the block from the map; + * if the block is not found, create a new block and put it in the map. + */ + private DBlock get(Block b) { + DBlock block = map.get(b); + if (block == null) { + block = new DBlock(b); + map.put(b, block); + } + return block; + } + + /** Remove all blocks except for the moved blocks. */ + private void removeAllButRetain( + MovedBlocks movedBlocks) { + for (Iterator i = map.keySet().iterator(); i.hasNext();) { + if (!movedBlocks.contains(i.next())) { + i.remove(); + } + } + } + } + + static class StorageGroupMap { + private static String toKey(String datanodeUuid, StorageType storageType) { + return datanodeUuid + ":" + storageType; + } + + private final Map map + = new HashMap(); + + BalancerDatanode.StorageGroup get(String datanodeUuid, + StorageType storageType) { + return map.get(toKey(datanodeUuid, storageType)); + } + + void put(BalancerDatanode.StorageGroup g) { + final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType); + final BalancerDatanode.StorageGroup existing = map.put(key, g); + Preconditions.checkState(existing == null); + } + + int size() { + return map.size(); + } + + void clear() { + map.clear(); + } + } + + /** This class keeps track of a scheduled block move */ + private class PendingMove { + private DBlock block; + private Source source; + private BalancerDatanode proxySource; + private BalancerDatanode.StorageGroup target; + + private PendingMove() { + } + + @Override + public String toString() { + final Block b = block.getBlock(); + return b + " with size=" + b.getNumBytes() + " from " + + source.getDisplayName() + " to " + target.getDisplayName() + + " through " + proxySource.datanode; + } + + /** + * Choose a block & a proxy source for this pendingMove whose source & + * target have already been chosen. 
+ * + * @return true if a block and its proxy are chosen; false otherwise + */ + private boolean chooseBlockAndProxy() { + // iterate all source's blocks until find a good one + for (Iterator i = source.getBlockIterator(); i.hasNext();) { + if (markMovedIfGoodBlock(i.next())) { + i.remove(); + return true; + } + } + return false; + } + + /** + * @return true if the given block is good for the tentative move. + */ + private boolean markMovedIfGoodBlock(DBlock block) { + synchronized (block) { + synchronized (movedBlocks) { + if (isGoodBlockCandidate(source, target, block)) { + this.block = block; + if (chooseProxySource()) { + movedBlocks.put(block); + if (LOG.isDebugEnabled()) { + LOG.debug("Decided to move " + this); + } + return true; + } + } + } + } + return false; + } + + /** + * Choose a proxy source. + * + * @return true if a proxy is found; otherwise false + */ + private boolean chooseProxySource() { + final DatanodeInfo targetDN = target.getDatanode(); + // if node group is supported, first try add nodes in the same node group + if (cluster.isNodeGroupAware()) { + for (BalancerDatanode.StorageGroup loc : block.getLocations()) { + if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) + && addTo(loc)) { + return true; + } + } + } + // check if there is replica which is on the same rack with the target + for (BalancerDatanode.StorageGroup loc : block.getLocations()) { + if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) { + return true; + } + } + // find out a non-busy replica + for (BalancerDatanode.StorageGroup loc : block.getLocations()) { + if (addTo(loc)) { + return true; + } + } + return false; + } + + /** add to a proxy source for specific block movement */ + private boolean addTo(BalancerDatanode.StorageGroup g) { + final BalancerDatanode bdn = g.getBalancerDatanode(); + if (bdn.addPendingBlock(this)) { + proxySource = bdn; + return true; + } + return false; + } + + /** Dispatch the move to the proxy source & wait for the response. */ + private void dispatch() { + if (LOG.isDebugEnabled()) { + LOG.debug("Start moving " + this); + } + + Socket sock = new Socket(); + DataOutputStream out = null; + DataInputStream in = null; + try { + sock.connect( + NetUtils.createSocketAddr(target.getDatanode().getXferAddr()), + HdfsServerConstants.READ_TIMEOUT); + /* + * Unfortunately we don't have a good way to know if the Datanode is + * taking a really long time to move a block, OR something has gone + * wrong and it's never going to finish. To deal with this scenario, we + * set a long timeout (20 minutes) to avoid hanging the balancer + * indefinitely. 
+ */ + sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT); + + sock.setKeepAlive(true); + + OutputStream unbufOut = sock.getOutputStream(); + InputStream unbufIn = sock.getInputStream(); + ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), + block.getBlock()); + Token accessToken = keyManager.getAccessToken(eb); + IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, + unbufIn, keyManager, accessToken, target.getDatanode()); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; + out = new DataOutputStream(new BufferedOutputStream(unbufOut, + HdfsConstants.IO_FILE_BUFFER_SIZE)); + in = new DataInputStream(new BufferedInputStream(unbufIn, + HdfsConstants.IO_FILE_BUFFER_SIZE)); + + sendRequest(out, eb, accessToken); + receiveResponse(in); + bytesMoved.addAndGet(block.getNumBytes()); + LOG.info("Successfully moved " + this); + } catch (IOException e) { + LOG.warn("Failed to move " + this + ": " + e.getMessage()); + /* + * proxy or target may have an issue, insert a small delay before using + * these nodes further. This avoids a potential storm of + * "threads quota exceeded" Warnings when the balancer gets out of sync + * with work going on in datanode. + */ + proxySource.activateDelay(DELAY_AFTER_ERROR); + target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR); + } finally { + IOUtils.closeStream(out); + IOUtils.closeStream(in); + IOUtils.closeSocket(sock); + + proxySource.removePendingBlock(this); + target.getBalancerDatanode().removePendingBlock(this); + + synchronized (this) { + reset(); + } + synchronized (Dispatcher.this) { + Dispatcher.this.notifyAll(); + } + } + } + + /** Send a block replace request to the output stream */ + private void sendRequest(DataOutputStream out, ExtendedBlock eb, + Token accessToken) throws IOException { + new Sender(out).replaceBlock(eb, target.storageType, accessToken, source + .getDatanode().getDatanodeUuid(), proxySource.datanode); + } + + /** Receive a block copy response from the input stream */ + private void receiveResponse(DataInputStream in) throws IOException { + BlockOpResponseProto response = BlockOpResponseProto + .parseFrom(vintPrefixed(in)); + if (response.getStatus() != Status.SUCCESS) { + if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) { + throw new IOException("block move failed due to access token error"); + } + throw new IOException("block move is failed: " + response.getMessage()); + } + } + + /** reset the object */ + private void reset() { + block = null; + source = null; + proxySource = null; + target = null; + } + } + + /** A class for keeping track of block locations in the dispatcher. */ + private static class DBlock extends + MovedBlocks.Locations { + DBlock(Block block) { + super(block); + } + } + + /** The class represents a desired move. */ + static class Task { + private final BalancerDatanode.StorageGroup target; + private long size; // bytes scheduled to move + + Task(BalancerDatanode.StorageGroup target, long size) { + this.target = target; + this.size = size; + } + + long getSize() { + return size; + } + } + + /** A class that keeps track of a datanode. */ + static class BalancerDatanode { + + /** A group of storages in a datanode with the same storage type. 
+
+  /** A class that keeps track of a datanode. */
+  static class BalancerDatanode {
+
+    /** A group of storages in a datanode with the same storage type. */
+    class StorageGroup {
+      final StorageType storageType;
+      final double utilization;
+      final long maxSize2Move;
+      private long scheduledSize = 0L;
+
+      private StorageGroup(StorageType storageType, double utilization,
+          long maxSize2Move) {
+        this.storageType = storageType;
+        this.utilization = utilization;
+        this.maxSize2Move = maxSize2Move;
+      }
+
+      BalancerDatanode getBalancerDatanode() {
+        return BalancerDatanode.this;
+      }
+
+      DatanodeInfo getDatanode() {
+        return BalancerDatanode.this.datanode;
+      }
+
+      /** Decide whether more bytes still need to be scheduled. */
+      synchronized boolean hasSpaceForScheduling() {
+        return availableSizeToMove() > 0L;
+      }
+
+      /** @return the number of bytes that can still be scheduled to move */
+      synchronized long availableSizeToMove() {
+        return maxSize2Move - scheduledSize;
+      }
+
+      /** Increment the scheduled size. */
+      synchronized void incScheduledSize(long size) {
+        scheduledSize += size;
+      }
+
+      /** @return the scheduled size */
+      synchronized long getScheduledSize() {
+        return scheduledSize;
+      }
+
+      /** Reset the scheduled size to zero. */
+      synchronized void resetScheduledSize() {
+        scheduledSize = 0L;
+      }
+
+      /** @return the name for display */
+      String getDisplayName() {
+        return datanode + ":" + storageType;
+      }
+
+      @Override
+      public String toString() {
+        return "" + utilization;
+      }
+    }
+
+    final DatanodeInfo datanode;
+    final EnumMap<StorageType, StorageGroup> storageMap
+        = new EnumMap<StorageType, StorageGroup>(StorageType.class);
+    protected long delayUntil = 0L;
+    /** blocks being moved but not confirmed yet */
+    private final List<PendingMove> pendings;
+    private final int maxConcurrentMoves;
+
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + ":" + datanode + ":" + storageMap;
+    }
+
+    private BalancerDatanode(DatanodeStorageReport r, int maxConcurrentMoves) {
+      this.datanode = r.getDatanodeInfo();
+      this.maxConcurrentMoves = maxConcurrentMoves;
+      this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
+    }
+
+    private void put(StorageType storageType, StorageGroup g) {
+      final StorageGroup existing = storageMap.put(storageType, g);
+      Preconditions.checkState(existing == null);
+    }
+
+    StorageGroup addStorageGroup(StorageType storageType, double utilization,
+        long maxSize2Move) {
+      final StorageGroup g = new StorageGroup(storageType, utilization,
+          maxSize2Move);
+      put(storageType, g);
+      return g;
+    }
+
+    Source addSource(StorageType storageType, double utilization,
+        long maxSize2Move, Dispatcher balancer) {
+      final Source s = balancer.new Source(storageType, utilization,
+          maxSize2Move, this);
+      put(storageType, s);
+      return s;
+    }
+
+    private synchronized void activateDelay(long delta) {
+      delayUntil = Time.monotonicNow() + delta;
+    }
+
+    private synchronized boolean isDelayActive() {
+      if (delayUntil == 0 || Time.monotonicNow() > delayUntil) {
+        delayUntil = 0;
+        return false;
+      }
+      return true;
+    }
+
+    /** Check if the node can schedule more blocks to move. */
+    synchronized boolean isPendingQNotFull() {
+      return pendings.size() < maxConcurrentMoves;
+    }
+
+    /** Check if all the dispatched moves are done. */
+    synchronized boolean isPendingQEmpty() {
+      return pendings.isEmpty();
+    }
+
+    /** Add a scheduled block move to the node. */
+    synchronized boolean addPendingBlock(PendingMove pendingBlock) {
+      if (!isDelayActive() && isPendingQNotFull()) {
+        return pendings.add(pendingBlock);
+      }
+      return false;
+    }
+
+    /** Remove a scheduled block move from the node. */
+    synchronized boolean removePendingBlock(PendingMove pendingBlock) {
+      return pendings.remove(pendingBlock);
+    }
+  }
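
The delay bookkeeping in activateDelay()/isDelayActive() amounts to a simple monotonic-clock backoff: after a failed move, the node is skipped until the deadline passes, then the deadline is cleared. A minimal standalone sketch using a plain monotonic millisecond clock in place of Time.monotonicNow():

    class BackoffSketch {
      private long delayUntil = 0L;

      private static long nowMillis() {
        return System.nanoTime() / 1_000_000; // monotonic, like Time.monotonicNow()
      }

      synchronized void activateDelay(long deltaMillis) {
        delayUntil = nowMillis() + deltaMillis; // skip this node until the deadline
      }

      synchronized boolean isDelayActive() {
        if (delayUntil == 0 || nowMillis() > delayUntil) {
          delayUntil = 0; // delay expired; clear it
          return false;
        }
        return true;
      }
    }
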
+
+  /** A node that can be the source of a block move. */
+  class Source extends BalancerDatanode.StorageGroup {
+
+    private final List<Task> tasks = new ArrayList<Task>(2);
+    private long blocksToReceive = 0L;
+    /**
+     * Source blocks point to the objects in {@link Dispatcher#globalBlocks}
+     * because we want to keep one copy of a block and be aware that the
+     * locations are changing over time.
+     */
+    private final List<DBlock> srcBlocks = new ArrayList<DBlock>();
+
+    private Source(StorageType storageType, double utilization,
+        long maxSize2Move, BalancerDatanode dn) {
+      dn.super(storageType, utilization, maxSize2Move);
+    }
+
+    /** Add a task. */
+    void addTask(Task task) {
+      Preconditions.checkState(task.target != this,
+          "Source and target are the same storage group " + getDisplayName());
+      incScheduledSize(task.size);
+      tasks.add(task);
+    }
+
+    /** @return an iterator to this source's blocks */
+    Iterator<DBlock> getBlockIterator() {
+      return srcBlocks.iterator();
+    }
+
+    /**
+     * Fetch new blocks of this source from the namenode and update this
+     * source's block list & {@link Dispatcher#globalBlocks}.
+     *
+     * @return the total size of the received blocks in bytes.
+     */
+    private long getBlockList() throws IOException {
+      final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
+      final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanode(), size);
+
+      long bytesReceived = 0;
+      for (BlockWithLocations blk : newBlocks.getBlocks()) {
+        bytesReceived += blk.getBlock().getNumBytes();
+        synchronized (globalBlocks) {
+          final DBlock block = globalBlocks.get(blk.getBlock());
+          synchronized (block) {
+            block.clearLocations();
+
+            // update locations
+            final String[] datanodeUuids = blk.getDatanodeUuids();
+            final StorageType[] storageTypes = blk.getStorageTypes();
+            for (int i = 0; i < datanodeUuids.length; i++) {
+              final BalancerDatanode.StorageGroup g = storageGroupMap.get(
+                  datanodeUuids[i], storageTypes[i]);
+              if (g != null) { // not unknown
+                block.addLocation(g);
+              }
+            }
+          }
+          if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
+            // filter bad candidates
+            srcBlocks.add(block);
+          }
+        }
+      }
+      return bytesReceived;
+    }
+
+    /** Decide if the given block is a good candidate to move or not. */
+    private boolean isGoodBlockCandidate(DBlock block) {
+      for (Task t : tasks) {
+        if (Dispatcher.this.isGoodBlockCandidate(this, t.target, block)) {
+          return true;
+        }
+      }
+      return false;
+    }
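
The globalBlocks idea, one shared and location-refreshable copy per block visible to every source, can be sketched as a small interning map. BlockInfo below is a simplified stand-in for DBlock, keyed by block id rather than the full Block object:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class GlobalBlocksSketch {
      private final Map<Long, BlockInfo> global = new HashMap<>();

      // Every source interns blocks here, so a location refresh by one
      // source is immediately visible to all the others.
      synchronized BlockInfo intern(long blockId) {
        return global.computeIfAbsent(blockId, BlockInfo::new);
      }

      static class BlockInfo {
        final long id;
        private final List<String> locations = new ArrayList<>();

        BlockInfo(long id) { this.id = id; }

        synchronized void resetLocations(List<String> fresh) {
          locations.clear();       // like block.clearLocations()
          locations.addAll(fresh); // like block.addLocation(g)
        }
      }
    }
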
+
+    /**
+     * Choose a move for the source. The block's source, target, and proxy
+     * are determined too. When choosing the proxy and the target, source
+     * and target throttling is taken into account: they are chosen only
+     * when they have the capacity to support this block move. The block
+     * should be dispatched immediately after this method returns.
+     *
+     * @return a move that's good for the source to dispatch immediately.
+     */
+    private PendingMove chooseNextMove() {
+      for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
+        final Task task = i.next();
+        final BalancerDatanode target = task.target.getBalancerDatanode();
+        PendingMove pendingBlock = new PendingMove();
+        if (target.addPendingBlock(pendingBlock)) {
+          // target is not busy, so do a tentative block allocation
+          pendingBlock.source = this;
+          pendingBlock.target = task.target;
+          if (pendingBlock.chooseBlockAndProxy()) {
+            long blockSize = pendingBlock.block.getNumBytes();
+            incScheduledSize(-blockSize);
+            task.size -= blockSize;
+            if (task.size == 0) {
+              i.remove();
+            }
+            return pendingBlock;
+          } else {
+            // cancel the tentative move
+            target.removePendingBlock(pendingBlock);
+          }
+        }
+      }
+      return null;
+    }
+
+    /** Iterate all of this source's blocks to remove the moved ones. */
+    private void removeMovedBlocks() {
+      for (Iterator<DBlock> i = getBlockIterator(); i.hasNext();) {
+        if (movedBlocks.contains(i.next().getBlock())) {
+          i.remove();
+        }
+      }
+    }
+
+    private static final int SOURCE_BLOCKS_MIN_SIZE = 5;
+
+    /** @return if we should fetch more blocks from the namenode */
+    private boolean shouldFetchMoreBlocks() {
+      return srcBlocks.size() < SOURCE_BLOCKS_MIN_SIZE && blocksToReceive > 0;
+    }
+
+    private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins
+
+    /**
+     * This method iteratively does the following: it first selects a block
+     * to move, then sends a request to the proxy source to start the block
+     * move; when the source's block list falls below a threshold, it asks
+     * the namenode for more blocks. It terminates when it has dispatched
+     * enough block move tasks, or it has received enough blocks from the
+     * namenode, or the elapsed time of the iteration has exceeded the max
+     * time limit.
+     */
+    private void dispatchBlocks() {
+      final long startTime = Time.monotonicNow();
+      this.blocksToReceive = 2 * getScheduledSize();
+      boolean isTimeUp = false;
+      int noPendingBlockIteration = 0;
+      while (!isTimeUp && getScheduledSize() > 0
+          && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
+        final PendingMove p = chooseNextMove();
+        if (p != null) {
+          // move the block
+          moveExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+              p.dispatch();
+            }
+          });
+          continue;
+        }
+
+        // Since we cannot schedule any block to move, remove any already
+        // moved blocks from the source block list and check if we should
+        // fetch more blocks from the namenode.
+        removeMovedBlocks(); // filter already moved blocks
+        if (shouldFetchMoreBlocks()) {
+          // fetch new blocks
+          try {
+            blocksToReceive -= getBlockList();
+            continue;
+          } catch (IOException e) {
+            LOG.warn("Exception while getting block list", e);
+            return;
+          }
+        } else {
+          // the source node cannot find a pending block to move; count it
+          noPendingBlockIteration++;
+          // if no blocks can be moved for this source's tasks, jump out of
+          // the while-loop after MAX_NO_PENDING_MOVE_ITERATIONS iterations.
+          if (noPendingBlockIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
+            resetScheduledSize();
+          }
+        }
+
+        // check if time is up or not
+        if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) {
+          isTimeUp = true;
+          continue;
+        }
+
+        // Now we cannot schedule any block to move and there are
+        // no new blocks added to the source block list, so we wait.
+        try {
+          synchronized (Dispatcher.this) {
+            Dispatcher.this.wait(1000); // wait for targets/sources to be idle
+          }
+        } catch (InterruptedException ignored) {
+        }
+      }
+    }
+  }
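
The handshake between dispatchBlocks() and dispatch() is the classic bounded wait/notifyAll idiom on a shared monitor: a dispatcher thread waits when no move can be scheduled, and every completed move wakes all waiters so they retry. A minimal standalone sketch:

    class WaitNotifySketch {
      private final Object monitor = new Object();

      // Called by a dispatcher thread when nothing can be scheduled;
      // the caller re-checks its loop condition after waking up.
      void waitForIdleNodes() throws InterruptedException {
        synchronized (monitor) {
          monitor.wait(1000); // bounded wait, like Dispatcher.this.wait(1000)
        }
      }

      // Called by a mover thread in its finally block after each move.
      void onMoveFinished() {
        synchronized (monitor) {
          monitor.notifyAll(); // wake all dispatcher threads to retry
        }
      }
    }
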
+
+  Dispatcher(NameNodeConnector theblockpool, Set<String> includedNodes,
+      Set<String> excludedNodes, Configuration conf) {
+    this.nnc = theblockpool;
+    this.keyManager = nnc.getKeyManager();
+    this.excludedNodes = excludedNodes;
+    this.includedNodes = includedNodes;
+
+    final long movedWinWidth = conf.getLong(
+        DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
+        DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
+    movedBlocks = new MovedBlocks<BalancerDatanode.StorageGroup>(movedWinWidth);
+
+    this.cluster = NetworkTopology.getInstance(conf);
+
+    this.moveExecutor = Executors.newFixedThreadPool(conf.getInt(
+        DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
+        DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT));
+    this.dispatchExecutor = Executors.newFixedThreadPool(conf.getInt(
+        DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
+        DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
+    this.maxConcurrentMovesPerNode = conf.getInt(
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+
+    final boolean fallbackToSimpleAuthAllowed = conf.getBoolean(
+        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+    this.saslClient = new SaslDataTransferClient(
+        DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+        TrustedChannelResolver.getInstance(conf), fallbackToSimpleAuthAllowed);
+  }
+
+  StorageGroupMap getStorageGroupMap() {
+    return storageGroupMap;
+  }
+
+  NetworkTopology getCluster() {
+    return cluster;
+  }
+
+  long getBytesMoved() {
+    return bytesMoved.get();
+  }
+
+  long bytesToMove() {
+    Preconditions.checkState(
+        storageGroupMap.size() >= sources.size() + targets.size(),
+        "Mismatched number of storage groups (" + storageGroupMap.size()
+            + " < " + sources.size() + " sources + " + targets.size()
+            + " targets)");
+
+    long b = 0L;
+    for (Source src : sources) {
+      b += src.getScheduledSize();
+    }
+    return b;
+  }
+
+  void add(Source source, BalancerDatanode.StorageGroup target) {
+    sources.add(source);
+    targets.add(target);
+  }
+
+  private boolean shouldIgnore(DatanodeInfo dn) {
+    // ignore decommissioned nodes
+    final boolean decommissioned = dn.isDecommissioned();
+    // ignore decommissioning nodes
+    final boolean decommissioning = dn.isDecommissionInProgress();
+    // ignore nodes in the exclude list
+    final boolean excluded = Util.isExcluded(excludedNodes, dn);
+    // ignore nodes not in the include list (if the include list is not empty)
+    final boolean notIncluded = !Util.isIncluded(includedNodes, dn);
+
+    if (decommissioned || decommissioning || excluded || notIncluded) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", "
+            + decommissioning + ", " + excluded + ", " + notIncluded);
+      }
+      return true;
+    }
+    return false;
+  }
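
The constructor wires up a two-level thread pool layout: a small pool of dispatcher threads (one task per source) feeding a larger pool of mover threads (one task per block move). A minimal sketch of that layout; the pool sizes here are illustrative, whereas the patch reads both from DFSConfigKeys:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class TwoPoolSketch {
      // one task per source storage group
      final ExecutorService dispatchers = Executors.newFixedThreadPool(4);
      // one task per in-flight block move
      final ExecutorService movers = Executors.newFixedThreadPool(200);

      void runSource(Runnable selectAndScheduleMoves) {
        dispatchers.execute(selectAndScheduleMoves); // like dispatchExecutor
      }

      void runMove(Runnable singleBlockMove) {
        movers.execute(singleBlockMove); // like moveExecutor
      }

      void shutdownNow() {
        dispatchers.shutdownNow();
        movers.shutdownNow();
      }
    }
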
+
+  /** Get live datanode storage reports and then build the network topology. */
+  List<DatanodeStorageReport> init() throws IOException {
+    final DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport();
+    final List<DatanodeStorageReport> trimmed
+        = new ArrayList<DatanodeStorageReport>();
+    // create the network topology and classify the utilization collections:
+    // over-utilized, above-average, below-average and under-utilized.
+    for (DatanodeStorageReport r : DFSUtil.shuffle(reports)) {
+      final DatanodeInfo datanode = r.getDatanodeInfo();
+      if (shouldIgnore(datanode)) {
+        continue;
+      }
+      trimmed.add(r);
+      cluster.add(datanode);
+    }
+    return trimmed;
+  }
+
+  public BalancerDatanode newDatanode(DatanodeStorageReport r) {
+    return new BalancerDatanode(r, maxConcurrentMovesPerNode);
+  }
+
+  public boolean dispatchAndCheckContinue() throws InterruptedException {
+    return nnc.shouldContinue(dispatchBlockMoves());
+  }
+
+  /**
+   * Dispatch block moves for each source. The thread selects blocks to move
+   * and sends requests to the proxy sources to initiate the block moves.
+   * The process is flow controlled: block selection is blocked if there are
+   * too many unconfirmed block moves.
+   *
+   * @return the total number of bytes successfully moved in this iteration.
+   */
+  private long dispatchBlockMoves() throws InterruptedException {
+    final long bytesLastMoved = bytesMoved.get();
+    final Future<?>[] futures = new Future<?>[sources.size()];
+
+    final Iterator<Source> i = sources.iterator();
+    for (int j = 0; j < futures.length; j++) {
+      final Source s = i.next();
+      futures[j] = dispatchExecutor.submit(new Runnable() {
+        @Override
+        public void run() {
+          s.dispatchBlocks();
+        }
+      });
+    }
+
+    // wait for all dispatcher threads to finish
+    for (Future<?> future : futures) {
+      try {
+        future.get();
+      } catch (ExecutionException e) {
+        LOG.warn("Dispatcher thread failed", e.getCause());
+      }
+    }
+
+    // wait for all block moving to be done
+    waitForMoveCompletion();
+
+    return bytesMoved.get() - bytesLastMoved;
+  }
+
+  /** The sleeping period before re-checking if block moves are completed. */
+  private static long blockMoveWaitTime = 30000L;
+
+  /** Set the sleeping period for the block move completion check. */
+  static void setBlockMoveWaitTime(long time) {
+    blockMoveWaitTime = time;
+  }
+
+  /** Wait for all block move confirmations. */
+  private void waitForMoveCompletion() {
+    for (;;) {
+      boolean empty = true;
+      for (BalancerDatanode.StorageGroup t : targets) {
+        if (!t.getBalancerDatanode().isPendingQEmpty()) {
+          empty = false;
+          break;
+        }
+      }
+      if (empty) {
+        return; // all pending queues are empty
+      }
+      try {
+        Thread.sleep(blockMoveWaitTime);
+      } catch (InterruptedException ignored) {
+      }
+    }
+  }
+
+  /**
+   * Decide if the block is a good candidate to be moved from source to
+   * target. A block is a good candidate if
+   * 1. the block is not in the process of being moved and has not been moved;
+   * 2. the block does not have a replica on the target;
+   * 3. doing the move does not reduce the number of racks that the block has.
+   */
+  private boolean isGoodBlockCandidate(Source source,
+      BalancerDatanode.StorageGroup target, DBlock block) {
+    if (source.storageType != target.storageType) {
+      return false;
+    }
+    // check if the block is moved or not
+    if (movedBlocks.contains(block.getBlock())) {
+      return false;
+    }
+    if (block.isLocatedOn(target)) {
+      return false;
+    }
+    if (cluster.isNodeGroupAware()
+        && isOnSameNodeGroupWithReplicas(target, block, source)) {
+      return false;
+    }
+    if (reduceNumOfRacks(source, target, block)) {
+      return false;
+    }
+    return true;
+  }
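
The checks in isGoodBlockCandidate() are independent predicates evaluated in sequence. A simplified sketch of the same chain; BlockView is an invented stand-in that exposes just the information the rules consume, with the rack rule reduced to a precomputed flag:

    import java.util.Set;

    class CandidateSketch {
      static boolean isGoodCandidate(BlockView b, String sourceType,
          String targetType, String targetNode) {
        if (!sourceType.equals(targetType)) {
          return false; // storage types must match
        }
        if (b.alreadyMoved) {
          return false; // rule 1: not moved or being moved
        }
        if (b.replicaNodes.contains(targetNode)) {
          return false; // rule 2: no replica already on the target
        }
        return !b.moveWouldReduceRacks; // rule 3: keep the rack count
      }

      static class BlockView {
        boolean alreadyMoved;
        Set<String> replicaNodes;
        boolean moveWouldReduceRacks;
      }
    }
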
+
+  /**
+   * Determine whether moving the given block replica from source to target
+   * would reduce the number of racks of the block replicas.
+   */
+  private boolean reduceNumOfRacks(Source source,
+      BalancerDatanode.StorageGroup target, DBlock block) {
+    final DatanodeInfo sourceDn = source.getDatanode();
+    if (cluster.isOnSameRack(sourceDn, target.getDatanode())) {
+      // source and target are on the same rack
+      return false;
+    }
+    boolean notOnSameRack = true;
+    synchronized (block) {
+      for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
+        if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
+          notOnSameRack = false;
+          break;
+        }
+      }
+    }
+    if (notOnSameRack) {
+      // target is not on the same rack as any replica
+      return false;
+    }
+    for (BalancerDatanode.StorageGroup g : block.getLocations()) {
+      if (g != source && cluster.isOnSameRack(g.getDatanode(), sourceDn)) {
+        // source is on the same rack as another replica
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if there is any replica (other than the source) on the same node
+   * group as the target. If so, the target is not a good candidate for
+   * placing this specific replica, since we don't want two replicas in the
+   * same node group.
+   *
+   * @return true if there is any replica (other than the source) on the
+   *         same node group as the target
+   */
+  private boolean isOnSameNodeGroupWithReplicas(
+      BalancerDatanode.StorageGroup target, DBlock block, Source source) {
+    final DatanodeInfo targetDn = target.getDatanode();
+    for (BalancerDatanode.StorageGroup g : block.getLocations()) {
+      if (g != source && cluster.isOnSameNodeGroup(g.getDatanode(), targetDn)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Reset all fields in order to prepare for the next iteration. */
+  void reset(Configuration conf) {
+    cluster = NetworkTopology.getInstance(conf);
+    storageGroupMap.clear();
+    sources.clear();
+    targets.clear();
+    globalBlocks.removeAllButRetain(movedBlocks);
+    movedBlocks.cleanup();
+  }
+
+  /** Shut down the thread pools. */
+  void shutdownNow() {
+    dispatchExecutor.shutdownNow();
+    moveExecutor.shutdownNow();
+  }
+
+  static class Util {
+    /** @return true if the datanode is part of excludedNodes. */
+    static boolean isExcluded(Set<String> excludedNodes, DatanodeInfo dn) {
+      return isIn(excludedNodes, dn);
+    }
+
+    /**
+     * @return true if includedNodes is empty or the datanode is part of
+     *         includedNodes.
+     */
+    static boolean isIncluded(Set<String> includedNodes, DatanodeInfo dn) {
+      return (includedNodes.isEmpty() || isIn(includedNodes, dn));
+    }
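
A quick runnable illustration of the include/exclude semantics above: an empty include list admits every node, while exclusion always wins. The hostnames are made up for the example:

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    class IncludeExcludeDemo {
      public static void main(String[] args) {
        Set<String> include = Collections.emptySet(); // empty => all included
        Set<String> exclude =
            new HashSet<>(Collections.singleton("dn2.example.com"));

        for (String host : new String[] {"dn1.example.com", "dn2.example.com"}) {
          boolean used = (include.isEmpty() || include.contains(host))
              && !exclude.contains(host);
          System.out.println(host + " -> " + (used ? "balanced" : "ignored"));
        }
      }
    }
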
+
+    /**
+     * Matching is done by host name and IP address, with and without the
+     * port number.
+     *
+     * @return true if the datanode's transfer address matches the set of
+     *         nodes.
+     */
+    private static boolean isIn(Set<String> datanodes, DatanodeInfo dn) {
+      return isIn(datanodes, dn.getPeerHostName(), dn.getXferPort())
+          || isIn(datanodes, dn.getIpAddr(), dn.getXferPort())
+          || isIn(datanodes, dn.getHostName(), dn.getXferPort());
+    }
+
+    /** @return true if nodes contains host or host:port */
+    private static boolean isIn(Set<String> nodes, String host, int port) {
+      if (host == null) {
+        return false;
+      }
+      return (nodes.contains(host) || nodes.contains(host + ":" + port));
+    }
+
+    /**
+     * Parse a comma separated string to obtain a set of host names.
+     *
+     * @return the set of host names
+     */
+    static Set<String> parseHostList(String string) {
+      String[] addrs = StringUtils.getTrimmedStrings(string);
+      return new HashSet<String>(Arrays.asList(addrs));
+    }
+
+    /**
+     * Read a set of host names from a file.
+     *
+     * @return the set of host names
+     */
+    static Set<String> getHostListFromFile(String fileName, String type) {
+      Set<String> nodes = new HashSet<String>();
+      try {
+        HostsFileReader.readFileToSet(type, fileName, nodes);
+        return StringUtils.getTrimmedStrings(nodes);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(
+            "Failed to read host list from file: " + fileName);
+      }
+    }
+  }
+}
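
Finally, a hedged sketch of how a caller such as the refactored Balancer might drive this class per iteration, using only the methods visible in this patch. It assumes the caller lives in the same package (org.apache.hadoop.hdfs.server.balancer), and buildSourcesAndTargets(...) is a hypothetical placeholder for the Balancer's own utilization classification, which is outside the Dispatcher:

    import java.util.List;
    import java.util.Set;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;

    class DispatcherDriverSketch {
      static void run(NameNodeConnector nnc, Set<String> included,
          Set<String> excluded, Configuration conf) throws Exception {
        Dispatcher d = new Dispatcher(nnc, included, excluded, conf);
        try {
          while (true) {
            // build topology and drop excluded/decommissioning nodes
            List<DatanodeStorageReport> reports = d.init();
            buildSourcesAndTargets(reports, d); // hypothetical: calls d.add(...)
            if (!d.dispatchAndCheckContinue()) {
              break; // the NameNodeConnector said to stop
            }
            d.reset(conf); // prepare for the next iteration
          }
        } finally {
          d.shutdownNow(); // stop the dispatcher and mover pools
        }
      }

      private static void buildSourcesAndTargets(
          List<DatanodeStorageReport> reports, Dispatcher d) {
        // hypothetical: classify utilization and pair sources with targets
      }
    }
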