Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 58598200B50 for ; Fri, 24 Jun 2016 03:36:01 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 54F28160A6B; Fri, 24 Jun 2016 01:36:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AF186160A59 for ; Fri, 24 Jun 2016 03:35:59 +0200 (CEST) Received: (qmail 51007 invoked by uid 500); 24 Jun 2016 01:35:57 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 50679 invoked by uid 99); 24 Jun 2016 01:35:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Jun 2016 01:35:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 34AB8E9648; Fri, 24 Jun 2016 01:35:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: arp@apache.org To: common-commits@hadoop.apache.org Date: Fri, 24 Jun 2016 01:35:58 -0000 Message-Id: <5423eb60bb7746de828731c5444a57e8@git.apache.org> In-Reply-To: <3e5211e11c5848839190aec1c5feaaa9@git.apache.org> References: <3e5211e11c5848839190aec1c5feaaa9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/49] hadoop git commit: HDFS-9720. DiskBalancer : Add configuration parameters. Contributed by Anu Engineer. archived-at: Fri, 24 Jun 2016 01:36:01 -0000 HDFS-9720. DiskBalancer : Add configuration parameters. Contributed by Anu Engineer. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/05067707 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/05067707 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/05067707 Branch: refs/heads/HDFS-1312 Commit: 050677077beaf42255b3936952b8e816a9201203 Parents: 6c606bf Author: Anu Engineer Authored: Tue Apr 5 12:23:35 2016 -0700 Committer: Arpit Agarwal Committed: Thu Jun 23 18:18:48 2016 -0700 ---------------------------------------------------------------------- .../hdfs/protocol/ClientDatanodeProtocol.java | 4 +- .../ClientDatanodeProtocolTranslatorPB.java | 8 +- .../server/datanode/DiskBalancerWorkItem.java | 77 +++++++++++++++++++ .../src/main/proto/ClientDatanodeProtocol.proto | 2 +- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 15 ++++ ...tDatanodeProtocolServerSideTranslatorPB.java | 6 +- .../hadoop/hdfs/server/datanode/DataNode.java | 4 +- .../hdfs/server/datanode/DiskBalancer.java | 81 +++++++++++++++----- .../server/diskbalancer/planner/MoveStep.java | 75 ++++++++++++++++++ .../hdfs/server/diskbalancer/planner/Step.java | 23 ++++++ .../diskbalancer/TestDiskBalancerRPC.java | 31 ++++---- .../TestDiskBalancerWithMockMover.java | 37 ++++++++- .../hdfs/server/diskbalancer/TestPlanner.java | 29 ++++--- 13 files changed, 328 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java index d8df7fb..3993ce5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java @@ -168,8 +168,8 @@ public interface ClientDatanodeProtocol { /** * Submit a disk balancer plan for execution. */ - void submitDiskBalancerPlan(String planID, long planVersion, long bandwidth, - String plan) throws IOException; + void submitDiskBalancerPlan(String planID, long planVersion, String plan, + boolean skipDateCheck) throws IOException; /** * Cancel an executing plan. http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index 7076026..4f314e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -342,22 +342,20 @@ public class ClientDatanodeProtocolTranslatorPB implements * local copies of these plans. * @param planVersion - The data format of the plans - for future , not * used now. - * @param bandwidth - Maximum disk bandwidth to consume, setting this value - * to zero allows datanode to use the value defined in - * configration. * @param plan - Actual plan. + * @param skipDateCheck - Skips the date check. * @throws IOException */ @Override public void submitDiskBalancerPlan(String planID, long planVersion, - long bandwidth, String plan) throws IOException { + String plan, boolean skipDateCheck) throws IOException { try { SubmitDiskBalancerPlanRequestProto request = SubmitDiskBalancerPlanRequestProto.newBuilder() .setPlanID(planID) .setPlanVersion(planVersion) - .setMaxDiskBandwidth(bandwidth) .setPlan(plan) + .setIgnoreDateCheck(skipDateCheck) .build(); rpcProxy.submitDiskBalancerPlan(NULL_CONTROLLER, request); } catch (ServiceException e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java index 11730e2..7381499 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkItem.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hdfs.server.datanode; import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.htrace.fasterxml.jackson.annotation.JsonInclude; import org.codehaus.jackson.map.ObjectMapper; import java.io.IOException; @@ -31,6 +32,7 @@ import java.io.IOException; */ @InterfaceAudience.Private @InterfaceStability.Unstable +@JsonInclude(JsonInclude.Include.NON_DEFAULT) public class DiskBalancerWorkItem { private final long bytesToCopy; private long bytesCopied; @@ -38,6 +40,10 @@ public class DiskBalancerWorkItem { private String errMsg; private long blocksCopied; + private long maxDiskErrors; + private long tolerancePercent; + private long bandwidth; + /** * Constructs a DiskBalancerWorkItem. * @@ -157,4 +163,75 @@ public class DiskBalancerWorkItem { return mapper.writeValueAsString(this); } + /** + * Sets the Error counts for this step. + * + * @param errorCount long. + */ + public void setErrorCount(long errorCount) { + this.errorCount = errorCount; + } + + /** + * Number of blocks copied so far. + * + * @param blocksCopied Blocks copied. + */ + public void setBlocksCopied(long blocksCopied) { + this.blocksCopied = blocksCopied; + } + + /** + * Gets maximum disk errors to tolerate before we fail this copy step. + * + * @return long. + */ + public long getMaxDiskErrors() { + return maxDiskErrors; + } + + /** + * Sets maximum disk errors to tolerate before we fail this copy step. + * + * @param maxDiskErrors long + */ + public void setMaxDiskErrors(long maxDiskErrors) { + this.maxDiskErrors = maxDiskErrors; + } + + /** + * Allowed deviation from ideal storage in percentage. + * + * @return long + */ + public long getTolerancePercent() { + return tolerancePercent; + } + + /** + * Sets the tolerance percentage. + * + * @param tolerancePercent - tolerance. + */ + public void setTolerancePercent(long tolerancePercent) { + this.tolerancePercent = tolerancePercent; + } + + /** + * Max disk bandwidth to use. MB per second. + * + * @return - long. + */ + public long getBandwidth() { + return bandwidth; + } + + /** + * Sets max disk bandwidth to use, in MBs per second. + * + * @param bandwidth - long. + */ + public void setBandwidth(long bandwidth) { + this.bandwidth = bandwidth; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto index c7cd4fb..e0a7fd5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto @@ -157,7 +157,7 @@ message SubmitDiskBalancerPlanRequestProto { required string planID = 1; // A hash of the plan like SHA512 required string plan = 2; // Json String that describes the plan optional uint64 planVersion = 3; // Plan version number - optional uint64 maxDiskBandwidth = 4; // optional bandwidth control. + optional bool ignoreDateCheck = 4; // Ignore date checks on this plan. } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 224ab3d..6640ec6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -935,6 +935,21 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.disk.balancer.enabled"; public static final boolean DFS_DISK_BALANCER_ENABLED_DEFAULT = false; + public static final String DFS_DISK_BALANCER_MAX_DISK_THRUPUT = + "dfs.disk.balancer.max.disk.throughputInMBperSec"; + public static final int DFS_DISK_BALANCER_MAX_DISK_THRUPUT_DEFAULT = + 10; + + public static final String DFS_DISK_BALANCER_MAX_DISK_ERRORS = + "dfs.disk.balancer.max.disk.errors"; + public static final int DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT = 5; + + + public static final String DFS_DISK_BALANCER_BLOCK_TOLERANCE = + "dfs.disk.balancer.block.tolerance.percent"; + public static final int DFS_DISK_BALANCER_BLOCK_TOLERANCE_DEFAULT = 5; + + // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java index d72a060..482e86f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java @@ -254,9 +254,9 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements throws ServiceException { try { impl.submitDiskBalancerPlan(request.getPlanID(), - request.hasPlanVersion() ? request.getPlanVersion() : 0, - request.hasMaxDiskBandwidth() ? request.getMaxDiskBandwidth() : 0, - request.getPlan()); + request.hasPlanVersion() ? request.getPlanVersion() : 1, + request.getPlan(), + request.hasIgnoreDateCheck() ? request.getIgnoreDateCheck() : false); SubmitDiskBalancerPlanResponseProto response = SubmitDiskBalancerPlanResponseProto.newBuilder() .build(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 8a61291..776da3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -3331,11 +3331,11 @@ public class DataNode extends ReconfigurableBase */ @Override public void submitDiskBalancerPlan(String planID, - long planVersion, long bandwidth, String plan) throws IOException { + long planVersion, String plan, boolean skipDateCheck) throws IOException { checkSuperuserPrivilege(); // TODO : Support force option - this.diskBalancer.submitPlan(planID, planVersion, plan, bandwidth, false); + this.diskBalancer.submitPlan(planID, planVersion, plan, skipDateCheck); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java index 972f0fc..b62a4fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java @@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode; import com.google.common.base.Preconditions; import org.apache.commons.codec.digest.DigestUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -35,6 +33,8 @@ import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan; import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step; import org.apache.hadoop.util.Time; import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.Charset; @@ -45,6 +45,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; /** @@ -99,6 +100,9 @@ public class DiskBalancer { this.isDiskBalancerEnabled = conf.getBoolean( DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT); + this.bandwidth = conf.getInt( + DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT, + DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT_DEFAULT); } /** @@ -144,13 +148,11 @@ public class DiskBalancer { * @param planID - A SHA512 of the plan string * @param planVersion - version of the plan string - for future use. * @param plan - Actual Plan - * @param bandwidth - BytesPerSec to copy * @param force - Skip some validations and execute the plan file. * @throws DiskBalancerException */ public void submitPlan(String planID, long planVersion, String plan, - long bandwidth, boolean force) - throws DiskBalancerException { + boolean force) throws DiskBalancerException { lock.lock(); try { @@ -160,12 +162,10 @@ public class DiskBalancer { throw new DiskBalancerException("Executing another plan", DiskBalancerException.Result.PLAN_ALREADY_IN_PROGRESS); } - NodePlan nodePlan = - verifyPlan(planID, planVersion, plan, bandwidth, force); + NodePlan nodePlan = verifyPlan(planID, planVersion, plan, force); createWorkPlan(nodePlan); this.planID = planID; this.currentResult = Result.PLAN_UNDER_PROGRESS; - this.bandwidth = bandwidth; executePlan(); } finally { lock.unlock(); @@ -292,14 +292,12 @@ public class DiskBalancer { * @param planID - SHA 512 of the plan. * @param planVersion - Version of the plan, for future use. * @param plan - Plan String in Json. - * @param bandwidth - Max disk bandwidth to use per second. * @param force - Skip verifying when the plan was generated. * @return a NodePlan Object. * @throws DiskBalancerException */ private NodePlan verifyPlan(String planID, long planVersion, String plan, - long bandwidth, boolean force) - throws DiskBalancerException { + boolean force) throws DiskBalancerException { Preconditions.checkState(lock.isHeldByCurrentThread()); verifyPlanVersion(planVersion); @@ -428,7 +426,7 @@ public class DiskBalancer { throw new DiskBalancerException("Unable to find destination volume.", DiskBalancerException.Result.INVALID_VOLUME); } - createWorkPlan(sourceVol, destVol, step.getBytesToMove()); + createWorkPlan(sourceVol, destVol, step); } } @@ -488,17 +486,18 @@ public class DiskBalancer { * * @param source - Source vol * @param dest - destination volume - * @param bytesToMove - number of bytes to move + * @param step - Move Step */ private void createWorkPlan(FsVolumeSpi source, FsVolumeSpi dest, - long bytesToMove) throws DiskBalancerException { + Step step) throws DiskBalancerException { if(source.getStorageID().equals(dest.getStorageID())) { - throw new DiskBalancerException("Same source and destination", - DiskBalancerException.Result.INVALID_MOVE); + LOG.info("Disk Balancer - source & destination volumes are same."); + throw new DiskBalancerException("source and destination volumes are " + + "same.", DiskBalancerException.Result.INVALID_MOVE); } VolumePair pair = new VolumePair(source, dest); - + long bytesToMove = step.getBytesToMove(); // In case we have a plan with more than // one line of same // we compress that into one work order. @@ -507,6 +506,12 @@ public class DiskBalancer { } DiskBalancerWorkItem work = new DiskBalancerWorkItem(bytesToMove, 0); + + // all these values can be zero, if so we will use + // values from configuration. + work.setBandwidth(step.getBandwidth()); + work.setTolerancePercent(step.getTolerancePercent()); + work.setMaxDiskErrors(step.getMaxDiskErrors()); workMap.put(pair, work); } @@ -600,11 +605,12 @@ public class DiskBalancer { /** * Actual DataMover class for DiskBalancer. *

- * TODO : Add implementation for this class. This is here as a place holder so - * that Datanode can make calls into this class. */ public static class DiskBalancerMover implements BlockMover { private final FsDatasetSpi dataset; + private long diskBandwidth; + private long blockTolerance; + private long maxDiskErrors; /** * Constructs diskBalancerMover. @@ -614,7 +620,42 @@ public class DiskBalancer { */ public DiskBalancerMover(FsDatasetSpi dataset, Configuration conf) { this.dataset = dataset; - // TODO : Read Config values. + + this.diskBandwidth = conf.getLong( + DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT, + DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT_DEFAULT); + + this.blockTolerance = conf.getLong( + DFSConfigKeys.DFS_DISK_BALANCER_BLOCK_TOLERANCE, + DFSConfigKeys.DFS_DISK_BALANCER_BLOCK_TOLERANCE_DEFAULT); + + this.maxDiskErrors = conf.getLong( + DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS, + DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT); + + // Since these are user provided values make sure it is sane + // or ignore faulty values. + if (this.diskBandwidth <= 0) { + LOG.debug("Found 0 or less as max disk throughput, ignoring config " + + "value. value : " + diskBandwidth); + diskBandwidth = + DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT_DEFAULT; + } + + if (this.blockTolerance <= 0) { + LOG.debug("Found 0 or less for block tolerance value, ignoring config" + + "value. value : " + blockTolerance); + blockTolerance = + DFSConfigKeys.DFS_DISK_BALANCER_BLOCK_TOLERANCE_DEFAULT; + + } + + if (this.maxDiskErrors < 0) { + LOG.debug("Found less than 0 for maxDiskErrors value, ignoring " + + "config value. value : " + maxDiskErrors); + maxDiskErrors = + DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT; + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java index 75af0d6..9a493a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/MoveStep.java @@ -19,8 +19,19 @@ package org.apache.hadoop.hdfs.server.diskbalancer.planner; import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume; import org.apache.hadoop.util.StringUtils; +import org.apache.htrace.fasterxml.jackson.annotation.JsonInclude; + + + + /** + * Ignore fields with default values. In most cases Throughtput, diskErrors + * tolerancePercent and bandwidth will be the system defaults. + * So we will avoid serializing them into JSON. + */ +@JsonInclude(JsonInclude.Include.NON_DEFAULT) +/** * Move step is a step that planner can execute that will move data from one * volume to another. */ @@ -31,6 +42,10 @@ public class MoveStep implements Step { private long bytesToMove; private String volumeSetID; + private long maxDiskErrors; + private long tolerancePercent; + private long bandwidth; + /** * Constructs a MoveStep for the volume set. * @@ -178,4 +193,64 @@ public class MoveStep implements Step { public String getSizeString(long size) { return StringUtils.TraditionalBinaryPrefix.long2String(size, "", 1); } + + /** + * Gets Maximum numbers of errors to be tolerated before this + * move operation is aborted. + * @return long. + */ + public long getMaxDiskErrors() { + return maxDiskErrors; + } + + /** + * Sets the maximum numbers of Errors to be tolerated before this + * step is aborted. + * @param maxDiskErrors - long + */ + public void setMaxDiskErrors(long maxDiskErrors) { + this.maxDiskErrors = maxDiskErrors; + } + + /** + * Tolerance Percentage indicates when a move operation is considered good + * enough. This is a percentage of deviation from ideal that is considered + * fine. + * + * For example : if the ideal amount on each disk was 1 TB and the + * tolerance was 10%, then getting to 900 GB on the destination disk is + * considerd good enough. + * + * @return tolerance percentage. + */ + public long getTolerancePercent() { + return tolerancePercent; + } + + /** + * Sets the tolerance percentage. + * @param tolerancePercent - long + */ + public void setTolerancePercent(long tolerancePercent) { + this.tolerancePercent = tolerancePercent; + } + + /** + * Gets the disk Bandwidth. That is the MB/Sec to copied. We will max out + * on this amount of throughput. This is useful to prevent too much I/O on + * datanode while data node is in use. + * @return long. + */ + public long getBandwidth() { + return bandwidth; + } + + /** + * Sets the maximum disk bandwidth per sec to use for this step. + * @param bandwidth - Long, MB / Sec of data to be moved between + * source and destinatin volume. + */ + public void setBandwidth(long bandwidth) { + this.bandwidth = bandwidth; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java index d87209e..f13909f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/Step.java @@ -65,4 +65,27 @@ public interface Step { */ String getSizeString(long size); + /** + * Returns maximum number of disk erros tolerated. + * @return long. + */ + long getMaxDiskErrors(); + + /** + * Returns tolerance percentage, the good enough value + * when we move data from one to disk to another. + * @return long. + */ + long getTolerancePercent(); + + /** + * Returns max disk bandwidth that disk balancer will use. + * Expressed in MB/sec. For example, a value like 10 + * indicates that disk balancer will only move 10 MB / sec + * while it is running. + * @return long. + */ + long getBandwidth(); + + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java index a65ed21..27cd8eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -30,7 +30,9 @@ import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory; import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster; import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode; import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner; +import org.apache.hadoop.hdfs.server.diskbalancer.planner.MoveStep; import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan; +import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step; import org.hamcrest.*; import org.junit.After; import org.junit.Assert; @@ -76,7 +78,8 @@ public class TestDiskBalancerRPC { String planHash = rpcTestHelper.getPlanHash(); int planVersion = rpcTestHelper.getPlanVersion(); NodePlan plan = rpcTestHelper.getPlan(); - dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(), + false); } @Test @@ -91,7 +94,8 @@ public class TestDiskBalancerRPC { NodePlan plan = rpcTestHelper.getPlan(); thrown.expect(DiskBalancerException.class); thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN_HASH)); - dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(), + false); } @Test @@ -104,7 +108,8 @@ public class TestDiskBalancerRPC { NodePlan plan = rpcTestHelper.getPlan(); thrown.expect(DiskBalancerException.class); thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN_VERSION)); - dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(), + false); } @Test @@ -116,8 +121,8 @@ public class TestDiskBalancerRPC { NodePlan plan = rpcTestHelper.getPlan(); thrown.expect(DiskBalancerException.class); thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN)); - dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, ""); - } + dataNode.submitDiskBalancerPlan(planHash, planVersion, "", + false); } @Test public void testCancelPlan() throws Exception { @@ -126,7 +131,8 @@ public class TestDiskBalancerRPC { String planHash = rpcTestHelper.getPlanHash(); int planVersion = rpcTestHelper.getPlanVersion(); NodePlan plan = rpcTestHelper.getPlan(); - dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(), + false); dataNode.cancelDiskBalancePlan(planHash); } @@ -189,15 +195,14 @@ public class TestDiskBalancerRPC { int planVersion = rpcTestHelper.getPlanVersion(); NodePlan plan = rpcTestHelper.getPlan(); - dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(), + false); String bandwidthString = dataNode.getDiskBalancerSetting( DiskBalancerConstants.DISKBALANCER_BANDWIDTH); long value = Long.decode(bandwidthString); Assert.assertEquals(10L, value); } - - @Test public void testQueryPlan() throws Exception { RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); @@ -206,7 +211,8 @@ public class TestDiskBalancerRPC { int planVersion = rpcTestHelper.getPlanVersion(); NodePlan plan = rpcTestHelper.getPlan(); - dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(), + false); DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan(); Assert.assertTrue(status.getResult() == PLAN_UNDER_PROGRESS || status.getResult() == PLAN_DONE); @@ -221,7 +227,6 @@ public class TestDiskBalancerRPC { Assert.assertTrue(status.getResult() == NO_PLAN); } - private class RpcTestHelper { private NodePlan plan; private int planVersion; http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java index ed761ed..5032611 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java @@ -50,11 +50,14 @@ import org.junit.rules.ExpectedException; import java.io.IOException; import java.net.URI; import java.util.Iterator; +import java.util.List; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN; import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public class TestDiskBalancerWithMockMover { @@ -120,7 +123,7 @@ public class TestDiskBalancerWithMockMover { int version) throws IOException { String planJson = plan.toJson(); String planID = DigestUtils.sha512Hex(planJson); - balancer.submitPlan(planID, version, planJson, 10, false); + balancer.submitPlan(planID, version, planJson, false); } private void executeSubmitPlan(NodePlan plan, DiskBalancer balancer) @@ -209,7 +212,7 @@ public class TestDiskBalancerWithMockMover { thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException .Result.INVALID_PLAN)); - balancer.submitPlan(planID, 1, null, 10, false); + balancer.submitPlan(planID, 1, null, false); } @Test @@ -228,7 +231,7 @@ public class TestDiskBalancerWithMockMover { thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException .Result.INVALID_PLAN_HASH)); balancer.submitPlan(planID.replace(planID.charAt(0), repChar), - 1, planJson, 10, false); + 1, planJson, false); } @@ -278,6 +281,34 @@ public class TestDiskBalancerWithMockMover { } + + /** + * Test Custom bandwidth. + * + * @throws Exception + */ + @Test + public void testCustomBandwidth() throws Exception { + MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke(); + NodePlan plan = mockMoverHelper.getPlan(); + DiskBalancer balancer = mockMoverHelper.getBalancer(); + + for(Step step : plan.getVolumeSetPlans()){ + MoveStep tempStep = (MoveStep) step; + tempStep.setBandwidth(100); + } + executeSubmitPlan(plan, balancer); + DiskBalancerWorkStatus status = balancer + .queryWorkStatus(); + assertNotNull(status); + + DiskBalancerWorkStatus.DiskBalancerWorkEntry entry = + balancer.queryWorkStatus().getCurrentState().get(0); + assertEquals(100L, entry.getWorkItem().getBandwidth()); + + } + + @Before public void setUp() throws Exception { Configuration conf = new HdfsConfiguration(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/05067707/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java index f756104..ad18075 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestPlanner.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume; import org.apache.hadoop.hdfs.server.diskbalancer.datamodel .DiskBalancerVolumeSet; import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner; -import org.apache.hadoop.hdfs.server.diskbalancer.planner.MoveStep; import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan; import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step; import org.junit.Assert; @@ -48,7 +47,7 @@ public class TestPlanner { LoggerFactory.getLogger(TestPlanner.class); @Test - public void TestGreedyPlannerBalanceVolumeSet() throws Exception { + public void testGreedyPlannerBalanceVolumeSet() throws Exception { URI clusterJson = getClass() .getResource("/diskBalancer/data-cluster-3node-3disk.json").toURI(); ClusterConnector jsonConnector = ConnectorFactory.getCluster(clusterJson, @@ -65,7 +64,7 @@ public class TestPlanner { } @Test - public void TestGreedyPlannerComputePlan() throws Exception { + public void testGreedyPlannerComputePlan() throws Exception { URI clusterJson = getClass() .getResource("/diskBalancer/data-cluster-3node-3disk.json").toURI(); ClusterConnector jsonConnector = ConnectorFactory.getCluster(clusterJson, @@ -90,13 +89,13 @@ public class TestPlanner { } @Test - public void TestGreedyPlannerNoNodeCluster() throws Exception { + public void testGreedyPlannerNoNodeCluster() throws Exception { GreedyPlanner planner = new GreedyPlanner(10.0f, null); assertNotNull(planner); } @Test - public void TestGreedyPlannerNoVolumeTest() throws Exception { + public void testGreedyPlannerNoVolumeTest() throws Exception { NullConnector nullConnector = new NullConnector(); DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector); List planList = cluster.computePlan(10.0f); @@ -104,7 +103,7 @@ public class TestPlanner { } @Test - public void TestGreedyPlannerOneVolumeNoPlanTest() throws Exception { + public void testGreedyPlannerOneVolumeNoPlanTest() throws Exception { NullConnector nullConnector = new NullConnector(); DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector); @@ -127,7 +126,7 @@ public class TestPlanner { } @Test - public void TestGreedyPlannerTwoVolume() throws Exception { + public void testGreedyPlannerTwoVolume() throws Exception { NullConnector nullConnector = new NullConnector(); DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector); @@ -166,7 +165,7 @@ public class TestPlanner { * That is the plan should say move 10 GB from volume30 to volume10. */ @Test - public void TestGreedyPlannerEqualizeData() throws Exception { + public void testGreedyPlannerEqualizeData() throws Exception { NullConnector nullConnector = new NullConnector(); DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector); @@ -201,7 +200,7 @@ public class TestPlanner { } @Test - public void TestGreedyPlannerEqualDisksNoMoves() throws Exception { + public void testGreedyPlannerEqualDisksNoMoves() throws Exception { NullConnector nullConnector = new NullConnector(); DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector); @@ -232,7 +231,7 @@ public class TestPlanner { } @Test - public void TestGreedyPlannerMoveFromSingleDisk() throws Exception { + public void testGreedyPlannerMoveFromSingleDisk() throws Exception { NullConnector nullConnector = new NullConnector(); DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector); @@ -271,7 +270,7 @@ public class TestPlanner { } @Test - public void TestGreedyPlannerThresholdTest() throws Exception { + public void testGreedyPlannerThresholdTest() throws Exception { NullConnector nullConnector = new NullConnector(); DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector); @@ -327,7 +326,7 @@ public class TestPlanner { } @Test - public void TestGreedyPlannerPlanWithDifferentDiskSizes() throws Exception { + public void testGreedyPlannerPlanWithDifferentDiskSizes() throws Exception { NullConnector nullConnector = new NullConnector(); DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector); @@ -381,7 +380,7 @@ public class TestPlanner { } @Test - public void TestLoadsCorrectClusterConnector() throws Exception { + public void testLoadsCorrectClusterConnector() throws Exception { ClusterConnector connector = ConnectorFactory.getCluster(getClass() .getResource("/diskBalancer/data-cluster-3node-3disk.json").toURI() , null); @@ -392,7 +391,7 @@ public class TestPlanner { } @Test - public void TestPlannerScale() throws Exception { + public void testPlannerScale() throws Exception { final int diskCount = 256; // it is rare to see more than 48 disks DiskBalancerTestUtil util = new DiskBalancerTestUtil(); DiskBalancerVolumeSet vSet = @@ -428,7 +427,7 @@ public class TestPlanner { } @Test - public void TestNodePlanSerialize() throws Exception { + public void testNodePlanSerialize() throws Exception { final int diskCount = 12; DiskBalancerTestUtil util = new DiskBalancerTestUtil(); DiskBalancerVolumeSet vSet = --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org