Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 113EB200D29 for ; Wed, 11 Oct 2017 17:13:15 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0F97A160BE9; Wed, 11 Oct 2017 15:13:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0E276160BEC for ; Wed, 11 Oct 2017 17:13:12 +0200 (CEST) Received: (qmail 10255 invoked by uid 500); 11 Oct 2017 15:13:09 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 8549 invoked by uid 99); 11 Oct 2017 15:13:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Oct 2017 15:13:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E7537DFC3F; Wed, 11 Oct 2017 15:13:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Wed, 11 Oct 2017 15:13:28 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [23/51] [partial] hbase-site git commit: Published site at . archived-at: Wed, 11 Oct 2017 15:13:15 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/c0571676/devapidocs/src-html/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool.html b/devapidocs/src-html/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool.html index 9bbfc18..d22fccc 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool.html @@ -54,299 +54,300 @@ 046import org.apache.hadoop.hbase.procedure.Subprocedure; 047import org.apache.hadoop.hbase.procedure.SubprocedureFactory; 048import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs; -049import org.apache.hadoop.hbase.regionserver.HRegionServer; -050import org.apache.hadoop.hbase.regionserver.Region; -051import org.apache.hadoop.hbase.regionserver.RegionServerServices; -052import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -053import org.apache.zookeeper.KeeperException; -054 -055/** -056 * This manager class handles flushing of the regions for table on a {@link HRegionServer}. -057 */ -058@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -059public class RegionServerFlushTableProcedureManager extends RegionServerProcedureManager { -060 private static final Log LOG = LogFactory.getLog(RegionServerFlushTableProcedureManager.class); -061 -062 private static final String CONCURENT_FLUSH_TASKS_KEY = -063 "hbase.flush.procedure.region.concurrentTasks"; -064 private static final int DEFAULT_CONCURRENT_FLUSH_TASKS = 3; -065 -066 public static final String FLUSH_REQUEST_THREADS_KEY = -067 "hbase.flush.procedure.region.pool.threads"; -068 public static final int FLUSH_REQUEST_THREADS_DEFAULT = 10; -069 -070 public static final String FLUSH_TIMEOUT_MILLIS_KEY = -071 "hbase.flush.procedure.region.timeout"; -072 public static final long FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000; -073 -074 public static final String FLUSH_REQUEST_WAKE_MILLIS_KEY = -075 "hbase.flush.procedure.region.wakefrequency"; -076 private static final long FLUSH_REQUEST_WAKE_MILLIS_DEFAULT = 500; -077 -078 private RegionServerServices rss; -079 private ProcedureMemberRpcs memberRpcs; -080 private ProcedureMember member; -081 -082 /** -083 * Exposed for testing. -084 * @param conf HBase configuration. -085 * @param server region server. -086 * @param memberRpc use specified memberRpc instance -087 * @param procMember use specified ProcedureMember -088 */ -089 RegionServerFlushTableProcedureManager(Configuration conf, HRegionServer server, -090 ProcedureMemberRpcs memberRpc, ProcedureMember procMember) { -091 this.rss = server; -092 this.memberRpcs = memberRpc; -093 this.member = procMember; -094 } -095 -096 public RegionServerFlushTableProcedureManager() {} -097 -098 /** -099 * Start accepting flush table requests. -100 */ -101 @Override -102 public void start() { -103 LOG.debug("Start region server flush procedure manager " + rss.getServerName().toString()); -104 this.memberRpcs.start(rss.getServerName().toString(), member); -105 } -106 -107 /** -108 * Close <tt>this</tt> and all running tasks -109 * @param force forcefully stop all running tasks -110 * @throws IOException -111 */ -112 @Override -113 public void stop(boolean force) throws IOException { -114 String mode = force ? "abruptly" : "gracefully"; -115 LOG.info("Stopping region server flush procedure manager " + mode + "."); -116 -117 try { -118 this.member.close(); -119 } finally { -120 this.memberRpcs.close(); -121 } -122 } -123 -124 /** -125 * If in a running state, creates the specified subprocedure to flush table regions. -126 * -127 * Because this gets the local list of regions to flush and not the set the master had, -128 * there is a possibility of a race where regions may be missed. -129 * -130 * @param table -131 * @return Subprocedure to submit to the ProcedureMemeber. -132 */ -133 public Subprocedure buildSubprocedure(String table) { -134 -135 // don't run the subprocedure if the parent is stop(ping) -136 if (rss.isStopping() || rss.isStopped()) { -137 throw new IllegalStateException("Can't start flush region subprocedure on RS: " -138 + rss.getServerName() + ", because stopping/stopped!"); -139 } -140 -141 // check to see if this server is hosting any regions for the table -142 List<Region> involvedRegions; -143 try { -144 involvedRegions = getRegionsToFlush(table); -145 } catch (IOException e1) { -146 throw new IllegalStateException("Failed to figure out if there is region to flush.", e1); -147 } -148 -149 // We need to run the subprocedure even if we have no relevant regions. The coordinator -150 // expects participation in the procedure and without sending message the master procedure -151 // will hang and fail. -152 -153 LOG.debug("Launching subprocedure to flush regions for " + table); -154 ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(table); -155 Configuration conf = rss.getConfiguration(); -156 long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, -157 FLUSH_TIMEOUT_MILLIS_DEFAULT); -158 long wakeMillis = conf.getLong(FLUSH_REQUEST_WAKE_MILLIS_KEY, -159 FLUSH_REQUEST_WAKE_MILLIS_DEFAULT); -160 -161 FlushTableSubprocedurePool taskManager = -162 new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss); -163 return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis, -164 timeoutMillis, involvedRegions, table, taskManager); -165 } -166 -167 /** -168 * Get the list of regions to flush for the table on this server -169 * -170 * It is possible that if a region moves somewhere between the calls -171 * we'll miss the region. -172 * -173 * @param table -174 * @return the list of online regions. Empty list is returned if no regions. -175 * @throws IOException -176 */ -177 private List<Region> getRegionsToFlush(String table) throws IOException { -178 return rss.getRegions(TableName.valueOf(table)); -179 } -180 -181 public class FlushTableSubprocedureBuilder implements SubprocedureFactory { -182 -183 @Override -184 public Subprocedure buildSubprocedure(String name, byte[] data) { -185 // The name of the procedure instance from the master is the table name. -186 return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name); -187 } -188 -189 } -190 -191 /** -192 * We use the FlushTableSubprocedurePool, a class specific thread pool instead of -193 * {@link org.apache.hadoop.hbase.executor.ExecutorService}. -194 * -195 * It uses a {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of -196 * completed tasks which lets us efficiently cancel pending tasks upon the earliest operation -197 * failures. -198 */ -199 static class FlushTableSubprocedurePool { -200 private final Abortable abortable; -201 private final ExecutorCompletionService<Void> taskPool; -202 private final ThreadPoolExecutor executor; -203 private volatile boolean stopped; -204 private final List<Future<Void>> futures = new ArrayList<>(); -205 private final String name; -206 -207 FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) { -208 this.abortable = abortable; -209 // configure the executor service -210 long keepAlive = conf.getLong( -211 RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY, -212 RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT); -213 int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS); -214 this.name = name; -215 executor = new ThreadPoolExecutor(threads, threads, keepAlive, TimeUnit.MILLISECONDS, -216 new LinkedBlockingQueue<>(), new DaemonThreadFactory("rs(" -217 + name + ")-flush-proc-pool")); -218 executor.allowCoreThreadTimeOut(true); -219 taskPool = new ExecutorCompletionService<>(executor); -220 } -221 -222 boolean hasTasks() { -223 return futures.size() != 0; -224 } -225 -226 /** -227 * Submit a task to the pool. -228 * -229 * NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}. -230 */ -231 void submitTask(final Callable<Void> task) { -232 Future<Void> f = this.taskPool.submit(task); -233 futures.add(f); -234 } -235 -236 /** -237 * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}. -238 * This *must* be called after all tasks are submitted via submitTask. -239 * -240 * @return <tt>true</tt> on success, <tt>false</tt> otherwise -241 * @throws InterruptedException -242 */ -243 boolean waitForOutstandingTasks() throws ForeignException, InterruptedException { -244 LOG.debug("Waiting for local region flush to finish."); -245 -246 int sz = futures.size(); -247 try { -248 // Using the completion service to process the futures. -249 for (int i = 0; i < sz; i++) { -250 Future<Void> f = taskPool.take(); -251 f.get(); -252 if (!futures.remove(f)) { -253 LOG.warn("unexpected future" + f); -254 } -255 LOG.debug("Completed " + (i+1) + "/" + sz + " local region flush tasks."); -256 } -257 LOG.debug("Completed " + sz + " local region flush tasks."); -258 return true; -259 } catch (InterruptedException e) { -260 LOG.warn("Got InterruptedException in FlushSubprocedurePool", e); -261 if (!stopped) { -262 Thread.currentThread().interrupt(); -263 throw new ForeignException("FlushSubprocedurePool", e); -264 } -265 // we are stopped so we can just exit. -266 } catch (ExecutionException e) { -267 Throwable cause = e.getCause(); -268 if (cause instanceof ForeignException) { -269 LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e); -270 throw (ForeignException)e.getCause(); -271 } else if (cause instanceof DroppedSnapshotException) { -272 // we have to abort the region server according to contract of flush -273 abortable.abort("Received DroppedSnapshotException, aborting", cause); -274 } -275 LOG.warn("Got Exception in FlushSubprocedurePool", e); -276 throw new ForeignException(name, e.getCause()); -277 } finally { -278 cancelTasks(); -279 } -280 return false; -281 } -282 -283 /** -284 * This attempts to cancel out all pending and in progress tasks. Does not interrupt the running -285 * tasks itself. An ongoing HRegion.flush() should not be interrupted (see HBASE-13877). -286 * @throws InterruptedException -287 */ -288 void cancelTasks() throws InterruptedException { -289 Collection<Future<Void>> tasks = futures; -290 LOG.debug("cancelling " + tasks.size() + " flush region tasks " + name); -291 for (Future<Void> f: tasks) { -292 f.cancel(false); -293 } -294 -295 // evict remaining tasks and futures from taskPool. -296 futures.clear(); -297 while (taskPool.poll() != null) {} -298 stop(); -299 } -300 -301 /** -302 * Gracefully shutdown the thread pool. An ongoing HRegion.flush() should not be -303 * interrupted (see HBASE-13877) -304 */ -305 void stop() { -306 if (this.stopped) return; -307 -308 this.stopped = true; -309 this.executor.shutdown(); -310 } -311 } -312 -313 /** -314 * Initialize this region server flush procedure manager -315 * Uses a zookeeper based member controller. -316 * @param rss region server -317 * @throws KeeperException if the zookeeper cannot be reached -318 */ -319 @Override -320 public void initialize(RegionServerServices rss) throws KeeperException { -321 this.rss = rss; -322 ZooKeeperWatcher zkw = rss.getZooKeeper(); -323 this.memberRpcs = new ZKProcedureMemberRpcs(zkw, -324 MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE); -325 -326 Configuration conf = rss.getConfiguration(); -327 long keepAlive = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT); -328 int opThreads = conf.getInt(FLUSH_REQUEST_THREADS_KEY, FLUSH_REQUEST_THREADS_DEFAULT); -329 -330 // create the actual flush table procedure member -331 ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(), -332 opThreads, keepAlive); -333 this.member = new ProcedureMember(memberRpcs, pool, new FlushTableSubprocedureBuilder()); -334 } -335 -336 @Override -337 public String getProcedureSignature() { -338 return MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE; -339 } -340 -341} +049import org.apache.hadoop.hbase.regionserver.HRegion; +050import org.apache.hadoop.hbase.regionserver.HRegionServer; +051import org.apache.hadoop.hbase.regionserver.Region; +052import org.apache.hadoop.hbase.regionserver.RegionServerServices; +053import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +054import org.apache.zookeeper.KeeperException; +055 +056/** +057 * This manager class handles flushing of the regions for table on a {@link HRegionServer}. +058 */ +059@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +060public class RegionServerFlushTableProcedureManager extends RegionServerProcedureManager { +061 private static final Log LOG = LogFactory.getLog(RegionServerFlushTableProcedureManager.class); +062 +063 private static final String CONCURENT_FLUSH_TASKS_KEY = +064 "hbase.flush.procedure.region.concurrentTasks"; +065 private static final int DEFAULT_CONCURRENT_FLUSH_TASKS = 3; +066 +067 public static final String FLUSH_REQUEST_THREADS_KEY = +068 "hbase.flush.procedure.region.pool.threads"; +069 public static final int FLUSH_REQUEST_THREADS_DEFAULT = 10; +070 +071 public static final String FLUSH_TIMEOUT_MILLIS_KEY = +072 "hbase.flush.procedure.region.timeout"; +073 public static final long FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000; +074 +075 public static final String FLUSH_REQUEST_WAKE_MILLIS_KEY = +076 "hbase.flush.procedure.region.wakefrequency"; +077 private static final long FLUSH_REQUEST_WAKE_MILLIS_DEFAULT = 500; +078 +079 private RegionServerServices rss; +080 private ProcedureMemberRpcs memberRpcs; +081 private ProcedureMember member; +082 +083 /** +084 * Exposed for testing. +085 * @param conf HBase configuration. +086 * @param server region server. +087 * @param memberRpc use specified memberRpc instance +088 * @param procMember use specified ProcedureMember +089 */ +090 RegionServerFlushTableProcedureManager(Configuration conf, HRegionServer server, +091 ProcedureMemberRpcs memberRpc, ProcedureMember procMember) { +092 this.rss = server; +093 this.memberRpcs = memberRpc; +094 this.member = procMember; +095 } +096 +097 public RegionServerFlushTableProcedureManager() {} +098 +099 /** +100 * Start accepting flush table requests. +101 */ +102 @Override +103 public void start() { +104 LOG.debug("Start region server flush procedure manager " + rss.getServerName().toString()); +105 this.memberRpcs.start(rss.getServerName().toString(), member); +106 } +107 +108 /** +109 * Close <tt>this</tt> and all running tasks +110 * @param force forcefully stop all running tasks +111 * @throws IOException +112 */ +113 @Override +114 public void stop(boolean force) throws IOException { +115 String mode = force ? "abruptly" : "gracefully"; +116 LOG.info("Stopping region server flush procedure manager " + mode + "."); +117 +118 try { +119 this.member.close(); +120 } finally { +121 this.memberRpcs.close(); +122 } +123 } +124 +125 /** +126 * If in a running state, creates the specified subprocedure to flush table regions. +127 * +128 * Because this gets the local list of regions to flush and not the set the master had, +129 * there is a possibility of a race where regions may be missed. +130 * +131 * @param table +132 * @return Subprocedure to submit to the ProcedureMemeber. +133 */ +134 public Subprocedure buildSubprocedure(String table) { +135 +136 // don't run the subprocedure if the parent is stop(ping) +137 if (rss.isStopping() || rss.isStopped()) { +138 throw new IllegalStateException("Can't start flush region subprocedure on RS: " +139 + rss.getServerName() + ", because stopping/stopped!"); +140 } +141 +142 // check to see if this server is hosting any regions for the table +143 List<HRegion> involvedRegions; +144 try { +145 involvedRegions = getRegionsToFlush(table); +146 } catch (IOException e1) { +147 throw new IllegalStateException("Failed to figure out if there is region to flush.", e1); +148 } +149 +150 // We need to run the subprocedure even if we have no relevant regions. The coordinator +151 // expects participation in the procedure and without sending message the master procedure +152 // will hang and fail. +153 +154 LOG.debug("Launching subprocedure to flush regions for " + table); +155 ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(table); +156 Configuration conf = rss.getConfiguration(); +157 long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, +158 FLUSH_TIMEOUT_MILLIS_DEFAULT); +159 long wakeMillis = conf.getLong(FLUSH_REQUEST_WAKE_MILLIS_KEY, +160 FLUSH_REQUEST_WAKE_MILLIS_DEFAULT); +161 +162 FlushTableSubprocedurePool taskManager = +163 new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss); +164 return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis, +165 timeoutMillis, involvedRegions, table, taskManager); +166 } +167 +168 /** +169 * Get the list of regions to flush for the table on this server +170 * +171 * It is possible that if a region moves somewhere between the calls +172 * we'll miss the region. +173 * +174 * @param table +175 * @return the list of online regions. Empty list is returned if no regions. +176 * @throws IOException +177 */ +178 private List<HRegion> getRegionsToFlush(String table) throws IOException { +179 return (List<HRegion>) rss.getRegions(TableName.valueOf(table)); +180 } +181 +182 public class FlushTableSubprocedureBuilder implements SubprocedureFactory { +183 +184 @Override +185 public Subprocedure buildSubprocedure(String name, byte[] data) { +186 // The name of the procedure instance from the master is the table name. +187 return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name); +188 } +189 +190 } +191 +192 /** +193 * We use the FlushTableSubprocedurePool, a class specific thread pool instead of +194 * {@link org.apache.hadoop.hbase.executor.ExecutorService}. +195 * +196 * It uses a {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of +197 * completed tasks which lets us efficiently cancel pending tasks upon the earliest operation +198 * failures. +199 */ +200 static class FlushTableSubprocedurePool { +201 private final Abortable abortable; +202 private final ExecutorCompletionService<Void> taskPool; +203 private final ThreadPoolExecutor executor; +204 private volatile boolean stopped; +205 private final List<Future<Void>> futures = new ArrayList<>(); +206 private final String name; +207 +208 FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) { +209 this.abortable = abortable; +210 // configure the executor service +211 long keepAlive = conf.getLong( +212 RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY, +213 RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT); +214 int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS); +215 this.name = name; +216 executor = new ThreadPoolExecutor(threads, threads, keepAlive, TimeUnit.MILLISECONDS, +217 new LinkedBlockingQueue<>(), new DaemonThreadFactory("rs(" +218 + name + ")-flush-proc-pool")); +219 executor.allowCoreThreadTimeOut(true); +220 taskPool = new ExecutorCompletionService<>(executor); +221 } +222 +223 boolean hasTasks() { +224 return futures.size() != 0; +225 } +226 +227 /** +228 * Submit a task to the pool. +229 * +230 * NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}. +231 */ +232 void submitTask(final Callable<Void> task) { +233 Future<Void> f = this.taskPool.submit(task); +234 futures.add(f); +235 } +236 +237 /** +238 * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}. +239 * This *must* be called after all tasks are submitted via submitTask. +240 * +241 * @return <tt>true</tt> on success, <tt>false</tt> otherwise +242 * @throws InterruptedException +243 */ +244 boolean waitForOutstandingTasks() throws ForeignException, InterruptedException { +245 LOG.debug("Waiting for local region flush to finish."); +246 +247 int sz = futures.size(); +248 try { +249 // Using the completion service to process the futures. +250 for (int i = 0; i < sz; i++) { +251 Future<Void> f = taskPool.take(); +252 f.get(); +253 if (!futures.remove(f)) { +254 LOG.warn("unexpected future" + f); +255 } +256 LOG.debug("Completed " + (i+1) + "/" + sz + " local region flush tasks."); +257 } +258 LOG.debug("Completed " + sz + " local region flush tasks."); +259 return true; +260 } catch (InterruptedException e) { +261 LOG.warn("Got InterruptedException in FlushSubprocedurePool", e); +262 if (!stopped) { +263 Thread.currentThread().interrupt(); +264 throw new ForeignException("FlushSubprocedurePool", e); +265 } +266 // we are stopped so we can just exit. +267 } catch (ExecutionException e) { +268 Throwable cause = e.getCause(); +269 if (cause instanceof ForeignException) { +270 LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e); +271 throw (ForeignException)e.getCause(); +272 } else if (cause instanceof DroppedSnapshotException) { +273 // we have to abort the region server according to contract of flush +274 abortable.abort("Received DroppedSnapshotException, aborting", cause); +275 } +276 LOG.warn("Got Exception in FlushSubprocedurePool", e); +277 throw new ForeignException(name, e.getCause()); +278 } finally { +279 cancelTasks(); +280 } +281 return false; +282 } +283 +284 /** +285 * This attempts to cancel out all pending and in progress tasks. Does not interrupt the running +286 * tasks itself. An ongoing HRegion.flush() should not be interrupted (see HBASE-13877). +287 * @throws InterruptedException +288 */ +289 void cancelTasks() throws InterruptedException { +290 Collection<Future<Void>> tasks = futures; +291 LOG.debug("cancelling " + tasks.size() + " flush region tasks " + name); +292 for (Future<Void> f: tasks) { +293 f.cancel(false); +294 } +295 +296 // evict remaining tasks and futures from taskPool. +297 futures.clear(); +298 while (taskPool.poll() != null) {} +299 stop(); +300 } +301 +302 /** +303 * Gracefully shutdown the thread pool. An ongoing HRegion.flush() should not be +304 * interrupted (see HBASE-13877) +305 */ +306 void stop() { +307 if (this.stopped) return; +308 +309 this.stopped = true; +310 this.executor.shutdown(); +311 } +312 } +313 +314 /** +315 * Initialize this region server flush procedure manager +316 * Uses a zookeeper based member controller. +317 * @param rss region server +318 * @throws KeeperException if the zookeeper cannot be reached +319 */ +320 @Override +321 public void initialize(RegionServerServices rss) throws KeeperException { +322 this.rss = rss; +323 ZooKeeperWatcher zkw = rss.getZooKeeper(); +324 this.memberRpcs = new ZKProcedureMemberRpcs(zkw, +325 MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE); +326 +327 Configuration conf = rss.getConfiguration(); +328 long keepAlive = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT); +329 int opThreads = conf.getInt(FLUSH_REQUEST_THREADS_KEY, FLUSH_REQUEST_THREADS_DEFAULT); +330 +331 // create the actual flush table procedure member +332 ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(), +333 opThreads, keepAlive); +334 this.member = new ProcedureMember(memberRpcs, pool, new FlushTableSubprocedureBuilder()); +335 } +336 +337 @Override +338 public String getProcedureSignature() { +339 return MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE; +340 } +341 +342}