Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7503519A75 for ; Mon, 7 Mar 2016 16:13:40 +0000 (UTC) Received: (qmail 61539 invoked by uid 500); 7 Mar 2016 16:13:38 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 61420 invoked by uid 500); 7 Mar 2016 16:13:38 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 60292 invoked by uid 99); 7 Mar 2016 16:13:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Mar 2016 16:13:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E7DA6DFE61; Mon, 7 Mar 2016 16:13:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Mon, 07 Mar 2016 16:13:57 -0000 Message-Id: In-Reply-To: <0e0e397b78fe4c43933b2afc1cdc126f@git.apache.org> References: <0e0e397b78fe4c43933b2afc1cdc126f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [21/51] [partial] hbase-site git commit: Published site at 05161fcbfdd78f5684b9cb52c49a02be5ad14499. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/2211f347/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFutureImpl.SingleServerRequestRunnable.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFutureImpl.SingleServerRequestRunnable.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFutureImpl.SingleServerRequestRunnable.html index d813e71..1eb85f1 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFutureImpl.SingleServerRequestRunnable.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.AsyncRequestFutureImpl.SingleServerRequestRunnable.html @@ -63,1786 +63,1816 @@ 055import org.apache.hadoop.hbase.client.coprocessor.Batch; 056import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 057import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -058import org.apache.hadoop.hbase.util.Bytes; -059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -060import org.apache.htrace.Trace; -061 -062import com.google.common.annotations.VisibleForTesting; -063 -064/** -065 * This class allows a continuous flow of requests. It's written to be compatible with a -066 * synchronous caller such as HTable. -067 * <p> -068 * The caller sends a buffer of operation, by calling submit. This class extract from this list -069 * the operations it can send, i.e. the operations that are on region that are not considered -070 * as busy. The process is asynchronous, i.e. it returns immediately when if has finished to -071 * iterate on the list. If, and only if, the maximum number of current task is reached, the call -072 * to submit will block. Alternatively, the caller can call submitAll, in which case all the -073 * operations will be sent. Each call to submit returns a future-like object that can be used -074 * to track operation progress. -075 * </p> -076 * <p> -077 * The class manages internally the retries. -078 * </p> -079 * <p> -080 * The class can be constructed in regular mode, or "global error" mode. In global error mode, -081 * AP tracks errors across all calls (each "future" also has global view of all errors). That -082 * mode is necessary for backward compat with HTable behavior, where multiple submissions are -083 * made and the errors can propagate using any put/flush call, from previous calls. -084 * In "regular" mode, the errors are tracked inside the Future object that is returned. -085 * The results are always tracked inside the Future object and can be retrieved when the call -086 * has finished. Partial results can also be retrieved if some part of multi-request failed. -087 * </p> -088 * <p> -089 * This class is thread safe in regular mode; in global error code, submitting operations and -090 * retrieving errors from different threads may be not thread safe. -091 * Internally, the class is thread safe enough to manage simultaneously new submission and results -092 * arising from older operations. -093 * </p> -094 * <p> -095 * Internally, this class works with {@link Row}, this mean it could be theoretically used for -096 * gets as well. -097 * </p> -098 */ -099@InterfaceAudience.Private -100class AsyncProcess { -101 private static final Log LOG = LogFactory.getLog(AsyncProcess.class); -102 protected static final AtomicLong COUNTER = new AtomicLong(); -103 -104 public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget"; -105 -106 /** -107 * Configure the number of failures after which the client will start logging. A few failures -108 * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable -109 * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at -110 * this stage. -111 */ -112 public static final String START_LOG_ERRORS_AFTER_COUNT_KEY = -113 "hbase.client.start.log.errors.counter"; -114 public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9; -115 -116 /** -117 * The context used to wait for results from one submit call. -118 * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts), -119 * then errors and failed operations in this object will reflect global errors. -120 * 2) If submit call is made with needResults false, results will not be saved. -121 * */ -122 public static interface AsyncRequestFuture { -123 public boolean hasError(); -124 public RetriesExhaustedWithDetailsException getErrors(); -125 public List<? extends Row> getFailedOperations(); -126 public Object[] getResults() throws InterruptedIOException; -127 /** Wait until all tasks are executed, successfully or not. */ -128 public void waitUntilDone() throws InterruptedIOException; -129 } -130 -131 /** -132 * Return value from a submit that didn't contain any requests. -133 */ -134 private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() { -135 final Object[] result = new Object[0]; -136 -137 @Override -138 public boolean hasError() { -139 return false; -140 } -141 -142 @Override -143 public RetriesExhaustedWithDetailsException getErrors() { -144 return null; -145 } -146 -147 @Override -148 public List<? extends Row> getFailedOperations() { -149 return null; -150 } -151 -152 @Override -153 public Object[] getResults() { -154 return result; -155 } -156 -157 @Override -158 public void waitUntilDone() throws InterruptedIOException { -159 } -160 }; -161 -162 /** Sync point for calls to multiple replicas for the same user request (Get). -163 * Created and put in the results array (we assume replica calls require results) when -164 * the replica calls are launched. See results for details of this process. -165 * POJO, all fields are public. To modify them, the object itself is locked. */ -166 private static class ReplicaResultState { -167 public ReplicaResultState(int callCount) { -168 this.callCount = callCount; -169 } -170 -171 /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */ -172 int callCount; -173 /** Errors for which it is not decided whether we will report them to user. If one of the -174 * calls succeeds, we will discard the errors that may have happened in the other calls. */ -175 BatchErrors replicaErrors = null; -176 -177 @Override -178 public String toString() { -179 return "[call count " + callCount + "; errors " + replicaErrors + "]"; -180 } -181 } -182 +058import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +059import org.apache.hadoop.hbase.util.Bytes; +060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +061import org.apache.htrace.Trace; +062 +063import com.google.common.annotations.VisibleForTesting; +064 +065/** +066 * This class allows a continuous flow of requests. It's written to be compatible with a +067 * synchronous caller such as HTable. +068 * <p> +069 * The caller sends a buffer of operation, by calling submit. This class extract from this list +070 * the operations it can send, i.e. the operations that are on region that are not considered +071 * as busy. The process is asynchronous, i.e. it returns immediately when if has finished to +072 * iterate on the list. If, and only if, the maximum number of current task is reached, the call +073 * to submit will block. Alternatively, the caller can call submitAll, in which case all the +074 * operations will be sent. Each call to submit returns a future-like object that can be used +075 * to track operation progress. +076 * </p> +077 * <p> +078 * The class manages internally the retries. +079 * </p> +080 * <p> +081 * The class can be constructed in regular mode, or "global error" mode. In global error mode, +082 * AP tracks errors across all calls (each "future" also has global view of all errors). That +083 * mode is necessary for backward compat with HTable behavior, where multiple submissions are +084 * made and the errors can propagate using any put/flush call, from previous calls. +085 * In "regular" mode, the errors are tracked inside the Future object that is returned. +086 * The results are always tracked inside the Future object and can be retrieved when the call +087 * has finished. Partial results can also be retrieved if some part of multi-request failed. +088 * </p> +089 * <p> +090 * This class is thread safe in regular mode; in global error code, submitting operations and +091 * retrieving errors from different threads may be not thread safe. +092 * Internally, the class is thread safe enough to manage simultaneously new submission and results +093 * arising from older operations. +094 * </p> +095 * <p> +096 * Internally, this class works with {@link Row}, this mean it could be theoretically used for +097 * gets as well. +098 * </p> +099 */ +100@InterfaceAudience.Private +101class AsyncProcess { +102 private static final Log LOG = LogFactory.getLog(AsyncProcess.class); +103 protected static final AtomicLong COUNTER = new AtomicLong(); +104 +105 public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget"; +106 +107 /** +108 * Configure the number of failures after which the client will start logging. A few failures +109 * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable +110 * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at +111 * this stage. +112 */ +113 public static final String START_LOG_ERRORS_AFTER_COUNT_KEY = +114 "hbase.client.start.log.errors.counter"; +115 public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9; +116 +117 /** +118 * The context used to wait for results from one submit call. +119 * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts), +120 * then errors and failed operations in this object will reflect global errors. +121 * 2) If submit call is made with needResults false, results will not be saved. +122 * */ +123 public static interface AsyncRequestFuture { +124 public boolean hasError(); +125 public RetriesExhaustedWithDetailsException getErrors(); +126 public List<? extends Row> getFailedOperations(); +127 public Object[] getResults() throws InterruptedIOException; +128 /** Wait until all tasks are executed, successfully or not. */ +129 public void waitUntilDone() throws InterruptedIOException; +130 } +131 +132 /** +133 * Return value from a submit that didn't contain any requests. +134 */ +135 private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() { +136 final Object[] result = new Object[0]; +137 +138 @Override +139 public boolean hasError() { +140 return false; +141 } +142 +143 @Override +144 public RetriesExhaustedWithDetailsException getErrors() { +145 return null; +146 } +147 +148 @Override +149 public List<? extends Row> getFailedOperations() { +150 return null; +151 } +152 +153 @Override +154 public Object[] getResults() { +155 return result; +156 } +157 +158 @Override +159 public void waitUntilDone() throws InterruptedIOException { +160 } +161 }; +162 +163 /** Sync point for calls to multiple replicas for the same user request (Get). +164 * Created and put in the results array (we assume replica calls require results) when +165 * the replica calls are launched. See results for details of this process. +166 * POJO, all fields are public. To modify them, the object itself is locked. */ +167 private static class ReplicaResultState { +168 public ReplicaResultState(int callCount) { +169 this.callCount = callCount; +170 } +171 +172 /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */ +173 int callCount; +174 /** Errors for which it is not decided whether we will report them to user. If one of the +175 * calls succeeds, we will discard the errors that may have happened in the other calls. */ +176 BatchErrors replicaErrors = null; +177 +178 @Override +179 public String toString() { +180 return "[call count " + callCount + "; errors " + replicaErrors + "]"; +181 } +182 } 183 -184 // TODO: many of the fields should be made private -185 protected final long id; -186 -187 protected final ClusterConnection connection; -188 protected final RpcRetryingCallerFactory rpcCallerFactory; -189 protected final RpcControllerFactory rpcFactory; -190 protected final BatchErrors globalErrors; -191 protected final ExecutorService pool; -192 -193 protected final AtomicLong tasksInProgress = new AtomicLong(0); -194 protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion = -195 new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR); -196 protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = -197 new ConcurrentHashMap<ServerName, AtomicInteger>(); -198 -199 // Start configuration settings. -200 private final int startLogErrorsCnt; -201 -202 /** -203 * The number of tasks simultaneously executed on the cluster. -204 */ -205 protected final int maxTotalConcurrentTasks; -206 -207 /** -208 * The number of tasks we run in parallel on a single region. -209 * With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start -210 * a set of operations on a region before the previous one is done. As well, this limits -211 * the pressure we put on the region server. -212 */ -213 protected final int maxConcurrentTasksPerRegion; -214 -215 /** -216 * The number of task simultaneously executed on a single region server. -217 */ -218 protected final int maxConcurrentTasksPerServer; -219 protected final long pause; -220 protected int numTries; -221 protected int serverTrackerTimeout; -222 protected int timeout; -223 protected long primaryCallTimeoutMicroseconds; -224 // End configuration settings. -225 -226 protected static class BatchErrors { -227 private final List<Throwable> throwables = new ArrayList<Throwable>(); -228 private final List<Row> actions = new ArrayList<Row>(); -229 private final List<String> addresses = new ArrayList<String>(); -230 -231 public synchronized void add(Throwable ex, Row row, ServerName serverName) { -232 if (row == null){ -233 throw new IllegalArgumentException("row cannot be null. location=" + serverName); -234 } -235 -236 throwables.add(ex); -237 actions.add(row); -238 addresses.add(serverName != null ? serverName.toString() : "null"); -239 } -240 -241 public boolean hasErrors() { -242 return !throwables.isEmpty(); -243 } -244 -245 private synchronized RetriesExhaustedWithDetailsException makeException() { -246 return new RetriesExhaustedWithDetailsException( -247 new ArrayList<Throwable>(throwables), -248 new ArrayList<Row>(actions), new ArrayList<String>(addresses)); -249 } -250 -251 public synchronized void clear() { -252 throwables.clear(); -253 actions.clear(); -254 addresses.clear(); -255 } -256 -257 public synchronized void merge(BatchErrors other) { -258 throwables.addAll(other.throwables); -259 actions.addAll(other.actions); -260 addresses.addAll(other.addresses); -261 } -262 } -263 -264 public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, -265 RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, -266 RpcControllerFactory rpcFactory) { -267 if (hc == null) { -268 throw new IllegalArgumentException("HConnection cannot be null."); -269 } -270 -271 this.connection = hc; -272 this.pool = pool; -273 this.globalErrors = useGlobalErrors ? new BatchErrors() : null; -274 -275 this.id = COUNTER.incrementAndGet(); -276 -277 this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, -278 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); -279 // how many times we could try in total, one more than retry number -280 this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, -281 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; -282 this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, -283 HConstants.DEFAULT_HBASE_RPC_TIMEOUT); -284 this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); -285 -286 this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, -287 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS); -288 this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, -289 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS); -290 this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, -291 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS); -292 -293 this.startLogErrorsCnt = -294 conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); -295 -296 if (this.maxTotalConcurrentTasks <= 0) { -297 throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks); -298 } -299 if (this.maxConcurrentTasksPerServer <= 0) { -300 throw new IllegalArgumentException("maxConcurrentTasksPerServer=" + -301 maxConcurrentTasksPerServer); -302 } -303 if (this.maxConcurrentTasksPerRegion <= 0) { -304 throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" + -305 maxConcurrentTasksPerRegion); -306 } -307 -308 // Server tracker allows us to do faster, and yet useful (hopefully), retries. -309 // However, if we are too useful, we might fail very quickly due to retry count limit. -310 // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum -311 // retry time if normal retries were used. Then we will retry until this time runs out. -312 // If we keep hitting one server, the net effect will be the incremental backoff, and -313 // essentially the same number of retries as planned. If we have to do faster retries, -314 // we will do more retries in aggregate, but the user will be none the wiser. -315 this.serverTrackerTimeout = 0; -316 for (int i = 0; i < this.numTries; ++i) { -317 serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i); -318 } -319 -320 this.rpcCallerFactory = rpcCaller; -321 this.rpcFactory = rpcFactory; -322 } -323 -324 /** -325 * @return pool if non null, otherwise returns this.pool if non null, otherwise throws -326 * RuntimeException -327 */ -328 private ExecutorService getPool(ExecutorService pool) { -329 if (pool != null) { -330 return pool; -331 } -332 if (this.pool != null) { -333 return this.pool; -334 } -335 throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService"); -336 } -337 -338 /** -339 * See {@link #submit(ExecutorService, TableName, List, boolean, Batch.Callback, boolean)}. -340 * Uses default ExecutorService for this AP (must have been created with one). -341 */ -342 public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows, -343 boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults) -344 throws InterruptedIOException { -345 return submit(null, tableName, rows, atLeastOne, callback, needResults); -346 } -347 -348 /** -349 * Extract from the rows list what we can submit. The rows we can not submit are kept in the -350 * list. Does not send requests to replicas (not currently used for anything other -351 * than streaming puts anyway). -352 * -353 * @param pool ExecutorService to use. -354 * @param tableName The table for which this request is needed. -355 * @param callback Batch callback. Only called on success (94 behavior). -356 * @param needResults Whether results are needed, or can be discarded. -357 * @param rows - the submitted row. Modified by the method: we remove the rows we took. -358 * @param atLeastOne true if we should submit at least a subset. -359 */ -360 public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName, -361 List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, -362 boolean needResults) throws InterruptedIOException { -363 if (rows.isEmpty()) { -364 return NO_REQS_RESULT; -365 } -366 -367 Map<ServerName, MultiAction<Row>> actionsByServer = -368 new HashMap<ServerName, MultiAction<Row>>(); -369 List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size()); -370 -371 NonceGenerator ng = this.connection.getNonceGenerator(); -372 long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client. -373 -374 // Location errors that happen before we decide what requests to take. -375 List<Exception> locationErrors = null; -376 List<Integer> locationErrorRows = null; -377 do { -378 // Wait until there is at least one slot for a new task. -379 waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1); -380 -381 // Remember the previous decisions about regions or region servers we put in the -382 // final multi. -383 Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>(); -384 Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>(); -385 -386 int posInList = -1; -387 Iterator<? extends Row> it = rows.iterator(); -388 while (it.hasNext()) { -389 Row r = it.next(); -390 HRegionLocation loc; -391 try { -392 if (r == null) { -393 throw new IllegalArgumentException("#" + id + ", row cannot be null"); -394 } -395 // Make sure we get 0-s replica. -396 RegionLocations locs = connection.locateRegion( -397 tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID); -398 if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) { -399 throw new IOException("#" + id + ", no location found, aborting submit for" -400 + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow())); -401 } -402 loc = locs.getDefaultRegionLocation(); -403 } catch (IOException ex) { -404 locationErrors = new ArrayList<Exception>(); -405 locationErrorRows = new ArrayList<Integer>(); -406 LOG.error("Failed to get region location ", ex); -407 // This action failed before creating ars. Retain it, but do not add to submit list. -408 // We will then add it to ars in an already-failed state. -409 retainedActions.add(new Action<Row>(r, ++posInList)); -410 locationErrors.add(ex); -411 locationErrorRows.add(posInList); -412 it.remove(); -413 break; // Backward compat: we stop considering actions on location error. -414 } -415 -416 if (canTakeOperation(loc, regionIncluded, serverIncluded)) { -417 Action<Row> action = new Action<Row>(r, ++posInList); -418 setNonce(ng, r, action); -419 retainedActions.add(action); -420 // TODO: replica-get is not supported on this path -421 byte[] regionName = loc.getRegionInfo().getRegionName(); -422 addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); -423 it.remove(); -424 } -425 } -426 } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null)); -427 -428 if (retainedActions.isEmpty()) return NO_REQS_RESULT; -429 -430 return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults, -431 locationErrors, locationErrorRows, actionsByServer, pool); -432 } -433 -434 <CResult> AsyncRequestFuture submitMultiActions(TableName tableName, -435 List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback, -436 Object[] results, boolean needResults, List<Exception> locationErrors, -437 List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer, -438 ExecutorService pool) { -439 AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( -440 tableName, retainedActions, nonceGroup, pool, callback, results, needResults); -441 // Add location errors if any -442 if (locationErrors != null) { -443 for (int i = 0; i < locationErrors.size(); ++i) { -444 int originalIndex = locationErrorRows.get(i); -445 Row row = retainedActions.get(originalIndex).getAction(); -446 ars.manageError(originalIndex, row, -447 Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null); -448 } -449 } -450 ars.sendMultiAction(actionsByServer, 1, null, false); -451 return ars; -452 } -453 -454 /** -455 * Helper that is used when grouping the actions per region server. -456 * -457 * @param loc - the destination. Must not be null. -458 * @param action - the action to add to the multiaction -459 * @param actionsByServer the multiaction per server -460 * @param nonceGroup Nonce group. -461 */ -462 private static void addAction(ServerName server, byte[] regionName, Action<Row> action, -463 Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) { -464 MultiAction<Row> multiAction = actionsByServer.get(server); -465 if (multiAction == null) { -466 multiAction = new MultiAction<Row>(); -467 actionsByServer.put(server, multiAction); -468 } -469 if (action.hasNonce() && !multiAction.hasNonceGroup()) { -470 multiAction.setNonceGroup(nonceGroup); -471 } -472 -473 multiAction.add(regionName, action); -474 } -475 -476 /** -477 * Check if we should send new operations to this region or region server. -478 * We're taking into account the past decision; if we have already accepted -479 * operation on a given region, we accept all operations for this region. -480 * -481 * @param loc; the region and the server name we want to use. -482 * @return true if this region is considered as busy. -483 */ -484 protected boolean canTakeOperation(HRegionLocation loc, -485 Map<Long, Boolean> regionsIncluded, -486 Map<ServerName, Boolean> serversIncluded) { -487 long regionId = loc.getRegionInfo().getRegionId(); -488 Boolean regionPrevious = regionsIncluded.get(regionId); -489 -490 if (regionPrevious != null) { -491 // We already know what to do with this region. -492 return regionPrevious; -493 } -494 -495 Boolean serverPrevious = serversIncluded.get(loc.getServerName()); -496 if (Boolean.FALSE.equals(serverPrevious)) { -497 // It's a new region, on a region server that we have already excluded. -498 regionsIncluded.put(regionId, Boolean.FALSE); -499 return false; -500 } -501 -502 AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName()); -503 if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { -504 // Too many tasks on this region already. -505 regionsIncluded.put(regionId, Boolean.FALSE); -506 return false; -507 } -508 -509 if (serverPrevious == null) { -510 // The region is ok, but we need to decide for this region server. -511 int newServers = 0; // number of servers we're going to contact so far -512 for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) { -513 if (kv.getValue()) { -514 newServers++; -515 } -516 } -517 -518 // Do we have too many total tasks already? -519 boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks; -520 -521 if (ok) { -522 // If the total is fine, is it ok for this individual server? -523 AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName()); -524 ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer); -525 } -526 -527 if (!ok) { -528 regionsIncluded.put(regionId, Boolean.FALSE); -529 serversIncluded.put(loc.getServerName(), Boolean.FALSE); -530 return false; -531 } -532 -533 serversIncluded.put(loc.getServerName(), Boolean.TRUE); -534 } else { -535 assert serverPrevious.equals(Boolean.TRUE); -536 } -537 -538 regionsIncluded.put(regionId, Boolean.TRUE); -539 -540 return true; -541 } -542 -543 /** -544 * See {@link #submitAll(ExecutorService, TableName, List, Batch.Callback, Object[])}. -545 * Uses default ExecutorService for this AP (must have been created with one). -546 */ -547 public <CResult> AsyncRequestFuture submitAll(TableName tableName, -548 List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) { -549 return submitAll(null, tableName, rows, callback, results); -550 } -551 -552 /** -553 * Submit immediately the list of rows, whatever the server status. Kept for backward -554 * compatibility: it allows to be used with the batch interface that return an array of objects. -555 * -556 * @param pool ExecutorService to use. -557 * @param tableName name of the table for which the submission is made. -558 * @param rows the list of rows. -559 * @param callback the callback. -560 * @param results Optional array to return the results thru; backward compat. -561 */ -562 public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, -563 List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) { -564 List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size()); -565 -566 // The position will be used by the processBatch to match the object array returned. -567 int posInList = -1; -568 NonceGenerator ng = this.connection.getNonceGenerator(); -569 for (Row r : rows) { -570 posInList++; -571 if (r instanceof Put) { -572 Put put = (Put) r; -573 if (put.isEmpty()) { -574 throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item"); -575 } -576 } -577 Action<Row> action = new Action<Row>(r, posInList); -578 setNonce(ng, r, action); -579 actions.add(action); -580 } -581 AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( -582 tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null); -583 ars.groupAndSendMultiAction(actions, 1); -584 return ars; -585 } -586 -587 private static void setNonce(NonceGenerator ng, Row r, Action<Row> action) { -588 if (!(r instanceof Append) && !(r instanceof Increment)) return; -589 action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled. -590 } -591 -592 /** -593 * The context, and return value, for a single submit/submitAll call. -594 * Note on how this class (one AP submit) works. Initially, all requests are split into groups -595 * by server; request is sent to each server in parallel; the RPC calls are not async so a -596 * thread per server is used. Every time some actions fail, regions/locations might have -597 * changed, so we re-group them by server and region again and send these groups in parallel -598 * too. The result, in case of retries, is a "tree" of threads, with parent exiting after -599 * scheduling children. This is why lots of code doesn't require any synchronization. -600 */ -601 protected class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { -602 -603 /** -604 * Runnable (that can be submitted to thread pool) that waits for when it's time -605 * to issue replica calls, finds region replicas, groups the requests by replica and -606 * issues the calls (on separate threads, via sendMultiAction). -607 * This is done on a separate thread because we don't want to wait on user thread for -608 * our asynchronous call, and usually we have to wait before making replica calls. -609 */ -610 private final class ReplicaCallIssuingRunnable implements Runnable { -611 private final long startTime; -612 private final List<Action<Row>> initialActions; -613 -614 public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTime) { -615 this.initialActions = initialActions; -616 this.startTime = startTime; -617 } -618 -619 @Override -620 public void run() { -621 boolean done = false; -622 if (primaryCallTimeoutMicroseconds > 0) { -623 try { -624 done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds); -625 } catch (InterruptedException ex) { -626 LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage()); -627 return; -628 } -629 } -630 if (done) return; // Done within primary timeout -631 Map<ServerName, MultiAction<Row>> actionsByServer = -632 new HashMap<ServerName, MultiAction<Row>>(); -633 List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>(); -634 if (replicaGetIndices == null) { -635 for (int i = 0; i < results.length; ++i) { -636 addReplicaActions(i, actionsByServer, unknownLocActions); -637 } -638 } else { -639 for (int replicaGetIndice : replicaGetIndices) { -640 addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions); -641 } -642 } -643 if (!actionsByServer.isEmpty()) { -644 sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty()); -645 } -646 if (!unknownLocActions.isEmpty()) { -647 actionsByServer = new HashMap<ServerName, MultiAction<Row>>(); -648 for (Action<Row> action : unknownLocActions) { -649 addReplicaActionsAgain(action, actionsByServer); -650 } -651 // Some actions may have completely failed, they are handled inside addAgain. -652 if (!actionsByServer.isEmpty()) { -653 sendMultiAction(actionsByServer, 1, null, true); -654 } -655 } -656 } -657 -658 /** -659 * Add replica actions to action map by server. -660 * @param index Index of the original action. -661 * @param actionsByServer The map by server to add it to. -662 */ -663 private void addReplicaActions(int index, Map<ServerName, MultiAction<Row>> actionsByServer, -664 List<Action<Row>> unknownReplicaActions) { -665 if (results[index] != null) return; // opportunistic. Never goes from non-null to null. -666 Action<Row> action = initialActions.get(index); -667 RegionLocations loc = findAllLocationsOrFail(action, true); -668 if (loc == null) return; -669 HRegionLocation[] locs = loc.getRegionLocations(); -670 if (locs.length == 1) { -671 LOG.warn("No replicas found for " + action.getAction()); -672 return; -673 } -674 synchronized (replicaResultLock) { -675 // Don't run replica calls if the original has finished. We could do it e.g. if -676 // original has already failed before first replica call (unlikely given retries), -677 // but that would require additional synchronization w.r.t. returning to caller. -678 if (results[index] != null) return; -679 // We set the number of calls here. After that any path must call setResult/setError. -680 // True even for replicas that are not found - if we refuse to send we MUST set error. -681 results[index] = new ReplicaResultState(locs.length); -682 } -683 for (int i = 1; i < locs.length; ++i) { -684 Action<Row> replicaAction = new Action<Row>(action, i); -685 if (locs[i] != null) { -686 addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(), -687 replicaAction, actionsByServer, nonceGroup); -688 } else { -689 unknownReplicaActions.add(replicaAction); -690 } -691 } -692 } -693 -694 private void addReplicaActionsAgain( -695 Action<Row> action, Map<ServerName, MultiAction<Row>> actionsByServer) { -696 if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) { -697 throw new AssertionError("Cannot have default replica here"); +184 +185 // TODO: many of the fields should be made private +186 protected final long id; +187 +188 protected final ClusterConnection connection; +189 protected final RpcRetryingCallerFactory rpcCallerFactory; +190 protected final RpcControllerFactory rpcFactory; +191 protected final BatchErrors globalErrors; +192 protected final ExecutorService pool; +193 +194 protected final AtomicLong tasksInProgress = new AtomicLong(0); +195 protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion = +196 new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR); +197 protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = +198 new ConcurrentHashMap<ServerName, AtomicInteger>(); +199 +200 // Start configuration settings. +201 private final int startLogErrorsCnt; +202 +203 /** +204 * The number of tasks simultaneously executed on the cluster. +205 */ +206 protected final int maxTotalConcurrentTasks; +207 +208 /** +209 * The number of tasks we run in parallel on a single region. +210 * With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start +211 * a set of operations on a region before the previous one is done. As well, this limits +212 * the pressure we put on the region server. +213 */ +214 protected final int maxConcurrentTasksPerRegion; +215 +216 /** +217 * The number of task simultaneously executed on a single region server. +218 */ +219 protected final int maxConcurrentTasksPerServer; +220 protected final long pause; +221 protected int numTries; +222 protected int serverTrackerTimeout; +223 protected int timeout; +224 protected long primaryCallTimeoutMicroseconds; +225 // End configuration settings. +226 +227 protected static class BatchErrors { +228 private final List<Throwable> throwables = new ArrayList<Throwable>(); +229 private final List<Row> actions = new ArrayList<Row>(); +230 private final List<String> addresses = new ArrayList<String>(); +231 +232 public synchronized void add(Throwable ex, Row row, ServerName serverName) { +233 if (row == null){ +234 throw new IllegalArgumentException("row cannot be null. location=" + serverName); +235 } +236 +237 throwables.add(ex); +238 actions.add(row); +239 addresses.add(serverName != null ? serverName.toString() : "null"); +240 } +241 +242 public boolean hasErrors() { +243 return !throwables.isEmpty(); +244 } +245 +246 private synchronized RetriesExhaustedWithDetailsException makeException() { +247 return new RetriesExhaustedWithDetailsException( +248 new ArrayList<Throwable>(throwables), +249 new ArrayList<Row>(actions), new ArrayList<String>(addresses)); +250 } +251 +252 public synchronized void clear() { +253 throwables.clear(); +254 actions.clear(); +255 addresses.clear(); +256 } +257 +258 public synchronized void merge(BatchErrors other) { +259 throwables.addAll(other.throwables); +260 actions.addAll(other.actions); +261 addresses.addAll(other.addresses); +262 } +263 } +264 +265 public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, +266 RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, +267 RpcControllerFactory rpcFa