Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 671FD200C4E for ; Thu, 2 Mar 2017 04:53:50 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 65DF4160B78; Thu, 2 Mar 2017 03:53:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 38ED9160B70 for ; Thu, 2 Mar 2017 04:53:49 +0100 (CET) Received: (qmail 65183 invoked by uid 500); 2 Mar 2017 03:53:48 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 65172 invoked by uid 99); 2 Mar 2017 03:53:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Mar 2017 03:53:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3C512DFDAC; Thu, 2 Mar 2017 03:53:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhangduo@apache.org To: commits@hbase.apache.org Message-Id: <1a484084f0124792b4276de835826306@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hbase git commit: HBASE-17646: Implement Async getRegion method Date: Thu, 2 Mar 2017 03:53:48 +0000 (UTC) archived-at: Thu, 02 Mar 2017 03:53:50 -0000 Repository: hbase Updated Branches: refs/heads/master 613bcb362 -> 697a55a87 HBASE-17646: Implement Async getRegion method Signed-off-by: zhangduo Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/697a55a8 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/697a55a8 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/697a55a8 Branch: refs/heads/master Commit: 697a55a8782d940aa4f1287c2ef4a45ba516cac1 Parents: 613bcb3 Author: huzheng Authored: Fri Feb 17 13:28:07 2017 +0800 Committer: zhangduo Committed: Thu Mar 2 11:52:05 2017 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/AsyncMetaTableAccessor.java | 34 +++++++ .../client/AsyncConnectionConfiguration.java | 9 ++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 95 ++++++++++++++++++-- .../hadoop/hbase/client/RawAsyncTableImpl.java | 3 +- .../hadoop/hbase/client/TestAsyncAdmin.java | 49 ++++++++++ 5 files changed, 180 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/697a55a8/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java index f136d56..d09d29e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java @@ -27,12 +27,15 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.AsyncConnection; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.RawAsyncTable; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; /** * The asynchronous meta table accessor. Used to read/write region and assignment information store @@ -74,6 +77,37 @@ public class AsyncMetaTableAccessor { return future; } + public static CompletableFuture> getRegion(RawAsyncTable metaTable, + byte[] regionName) { + CompletableFuture> future = new CompletableFuture<>(); + byte[] row = regionName; + HRegionInfo parsedInfo = null; + try { + parsedInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionName); + row = MetaTableAccessor.getMetaKeyForRegion(parsedInfo); + } catch (Exception parseEx) { + // Ignore if regionName is a encoded region name. + } + + final HRegionInfo finalHRI = parsedInfo; + metaTable.get(new Get(row).addFamily(HConstants.CATALOG_FAMILY)).whenComplete((r, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + RegionLocations locations = MetaTableAccessor.getRegionLocations(r); + HRegionLocation hrl = locations == null ? null + : locations.getRegionLocation(finalHRI == null ? 0 : finalHRI.getReplicaId()); + if (hrl == null) { + future.complete(null); + } else { + future.complete(new Pair<>(hrl.getRegionInfo(), hrl.getServerName())); + } + }); + + return future; + } + private static Optional getTableState(Result r) throws IOException { Cell cell = r.getColumnLatestCell(getTableFamily(), getStateColumn()); if (cell == null) return Optional.empty(); http://git-wip-us.apache.org/repos/asf/hbase/blob/697a55a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index 585a104..83caea2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NU import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT; @@ -31,6 +32,7 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; +import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING; import static org.apache.hadoop.hbase.HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; @@ -81,6 +83,8 @@ class AsyncConnectionConfiguration { private final int scannerCaching; + private final int metaScannerCaching; + private final long scannerMaxResultSize; @SuppressWarnings("deprecation") @@ -105,6 +109,7 @@ class AsyncConnectionConfiguration { HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)); this.scannerCaching = conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING); + this.metaScannerCaching = conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING); this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); } @@ -149,6 +154,10 @@ class AsyncConnectionConfiguration { return scannerCaching; } + int getMetaScannerCaching(){ + return metaScannerCaching; + } + long getScannerMaxResultSize() { return scannerMaxResultSize; } http://git-wip-us.apache.org/repos/asf/hbase/blob/697a55a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 9d5c509..3876570 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -32,16 +32,22 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.AsyncMetaTableAccessor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; +import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -189,12 +195,10 @@ public class AsyncHBaseAdmin implements AsyncAdmin { if (controller.failed()) { future.completeExceptionally(new IOException(controller.errorText())); } else { - if (respConverter != null) { - try { - future.complete(respConverter.convert(resp)); - } catch (IOException e) { - future.completeExceptionally(e); - } + try { + future.complete(respConverter.convert(resp)); + } catch (IOException e) { + future.completeExceptionally(e); } } } @@ -507,8 +511,81 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture closeRegion(byte[] regionname, String serverName) { - throw new UnsupportedOperationException("closeRegion method depends on getRegion API, will support soon."); + public CompletableFuture closeRegion(byte[] regionName, String serverName) { + CompletableFuture future = new CompletableFuture<>(); + getRegion(regionName).whenComplete((p, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (p == null || p.getFirst() == null) { + future.completeExceptionally(new UnknownRegionException(Bytes.toStringBinary(regionName))); + return; + } + if (serverName != null) { + closeRegion(ServerName.valueOf(serverName), p.getFirst()).whenComplete((p2, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + }else{ + future.complete(null); + } + }); + } else { + if (p.getSecond() == null) { + future.completeExceptionally(new NotServingRegionException(regionName)); + } else { + closeRegion(p.getSecond(), p.getFirst()).whenComplete((p2, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + }else{ + future.complete(null); + } + }); + } + } + }); + return future; + } + + CompletableFuture> getRegion(byte[] regionName) { + if (regionName == null) { + return failedFuture(new IllegalArgumentException("Pass region name")); + } + CompletableFuture> future = new CompletableFuture<>(); + AsyncMetaTableAccessor.getRegion(metaTable, regionName).whenComplete( + (p, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else if (p != null) { + future.complete(p); + } else { + metaTable.scanAll( + new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)) + .whenComplete((results, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + return; + } + String encodedName = Bytes.toString(regionName); + if (results != null && !results.isEmpty()) { + for (Result r : results) { + if (r.isEmpty() || MetaTableAccessor.getHRegionInfo(r) == null) continue; + RegionLocations rl = MetaTableAccessor.getRegionLocations(r); + if (rl != null) { + for (HRegionLocation h : rl.getRegionLocations()) { + if (h != null && encodedName.equals(h.getRegionInfo().getEncodedName())) { + future.complete(new Pair<>(h.getRegionInfo(), h.getServerName())); + return; + } + } + } + } + } + future.complete(null); + }); + } + }); + return future; } @Override @@ -530,7 +607,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin { .action( (controller, stub) -> this. adminCall( controller, stub, ProtobufUtil.buildCloseRegionRequest(sn, hri.getRegionName()), - (s, c, req, done) -> s.closeRegion(controller, req, done), null)) + (s, c, req, done) -> s.closeRegion(controller, req, done), resp -> null)) .serverName(sn).call(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/697a55a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 7948b65..537a92d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -102,7 +102,8 @@ class RawAsyncTableImpl implements RawAsyncTable { this.pauseNs = builder.pauseNs; this.maxAttempts = builder.maxAttempts; this.startLogErrorsCnt = builder.startLogErrorsCnt; - this.defaultScannerCaching = conn.connConf.getScannerCaching(); + this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching() + : conn.connConf.getScannerCaching(); this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/697a55a8/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java index 467f6c9..950d77d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -964,6 +965,35 @@ public class TestAsyncAdmin { } @Test + public void testCloseRegionThatFetchesTheHRIFromMeta() throws Exception { + TableName TABLENAME = TableName.valueOf("TestHBACloseRegion2"); + createTableWithDefaultConf(TABLENAME); + + HRegionInfo info = null; + HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TABLENAME); + List onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices()); + for (HRegionInfo regionInfo : onlineRegions) { + if (!regionInfo.isMetaTable()) { + + if (regionInfo.getRegionNameAsString().contains("TestHBACloseRegion2")) { + info = regionInfo; + admin.closeRegion(regionInfo.getRegionNameAsString(), rs.getServerName().getServerName()) + .get(); + } + } + } + + boolean isInList = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices()).contains(info); + long timeout = System.currentTimeMillis() + 10000; + while ((System.currentTimeMillis() < timeout) && (isInList)) { + Thread.sleep(100); + isInList = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices()).contains(info); + } + + assertFalse("The region should not be present in online regions list.", isInList); + } + + @Test public void testCloseRegionWhenServerNameIsNull() throws Exception { byte[] TABLENAME = Bytes.toBytes("TestHBACloseRegion3"); createTableWithDefaultConf(TableName.valueOf(TABLENAME)); @@ -1035,4 +1065,23 @@ public class TestAsyncAdmin { assertTrue("The region should be present in online regions list.", onlineRegions.contains(info)); } + + @Test + public void testGetRegion() throws Exception { + AsyncHBaseAdmin rawAdmin = (AsyncHBaseAdmin) admin; + + final TableName tableName = TableName.valueOf("testGetRegion"); + LOG.info("Started " + tableName); + TEST_UTIL.createMultiRegionTable(tableName, HConstants.CATALOG_FAMILY); + + try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { + HRegionLocation regionLocation = locator.getRegionLocation(Bytes.toBytes("mmm")); + HRegionInfo region = regionLocation.getRegionInfo(); + byte[] regionName = region.getRegionName(); + Pair pair = rawAdmin.getRegion(regionName).get(); + assertTrue(Bytes.equals(regionName, pair.getFirst().getRegionName())); + pair = rawAdmin.getRegion(region.getEncodedNameAsBytes()).get(); + assertTrue(Bytes.equals(regionName, pair.getFirst().getRegionName())); + } + } }