Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 60E0317A4F for ; Thu, 28 Jan 2016 17:24:59 +0000 (UTC) Received: (qmail 97900 invoked by uid 500); 28 Jan 2016 17:24:51 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 97821 invoked by uid 500); 28 Jan 2016 17:24:51 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 96402 invoked by uid 99); 28 Jan 2016 17:24:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Jan 2016 17:24:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 611AFE0BB2; Thu, 28 Jan 2016 17:24:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Thu, 28 Jan 2016 17:25:20 -0000 Message-Id: <911c779acaa64a50910de64ec29ce506@git.apache.org> In-Reply-To: <6af0db3621d842a4ae1ed2dedaf142a4@git.apache.org> References: <6af0db3621d842a4ae1ed2dedaf142a4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [32/51] [partial] hbase-site git commit: Published site at 138b754671d51d3f494adc250ab0cb9e085c858a. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/39cf5e9b/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.ReplicaResultState.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.ReplicaResultState.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.ReplicaResultState.html index d0040f6..3180076 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.ReplicaResultState.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncProcess.ReplicaResultState.html @@ -61,1780 +61,1781 @@ 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.client.backoff.ServerStatistics; 055import org.apache.hadoop.hbase.client.coprocessor.Batch; -056import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -057import org.apache.hadoop.hbase.util.Bytes; -058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -059import org.apache.htrace.Trace; -060 -061import com.google.common.annotations.VisibleForTesting; -062 -063/** -064 * This class allows a continuous flow of requests. It's written to be compatible with a -065 * synchronous caller such as HTable. -066 * <p> -067 * The caller sends a buffer of operation, by calling submit. This class extract from this list -068 * the operations it can send, i.e. the operations that are on region that are not considered -069 * as busy. The process is asynchronous, i.e. it returns immediately when if has finished to -070 * iterate on the list. If, and only if, the maximum number of current task is reached, the call -071 * to submit will block. Alternatively, the caller can call submitAll, in which case all the -072 * operations will be sent. Each call to submit returns a future-like object that can be used -073 * to track operation progress. -074 * </p> -075 * <p> -076 * The class manages internally the retries. -077 * </p> -078 * <p> -079 * The class can be constructed in regular mode, or "global error" mode. In global error mode, -080 * AP tracks errors across all calls (each "future" also has global view of all errors). That -081 * mode is necessary for backward compat with HTable behavior, where multiple submissions are -082 * made and the errors can propagate using any put/flush call, from previous calls. -083 * In "regular" mode, the errors are tracked inside the Future object that is returned. -084 * The results are always tracked inside the Future object and can be retrieved when the call -085 * has finished. Partial results can also be retrieved if some part of multi-request failed. -086 * </p> -087 * <p> -088 * This class is thread safe in regular mode; in global error code, submitting operations and -089 * retrieving errors from different threads may be not thread safe. -090 * Internally, the class is thread safe enough to manage simultaneously new submission and results -091 * arising from older operations. -092 * </p> -093 * <p> -094 * Internally, this class works with {@link Row}, this mean it could be theoretically used for -095 * gets as well. -096 * </p> -097 */ -098@InterfaceAudience.Private -099class AsyncProcess { -100 private static final Log LOG = LogFactory.getLog(AsyncProcess.class); -101 protected static final AtomicLong COUNTER = new AtomicLong(); -102 -103 public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget"; -104 -105 /** -106 * Configure the number of failures after which the client will start logging. A few failures -107 * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable -108 * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at -109 * this stage. -110 */ -111 public static final String START_LOG_ERRORS_AFTER_COUNT_KEY = -112 "hbase.client.start.log.errors.counter"; -113 public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9; -114 -115 /** -116 * The context used to wait for results from one submit call. -117 * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts), -118 * then errors and failed operations in this object will reflect global errors. -119 * 2) If submit call is made with needResults false, results will not be saved. -120 * */ -121 public static interface AsyncRequestFuture { -122 public boolean hasError(); -123 public RetriesExhaustedWithDetailsException getErrors(); -124 public List<? extends Row> getFailedOperations(); -125 public Object[] getResults() throws InterruptedIOException; -126 /** Wait until all tasks are executed, successfully or not. */ -127 public void waitUntilDone() throws InterruptedIOException; -128 } -129 -130 /** -131 * Return value from a submit that didn't contain any requests. -132 */ -133 private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() { -134 final Object[] result = new Object[0]; -135 -136 @Override -137 public boolean hasError() { -138 return false; -139 } -140 -141 @Override -142 public RetriesExhaustedWithDetailsException getErrors() { -143 return null; -144 } -145 -146 @Override -147 public List<? extends Row> getFailedOperations() { -148 return null; -149 } -150 -151 @Override -152 public Object[] getResults() { -153 return result; -154 } -155 -156 @Override -157 public void waitUntilDone() throws InterruptedIOException { -158 } -159 }; -160 -161 /** Sync point for calls to multiple replicas for the same user request (Get). -162 * Created and put in the results array (we assume replica calls require results) when -163 * the replica calls are launched. See results for details of this process. -164 * POJO, all fields are public. To modify them, the object itself is locked. */ -165 private static class ReplicaResultState { -166 public ReplicaResultState(int callCount) { -167 this.callCount = callCount; -168 } -169 -170 /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */ -171 int callCount; -172 /** Errors for which it is not decided whether we will report them to user. If one of the -173 * calls succeeds, we will discard the errors that may have happened in the other calls. */ -174 BatchErrors replicaErrors = null; -175 -176 @Override -177 public String toString() { -178 return "[call count " + callCount + "; errors " + replicaErrors + "]"; -179 } -180 } -181 +056import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; +057import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +058import org.apache.hadoop.hbase.util.Bytes; +059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +060import org.apache.htrace.Trace; +061 +062import com.google.common.annotations.VisibleForTesting; +063 +064/** +065 * This class allows a continuous flow of requests. It's written to be compatible with a +066 * synchronous caller such as HTable. +067 * <p> +068 * The caller sends a buffer of operation, by calling submit. This class extract from this list +069 * the operations it can send, i.e. the operations that are on region that are not considered +070 * as busy. The process is asynchronous, i.e. it returns immediately when if has finished to +071 * iterate on the list. If, and only if, the maximum number of current task is reached, the call +072 * to submit will block. Alternatively, the caller can call submitAll, in which case all the +073 * operations will be sent. Each call to submit returns a future-like object that can be used +074 * to track operation progress. +075 * </p> +076 * <p> +077 * The class manages internally the retries. +078 * </p> +079 * <p> +080 * The class can be constructed in regular mode, or "global error" mode. In global error mode, +081 * AP tracks errors across all calls (each "future" also has global view of all errors). That +082 * mode is necessary for backward compat with HTable behavior, where multiple submissions are +083 * made and the errors can propagate using any put/flush call, from previous calls. +084 * In "regular" mode, the errors are tracked inside the Future object that is returned. +085 * The results are always tracked inside the Future object and can be retrieved when the call +086 * has finished. Partial results can also be retrieved if some part of multi-request failed. +087 * </p> +088 * <p> +089 * This class is thread safe in regular mode; in global error code, submitting operations and +090 * retrieving errors from different threads may be not thread safe. +091 * Internally, the class is thread safe enough to manage simultaneously new submission and results +092 * arising from older operations. +093 * </p> +094 * <p> +095 * Internally, this class works with {@link Row}, this mean it could be theoretically used for +096 * gets as well. +097 * </p> +098 */ +099@InterfaceAudience.Private +100class AsyncProcess { +101 private static final Log LOG = LogFactory.getLog(AsyncProcess.class); +102 protected static final AtomicLong COUNTER = new AtomicLong(); +103 +104 public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget"; +105 +106 /** +107 * Configure the number of failures after which the client will start logging. A few failures +108 * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable +109 * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at +110 * this stage. +111 */ +112 public static final String START_LOG_ERRORS_AFTER_COUNT_KEY = +113 "hbase.client.start.log.errors.counter"; +114 public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9; +115 +116 /** +117 * The context used to wait for results from one submit call. +118 * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts), +119 * then errors and failed operations in this object will reflect global errors. +120 * 2) If submit call is made with needResults false, results will not be saved. +121 * */ +122 public static interface AsyncRequestFuture { +123 public boolean hasError(); +124 public RetriesExhaustedWithDetailsException getErrors(); +125 public List<? extends Row> getFailedOperations(); +126 public Object[] getResults() throws InterruptedIOException; +127 /** Wait until all tasks are executed, successfully or not. */ +128 public void waitUntilDone() throws InterruptedIOException; +129 } +130 +131 /** +132 * Return value from a submit that didn't contain any requests. +133 */ +134 private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() { +135 final Object[] result = new Object[0]; +136 +137 @Override +138 public boolean hasError() { +139 return false; +140 } +141 +142 @Override +143 public RetriesExhaustedWithDetailsException getErrors() { +144 return null; +145 } +146 +147 @Override +148 public List<? extends Row> getFailedOperations() { +149 return null; +150 } +151 +152 @Override +153 public Object[] getResults() { +154 return result; +155 } +156 +157 @Override +158 public void waitUntilDone() throws InterruptedIOException { +159 } +160 }; +161 +162 /** Sync point for calls to multiple replicas for the same user request (Get). +163 * Created and put in the results array (we assume replica calls require results) when +164 * the replica calls are launched. See results for details of this process. +165 * POJO, all fields are public. To modify them, the object itself is locked. */ +166 private static class ReplicaResultState { +167 public ReplicaResultState(int callCount) { +168 this.callCount = callCount; +169 } +170 +171 /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */ +172 int callCount; +173 /** Errors for which it is not decided whether we will report them to user. If one of the +174 * calls succeeds, we will discard the errors that may have happened in the other calls. */ +175 BatchErrors replicaErrors = null; +176 +177 @Override +178 public String toString() { +179 return "[call count " + callCount + "; errors " + replicaErrors + "]"; +180 } +181 } 182 -183 // TODO: many of the fields should be made private -184 protected final long id; -185 -186 protected final ClusterConnection connection; -187 protected final RpcRetryingCallerFactory rpcCallerFactory; -188 protected final RpcControllerFactory rpcFactory; -189 protected final BatchErrors globalErrors; -190 protected final ExecutorService pool; -191 -192 protected final AtomicLong tasksInProgress = new AtomicLong(0); -193 protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion = -194 new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR); -195 protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = -196 new ConcurrentHashMap<ServerName, AtomicInteger>(); -197 -198 // Start configuration settings. -199 private final int startLogErrorsCnt; -200 -201 /** -202 * The number of tasks simultaneously executed on the cluster. -203 */ -204 protected final int maxTotalConcurrentTasks; -205 -206 /** -207 * The number of tasks we run in parallel on a single region. -208 * With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start -209 * a set of operations on a region before the previous one is done. As well, this limits -210 * the pressure we put on the region server. -211 */ -212 protected final int maxConcurrentTasksPerRegion; -213 -214 /** -215 * The number of task simultaneously executed on a single region server. -216 */ -217 protected final int maxConcurrentTasksPerServer; -218 protected final long pause; -219 protected int numTries; -220 protected int serverTrackerTimeout; -221 protected int timeout; -222 protected long primaryCallTimeoutMicroseconds; -223 // End configuration settings. -224 -225 protected static class BatchErrors { -226 private final List<Throwable> throwables = new ArrayList<Throwable>(); -227 private final List<Row> actions = new ArrayList<Row>(); -228 private final List<String> addresses = new ArrayList<String>(); -229 -230 public synchronized void add(Throwable ex, Row row, ServerName serverName) { -231 if (row == null){ -232 throw new IllegalArgumentException("row cannot be null. location=" + serverName); -233 } -234 -235 throwables.add(ex); -236 actions.add(row); -237 addresses.add(serverName != null ? serverName.toString() : "null"); -238 } -239 -240 public boolean hasErrors() { -241 return !throwables.isEmpty(); -242 } -243 -244 private synchronized RetriesExhaustedWithDetailsException makeException() { -245 return new RetriesExhaustedWithDetailsException( -246 new ArrayList<Throwable>(throwables), -247 new ArrayList<Row>(actions), new ArrayList<String>(addresses)); -248 } -249 -250 public synchronized void clear() { -251 throwables.clear(); -252 actions.clear(); -253 addresses.clear(); -254 } -255 -256 public synchronized void merge(BatchErrors other) { -257 throwables.addAll(other.throwables); -258 actions.addAll(other.actions); -259 addresses.addAll(other.addresses); -260 } -261 } -262 -263 public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, -264 RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, -265 RpcControllerFactory rpcFactory) { -266 if (hc == null) { -267 throw new IllegalArgumentException("HConnection cannot be null."); -268 } -269 -270 this.connection = hc; -271 this.pool = pool; -272 this.globalErrors = useGlobalErrors ? new BatchErrors() : null; -273 -274 this.id = COUNTER.incrementAndGet(); -275 -276 this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, -277 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); -278 // how many times we could try in total, one more than retry number -279 this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, -280 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; -281 this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, -282 HConstants.DEFAULT_HBASE_RPC_TIMEOUT); -283 this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); -284 -285 this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, -286 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS); -287 this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, -288 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS); -289 this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, -290 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS); -291 -292 this.startLogErrorsCnt = -293 conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); -294 -295 if (this.maxTotalConcurrentTasks <= 0) { -296 throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks); -297 } -298 if (this.maxConcurrentTasksPerServer <= 0) { -299 throw new IllegalArgumentException("maxConcurrentTasksPerServer=" + -300 maxConcurrentTasksPerServer); -301 } -302 if (this.maxConcurrentTasksPerRegion <= 0) { -303 throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" + -304 maxConcurrentTasksPerRegion); -305 } -306 -307 // Server tracker allows us to do faster, and yet useful (hopefully), retries. -308 // However, if we are too useful, we might fail very quickly due to retry count limit. -309 // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum -310 // retry time if normal retries were used. Then we will retry until this time runs out. -311 // If we keep hitting one server, the net effect will be the incremental backoff, and -312 // essentially the same number of retries as planned. If we have to do faster retries, -313 // we will do more retries in aggregate, but the user will be none the wiser. -314 this.serverTrackerTimeout = 0; -315 for (int i = 0; i < this.numTries; ++i) { -316 serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i); -317 } -318 -319 this.rpcCallerFactory = rpcCaller; -320 this.rpcFactory = rpcFactory; -321 } -322 -323 /** -324 * @return pool if non null, otherwise returns this.pool if non null, otherwise throws -325 * RuntimeException -326 */ -327 private ExecutorService getPool(ExecutorService pool) { -328 if (pool != null) { -329 return pool; -330 } -331 if (this.pool != null) { -332 return this.pool; -333 } -334 throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService"); -335 } -336 -337 /** -338 * See {@link #submit(ExecutorService, TableName, List, boolean, Batch.Callback, boolean)}. -339 * Uses default ExecutorService for this AP (must have been created with one). -340 */ -341 public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows, -342 boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults) -343 throws InterruptedIOException { -344 return submit(null, tableName, rows, atLeastOne, callback, needResults); -345 } -346 -347 /** -348 * Extract from the rows list what we can submit. The rows we can not submit are kept in the -349 * list. Does not send requests to replicas (not currently used for anything other -350 * than streaming puts anyway). -351 * -352 * @param pool ExecutorService to use. -353 * @param tableName The table for which this request is needed. -354 * @param callback Batch callback. Only called on success (94 behavior). -355 * @param needResults Whether results are needed, or can be discarded. -356 * @param rows - the submitted row. Modified by the method: we remove the rows we took. -357 * @param atLeastOne true if we should submit at least a subset. -358 */ -359 public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName, -360 List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, -361 boolean needResults) throws InterruptedIOException { -362 if (rows.isEmpty()) { -363 return NO_REQS_RESULT; -364 } -365 -366 Map<ServerName, MultiAction<Row>> actionsByServer = -367 new HashMap<ServerName, MultiAction<Row>>(); -368 List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size()); -369 -370 NonceGenerator ng = this.connection.getNonceGenerator(); -371 long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client. -372 -373 // Location errors that happen before we decide what requests to take. -374 List<Exception> locationErrors = null; -375 List<Integer> locationErrorRows = null; -376 do { -377 // Wait until there is at least one slot for a new task. -378 waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1); -379 -380 // Remember the previous decisions about regions or region servers we put in the -381 // final multi. -382 Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>(); -383 Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>(); -384 -385 int posInList = -1; -386 Iterator<? extends Row> it = rows.iterator(); -387 while (it.hasNext()) { -388 Row r = it.next(); -389 HRegionLocation loc; -390 try { -391 if (r == null) { -392 throw new IllegalArgumentException("#" + id + ", row cannot be null"); -393 } -394 // Make sure we get 0-s replica. -395 RegionLocations locs = connection.locateRegion( -396 tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID); -397 if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) { -398 throw new IOException("#" + id + ", no location found, aborting submit for" -399 + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow())); -400 } -401 loc = locs.getDefaultRegionLocation(); -402 } catch (IOException ex) { -403 locationErrors = new ArrayList<Exception>(); -404 locationErrorRows = new ArrayList<Integer>(); -405 LOG.error("Failed to get region location ", ex); -406 // This action failed before creating ars. Retain it, but do not add to submit list. -407 // We will then add it to ars in an already-failed state. -408 retainedActions.add(new Action<Row>(r, ++posInList)); -409 locationErrors.add(ex); -410 locationErrorRows.add(posInList); -411 it.remove(); -412 break; // Backward compat: we stop considering actions on location error. -413 } -414 -415 if (canTakeOperation(loc, regionIncluded, serverIncluded)) { -416 Action<Row> action = new Action<Row>(r, ++posInList); -417 setNonce(ng, r, action); -418 retainedActions.add(action); -419 // TODO: replica-get is not supported on this path -420 byte[] regionName = loc.getRegionInfo().getRegionName(); -421 addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); -422 it.remove(); -423 } -424 } -425 } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null)); -426 -427 if (retainedActions.isEmpty()) return NO_REQS_RESULT; -428 -429 return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults, -430 locationErrors, locationErrorRows, actionsByServer, pool); -431 } -432 -433 <CResult> AsyncRequestFuture submitMultiActions(TableName tableName, -434 List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback, -435 Object[] results, boolean needResults, List<Exception> locationErrors, -436 List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer, -437 ExecutorService pool) { -438 AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( -439 tableName, retainedActions, nonceGroup, pool, callback, results, needResults); -440 // Add location errors if any -441 if (locationErrors != null) { -442 for (int i = 0; i < locationErrors.size(); ++i) { -443 int originalIndex = locationErrorRows.get(i); -444 Row row = retainedActions.get(originalIndex).getAction(); -445 ars.manageError(originalIndex, row, -446 Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null); -447 } -448 } -449 ars.sendMultiAction(actionsByServer, 1, null, false); -450 return ars; -451 } -452 -453 /** -454 * Helper that is used when grouping the actions per region server. -455 * -456 * @param loc - the destination. Must not be null. -457 * @param action - the action to add to the multiaction -458 * @param actionsByServer the multiaction per server -459 * @param nonceGroup Nonce group. -460 */ -461 private static void addAction(ServerName server, byte[] regionName, Action<Row> action, -462 Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) { -463 MultiAction<Row> multiAction = actionsByServer.get(server); -464 if (multiAction == null) { -465 multiAction = new MultiAction<Row>(); -466 actionsByServer.put(server, multiAction); -467 } -468 if (action.hasNonce() && !multiAction.hasNonceGroup()) { -469 multiAction.setNonceGroup(nonceGroup); -470 } -471 -472 multiAction.add(regionName, action); -473 } -474 -475 /** -476 * Check if we should send new operations to this region or region server. -477 * We're taking into account the past decision; if we have already accepted -478 * operation on a given region, we accept all operations for this region. -479 * -480 * @param loc; the region and the server name we want to use. -481 * @return true if this region is considered as busy. -482 */ -483 protected boolean canTakeOperation(HRegionLocation loc, -484 Map<Long, Boolean> regionsIncluded, -485 Map<ServerName, Boolean> serversIncluded) { -486 long regionId = loc.getRegionInfo().getRegionId(); -487 Boolean regionPrevious = regionsIncluded.get(regionId); -488 -489 if (regionPrevious != null) { -490 // We already know what to do with this region. -491 return regionPrevious; -492 } -493 -494 Boolean serverPrevious = serversIncluded.get(loc.getServerName()); -495 if (Boolean.FALSE.equals(serverPrevious)) { -496 // It's a new region, on a region server that we have already excluded. -497 regionsIncluded.put(regionId, Boolean.FALSE); -498 return false; -499 } -500 -501 AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName()); -502 if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { -503 // Too many tasks on this region already. -504 regionsIncluded.put(regionId, Boolean.FALSE); -505 return false; -506 } -507 -508 if (serverPrevious == null) { -509 // The region is ok, but we need to decide for this region server. -510 int newServers = 0; // number of servers we're going to contact so far -511 for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) { -512 if (kv.getValue()) { -513 newServers++; -514 } -515 } -516 -517 // Do we have too many total tasks already? -518 boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks; -519 -520 if (ok) { -521 // If the total is fine, is it ok for this individual server? -522 AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName()); -523 ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer); -524 } -525 -526 if (!ok) { -527 regionsIncluded.put(regionId, Boolean.FALSE); -528 serversIncluded.put(loc.getServerName(), Boolean.FALSE); -529 return false; -530 } -531 -532 serversIncluded.put(loc.getServerName(), Boolean.TRUE); -533 } else { -534 assert serverPrevious.equals(Boolean.TRUE); -535 } -536 -537 regionsIncluded.put(regionId, Boolean.TRUE); -538 -539 return true; -540 } -541 -542 /** -543 * See {@link #submitAll(ExecutorService, TableName, List, Batch.Callback, Object[])}. -544 * Uses default ExecutorService for this AP (must have been created with one). -545 */ -546 public <CResult> AsyncRequestFuture submitAll(TableName tableName, -547 List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) { -548 return submitAll(null, tableName, rows, callback, results); -549 } -550 -551 /** -552 * Submit immediately the list of rows, whatever the server status. Kept for backward -553 * compatibility: it allows to be used with the batch interface that return an array of objects. -554 * -555 * @param pool ExecutorService to use. -556 * @param tableName name of the table for which the submission is made. -557 * @param rows the list of rows. -558 * @param callback the callback. -559 * @param results Optional array to return the results thru; backward compat. -560 */ -561 public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, -562 List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) { -563 List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size()); -564 -565 // The position will be used by the processBatch to match the object array returned. -566 int posInList = -1; -567 NonceGenerator ng = this.connection.getNonceGenerator(); -568 for (Row r : rows) { -569 posInList++; -570 if (r instanceof Put) { -571 Put put = (Put) r; -572 if (put.isEmpty()) { -573 throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item"); -574 } -575 } -576 Action<Row> action = new Action<Row>(r, posInList); -577 setNonce(ng, r, action); -578 actions.add(action); -579 } -580 AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( -581 tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null); -582 ars.groupAndSendMultiAction(actions, 1); -583 return ars; -584 } -585 -586 private static void setNonce(NonceGenerator ng, Row r, Action<Row> action) { -587 if (!(r instanceof Append) && !(r instanceof Increment)) return; -588 action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled. -589 } -590 -591 /** -592 * The context, and return value, for a single submit/submitAll call. -593 * Note on how this class (one AP submit) works. Initially, all requests are split into groups -594 * by server; request is sent to each server in parallel; the RPC calls are not async so a -595 * thread per server is used. Every time some actions fail, regions/locations might have -596 * changed, so we re-group them by server and region again and send these groups in parallel -597 * too. The result, in case of retries, is a "tree" of threads, with parent exiting after -598 * scheduling children. This is why lots of code doesn't require any synchronization. -599 */ -600 protected class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { -601 -602 /** -603 * Runnable (that can be submitted to thread pool) that waits for when it's time -604 * to issue replica calls, finds region replicas, groups the requests by replica and -605 * issues the calls (on separate threads, via sendMultiAction). -606 * This is done on a separate thread because we don't want to wait on user thread for -607 * our asynchronous call, and usually we have to wait before making replica calls. -608 */ -609 private final class ReplicaCallIssuingRunnable implements Runnable { -610 private final long startTime; -611 private final List<Action<Row>> initialActions; -612 -613 public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTime) { -614 this.initialActions = initialActions; -615 this.startTime = startTime; -616 } -617 -618 @Override -619 public void run() { -620 boolean done = false; -621 if (primaryCallTimeoutMicroseconds > 0) { -622 try { -623 done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds); -624 } catch (InterruptedException ex) { -625 LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage()); -626 return; -627 } -628 } -629 if (done) return; // Done within primary timeout -630 Map<ServerName, MultiAction<Row>> actionsByServer = -631 new HashMap<ServerName, MultiAction<Row>>(); -632 List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>(); -633 if (replicaGetIndices == null) { -634 for (int i = 0; i < results.length; ++i) { -635 addReplicaActions(i, actionsByServer, unknownLocActions); -636 } -637 } else { -638 for (int replicaGetIndice : replicaGetIndices) { -639 addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions); -640 } -641 } -642 if (!actionsByServer.isEmpty()) { -643 sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty()); -644 } -645 if (!unknownLocActions.isEmpty()) { -646 actionsByServer = new HashMap<ServerName, MultiAction<Row>>(); -647 for (Action<Row> action : unknownLocActions) { -648 addReplicaActionsAgain(action, actionsByServer); -649 } -650 // Some actions may have completely failed, they are handled inside addAgain. -651 if (!actionsByServer.isEmpty()) { -652 sendMultiAction(actionsByServer, 1, null, true); -653 } -654 } -655 } -656 -657 /** -658 * Add replica actions to action map by server. -659 * @param index Index of the original action. -660 * @param actionsByServer The map by server to add it to. -661 */ -662 private void addReplicaActions(int index, Map<ServerName, MultiAction<Row>> actionsByServer, -663 List<Action<Row>> unknownReplicaActions) { -664 if (results[index] != null) return; // opportunistic. Never goes from non-null to null. -665 Action<Row> action = initialActions.get(index); -666 RegionLocations loc = findAllLocationsOrFail(action, true); -667 if (loc == null) return; -668 HRegionLocation[] locs = loc.getRegionLocations(); -669 if (locs.length == 1) { -670 LOG.warn("No replicas found for " + action.getAction()); -671 return; -672 } -673 synchronized (replicaResultLock) { -674 // Don't run replica calls if the original has finished. We could do it e.g. if -675 // original has already failed before first replica call (unlikely given retries), -676 // but that would require additional synchronization w.r.t. returning to caller. -677 if (results[index] != null) return; -678 // We set the number of calls here. After that any path must call setResult/setError. -679 // True even for replicas that are not found - if we refuse to send we MUST set error. -680 results[index] = new ReplicaResultState(locs.length); -681 } -682 for (int i = 1; i < locs.length; ++i) { -683 Action<Row> replicaAction = new Action<Row>(action, i); -684 if (locs[i] != null) { -685 addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(), -686 replicaAction, actionsByServer, nonceGroup); -687 } else { -688 unknownReplicaActions.add(replicaAction); -689 } -690 } -691 } -692 -693 private void addReplicaActionsAgain( -694 Action<Row> action, Map<ServerName, MultiAction<Row>> actionsByServer) { -695 if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) { -696 throw new AssertionError("Cannot have default replica here"); -697 } -698 HRegionLocation loc = getReplicaLocationOrFail(action); -699 if (loc == null) return; -700 addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(), -701 action, actionsByServer, nonceGroup); -702 } -703 } -704 -705 /** -706 * Runnable (that can be submitted to thread pool) that submits MultiAction to a -707 * single server. The server call is synchronous, therefore we do it on a thread pool. -708 */ -709 private final class SingleServerRequestRunnable implements Runnable { -710 private final MultiAction<Row> multiAction; -711 private final int numAttempt; -712 private final ServerName server; -713 private final Set<MultiServerCallable<Row>> callsInProgress; -714 -715 private SingleServerRequestRunnable( -716 MultiAction<Row> multiAction, int numAttempt, ServerName server, -717 Set<MultiServerCallable<Row>> callsInProgress) { -718 this.multiAction = multiAction; -719 this.numAttempt = numAttempt; -720 this.server = server; -721 this.callsInProgress = callsInProgress; -722 } -723 -724 @Override -725 public void run() { -726 MultiResponse res; -727 MultiServerCallable<Row> callable = null; -728 try { -729 callable = createCallable(server, tableName, multiAction); -730 try { -731 RpcRetryingCaller<MultiResponse> caller = createCaller(callable); -732 if (callsInProgress != null) callsInProgress.add(callable); -733 res = caller.callWithoutRetries(callable, timeout); -734 -735 if (res == null) { -736 // Cancelled -737 return; -738 } -739 -740 } catch (IOException e) { -741 // The service itself failed . It may be an error coming from the communication -742 // layer, but, as well, a functional error raised by the server. -743 receiveGlobalFailure(multiAction, server, numAttempt, e); -744 return; -745 } catch (Throwable t) { -746 // This should not happen. Let's log & retry anyway. -747 LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." + -748 " Retrying. Server is " + server + ", tableName=" + tableName, t); -749 receiveGlobalFailure(multiAction, server, numAttempt, t); -750 return; -751 } -752 -753 // Normal case: we received an answer from the server, and it's not an exception. -754 receiveMultiAction(multiAction, server, res, numAttempt); -755 } catch (Throwable t) { -756 // Something really bad happened. We are on the send thread that will now die. -757 LOG.error("Internal AsyncProcess #" + id + " error for " -758 + tableName + " processing for " + server, t); -759 throw new RuntimeException(t); -760 } finally { -761 decTaskCounters(multiAction.getRegions(), server); -762 if (callsInProgress != null && callable != null) { -763 callsInProgress.remove(callable); -764 } -765 } -766 } -767 } -768 -769 private final Batch.Callback<CResult> callback; -770 private final BatchErrors errors; -771 private final ConnectionImplementation.ServerErrorTracker errorsByServer; -772 private final ExecutorService pool; -773 private final Set<MultiServerCallable<Row>> callsInProgress; -774 +183 +184