From commits-return-67059-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Sun Feb 4 16:14:17 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id DB87918064A for ; Sun, 4 Feb 2018 16:14:17 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id CACAC160C61; Sun, 4 Feb 2018 15:14:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5A192160C37 for ; Sun, 4 Feb 2018 16:14:15 +0100 (CET) Received: (qmail 18914 invoked by uid 500); 4 Feb 2018 15:14:14 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 18904 invoked by uid 99); 4 Feb 2018 15:14:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 04 Feb 2018 15:14:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 55D27F4DD3; Sun, 4 Feb 2018 15:14:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Sun, 04 Feb 2018 15:14:12 -0000 Message-Id: <2249bb5ddff74eb1ac2ed001d482235c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/51] [partial] hbase-site git commit: Published site at . Repository: hbase-site Updated Branches: refs/heads/asf-site 250fddb76 -> 6674e3ab7 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/6674e3ab/devapidocs/src-html/org/apache/hadoop/hbase/master/SplitLogManager.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/master/SplitLogManager.html b/devapidocs/src-html/org/apache/hadoop/hbase/master/SplitLogManager.html index 2939a56..681e263 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/master/SplitLogManager.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/master/SplitLogManager.html @@ -61,602 +61,608 @@ 053import org.apache.hadoop.hbase.monitoring.TaskMonitor; 054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 055import org.apache.hadoop.hbase.util.FSUtils; -056import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; -057import org.apache.yetus.audience.InterfaceAudience; -058import org.slf4j.Logger; -059import org.slf4j.LoggerFactory; -060import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -061 -062/** -063 * Distributes the task of log splitting to the available region servers. -064 * Coordination happens via coordination engine. For every log file that has to be split a -065 * task is created. SplitLogWorkers race to grab a task. -066 * -067 * <p>SplitLogManager monitors the tasks that it creates using the -068 * timeoutMonitor thread. If a task's progress is slow then -069 * {@link SplitLogManagerCoordination#checkTasks} will take away the -070 * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} -071 * and the task will be up for grabs again. When the task is done then it is -072 * deleted by SplitLogManager. -073 * -074 * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's -075 * log files. The caller thread waits in this method until all the log files -076 * have been split. -077 * -078 * <p>All the coordination calls made by this class are asynchronous. This is mainly -079 * to help reduce response time seen by the callers. -080 * -081 * <p>There is race in this design between the SplitLogManager and the -082 * SplitLogWorker. SplitLogManager might re-queue a task that has in reality -083 * already been completed by a SplitLogWorker. We rely on the idempotency of -084 * the log splitting task for correctness. -085 * -086 * <p>It is also assumed that every log splitting task is unique and once -087 * completed (either with success or with error) it will be not be submitted -088 * again. If a task is resubmitted then there is a risk that old "delete task" -089 * can delete the re-submission. -090 */ -091@InterfaceAudience.Private -092public class SplitLogManager { -093 private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class); -094 -095 private final MasterServices server; -096 -097 private final Configuration conf; -098 private final ChoreService choreService; -099 -100 public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min -101 -102 private long unassignedTimeout; -103 private long lastTaskCreateTime = Long.MAX_VALUE; -104 -105 @VisibleForTesting -106 final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>(); -107 private TimeoutMonitor timeoutMonitor; -108 -109 private volatile Set<ServerName> deadWorkers = null; -110 private final Object deadWorkersLock = new Object(); -111 -112 /** -113 * Its OK to construct this object even when region-servers are not online. It does lookup the -114 * orphan tasks in coordination engine but it doesn't block waiting for them to be done. -115 * @param master the master services -116 * @param conf the HBase configuration -117 * @throws IOException -118 */ -119 public SplitLogManager(MasterServices master, Configuration conf) -120 throws IOException { -121 this.server = master; -122 this.conf = conf; -123 this.choreService = new ChoreService(master.getServerName() + "_splitLogManager_"); -124 if (server.getCoordinatedStateManager() != null) { -125 SplitLogManagerCoordination coordination = getSplitLogManagerCoordination(); -126 Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>()); -127 SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions); -128 coordination.setDetails(details); -129 coordination.init(); -130 } -131 this.unassignedTimeout = -132 conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); -133 this.timeoutMonitor = -134 new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), -135 master); -136 choreService.scheduleChore(timeoutMonitor); -137 } -138 -139 private SplitLogManagerCoordination getSplitLogManagerCoordination() { -140 return server.getCoordinatedStateManager().getSplitLogManagerCoordination(); -141 } -142 -143 private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException { -144 return getFileList(conf, logDirs, filter); -145 } -146 -147 /** -148 * Get a list of paths that need to be split given a set of server-specific directories and -149 * optionally a filter. -150 * -151 * See {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} for more info on directory -152 * layout. -153 * -154 * Should be package-private, but is needed by -155 * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem, -156 * Configuration, org.apache.hadoop.hbase.wal.WALFactory)} for tests. -157 */ -158 @VisibleForTesting -159 public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs, -160 final PathFilter filter) -161 throws IOException { -162 List<FileStatus> fileStatus = new ArrayList<>(); -163 for (Path logDir : logDirs) { -164 final FileSystem fs = logDir.getFileSystem(conf); -165 if (!fs.exists(logDir)) { -166 LOG.warn(logDir + " doesn't exist. Nothing to do!"); -167 continue; -168 } -169 FileStatus[] logfiles = FSUtils.listStatus(fs, logDir, filter); -170 if (logfiles == null || logfiles.length == 0) { -171 LOG.info(logDir + " is empty dir, no logs to split"); -172 } else { -173 Collections.addAll(fileStatus, logfiles); +056import org.apache.hadoop.hbase.util.HasThread; +057import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +058import org.apache.yetus.audience.InterfaceAudience; +059import org.slf4j.Logger; +060import org.slf4j.LoggerFactory; +061import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +062 +063/** +064 * Distributes the task of log splitting to the available region servers. +065 * Coordination happens via coordination engine. For every log file that has to be split a +066 * task is created. SplitLogWorkers race to grab a task. +067 * +068 * <p>SplitLogManager monitors the tasks that it creates using the +069 * timeoutMonitor thread. If a task's progress is slow then +070 * {@link SplitLogManagerCoordination#checkTasks} will take away the +071 * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} +072 * and the task will be up for grabs again. When the task is done then it is +073 * deleted by SplitLogManager. +074 * +075 * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's +076 * log files. The caller thread waits in this method until all the log files +077 * have been split. +078 * +079 * <p>All the coordination calls made by this class are asynchronous. This is mainly +080 * to help reduce response time seen by the callers. +081 * +082 * <p>There is race in this design between the SplitLogManager and the +083 * SplitLogWorker. SplitLogManager might re-queue a task that has in reality +084 * already been completed by a SplitLogWorker. We rely on the idempotency of +085 * the log splitting task for correctness. +086 * +087 * <p>It is also assumed that every log splitting task is unique and once +088 * completed (either with success or with error) it will be not be submitted +089 * again. If a task is resubmitted then there is a risk that old "delete task" +090 * can delete the re-submission. +091 */ +092@InterfaceAudience.Private +093public class SplitLogManager { +094 private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class); +095 +096 private final MasterServices server; +097 +098 private final Configuration conf; +099 private final ChoreService choreService; +100 +101 public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min +102 +103 private long unassignedTimeout; +104 private long lastTaskCreateTime = Long.MAX_VALUE; +105 +106 @VisibleForTesting +107 final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>(); +108 private TimeoutMonitor timeoutMonitor; +109 +110 private volatile Set<ServerName> deadWorkers = null; +111 private final Object deadWorkersLock = new Object(); +112 +113 /** +114 * Its OK to construct this object even when region-servers are not online. It does lookup the +115 * orphan tasks in coordination engine but it doesn't block waiting for them to be done. +116 * @param master the master services +117 * @param conf the HBase configuration +118 * @throws IOException +119 */ +120 public SplitLogManager(MasterServices master, Configuration conf) +121 throws IOException { +122 this.server = master; +123 this.conf = conf; +124 // Get Server Thread name. Sometimes the Server is mocked so may not implement HasThread. +125 // For example, in tests. +126 String name = master instanceof HasThread? ((HasThread)master).getName(): +127 master.getServerName().toShortString(); +128 this.choreService = +129 new ChoreService(name + ".splitLogManager."); +130 if (server.getCoordinatedStateManager() != null) { +131 SplitLogManagerCoordination coordination = getSplitLogManagerCoordination(); +132 Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>()); +133 SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions); +134 coordination.setDetails(details); +135 coordination.init(); +136 } +137 this.unassignedTimeout = +138 conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); +139 this.timeoutMonitor = +140 new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), +141 master); +142 choreService.scheduleChore(timeoutMonitor); +143 } +144 +145 private SplitLogManagerCoordination getSplitLogManagerCoordination() { +146 return server.getCoordinatedStateManager().getSplitLogManagerCoordination(); +147 } +148 +149 private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException { +150 return getFileList(conf, logDirs, filter); +151 } +152 +153 /** +154 * Get a list of paths that need to be split given a set of server-specific directories and +155 * optionally a filter. +156 * +157 * See {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} for more info on directory +158 * layout. +159 * +160 * Should be package-private, but is needed by +161 * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem, +162 * Configuration, org.apache.hadoop.hbase.wal.WALFactory)} for tests. +163 */ +164 @VisibleForTesting +165 public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs, +166 final PathFilter filter) +167 throws IOException { +168 List<FileStatus> fileStatus = new ArrayList<>(); +169 for (Path logDir : logDirs) { +170 final FileSystem fs = logDir.getFileSystem(conf); +171 if (!fs.exists(logDir)) { +172 LOG.warn(logDir + " doesn't exist. Nothing to do!"); +173 continue; 174 } -175 } -176 FileStatus[] a = new FileStatus[fileStatus.size()]; -177 return fileStatus.toArray(a); -178 } -179 -180 /** -181 * @param logDir one region sever wal dir path in .logs -182 * @throws IOException if there was an error while splitting any log file -183 * @return cumulative size of the logfiles split -184 * @throws IOException -185 */ -186 public long splitLogDistributed(final Path logDir) throws IOException { -187 List<Path> logDirs = new ArrayList<>(); -188 logDirs.add(logDir); -189 return splitLogDistributed(logDirs); -190 } -191 -192 /** -193 * The caller will block until all the log files of the given region server have been processed - -194 * successfully split or an error is encountered - by an available worker region server. This -195 * method must only be called after the region servers have been brought online. -196 * @param logDirs List of log dirs to split -197 * @throws IOException If there was an error while splitting any log file -198 * @return cumulative size of the logfiles split -199 */ -200 public long splitLogDistributed(final List<Path> logDirs) throws IOException { -201 if (logDirs.isEmpty()) { -202 return 0; -203 } -204 Set<ServerName> serverNames = new HashSet<>(); -205 for (Path logDir : logDirs) { -206 try { -207 ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logDir); -208 if (serverName != null) { -209 serverNames.add(serverName); -210 } -211 } catch (IllegalArgumentException e) { -212 // ignore invalid format error. -213 LOG.warn("Cannot parse server name from " + logDir); -214 } -215 } -216 return splitLogDistributed(serverNames, logDirs, null); -217 } -218 -219 /** -220 * The caller will block until all the hbase:meta log files of the given region server have been -221 * processed - successfully split or an error is encountered - by an available worker region -222 * server. This method must only be called after the region servers have been brought online. -223 * @param logDirs List of log dirs to split -224 * @param filter the Path filter to select specific files for considering -225 * @throws IOException If there was an error while splitting any log file -226 * @return cumulative size of the logfiles split -227 */ -228 public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs, -229 PathFilter filter) throws IOException { -230 MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " + -231 logDirs + " for serverName=" + serverNames); -232 FileStatus[] logfiles = getFileList(logDirs, filter); -233 status.setStatus("Checking directory contents..."); -234 SplitLogCounters.tot_mgr_log_split_batch_start.increment(); -235 LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs + -236 " for " + serverNames); -237 long t = EnvironmentEdgeManager.currentTime(); -238 long totalSize = 0; -239 TaskBatch batch = new TaskBatch(); -240 for (FileStatus lf : logfiles) { -241 // TODO If the log file is still being written to - which is most likely -242 // the case for the last log file - then its length will show up here -243 // as zero. The size of such a file can only be retrieved after -244 // recover-lease is done. totalSize will be under in most cases and the -245 // metrics that it drives will also be under-reported. -246 totalSize += lf.getLen(); -247 String pathToLog = FSUtils.removeWALRootPath(lf.getPath(), conf); -248 if (!enqueueSplitTask(pathToLog, batch)) { -249 throw new IOException("duplicate log split scheduled for " + lf.getPath()); -250 } -251 } -252 waitForSplittingCompletion(batch, status); -253 -254 if (batch.done != batch.installed) { -255 batch.isDead = true; -256 SplitLogCounters.tot_mgr_log_split_batch_err.increment(); -257 LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed -258 + " but only " + batch.done + " done"); -259 String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch; -260 status.abort(msg); -261 throw new IOException(msg); -262 } -263 for (Path logDir : logDirs) { -264 status.setStatus("Cleaning up log directory..."); -265 final FileSystem fs = logDir.getFileSystem(conf); -266 try { -267 if (fs.exists(logDir) && !fs.delete(logDir, false)) { -268 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir); -269 } -270 } catch (IOException ioe) { -271 FileStatus[] files = fs.listStatus(logDir); -272 if (files != null && files.length > 0) { -273 LOG.warn("Returning success without actually splitting and " -274 + "deleting all the log files in path " + logDir + ": " -275 + Arrays.toString(files), ioe); -276 } else { -277 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe); -278 } -279 } -280 SplitLogCounters.tot_mgr_log_split_batch_success.increment(); -281 } -282 String msg = -283 "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed -284 + " log files in " + logDirs + " in " -285 + (EnvironmentEdgeManager.currentTime() - t) + "ms"; -286 status.markComplete(msg); -287 LOG.info(msg); -288 return totalSize; -289 } -290 -291 /** -292 * Add a task entry to coordination if it is not already there. -293 * @param taskname the path of the log to be split -294 * @param batch the batch this task belongs to -295 * @return true if a new entry is created, false if it is already there. -296 */ -297 boolean enqueueSplitTask(String taskname, TaskBatch batch) { -298 lastTaskCreateTime = EnvironmentEdgeManager.currentTime(); -299 String task = getSplitLogManagerCoordination().prepareTask(taskname); -300 Task oldtask = createTaskIfAbsent(task, batch); -301 if (oldtask == null) { -302 // publish the task in the coordination engine -303 getSplitLogManagerCoordination().submitTask(task); -304 return true; -305 } -306 return false; -307 } -308 -309 private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) { -310 synchronized (batch) { -311 while ((batch.done + batch.error) != batch.installed) { -312 try { -313 status.setStatus("Waiting for distributed tasks to finish. " + " scheduled=" -314 + batch.installed + " done=" + batch.done + " error=" + batch.error); -315 int remaining = batch.installed - (batch.done + batch.error); -316 int actual = activeTasks(batch); -317 if (remaining != actual) { -318 LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual); -319 } -320 int remainingTasks = getSplitLogManagerCoordination().remainingTasksInCoordination(); -321 if (remainingTasks >= 0 && actual > remainingTasks) { -322 LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are " -323 + remainingTasks); -324 } -325 if (remainingTasks == 0 || actual == 0) { -326 LOG.warn("No more task remaining, splitting " -327 + "should have completed. Remaining tasks is " + remainingTasks -328 + ", active tasks in map " + actual); -329 if (remainingTasks == 0 && actual == 0) { -330 return; -331 } -332 } -333 batch.wait(100); -334 if (server.isStopped()) { -335 LOG.warn("Stopped while waiting for log splits to be completed"); -336 return; -337 } -338 } catch (InterruptedException e) { -339 LOG.warn("Interrupted while waiting for log splits to be completed"); -340 Thread.currentThread().interrupt(); -341 return; -342 } -343 } -344 } -345 } -346 -347 @VisibleForTesting -348 ConcurrentMap<String, Task> getTasks() { -349 return tasks; -350 } -351 -352 private int activeTasks(final TaskBatch batch) { -353 int count = 0; -354 for (Task t : tasks.values()) { -355 if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) { -356 count++; -357 } -358 } -359 return count; -360 -361 } -362 -363 /** -364 * @param path -365 * @param batch -366 * @return null on success, existing task on error -367 */ -368 private Task createTaskIfAbsent(String path, TaskBatch batch) { -369 Task oldtask; -370 // batch.installed is only changed via this function and -371 // a single thread touches batch.installed. -372 Task newtask = new Task(); -373 newtask.batch = batch; -374 oldtask = tasks.putIfAbsent(path, newtask); -375 if (oldtask == null) { -376 batch.installed++; -377 return null; -378 } -379 // new task was not used. -380 synchronized (oldtask) { -381 if (oldtask.isOrphan()) { -382 if (oldtask.status == SUCCESS) { -383 // The task is already done. Do not install the batch for this -384 // task because it might be too late for setDone() to update -385 // batch.done. There is no need for the batch creator to wait for -386 // this task to complete. -387 return (null); -388 } -389 if (oldtask.status == IN_PROGRESS) { -390 oldtask.batch = batch; -391 batch.installed++; -392 LOG.debug("Previously orphan task " + path + " is now being waited upon"); -393 return null; +175 FileStatus[] logfiles = FSUtils.listStatus(fs, logDir, filter); +176 if (logfiles == null || logfiles.length == 0) { +177 LOG.info(logDir + " is empty dir, no logs to split"); +178 } else { +179 Collections.addAll(fileStatus, logfiles); +180 } +181 } +182 FileStatus[] a = new FileStatus[fileStatus.size()]; +183 return fileStatus.toArray(a); +184 } +185 +186 /** +187 * @param logDir one region sever wal dir path in .logs +188 * @throws IOException if there was an error while splitting any log file +189 * @return cumulative size of the logfiles split +190 * @throws IOException +191 */ +192 public long splitLogDistributed(final Path logDir) throws IOException { +193 List<Path> logDirs = new ArrayList<>(); +194 logDirs.add(logDir); +195 return splitLogDistributed(logDirs); +196 } +197 +198 /** +199 * The caller will block until all the log files of the given region server have been processed - +200 * successfully split or an error is encountered - by an available worker region server. This +201 * method must only be called after the region servers have been brought online. +202 * @param logDirs List of log dirs to split +203 * @throws IOException If there was an error while splitting any log file +204 * @return cumulative size of the logfiles split +205 */ +206 public long splitLogDistributed(final List<Path> logDirs) throws IOException { +207 if (logDirs.isEmpty()) { +208 return 0; +209 } +210 Set<ServerName> serverNames = new HashSet<>(); +211 for (Path logDir : logDirs) { +212 try { +213 ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logDir); +214 if (serverName != null) { +215 serverNames.add(serverName); +216 } +217 } catch (IllegalArgumentException e) { +218 // ignore invalid format error. +219 LOG.warn("Cannot parse server name from " + logDir); +220 } +221 } +222 return splitLogDistributed(serverNames, logDirs, null); +223 } +224 +225 /** +226 * The caller will block until all the hbase:meta log files of the given region server have been +227 * processed - successfully split or an error is encountered - by an available worker region +228 * server. This method must only be called after the region servers have been brought online. +229 * @param logDirs List of log dirs to split +230 * @param filter the Path filter to select specific files for considering +231 * @throws IOException If there was an error while splitting any log file +232 * @return cumulative size of the logfiles split +233 */ +234 public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs, +235 PathFilter filter) throws IOException { +236 MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " + +237 logDirs + " for serverName=" + serverNames); +238 FileStatus[] logfiles = getFileList(logDirs, filter); +239 status.setStatus("Checking directory contents..."); +240 SplitLogCounters.tot_mgr_log_split_batch_start.increment(); +241 LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs + +242 " for " + serverNames); +243 long t = EnvironmentEdgeManager.currentTime(); +244 long totalSize = 0; +245 TaskBatch batch = new TaskBatch(); +246 for (FileStatus lf : logfiles) { +247 // TODO If the log file is still being written to - which is most likely +248 // the case for the last log file - then its length will show up here +249 // as zero. The size of such a file can only be retrieved after +250 // recover-lease is done. totalSize will be under in most cases and the +251 // metrics that it drives will also be under-reported. +252 totalSize += lf.getLen(); +253 String pathToLog = FSUtils.removeWALRootPath(lf.getPath(), conf); +254 if (!enqueueSplitTask(pathToLog, batch)) { +255 throw new IOException("duplicate log split scheduled for " + lf.getPath()); +256 } +257 } +258 waitForSplittingCompletion(batch, status); +259 +260 if (batch.done != batch.installed) { +261 batch.isDead = true; +262 SplitLogCounters.tot_mgr_log_split_batch_err.increment(); +263 LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed +264 + " but only " + batch.done + " done"); +265 String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch; +266 status.abort(msg); +267 throw new IOException(msg); +268 } +269 for (Path logDir : logDirs) { +270 status.setStatus("Cleaning up log directory..."); +271 final FileSystem fs = logDir.getFileSystem(conf); +272 try { +273 if (fs.exists(logDir) && !fs.delete(logDir, false)) { +274 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir); +275 } +276 } catch (IOException ioe) { +277 FileStatus[] files = fs.listStatus(logDir); +278 if (files != null && files.length > 0) { +279 LOG.warn("Returning success without actually splitting and " +280 + "deleting all the log files in path " + logDir + ": " +281 + Arrays.toString(files), ioe); +282 } else { +283 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe); +284 } +285 } +286 SplitLogCounters.tot_mgr_log_split_batch_success.increment(); +287 } +288 String msg = +289 "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed +290 + " log files in " + logDirs + " in " +291 + (EnvironmentEdgeManager.currentTime() - t) + "ms"; +292 status.markComplete(msg); +293 LOG.info(msg); +294 return totalSize; +295 } +296 +297 /** +298 * Add a task entry to coordination if it is not already there. +299 * @param taskname the path of the log to be split +300 * @param batch the batch this task belongs to +301 * @return true if a new entry is created, false if it is already there. +302 */ +303 boolean enqueueSplitTask(String taskname, TaskBatch batch) { +304 lastTaskCreateTime = EnvironmentEdgeManager.currentTime(); +305 String task = getSplitLogManagerCoordination().prepareTask(taskname); +306 Task oldtask = createTaskIfAbsent(task, batch); +307 if (oldtask == null) { +308 // publish the task in the coordination engine +309 getSplitLogManagerCoordination().submitTask(task); +310 return true; +311 } +312 return false; +313 } +314 +315 private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) { +316 synchronized (batch) { +317 while ((batch.done + batch.error) != batch.installed) { +318 try { +319 status.setStatus("Waiting for distributed tasks to finish. " + " scheduled=" +320 + batch.installed + " done=" + batch.done + " error=" + batch.error); +321 int remaining = batch.installed - (batch.done + batch.error); +322 int actual = activeTasks(batch); +323 if (remaining != actual) { +324 LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual); +325 } +326 int remainingTasks = getSplitLogManagerCoordination().remainingTasksInCoordination(); +327 if (remainingTasks >= 0 && actual > remainingTasks) { +328 LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are " +329 + remainingTasks); +330 } +331 if (remainingTasks == 0 || actual == 0) { +332 LOG.warn("No more task remaining, splitting " +333 + "should have completed. Remaining tasks is " + remainingTasks +334 + ", active tasks in map " + actual); +335 if (remainingTasks == 0 && actual == 0) { +336 return; +337 } +338 } +339 batch.wait(100); +340 if (server.isStopped()) { +341 LOG.warn("Stopped while waiting for log splits to be completed"); +342 return; +343 } +344 } catch (InterruptedException e) { +345 LOG.warn("Interrupted while waiting for log splits to be completed"); +346 Thread.currentThread().interrupt(); +347 return; +348 } +349 } +350 } +351 } +352 +353 @VisibleForTesting +354 ConcurrentMap<String, Task> getTasks() { +355 return tasks; +356 } +357 +358 private int activeTasks(final TaskBatch batch) { +359 int count = 0; +360 for (Task t : tasks.values()) { +361 if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) { +362 count++; +363 } +364 } +365 return count; +366 +367 } +368 +369 /** +370 * @param path +371 * @param batch +372 * @return null on success, existing task on error +373 */ +374 private Task createTaskIfAbsent(String path, TaskBatch batch) { +375 Task oldtask; +376 // batch.installed is only changed via this function and +377 // a single thread touches batch.installed. +378 Task newtask = new Task(); +379 newtask.batch = batch; +380 oldtask = tasks.putIfAbsent(path, newtask); +381 if (oldtask == null) { +382 batch.installed++; +383 return null; +384 } +385 // new task was not used. +386 synchronized (oldtask) { +387 if (oldtask.isOrphan()) { +388 if (oldtask.status == SUCCESS) { +389 // The task is already done. Do not install the batch for this +390 // task because it might be too late for setDone() to update +391 // batch.done. There is no need for the batch creator to wait for +392 // this task to complete. +393 return (null); 394 } -395 while (oldtask.status == FAILURE) { -396 LOG.debug("wait for status of task " + path + " to change to DELETED"); -397 SplitLogCounters.tot_mgr_wait_for_zk_delete.increment(); -398 try { -399 oldtask.wait(); -400 } catch (InterruptedException e) { -401 Thread.currentThread().interrupt(); -402 LOG.warn("Interrupted when waiting for znode delete callback"); -403 // fall through to return failure -404 break; -405 } -406 } -407 if (oldtask.status != DELETED) { -408 LOG.warn("Failure because previously failed task" -409 + " state still present. Waiting for znode delete callback" + " path=" + path); -410 return oldtask; -411 } -412 // reinsert the newTask and it must succeed this time -413 Task t = tasks.putIfAbsent(path, newtask); -414 if (t == null) { -415 batch.installed++; -416 return null; +395 if (oldtask.status == IN_PROGRESS) { +396 oldtask.batch = batch; +397 batch.installed++; +398 LOG.debug("Previously orphan task " + path + " is now being waited upon"); +399 return null; +400 } +401 while (oldtask.status == FAILURE) { +402 LOG.debug("wait for status of task " + path + " to change to DELETED"); +403 SplitLogCounters.tot_mgr_wait_for_zk_delete.increment(); +404 try { +405 oldtask.wait(); +406 } catch (InterruptedException e) { +407 Thread.currentThread().interrupt(); +408 LOG.warn("Interrupted when waiting for znode delete callback"); +409 // fall through to return failure +410 break; +411 } +412 } +413 if (oldtask.status != DELETED) { +414 LOG.warn("Failure because previously failed task" +415 + " state still present. Waiting for znode delete callback" + " path=" + path); +416 return oldtask; 417 } -418 LOG.error(HBaseMarkers.FATAL, "Logic error. Deleted task still present in tasks map"); -419 assert false : "Deleted task still present in tasks map"; -420 return t; -421 } -422 LOG.warn("Failure because two threads can't wait for the same task; path=" + path); -423 return oldtask; -424 } -425 } -426 -427 public void stop() { -428 if (choreService != null) { -429 choreService.shutdown(); +418 // reinsert the newTask and it must succeed this time +419 Task t = tasks.putIfAbsent(path, newtask); +420 if (t == null) { +421 batch.installed++; +422 return null; +423 } +424 LOG.error(HBaseMarkers.FATAL, "Logic error. Deleted task still present in tasks map"); +425 assert false : "Deleted task still present in tasks map"; +426 return t; +427 } +428 LOG.warn("Failure because two threads can't wait for the same task; path=" + path); +429 return oldtask; 430 } -431 if (timeoutMonitor != null) { -432 timeoutMonitor.cancel(true); -433 } -434 } -435 -436 void handleDeadWorker(ServerName workerName) { -437 // resubmit the tasks on the TimeoutMonitor thread. Makes it easier -438 // to reason about concurrency. Makes it easier to retry. -439 synchronized (deadWorkersLock) { -440 if (deadWorkers == null) { -441 deadWorkers = new HashSet<>(100); -442 } -443 deadWorkers.add(workerName); -444 } -445 LOG.info("dead splitlog worker " + workerName); -446 } -447 -448 void handleDeadWorkers(Set<ServerName> serverNames) { -449 synchronized (deadWorkersLock) { -450 if (deadWorkers == null) { -451 deadWorkers = new HashSet<>(100); -452 } -453 deadWorkers.addAll(serverNames); -454 } -455 LOG.info("dead splitlog workers " + serverNames); -456 } -457 -458 /** -459 * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed(). -460 * Clients threads use this object to wait for all their tasks to be done. -461 * <p> -462 * All access is synchronized. -463 */ -464 @InterfaceAudience.Private -465 public static class TaskBatch { -466 public int installed = 0; -467 public int done = 0; -468 public int error = 0; -469 public volatile boolean isDead = false; -470 -471 @Override -472 public String toString() { -473 return ("installed = " + installed + " done = " + done + " error = " + error); -474 } -475 } +431 } +432 +433 public void stop() { +434 if (choreService != null) { +435 choreService.shutdown(); +436 } +437 if (timeoutMonitor != null) { +438 timeoutMonitor.cancel(true); +439 } +440 } +441 +442 void handleDeadWorker(ServerName workerName) { +443 // resubmit the tasks on the TimeoutMonitor thread. Makes it easier +444 // to reason about concurrency. Makes it easier to retry. +445 synchronized (deadWorkersLock) { +446 if (deadWorkers == null) { +447 deadWorkers = new HashSet<>(100); +448 } +449 deadWorkers.add(workerName); +450 } +451 LOG.info("dead splitlog worker " + workerName); +452 } +453 +454 void handleDeadWorkers(Set<ServerName> serverNames) { +455 synchronized (deadWorkersLock) { +456 if (deadWorkers == null) { +457 deadWorkers = new HashSet<>(100); +458 } +459 deadWorkers.addAll(serverNames); +460 } +461 LOG.info("dead splitlog workers " + serverNames); +462 } +463 +464 /** +465 * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed(). +466 * Clients threads use this object to wait for all their tasks to be done. +467 * <p> +468 * All access is synchronized. +469 */ +470 @InterfaceAudience.Private +471 public static class TaskBatch { +472 public int installed = 0; +473 public int done = 0; +474 public int error = 0; +475 public volatile boolean isDead = false; 476 -477 /** -478 * in memory state of an active task. -479 */ -480 @InterfaceAudience.Private -481 public static class Task { -482 public volatile long last_update; -483 public volatile int last_version; -484 public volatile ServerName cur_worker_name; -485 public volatile TaskBatch batch; -486 public volatile TerminationStatus status; -487 public volatile AtomicInteger incarnation = new AtomicInteger(0); -488 public final AtomicInteger unforcedResubmits = new AtomicInteger(); -489 public volatile boolean resubmitThresholdReached; -490 -491 @Override -492 public String toString() { -493 return ("last_update = " + last_update + " last_version = " + last_version -494 + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = " -495 + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch); -496 } -497 -498 public Task() { -499 last_version = -1; -500 status = IN_PROGRESS; -501 setUnassigned(); +477 @Override +478 public String toString() { +479 return ("installed = " + installed + " done = " + done + " error = " + error); +480 } +481 } +482 +483 /** +484 * in memory state of an active task. +485 */ +486 @InterfaceAudience.Private +487 public static class Task { +488 public volatile long last_update; +489 public volatile int last_version; +490 public volatile ServerName cur_worker_name; +491 public vo