Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 503FB200CD6 for ; Mon, 31 Jul 2017 17:10:28 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 4EBA4165609; Mon, 31 Jul 2017 15:10:28 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 373041655F8 for ; Mon, 31 Jul 2017 17:10:25 +0200 (CEST) Received: (qmail 13369 invoked by uid 500); 31 Jul 2017 15:10:23 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 13126 invoked by uid 99); 31 Jul 2017 15:10:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 31 Jul 2017 15:10:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A1489F3333; Mon, 31 Jul 2017 15:10:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Mon, 31 Jul 2017 15:10:31 -0000 Message-Id: <6c89f9e66b7f4394987be5da2c019d02@git.apache.org> In-Reply-To: <4b7e73b962544b8ba274a615aa246ef7@git.apache.org> References: <4b7e73b962544b8ba274a615aa246ef7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [10/51] [partial] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd. archived-at: Mon, 31 Jul 2017 15:10:28 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/1837997e/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RSRpcServices.RegionScannersCloseCallBack.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RSRpcServices.RegionScannersCloseCallBack.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RSRpcServices.RegionScannersCloseCallBack.html index a691301..9a8f45d 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RSRpcServices.RegionScannersCloseCallBack.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RSRpcServices.RegionScannersCloseCallBack.html @@ -165,3337 +165,3323 @@ 157import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; -160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest; -161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse; -162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; -163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; -164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; -165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; -166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; -167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; -168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; -169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; -170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; -171import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -172import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; -173import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; -174import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; -175import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; -176import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; -177import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; -178import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; -179import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; -180import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; -181import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; -182import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; -183import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; -184import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; -185import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; -186import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; -187import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; -188import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; -189import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; -190import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; -191import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; -192import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; -193import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; -194import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; -195import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; -196import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; -197import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; -198import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; -199import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; -200import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; -201import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; -202import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo; -203import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; -204import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; -205import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics; -206import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; -207import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; -208import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; -209import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -210import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; -211import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; -212import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; -213import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; -214import org.apache.hadoop.hbase.util.Bytes; -215import org.apache.hadoop.hbase.util.DNS; -216import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -217import org.apache.hadoop.hbase.util.Pair; -218import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; -219import org.apache.hadoop.hbase.util.Strings; -220import org.apache.hadoop.hbase.wal.WAL; -221import org.apache.hadoop.hbase.wal.WALKey; -222import org.apache.hadoop.hbase.wal.WALSplitter; -223import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; -224import org.apache.zookeeper.KeeperException; +160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; +161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; +162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; +163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; +164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; +165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; +166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; +167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; +168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; +169import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +170import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; +171import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +172import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; +173import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; +174import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; +175import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; +176import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +177import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; +178import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; +179import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; +180import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; +181import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; +182import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; +183import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; +184import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; +185import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; +186import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; +187import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; +188import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; +189import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; +190import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; +191import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; +192import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; +193import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; +194import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +195import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; +196import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; +197import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; +198import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; +199import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; +200import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo; +201import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; +202import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; +203import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics; +204import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; +205import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; +206import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; +207import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +208import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; +209import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; +210import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; +211import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; +212import org.apache.hadoop.hbase.util.Bytes; +213import org.apache.hadoop.hbase.util.DNS; +214import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +215import org.apache.hadoop.hbase.util.Pair; +216import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +217import org.apache.hadoop.hbase.util.Strings; +218import org.apache.hadoop.hbase.wal.WAL; +219import org.apache.hadoop.hbase.wal.WALKey; +220import org.apache.hadoop.hbase.wal.WALSplitter; +221import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +222import org.apache.zookeeper.KeeperException; +223 +224import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; 225 -226import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; -227 -228/** -229 * Implements the regionserver RPC services. -230 */ -231@InterfaceAudience.Private -232@SuppressWarnings("deprecation") -233public class RSRpcServices implements HBaseRPCErrorHandler, -234 AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction, -235 ConfigurationObserver { -236 protected static final Log LOG = LogFactory.getLog(RSRpcServices.class); -237 -238 /** RPC scheduler to use for the region server. */ -239 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = -240 "hbase.region.server.rpc.scheduler.factory.class"; -241 -242 /** -243 * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This -244 * configuration exists to prevent the scenario where a time limit is specified to be so -245 * restrictive that the time limit is reached immediately (before any cells are scanned). -246 */ -247 private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = -248 "hbase.region.server.rpc.minimum.scan.time.limit.delta"; -249 /** -250 * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA} -251 */ -252 private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10; -253 -254 /** -255 * Number of rows in a batch operation above which a warning will be logged. -256 */ -257 static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold"; -258 /** -259 * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME} -260 */ -261 static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000; -262 -263 // Request counter. (Includes requests that are not serviced by regions.) -264 final LongAdder requestCount = new LongAdder(); -265 -266 // Request counter for rpc get -267 final LongAdder rpcGetRequestCount = new LongAdder(); -268 -269 // Request counter for rpc scan -270 final LongAdder rpcScanRequestCount = new LongAdder(); -271 -272 // Request counter for rpc multi -273 final LongAdder rpcMultiRequestCount = new LongAdder(); -274 -275 // Request counter for rpc mutate -276 final LongAdder rpcMutateRequestCount = new LongAdder(); -277 -278 // Server to handle client requests. -279 final RpcServerInterface rpcServer; -280 final InetSocketAddress isa; -281 -282 private final HRegionServer regionServer; -283 private final long maxScannerResultSize; -284 -285 // The reference to the priority extraction function -286 private final PriorityFunction priority; -287 -288 private ScannerIdGenerator scannerIdGenerator; -289 private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>(); -290 // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients -291 // which may send next or close request to a region scanner which has already been exhausted. The -292 // entries will be removed automatically after scannerLeaseTimeoutPeriod. -293 private final Cache<String, String> closedScanners; -294 /** -295 * The lease timeout period for client scanners (milliseconds). -296 */ -297 private final int scannerLeaseTimeoutPeriod; -298 -299 /** -300 * The RPC timeout period (milliseconds) -301 */ -302 private final int rpcTimeout; -303 -304 /** -305 * The minimum allowable delta to use for the scan limit -306 */ -307 private final long minimumScanTimeLimitDelta; -308 -309 /** -310 * Row size threshold for multi requests above which a warning is logged -311 */ -312 private final int rowSizeWarnThreshold; +226/** +227 * Implements the regionserver RPC services. +228 */ +229@InterfaceAudience.Private +230@SuppressWarnings("deprecation") +231public class RSRpcServices implements HBaseRPCErrorHandler, +232 AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction, +233 ConfigurationObserver { +234 protected static final Log LOG = LogFactory.getLog(RSRpcServices.class); +235 +236 /** RPC scheduler to use for the region server. */ +237 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = +238 "hbase.region.server.rpc.scheduler.factory.class"; +239 +240 /** +241 * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This +242 * configuration exists to prevent the scenario where a time limit is specified to be so +243 * restrictive that the time limit is reached immediately (before any cells are scanned). +244 */ +245 private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = +246 "hbase.region.server.rpc.minimum.scan.time.limit.delta"; +247 /** +248 * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA} +249 */ +250 private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10; +251 +252 /** +253 * Number of rows in a batch operation above which a warning will be logged. +254 */ +255 static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold"; +256 /** +257 * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME} +258 */ +259 static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000; +260 +261 // Request counter. (Includes requests that are not serviced by regions.) +262 final LongAdder requestCount = new LongAdder(); +263 +264 // Request counter for rpc get +265 final LongAdder rpcGetRequestCount = new LongAdder(); +266 +267 // Request counter for rpc scan +268 final LongAdder rpcScanRequestCount = new LongAdder(); +269 +270 // Request counter for rpc multi +271 final LongAdder rpcMultiRequestCount = new LongAdder(); +272 +273 // Request counter for rpc mutate +274 final LongAdder rpcMutateRequestCount = new LongAdder(); +275 +276 // Server to handle client requests. +277 final RpcServerInterface rpcServer; +278 final InetSocketAddress isa; +279 +280 private final HRegionServer regionServer; +281 private final long maxScannerResultSize; +282 +283 // The reference to the priority extraction function +284 private final PriorityFunction priority; +285 +286 private ScannerIdGenerator scannerIdGenerator; +287 private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>(); +288 // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients +289 // which may send next or close request to a region scanner which has already been exhausted. The +290 // entries will be removed automatically after scannerLeaseTimeoutPeriod. +291 private final Cache<String, String> closedScanners; +292 /** +293 * The lease timeout period for client scanners (milliseconds). +294 */ +295 private final int scannerLeaseTimeoutPeriod; +296 +297 /** +298 * The RPC timeout period (milliseconds) +299 */ +300 private final int rpcTimeout; +301 +302 /** +303 * The minimum allowable delta to use for the scan limit +304 */ +305 private final long minimumScanTimeLimitDelta; +306 +307 /** +308 * Row size threshold for multi requests above which a warning is logged +309 */ +310 private final int rowSizeWarnThreshold; +311 +312 final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false); 313 -314 final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false); -315 -316 /** -317 * An Rpc callback for closing a RegionScanner. -318 */ -319 private static final class RegionScannerCloseCallBack implements RpcCallback { +314 /** +315 * An Rpc callback for closing a RegionScanner. +316 */ +317 private static final class RegionScannerCloseCallBack implements RpcCallback { +318 +319 private final RegionScanner scanner; 320 -321 private final RegionScanner scanner; -322 -323 public RegionScannerCloseCallBack(RegionScanner scanner) { -324 this.scanner = scanner; -325 } -326 -327 @Override -328 public void run() throws IOException { -329 this.scanner.close(); -330 } -331 } -332 -333 /** -334 * An Rpc callback for doing shipped() call on a RegionScanner. -335 */ -336 private class RegionScannerShippedCallBack implements RpcCallback { -337 -338 private final String scannerName; -339 private final RegionScanner scanner; -340 private final Lease lease; -341 -342 public RegionScannerShippedCallBack(String scannerName, RegionScanner scanner, Lease lease) { -343 this.scannerName = scannerName; -344 this.scanner = scanner; -345 this.lease = lease; -346 } -347 -348 @Override -349 public void run() throws IOException { -350 this.scanner.shipped(); -351 // We're done. On way out re-add the above removed lease. The lease was temp removed for this -352 // Rpc call and we are at end of the call now. Time to add it back. -353 if (scanners.containsKey(scannerName)) { -354 if (lease != null) regionServer.leases.addLease(lease); -355 } -356 } -357 } -358 -359 /** -360 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on -361 * completion of multiGets. -362 */ -363 static class RegionScannersCloseCallBack implements RpcCallback { -364 private final List<RegionScanner> scanners = new ArrayList<>(); -365 -366 public void addScanner(RegionScanner scanner) { -367 this.scanners.add(scanner); -368 } -369 -370 @Override -371 public void run() { -372 for (RegionScanner scanner : scanners) { -373 try { -374 scanner.close(); -375 } catch (IOException e) { -376 LOG.error("Exception while closing the scanner " + scanner, e); -377 } -378 } -379 } -380 } -381 -382 /** -383 * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. -384 */ -385 private static final class RegionScannerHolder { -386 -387 private final AtomicLong nextCallSeq = new AtomicLong(0); -388 private final String scannerName; -389 private final RegionScanner s; -390 private final Region r; -391 private final RpcCallback closeCallBack; -392 private final RpcCallback shippedCallback; -393 private byte[] rowOfLastPartialResult; -394 private boolean needCursor; -395 -396 public RegionScannerHolder(String scannerName, RegionScanner s, Region r, -397 RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor) { -398 this.scannerName = scannerName; -399 this.s = s; -400 this.r = r; -401 this.closeCallBack = closeCallBack; -402 this.shippedCallback = shippedCallback; -403 this.needCursor = needCursor; -404 } -405 -406 public long getNextCallSeq() { -407 return nextCallSeq.get(); -408 } -409 -410 public boolean incNextCallSeq(long currentSeq) { -411 // Use CAS to prevent multiple scan request running on the same scanner. -412 return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1); -413 } -414 } -415 -416 /** -417 * Instantiated as a scanner lease. If the lease times out, the scanner is -418 * closed -419 */ -420 private class ScannerListener implements LeaseListener { -421 private final String scannerName; -422 -423 ScannerListener(final String n) { -424 this.scannerName = n; -425 } -426 -427 @Override -428 public void leaseExpired() { -429 RegionScannerHolder rsh = scanners.remove(this.scannerName); -430 if (rsh != null) { -431 RegionScanner s = rsh.s; -432 LOG.info("Scanner " + this.scannerName + " lease expired on region " -433 + s.getRegionInfo().getRegionNameAsString()); -434 Region region = null; -435 try { -436 region = regionServer.getRegion(s.getRegionInfo().getRegionName()); -437 if (region != null && region.getCoprocessorHost() != null) { -438 region.getCoprocessorHost().preScannerClose(s); -439 } -440 } catch (IOException e) { -441 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e); -442 } finally { -443 try { -444 s.close(); -445 if (region != null && region.getCoprocessorHost() != null) { -446 region.getCoprocessorHost().postScannerClose(s); -447 } -448 } catch (IOException e) { -449 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e); -450 } -451 } -452 } else { -453 LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" + -454 " scanner found, hence no chance to close that related scanner!"); -455 } -456 } -457 } -458 -459 private static ResultOrException getResultOrException(final ClientProtos.Result r, -460 final int index){ -461 return getResultOrException(ResponseConverter.buildActionResult(r), index); -462 } -463 -464 private static ResultOrException getResultOrException(final Exception e, final int index) { -465 return getResultOrException(ResponseConverter.buildActionResult(e), index); -466 } -467 -468 private static ResultOrException getResultOrException( -469 final ResultOrException.Builder builder, final int index) { -470 return builder.setIndex(index).build(); -471 } -472 -473 /** -474 * Starts the nonce operation for a mutation, if needed. -475 * @param mutation Mutation. -476 * @param nonceGroup Nonce group from the request. -477 * @returns whether to proceed this mutation. -478 */ -479 private boolean startNonceOperation(final MutationProto mutation, long nonceGroup) -480 throws IOException { -481 if (regionServer.nonceManager == null || !mutation.hasNonce()) return true; -482 boolean canProceed = false; -483 try { -484 canProceed = regionServer.nonceManager.startOperation( -485 nonceGroup, mutation.getNonce(), regionServer); -486 } catch (InterruptedException ex) { -487 throw new InterruptedIOException("Nonce start operation interrupted"); -488 } -489 return canProceed; -490 } -491 -492 /** -493 * Ends nonce operation for a mutation, if needed. -494 * @param mutation Mutation. -495 * @param nonceGroup Nonce group from the request. Always 0 in initial implementation. -496 * @param success Whether the operation for this nonce has succeeded. -497 */ -498 private void endNonceOperation(final MutationProto mutation, -499 long nonceGroup, boolean success) { -500 if (regionServer.nonceManager != null && mutation.hasNonce()) { -501 regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success); -502 } -503 } -504 -505 private boolean isClientCellBlockSupport(RpcCallContext context) { -506 return context != null && context.isClientCellBlockSupported(); -507 } -508 -509 private void addResult(final MutateResponse.Builder builder, final Result result, -510 final HBaseRpcController rpcc, boolean clientCellBlockSupported) { -511 if (result == null) return; -512 if (clientCellBlockSupported) { -513 builder.setResult(ProtobufUtil.toResultNoData(result)); -514 rpcc.setCellScanner(result.cellScanner()); -515 } else { -516 ClientProtos.Result pbr = ProtobufUtil.toResult(result); -517 builder.setResult(pbr); -518 } -519 } -520 -521 private void addResults(ScanResponse.Builder builder, List<Result> results, -522 HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) { -523 builder.setStale(!isDefaultRegion); -524 if (results.isEmpty()) { -525 return; -526 } -527 if (clientCellBlockSupported) { -528 for (Result res : results) { -529 builder.addCellsPerResult(res.size()); -530 builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow()); -531 } -532 controller.setCellScanner(CellUtil.createCellScanner(results)); -533 } else { -534 for (Result res : results) { -535 ClientProtos.Result pbr = ProtobufUtil.toResult(res); -536 builder.addResults(pbr); -537 } -538 } -539 } -540 -541 /** -542 * Mutate a list of rows atomically. -543 * -544 * @param region -545 * @param actions -546 * @param cellScanner if non-null, the mutation data -- the Cell content. -547 * @throws IOException -548 */ -549 private void mutateRows(final Region region, -550 final List<ClientProtos.Action> actions, -551 final CellScanner cellScanner, RegionActionResult.Builder builder) throws IOException { -552 if (!region.getRegionInfo().isMetaTable()) { -553 regionServer.cacheFlusher.reclaimMemStoreMemory(); -554 } -555 RowMutations rm = null; -556 int i = 0; -557 ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = -558 ClientProtos.ResultOrException.newBuilder(); -559 for (ClientProtos.Action action: actions) { -560 if (action.hasGet()) { -561 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + -562 action.getGet()); -563 } -564 MutationType type = action.getMutation().getMutateType(); -565 if (rm == null) { -566 rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size()); -567 } -568 switch (type) { -569 case PUT: -570 Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); -571 checkCellSizeLimit(region, put); -572 rm.add(put); -573 break; -574 case DELETE: -575 rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner)); -576 break; -577 default: -578 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); -579 } -580 // To unify the response format with doNonAtomicRegionMutation and read through client's -581 // AsyncProcess we have to add an empty result instance per operation -582 resultOrExceptionOrBuilder.clear(); -583 resultOrExceptionOrBuilder.setIndex(i++); -584 builder.addResultOrException( -585 resultOrExceptionOrBuilder.build()); -586 } -587 region.mutateRow(rm); -588 } -589 -590 /** -591 * Mutate a list of rows atomically. -592 * -593 * @param region -594 * @param actions -595 * @param cellScanner if non-null, the mutation data -- the Cell content. -596 * @param row -597 * @param family -598 * @param qualifier -599 * @param compareOp -600 * @param comparator @throws IOException -601 */ -602 private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions, -603 final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, -604 CompareOp compareOp, ByteArrayComparable comparator, RegionActionResult.Builder builder, -605 ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { -606 if (!region.getRegionInfo().isMetaTable()) { -607 regionServer.cacheFlusher.reclaimMemStoreMemory(); -608 } -609 RowMutations rm = null; -610 int i = 0; -611 ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = -612 ClientProtos.ResultOrException.newBuilder(); -613 for (ClientProtos.Action action: actions) { -614 if (action.hasGet()) { -615 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + -616 action.getGet()); -617 } -618 MutationType type = action.getMutation().getMutateType(); -619 if (rm == null) { -620 rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size()); -621 } -622 switch (type) { -623 case PUT: -624 Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); -625 checkCellSizeLimit(region, put); -626 spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); -627 rm.add(put); -628 break; -629 case DELETE: -630 Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner); -631 spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); -632 rm.add(del); -633 break; -634 default: -635 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); -636 } -637 // To unify the response format with doNonAtomicRegionMutation and read through client's -638 // AsyncProcess we have to add an empty result instance per operation -639 resultOrExceptionOrBuilder.clear(); -640 resultOrExceptionOrBuilder.setIndex(i++); -641 builder.addResultOrException( -642 resultOrExceptionOrBuilder.build()); -643 } -644 return region.checkAndRowMutate(row, family, qualifier, compareOp, -645 comparator, rm, Boolean.TRUE); -646 } -647 -648 /** -649 * Execute an append mutation. -650 * -651 * @param region -652 * @param m -653 * @param cellScanner -654 * @return result to return to client if default operation should be -655 * bypassed as indicated by RegionObserver, null otherwise -656 * @throws IOException -657 */ -658 private Result append(final Region region, final OperationQuota quota, -659 final MutationProto mutation, final CellScanner cellScanner, long nonceGroup, -660 ActivePolicyEnforcement spaceQuota) -661 throws IOException { -662 long before = EnvironmentEdgeManager.currentTime(); -663 Append append = ProtobufUtil.toAppend(mutation, cellScanner); -664 checkCellSizeLimit(region, append); -665 spaceQuota.getPolicyEnforcement(region).check(append); -666 quota.addMutation(append); -667 Result r = null; -668 if (region.getCoprocessorHost() != null) { -669 r = region.getCoprocessorHost().preAppend(append); -670 } -671 if (r == null) { -672 boolean canProceed = startNonceOperation(mutation, nonceGroup); -673 boolean success = false; -674 try { -675 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; -676 if (canProceed) { -677 r = region.append(append, nonceGroup, nonce); -678 } else { -679 // convert duplicate append to get -680 List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false, -681 nonceGroup, nonce); -682 r = Result.create(results); -683 } -684 success = true; -685 } finally { -686 if (canProceed) { -687 endNonceOperation(mutation, nonceGroup, success); -688 } -689 } -690 if (region.getCoprocessorHost() != null) { -691 r = region.getCoprocessorHost().postAppend(append, r); -692 } -693 } -694 if (regionServer.metricsRegionServer != null) { -695 regionServer.metricsRegionServer.updateAppend( -696 EnvironmentEdgeManager.currentTime() - before); -697 } -698 return r; -699 } -700 -701 /** -702 * Execute an increment mutation. -703 * -704 * @param region -705 * @param mutation -706 * @return the Result -707 * @throws IOException -708 */ -709 private Result increment(final Region region, final OperationQuota quota, -710 final MutationProto mutation, final CellScanner cells, long nonceGroup, -711 ActivePolicyEnforcement spaceQuota) -712 throws IOException { -713 long before = EnvironmentEdgeManager.currentTime(); -714 Increment increment = ProtobufUtil.toIncrement(mutation, cells); -715 checkCellSizeLimit(region, increment); -716 spaceQuota.getPolicyEnforcement(region).check(increment); -717 quota.addMutation(increment); -718 Result r = null; -719 if (region.getCoprocessorHost() != null) { -720 r = region.getCoprocessorHost().preIncrement(increment); -721 } -722 if (r == null) { -723 boolean canProceed = startNonceOperation(mutation, nonceGroup); -724 boolean success = false; -725 try { -726 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; -727 if (canProceed) { -728 r = region.increment(increment, nonceGroup, nonce); -729 } else { -730 // convert duplicate increment to get -731 List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup, -732 nonce); -733 r = Result.create(results); -734 } -735 success = true; -736 } finally { -737 if (canProceed) { -738 endNonceOperation(mutation, nonceGroup, success); -739 } -740 } -741 if (region.getCoprocessorHost() != null) { -742 r = region.getCoprocessorHost().postIncrement(increment, r); -743 } -744 } -745 if (regionServer.metricsRegionServer != null) { -746 regionServer.metricsRegionServer.updateIncrement( -747 EnvironmentEdgeManager.currentTime() - before); -748 } -749 return r; -750 } -751 -752 /** -753 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when -754 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. -755 * @param region -756 * @param actions -757 * @param cellScanner -758 * @param builder -759 * @param cellsToReturn Could be null. May be allocated in this method. This is what this -760 * method returns as a 'result'. -761 * @param closeCallBack the callback to be used with multigets -762 * @param context the current RpcCallContext -763 * @return Return the <code>cellScanner</code> passed -764 */ -765 private List<CellScannable> doNonAtomicRegionMutation(final Region region, -766 final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, -767 final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup, -768 final RegionScannersCloseCallBack closeCallBack, RpcCallContext context, -769 ActivePolicyEnforcement spaceQuotaEnforcement) { -770 // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do -771 // one at a time, we instead pass them in batch. Be aware that the corresponding -772 // ResultOrException instance that matches each Put or Delete is then added down in the -773 // doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched -774 List<ClientProtos.Action> mutations = null; -775 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); -776 IOException sizeIOE = null; -777 Object lastBlock = null; -778 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder(); -779 boolean hasResultOrException = false; -780 for (ClientProtos.Action action : actions.getActionList()) { -781 hasResultOrException = false; -782 resultOrExceptionBuilder.clear(); -783 try { -784 Result r = null; -785 -786 if (context != null -787 && context.isRetryImmediatelySupported() -788 && (context.getResponseCellSize() > maxQuotaResultSize -789 || context.getResponseBlockSize() + context.getResponseExceptionSize() -790 > maxQuotaResultSize)) { -791 -792 // We're storing the exception since the exception and reason string won't -793 // change after the response size limit is reached. -794 if (sizeIOE == null ) { -795 // We don't need the stack un-winding do don't throw the exception. -796 // Throwing will kill the JVM's JIT. -797 // -798 // Instead just create the exception and then store it. -799 sizeIOE = new MultiActionResultTooLarge("Max size exceeded" -800 + " CellSize: " + context.getResponseCellSize() -801 + " BlockSize: " + context.getResponseBlockSize()); -802 -803 // Only report the exception once since there's only one request that -804 // caused the exception. Otherwise this number will dominate the exceptions count. -805 rpcServer.getMetrics().exception(sizeIOE); -806 } -807 -808 // Now that there's an exception is known to be created -809 // use it for the response. -810 // -811 // This will create a copy in the builder. -812 hasResultOrException = true; -813 NameBytesPair pair = ResponseConverter.buildException(sizeIOE); -814 resultOrExceptionBuilder.setException(pair); -815 context.incrementResponseExceptionSize(pair.getSerializedSize()); -816 resultOrExceptionBuilder.setIndex(action.getIndex()); -817 builder.addResultOrException(resultOrExceptionBuilder.build()); -818 if (cellScanner != null) { -819 skipCellsForMutation(action, cellScanner); -820 } -821 continue; -822 } -823 if (action.hasGet()) { -824 long before = EnvironmentEdgeManager.currentTime(); -825 try { -826 Get get = ProtobufUtil.toGet(action.getGet()); -827 if (context != null) { -828 r = get(get, ((HRegion) region), closeCallBack, context); -829 } else { -830 r = region.get(get); -831 } -832 } finally { -833 if (regionServer.metricsRegionServer != null) { -834 regionServer.metricsRegionServer.updateGet( -835 EnvironmentEdgeManager.currentTime() - before);