From commits-return-79879-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Thu Oct 25 16:54:48 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 7755E1807BD for ; Thu, 25 Oct 2018 16:54:43 +0200 (CEST) Received: (qmail 2649 invoked by uid 500); 25 Oct 2018 14:54:40 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 1241 invoked by uid 99); 25 Oct 2018 14:54:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Oct 2018 14:54:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D9A9CE11EF; Thu, 25 Oct 2018 14:54:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Thu, 25 Oct 2018 14:54:56 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [19/39] hbase-site git commit: Published site at 66469733ec9bffb236c143b858e5748182ad71b3. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/5f45c993/devapidocs/src-html/org/apache/hadoop/hbase/client/example/RefreshHFilesClient.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/example/RefreshHFilesClient.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/example/RefreshHFilesClient.html index 4e0832e..0c6debe 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/example/RefreshHFilesClient.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/example/RefreshHFilesClient.html @@ -30,78 +30,106 @@ 022import java.io.Closeable; 023import java.io.IOException; 024import org.apache.hadoop.conf.Configuration; -025import org.apache.hadoop.hbase.HConstants; -026import org.apache.hadoop.hbase.TableName; -027import org.apache.hadoop.hbase.client.Connection; -028import org.apache.hadoop.hbase.client.ConnectionFactory; -029import org.apache.hadoop.hbase.client.Table; -030import org.apache.hadoop.hbase.client.coprocessor.Batch; -031import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback; -032import org.apache.hadoop.hbase.ipc.ServerRpcController; -033import org.apache.hadoop.hbase.protobuf.generated.RefreshHFilesProtos; -034import org.apache.yetus.audience.InterfaceAudience; -035import org.slf4j.Logger; -036import org.slf4j.LoggerFactory; -037 -038/** -039 * This client class is for invoking the refresh HFile function deployed on the -040 * Region Server side via the RefreshHFilesService. -041 */ -042@InterfaceAudience.Private -043public class RefreshHFilesClient implements Closeable { -044 private static final Logger LOG = LoggerFactory.getLogger(RefreshHFilesClient.class); -045 private final Connection connection; -046 -047 /** -048 * Constructor with Conf object -049 * -050 * @param cfg -051 */ -052 public RefreshHFilesClient(Configuration cfg) { -053 try { -054 this.connection = ConnectionFactory.createConnection(cfg); -055 } catch (IOException e) { -056 throw new RuntimeException(e); -057 } -058 } -059 -060 @Override -061 public void close() throws IOException { -062 if (this.connection != null && !this.connection.isClosed()) { -063 this.connection.close(); -064 } -065 } -066 -067 public void refreshHFiles(final TableName tableName) throws Throwable { -068 try (Table table = connection.getTable(tableName)) { -069 refreshHFiles(table); -070 } -071 } -072 -073 public void refreshHFiles(final Table table) throws Throwable { -074 final RefreshHFilesProtos.RefreshHFilesRequest request = RefreshHFilesProtos.RefreshHFilesRequest -075 .getDefaultInstance(); -076 table.coprocessorService(RefreshHFilesProtos.RefreshHFilesService.class, HConstants.EMPTY_START_ROW, -077 HConstants.EMPTY_END_ROW, -078 new Batch.Call<RefreshHFilesProtos.RefreshHFilesService, -079 RefreshHFilesProtos.RefreshHFilesResponse>() { -080 @Override -081 public RefreshHFilesProtos.RefreshHFilesResponse call( -082 RefreshHFilesProtos.RefreshHFilesService refreshHFilesService) -083 throws IOException { -084 ServerRpcController controller = new ServerRpcController(); -085 BlockingRpcCallback<RefreshHFilesProtos.RefreshHFilesResponse> rpcCallback = -086 new BlockingRpcCallback<>(); -087 refreshHFilesService.refreshHFiles(controller, request, rpcCallback); -088 if (controller.failedOnException()) { -089 throw controller.getFailedOn(); -090 } -091 return rpcCallback.get(); -092 } -093 }); -094 LOG.debug("Done refreshing HFiles"); -095 } -096} +025import org.apache.hadoop.conf.Configured; +026import org.apache.hadoop.hbase.HBaseConfiguration; +027import org.apache.hadoop.hbase.HConstants; +028import org.apache.hadoop.hbase.TableName; +029import org.apache.hadoop.hbase.client.Connection; +030import org.apache.hadoop.hbase.client.ConnectionFactory; +031import org.apache.hadoop.hbase.client.Table; +032import org.apache.hadoop.hbase.client.coprocessor.Batch; +033import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback; +034import org.apache.hadoop.hbase.ipc.ServerRpcController; +035import org.apache.hadoop.hbase.protobuf.generated.RefreshHFilesProtos; +036import org.apache.hadoop.util.Tool; +037import org.apache.hadoop.util.ToolRunner; +038import org.apache.yetus.audience.InterfaceAudience; +039import org.slf4j.Logger; +040import org.slf4j.LoggerFactory; +041 +042/** +043 * This client class is for invoking the refresh HFile function deployed on the +044 * Region Server side via the RefreshHFilesService. +045 */ +046@InterfaceAudience.Private +047public class RefreshHFilesClient extends Configured implements Tool, Closeable { +048 private static final Logger LOG = LoggerFactory.getLogger(RefreshHFilesClient.class); +049 private final Connection connection; +050 +051 /** +052 * Constructor with Conf object +053 * +054 * @param cfg +055 */ +056 public RefreshHFilesClient(Configuration cfg) { +057 try { +058 this.connection = ConnectionFactory.createConnection(cfg); +059 } catch (IOException e) { +060 throw new RuntimeException(e); +061 } +062 } +063 +064 @Override +065 public void close() throws IOException { +066 if (this.connection != null && !this.connection.isClosed()) { +067 this.connection.close(); +068 } +069 } +070 +071 public void refreshHFiles(final TableName tableName) throws Throwable { +072 try (Table table = connection.getTable(tableName)) { +073 refreshHFiles(table); +074 } +075 } +076 +077 public void refreshHFiles(final Table table) throws Throwable { +078 final RefreshHFilesProtos.RefreshHFilesRequest request = RefreshHFilesProtos.RefreshHFilesRequest +079 .getDefaultInstance(); +080 table.coprocessorService(RefreshHFilesProtos.RefreshHFilesService.class, HConstants.EMPTY_START_ROW, +081 HConstants.EMPTY_END_ROW, +082 new Batch.Call<RefreshHFilesProtos.RefreshHFilesService, +083 RefreshHFilesProtos.RefreshHFilesResponse>() { +084 @Override +085 public RefreshHFilesProtos.RefreshHFilesResponse call( +086 RefreshHFilesProtos.RefreshHFilesService refreshHFilesService) +087 throws IOException { +088 ServerRpcController controller = new ServerRpcController(); +089 BlockingRpcCallback<RefreshHFilesProtos.RefreshHFilesResponse> rpcCallback = +090 new BlockingRpcCallback<>(); +091 refreshHFilesService.refreshHFiles(controller, request, rpcCallback); +092 if (controller.failedOnException()) { +093 throw controller.getFailedOn(); +094 } +095 return rpcCallback.get(); +096 } +097 }); +098 LOG.debug("Done refreshing HFiles"); +099 } +100 +101 @Override +102 public int run(String[] args) throws Exception { +103 if (args.length != 1) { +104 String message = "When there are multiple HBase clusters are sharing a common root dir, " +105 + "especially for read replica cluster (see detail in HBASE-18477), please consider to " +106 + "use this tool manually sync the flushed HFiles from the source cluster."; +107 message += "\nUsage: " + this.getClass().getName() + " tableName"; +108 System.out.println(message); +109 return -1; +110 } +111 final TableName tableName = TableName.valueOf(args[0]); +112 try { +113 refreshHFiles(tableName); +114 } catch (Throwable t) { +115 LOG.error("Refresh HFiles from table " + tableName.getNameAsString() + " failed: ", t); +116 return -1; +117 } +118 return 0; +119 } +120 +121 public static void main(String[] args) throws Exception { +122 ToolRunner.run(new RefreshHFilesClient(HBaseConfiguration.create()), args); +123 } +124} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/5f45c993/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.RegionInTransitionChore.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.RegionInTransitionChore.html b/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.RegionInTransitionChore.html index 7e9572c..801dc55 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.RegionInTransitionChore.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.RegionInTransitionChore.html @@ -139,7 +139,7 @@ 131 132 public static final String ASSIGN_MAX_ATTEMPTS = 133 "hbase.assignment.maximum.attempts"; -134 private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = 10; +134 private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE; 135 136 /** Region in Transition metrics threshold time */ 137 public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD = http://git-wip-us.apache.org/repos/asf/hbase-site/blob/5f45c993/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.RegionInTransitionStat.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.RegionInTransitionStat.html b/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.RegionInTransitionStat.html index 7e9572c..801dc55 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.RegionInTransitionStat.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.RegionInTransitionStat.html @@ -139,7 +139,7 @@ 131 132 public static final String ASSIGN_MAX_ATTEMPTS = 133 "hbase.assignment.maximum.attempts"; -134 private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = 10; +134 private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE; 135 136 /** Region in Transition metrics threshold time */ 137 public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD = http://git-wip-us.apache.org/repos/asf/hbase-site/blob/5f45c993/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.html b/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.html index 7e9572c..801dc55 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/master/assignment/AssignmentManager.html @@ -139,7 +139,7 @@ 131 132 public static final String ASSIGN_MAX_ATTEMPTS = 133 "hbase.assignment.maximum.attempts"; -134 private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = 10; +134 private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE; 135 136 /** Region in Transition metrics threshold time */ 137 public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD = http://git-wip-us.apache.org/repos/asf/hbase-site/blob/5f45c993/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.html b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.html index 387eb34..9ee37c0 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.html @@ -94,209 +94,219 @@ 086 } 087 088 @Override -089 public void addFront(Iterator<Procedure> procedureIterator) { -090 schedLock(); -091 try { -092 int count = 0; -093 while (procedureIterator.hasNext()) { -094 Procedure procedure = procedureIterator.next(); -095 if (LOG.isTraceEnabled()) { -096 LOG.trace("Wake " + procedure); -097 } -098 push(procedure, /* addFront= */ true, /* notify= */false); -099 count++; -100 } -101 wakePollIfNeeded(count); -102 } finally { -103 schedUnlock(); -104 } -105 } -106 -107 @Override -108 public void addBack(final Procedure procedure) { -109 push(procedure, false, true); +089 public void addFront(final Procedure procedure, boolean notify) { +090 push(procedure, true, notify); +091 } +092 +093 @Override +094 public void addFront(Iterator<Procedure> procedureIterator) { +095 schedLock(); +096 try { +097 int count = 0; +098 while (procedureIterator.hasNext()) { +099 Procedure procedure = procedureIterator.next(); +100 if (LOG.isTraceEnabled()) { +101 LOG.trace("Wake " + procedure); +102 } +103 push(procedure, /* addFront= */ true, /* notify= */false); +104 count++; +105 } +106 wakePollIfNeeded(count); +107 } finally { +108 schedUnlock(); +109 } 110 } 111 -112 protected void push(final Procedure procedure, final boolean addFront, final boolean notify) { -113 schedLock(); -114 try { -115 enqueue(procedure, addFront); -116 if (notify) { -117 schedWaitCond.signal(); -118 } -119 } finally { -120 schedUnlock(); -121 } -122 } -123 -124 // ========================================================================== -125 // Poll related -126 // ========================================================================== -127 /** -128 * Fetch one Procedure from the queue -129 * NOTE: this method is called with the sched lock held. -130 * @return the Procedure to execute, or null if nothing is available. -131 */ -132 protected abstract Procedure dequeue(); +112 @Override +113 public void addBack(final Procedure procedure) { +114 push(procedure, false, true); +115 } +116 +117 @Override +118 public void addBack(final Procedure procedure, boolean notify) { +119 push(procedure, false, notify); +120 } +121 +122 protected void push(final Procedure procedure, final boolean addFront, final boolean notify) { +123 schedLock(); +124 try { +125 enqueue(procedure, addFront); +126 if (notify) { +127 schedWaitCond.signal(); +128 } +129 } finally { +130 schedUnlock(); +131 } +132 } 133 -134 @Override -135 public Procedure poll() { -136 return poll(-1); -137 } -138 -139 @Override -140 public Procedure poll(long timeout, TimeUnit unit) { -141 return poll(unit.toNanos(timeout)); -142 } +134 // ========================================================================== +135 // Poll related +136 // ========================================================================== +137 /** +138 * Fetch one Procedure from the queue +139 * NOTE: this method is called with the sched lock held. +140 * @return the Procedure to execute, or null if nothing is available. +141 */ +142 protected abstract Procedure dequeue(); 143 -144 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") -145 public Procedure poll(final long nanos) { -146 schedLock(); -147 try { -148 if (!running) { -149 LOG.debug("the scheduler is not running"); -150 return null; -151 } -152 -153 if (!queueHasRunnables()) { -154 // WA_AWAIT_NOT_IN_LOOP: we are not in a loop because we want the caller -155 // to take decisions after a wake/interruption. -156 if (nanos < 0) { -157 schedWaitCond.await(); -158 } else { -159 schedWaitCond.awaitNanos(nanos); -160 } -161 if (!queueHasRunnables()) { -162 nullPollCalls++; -163 return null; -164 } -165 } -166 final Procedure pollResult = dequeue(); -167 -168 pollCalls++; -169 nullPollCalls += (pollResult == null) ? 1 : 0; -170 return pollResult; -171 } catch (InterruptedException e) { -172 Thread.currentThread().interrupt(); -173 nullPollCalls++; -174 return null; -175 } finally { -176 schedUnlock(); -177 } -178 } -179 -180 // ========================================================================== -181 // Utils -182 // ========================================================================== -183 /** -184 * Returns the number of elements in this queue. -185 * NOTE: this method is called with the sched lock held. -186 * @return the number of elements in this queue. -187 */ -188 protected abstract int queueSize(); +144 @Override +145 public Procedure poll() { +146 return poll(-1); +147 } +148 +149 @Override +150 public Procedure poll(long timeout, TimeUnit unit) { +151 return poll(unit.toNanos(timeout)); +152 } +153 +154 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") +155 public Procedure poll(final long nanos) { +156 schedLock(); +157 try { +158 if (!running) { +159 LOG.debug("the scheduler is not running"); +160 return null; +161 } +162 +163 if (!queueHasRunnables()) { +164 // WA_AWAIT_NOT_IN_LOOP: we are not in a loop because we want the caller +165 // to take decisions after a wake/interruption. +166 if (nanos < 0) { +167 schedWaitCond.await(); +168 } else { +169 schedWaitCond.awaitNanos(nanos); +170 } +171 if (!queueHasRunnables()) { +172 nullPollCalls++; +173 return null; +174 } +175 } +176 final Procedure pollResult = dequeue(); +177 +178 pollCalls++; +179 nullPollCalls += (pollResult == null) ? 1 : 0; +180 return pollResult; +181 } catch (InterruptedException e) { +182 Thread.currentThread().interrupt(); +183 nullPollCalls++; +184 return null; +185 } finally { +186 schedUnlock(); +187 } +188 } 189 -190 /** -191 * Returns true if there are procedures available to process. -192 * NOTE: this method is called with the sched lock held. -193 * @return true if there are procedures available to process, otherwise false. -194 */ -195 protected abstract boolean queueHasRunnables(); -196 -197 @Override -198 public int size() { -199 schedLock(); -200 try { -201 return queueSize(); -202 } finally { -203 schedUnlock(); -204 } -205 } +190 // ========================================================================== +191 // Utils +192 // ========================================================================== +193 /** +194 * Returns the number of elements in this queue. +195 * NOTE: this method is called with the sched lock held. +196 * @return the number of elements in this queue. +197 */ +198 protected abstract int queueSize(); +199 +200 /** +201 * Returns true if there are procedures available to process. +202 * NOTE: this method is called with the sched lock held. +203 * @return true if there are procedures available to process, otherwise false. +204 */ +205 protected abstract boolean queueHasRunnables(); 206 207 @Override -208 public boolean hasRunnables() { +208 public int size() { 209 schedLock(); 210 try { -211 return queueHasRunnables(); +211 return queueSize(); 212 } finally { 213 schedUnlock(); 214 } 215 } 216 -217 // ============================================================================ -218 // TODO: Metrics -219 // ============================================================================ -220 public long getPollCalls() { -221 return pollCalls; -222 } -223 -224 public long getNullPollCalls() { -225 return nullPollCalls; -226 } -227 -228 // ========================================================================== -229 // Procedure Events -230 // ========================================================================== -231 -232 /** -233 * Wake up all of the given events. -234 * Note that we first take scheduler lock and then wakeInternal() synchronizes on the event. -235 * Access should remain package-private. Use ProcedureEvent class to wake/suspend events. -236 * @param events the list of events to wake -237 */ -238 void wakeEvents(ProcedureEvent[] events) { -239 schedLock(); -240 try { -241 for (ProcedureEvent event : events) { -242 if (event == null) { -243 continue; -244 } -245 event.wakeInternal(this); -246 } -247 } finally { -248 schedUnlock(); -249 } -250 } -251 -252 /** -253 * Wakes up given waiting procedures by pushing them back into scheduler queues. -254 * @return size of given {@code waitQueue}. -255 */ -256 protected int wakeWaitingProcedures(LockAndQueue lockAndQueue) { -257 return lockAndQueue.wakeWaitingProcedures(this); -258 } -259 -260 protected void waitProcedure(LockAndQueue lockAndQueue, final Procedure proc) { -261 lockAndQueue.addLast(proc); -262 } -263 -264 protected void wakeProcedure(final Procedure procedure) { -265 LOG.trace("Wake {}", procedure); -266 push(procedure, /* addFront= */ true, /* notify= */false); -267 } -268 +217 @Override +218 public boolean hasRunnables() { +219 schedLock(); +220 try { +221 return queueHasRunnables(); +222 } finally { +223 schedUnlock(); +224 } +225 } +226 +227 // ============================================================================ +228 // TODO: Metrics +229 // ============================================================================ +230 public long getPollCalls() { +231 return pollCalls; +232 } +233 +234 public long getNullPollCalls() { +235 return nullPollCalls; +236 } +237 +238 // ========================================================================== +239 // Procedure Events +240 // ========================================================================== +241 +242 /** +243 * Wake up all of the given events. +244 * Note that we first take scheduler lock and then wakeInternal() synchronizes on the event. +245 * Access should remain package-private. Use ProcedureEvent class to wake/suspend events. +246 * @param events the list of events to wake +247 */ +248 void wakeEvents(ProcedureEvent[] events) { +249 schedLock(); +250 try { +251 for (ProcedureEvent event : events) { +252 if (event == null) { +253 continue; +254 } +255 event.wakeInternal(this); +256 } +257 } finally { +258 schedUnlock(); +259 } +260 } +261 +262 /** +263 * Wakes up given waiting procedures by pushing them back into scheduler queues. +264 * @return size of given {@code waitQueue}. +265 */ +266 protected int wakeWaitingProcedures(LockAndQueue lockAndQueue) { +267 return lockAndQueue.wakeWaitingProcedures(this); +268 } 269 -270 // ========================================================================== -271 // Internal helpers -272 // ========================================================================== -273 protected void schedLock() { -274 schedulerLock.lock(); -275 } -276 -277 protected void schedUnlock() { -278 schedulerLock.unlock(); -279 } -280 -281 protected void wakePollIfNeeded(final int waitingCount) { -282 if (waitingCount <= 0) { -283 return; -284 } -285 if (waitingCount == 1) { -286 schedWaitCond.signal(); -287 } else { -288 schedWaitCond.signalAll(); -289 } -290 } -291} +270 protected void waitProcedure(LockAndQueue lockAndQueue, final Procedure proc) { +271 lockAndQueue.addLast(proc); +272 } +273 +274 protected void wakeProcedure(final Procedure procedure) { +275 LOG.trace("Wake {}", procedure); +276 push(procedure, /* addFront= */ true, /* notify= */false); +277 } +278 +279 +280 // ========================================================================== +281 // Internal helpers +282 // ========================================================================== +283 protected void schedLock() { +284 schedulerLock.lock(); +285 } +286 +287 protected void schedUnlock() { +288 schedulerLock.unlock(); +289 } +290 +291 protected void wakePollIfNeeded(final int waitingCount) { +292 if (waitingCount <= 0) { +293 return; +294 } +295 if (waitingCount == 1) { +296 schedWaitCond.signal(); +297 } else { +298 schedWaitCond.signalAll(); +299 } +300 } +301} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/5f45c993/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/LockAndQueue.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/LockAndQueue.html b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/LockAndQueue.html index 6df87bb..1c77e4e 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/LockAndQueue.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/LockAndQueue.html @@ -81,123 +81,124 @@ 073 074 @Override 075 public boolean hasParentLock(Procedure<?> proc) { -076 // TODO: need to check all the ancestors -077 return proc.hasParent() && -078 (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId())); -079 } -080 -081 @Override -082 public boolean hasLockAccess(Procedure<?> proc) { -083 return isLockOwner(proc.getProcId()) || hasParentLock(proc); -084 } -085 -086 @Override -087 public Procedure<?> getExclusiveLockOwnerProcedure() { -088 return exclusiveLockOwnerProcedure; -089 } -090 -091 @Override -092 public long getExclusiveLockProcIdOwner() { -093 if (exclusiveLockOwnerProcedure == null) { -094 return Long.MIN_VALUE; -095 } else { -096 return exclusiveLockOwnerProcedure.getProcId(); -097 } -098 } -099 -100 @Override -101 public int getSharedLockCount() { -102 return sharedLock; -103 } -104 -105 // ====================================================================== -106 // try/release Shared/Exclusive lock -107 // ====================================================================== -108 -109 /** -110 * @return whether we have succesfully acquired the shared lock. -111 */ -112 public boolean trySharedLock(Procedure<?> proc) { -113 if (hasExclusiveLock() && !hasLockAccess(proc)) { -114 return false; -115 } -116 // If no one holds the xlock, then we are free to hold the sharedLock -117 // If the parent proc or we have already held the xlock, then we return true here as -118 // xlock is more powerful then shared lock. -119 sharedLock++; -120 return true; -121 } -122 -123 /** -124 * @return whether we should wake the procedures waiting on the lock here. -125 */ -126 public boolean releaseSharedLock() { -127 // hasExclusiveLock could be true, it usually means we acquire shared lock while we or our -128 // parent have held the xlock. And since there is still an exclusive lock, we do not need to -129 // wake any procedures. -130 return --sharedLock == 0 && !hasExclusiveLock(); -131 } -132 -133 public boolean tryExclusiveLock(Procedure<?> proc) { -134 if (isLocked()) { -135 return hasLockAccess(proc); -136 } -137 exclusiveLockOwnerProcedure = proc; -138 return true; -139 } -140 -141 /** -142 * @return whether we should wake the procedures waiting on the lock here. -143 */ -144 public boolean releaseExclusiveLock(Procedure<?> proc) { -145 if (!isLockOwner(proc.getProcId())) { -146 // We are not the lock owner, it is probably inherited from the parent procedures. -147 return false; -148 } -149 exclusiveLockOwnerProcedure = null; -150 // This maybe a bit strange so let me explain. We allow acquiring shared lock while the parent -151 // proc or we have already held the xlock, and also allow releasing the locks in any order, so -152 // it could happen that the xlock is released but there are still some procs holding the shared -153 // lock. -154 // In HBase, this could happen when a proc which holdLock is false and schedules sub procs which -155 // acquire the shared lock on the same lock. This is because we will schedule the sub proces -156 // before releasing the lock, so the sub procs could call acquire lock before we releasing the -157 // xlock. -158 return sharedLock == 0; -159 } -160 -161 public boolean isWaitingQueueEmpty() { -162 return queue.isEmpty(); -163 } -164 -165 public Procedure<?> removeFirst() { -166 return queue.removeFirst(); -167 } -168 -169 public void addLast(Procedure<?> proc) { -170 queue.addLast(proc); -171 } -172 -173 public int wakeWaitingProcedures(ProcedureScheduler scheduler) { -174 int count = queue.size(); -175 // wakeProcedure adds to the front of queue, so we start from last in the waitQueue' queue, so -176 // that the procedure which was added first goes in the front for the scheduler queue. -177 scheduler.addFront(queue.descendingIterator()); -178 queue.clear(); -179 return count; -180 } -181 -182 @SuppressWarnings("rawtypes") -183 public Stream<Procedure> filterWaitingQueue(Predicate<Procedure> predicate) { -184 return queue.stream().filter(predicate); -185 } -186 -187 @Override -188 public String toString() { -189 return "exclusiveLockOwner=" + (hasExclusiveLock() ? getExclusiveLockProcIdOwner() : "NONE") + -190 ", sharedLockCount=" + getSharedLockCount() + ", waitingProcCount=" + queue.size(); -191 } -192} +076 // TODO: need to check all the ancestors. need to passed in the procedures +077 // to find the ancestors. +078 return proc.hasParent() && +079 (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId())); +080 } +081 +082 @Override +083 public boolean hasLockAccess(Procedure<?> proc) { +084 return isLockOwner(proc.getProcId()) || hasParentLock(proc); +085 } +086 +087 @Override +088 public Procedure<?> getExclusiveLockOwnerProcedure() { +089 return exclusiveLockOwnerProcedure; +090 } +091 +092 @Override +093 public long getExclusiveLockProcIdOwner() { +094 if (exclusiveLockOwnerProcedure == null) { +095 return Long.MIN_VALUE; +096 } else { +097 return exclusiveLockOwnerProcedure.getProcId(); +098 } +099 } +100 +101 @Override +102 public int getSharedLockCount() { +103 return sharedLock; +104 } +105 +106 // ====================================================================== +107 // try/release Shared/Exclusive lock +108 // ====================================================================== +109 +110 /** +111 * @return whether we have succesfully acquired the shared lock. +112 */ +113 public boolean trySharedLock(Procedure<?> proc) { +114 if (hasExclusiveLock() && !hasLockAccess(proc)) { +115 return false; +116 } +117 // If no one holds the xlock, then we are free to hold the sharedLock +118 // If the parent proc or we have already held the xlock, then we return true here as +119 // xlock is more powerful then shared lock. +120 sharedLock++; +121 return true; +122 } +123 +124 /** +125 * @return whether we should wake the procedures waiting on the lock here. +126 */ +127 public boolean releaseSharedLock() { +128 // hasExclusiveLock could be true, it usually means we acquire shared lock while we or our +129 // parent have held the xlock. And since there is still an exclusive lock, we do not need to +130 // wake any procedures. +131 return --sharedLock == 0 && !hasExclusiveLock(); +132 } +133 +134 public boolean tryExclusiveLock(Procedure<?> proc) { +135 if (isLocked()) { +136 return hasLockAccess(proc); +137 } +138 exclusiveLockOwnerProcedure = proc; +139 return true; +140 } +141 +142 /** +143 * @return whether we should wake the procedures waiting on the lock here. +144 */ +145 public boolean releaseExclusiveLock(Procedure<?> proc) { +146 if (!isLockOwner(proc.getProcId())) { +147 // We are not the lock owner, it is probably inherited from the parent procedures. +148 return false; +149 } +150 exclusiveLockOwnerProcedure = null; +151 // This maybe a bit strange so let me explain. We allow acquiring shared lock while the parent +152 // proc or we have already held the xlock, and also allow releasing the locks in any order, so +153 // it could happen that the xlock is released but there are still some procs holding the shared +154 // lock. +155 // In HBase, this could happen when a proc which holdLock is false and schedules sub procs which +156 // acquire the shared lock on the same lock. This is because we will schedule the sub proces +157 // before releasing the lock, so the sub procs could call acquire lock before we releasing the +158 // xlock. +159 return sharedLock == 0; +160 } +161 +162 public boolean isWaitingQueueEmpty() { +163 return queue.isEmpty(); +164 } +165 +166 public Procedure<?> removeFirst() { +167 return queue.removeFirst(); +168 } +169 +170 public void addLast(Procedure<?> proc) { +171 queue.addLast(proc); +172 } +173 +174 public int wakeWaitingProcedures(ProcedureScheduler scheduler) { +175 int count = queue.size(); +176 // wakeProcedure adds to the front of queue, so we start from last in the waitQueue' queue, so +177 // that the procedure which was added first goes in the front for the scheduler queue. +178 scheduler.addFront(queue.descendingIterator()); +179 queue.clear(); +180 return count; +181 } +182 +183 @SuppressWarnings("rawtypes") +184 public Stream<Procedure> filterWaitingQueue(Predicate<Procedure> predicate) { +185 return queue.stream().filter(predicate); +186 } +187 +188 @Override +189 public String toString() { +190 return "exclusiveLockOwner=" + (hasExclusiveLock() ? getExclusiveLockProcIdOwner() : "NONE") + +191 ", sharedLockCount=" + getSharedLockCount() + ", waitingProcCount=" + queue.size(); +192 } +193}