Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5A6F62004A0 for ; Wed, 16 Aug 2017 17:06:30 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5871F168D4C; Wed, 16 Aug 2017 15:06:30 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 27CA6168D3B for ; Wed, 16 Aug 2017 17:06:27 +0200 (CEST) Received: (qmail 23677 invoked by uid 500); 16 Aug 2017 15:06:25 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 23664 invoked by uid 99); 16 Aug 2017 15:06:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Aug 2017 15:06:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 04FCBDFA3B; Wed, 16 Aug 2017 15:06:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Wed, 16 Aug 2017 15:06:31 -0000 Message-Id: <5dd80c0fd6dc49ce9e0ca07b8d9d70f9@git.apache.org> In-Reply-To: <4b5ca733aa3a45c0a4f61b854a33bf00@git.apache.org> References: <4b5ca733aa3a45c0a4f61b854a33bf00@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [10/51] [partial] hbase-site git commit: Published site at . archived-at: Wed, 16 Aug 2017 15:06:30 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/1ada5f22/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.html index 13bde46..9aa8673 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.html @@ -37,18 +37,18 @@ 029 030import java.io.IOException; 031import java.util.Arrays; -032import java.util.HashMap; -033import java.util.HashSet; -034import java.util.Iterator; +032import java.util.HashSet; +033import java.util.Iterator; +034import java.util.LinkedHashMap; 035import java.util.List; 036import java.util.Map; -037import java.util.Set; -038import java.util.concurrent.CompletableFuture; -039import java.util.concurrent.ConcurrentHashMap; -040import java.util.concurrent.ConcurrentMap; -041import java.util.concurrent.ConcurrentNavigableMap; -042import java.util.concurrent.ConcurrentSkipListMap; -043import java.util.concurrent.ThreadLocalRandom; +037import java.util.Optional; +038import java.util.Set; +039import java.util.concurrent.CompletableFuture; +040import java.util.concurrent.ConcurrentHashMap; +041import java.util.concurrent.ConcurrentMap; +042import java.util.concurrent.ConcurrentNavigableMap; +043import java.util.concurrent.ConcurrentSkipListMap; 044 045import org.apache.commons.logging.Log; 046import org.apache.commons.logging.LogFactory; @@ -115,7 +115,7 @@ 107 public final Set<LocateRequest> pendingRequests = new HashSet<>(); 108 109 public final Map<LocateRequest, CompletableFuture<HRegionLocation>> allRequests = -110 new HashMap<>(); +110 new LinkedHashMap<>(); 111 112 public boolean hasQuota(int max) { 113 return pendingRequests.size() < max; @@ -128,353 +128,358 @@ 120 public void send(LocateRequest req) { 121 pendingRequests.add(req); 122 } -123 } -124 -125 AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) { -126 this.conn = conn; -127 this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt( -128 MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE); -129 } -130 -131 private TableCache getTableCache(TableName tableName) { -132 return computeIfAbsent(cache, tableName, TableCache::new); -133 } -134 -135 private void removeFromCache(HRegionLocation loc) { -136 TableCache tableCache = cache.get(loc.getRegionInfo().getTable()); -137 if (tableCache == null) { -138 return; -139 } -140 tableCache.cache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, oldLoc) -> { -141 if (oldLoc.getSeqNum() > loc.getSeqNum() || -142 !oldLoc.getServerName().equals(loc.getServerName())) { -143 return oldLoc; -144 } -145 return null; -146 }); -147 } -148 -149 // return whether we add this loc to cache -150 private boolean addToCache(TableCache tableCache, HRegionLocation loc) { -151 if (LOG.isTraceEnabled()) { -152 LOG.trace("Try adding " + loc + " to cache"); -153 } -154 byte[] startKey = loc.getRegionInfo().getStartKey(); -155 HRegionLocation oldLoc = tableCache.cache.putIfAbsent(startKey, loc); -156 if (oldLoc == null) { -157 return true; -158 } -159 if (oldLoc.getSeqNum() > loc.getSeqNum() || -160 oldLoc.getServerName().equals(loc.getServerName())) { -161 if (LOG.isTraceEnabled()) { -162 LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc + -163 " is newer than us or has the same server name"); +123 +124 public Optional<LocateRequest> getCandidate() { +125 return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst(); +126 } +127 +128 public void clearCompletedRequests(Optional<HRegionLocation> location) { +129 for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter = allRequests +130 .entrySet().iterator(); iter.hasNext();) { +131 Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next(); +132 if (tryComplete(entry.getKey(), entry.getValue(), location)) { +133 iter.remove(); +134 } +135 } +136 } +137 +138 private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation> future, +139 Optional<HRegionLocation> location) { +140 if (future.isDone()) { +141 return true; +142 } +143 if (!location.isPresent()) { +144 return false; +145 } +146 HRegionLocation loc = location.get(); +147 boolean completed; +148 if (req.locateType.equals(RegionLocateType.BEFORE)) { +149 // for locating the row before current row, the common case is to find the previous region in +150 // reverse scan, so we check the endKey first. In general, the condition should be startKey < +151 // req.row and endKey >= req.row. Here we split it to endKey == req.row || (endKey > req.row +152 // && startKey < req.row). The two conditions are equal since startKey < endKey. +153 int c = Bytes.compareTo(loc.getRegionInfo().getEndKey(), req.row); +154 completed = +155 c == 0 || (c > 0 && Bytes.compareTo(loc.getRegionInfo().getStartKey(), req.row) < 0); +156 } else { +157 completed = loc.getRegionInfo().containsRow(req.row); +158 } +159 if (completed) { +160 future.complete(loc); +161 return true; +162 } else { +163 return false; 164 } -165 return false; -166 } -167 return loc == tableCache.cache.compute(startKey, (k, oldValue) -> { -168 if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) { -169 return loc; -170 } -171 if (LOG.isTraceEnabled()) { -172 LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue + -173 " is newer than us or has the same server name." + -174 " Maybe it is updated before we replace it"); -175 } -176 return oldValue; -177 }); -178 } -179 -180 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", -181 justification = "Called by lambda expression") -182 private void addToCache(HRegionLocation loc) { -183 addToCache(getTableCache(loc.getRegionInfo().getTable()), loc); -184 if (LOG.isTraceEnabled()) { -185 LOG.trace("Try adding " + loc + " to cache"); -186 } -187 } -188 -189 private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation> future, -190 HRegionLocation loc) { -191 if (future.isDone()) { -192 return true; -193 } -194 boolean completed; -195 if (req.locateType.equals(RegionLocateType.BEFORE)) { -196 // for locating the row before current row, the common case is to find the previous region in -197 // reverse scan, so we check the endKey first. In general, the condition should be startKey < -198 // req.row and endKey >= req.row. Here we split it to endKey == req.row || (endKey > req.row -199 // && startKey < req.row). The two conditions are equal since startKey < endKey. -200 int c = Bytes.compareTo(loc.getRegionInfo().getEndKey(), req.row); -201 completed = -202 c == 0 || (c > 0 && Bytes.compareTo(loc.getRegionInfo().getStartKey(), req.row) < 0); -203 } else { -204 completed = loc.getRegionInfo().containsRow(req.row); -205 } -206 if (completed) { -207 future.complete(loc); -208 return true; -209 } else { -210 return false; -211 } -212 } -213 -214 private void complete(TableName tableName, LocateRequest req, HRegionLocation loc, -215 Throwable error) { -216 if (error != null) { -217 if (LOG.isDebugEnabled()) { -218 LOG.debug("Failed to locate region in '" + tableName + "', row='" + -219 Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, -220 error); -221 } -222 } -223 LocateRequest toSend = null; -224 TableCache tableCache = getTableCache(tableName); -225 if (loc != null) { -226 if (!addToCache(tableCache, loc)) { -227 // someone is ahead of us. -228 synchronized (tableCache) { -229 tableCache.pendingRequests.remove(req); -230 } -231 return; -232 } -233 } -234 synchronized (tableCache) { -235 tableCache.pendingRequests.remove(req); -236 if (error instanceof DoNotRetryIOException) { -237 CompletableFuture<?> future = tableCache.allRequests.remove(req); -238 if (future != null) { -239 future.completeExceptionally(error); -240 } -241 } -242 if (loc != null) { -243 for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter = -244 tableCache.allRequests.entrySet().iterator(); iter.hasNext();) { -245 Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next(); -246 if (tryComplete(entry.getKey(), entry.getValue(), loc)) { -247 iter.remove(); -248 } -249 } -250 } -251 if (!tableCache.allRequests.isEmpty() && -252 tableCache.hasQuota(maxConcurrentLocateRequestPerTable)) { -253 LocateRequest[] candidates = tableCache.allRequests.keySet().stream() -254 .filter(r -> !tableCache.isPending(r)).toArray(LocateRequest[]::new); -255 if (candidates.length > 0) { -256 // TODO: use a better algorithm to send a request which is more likely to fetch a new -257 // location. -258 toSend = candidates[ThreadLocalRandom.current().nextInt(candidates.length)]; -259 tableCache.send(toSend); -260 } -261 } -262 } -263 if (toSend != null) { -264 locateInMeta(tableName, toSend); -265 } -266 } -267 -268 private void onScanComplete(TableName tableName, LocateRequest req, List<Result> results, -269 Throwable error) { -270 if (error != null) { -271 complete(tableName, req, null, error); -272 return; -273 } -274 if (results.isEmpty()) { -275 complete(tableName, req, null, new TableNotFoundException(tableName)); -276 return; -277 } -278 RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0)); -279 if (LOG.isDebugEnabled()) { -280 LOG.debug("The fetched location of '" + tableName + "', row='" + -281 Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType + " is " + locs); +165 } +166 } +167 +168 AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) { +169 this.conn = conn; +170 this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt( +171 MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE); +172 } +173 +174 private TableCache getTableCache(TableName tableName) { +175 return computeIfAbsent(cache, tableName, TableCache::new); +176 } +177 +178 private void removeFromCache(HRegionLocation loc) { +179 TableCache tableCache = cache.get(loc.getRegionInfo().getTable()); +180 if (tableCache == null) { +181 return; +182 } +183 tableCache.cache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, oldLoc) -> { +184 if (oldLoc.getSeqNum() > loc.getSeqNum() || +185 !oldLoc.getServerName().equals(loc.getServerName())) { +186 return oldLoc; +187 } +188 return null; +189 }); +190 } +191 +192 // return whether we add this loc to cache +193 private boolean addToCache(TableCache tableCache, HRegionLocation loc) { +194 if (LOG.isTraceEnabled()) { +195 LOG.trace("Try adding " + loc + " to cache"); +196 } +197 byte[] startKey = loc.getRegionInfo().getStartKey(); +198 HRegionLocation oldLoc = tableCache.cache.putIfAbsent(startKey, loc); +199 if (oldLoc == null) { +200 return true; +201 } +202 if (oldLoc.getSeqNum() > loc.getSeqNum() || +203 oldLoc.getServerName().equals(loc.getServerName())) { +204 if (LOG.isTraceEnabled()) { +205 LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc + +206 " is newer than us or has the same server name"); +207 } +208 return false; +209 } +210 return loc == tableCache.cache.compute(startKey, (k, oldValue) -> { +211 if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) { +212 return loc; +213 } +214 if (LOG.isTraceEnabled()) { +215 LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue + +216 " is newer than us or has the same server name." + +217 " Maybe it is updated before we replace it"); +218 } +219 return oldValue; +220 }); +221 } +222 +223 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", +224 justification = "Called by lambda expression") +225 private void addToCache(HRegionLocation loc) { +226 addToCache(getTableCache(loc.getRegionInfo().getTable()), loc); +227 if (LOG.isTraceEnabled()) { +228 LOG.trace("Try adding " + loc + " to cache"); +229 } +230 } +231 +232 private void complete(TableName tableName, LocateRequest req, HRegionLocation loc, +233 Throwable error) { +234 if (error != null) { +235 LOG.warn( +236 "Failed to locate region in '" + tableName + "', row='" + Bytes.toStringBinary(req.row) +237 + "', locateType=" + req.locateType, error); +238 } +239 Optional<LocateRequest> toSend = Optional.empty(); +240 TableCache tableCache = getTableCache(tableName); +241 if (loc != null) { +242 if (!addToCache(tableCache, loc)) { +243 // someone is ahead of us. +244 synchronized (tableCache) { +245 tableCache.pendingRequests.remove(req); +246 tableCache.clearCompletedRequests(Optional.empty()); +247 // Remove a complete locate request in a synchronized block, so the table cache must have +248 // quota to send a candidate request. +249 toSend = tableCache.getCandidate(); +250 toSend.ifPresent(r -> tableCache.send(r)); +251 } +252 toSend.ifPresent(r -> locateInMeta(tableName, r)); +253 return; +254 } +255 } +256 synchronized (tableCache) { +257 tableCache.pendingRequests.remove(req); +258 if (error instanceof DoNotRetryIOException) { +259 CompletableFuture<?> future = tableCache.allRequests.remove(req); +260 if (future != null) { +261 future.completeExceptionally(error); +262 } +263 } +264 tableCache.clearCompletedRequests(Optional.ofNullable(loc)); +265 // Remove a complete locate request in a synchronized block, so the table cache must have +266 // quota to send a candidate request. +267 toSend = tableCache.getCandidate(); +268 toSend.ifPresent(r -> tableCache.send(r)); +269 } +270 toSend.ifPresent(r -> locateInMeta(tableName, r)); +271 } +272 +273 private void onScanComplete(TableName tableName, LocateRequest req, List<Result> results, +274 Throwable error) { +275 if (error != null) { +276 complete(tableName, req, null, error); +277 return; +278 } +279 if (results.isEmpty()) { +280 complete(tableName, req, null, new TableNotFoundException(tableName)); +281 return; 282 } -283 if (locs == null || locs.getDefaultRegionLocation() == null) { -284 complete(tableName, req, null, -285 new IOException(String.format("No location found for '%s', row='%s', locateType=%s", -286 tableName, Bytes.toStringBinary(req.row), req.locateType))); -287 return; -288 } -289 HRegionLocation loc = locs.getDefaultRegionLocation(); -290 HRegionInfo info = loc.getRegionInfo(); -291 if (info == null) { -292 complete(tableName, req, null, -293 new IOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s", -294 tableName, Bytes.toStringBinary(req.row), req.locateType))); -295 return; -296 } -297 if (!info.getTable().equals(tableName)) { -298 complete(tableName, req, null, new TableNotFoundException( -299 "Table '" + tableName + "' was not found, got: '" + info.getTable() + "'")); +283 RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0)); +284 if (LOG.isDebugEnabled()) { +285 LOG.debug("The fetched location of '" + tableName + "', row='" + +286 Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType + " is " + locs); +287 } +288 if (locs == null || locs.getDefaultRegionLocation() == null) { +289 complete(tableName, req, null, +290 new IOException(String.format("No location found for '%s', row='%s', locateType=%s", +291 tableName, Bytes.toStringBinary(req.row), req.locateType))); +292 return; +293 } +294 HRegionLocation loc = locs.getDefaultRegionLocation(); +295 HRegionInfo info = loc.getRegionInfo(); +296 if (info == null) { +297 complete(tableName, req, null, +298 new IOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s", +299 tableName, Bytes.toStringBinary(req.row), req.locateType))); 300 return; 301 } -302 if (info.isSplit()) { -303 complete(tableName, req, null, -304 new RegionOfflineException( -305 "the only available region for the required row is a split parent," + -306 " the daughters should be online soon: '" + info.getRegionNameAsString() + "'")); -307 return; -308 } -309 if (info.isOffline()) { -310 complete(tableName, req, null, new RegionOfflineException("the region is offline, could" + -311 " be caused by a disable table call: '" + info.getRegionNameAsString() + "'")); +302 if (!info.getTable().equals(tableName)) { +303 complete(tableName, req, null, new TableNotFoundException( +304 "Table '" + tableName + "' was not found, got: '" + info.getTable() + "'")); +305 return; +306 } +307 if (info.isSplit()) { +308 complete(tableName, req, null, +309 new RegionOfflineException( +310 "the only available region for the required row is a split parent," + +311 " the daughters should be online soon: '" + info.getRegionNameAsString() + "'")); 312 return; 313 } -314 if (loc.getServerName() == null) { -315 complete(tableName, req, null, -316 new NoServerForRegionException( -317 String.format("No server address listed for region '%s', row='%s', locateType=%s", -318 info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType))); -319 return; -320 } -321 complete(tableName, req, loc, null); -322 } -323 -324 private HRegionLocation locateRowInCache(TableCache tableCache, TableName tableName, byte[] row) { -325 Map.Entry<byte[], HRegionLocation> entry = tableCache.cache.floorEntry(row); -326 if (entry == null) { -327 return null; -328 } -329 HRegionLocation loc = entry.getValue(); -330 byte[] endKey = loc.getRegionInfo().getEndKey(); -331 if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) { -332 if (LOG.isTraceEnabled()) { -333 LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" + -334 Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT); -335 } -336 return loc; -337 } else { -338 return null; -339 } -340 } -341 -342 private HRegionLocation locateRowBeforeInCache(TableCache tableCache, TableName tableName, -343 byte[] row) { -344 Map.Entry<byte[], HRegionLocation> entry = -345 isEmptyStopRow(row) ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row); -346 if (entry == null) { -347 return null; -348 } -349 HRegionLocation loc = entry.getValue(); -350 if (isEmptyStopRow(loc.getRegionInfo().getEndKey()) || -351 Bytes.compareTo(loc.getRegionInfo().getEndKey(), row) >= 0) { -352 if (LOG.isTraceEnabled()) { -353 LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" + -354 Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE); -355 } -356 return loc; -357 } else { -358 return null; -359 } -360 } -361 -362 private void locateInMeta(TableName tableName, LocateRequest req) { -363 if (LOG.isTraceEnabled()) { -364 LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) + -365 "', locateType=" + req.locateType + " in meta"); -366 } -367 byte[] metaKey; -368 if (req.locateType.equals(RegionLocateType.BEFORE)) { -369 if (isEmptyStopRow(req.row)) { -370 byte[] binaryTableName = tableName.getName(); -371 metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1); -372 } else { -373 metaKey = createRegionName(tableName, req.row, ZEROES, false); -374 } -375 } else { -376 metaKey = createRegionName(tableName, req.row, NINES, false); -377 } -378 conn.getRawTable(META_TABLE_NAME) -379 .scanAll(new Scan().withStartRow(metaKey).setReversed(true).addFamily(CATALOG_FAMILY) -380 .setOneRowLimit()) -381 .whenComplete((results, error) -> onScanComplete(tableName, req, results, error)); -382 } -383 -384 private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row, -385 RegionLocateType locateType) { -386 return locateType.equals(RegionLocateType.BEFORE) -387 ? locateRowBeforeInCache(tableCache, tableName, row) -388 : locateRowInCache(tableCache, tableName, row); -389 } -390 -391 // locateToPrevious is true means we will use the start key of a region to locate the region -392 // placed before it. Used for reverse scan. See the comment of -393 // AsyncRegionLocator.getPreviousRegionLocation. -394 private CompletableFuture<HRegionLocation> getRegionLocationInternal(TableName tableName, -395 byte[] row, RegionLocateType locateType, boolean reload) { -396 // AFTER should be convert to CURRENT before calling this method -397 assert !locateType.equals(RegionLocateType.AFTER); -398 TableCache tableCache = getTableCache(tableName); -399 if (!reload) { -400 HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType); -401 if (loc != null) { -402 return CompletableFuture.completedFuture(loc); -403 } -404 } -405 CompletableFuture<HRegionLocation> future; -406 LocateRequest req; -407 boolean sendRequest = false; -408 synchronized (tableCache) { -409 // check again -410 if (!reload) { -411 HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType); -412 if (loc != null) { -413 return CompletableFuture.completedFuture(loc); -414 } -415 } -416 req = new LocateRequest(row, locateType); -417 future = tableCache.allRequests.get(req); -418 if (future == null) { -419 future = new CompletableFuture<>(); -420 tableCache.allRequests.put(req, future); -421 if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) { -422 tableCache.send(req); -423 sendRequest = true; -424 } -425 } -426 } -427 if (sendRequest) { -428 locateInMeta(tableName, req); -429 } -430 return future; -431 } -432 -433 CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, -434 RegionLocateType locateType, boolean reload) { -435 if (locateType.equals(RegionLocateType.BEFORE)) { -436 return getRegionLocationInternal(tableName, row, locateType, reload); -437 } else { -438 // as we know the exact row after us, so we can just create the new row, and use the same -439 // algorithm to locate it. -440 if (locateType.equals(RegionLocateType.AFTER)) { -441 row = createClosestRowAfter(row); -442 } -443 return getRegionLocationInternal(tableName, row, RegionLocateType.CURRENT, reload); -444 } -445 } -446 -447 void updateCachedLocation(HRegionLocation loc, Throwable exception) { -448 AsyncRegionLocator.updateCachedLocation(loc, exception, l -> { -449 TableCache tableCache = cache.get(l.getRegionInfo().getTable()); -450 if (tableCache == null) { -451 return null; -452 } -453 return tableCache.cache.get(l.getRegionInfo().getStartKey()); -454 }, this::addToCache, this::removeFromCache); -455 } -456 -457 void clearCache(TableName tableName) { -458 TableCache tableCache = cache.remove(tableName); -459 if (tableCache == null) { -460 return; -461 } -462 synchronized (tableCache) { -463 if (!tableCache.allRequests.isEmpty()) { -464 IOException error = new IOException("Cache cleared"); -465 tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error)); -466 } -467 } -468 } -469} +314 if (info.isOffline()) { +315 complete(tableName, req, null, new RegionOfflineException("the region is offline, could" + +316 " be caused by a disable table call: '" + info.getRegionNameAsString() + "'")); +317 return; +318 } +319 if (loc.getServerName() == null) { +320 complete(tableName, req, null, +321 new NoServerForRegionException( +322 String.format("No server address listed for region '%s', row='%s', locateType=%s", +323 info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType))); +324 return; +325 } +326 complete(tableName, req, loc, null); +327 } +328 +329 private HRegionLocation locateRowInCache(TableCache tableCache, TableName tableName, byte[] row) { +330 Map.Entry<byte[], HRegionLocation> entry = tableCache.cache.floorEntry(row); +331 if (entry == null) { +332 return null; +333 } +334 HRegionLocation loc = entry.getValue(); +335 byte[] endKey = loc.getRegionInfo().getEndKey(); +336 if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) { +337 if (LOG.isTraceEnabled()) { +338 LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" + +339 Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT); +340 } +341 return loc; +342 } else { +343 return null; +344 } +345 } +346 +347 private HRegionLocation locateRowBeforeInCache(TableCache tableCache, TableName tableName, +348 byte[] row) { +349 Map.Entry<byte[], HRegionLocation> entry = +350 isEmptyStopRow(row) ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row); +351 if (entry == null) { +352 return null; +353 } +354 HRegionLocation loc = entry.getValue(); +355 if (isEmptyStopRow(loc.getRegionInfo().getEndKey()) || +356 Bytes.compareTo(loc.getRegionInfo().getEndKey(), row) >= 0) { +357 if (LOG.isTraceEnabled()) { +358 LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" + +359 Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE); +360 } +361 return loc; +362 } else { +363 return null; +364 } +365 } +366 +367 private void locateInMeta(TableName tableName, LocateRequest req) { +368 if (LOG.isTraceEnabled()) { +369 LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) + +370 "', locateType=" + req.locateType + " in meta"); +371 } +372 byte[] metaKey; +373 if (req.locateType.equals(RegionLocateType.BEFORE)) { +374 if (isEmptyStopRow(req.row)) { +375 byte[] binaryTableName = tableName.getName(); +376 metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1); +377 } else { +378 metaKey = createRegionName(tableName, req.row, ZEROES, false); +379 } +380 } else { +381 metaKey = createRegionName(tableName, req.row, NINES, false); +382 } +383 conn.getRawTable(META_TABLE_NAME) +384 .scanAll(new Scan().withStartRow(metaKey).setReversed(true).addFamily(CATALOG_FAMILY) +385 .setOneRowLimit()) +386 .whenComplete((results, error) -> onScanComplete(tableName, req, results, error)); +387 } +388 +389 private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row, +390 RegionLocateType locateType) { +391 return locateType.equals(RegionLocateType.BEFORE) +392 ? locateRowBeforeInCache(tableCache, tableName, row) +393 : locateRowInCache(tableCache, tableName, row); +394 } +395 +396 // locateToPrevious is true means we will use the start key of a region to locate the region +397 // placed before it. Used for reverse scan. See the comment of +398 // AsyncRegionLocator.getPreviousRegionLocation. +399 private CompletableFuture<HRegionLocation> getRegionLocationInternal(TableName tableName, +400 byte[] row, RegionLocateType locateType, boolean reload) { +401 // AFTER should be convert to CURRENT before calling this method +402 assert !locateType.equals(RegionLocateType.AFTER); +403 TableCache tableCache = getTableCache(tableName); +404 if (!reload) { +405 HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType); +406 if (loc != null) { +407 return CompletableFuture.completedFuture(loc); +408 } +409 } +410 CompletableFuture<HRegionLocation> future; +411 LocateRequest req; +412 boolean sendRequest = false; +413 synchronized (tableCache) { +414 // check again +415 if (!reload) { +416 HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType); +417 if (loc != null) { +418 return CompletableFuture.completedFuture(loc); +419 } +420 } +421 req = new LocateRequest(row, locateType); +422 future = tableCache.allRequests.get(req); +423 if (future == null) { +424 future = new CompletableFuture<>(); +425 tableCache.allRequests.put(req, future); +426 if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) { +427 tableCache.send(req); +428 sendRequest = true; +429 } +430 } +431 } +432 if (sendRequest) { +433 locateInMeta(tableName, req); +434 } +435 return future; +436 } +437 +438 CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, +439 RegionLocateType locateType, boolean reload) { +440 if (locateType.equals(RegionLocateType.BEFORE)) { +441 return getRegionLocationInternal(tableName, row, locateType, reload); +442 } else { +443 // as we know the exact row after us, so we can just create the new row, and use the same +444 // algorithm to locate it. +445 if (locateType.equals(RegionLocateType.AFTER)) { +446 row = createClosestRowAfter(row); +447 } +448 return getRegionLocationInternal(tableName, row, RegionLocateType.CURRENT, reload); +449 } +450 } +451 +452 void updateCachedLocation(HRegionLocation loc, Throwable exception) { +453 AsyncRegionLocator.updateCachedLocation(loc, exception, l -> { +454 TableCache tableCache = cache.get(l.getRegionInfo().getTable()); +455 if (tableCache == null) { +456 return null; +457 } +458 return tableCache.cache.get(l.getRegionInfo().getStartKey()); +459 }, this::addToCache, this::removeFromCache); +460 } +461 +462 void clearCache(TableName tableName) { +463 TableCache tableCache = cache.remove(tableName); +464 if (tableCache == null) { +465 return; +466 } +467 synchronized (tableCache) { +468 if (!tableCache.allRequests.isEmpty()) { +469 IOException error = new IOException("Cache cleared"); +470 tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error)); +471 } +472 } +473 } +474}