hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject hbase git commit: HBASE-17646: Implement Async getRegion method
Date Thu, 02 Mar 2017 03:53:48 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 613bcb362 -> 697a55a87


HBASE-17646: Implement Async getRegion method

Signed-off-by: zhangduo <zhangduo@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/697a55a8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/697a55a8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/697a55a8

Branch: refs/heads/master
Commit: 697a55a8782d940aa4f1287c2ef4a45ba516cac1
Parents: 613bcb3
Author: huzheng <openinx@gmail.com>
Authored: Fri Feb 17 13:28:07 2017 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Thu Mar 2 11:52:05 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/AsyncMetaTableAccessor.java    | 34 +++++++
 .../client/AsyncConnectionConfiguration.java    |  9 ++
 .../hadoop/hbase/client/AsyncHBaseAdmin.java    | 95 ++++++++++++++++++--
 .../hadoop/hbase/client/RawAsyncTableImpl.java  |  3 +-
 .../hadoop/hbase/client/TestAsyncAdmin.java     | 49 ++++++++++
 5 files changed, 180 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/697a55a8/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
index f136d56..d09d29e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
@@ -27,12 +27,15 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.AsyncConnection;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.RawAsyncTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * The asynchronous meta table accessor. Used to read/write region and assignment information store
@@ -74,6 +77,37 @@ public class AsyncMetaTableAccessor {
     return future;
   }
 
+  public static CompletableFuture<Pair<HRegionInfo, ServerName>> getRegion(RawAsyncTable metaTable,
+      byte[] regionName) {
+    CompletableFuture<Pair<HRegionInfo, ServerName>> future = new CompletableFuture<>();
+    byte[] row = regionName;
+    HRegionInfo parsedInfo = null;
+    try {
+      parsedInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionName);
+      row = MetaTableAccessor.getMetaKeyForRegion(parsedInfo);
+    } catch (Exception parseEx) {
+      // Ignore if regionName is a encoded region name.
+    }
+
+    final HRegionInfo finalHRI = parsedInfo;
+    metaTable.get(new Get(row).addFamily(HConstants.CATALOG_FAMILY)).whenComplete((r, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      RegionLocations locations = MetaTableAccessor.getRegionLocations(r);
+      HRegionLocation hrl = locations == null ? null
+          : locations.getRegionLocation(finalHRI == null ? 0 : finalHRI.getReplicaId());
+      if (hrl == null) {
+        future.complete(null);
+      } else {
+        future.complete(new Pair<>(hrl.getRegionInfo(), hrl.getServerName()));
+      }
+    });
+
+    return future;
+  }
+
   private static Optional<TableState> getTableState(Result r) throws IOException {
     Cell cell = r.getColumnLatestCell(getTableFamily(), getStateColumn());
     if (cell == null) return Optional.empty();

http://git-wip-us.apache.org/repos/asf/hbase/blob/697a55a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 585a104..83caea2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NU
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE;
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING;
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
@@ -31,6 +32,7 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
+import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING;
 import static org.apache.hadoop.hbase.HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
@@ -81,6 +83,8 @@ class AsyncConnectionConfiguration {
 
   private final int scannerCaching;
 
+  private final int metaScannerCaching;
+
   private final long scannerMaxResultSize;
 
   @SuppressWarnings("deprecation")
@@ -105,6 +109,7 @@ class AsyncConnectionConfiguration {
           HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
     this.scannerCaching =
         conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+    this.metaScannerCaching = conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
     this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
       DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
   }
@@ -149,6 +154,10 @@ class AsyncConnectionConfiguration {
     return scannerCaching;
   }
 
+  int getMetaScannerCaching(){
+    return metaScannerCaching;
+  }
+
   long getScannerMaxResultSize() {
     return scannerMaxResultSize;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/697a55a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 9d5c509..3876570 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -32,16 +32,22 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -189,12 +195,10 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
         if (controller.failed()) {
           future.completeExceptionally(new IOException(controller.errorText()));
         } else {
-          if (respConverter != null) {
-            try {
-              future.complete(respConverter.convert(resp));
-            } catch (IOException e) {
-              future.completeExceptionally(e);
-            }
+          try {
+            future.complete(respConverter.convert(resp));
+          } catch (IOException e) {
+            future.completeExceptionally(e);
           }
         }
       }
@@ -507,8 +511,81 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<Void> closeRegion(byte[] regionname, String serverName) {
-    throw new UnsupportedOperationException("closeRegion method depends on getRegion API, will support soon.");
+  public CompletableFuture<Void> closeRegion(byte[] regionName, String serverName) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    getRegion(regionName).whenComplete((p, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      if (p == null || p.getFirst() == null) {
+        future.completeExceptionally(new UnknownRegionException(Bytes.toStringBinary(regionName)));
+        return;
+      }
+      if (serverName != null) {
+        closeRegion(ServerName.valueOf(serverName), p.getFirst()).whenComplete((p2, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          }else{
+            future.complete(null);
+          }
+        });
+      } else {
+        if (p.getSecond() == null) {
+          future.completeExceptionally(new NotServingRegionException(regionName));
+        } else {
+          closeRegion(p.getSecond(), p.getFirst()).whenComplete((p2, err2) -> {
+            if (err2 != null) {
+              future.completeExceptionally(err2);
+            }else{
+              future.complete(null);
+            }
+          });
+        }
+      }
+    });
+    return future;
+  }
+
+  CompletableFuture<Pair<HRegionInfo, ServerName>> getRegion(byte[] regionName) {
+    if (regionName == null) {
+      return failedFuture(new IllegalArgumentException("Pass region name"));
+    }
+    CompletableFuture<Pair<HRegionInfo, ServerName>> future = new CompletableFuture<>();
+    AsyncMetaTableAccessor.getRegion(metaTable, regionName).whenComplete(
+      (p, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+        } else if (p != null) {
+          future.complete(p);
+        } else {
+          metaTable.scanAll(
+            new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY))
+              .whenComplete((results, err2) -> {
+                if (err2 != null) {
+                  future.completeExceptionally(err2);
+                  return;
+                }
+                String encodedName = Bytes.toString(regionName);
+                if (results != null && !results.isEmpty()) {
+                  for (Result r : results) {
+                    if (r.isEmpty() || MetaTableAccessor.getHRegionInfo(r) == null) continue;
+                    RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
+                    if (rl != null) {
+                      for (HRegionLocation h : rl.getRegionLocations()) {
+                        if (h != null && encodedName.equals(h.getRegionInfo().getEncodedName())) {
+                          future.complete(new Pair<>(h.getRegionInfo(), h.getServerName()));
+                          return;
+                        }
+                      }
+                    }
+                  }
+                }
+                future.complete(null);
+              });
+        }
+      });
+    return future;
   }
 
   @Override
@@ -530,7 +607,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
         .action(
           (controller, stub) -> this.<CloseRegionRequest, CloseRegionResponse, Void> adminCall(
             controller, stub, ProtobufUtil.buildCloseRegionRequest(sn, hri.getRegionName()),
-            (s, c, req, done) -> s.closeRegion(controller, req, done), null))
+            (s, c, req, done) -> s.closeRegion(controller, req, done), resp -> null))
         .serverName(sn).call();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/697a55a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 7948b65..537a92d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -102,7 +102,8 @@ class RawAsyncTableImpl implements RawAsyncTable {
     this.pauseNs = builder.pauseNs;
     this.maxAttempts = builder.maxAttempts;
     this.startLogErrorsCnt = builder.startLogErrorsCnt;
-    this.defaultScannerCaching = conn.connConf.getScannerCaching();
+    this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching()
+        : conn.connConf.getScannerCaching();
     this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/697a55a8/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java
index 467f6c9..950d77d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -964,6 +965,35 @@ public class TestAsyncAdmin {
   }
 
   @Test
+  public void testCloseRegionThatFetchesTheHRIFromMeta() throws Exception {
+    TableName TABLENAME = TableName.valueOf("TestHBACloseRegion2");
+    createTableWithDefaultConf(TABLENAME);
+
+    HRegionInfo info = null;
+    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TABLENAME);
+    List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
+    for (HRegionInfo regionInfo : onlineRegions) {
+      if (!regionInfo.isMetaTable()) {
+
+        if (regionInfo.getRegionNameAsString().contains("TestHBACloseRegion2")) {
+          info = regionInfo;
+          admin.closeRegion(regionInfo.getRegionNameAsString(), rs.getServerName().getServerName())
+              .get();
+        }
+      }
+    }
+
+    boolean isInList = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices()).contains(info);
+    long timeout = System.currentTimeMillis() + 10000;
+    while ((System.currentTimeMillis() < timeout) && (isInList)) {
+      Thread.sleep(100);
+      isInList = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices()).contains(info);
+    }
+
+    assertFalse("The region should not be present in online regions list.", isInList);
+  }
+
+  @Test
   public void testCloseRegionWhenServerNameIsNull() throws Exception {
     byte[] TABLENAME = Bytes.toBytes("TestHBACloseRegion3");
     createTableWithDefaultConf(TableName.valueOf(TABLENAME));
@@ -1035,4 +1065,23 @@ public class TestAsyncAdmin {
     assertTrue("The region should be present in online regions list.",
       onlineRegions.contains(info));
   }
+
+  @Test
+  public void testGetRegion() throws Exception {
+    AsyncHBaseAdmin rawAdmin = (AsyncHBaseAdmin) admin;
+
+    final TableName tableName = TableName.valueOf("testGetRegion");
+    LOG.info("Started " + tableName);
+    TEST_UTIL.createMultiRegionTable(tableName, HConstants.CATALOG_FAMILY);
+
+    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
+      HRegionLocation regionLocation = locator.getRegionLocation(Bytes.toBytes("mmm"));
+      HRegionInfo region = regionLocation.getRegionInfo();
+      byte[] regionName = region.getRegionName();
+      Pair<HRegionInfo, ServerName> pair = rawAdmin.getRegion(regionName).get();
+      assertTrue(Bytes.equals(regionName, pair.getFirst().getRegionName()));
+      pair = rawAdmin.getRegion(region.getEncodedNameAsBytes()).get();
+      assertTrue(Bytes.equals(regionName, pair.getFirst().getRegionName()));
+    }
+  }
 }


Mime
View raw message