Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A267F1899B for ; Fri, 11 Dec 2015 16:43:22 +0000 (UTC) Received: (qmail 51226 invoked by uid 500); 11 Dec 2015 16:39:25 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 51099 invoked by uid 500); 11 Dec 2015 16:39:24 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 46000 invoked by uid 99); 11 Dec 2015 16:31:26 -0000 Received: from Unknown (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Dec 2015 16:31:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 43BECE0F7C; Fri, 11 Dec 2015 16:30:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Fri, 11 Dec 2015 16:30:59 -0000 Message-Id: In-Reply-To: <30edfa6e54394a378028ad3e3c4e06ca@git.apache.org> References: <30edfa6e54394a378028ad3e3c4e06ca@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/51] [partial] hbase-site git commit: Published site at 22b95aebcd7fc742412ab514520008fda5e327de. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/900a9477/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFutureImpl.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFutureImpl.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFutureImpl.html index c8d868f..d0040f6 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFutureImpl.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFutureImpl.html @@ -49,1752 +49,1792 @@ 041 042import org.apache.commons.logging.Log; 043import org.apache.commons.logging.LogFactory; -044import org.apache.hadoop.hbase.classification.InterfaceAudience; -045import org.apache.hadoop.conf.Configuration; -046import org.apache.hadoop.hbase.DoNotRetryIOException; -047import org.apache.hadoop.hbase.HConstants; -048import org.apache.hadoop.hbase.HRegionInfo; -049import org.apache.hadoop.hbase.HRegionLocation; -050import org.apache.hadoop.hbase.RegionLocations; -051import org.apache.hadoop.hbase.ServerName; -052import org.apache.hadoop.hbase.TableName; -053import org.apache.hadoop.hbase.client.backoff.ServerStatistics; -054import org.apache.hadoop.hbase.client.coprocessor.Batch; -055import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -056import org.apache.hadoop.hbase.util.Bytes; -057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -058import org.apache.htrace.Trace; -059 -060import com.google.common.annotations.VisibleForTesting; -061 -062/** -063 * This class allows a continuous flow of requests. It's written to be compatible with a -064 * synchronous caller such as HTable. -065 * <p> -066 * The caller sends a buffer of operation, by calling submit. This class extract from this list -067 * the operations it can send, i.e. the operations that are on region that are not considered -068 * as busy. The process is asynchronous, i.e. it returns immediately when if has finished to -069 * iterate on the list. If, and only if, the maximum number of current task is reached, the call -070 * to submit will block. Alternatively, the caller can call submitAll, in which case all the -071 * operations will be sent. Each call to submit returns a future-like object that can be used -072 * to track operation progress. -073 * </p> -074 * <p> -075 * The class manages internally the retries. -076 * </p> -077 * <p> -078 * The class can be constructed in regular mode, or "global error" mode. In global error mode, -079 * AP tracks errors across all calls (each "future" also has global view of all errors). That -080 * mode is necessary for backward compat with HTable behavior, where multiple submissions are -081 * made and the errors can propagate using any put/flush call, from previous calls. -082 * In "regular" mode, the errors are tracked inside the Future object that is returned. -083 * The results are always tracked inside the Future object and can be retrieved when the call -084 * has finished. Partial results can also be retrieved if some part of multi-request failed. -085 * </p> -086 * <p> -087 * This class is thread safe in regular mode; in global error code, submitting operations and -088 * retrieving errors from different threads may be not thread safe. -089 * Internally, the class is thread safe enough to manage simultaneously new submission and results -090 * arising from older operations. -091 * </p> -092 * <p> -093 * Internally, this class works with {@link Row}, this mean it could be theoretically used for -094 * gets as well. -095 * </p> -096 */ -097@InterfaceAudience.Private -098class AsyncProcess { -099 private static final Log LOG = LogFactory.getLog(AsyncProcess.class); -100 protected static final AtomicLong COUNTER = new AtomicLong(); -101 -102 public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget"; -103 -104 /** -105 * Configure the number of failures after which the client will start logging. A few failures -106 * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable -107 * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at -108 * this stage. -109 */ -110 public static final String START_LOG_ERRORS_AFTER_COUNT_KEY = -111 "hbase.client.start.log.errors.counter"; -112 public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9; -113 -114 /** -115 * The context used to wait for results from one submit call. -116 * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts), -117 * then errors and failed operations in this object will reflect global errors. -118 * 2) If submit call is made with needResults false, results will not be saved. -119 * */ -120 public static interface AsyncRequestFuture { -121 public boolean hasError(); -122 public RetriesExhaustedWithDetailsException getErrors(); -123 public List<? extends Row> getFailedOperations(); -124 public Object[] getResults() throws InterruptedIOException; -125 /** Wait until all tasks are executed, successfully or not. */ -126 public void waitUntilDone() throws InterruptedIOException; -127 } -128 -129 /** Return value from a submit that didn't contain any requests. */ -130 private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() { -131 final Object[] result = new Object[0]; -132 @Override -133 public boolean hasError() { return false; } -134 @Override -135 public RetriesExhaustedWithDetailsException getErrors() { return null; } +044import org.apache.hadoop.hbase.RetryImmediatelyException; +045import org.apache.hadoop.hbase.classification.InterfaceAudience; +046import org.apache.hadoop.conf.Configuration; +047import org.apache.hadoop.hbase.DoNotRetryIOException; +048import org.apache.hadoop.hbase.HConstants; +049import org.apache.hadoop.hbase.HRegionInfo; +050import org.apache.hadoop.hbase.HRegionLocation; +051import org.apache.hadoop.hbase.RegionLocations; +052import org.apache.hadoop.hbase.ServerName; +053import org.apache.hadoop.hbase.TableName; +054import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +055import org.apache.hadoop.hbase.client.coprocessor.Batch; +056import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +057import org.apache.hadoop.hbase.util.Bytes; +058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +059import org.apache.htrace.Trace; +060 +061import com.google.common.annotations.VisibleForTesting; +062 +063/** +064 * This class allows a continuous flow of requests. It's written to be compatible with a +065 * synchronous caller such as HTable. +066 * <p> +067 * The caller sends a buffer of operation, by calling submit. This class extract from this list +068 * the operations it can send, i.e. the operations that are on region that are not considered +069 * as busy. The process is asynchronous, i.e. it returns immediately when if has finished to +070 * iterate on the list. If, and only if, the maximum number of current task is reached, the call +071 * to submit will block. Alternatively, the caller can call submitAll, in which case all the +072 * operations will be sent. Each call to submit returns a future-like object that can be used +073 * to track operation progress. +074 * </p> +075 * <p> +076 * The class manages internally the retries. +077 * </p> +078 * <p> +079 * The class can be constructed in regular mode, or "global error" mode. In global error mode, +080 * AP tracks errors across all calls (each "future" also has global view of all errors). That +081 * mode is necessary for backward compat with HTable behavior, where multiple submissions are +082 * made and the errors can propagate using any put/flush call, from previous calls. +083 * In "regular" mode, the errors are tracked inside the Future object that is returned. +084 * The results are always tracked inside the Future object and can be retrieved when the call +085 * has finished. Partial results can also be retrieved if some part of multi-request failed. +086 * </p> +087 * <p> +088 * This class is thread safe in regular mode; in global error code, submitting operations and +089 * retrieving errors from different threads may be not thread safe. +090 * Internally, the class is thread safe enough to manage simultaneously new submission and results +091 * arising from older operations. +092 * </p> +093 * <p> +094 * Internally, this class works with {@link Row}, this mean it could be theoretically used for +095 * gets as well. +096 * </p> +097 */ +098@InterfaceAudience.Private +099class AsyncProcess { +100 private static final Log LOG = LogFactory.getLog(AsyncProcess.class); +101 protected static final AtomicLong COUNTER = new AtomicLong(); +102 +103 public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget"; +104 +105 /** +106 * Configure the number of failures after which the client will start logging. A few failures +107 * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable +108 * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at +109 * this stage. +110 */ +111 public static final String START_LOG_ERRORS_AFTER_COUNT_KEY = +112 "hbase.client.start.log.errors.counter"; +113 public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9; +114 +115 /** +116 * The context used to wait for results from one submit call. +117 * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts), +118 * then errors and failed operations in this object will reflect global errors. +119 * 2) If submit call is made with needResults false, results will not be saved. +120 * */ +121 public static interface AsyncRequestFuture { +122 public boolean hasError(); +123 public RetriesExhaustedWithDetailsException getErrors(); +124 public List<? extends Row> getFailedOperations(); +125 public Object[] getResults() throws InterruptedIOException; +126 /** Wait until all tasks are executed, successfully or not. */ +127 public void waitUntilDone() throws InterruptedIOException; +128 } +129 +130 /** +131 * Return value from a submit that didn't contain any requests. +132 */ +133 private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() { +134 final Object[] result = new Object[0]; +135 136 @Override -137 public List<? extends Row> getFailedOperations() { return null; } -138 @Override -139 public Object[] getResults() { return result; } -140 @Override -141 public void waitUntilDone() throws InterruptedIOException {} -142 }; -143 -144 /** Sync point for calls to multiple replicas for the same user request (Get). -145 * Created and put in the results array (we assume replica calls require results) when -146 * the replica calls are launched. See results for details of this process. -147 * POJO, all fields are public. To modify them, the object itself is locked. */ -148 private static class ReplicaResultState { -149 public ReplicaResultState(int callCount) { -150 this.callCount = callCount; -151 } -152 -153 /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */ -154 int callCount; -155 /** Errors for which it is not decided whether we will report them to user. If one of the -156 * calls succeeds, we will discard the errors that may have happened in the other calls. */ -157 BatchErrors replicaErrors = null; -158 -159 @Override -160 public String toString() { -161 return "[call count " + callCount + "; errors " + replicaErrors + "]"; -162 } -163 } -164 -165 -166 // TODO: many of the fields should be made private -167 protected final long id; -168 -169 protected final ClusterConnection connection; -170 protected final RpcRetryingCallerFactory rpcCallerFactory; -171 protected final RpcControllerFactory rpcFactory; -172 protected final BatchErrors globalErrors; -173 protected final ExecutorService pool; -174 -175 protected final AtomicLong tasksInProgress = new AtomicLong(0); -176 protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion = -177 new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR); -178 protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = -179 new ConcurrentHashMap<ServerName, AtomicInteger>(); -180 -181 // Start configuration settings. -182 private final int startLogErrorsCnt; -183 -184 /** -185 * The number of tasks simultaneously executed on the cluster. -186 */ -187 protected final int maxTotalConcurrentTasks; -188 -189 /** -190 * The number of tasks we run in parallel on a single region. -191 * With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start -192 * a set of operations on a region before the previous one is done. As well, this limits -193 * the pressure we put on the region server. -194 */ -195 protected final int maxConcurrentTasksPerRegion; -196 -197 /** -198 * The number of task simultaneously executed on a single region server. -199 */ -200 protected final int maxConcurrentTasksPerServer; -201 protected final long pause; -202 protected int numTries; -203 protected int serverTrackerTimeout; -204 protected int timeout; -205 protected long primaryCallTimeoutMicroseconds; -206 // End configuration settings. -207 -208 protected static class BatchErrors { -209 private final List<Throwable> throwables = new ArrayList<Throwable>(); -210 private final List<Row> actions = new ArrayList<Row>(); -211 private final List<String> addresses = new ArrayList<String>(); -212 -213 public synchronized void add(Throwable ex, Row row, ServerName serverName) { -214 if (row == null){ -215 throw new IllegalArgumentException("row cannot be null. location=" + serverName); -216 } -217 -218 throwables.add(ex); -219 actions.add(row); -220 addresses.add(serverName != null ? serverName.toString() : "null"); -221 } -222 -223 public boolean hasErrors() { -224 return !throwables.isEmpty(); -225 } -226 -227 private synchronized RetriesExhaustedWithDetailsException makeException() { -228 return new RetriesExhaustedWithDetailsException( -229 new ArrayList<Throwable>(throwables), -230 new ArrayList<Row>(actions), new ArrayList<String>(addresses)); -231 } -232 -233 public synchronized void clear() { -234 throwables.clear(); -235 actions.clear(); -236 addresses.clear(); -237 } -238 -239 public synchronized void merge(BatchErrors other) { -240 throwables.addAll(other.throwables); -241 actions.addAll(other.actions); -242 addresses.addAll(other.addresses); -243 } -244 } -245 -246 public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, -247 RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, -248 RpcControllerFactory rpcFactory) { -249 if (hc == null) { -250 throw new IllegalArgumentException("HConnection cannot be null."); -251 } -252 -253 this.connection = hc; -254 this.pool = pool; -255 this.globalErrors = useGlobalErrors ? new BatchErrors() : null; -256 -257 this.id = COUNTER.incrementAndGet(); -258 -259 this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, -260 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); -261 // how many times we could try in total, one more than retry number -262 this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, -263 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; -264 this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, -265 HConstants.DEFAULT_HBASE_RPC_TIMEOUT); -266 this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); -267 -268 this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, -269 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS); -270 this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, -271 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS); -272 this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, -273 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS); -274 -275 this.startLogErrorsCnt = -276 conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); -277 -278 if (this.maxTotalConcurrentTasks <= 0) { -279 throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks); -280 } -281 if (this.maxConcurrentTasksPerServer <= 0) { -282 throw new IllegalArgumentException("maxConcurrentTasksPerServer=" + -283 maxConcurrentTasksPerServer); -284 } -285 if (this.maxConcurrentTasksPerRegion <= 0) { -286 throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" + -287 maxConcurrentTasksPerRegion); -288 } -289 -290 // Server tracker allows us to do faster, and yet useful (hopefully), retries. -291 // However, if we are too useful, we might fail very quickly due to retry count limit. -292 // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum -293 // retry time if normal retries were used. Then we will retry until this time runs out. -294 // If we keep hitting one server, the net effect will be the incremental backoff, and -295 // essentially the same number of retries as planned. If we have to do faster retries, -296 // we will do more retries in aggregate, but the user will be none the wiser. -297 this.serverTrackerTimeout = 0; -298 for (int i = 0; i < this.numTries; ++i) { -299 serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i); -300 } -301 -302 this.rpcCallerFactory = rpcCaller; -303 this.rpcFactory = rpcFactory; -304 } -305 -306 /** -307 * @return pool if non null, otherwise returns this.pool if non null, otherwise throws -308 * RuntimeException -309 */ -310 private ExecutorService getPool(ExecutorService pool) { -311 if (pool != null) return pool; -312 if (this.pool != null) return this.pool; -313 throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService"); -314 } -315 -316 /** -317 * See {@link #submit(ExecutorService, TableName, List, boolean, Batch.Callback, boolean)}. -318 * Uses default ExecutorService for this AP (must have been created with one). -319 */ -320 public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows, -321 boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults) -322 throws InterruptedIOException { -323 return submit(null, tableName, rows, atLeastOne, callback, needResults); -324 } -325 -326 /** -327 * Extract from the rows list what we can submit. The rows we can not submit are kept in the -328 * list. Does not send requests to replicas (not currently used for anything other -329 * than streaming puts anyway). -330 * -331 * @param pool ExecutorService to use. -332 * @param tableName The table for which this request is needed. -333 * @param callback Batch callback. Only called on success (94 behavior). -334 * @param needResults Whether results are needed, or can be discarded. -335 * @param rows - the submitted row. Modified by the method: we remove the rows we took. -336 * @param atLeastOne true if we should submit at least a subset. -337 */ -338 public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName, -339 List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, -340 boolean needResults) throws InterruptedIOException { -341 if (rows.isEmpty()) { -342 return NO_REQS_RESULT; -343 } -344 -345 Map<ServerName, MultiAction<Row>> actionsByServer = -346 new HashMap<ServerName, MultiAction<Row>>(); -347 List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size()); -348 -349 NonceGenerator ng = this.connection.getNonceGenerator(); -350 long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client. -351 -352 // Location errors that happen before we decide what requests to take. -353 List<Exception> locationErrors = null; -354 List<Integer> locationErrorRows = null; -355 do { -356 // Wait until there is at least one slot for a new task. -357 waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1); -358 -359 // Remember the previous decisions about regions or region servers we put in the -360 // final multi. -361 Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>(); -362 Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>(); -363 -364 int posInList = -1; -365 Iterator<? extends Row> it = rows.iterator(); -366 while (it.hasNext()) { -367 Row r = it.next(); -368 HRegionLocation loc; -369 try { -370 if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null"); -371 // Make sure we get 0-s replica. -372 RegionLocations locs = connection.locateRegion( -373 tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID); -374 if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) { -375 throw new IOException("#" + id + ", no location found, aborting submit for" -376 + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow())); -377 } -378 loc = locs.getDefaultRegionLocation(); -379 } catch (IOException ex) { -380 locationErrors = new ArrayList<Exception>(); -381 locationErrorRows = new ArrayList<Integer>(); -382 LOG.error("Failed to get region location ", ex); -383 // This action failed before creating ars. Retain it, but do not add to submit list. -384 // We will then add it to ars in an already-failed state. -385 retainedActions.add(new Action<Row>(r, ++posInList)); -386 locationErrors.add(ex); -387 locationErrorRows.add(posInList); -388 it.remove(); -389 break; // Backward compat: we stop considering actions on location error. -390 } -391 -392 if (canTakeOperation(loc, regionIncluded, serverIncluded)) { -393 Action<Row> action = new Action<Row>(r, ++posInList); -394 setNonce(ng, r, action); -395 retainedActions.add(action); -396 // TODO: replica-get is not supported on this path -397 byte[] regionName = loc.getRegionInfo().getRegionName(); -398 addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); -399 it.remove(); -400 } -401 } -402 } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null)); -403 -404 if (retainedActions.isEmpty()) return NO_REQS_RESULT; -405 -406 return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults, -407 locationErrors, locationErrorRows, actionsByServer, pool); -408 } -409 -410 <CResult> AsyncRequestFuture submitMultiActions(TableName tableName, -411 List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback, -412 Object[] results, boolean needResults, List<Exception> locationErrors, -413 List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer, -414 ExecutorService pool) { -415 AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( -416 tableName, retainedActions, nonceGroup, pool, callback, results, needResults); -417 // Add location errors if any -418 if (locationErrors != null) { -419 for (int i = 0; i < locationErrors.size(); ++i) { -420 int originalIndex = locationErrorRows.get(i); -421 Row row = retainedActions.get(originalIndex).getAction(); -422 ars.manageError(originalIndex, row, -423 Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null); +137 public boolean hasError() { +138 return false; +139 } +140 +141 @Override +142 public RetriesExhaustedWithDetailsException getErrors() { +143 return null; +144 } +145 +146 @Override +147 public List<? extends Row> getFailedOperations() { +148 return null; +149 } +150 +151 @Override +152 public Object[] getResults() { +153 return result; +154 } +155 +156 @Override +157 public void waitUntilDone() throws InterruptedIOException { +158 } +159 }; +160 +161 /** Sync point for calls to multiple replicas for the same user request (Get). +162 * Created and put in the results array (we assume replica calls require results) when +163 * the replica calls are launched. See results for details of this process. +164 * POJO, all fields are public. To modify them, the object itself is locked. */ +165 private static class ReplicaResultState { +166 public ReplicaResultState(int callCount) { +167 this.callCount = callCount; +168 } +169 +170 /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */ +171 int callCount; +172 /** Errors for which it is not decided whether we will report them to user. If one of the +173 * calls succeeds, we will discard the errors that may have happened in the other calls. */ +174 BatchErrors replicaErrors = null; +175 +176 @Override +177 public String toString() { +178 return "[call count " + callCount + "; errors " + replicaErrors + "]"; +179 } +180 } +181 +182 +183 // TODO: many of the fields should be made private +184 protected final long id; +185 +186 protected final ClusterConnection connection; +187 protected final RpcRetryingCallerFactory rpcCallerFactory; +188 protected final RpcControllerFactory rpcFactory; +189 protected final BatchErrors globalErrors; +190 protected final ExecutorService pool; +191 +192 protected final AtomicLong tasksInProgress = new AtomicLong(0); +193 protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion = +194 new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR); +195 protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = +196 new ConcurrentHashMap<ServerName, AtomicInteger>(); +197 +198 // Start configuration settings. +199 private final int startLogErrorsCnt; +200 +201 /** +202 * The number of tasks simultaneously executed on the cluster. +203 */ +204 protected final int maxTotalConcurrentTasks; +205 +206 /** +207 * The number of tasks we run in parallel on a single region. +208 * With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start +209 * a set of operations on a region before the previous one is done. As well, this limits +210 * the pressure we put on the region server. +211 */ +212 protected final int maxConcurrentTasksPerRegion; +213 +214 /** +215 * The number of task simultaneously executed on a single region server. +216 */ +217 protected final int maxConcurrentTasksPerServer; +218 protected final long pause; +219 protected int numTries; +220 protected int serverTrackerTimeout; +221 protected int timeout; +222 protected long primaryCallTimeoutMicroseconds; +223 // End configuration settings. +224 +225 protected static class BatchErrors { +226 private final List<Throwable> throwables = new ArrayList<Throwable>(); +227 private final List<Row> actions = new ArrayList<Row>(); +228 private final List<String> addresses = new ArrayList<String>(); +229 +230 public synchronized void add(Throwable ex, Row row, ServerName serverName) { +231 if (row == null){ +232 throw new IllegalArgumentException("row cannot be null. location=" + serverName); +233 } +234 +235 throwables.add(ex); +236 actions.add(row); +237 addresses.add(serverName != null ? serverName.toString() : "null"); +238 } +239 +240 public boolean hasErrors() { +241 return !throwables.isEmpty(); +242 } +243 +244 private synchronized RetriesExhaustedWithDetailsException makeException() { +245 return new RetriesExhaustedWithDetailsException( +246 new ArrayList<Throwable>(throwables), +247 new ArrayList<Row>(actions), new ArrayList<String>(addresses)); +248 } +249 +250 public synchronized void clear() { +251 throwables.clear(); +252 actions.clear(); +253 addresses.clear(); +254 } +255 +256 public synchronized void merge(BatchErrors other) { +257 throwables.addAll(other.throwables); +258 actions.addAll(other.actions); +259 addresses.addAll(other.addresses); +260 } +261 } +262 +263 public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, +264 RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, +265 RpcControllerFactory rpcFactory) { +266 if (hc == null) { +267 throw new IllegalArgumentException("HConnection cannot be null."); +268 } +269 +270 this.connection = hc; +271 this.pool = pool; +272 this.globalErrors = useGlobalErrors ? new BatchErrors() : null; +273 +274 this.id = COUNTER.incrementAndGet(); +275 +276 this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, +277 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); +278 // how many times we could try in total, one more than retry number +279 this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, +280 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; +281 this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, +282 HConstants.DEFAULT_HBASE_RPC_TIMEOUT); +283 this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); +284 +285 this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, +286 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS); +287 this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, +288 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS); +289 this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, +290 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS); +291 +292 this.startLogErrorsCnt = +293 conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); +294 +295 if (this.maxTotalConcurrentTasks <= 0) { +296 throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks); +297 } +298 if (this.maxConcurrentTasksPerServer <= 0) { +299 throw new IllegalArgumentException("maxConcurrentTasksPerServer=" + +300 maxConcurrentTasksPerServer); +301 } +302 if (this.maxConcurrentTasksPerRegion <= 0) { +303 throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" + +304 maxConcurrentTasksPerRegion); +305 } +306 +307 // Server tracker allows us to do faster, and yet useful (hopefully), retries. +308 // However, if we are too useful, we might fail very quickly due to retry count limit. +309 // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum +310 // retry time if normal retries were used. Then we will retry until this time runs out. +311 // If we keep hitting one server, the net effect will be the incremental backoff, and +312 // essentially the same number of retries as planned. If we have to do faster retries, +313 // we will do more retries in aggregate, but the user will be none the wiser. +314 this.serverTrackerTimeout = 0; +315 for (int i = 0; i < this.numTries; ++i) { +316 serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i); +317 } +318 +319 this.rpcCallerFactory = rpcCaller; +320 this.rpcFactory = rpcFactory; +321 } +322 +323 /** +324 * @return pool if non null, otherwise returns this.pool if non null, otherwise throws +325 * RuntimeException +326 */ +327 private ExecutorService getPool(ExecutorService pool) { +328 if (pool != null) { +329 return pool; +330 } +331 if (this.pool != null) { +332 return this.pool; +333 } +334 throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService"); +335 } +336 +337 /** +338 * See {@link #submit(ExecutorService, TableName, List, boolean, Batch.Callback, boolean)}. +339 * Uses default ExecutorService for this AP (must have been created with one). +340 */ +341 public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows, +342 boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults) +343 throws InterruptedIOException { +344 return submit(null, tableName, rows, atLeastOne, callback, needResults); +345 } +346 +347 /** +348 * Extract from the rows list what we can submit. The rows we can not submit are kept in the +349 * list. Does not send requests to replicas (not currently used for anything other +350 * than streaming puts anyway). +351 * +352 * @param pool ExecutorService to use. +353 * @param tableName The table for which this request is needed. +354 * @param callback Batch callback. Only called on success (94 behavior). +355 * @param needResults Whether results are needed, or can be discarded. +356 * @param rows - the submitted row. Modified by the method: we remove the rows we took. +357 * @param atLeastOne true if we should submit at least a subset. +358 */ +359 public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName, +360 List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, +361 boolean needResults) throws InterruptedIOException { +362 if (rows.isEmpty()) { +363 return NO_REQS_RESULT; +364 } +365 +366 Map<ServerName, MultiAction<Row>> actionsByServer = +367 new HashMap<ServerName, MultiAction<Row>>(); +368 List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size()); +369 +370 NonceGenerator ng = this.connection.getNonceGenerator(); +371 long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client. +372 +373 // Location errors that happen before we decide what requests to take. +374 List<Exception> locationErrors = null; +375 List<Integer> locationErrorRows = null; +376 do { +377 // Wait until there is at least one slot for a new task. +378 waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1); +379 +380 // Remember the previous decisions about regions or region servers we put in the +381 // final multi. +382 Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>(); +383 Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>(); +384 +385 int posInList = -1; +386 Iterator<? extends Row> it = rows.iterator(); +387 while (it.hasNext()) { +388 Row r = it.next(); +389 HRegionLocation loc; +390 try { +391 if (r == null) { +392 throw new IllegalArgumentException("#" + id + ", row cannot be null"); +393 } +394 // Make sure we get 0-s replica. +395 RegionLocations locs = connection.locateRegion( +396 tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID); +397 if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) { +398 throw new IOException("#" + id + ", no location found, aborting submit for" +399 + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow())); +400 } +401 loc = locs.getDefaultRegionLocation(); +402 } catch (IOException ex) { +403 locationErrors = new ArrayList<Exception>(); +404 locationErrorRows = new ArrayList<Integer>(); +405 LOG.error("Failed to get region location ", ex); +406 // This action failed before creating ars. Retain it, but do not add to submit list. +407 // We will then add it to ars in an already-failed state. +408 retainedActions.add(new Action<Row>(r, ++posInList)); +409 locationErrors.add(ex); +410 locationErrorRows.add(posInList); +411 it.remove(); +412 break; // Backward compat: we stop considering actions on location error. +413 } +414 +415 if (canTakeOperation(loc, regionIncluded, serverIncluded)) { +416 Action<Row> action = new Action<Row>(r, ++posInList); +417 setNonce(ng, r, action); +418 retainedActions.add(action); +419 // TODO: replica-get is not supported on this path +420 byte[] regionName = loc.getRegionInfo().getRegionName(); +421 addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); +422 it.remove(); +423 } 424 } -425 } -426 ars.sendMultiAction(actionsByServer, 1, null, false); -427 return ars; -428 } -429 -430 /** -431 * Helper that is used when grouping the actions per region server. -432 * -433 * @param loc - the destination. Must not be null. -434 * @param action - the action to add to the multiaction -435 * @param actionsByServer the multiaction per server -436 * @param nonceGroup Nonce group. -437 */ -438 private static void addAction(ServerName server, byte[] regionName, Action<Row> action, -439 Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) { -440 MultiAction<Row> multiAction = actionsByServer.get(server); -441 if (multiAction == null) { -442 multiAction = new MultiAction<Row>(); -443 actionsByServer.put(server, multiAction); -444 } -445 if (action.hasNonce() && !multiAction.hasNonceGroup()) { -446 multiAction.setNonceGroup(nonceGroup); -447 } -448 -449 multiAction.add(regionName, action); -450 } -451 -452 /** -453 * Check if we should send new operations to this region or region server. -454 * We're taking into account the past decision; if we have already accepted -455 * operation on a given region, we accept all operations for this region. -456 * -457 * @param loc; the region and the server name we want to use. -458 * @return true if this region is considered as busy. -459 */ -460 protected boolean canTakeOperation(HRegionLocation loc, -461 Map<Long, Boolean> regionsIncluded, -462 Map<ServerName, Boolean> serversIncluded) { -463 long regionId = loc.getRegionInfo().getRegionId(); -464 Boolean regionPrevious = regionsIncluded.get(regionId); -465 -466 if (regionPrevious != null) { -467 // We already know what to do with this region. -468 return regionPrevious; -469 } -470 -471 Boolean serverPrevious = serversIncluded.get(loc.getServerName()); -472 if (Boolean.FALSE.equals(serverPrevious)) { -473 // It's a new region, on a region server that we have already excluded. -474 regionsIncluded.put(regionId, Boolean.FALSE); -475 return false; -476 } -477 -478 AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName()); -479 if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { -480 // Too many tasks on this region already. -481 regionsIncluded.put(regionId, Boolean.FALSE); -482 return false; -483 } -484 -485 if (serverPrevious == null) { -486 // The region is ok, but we need to decide for this region server. -487 int newServers = 0; // number of servers we're going to contact so far -488 for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) { -489 if (kv.getValue()) { -490 newServers++; -491 } -492 } +425 } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null)); +426 +427 if (retainedActions.isEmpty()) return NO_REQS_RESULT; +428 +429 return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults, +430 locationErrors, locationErrorRows, actionsByServer, pool); +431 } +432 +433 <CResult> AsyncRequestFuture submitMultiActions(TableName tableName, +434 List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback, +435 Object[] results, boolean needResults, List<Exception> locationErrors, +436