From commits-return-78935-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Wed Oct 10 16:52:44 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 72F941807A3 for ; Wed, 10 Oct 2018 16:52:42 +0200 (CEST) Received: (qmail 32527 invoked by uid 500); 10 Oct 2018 14:52:40 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 32073 invoked by uid 99); 10 Oct 2018 14:52:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Oct 2018 14:52:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0A7B6E0AF3; Wed, 10 Oct 2018 14:52:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Wed, 10 Oct 2018 14:52:45 -0000 Message-Id: In-Reply-To: <83a35095c2514023b89fe30a362449bf@git.apache.org> References: <83a35095c2514023b89fe30a362449bf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/20] hbase-site git commit: Published site at fe579a1bb34818a058b763afcd2bd3bd9923d5af. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/87832ef6/devapidocs/src-html/org/apache/hadoop/hbase/util/RegionMover.MoveWithAck.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/util/RegionMover.MoveWithAck.html b/devapidocs/src-html/org/apache/hadoop/hbase/util/RegionMover.MoveWithAck.html index fe7808c..0382c19 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/util/RegionMover.MoveWithAck.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/util/RegionMover.MoveWithAck.html @@ -27,994 +27,772 @@ 019 020package org.apache.hadoop.hbase.util; 021 -022import java.io.BufferedReader; -023import java.io.DataInputStream; -024import java.io.DataOutputStream; -025import java.io.File; -026import java.io.FileInputStream; -027import java.io.FileOutputStream; -028import java.io.FileReader; -029import java.io.IOException; -030import java.util.ArrayList; -031import java.util.Collections; -032import java.util.EnumSet; -033import java.util.Iterator; -034import java.util.List; -035import java.util.Locale; -036import java.util.concurrent.Callable; -037import java.util.concurrent.CancellationException; -038import java.util.concurrent.ExecutionException; -039import java.util.concurrent.ExecutorService; -040import java.util.concurrent.Executors; -041import java.util.concurrent.Future; -042import java.util.concurrent.TimeUnit; -043import java.util.concurrent.TimeoutException; -044import org.apache.hadoop.conf.Configuration; -045import org.apache.hadoop.hbase.ClusterMetrics.Option; -046import org.apache.hadoop.hbase.HBaseConfiguration; -047import org.apache.hadoop.hbase.HConstants; -048import org.apache.hadoop.hbase.ServerName; -049import org.apache.hadoop.hbase.TableName; -050import org.apache.hadoop.hbase.client.Admin; -051import org.apache.hadoop.hbase.client.Connection; -052import org.apache.hadoop.hbase.client.ConnectionFactory; -053import org.apache.hadoop.hbase.client.Get; -054import org.apache.hadoop.hbase.client.RegionInfo; -055import org.apache.hadoop.hbase.client.Result; -056import org.apache.hadoop.hbase.client.ResultScanner; -057import org.apache.hadoop.hbase.client.Scan; -058import org.apache.hadoop.hbase.client.Table; -059import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; -060import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; -061import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -062import org.apache.yetus.audience.InterfaceAudience; -063import org.slf4j.Logger; -064import org.slf4j.LoggerFactory; -065 -066import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -067import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; -068 -069/** -070 * Tool for loading/unloading regions to/from given regionserver This tool can be run from Command -071 * line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.Ack mode -072 * acknowledges if regions are online after movement while noAck mode is best effort mode that -073 * improves performance but will still move on if region is stuck/not moved. Motivation behind noAck -074 * mode being RS shutdown where even if a Region is stuck, upon shutdown master will move it -075 * anyways. This can also be used by constructiong an Object using the builder and then calling -076 * {@link #load()} or {@link #unload()} methods for the desired operations. -077 */ -078@InterfaceAudience.Public -079public class RegionMover extends AbstractHBaseTool { -080 public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max"; -081 public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max"; -082 public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max"; -083 public static final int DEFAULT_MOVE_RETRIES_MAX = 5; -084 public static final int DEFAULT_MOVE_WAIT_MAX = 60; -085 public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180; -086 static final Logger LOG = LoggerFactory.getLogger(RegionMover.class); -087 private RegionMoverBuilder rmbuilder; -088 private boolean ack = true; -089 private int maxthreads = 1; -090 private int timeout; -091 private String loadUnload; -092 private String hostname; -093 private String filename; -094 private String excludeFile; -095 private int port; -096 -097 private RegionMover(RegionMoverBuilder builder) { -098 this.hostname = builder.hostname; -099 this.filename = builder.filename; -100 this.excludeFile = builder.excludeFile; -101 this.maxthreads = builder.maxthreads; -102 this.ack = builder.ack; -103 this.port = builder.port; -104 this.timeout = builder.timeout; -105 setConf(builder.conf); -106 } -107 -108 private RegionMover() { -109 } -110 -111 /** -112 * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has -113 * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)}, -114 * {@link #ack(boolean)}, {@link #timeout(int)} methods to set the corresponding options -115 */ -116 public static class RegionMoverBuilder { -117 private boolean ack = true; -118 private int maxthreads = 1; -119 private int timeout = Integer.MAX_VALUE; -120 private String hostname; -121 private String filename; -122 private String excludeFile = null; -123 private String defaultDir = System.getProperty("java.io.tmpdir"); -124 -125 @VisibleForTesting -126 final int port; -127 private final Configuration conf; -128 -129 public RegionMoverBuilder(String hostname) { -130 this(hostname, createConf()); -131 } -132 -133 /** -134 * Creates a new configuration and sets region mover specific overrides -135 */ -136 private static Configuration createConf() { -137 Configuration conf = HBaseConfiguration.create(); -138 conf.setInt("hbase.client.prefetch.limit", 1); -139 conf.setInt("hbase.client.pause", 500); -140 conf.setInt("hbase.client.retries.number", 100); -141 return conf; +022import java.io.BufferedInputStream; +023import java.io.BufferedOutputStream; +024import java.io.Closeable; +025import java.io.DataInputStream; +026import java.io.DataOutputStream; +027import java.io.File; +028import java.io.FileInputStream; +029import java.io.FileOutputStream; +030import java.io.IOException; +031import java.nio.file.Files; +032import java.nio.file.Paths; +033import java.util.ArrayList; +034import java.util.Collections; +035import java.util.EnumSet; +036import java.util.Iterator; +037import java.util.List; +038import java.util.Locale; +039import java.util.concurrent.Callable; +040import java.util.concurrent.CancellationException; +041import java.util.concurrent.ExecutionException; +042import java.util.concurrent.ExecutorService; +043import java.util.concurrent.Executors; +044import java.util.concurrent.Future; +045import java.util.concurrent.TimeUnit; +046import java.util.concurrent.TimeoutException; +047import java.util.function.Predicate; +048 +049import org.apache.commons.io.IOUtils; +050import org.apache.hadoop.conf.Configuration; +051import org.apache.hadoop.hbase.ClusterMetrics.Option; +052import org.apache.hadoop.hbase.HBaseConfiguration; +053import org.apache.hadoop.hbase.HConstants; +054import org.apache.hadoop.hbase.MetaTableAccessor; +055import org.apache.hadoop.hbase.ServerName; +056import org.apache.hadoop.hbase.client.Admin; +057import org.apache.hadoop.hbase.client.Connection; +058import org.apache.hadoop.hbase.client.ConnectionFactory; +059import org.apache.hadoop.hbase.client.RegionInfo; +060import org.apache.hadoop.hbase.client.ResultScanner; +061import org.apache.hadoop.hbase.client.Scan; +062import org.apache.hadoop.hbase.client.Table; +063import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +064import org.apache.yetus.audience.InterfaceAudience; +065import org.slf4j.Logger; +066import org.slf4j.LoggerFactory; +067 +068import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +069import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; +070 +071/** +072 * Tool for loading/unloading regions to/from given regionserver This tool can be run from Command +073 * line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.Ack mode +074 * acknowledges if regions are online after movement while noAck mode is best effort mode that +075 * improves performance but will still move on if region is stuck/not moved. Motivation behind noAck +076 * mode being RS shutdown where even if a Region is stuck, upon shutdown master will move it +077 * anyways. This can also be used by constructiong an Object using the builder and then calling +078 * {@link #load()} or {@link #unload()} methods for the desired operations. +079 */ +080@InterfaceAudience.Public +081public class RegionMover extends AbstractHBaseTool implements Closeable { +082 public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max"; +083 public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max"; +084 public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max"; +085 public static final int DEFAULT_MOVE_RETRIES_MAX = 5; +086 public static final int DEFAULT_MOVE_WAIT_MAX = 60; +087 public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180; +088 static final Logger LOG = LoggerFactory.getLogger(RegionMover.class); +089 private RegionMoverBuilder rmbuilder; +090 private boolean ack = true; +091 private int maxthreads = 1; +092 private int timeout; +093 private String loadUnload; +094 private String hostname; +095 private String filename; +096 private String excludeFile; +097 private int port; +098 private Connection conn; +099 private Admin admin; +100 +101 private RegionMover(RegionMoverBuilder builder) throws IOException { +102 this.hostname = builder.hostname; +103 this.filename = builder.filename; +104 this.excludeFile = builder.excludeFile; +105 this.maxthreads = builder.maxthreads; +106 this.ack = builder.ack; +107 this.port = builder.port; +108 this.timeout = builder.timeout; +109 setConf(builder.conf); +110 this.conn = ConnectionFactory.createConnection(conf); +111 this.admin = conn.getAdmin(); +112 } +113 +114 private RegionMover() { +115 } +116 +117 @Override +118 public void close() { +119 IOUtils.closeQuietly(this.admin); +120 IOUtils.closeQuietly(this.conn); +121 } +122 +123 /** +124 * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has +125 * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)}, +126 * {@link #ack(boolean)}, {@link #timeout(int)} methods to set the corresponding options +127 */ +128 public static class RegionMoverBuilder { +129 private boolean ack = true; +130 private int maxthreads = 1; +131 private int timeout = Integer.MAX_VALUE; +132 private String hostname; +133 private String filename; +134 private String excludeFile = null; +135 private String defaultDir = System.getProperty("java.io.tmpdir"); +136 @VisibleForTesting +137 final int port; +138 private final Configuration conf; +139 +140 public RegionMoverBuilder(String hostname) { +141 this(hostname, createConf()); 142 } 143 144 /** -145 * @param hostname Hostname to unload regions from or load regions to. Can be either hostname -146 * or hostname:port. -147 * @param conf Configuration object -148 */ -149 public RegionMoverBuilder(String hostname, Configuration conf) { -150 String[] splitHostname = hostname.toLowerCase().split(":"); -151 this.hostname = splitHostname[0]; -152 if (splitHostname.length == 2) { -153 this.port = Integer.parseInt(splitHostname[1]); -154 } else { -155 this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT); -156 } -157 this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname -158 + ":" + Integer.toString(this.port); -159 this.conf = conf; -160 } -161 -162 /** -163 * Path of file where regions will be written to during unloading/read from during loading -164 * @param filename -165 * @return RegionMoverBuilder object -166 */ -167 public RegionMoverBuilder filename(String filename) { -168 this.filename = filename; -169 return this; -170 } -171 -172 /** -173 * Set the max number of threads that will be used to move regions -174 */ -175 public RegionMoverBuilder maxthreads(int threads) { -176 this.maxthreads = threads; -177 return this; -178 } -179 -180 /** -181 * Path of file containing hostnames to be excluded during region movement. Exclude file should -182 * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single -183 * host. -184 */ -185 public RegionMoverBuilder excludeFile(String excludefile) { -186 this.excludeFile = excludefile; -187 return this; -188 } -189 -190 /** -191 * Set ack/noAck mode. -192 * <p> -193 * In ack mode regions are acknowledged before and after moving and the move is retried -194 * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best -195 * effort mode,each region movement is tried once.This can be used during graceful shutdown as -196 * even if we have a stuck region,upon shutdown it'll be reassigned anyway. -197 * <p> -198 * @param ack -199 * @return RegionMoverBuilder object -200 */ -201 public RegionMoverBuilder ack(boolean ack) { -202 this.ack = ack; -203 return this; -204 } -205 -206 /** -207 * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for -208 * movers also have a separate time which is hbase.move.wait.max * number of regions to -209 * load/unload -210 * @param timeout in seconds -211 * @return RegionMoverBuilder object -212 */ -213 public RegionMoverBuilder timeout(int timeout) { -214 this.timeout = timeout; -215 return this; -216 } -217 -218 /** -219 * This method builds the appropriate RegionMover object which can then be used to load/unload -220 * using load and unload methods -221 * @return RegionMover object -222 */ -223 public RegionMover build() { -224 return new RegionMover(this); -225 } -226 } -227 -228 /** -229 * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover -230 * Object has to be created using {@link #RegionMover(RegionMoverBuilder)} -231 * @return true if loading succeeded, false otherwise -232 * @throws ExecutionException -233 * @throws InterruptedException if the loader thread was interrupted -234 * @throws TimeoutException -235 */ -236 public boolean load() throws ExecutionException, InterruptedException, TimeoutException { -237 ExecutorService loadPool = Executors.newFixedThreadPool(1); -238 Future<Boolean> loadTask = loadPool.submit(new Load(this)); -239 loadPool.shutdown(); -240 try { -241 if (!loadPool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) { -242 LOG.warn("Timed out before finishing the loading operation. Timeout:" + this.timeout -243 + "sec"); -244 loadPool.shutdownNow(); -245 } -246 } catch (InterruptedException e) { -247 loadPool.shutdownNow(); -248 Thread.currentThread().interrupt(); -249 } -250 try { -251 return loadTask.get(5, TimeUnit.SECONDS); -252 } catch (InterruptedException e) { -253 LOG.warn("Interrupted while loading Regions on " + this.hostname, e); -254 throw e; -255 } catch (ExecutionException e) { -256 LOG.error("Error while loading regions on RegionServer " + this.hostname, e); -257 throw e; -258 } -259 } -260 -261 private class Load implements Callable<Boolean> { -262 -263 private RegionMover rm; -264 -265 public Load(RegionMover rm) { -266 this.rm = rm; -267 } -268 -269 @Override -270 public Boolean call() throws IOException { -271 Connection conn = ConnectionFactory.createConnection(rm.conf); -272 try { -273 List<RegionInfo> regionsToMove = readRegionsFromFile(rm.filename); -274 if (regionsToMove.isEmpty()) { -275 LOG.info("No regions to load.Exiting"); -276 return true; -277 } -278 Admin admin = conn.getAdmin(); -279 try { -280 loadRegions(admin, rm.hostname, rm.port, regionsToMove, rm.ack); -281 } finally { -282 admin.close(); +145 * Creates a new configuration and sets region mover specific overrides +146 */ +147 private static Configuration createConf() { +148 Configuration conf = HBaseConfiguration.create(); +149 conf.setInt("hbase.client.prefetch.limit", 1); +150 conf.setInt("hbase.client.pause", 500); +151 conf.setInt("hbase.client.retries.number", 100); +152 return conf; +153 } +154 +155 /** +156 * @param hostname Hostname to unload regions from or load regions to. Can be either hostname +157 * or hostname:port. +158 * @param conf Configuration object +159 */ +160 public RegionMoverBuilder(String hostname, Configuration conf) { +161 String[] splitHostname = hostname.toLowerCase().split(":"); +162 this.hostname = splitHostname[0]; +163 if (splitHostname.length == 2) { +164 this.port = Integer.parseInt(splitHostname[1]); +165 } else { +166 this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT); +167 } +168 this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname +169 + ":" + Integer.toString(this.port); +170 this.conf = conf; +171 } +172 +173 /** +174 * Path of file where regions will be written to during unloading/read from during loading +175 * @param filename +176 * @return RegionMoverBuilder object +177 */ +178 public RegionMoverBuilder filename(String filename) { +179 this.filename = filename; +180 return this; +181 } +182 +183 /** +184 * Set the max number of threads that will be used to move regions +185 */ +186 public RegionMoverBuilder maxthreads(int threads) { +187 this.maxthreads = threads; +188 return this; +189 } +190 +191 /** +192 * Path of file containing hostnames to be excluded during region movement. Exclude file should +193 * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single +194 * host. +195 */ +196 public RegionMoverBuilder excludeFile(String excludefile) { +197 this.excludeFile = excludefile; +198 return this; +199 } +200 +201 /** +202 * Set ack/noAck mode. +203 * <p> +204 * In ack mode regions are acknowledged before and after moving and the move is retried +205 * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best +206 * effort mode,each region movement is tried once.This can be used during graceful shutdown as +207 * even if we have a stuck region,upon shutdown it'll be reassigned anyway. +208 * <p> +209 * @param ack +210 * @return RegionMoverBuilder object +211 */ +212 public RegionMoverBuilder ack(boolean ack) { +213 this.ack = ack; +214 return this; +215 } +216 +217 /** +218 * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for +219 * movers also have a separate time which is hbase.move.wait.max * number of regions to +220 * load/unload +221 * @param timeout in seconds +222 * @return RegionMoverBuilder object +223 */ +224 public RegionMoverBuilder timeout(int timeout) { +225 this.timeout = timeout; +226 return this; +227 } +228 +229 /** +230 * This method builds the appropriate RegionMover object which can then be used to load/unload +231 * using load and unload methods +232 * @return RegionMover object +233 */ +234 public RegionMover build() throws IOException { +235 return new RegionMover(this); +236 } +237 } +238 +239 /** +240 * Move Regions and make sure that they are up on the target server.If a region movement fails we +241 * exit as failure +242 */ +243 private class MoveWithAck implements Callable<Boolean> { +244 private RegionInfo region; +245 private ServerName targetServer; +246 private List<RegionInfo> movedRegions; +247 private ServerName sourceServer; +248 +249 public MoveWithAck(RegionInfo regionInfo, ServerName sourceServer, +250 ServerName targetServer, List<RegionInfo> movedRegions) { +251 this.region = regionInfo; +252 this.targetServer = targetServer; +253 this.movedRegions = movedRegions; +254 this.sourceServer = sourceServer; +255 } +256 +257 @Override +258 public Boolean call() throws IOException, InterruptedException { +259 boolean moved = false; +260 int count = 0; +261 int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX); +262 int maxWaitInSeconds = +263 admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); +264 long startTime = EnvironmentEdgeManager.currentTime(); +265 boolean sameServer = true; +266 // Assert we can scan the region in its current location +267 isSuccessfulScan(region); +268 LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to " +269 + targetServer); +270 while (count < retries && sameServer) { +271 if (count > 0) { +272 LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries)); +273 } +274 count = count + 1; +275 admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer.getServerName())); +276 long maxWait = startTime + (maxWaitInSeconds * 1000); +277 while (EnvironmentEdgeManager.currentTime() < maxWait) { +278 sameServer = isSameServer(region, sourceServer); +279 if (!sameServer) { +280 break; +281 } +282 Thread.sleep(100); 283 } -284 } catch (Exception e) { -285 LOG.error("Error while loading regions to " + rm.hostname, e); -286 return false; -287 } finally { -288 conn.close(); -289 } -290 return true; -291 } -292 } -293 -294 /** -295 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In -296 * noAck mode we do not make sure that region is successfully online on the target region -297 * server,hence it is best effort.We do not unload regions to hostnames given in -298 * {@link #excludeFile}. -299 * @return true if unloading succeeded, false otherwise -300 * @throws InterruptedException if the unloader thread was interrupted -301 * @throws ExecutionException -302 * @throws TimeoutException -303 */ -304 public boolean unload() throws InterruptedException, ExecutionException, TimeoutException { -305 deleteFile(this.filename); -306 ExecutorService unloadPool = Executors.newFixedThreadPool(1); -307 Future<Boolean> unloadTask = unloadPool.submit(new Unload(this)); -308 unloadPool.shutdown(); -309 try { -310 if (!unloadPool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) { -311 LOG.warn("Timed out before finishing the unloading operation. Timeout:" + this.timeout -312 + "sec"); -313 unloadPool.shutdownNow(); -314 } -315 } catch (InterruptedException e) { -316 unloadPool.shutdownNow(); -317 Thread.currentThread().interrupt(); +284 } +285 if (sameServer) { +286 LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer +287 + ",newServer=" + this.targetServer); +288 } else { +289 isSuccessfulScan(region); +290 LOG.info("Moved Region " +291 + region.getRegionNameAsString() +292 + " cost:" +293 + String.format("%.3f", +294 (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000)); +295 moved = true; +296 movedRegions.add(region); +297 } +298 return moved; +299 } +300 } +301 +302 /** +303 * Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the +304 * RS down anyways and not abort on a stuck region. Improves movement performance +305 */ +306 private class MoveWithoutAck implements Callable<Boolean> { +307 private RegionInfo region; +308 private ServerName targetServer; +309 private List<RegionInfo> movedRegions; +310 private ServerName sourceServer; +311 +312 public MoveWithoutAck(RegionInfo regionInfo, ServerName sourceServer, +313 ServerName targetServer, List<RegionInfo> movedRegions) { +314 this.region = regionInfo; +315 this.targetServer = targetServer; +316 this.movedRegions = movedRegions; +317 this.sourceServer = sourceServer; 318 } -319 try { -320 return unloadTask.get(5, TimeUnit.SECONDS); -321 } catch (InterruptedException e) { -322 LOG.warn("Interrupted while unloading Regions from " + this.hostname, e); -323 throw e; -324 } catch (ExecutionException e) { -325 LOG.error("Error while unloading regions from RegionServer " + this.hostname, e); -326 throw e; -327 } -328 } -329 -330 private class Unload implements Callable<Boolean> { -331 -332 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<RegionInfo>()); -333 private RegionMover rm; -334 -335 public Unload(RegionMover rm) { -336 this.rm = rm; -337 } -338 -339 @Override -340 public Boolean call() throws IOException { -341 Connection conn = ConnectionFactory.createConnection(rm.conf); -342 try { -343 Admin admin = conn.getAdmin(); -344 // Get Online RegionServers -345 ArrayList<String> regionServers = getServers(admin); -346 if (LOG.isDebugEnabled()) { -347 LOG.debug("Online region servers:" + regionServers.toString()); -348 } -349 // Remove the host Region server from target Region Servers list -350 String server = stripServer(regionServers, hostname, port); -351 // Remove RS present in the exclude file -352 stripExcludes(regionServers, rm.excludeFile); -353 stripMaster(regionServers, admin); -354 unloadRegions(admin, server, regionServers, rm.ack, movedRegions); -355 } catch (Exception e) { -356 LOG.error("Error while unloading regions ", e); -357 return false; -358 } finally { -359 try { -360 conn.close(); -361 } catch (IOException e) { -362 // ignore -363 } -364 if (movedRegions != null) { -365 writeFile(rm.filename, movedRegions); -366 } -367 } -368 return true; -369 } -370 } -371 -372 private void loadRegions(Admin admin, String hostname, int port, -373 List<RegionInfo> regionsToMove, boolean ack) throws Exception { -374 String server = null; -375 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<RegionInfo>()); -376 int maxWaitInSeconds = -377 admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX); -378 long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000; -379 while ((EnvironmentEdgeManager.currentTime() < maxWait) && (server == null)) { -380 try { -381 ArrayList<String> regionServers = getServers(admin); -382 // Remove the host Region server from target Region Servers list -383 server = stripServer(regionServers, hostname, port); -384 if (server != null) { -385 break; -386 } -387 } catch (IOException e) { -388 LOG.warn("Could not get list of region servers", e); -389 } catch (Exception e) { -390 LOG.info("hostname=" + hostname + " is not up yet, waiting"); -391 } -392 try { -393 Thread.sleep(500); -394 } catch (InterruptedException e) { -395 LOG.error("Interrupted while waiting for " + hostname + " to be up.Quitting now", e); -396 throw e; -397 } -398 } -399 if (server == null) { -400 LOG.error("Host:" + hostname + " is not up.Giving up."); -401 throw new Exception("Host to load regions not online"); -402 } -403 LOG.info("Moving " + regionsToMove.size() + " regions to " + server + " using " -404 + this.maxthreads + " threads.Ack mode:" + this.ack); -405 ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); -406 List<Future<Boolean>> taskList = new ArrayList<>(); -407 int counter = 0; -408 while (counter < regionsToMove.size()) { -409 RegionInfo region = regionsToMove.get(counter); -410 String currentServer = getServerNameForRegion(admin, region); -411 if (currentServer == null) { -412 LOG.warn("Could not get server for Region:" + region.getEncodedName() + " moving on"); -413 counter++; -414 continue; -415 } else if (server.equals(currentServer)) { -416 LOG.info("Region " + region.getRegionNameAsString() + -417 " is already on target server=" + server); -418 counter++; -419 continue; -420 } -421 if (ack) { -422 Future<Boolean> task = -423 moveRegionsPool.submit(new MoveWithAck(admin, region, currentServer, server, -424 movedRegions)); -425 taskList.add(task); -426 } else { -427 Future<Boolean> task = -428 moveRegionsPool.submit(new MoveWithoutAck(admin, region, currentServer, server, -429 movedRegions)); -430 taskList.add(task); -431 } -432 counter++; -433 } -434 moveRegionsPool.shutdown(); -435 long timeoutInSeconds = -436 regionsToMove.size() -437 * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); -438 try { -439 if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) { -440 moveRegionsPool.shutdownNow(); -441 } -442 } catch (InterruptedException e) { -443 moveRegionsPool.shutdownNow(); -444 Thread.currentThread().interrupt(); -445 } -446 for (Future<Boolean> future : taskList) { -447 try { -448 // if even after shutdownNow threads are stuck we wait for 5 secs max -449 if (!future.get(5, TimeUnit.SECONDS)) { -450 LOG.error("Was Not able to move region....Exiting Now"); -451 throw new Exception("Could not move region Exception"); -452 } -453 } catch (InterruptedException e) { -454 LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e); -455 throw e; -456 } catch (ExecutionException e) { -457 LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e); -458 throw e; -459 } catch (CancellationException e) { -460 LOG.error("Thread for moving region cancelled. Timeout for cancellation:" -461 + timeoutInSeconds + "secs", e); -462 throw e; -463 } -464 } -465 } -466 -467 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DLS_DEAD_LOCAL_STORE", -468 justification="FB is wrong; its size is read") -469 private void unloadRegions(Admin admin, String server, ArrayList<String> regionServers, -470 boolean ack, List<RegionInfo> movedRegions) throws Exception { -471 List<RegionInfo> regionsToMove = new ArrayList<>();// FindBugs: DLS_DEAD_LOCAL_STORE -472 regionsToMove = getRegions(this.conf, server); -473 if (regionsToMove.isEmpty()) { -474 LOG.info("No Regions to move....Quitting now"); -475 return; -476 } else if (regionServers.isEmpty()) { -477 LOG.warn("No Regions were moved - no servers available"); -478 throw new Exception("No online region servers"); -479 } -480 while (true) { -481 regionsToMove = getRegions(this.conf, server); -482 regionsToMove.removeAll(movedRegions); -483 if (regionsToMove.isEmpty()) { -484 break; -485 } -486 int counter = 0; -487 LOG.info("Moving " + regionsToMove.size() + " regions from " + this.hostname + " to " -488 + regionServers.size() + " servers using " + this.maxthreads + " threads .Ack Mode:" -489 + ack); -490 ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); -491 List<Future<Boolean>> taskList = new ArrayList<>(); -492 int serverIndex = 0; -493 while (counter < regionsToMove.size()) { -494 if (ack) { -495 Future<Boolean> task = -496 moveRegionsPool.submit(new MoveWithAck(admin, regionsToMove.get(counter), server, -497 regionServers.get(serverIndex), movedRegions)); -498 taskList.add(task); -499 } else { -500 Future<Boolean> task = -501 moveRegionsPool.submit(new MoveWithoutAck(admin, regionsToMove.get(counter), server, -502 regionServers.get(serverIndex), movedRegions)); -503 taskList.add(task); -504 } -505 counter++; -506 serverIndex = (serverIndex + 1) % regionServers.size(); -507 } -508 moveRegionsPool.shutdown(); -509 long timeoutInSeconds = -510 regionsToMove.size() -511 * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); -512 try { -513 if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) { -514 moveRegionsPool.shutdownNow(); -515 } -516 } catch (InterruptedException e) { -517 moveRegionsPool.shutdownNow(); -518 Thread.currentThread().interrupt(); -519 } -520 for (Future<Boolean> future : taskList) { -521 try { -522 // if even after shutdownNow threads are stuck we wait for 5 secs max -523 if (!future.get(5, TimeUnit.SECONDS)) { -524 LOG.error("Was Not able to move region....Exiting Now"); -525 throw new Exception("Could not move region Exception"); -526 } -527 } catch (InterruptedException e) { -528 LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e); -529 throw e; -530 } catch (ExecutionException e) { -531 LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e); -532 throw e; -533 } catch (CancellationException e) { -534 LOG.error("Thread for moving region cancelled. Timeout for cancellation:" -535 + timeoutInSeconds + "secs", e); -536 throw e; -537 } -538 } -539 } -540 } -541 -542 /** -543 * Move Regions and make sure that they are up on the target server.If a region movement fails we -544 * exit as failure -545 */ -546 private class MoveWithAck implements Callable<Boolean> { -547 private Admin admin; -548 private RegionInfo region; -549 private String targetServer; -550 private List<RegionInfo> movedRegions; -551 private String sourceServer; -552 -553 public MoveWithAck(Admin admin, RegionInfo regionInfo, String sourceServer, -554 String targetServer, List<RegionInfo> movedRegions) { -555 this.admin = admin; -556 this.region = regionInfo; -557 this.targetServer = targetServer; -558 this.movedRegions = movedRegions; -559 this.sourceServer = sourceServer; -560 } -561 -562 @Override -563 public Boolean call() throws IOException, InterruptedException { -564 boolean moved = false; -565 int count = 0; -566 int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX); -567 int maxWaitInSeconds = -568 admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); -569 long startTime = EnvironmentEdgeManager.currentTime(); -570 boolean sameServer = true; -571 // Assert we can scan the region in its current location -572 isSuccessfulScan(admin, region); -573 LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to " -574 + targetServer); -575 while (count < retries && sameServer) { -576 if (count > 0) { -577 LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries)); -578 } -579 count = count + 1; -580 admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer)); -581 long maxWait = startTime + (maxWaitInSeconds * 1000); -582 while (EnvironmentEdgeManager.currentTime() < maxWait) { -583 sameServer = isSameServer(admin, region, sourceServer); -584 if (!sameServer) { -585 break; -586 } -587 Thread.sleep(100); -588 } -589 } -590 if (sameServer) { -591 LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer -592 + ",newServer=" + this.targetServer); -593 } else { -594 isSuccessfulScan(admin, region); -595 LOG.info("Moved Region " -596 + region.getRegionNameAsString() -597 + " cost:" -598 + String.format("%.3f", -599 (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000)); -600 moved = true; -601 movedRegions.add(region); -602 } -603 return moved; -604 } -605 } -606 -607 /** -608 * Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the -609 * RS down anyways and not abort on a stuck region. Improves movement performance -610 */ -611 private static class MoveWithoutAck implements Callable<Boolean> { -612 private Admin admin; -613 private RegionInfo region; -614 private String targetServer; -615 private List<RegionInfo> movedRegions; -616 private String sourceServer; -617 -618 public MoveWithoutAck(Admin admin, RegionInfo regionInfo, String sourceServer, -619 String targetServer, List<RegionInfo> movedRegions) {